rm_match_task.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. """
  2. @author: luojunhui
  3. """
  4. from typing import Dict, List
  5. from aiomysql.cursors import DictCursor
  6. from applications.log import logging
  7. from applications.config import Config
  8. from applications.const import rematch_task_const
  9. class ReMatchTask(object):
  10. """
  11. 重新匹配任务
  12. """
  13. def __init__(self, db_client):
  14. self.db_client = db_client
  15. self.config = Config()
  16. self.article_match_video_table = self.config.article_match_video_table
  17. self.article_text_table = self.config.article_text_table
  18. self.article_crawler_video_table = self.config.article_crawler_video_table
  19. self.rematch_coroutines = int(self.config.get_config_value(key='rematchCoroutines'))
  20. async def get_tasks(self, business_type) -> List[Dict]:
  21. """
  22. get task needs to rematch
  23. """
  24. if business_type == 'process':
  25. select_sql = f"""
  26. SELECT trace_id, content_id
  27. FROM article_re_match_record
  28. WHERE status = {rematch_task_const.REMATCH_INIT_STATUS}
  29. LIMIT {self.rematch_coroutines};
  30. """
  31. elif business_type == 'check':
  32. select_sql = f"""
  33. SELECT trace_id, content_id
  34. FROM article_re_match_record
  35. WHERE status = {rematch_task_const.REMATCH_PROCESSING_STATUS}
  36. LIMIT {self.rematch_coroutines};
  37. """
  38. else:
  39. return []
  40. result = await self.db_client.async_select(select_sql, cursor_type=DictCursor)
  41. task_dict = {task['content_id']: task for task in result}
  42. task_list = list(task_dict.values())
  43. logging(
  44. code="rematch_1001",
  45. function="get_tasks",
  46. info="获取content_id数量: {}".format(len(task_list))
  47. )
  48. return task_list
  49. async def check_whether_get_enough_videos(self, content_id: str) -> bool:
  50. """
  51. check 该content_id是否存在足够的视频
  52. """
  53. select_sql = f"""
  54. SELECT count(1)
  55. FROM {self.article_crawler_video_table}
  56. WHERE content_id = '{content_id}'
  57. AND download_status = {rematch_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  58. AND is_illegal = {rematch_task_const.VIDEO_SAFE};
  59. """
  60. count_tuple = await self.db_client.async_select(select_sql)
  61. count = count_tuple[0][0]
  62. if count >= rematch_task_const.MIN_MATCH_VIDEO_NUM:
  63. return True
  64. else:
  65. return False
  66. async def update_content_id_status_to_success(self, content_id: str) -> int:
  67. """
  68. 更新content_id的status为成功
  69. """
  70. update_sql = f"""
  71. UPDATE article_re_match_record
  72. SET status = %s
  73. WHERE content_id = '{content_id}';
  74. """
  75. affected_rows = await self.db_client.async_insert(
  76. update_sql,
  77. params=(rematch_task_const.REMATCH_SUCCESS_STATUS,)
  78. )
  79. return affected_rows
  80. async def get_task_lock(self, trace_id: str) -> int:
  81. """
  82. 将任务上锁,防止被其他进程抢占
  83. """
  84. update_sql = f"""
  85. UPDATE article_re_match_record
  86. SET status = %s
  87. WHERE trace_id = %s and status = %s;
  88. """
  89. affected_rows = await self.db_client.async_insert(
  90. sql=update_sql,
  91. params=(
  92. rematch_task_const.REMATCH_PROCESSING_STATUS,
  93. trace_id,
  94. rematch_task_const.REMATCH_INIT_STATUS
  95. )
  96. )
  97. return affected_rows
  98. async def whether_same_content_id_processing(self, content_id: str) -> bool:
  99. """
  100. 是否相同的content_id处理中 or 处理完成
  101. """
  102. select_sql = f"""
  103. SELECT DISTINCT status
  104. FROM article_re_match_record
  105. WHERE content_id = '{content_id}';
  106. """
  107. response = await self.db_client.async_select(select_sql)
  108. status_list = list(i[0] for i in response)
  109. for status in status_list:
  110. if status == rematch_task_const.REMATCH_PROCESSING_STATUS:
  111. return True
  112. return False
  113. async def process_content_id(self, content_id: str) -> int:
  114. """
  115. 处理content_id
  116. """
  117. update_sql = f"""
  118. UPDATE {self.article_match_video_table}
  119. SET content_status = %s, success_status = %s, process_times = %s
  120. WHERE content_id = %s;
  121. """
  122. affected_rows = await self.db_client.async_insert(
  123. sql=update_sql,
  124. params=(
  125. rematch_task_const.TASK_INIT_STATUS,
  126. rematch_task_const.AIGC_DONT_GET_RESULT_STATUS,
  127. rematch_task_const.TASK_INIT_PROCESS_TIMES,
  128. content_id
  129. )
  130. )
  131. return affected_rows
  132. async def process_task(self, task_list: List[Dict]) -> None:
  133. """
  134. 处理
  135. """
  136. for task in task_list:
  137. content_id = task['content_id']
  138. trace_id = task['trace_id']
  139. processing_flag = await self.whether_same_content_id_processing(content_id=content_id)
  140. if processing_flag:
  141. continue
  142. else:
  143. affected_row = await self.get_task_lock(trace_id)
  144. if not affected_row:
  145. continue
  146. affected_rows = await self.process_content_id(content_id)
  147. if affected_rows:
  148. logging(
  149. code="rematch_1002",
  150. function="deal",
  151. info="回滚content_id成功",
  152. data={
  153. "content_id": content_id,
  154. "trace_id": trace_id,
  155. "affected_rows": affected_rows
  156. }
  157. )
  158. else:
  159. logging(
  160. code="rematch_1003",
  161. function="deal",
  162. info="回滚content_id失败",
  163. data={
  164. "content_id": content_id,
  165. "trace_id": trace_id
  166. }
  167. )
  168. async def check_task(self, task_list: List[Dict]) -> None:
  169. """
  170. 校验任务是否完成
  171. """
  172. for task in task_list:
  173. content_id = task['content_id']
  174. enough_video_flag = await self.check_whether_get_enough_videos(content_id=content_id)
  175. if enough_video_flag:
  176. affected_rows = await self.update_content_id_status_to_success(content_id=content_id)
  177. if affected_rows:
  178. logging(
  179. code="rematch_1004",
  180. function="check_task",
  181. info="修改状态成功",
  182. data={
  183. "content_id": content_id,
  184. "affected_rows": affected_rows
  185. }
  186. )
  187. else:
  188. continue
  189. async def deal(self):
  190. """
  191. do job here
  192. """
  193. # 处理任务
  194. task_list = await self.get_tasks(business_type="process")
  195. if task_list:
  196. await self.process_task(task_list)
  197. else:
  198. logging(
  199. code="rematch_5001",
  200. info="do not get article to process"
  201. )
  202. # 校验任务
  203. task_list = await self.get_tasks(business_type="check")
  204. if task_list:
  205. await self.check_task(task_list)
  206. else:
  207. logging(
  208. code="rematch_5002",
  209. info="do not get article to check"
  210. )