task3.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. from static.config import db_article, db_video, mysql_coroutines
  6. from applications.functions.log import logging
  7. from applications.functions.pqFunctions import *
  8. class MatchTask3(object):
  9. """
  10. 处理已经匹配过小程序的文章
  11. """
  12. def __init__(self, mysql_client):
  13. """
  14. :param mysql_client:
  15. """
  16. self.mysql_client = mysql_client
  17. async def getTaskList(self):
  18. """
  19. 获取任务
  20. :return:
  21. """
  22. select_sql1 = f"""
  23. SELECT
  24. ART.trace_id,
  25. ART.content_id,
  26. ART.gh_id,
  27. ART.article_title,
  28. ART.article_text,
  29. ART.content_status,
  30. ART.process_times
  31. FROM {db_article} ART
  32. JOIN (
  33. select content_id, count(1) as cnt
  34. from {db_video}
  35. where oss_status = 1
  36. group by content_id
  37. ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
  38. WHERE ART.content_status = 0 and ART.process_times <= 3
  39. ORDER BY request_time_stamp
  40. LIMIT {mysql_coroutines};
  41. """
  42. tasks = await self.mysql_client.async_select(sql=select_sql1)
  43. task_obj_list = [
  44. {
  45. "trace_id": item[0],
  46. "content_id": item[1],
  47. "gh_id": item[2],
  48. "title": item[3],
  49. "text": item[4],
  50. "content_status": item[5],
  51. "process_times": item[6]
  52. } for item in tasks
  53. ]
  54. logging(
  55. code="9001",
  56. function="task3.get_task",
  57. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  58. data=[x['content_id'] for x in task_obj_list]
  59. )
  60. return task_obj_list
  61. async def getHistoryVideoOssPath(self, content_id):
  62. """
  63. check whether the contents videos exists
  64. :param content_id:
  65. :return:
  66. """
  67. select_sql = f"""
  68. SELECT video_title, uid, video_path, cover_path
  69. FROM {db_video}
  70. where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
  71. """
  72. content_videos = await self.mysql_client.async_select(select_sql)
  73. video_list = [
  74. {
  75. "title": line[0],
  76. "uid": line[1],
  77. "videoPath": line[2],
  78. "coverPath": line[3]
  79. }
  80. for line in content_videos
  81. ]
  82. if len(video_list) >= 3:
  83. return video_list
  84. else:
  85. return None
  86. async def useExistOssPath(self, video_info_list, params):
  87. """
  88. 使用已经存在的视频id
  89. :return:
  90. """
  91. trace_id = params['trace_id']
  92. content_id = params['content_id']
  93. select_sql = f"""
  94. SELECT kimi_title
  95. FROM {db_article}
  96. WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
  97. """
  98. info = await self.mysql_client.async_select(sql=select_sql)
  99. kimi_title = info[0]
  100. video_id_list = await getNewVideoIds(video_info_list)
  101. vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
  102. update_sql = f"""
  103. UPDATE {db_article}
  104. SET
  105. kimi_title=%s,
  106. recall_video_id1=%s,
  107. recall_video_id2=%s,
  108. recall_video_id3=%s,
  109. content_status=%s,
  110. process_times = %s
  111. WHERE trace_id = %s
  112. """
  113. await self.mysql_client.async_insert(
  114. sql=update_sql,
  115. params=(
  116. kimi_title,
  117. vid1,
  118. vid2,
  119. vid3,
  120. 2,
  121. int(params['process_times']) + 1,
  122. trace_id
  123. )
  124. )
  125. logging(
  126. code="9002",
  127. info="已从历史文章更新,文章id: {}".format(content_id),
  128. trace_id=trace_id
  129. )
  130. async def processTask(self, params):
  131. """
  132. 异步执行
  133. :param params:
  134. :return:
  135. """
  136. content_id = params['content_id']
  137. trace_id = params['trace_id']
  138. # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
  139. oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
  140. if oss_path_list:
  141. # 说明已经存在了结果, 将该条记录下的video_oss拿出来
  142. logging(
  143. code="9001",
  144. info="存在历史文章",
  145. trace_id=trace_id
  146. )
  147. try:
  148. await self.useExistOssPath(video_info_list=oss_path_list, params=params)
  149. except Exception as e:
  150. print(e)
  151. else:
  152. pass
  153. async def deal(self):
  154. """
  155. 处理
  156. :return:
  157. """
  158. task_list = await self.getTaskList()
  159. if task_list:
  160. tasks = [self.processTask(params) for params in task_list]
  161. await asyncio.gather(*tasks)
  162. else:
  163. logging(
  164. code="9008",
  165. info="没有要处理的请求"
  166. )