content_rematch.py

  1. """
  2. @author: luojunhui
  3. """
  4. import time
  5. import asyncio
  6. import datetime
  7. from applications.db import AsyncMySQLClient
  8. from applications.spider import search_videos_from_web
  9. from tasks.utils.kimi_task import get_kimi_result
  10. from tasks.utils.etl_task import async_download_videos


class ContentRematch:
    """
    Content rematch: re-run video search and download for content_ids
    that do not yet have enough usable videos.
    """

    def __init__(self, db_client):
        self.db_client = db_client
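
    # download_status = 2 is assumed to mark successfully downloaded videos;
    # content_ids with fewer than three of them are picked up for rematching.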

    async def get_content_list(self):
        """
        Get the list of content_ids that still need rematching
        """
        select_sql = """
            SELECT content_id, count(1)
            FROM long_articles_crawler_videos
            WHERE download_status = 2
            GROUP BY content_id
            HAVING count(1) < 3
            LIMIT 10;
        """
        content_list = await self.db_client.async_select(select_sql)
        return content_list
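
    # Illegal videos are keyed as "<platform>_<out_video_id>" and handed to the
    # ETL step, presumably so those candidates are skipped during download.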

    async def get_illegal_out_ids(self, content_id: str) -> list:
        """
        Get the out-site video ids flagged as illegal for this content_id
        """
        select_sql = f"""
            SELECT platform, out_video_id
            FROM long_articles_crawler_videos
            WHERE content_id = '{content_id}' and is_illegal = 1;
        """
        response = await self.db_client.async_select(select_sql)
        if response:
            return ["{}_{}".format(line[0], line[1]) for line in response]
        else:
            return []
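
    # gh_id appears to be a hard-coded placeholder ("test_gh_id"); trace_id is
    # derived from the content_id ("rematch_<content_id>"), presumably so
    # rematch runs can be told apart from regular crawls downstream.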

    async def spider_task(self, params, kimi_result):
        """
        Crawler task: search the web for candidate videos
        :return: True if at least three candidate videos were found
        """
        content_id = params['content_id']
        gh_id = "test_gh_id"
        trace_id = "rematch_{}".format(content_id)
        try:
            search_videos_count = await search_videos_from_web(
                info={
                    "ori_title": kimi_result['ori_title'],
                    "kimi_summary": kimi_result['kimi_summary'],
                    "kimi_keys": kimi_result['kimi_keys'],
                    "trace_id": trace_id,
                    "gh_id": gh_id,
                    "content_id": content_id,
                    "crawler_video_table": "long_articles_crawler_videos",
                },
                gh_id_map={},
                db_client=self.db_client
            )
            return search_videos_count >= 3
        except Exception:
            return False
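
    # The ETL step only succeeds if at least three videos end up downloaded,
    # mirroring the thresholds used in get_content_list and spider_task.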

    async def etl_task(self, content_id, trace_id):
        """
        ETL task: download the candidate videos, skipping illegal ones
        :return: True if at least three videos were downloaded
        """
        illegal_videos = await self.get_illegal_out_ids(content_id)
        downloaded_count = await async_download_videos(
            trace_id=trace_id,
            content_id=content_id,
            article_crawler_video_table="long_articles_crawler_videos",
            db_client=self.db_client,
            illegal_videos=illegal_videos
        )
        return downloaded_count >= 3
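
    # Per-content pipeline: Kimi summary -> web search -> video download.
    # Each stage bails out early if the previous one produced too little data.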

    async def process_each_content(self, content):
        """
        Process a single content_id
        """
        content_id = content[0]
        kimi_result = await get_kimi_result(
            content_id=content_id,
            db_client=self.db_client,
            article_text_table="long_articles_text",
        )
        if not kimi_result:
            return
        spider_flag = await self.spider_task(
            params={
                "content_id": content_id,
            },
            kimi_result=kimi_result
        )
        if not spider_flag:
            return
        etl_flag = await self.etl_task(
            content_id=content_id,
            trace_id="rematch_{}".format(content_id)
        )
        if not etl_flag:
            return
        return True

    async def deal(self):
        """
        Process every content_id in the pending list
        """
        content_list = await self.get_content_list()
        for content in content_list:
            print(content)
            await self.process_each_content(content)


async def main_job():
    """
    main job
    :return:
    """
    async with AsyncMySQLClient() as long_articles_pool:
        _task = ContentRematch(long_articles_pool)
        await _task.deal()
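
# Entry point: rerun the rematch job forever. asyncio.run starts a fresh event
# loop (and a fresh DB pool inside main_job) on every iteration.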

if __name__ == '__main__':
    while True:
        asyncio.run(main_job())
        now_str = str(datetime.datetime.now())
        print("{} run finished, waiting 60s".format(now_str))
        time.sleep(60)