# publish_task.py
"""
@author: luojunhui
Publish videos to pq and obtain the resulting video ids.
"""
  5. import asyncio
  6. import json
  7. import time
  8. from applications.config import Config
  9. from applications.log import logging
  10. from applications.functions.pqFunctions import publish_to_pq
  11. from applications.functions.common import shuffle_list
  12. class publishTask(object):
  13. """
  14. 在 match_videos 表中, 获取 content_status = 1 的 content_id
  15. 用 content_id 在 crawler_videos 表, 查询 download_status为 2 的视频,表示该 content_id 已经匹配完的视频
  16. 通过 流量池tag 逻辑
  17. 把 crawler_videos 中的视频路径发布至 pq, 获得 videoId
  18. match_videos表将 content_status 修改为 2,response中记录 videoId && ossName等信息
  19. """
  20. def __init__(self, mysql_client):
  21. self.mysql_client = mysql_client
  22. self.config = Config()
  23. self.article_match_video_table = self.config.article_match_video_table
  24. self.article_text_table = self.config.article_text_table
  25. self.article_crawler_video_table = self.config.article_crawler_video_table
  26. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  27. async def get_tasks(self):
  28. """
  29. 获取 task
  30. :return:
  31. """
  32. select_sql = f"""
  33. SELECT trace_id, content_id, flow_pool_level, gh_id
  34. FROM {self.article_match_video_table}
  35. WHERE content_status = 3
  36. limit 10;
  37. """
  38. tasks = await self.mysql_client.async_select(select_sql)
  39. if tasks:
  40. return [
  41. {
  42. "trace_id": i[0],
  43. "content_id": i[1],
  44. "flow_pool_level": i[2],
  45. "gh_id": i[3]
  46. }
  47. for i in tasks
  48. ]
  49. else:
  50. return []
  51. async def get_video_list(self, content_id):
  52. """
  53. content_id
  54. :return:
  55. """
  56. sql = f"""
  57. SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
  58. FROM {self.article_crawler_video_table}
  59. WHERE content_id = '{content_id}' and download_status = 2;
  60. """
  61. res_tuple = await self.mysql_client.async_select(sql)
  62. if len(res_tuple) >= 3:
  63. return [
  64. {
  65. "platform": i[0],
  66. "play_count": i[1],
  67. "like_count": i[2],
  68. "video_oss_path": i[3],
  69. "cover_oss_path": i[4],
  70. "uid": i[5]
  71. }
  72. for i in res_tuple]
  73. else:
  74. return []
  75. async def get_kimi_title(self, content_id):
  76. """
  77. 获取 kimiTitle
  78. :param content_id:
  79. :return:
  80. """
  81. select_sql = f"""
  82. select kimi_title from {self.article_text_table} where content_id = '{content_id}';
  83. """
  84. res_tuple = await self.mysql_client.async_select(select_sql)
  85. if res_tuple:
  86. return res_tuple[0][0]
  87. else:
  88. return False
  89. async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos):
  90. """
  91. 发布至 pq
  92. :param trace_id:
  93. :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
  94. :param gh_id: 公众号 id ---> str
  95. :param kimi_title: kimi 标题 ---> str
  96. :param flow_pool_level: 流量池层级 ---> str
  97. :return:
  98. """
  99. publish_success_status = 4
  100. match flow_pool_level:
  101. case "autoArticlePoolLevel4":
  102. # 冷启层, 全量做
  103. video_list = shuffle_list(download_videos)[:3]
  104. case "autoArticlePoolLevel3":
  105. if self.gh_id_dict.get(gh_id):
  106. video_list = shuffle_list(download_videos)[:3]
  107. else:
  108. video_list = download_videos[:3]
  109. case "autoArticlePoolLevel2":
  110. # 次条,只针对具体账号做
  111. video_list = []
  112. case "autoArticlePoolLevel1":
  113. # 头条,先不做
  114. video_list = download_videos[:3]
  115. case _:
  116. print("未传流量池信息")
  117. video_list = download_videos[:3]
  118. L = []
  119. for video_obj in video_list:
  120. params = {
  121. "videoPath": video_obj['video_oss_path'],
  122. "uid": video_obj['uid'],
  123. "title": kimi_title
  124. }
  125. response = await publish_to_pq(params)
  126. time.sleep(2)
  127. obj = {
  128. "uid": video_obj['uid'],
  129. "source": video_obj['platform'],
  130. "kimiTitle": kimi_title,
  131. "videoId": response['data']['id'],
  132. "videoCover": response['data']['shareImgPath'],
  133. "videoPath": response['data']['videoPath'],
  134. "videoOss": video_obj['video_oss_path']
  135. }
  136. L.append(obj)
  137. update_sql = f"""
  138. UPDATE {self.article_match_video_table}
  139. SET content_status = %s, response = %s
  140. WHERE trace_id = %s;
  141. """
  142. await self.mysql_client.async_insert(
  143. sql=update_sql,
  144. params=(publish_success_status, json.dumps(L, ensure_ascii=False), trace_id)
  145. )
  146. async def process_task(self, params):
  147. """
  148. 处理任务
  149. :return:
  150. """
  151. gh_id = params['gh_id']
  152. flow_pool_level = params['flow_pool_level']
  153. content_id = params['content_id']
  154. trace_id = params['trace_id']
  155. download_videos = await self.get_video_list(content_id)
  156. if download_videos:
  157. kimi_title = await self.get_kimi_title(content_id)
  158. await self.publish_videos_to_pq(
  159. flow_pool_level=flow_pool_level,
  160. kimi_title=kimi_title,
  161. gh_id=gh_id,
  162. trace_id=trace_id,
  163. download_videos=download_videos
  164. )
  165. else:
  166. print("该 content_id还未下载完成")
  167. async def deal(self):
  168. """
  169. function
  170. :return:
  171. """
  172. task_list = await self.get_tasks()
  173. logging(
  174. code="5004",
  175. info="PublishTask Got {} this time".format(len(task_list)),
  176. function="Publish Task"
  177. )
  178. if task_list:
  179. tasks = [self.process_task(params) for params in task_list]
  180. await asyncio.gather(*tasks)
  181. else:
  182. logging(
  183. code="9008",
  184. info="没有要处理的请求"
  185. )