# Source file: rm_match_task.py (13 KB)
  1. """
  2. @author: luojunhui
  3. """
  4. import traceback
  5. from typing import Dict, List
  6. from aiomysql.cursors import DictCursor
  7. from applications.log import logging
  8. from applications.config import Config
  9. from applications.const import rematch_task_const
  10. from applications.etl_function import download_cover
  11. from applications.etl_function import download_video
  12. from applications.etl_function import generate_video_path
  13. from applications.etl_function import upload_to_oss
  14. from applications.spider import search_videos_from_web
  15. from .utils import get_kimi_result
  16. class ReMatchTask(object):
  17. """
  18. 重新匹配任务
  19. """
  20. def __init__(self, db_client):
  21. self.db_client = db_client
  22. self.config = Config()
  23. self.article_match_video_table = self.config.article_match_video_table
  24. self.article_text_table = self.config.article_text_table
  25. self.article_crawler_video_table = self.config.article_crawler_video_table
  26. async def get_tasks(self) -> List[Dict]:
  27. """
  28. get task needs to rematch
  29. """
  30. select_sql = f"""
  31. SELECT DISTINCT content_id
  32. FROM article_re_match_record
  33. WHERE status = {rematch_task_const.REMATCH_INIT_STATUS};
  34. """
  35. result = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
  36. logging(
  37. code="rematch_1001",
  38. function="get_tasks",
  39. info="获取content_id数量: {}".format(len(result))
  40. )
  41. return result
  42. async def async_download_video_list(self, to_download_videos: List[Dict], illegal_platform_id_list: List[str]) -> int:
  43. """
  44. 异步下载视频
  45. """
  46. success_count = 0
  47. for video in to_download_videos:
  48. # check whether video is illegal
  49. out_key = "{}_{}".format(video['platform'], video['out_video_id'])
  50. if out_key in illegal_platform_id_list:
  51. continue
  52. # start download
  53. try:
  54. local_video_path, local_cover_path = generate_video_path(video['platform'], video['out_video_id'])
  55. # download videos
  56. file_path = await download_video(
  57. file_path=local_video_path,
  58. platform=video['platform'],
  59. video_url=video['video_url']
  60. )
  61. if not file_path:
  62. # 说明视频下载失败,无需上传该视频, 将该条记录设置为失败状态
  63. update_sql = f"""
  64. UPDATE {self.article_crawler_video_table}
  65. SET download_status = %s
  66. WHERE id = %s;
  67. """
  68. await self.db_client.async_insert(
  69. sql=update_sql,
  70. params=(rematch_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, video['id'])
  71. )
  72. else:
  73. # download cover
  74. cover_path = await download_cover(
  75. file_path=local_cover_path,
  76. platform=video['platform'],
  77. cover_url=video['cover_url']
  78. )
  79. # upload video to oss
  80. oss_video = await upload_to_oss(
  81. local_video_path=file_path,
  82. download_type="video"
  83. )
  84. # upload cover to oss
  85. if cover_path:
  86. oss_cover = await upload_to_oss(
  87. local_video_path=cover_path,
  88. download_type="image"
  89. )
  90. else:
  91. oss_cover = None
  92. # change status to success
  93. update_sql = f"""
  94. UPDATE {self.article_crawler_video_table}
  95. SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
  96. WHERE id = %s;
  97. """
  98. await self.db_client.async_insert(
  99. sql=update_sql,
  100. params=(
  101. oss_video,
  102. oss_cover,
  103. rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
  104. video['id']
  105. )
  106. )
  107. success_count += 1
  108. # 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
  109. if success_count > rematch_task_const.MIN_MATCH_VIDEO_NUM:
  110. return success_count
  111. except Exception as e:
  112. update_sql = f"""
  113. UPDATE {self.article_crawler_video_table}
  114. SET download_status = %s
  115. WHERE id = %s;
  116. """
  117. await self.db_client.async_insert(
  118. sql=update_sql,
  119. params=(rematch_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, video['id'])
  120. )
  121. return success_count
  122. async def download_upload_task(self, content_id: str, illegal_platform_id_list: List[str], legal_not_downloaded_videos=None) -> int:
  123. """
  124. 下载任务
  125. """
  126. success_count = 0
  127. if legal_not_downloaded_videos:
  128. # 下载legal_not_download_videos, 记录下载成功数量
  129. new_download_count = await self.async_download_video_list(
  130. to_download_videos=legal_not_downloaded_videos,
  131. illegal_platform_id_list=illegal_platform_id_list
  132. )
  133. success_count += new_download_count
  134. return success_count
  135. select_sql = f"""
  136. SELECT id, out_video_id, platform, video_title, video_url, cover_url
  137. FROM {self.article_crawler_video_table}
  138. WHERE content_id = {content_id} AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_INIT_STATUS};
  139. """
  140. to_download_videos = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
  141. new_download_count = await self.async_download_video_list(
  142. to_download_videos=to_download_videos,
  143. illegal_platform_id_list=illegal_platform_id_list
  144. )
  145. success_count += new_download_count
  146. return success_count
  147. async def spider_task(self, content_id: str) -> int:
  148. """
  149. 爬虫抓取任务
  150. """
  151. kimi_result = await get_kimi_result(
  152. content_id=content_id,
  153. article_text_table=self.article_text_table,
  154. db_client=self.db_client
  155. )
  156. # 该搜索任务是content_id粒度,因此将trace_id 传为content_id
  157. search_videos_count = await search_videos_from_web(
  158. info={
  159. "ori_title": kimi_result['ori_title'],
  160. "kimi_summary": kimi_result['kimi_summary'],
  161. "kimi_keys": kimi_result['kimi_keys'],
  162. "trace_id": content_id,
  163. "gh_id": rematch_task_const.TASK_DEFAULT_GH_ID,
  164. "content_id": content_id,
  165. "crawler_video_table": self.article_crawler_video_table
  166. },
  167. gh_id_map={},
  168. db_client=self.db_client
  169. )
  170. return search_videos_count
  171. async def update_each_content_id(self, content_id: str) -> None:
  172. """
  173. 更新每一个content_id的视频信息
  174. """
  175. select_sql = f"""
  176. SELECT
  177. id, platform, out_video_id, video_oss_path, download_status, is_illegal, video_url, cover_url
  178. FROM
  179. {self.article_crawler_video_table}
  180. WHERE content_id = '{content_id}';
  181. """
  182. record_list = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
  183. # 获取违规视频,违规视频out_id && platform
  184. illegal_videos = [record for record in record_list if record["is_illegal"] == rematch_task_const.VIDEO_UNSAFE]
  185. illegal_platform_id_list = [
  186. "{}_{}".format(record['platform'], record['out_video_id']) for record in illegal_videos
  187. ]
  188. # 处理非违规字段
  189. legal_downloaded_videos = []
  190. legal_not_downloaded_videos = []
  191. for record in record_list:
  192. if record['is_illegal'] == rematch_task_const.VIDEO_UNSAFE:
  193. continue
  194. else:
  195. out_key = "{}_{}".format(record['platform'], record['out_video_id'])
  196. if out_key in illegal_platform_id_list:
  197. continue
  198. if record['download_status'] == rematch_task_const.VIDEO_DOWNLOAD_INIT_STATUS:
  199. legal_not_downloaded_videos.append(record)
  200. elif record['download_status'] == rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS:
  201. legal_downloaded_videos.append(record)
  202. else:
  203. continue
  204. if len(legal_downloaded_videos) >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
  205. logging(
  206. code="rematch_1002",
  207. info="存在{}条以上待合规已下载内容,无需处理该content_id".format(rematch_task_const.MIN_MATCH_VIDEO_NUM),
  208. data={
  209. "content_id": content_id
  210. }
  211. )
  212. return
  213. elif len(legal_not_downloaded_videos + legal_downloaded_videos) >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
  214. logging(
  215. code="rematch_1003",
  216. info="存在{}条以上待合规内容".format(rematch_task_const.MIN_MATCH_VIDEO_NUM),
  217. data={
  218. "content_id": content_id,
  219. "待下载数量": len(legal_not_downloaded_videos),
  220. "已下载数量": len(legal_downloaded_videos)
  221. }
  222. )
  223. await self.download_upload_task(
  224. content_id=content_id,
  225. illegal_platform_id_list=illegal_platform_id_list,
  226. legal_not_downloaded_videos=legal_not_downloaded_videos
  227. )
  228. else:
  229. logging(
  230. code="rematch_1004",
  231. info="重新执行爬虫任务",
  232. data={
  233. "content_id": content_id,
  234. }
  235. )
  236. await self.spider_task(content_id=content_id)
  237. await self.download_upload_task(
  238. content_id=content_id,
  239. illegal_platform_id_list=illegal_platform_id_list
  240. )
  241. async def check_whether_get_enough_videos(self, content_id: str) -> bool:
  242. """
  243. check 该content_id是否存在足够的视频
  244. """
  245. select_sql = f"""
  246. SELECT count(1)
  247. FROM {self.article_crawler_video_table}
  248. WHERE content_id = '{content_id}'
  249. AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  250. AND is_illegal = {rematch_task_const.VIDEO_SAFE};
  251. """
  252. count_tuple = await self.db_client.async_select(select_sql)
  253. count = count_tuple[0][0]
  254. if count >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
  255. return True
  256. else:
  257. return False
  258. async def update_content_id_status(self, content_id: str) -> int:
  259. """
  260. 更新content_id的status为成功
  261. """
  262. update_sql = f"""
  263. UPDATE article_re_match_record
  264. SET status = %s
  265. WHERE content_id = '{content_id}';
  266. """
  267. affected_rows = await self.db_client.async_insert(
  268. update_sql,
  269. params=(rematch_task_const.REMATCH_SUCCESS_STATUS,)
  270. )
  271. return affected_rows
  272. async def deal(self):
  273. """
  274. do job here
  275. """
  276. task_list = await self.get_tasks()
  277. for task in task_list:
  278. content_id = task['content_id']
  279. try:
  280. await self.update_each_content_id(content_id=content_id)
  281. if await self.check_whether_get_enough_videos(content_id=content_id):
  282. # 修改状态为1
  283. await self.update_content_id_status(content_id=content_id)
  284. else:
  285. continue
  286. except Exception as e:
  287. error_stack = traceback.format_exc()
  288. logging(
  289. code="rematch_1010",
  290. function="update_each_content_id",
  291. data={
  292. "error_stack": error_stack,
  293. "error": str(e),
  294. "content_id": content_id
  295. }
  296. )