chadui.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. """
  2. @author: luojunhui
  3. """
  4. """
  5. @author: luojunhui
  6. """
  7. import asyncio
  8. from static.config import db_article, db_video
  9. from applications.functions.log import logging
  10. from static.config import mysql_coroutines
  11. class MatchTask5(object):
  12. """
  13. 定时执行任务
  14. """
  15. def __init__(self, mysql_client):
  16. """
  17. :param mysql_client:
  18. """
  19. self.mysql_client = mysql_client
  async def get_task(self):
      """
      Fetch the next batch of pending articles.

      Reads ``db_article`` rows whose ``content_status`` is 0, whose
      ``process_times`` is at most 5 and whose ``account_name`` is
      '万事如意一家子', newest ``request_time_stamp`` first, capped at
      ``mysql_coroutines`` rows.

      :return: one dict per row with the keys trace_id, content_id, gh_id,
          title, text, content_status and process_times
      """
      select_sql = f"""
      SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
      FROM {db_article}
      WHERE content_status = 0 and process_times <= 5 and account_name = '万事如意一家子'
      ORDER BY request_time_stamp
      DESC
      LIMIT {mysql_coroutines};
      """
      rows = await self.mysql_client.async_select(sql=select_sql)
      # Dict keys in SELECT column order; article_title / article_text are
      # exposed under the shorter names "title" / "text".
      field_names = (
          "trace_id",
          "content_id",
          "gh_id",
          "title",
          "text",
          "content_status",
          "process_times",
      )
      task_obj_list = [
          {name: row[position] for position, name in enumerate(field_names)}
          for row in rows
      ]
      print("本次任务获取到 {} 条视频".format(len(task_obj_list)))
      return task_obj_list
  async def get_history_videos(self, content_id):
      """
      Return previously recorded videos for ``content_id`` when there are enough.

      Selects ``video_id`` from ``db_video`` for this content with
      ``video_status = 1``, newest ``request_time`` first.

      :param content_id: content identifier (formatted directly into the SQL)
      :return: the result rows exactly as yielded by ``async_select`` (not
          unwrapped to bare ids) when there are 3 or more, otherwise None
      """
      # NOTE(review): content_id is interpolated into the SQL text; switch to a
      # bound parameter if async_select can accept one (not visible here).
      select_sql = f"""
      SELECT video_id
      FROM {db_video}
      where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
      """
      rows = await self.mysql_client.async_select(select_sql)
      videos = list(rows)
      print(len(videos))
      return videos if len(videos) >= 3 else None
  async def use_exists_contents_videos(self, video_id_list, params):
      """
      Fill the current article row with videos already found for its content.

      Copies a non-null ``kimi_title`` from a ``db_article`` row sharing the
      same content_id, writes up to three recalled video ids, sets
      ``content_status`` to 2 and increments ``process_times`` for the row
      identified by ``params['trace_id']``.

      :param video_id_list: non-empty sequence of video ids. Each entry may be
          a bare id or a one-column result row such as ``(video_id,)`` (the
          shape ``get_history_videos`` returns). A missing or None 2nd/3rd
          entry is stored as SQL NULL.
      :param params: task dict providing trace_id, content_id and
          process_times
      :return: None
      :raises IndexError: if ``video_id_list`` is empty
      """
      trace_id = params['trace_id']
      content_id = params['content_id']

      def _scalar(value):
          # Result rows are indexable sequences: bind the single column value
          # rather than the whole row tuple.
          if isinstance(value, (tuple, list)):
              return value[0]
          return value

      select_sql = f"""
      SELECT kimi_title
      FROM {db_article}
      WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
      """
      info = await self.mysql_client.async_select(sql=select_sql)
      if not info:
          # No row with a kimi_title: indexing the empty result would raise
          # IndexError and propagate out of deal()'s asyncio.gather.
          # TODO(review): this row keeps content_status 0 and will be selected
          # again; decide whether process_times should be bumped here.
          print("未找到 kimi_title,跳过该任务,文章id: {}".format(content_id))
          return
      kimi_title = _scalar(info[0])

      vid1 = _scalar(video_id_list[0])
      # Bind None (SQL NULL) for absent ids, not the string "NULL", which a
      # parameterised query would store as literal text.
      vid2 = _scalar(video_id_list[1]) if len(video_id_list) > 1 else None
      vid3 = _scalar(video_id_list[2]) if len(video_id_list) > 2 else None

      update_sql = f"""
      UPDATE {db_article}
      SET
      kimi_title=%s,
      recall_video_id1=%s,
      recall_video_id2=%s,
      recall_video_id3=%s,
      content_status=%s,
      process_times = %s
      WHERE trace_id = %s
      """
      await self.mysql_client.async_insert(
          sql=update_sql,
          params=(
              kimi_title,
              vid1,
              vid2,
              vid3,
              2,
              int(params['process_times']) + 1,
              trace_id
          )
      )
      logging(
          code="9002",
          info="已从历史文章更新,文章id: {}".format(content_id),
          trace_id=trace_id
      )
  113. async def process_task(self, params):
  114. """
  115. 异步执行
  116. :param params:
  117. :return:
  118. """
  119. content_id = params['content_id']
  120. print(content_id)
  121. # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
  122. video_id_list = await self.get_history_videos(content_id=content_id)
  123. print(video_id_list)
  124. if video_id_list:
  125. # 说明已经存在了结果, 将该条记录下的video_id拿出来
  126. print("存在历史文章")
  127. await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
  128. else:
  129. pass
  130. async def deal(self):
  131. """
  132. 处理
  133. :return:
  134. """
  135. task_list = await self.get_task()
  136. print(len(task_list))
  137. if task_list:
  138. tasks = [self.process_task(params) for params in task_list]
  139. await asyncio.gather(*tasks)
  140. else:
  141. logging(
  142. code="9008",
  143. info="没有要处理的请求"
  144. )