publish_task.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. """
  2. @author: luojunhui
  3. 发布到 pq 获取视频 id
  4. """
  5. import asyncio
  6. import json
  7. import time
  8. from applications.config import Config
  9. from applications.log import logging
  10. from applications.functions.pqFunctions import publishToPQ
  11. from applications.functions.common import shuffleList
  12. class publishTask(object):
  13. """
  14. 在 match_videos 表中, 获取 content_status = 1 的 content_id
  15. 用 content_id 在 crawler_videos 表, 查询 download_status为 2 的视频,表示该 content_id 已经匹配完的视频
  16. 通过 流量池tag 逻辑
  17. 把 crawler_videos 中的视频路径发布至 pq, 获得 videoId
  18. match_videos表将 content_status 修改为 2,response中记录 videoId && ossName等信息
  19. """
  20. def __init__(self, mysql_client):
  21. self.mysql_client = mysql_client
  22. self.article_video = Config().articleVideos
  23. self.article_text = Config().articleText
  24. self.article_crawler_video = Config().articleCrawlerVideos
  25. self.gh_id_dict = json.loads(Config().getConfigValue("testAccountLevel2"))
  26. async def getTasks(self):
  27. """
  28. 获取 task
  29. :return:
  30. """
  31. select_sql = f"""
  32. SELECT trace_id, content_id, flow_pool_level, gh_id
  33. FROM {self.article_video}
  34. WHERE content_status = 1
  35. limit 10;
  36. """
  37. tasks = await self.mysql_client.asyncSelect(select_sql)
  38. if tasks:
  39. return [
  40. {
  41. "trace_id": i[0],
  42. "content_id": i[1],
  43. "flow_pool_level": i[2],
  44. "gh_id": i[3]
  45. }
  46. for i in tasks
  47. ]
  48. else:
  49. return []
  50. async def getVideoList(self, content_id):
  51. """
  52. content_id
  53. :return:
  54. """
  55. sql = f"""
  56. SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
  57. FROM {self.article_crawler_video}
  58. WHERE content_id = '{content_id}' and download_status = 2;
  59. """
  60. res_tuple = await self.mysql_client.asyncSelect(sql)
  61. if len(res_tuple) >= 3:
  62. return [
  63. {
  64. "platform": i[0],
  65. "play_count": i[1],
  66. "like_count": i[2],
  67. "video_oss_path": i[3],
  68. "cover_oss_path": i[4],
  69. "uid": i[5]
  70. }
  71. for i in res_tuple]
  72. else:
  73. return []
  74. async def getKimiTitle(self, content_id):
  75. """
  76. 获取 kimiTitle
  77. :param content_id:
  78. :return:
  79. """
  80. select_sql = f"""
  81. select kimi_title from {self.article_text} where content_id = '{content_id}';
  82. """
  83. res_tuple = await self.mysql_client.asyncSelect(select_sql)
  84. if res_tuple:
  85. return res_tuple[0][0]
  86. else:
  87. return False
  88. async def publishVideosToPq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos):
  89. """
  90. 发布至 pq
  91. :param trace_id:
  92. :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
  93. :param gh_id: 公众号 id ---> str
  94. :param kimi_title: kimi 标题 ---> str
  95. :param flow_pool_level: 流量池层级 ---> str
  96. :return:
  97. """
  98. # video_list = download_videos[:3]
  99. match flow_pool_level:
  100. case "autoArticlePoolLevel4":
  101. # 冷启层, 全量做
  102. video_list = shuffleList(download_videos)[:3]
  103. case "autoArticlePoolLevel3":
  104. if self.gh_id_dict.get(gh_id):
  105. video_list = shuffleList(download_videos)[:3]
  106. else:
  107. video_list = download_videos[:3]
  108. case "autoArticlePoolLevel2":
  109. # 次条,只针对具体账号做
  110. video_list = []
  111. case "autoArticlePoolLevel1":
  112. # 头条,先不做
  113. video_list = download_videos[:3]
  114. case _:
  115. print("未传流量池信息")
  116. video_list = download_videos[:3]
  117. L = []
  118. for video_obj in video_list:
  119. params = {
  120. "videoPath": video_obj['video_oss_path'],
  121. "uid": video_obj['uid'],
  122. "title": kimi_title
  123. }
  124. response = await publishToPQ(params)
  125. time.sleep(2)
  126. obj = {
  127. "uid": video_obj['uid'],
  128. "source": video_obj['platform'],
  129. "kimiTitle": kimi_title,
  130. "videoId": response['data']['id'],
  131. "videoCover": response['data']['shareImgPath'],
  132. "videoPath": response['data']['videoPath'],
  133. "videoOss": video_obj['video_oss_path'].split("/")[-1]
  134. }
  135. L.append(obj)
  136. update_sql = f"""
  137. UPDATE {self.article_video}
  138. SET content_status = %s, response = %s
  139. WHERE trace_id = %s;
  140. """
  141. await self.mysql_client.asyncInsert(
  142. sql=update_sql,
  143. params=(2, json.dumps(L, ensure_ascii=False), trace_id)
  144. )
  145. async def processTask(self, params):
  146. """
  147. 处理任务
  148. :return:
  149. """
  150. gh_id = params['gh_id']
  151. flow_pool_level = params['flow_pool_level']
  152. content_id = params['content_id']
  153. trace_id = params['trace_id']
  154. download_videos = await self.getVideoList(content_id)
  155. if download_videos:
  156. kimi_title = await self.getKimiTitle(content_id)
  157. if kimi_title:
  158. await self.publishVideosToPq(
  159. flow_pool_level=flow_pool_level,
  160. kimi_title=kimi_title,
  161. gh_id=gh_id,
  162. trace_id=trace_id,
  163. download_videos=download_videos
  164. )
  165. else:
  166. print("Kimi title 生成失败---后续加报警")
  167. else:
  168. print("该 content_id还未下载完成")
  169. async def deal(self):
  170. """
  171. function
  172. :return:
  173. """
  174. task_list = await self.getTasks()
  175. logging(
  176. code="5004",
  177. info="PublishTask Got {} this time".format(len(task_list)),
  178. function="Publish Task"
  179. )
  180. if task_list:
  181. tasks = [self.processTask(params) for params in task_list]
  182. await asyncio.gather(*tasks)
  183. else:
  184. logging(
  185. code="9008",
  186. info="没有要处理的请求"
  187. )