publish_task.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. """
@author: luojunhui
Publish videos to pq and obtain their video ids.
  4. """
  5. import asyncio
  6. import json
  7. import time
  8. from applications.config import Config
  9. from applications.functions.log import logging
  10. from applications.functions.pqFunctions import publishToPQ
  11. class publishTask(object):
  12. """
  13. 在 match_videos 表中, 获取 content_status = 1 的 content_id
  14. 用 content_id 在 crawler_videos 表, 查询 download_status为 2 的视频,表示该 content_id 已经匹配完的视频
  15. 通过 流量池tag 逻辑
  16. 把 crawler_videos 中的视频路径发布至 pq, 获得 videoId
  17. match_videos表将 content_status 修改为 2,response中记录 videoId && ossName等信息
  18. """
  19. def __init__(self, mysql_client):
  20. self.mysql_client = mysql_client
  21. self.article_video = Config().articleVideos
  22. self.article_text = Config().articleText
  23. self.article_crawler_video = Config().articleCrawlerVideos
  24. async def getTasks(self):
  25. """
  26. 获取 task
  27. :return:
  28. """
  29. select_sql = f"""
  30. SELECT trace_id, content_id, flow_pool_level, gh_id
  31. FROM {self.article_video}
  32. WHERE content_status = 1
  33. limit 10;
  34. """
  35. tasks = await self.mysql_client.asyncSelect(select_sql)
  36. if tasks:
  37. return [
  38. {
  39. "trace_id": i[0],
  40. "content_id": i[1],
  41. "flow_pool_level": i[2],
  42. "gh_id": i[3]
  43. }
  44. for i in tasks
  45. ]
  46. else:
  47. return []
  48. async def getVideoList(self, content_id):
  49. """
  50. content_id
  51. :return:
  52. """
  53. sql = f"""
  54. SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
  55. FROM {self.article_crawler_video}
  56. WHERE content_id = '{content_id}' and download_status = 2;
  57. """
  58. res_tuple = await self.mysql_client.asyncSelect(sql)
  59. if len(res_tuple) >= 3:
  60. return [
  61. {
  62. "platform": i[0],
  63. "play_count": i[1],
  64. "like_count": i[2],
  65. "video_oss_path": i[3],
  66. "cover_oss_path": i[4],
  67. "uid": i[5]
  68. }
  69. for i in res_tuple]
  70. else:
  71. return []
  72. async def getKimiTitle(self, content_id):
  73. """
  74. 获取 kimiTitle
  75. :param content_id:
  76. :return:
  77. """
  78. select_sql = f"""
  79. select kimi_title from {self.article_text} where content_id = '{content_id}';
  80. """
  81. res_tuple = await self.mysql_client.asyncSelect(select_sql)
  82. if res_tuple:
  83. return res_tuple[0][0]
  84. else:
  85. return False
  86. async def publishVideosToPq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos):
  87. """
  88. 发布至 pq
  89. :param trace_id:
  90. :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
  91. :param gh_id: 公众号 id ---> str
  92. :param kimi_title: kimi 标题 ---> str
  93. :param flow_pool_level: 流量池层级 ---> str
  94. :return:
  95. """
  96. video_list = download_videos[:3]
  97. match flow_pool_level:
  98. case "autoArticlePoolLevel4":
  99. print("冷启层")
  100. video_list = []
  101. case "autoArticlePoolLevel3":
  102. print("暂时未知层")
  103. video_list = []
  104. case "autoArticlePoolLevel2":
  105. print("次条层")
  106. video_list = []
  107. case "autoArticlePoolLevel1":
  108. print("头条层")
  109. video_list = []
  110. L = []
  111. for video_obj in video_list:
  112. params = {
  113. "videoPath": video_obj['video_oss_path'],
  114. "uid": video_obj['uid'],
  115. "title": kimi_title
  116. }
  117. response = await publishToPQ(params)
  118. print(response)
  119. time.sleep(2)
  120. obj = {
  121. "uid": video_obj['uid'],
  122. "source": video_obj['platform'],
  123. "kimiTitle": kimi_title,
  124. "videoId": response['data']['id'],
  125. "videoCover": response['data']['shareImgPath'],
  126. "videoPath": response['data']['videoPath'],
  127. "videoOss": video_obj['video_oss_path'].split("/")[-1]
  128. }
  129. L.append(obj)
  130. update_sql = f"""
  131. UPDATE {self.article_video}
  132. SET content_status = %s, response = %s
  133. WHERE trace_id = %s;
  134. """
  135. await self.mysql_client.asyncInsert(
  136. sql=update_sql,
  137. params=(2, json.dumps(L, ensure_ascii=False), trace_id)
  138. )
  139. async def processTask(self, params):
  140. """
  141. 处理任务
  142. :return:
  143. """
  144. gh_id = params['gh_id']
  145. flow_pool_level = params['flow_pool_level']
  146. content_id = params['content_id']
  147. trace_id = params['trace_id']
  148. download_videos = await self.getVideoList(content_id)
  149. if download_videos:
  150. kimi_title = await self.getKimiTitle(content_id)
  151. if kimi_title:
  152. await self.publishVideosToPq(
  153. flow_pool_level=flow_pool_level,
  154. kimi_title=kimi_title,
  155. gh_id=gh_id,
  156. trace_id=trace_id,
  157. download_videos=download_videos
  158. )
  159. else:
  160. print("Kimi title 生成失败---后续加报警")
  161. else:
  162. print("该 content_id还未下载完成")
  163. async def deal(self):
  164. """
  165. function
  166. :return:
  167. """
  168. task_list = await self.getTasks()
  169. if task_list:
  170. tasks = [self.processTask(params) for params in task_list]
  171. await asyncio.gather(*tasks)
  172. else:
  173. logging(
  174. code="9008",
  175. info="没有要处理的请求"
  176. )