# task2.py
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. from static.config import db_article, db_video
  6. from applications.functions.log import logging
  7. from static.config import mysql_coroutines
  8. class MatchTask2(object):
  9. """
  10. 定时执行任务
  11. """
  12. def __init__(self, mysql_client):
  13. """
  14. :param mysql_client:
  15. """
  16. self.mysql_client = mysql_client
  17. async def get_task(self):
  18. """
  19. 获取任务
  20. :return:
  21. """
  22. select_sql = f"""
  23. SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
  24. FROM {db_article}
  25. WHERE content_status = 0 and process_times <= 5
  26. ORDER BY request_time_stamp
  27. ASC
  28. LIMIT {mysql_coroutines};
  29. """
  30. task_list = await self.mysql_client.async_select(sql=select_sql)
  31. task_obj_list = [
  32. {
  33. "trace_id": item[0],
  34. "content_id": item[1],
  35. "gh_id": item[2],
  36. "title": item[3],
  37. "text": item[4],
  38. "content_status": item[5],
  39. "process_times": item[6]
  40. } for item in task_list
  41. ]
  42. logging(
  43. code="9001",
  44. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  45. data=task_obj_list
  46. )
  47. return task_obj_list
  48. async def get_history_videos(self, content_id):
  49. """
  50. check whether the contents videos exists
  51. :param content_id:
  52. :return:
  53. """
  54. select_sql = f"""
  55. SELECT video_id
  56. FROM {db_video}
  57. where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
  58. """
  59. content_videos = await self.mysql_client.async_select(select_sql)
  60. videos = [vid for vid in content_videos]
  61. if len(videos) >= 3:
  62. return videos
  63. else:
  64. return None
  65. async def use_exists_contents_videos(self, video_id_list, params):
  66. """
  67. 使用已经存在的视频id
  68. :return:
  69. """
  70. trace_id = params['trace_id']
  71. content_id = params['content_id']
  72. select_sql = f"""
  73. SELECT kimi_title
  74. FROM {db_article}
  75. WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
  76. """
  77. info = await self.mysql_client.async_select(sql=select_sql)
  78. kimi_title = info[0]
  79. update_sql = f"""
  80. UPDATE {db_article}
  81. SET
  82. kimi_title=%s,
  83. recall_video_id1=%s,
  84. recall_video_id2=%s,
  85. recall_video_id3=%s,
  86. content_status=%s,
  87. process_times = %s
  88. WHERE trace_id = %s
  89. """
  90. vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
  91. await self.mysql_client.async_insert(
  92. sql=update_sql,
  93. params=(
  94. kimi_title,
  95. video_id_list[0],
  96. "NULL" if vid2 is None else vid2,
  97. "NULL" if vid3 is None else vid3,
  98. 2,
  99. int(params['process_times']) + 1,
  100. trace_id
  101. )
  102. )
  103. logging(
  104. code="9002",
  105. info="已从历史文章更新,文章id: {}".format(content_id),
  106. trace_id=trace_id
  107. )
  108. async def process_video_id(self, title, trace_id, process_times):
  109. """
  110. 如果video_id在标题中,则做特殊处理
  111. :return:
  112. """
  113. video_id = title.split("video_id=")[-1]
  114. update_sql = f"""
  115. UPDATE
  116. {db_article}
  117. SET
  118. recall_video_id1 = %s,
  119. content_status = %s,
  120. process_times = %s
  121. WHERE
  122. trace_id = %s;"""
  123. await self.mysql_client.async_insert(
  124. sql=update_sql,
  125. params=(video_id, 2, int(process_times) + 1, trace_id)
  126. )
  127. async def process_task(self, params):
  128. """
  129. 异步执行
  130. :param params:
  131. :return:
  132. """
  133. content_id = params['content_id']
  134. trace_id = params['trace_id']
  135. # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
  136. video_id_list = await self.get_history_videos(content_id=content_id)
  137. if video_id_list:
  138. # 说明已经存在了结果, 将该条记录下的video_id拿出来
  139. logging(
  140. code="9001",
  141. info="存在历史文章",
  142. trace_id=trace_id
  143. )
  144. await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
  145. else:
  146. pass
  147. async def deal(self):
  148. """
  149. 处理
  150. :return:
  151. """
  152. task_list = await self.get_task()
  153. if task_list:
  154. tasks = [self.process_task(params) for params in task_list]
  155. await asyncio.gather(*tasks)
  156. else:
  157. logging(
  158. code="9008",
  159. info="没有要处理的请求"
  160. )