# task3.py (5.3 KB)
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. from static.config import db_article, db_video, mysql_coroutines
  6. from applications.functions.log import logging
  7. from applications.functions.pqFunctions import *
  8. class MatchTask3(object):
  9. """
  10. 处理已经匹配过小程序的文章
  11. """
  12. def __init__(self, mysql_client):
  13. """
  14. :param mysql_client:
  15. """
  16. self.mysql_client = mysql_client
  17. async def getTaskList(self):
  18. """
  19. 获取任务
  20. :return:
  21. """
  22. select_sql1 = f"""
  23. SELECT
  24. ART.trace_id,
  25. ART.content_id,
  26. ART.gh_id,
  27. ART.article_title,
  28. ART.article_text,
  29. ART.content_status,
  30. ART.process_times
  31. FROM {db_article} ART
  32. JOIN (
  33. select content_id, count(1) as cnt
  34. from {db_video}
  35. where oss_status = 1
  36. group by content_id
  37. ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
  38. WHERE ART.content_status = 0 and ART.process_times <= 3
  39. ORDER BY request_time_stamp
  40. LIMIT {mysql_coroutines};
  41. """
  42. tasks = await self.mysql_client.async_select(sql=select_sql1)
  43. task_obj_list = [
  44. {
  45. "trace_id": item[0],
  46. "content_id": item[1],
  47. "gh_id": item[2],
  48. "title": item[3],
  49. "text": item[4],
  50. "content_status": item[5],
  51. "process_times": item[6]
  52. } for item in tasks
  53. ]
  54. logging(
  55. code="9001",
  56. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  57. data=task_obj_list
  58. )
  59. return task_obj_list
  60. async def getHistoryVideoOssPath(self, content_id):
  61. """
  62. check whether the contents videos exists
  63. :param content_id:
  64. :return:
  65. """
  66. select_sql = f"""
  67. SELECT video_title, uid, video_path, cover_path
  68. FROM {db_video}
  69. where content_id = '{content_id}' and oss_status = 1 order by request_time DESC;
  70. """
  71. content_videos = await self.mysql_client.async_select(select_sql)
  72. video_list = [
  73. {
  74. "title": line[0],
  75. "uid": line[1],
  76. "videoPath": line[2],
  77. "coverPath": line[3]
  78. }
  79. for line in content_videos
  80. ]
  81. if len(video_list) >= 3:
  82. return video_list
  83. else:
  84. return None
  85. async def useExistOssPath(self, video_info_list, params):
  86. """
  87. 使用已经存在的视频id
  88. :return:
  89. """
  90. trace_id = params['trace_id']
  91. content_id = params['content_id']
  92. select_sql = f"""
  93. SELECT kimi_title
  94. FROM {db_article}
  95. WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
  96. """
  97. info = await self.mysql_client.async_select(sql=select_sql)
  98. kimi_title = info[0]
  99. video_id_list = await getNewVideoIds(video_info_list)
  100. vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
  101. update_sql = f"""
  102. UPDATE {db_article}
  103. SET
  104. kimi_title=%s,
  105. recall_video_id1=%s,
  106. recall_video_id2=%s,
  107. recall_video_id3=%s,
  108. content_status=%s,
  109. process_times = %s
  110. WHERE trace_id = %s
  111. """
  112. await self.mysql_client.async_insert(
  113. sql=update_sql,
  114. params=(
  115. kimi_title,
  116. vid1,
  117. vid2,
  118. vid3,
  119. 2,
  120. int(params['process_times']) + 1,
  121. trace_id
  122. )
  123. )
  124. logging(
  125. code="9002",
  126. info="已从历史文章更新,文章id: {}".format(content_id),
  127. trace_id=trace_id
  128. )
  129. async def processTask(self, params):
  130. """
  131. 异步执行
  132. :param params:
  133. :return:
  134. """
  135. content_id = params['content_id']
  136. trace_id = params['trace_id']
  137. # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
  138. oss_path_list = await self.getHistoryVideoOssPath(content_id=content_id)
  139. if oss_path_list:
  140. # 说明已经存在了结果, 将该条记录下的video_oss拿出来
  141. logging(
  142. code="9001",
  143. info="存在历史文章",
  144. trace_id=trace_id
  145. )
  146. try:
  147. await self.useExistOssPath(video_info_list=oss_path_list, params=params)
  148. except Exception as e:
  149. print(e)
  150. else:
  151. pass
  152. async def deal(self):
  153. """
  154. 处理
  155. :return:
  156. """
  157. task_list = await self.getTaskList()
  158. if task_list:
  159. tasks = [self.processTask(params) for params in task_list]
  160. await asyncio.gather(*tasks)
  161. else:
  162. logging(
  163. code="9008",
  164. info="没有要处理的请求"
  165. )