chadui.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. """
  2. @author: luojunhui
  3. """
  4. import asyncio
  5. from static.config import db_article, db_video
  6. from applications.functions.log import logging
  7. from static.config import mysql_coroutines
  8. class MatchTask5(object):
  9. """
  10. 定时执行任务
  11. """
  12. def __init__(self, mysql_client):
  13. """
  14. :param mysql_client:
  15. """
  16. self.mysql_client = mysql_client
  17. async def get_task(self):
  18. """
  19. 获取任务
  20. :return:
  21. """
  22. select_sql = f"""
  23. SELECT trace_id, content_id, gh_id, article_title, article_text, content_status, process_times
  24. FROM {db_article}
  25. WHERE content_status = 0 and process_times <= 5 and account_name = '万事如意一家子'
  26. ORDER BY request_time_stamp
  27. DESC
  28. LIMIT {mysql_coroutines};
  29. """
  30. task_list = await self.mysql_client.async_select(sql=select_sql)
  31. task_obj_list = [
  32. {
  33. "trace_id": item[0],
  34. "content_id": item[1],
  35. "gh_id": item[2],
  36. "title": item[3],
  37. "text": item[4],
  38. "content_status": item[5],
  39. "process_times": item[6]
  40. } for item in task_list
  41. ]
  42. print("本次任务获取到 {} 条视频".format(len(task_obj_list)))
  43. # logging(
  44. # code="9001",
  45. # info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  46. # data=task_obj_list
  47. # )
  48. return task_obj_list
  49. async def get_history_videos(self, content_id):
  50. """
  51. check whether the contents videos exists
  52. :param content_id:
  53. :return:
  54. """
  55. select_sql = f"""
  56. SELECT video_id
  57. FROM {db_video}
  58. where content_id = '{content_id}' and video_status = 1 order by request_time DESC;
  59. """
  60. content_videos = await self.mysql_client.async_select(select_sql)
  61. videos = [vid for vid in content_videos]
  62. print(len(videos))
  63. if len(videos) >= 3:
  64. return videos
  65. else:
  66. return None
  67. async def use_exists_contents_videos(self, video_id_list, params):
  68. """
  69. 使用已经存在的视频id
  70. :return:
  71. """
  72. trace_id = params['trace_id']
  73. content_id = params['content_id']
  74. select_sql = f"""
  75. SELECT kimi_title
  76. FROM {db_article}
  77. WHERE content_id = '{content_id}' and kimi_title is not null limit 1;
  78. """
  79. info = await self.mysql_client.async_select(sql=select_sql)
  80. kimi_title = info[0]
  81. update_sql = f"""
  82. UPDATE {db_article}
  83. SET
  84. kimi_title=%s,
  85. recall_video_id1=%s,
  86. recall_video_id2=%s,
  87. recall_video_id3=%s,
  88. content_status=%s,
  89. process_times = %s
  90. WHERE trace_id = %s
  91. """
  92. vid1, vid2, vid3 = video_id_list[0], video_id_list[1], video_id_list[2]
  93. await self.mysql_client.async_insert(
  94. sql=update_sql,
  95. params=(
  96. kimi_title,
  97. video_id_list[0],
  98. "NULL" if vid2 is None else vid2,
  99. "NULL" if vid3 is None else vid3,
  100. 2,
  101. int(params['process_times']) + 1,
  102. trace_id
  103. )
  104. )
  105. logging(
  106. code="9002",
  107. info="已从历史文章更新,文章id: {}".format(content_id),
  108. trace_id=trace_id
  109. )
  110. async def process_task(self, params):
  111. """
  112. 异步执行
  113. :param params:
  114. :return:
  115. """
  116. content_id = params['content_id']
  117. print(content_id)
  118. # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
  119. video_id_list = await self.get_history_videos(content_id=content_id)
  120. print(video_id_list)
  121. if video_id_list:
  122. # 说明已经存在了结果, 将该条记录下的video_id拿出来
  123. print("存在历史文章")
  124. await self.use_exists_contents_videos(video_id_list=video_id_list, params=params)
  125. else:
  126. pass
  127. async def deal(self):
  128. """
  129. 处理
  130. :return:
  131. """
  132. task_list = await self.get_task()
  133. print(len(task_list))
  134. if task_list:
  135. tasks = [self.process_task(params) for params in task_list]
  136. await asyncio.gather(*tasks)
  137. else:
  138. logging(
  139. code="9008",
  140. info="没有要处理的请求"
  141. )