task3.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. from static.config import db_article, db_video, mysql_coroutines
  6. from applications.functions.log import logging
  7. from applications.functions.pqFunctions import *
  8. class MatchTask3(object):
  9. """
  10. 处理已经匹配过小程序的文章
  11. """
  12. def __init__(self, mysql_client):
  13. """
  14. :param mysql_client:
  15. """
  16. self.mysql_client = mysql_client
  17. async def getTaskList(self):
  18. """
  19. 获取任务
  20. :return:
  21. """
  22. select_sql1 = f"""
  23. SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
  24. FROM {db_article}
  25. WHERE content_status = 0 and process_times <= 3
  26. ORDER BY request_time_stamp
  27. ASC
  28. LIMIT {mysql_coroutines};
  29. """
  30. tasks = await self.mysql_client.async_select(sql=select_sql1)
  31. task_obj_list = [
  32. {
  33. "trace_id": item[0],
  34. "content_id": item[1],
  35. "gh_id": item[2],
  36. "title": item[3],
  37. "text": item[4],
  38. "content_status": item[5],
  39. "process_times": item[6]
  40. } for item in tasks
  41. ]
  42. logging(
  43. code="9001",
  44. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  45. data=task_obj_list
  46. )
  47. return task_obj_list
  48. async def getHistoryVideoOssPath(self, content_id):
  49. """
  50. check whether the contents videos exists
  51. :param content_id:
  52. :return:
  53. """
  54. select_sql = f"""
  55. SELECT video_title, uid, video_path, cover_path
  56. FROM {db_video}
  57. where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
  58. """
  59. content_videos = await self.mysql_client.async_select(select_sql)
  60. video_list = [
  61. {
  62. "title": line[0],
  63. "uid": line[1],
  64. "videoPath": line[2],
  65. "coverPath": line[3]
  66. }
  67. for line in content_videos
  68. ]
  69. if len(video_list) >= 3:
  70. return video_list
  71. else:
  72. return None
  73. async def useExistOssPath(self, video_info_list, params):
  74. """
  75. 使用已经存在的视频id
  76. :return:
  77. """
  78. trace_id = params['trace_id']
  79. content_id = params['content_id']
  80. select_sql = f"""
  81. SELECT kimi_title
  82. FROM {db_article}
  83. WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
  84. """
  85. info = await self.mysql_client.async_select(sql=select_sql)
  86. kimi_title = info[0]
  87. video_id_list = await getNewVideoIds(video_info_list)
  88. vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
  89. update_sql = f"""
  90. UPDATE {db_article}
  91. SET
  92. kimi_title=%s,
  93. recall_video_id1=%s,
  94. recall_video_id2=%s,
  95. recall_video_id3=%s,
  96. content_status=%s,
  97. process_times = %s
  98. WHERE trace_id = %s
  99. """
  100. await self.mysql_client.async_insert(
  101. sql=update_sql,
  102. params=(
  103. kimi_title,
  104. vid1,
  105. vid2,
  106. vid3,
  107. 2,
  108. int(params['process_times']) + 1,
  109. trace_id
  110. )
  111. )
  112. logging(
  113. code="9002",
  114. info="已从历史文章更新,文章id: {}".format(content_id),
  115. trace_id=trace_id
  116. )
  117. async def processTask(self, params):
  118. """
  119. 异步执行
  120. :param params:
  121. :return:
  122. """
  123. content_id = params['content_id']
  124. trace_id = params['trace_id']
  125. # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
  126. oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
  127. if oss_path_list:
  128. # 说明已经存在了结果, 将该条记录下的video_oss拿出来
  129. logging(
  130. code="9001",
  131. info="存在历史文章",
  132. trace_id=trace_id
  133. )
  134. await self.useExistOssPath(video_info_list=oss_path_list, params=params)
  135. else:
  136. pass
  137. async def deal(self):
  138. """
  139. 处理
  140. :return:
  141. """
  142. task_list = await self.getTaskList()
  143. task_dict = {}
  144. for task in task_list:
  145. key = task['content_id']
  146. task_dict[key] = task
  147. process_list = []
  148. for item in task_dict:
  149. process_list.append(task_dict[item])
  150. if process_list:
  151. tasks = [self.processTask(params) for params in process_list]
  152. await asyncio.gather(*tasks)
  153. else:
  154. logging(
  155. code="9008",
  156. info="没有要处理的请求"
  157. )