# history_task.py
"""
@author: luojunhui
"""
import json
import time
import asyncio

from applications.config import Config
from applications.log import logging
from applications.functions.pqFunctions import publishToPQ
from applications.functions.common import shuffleList


  11. class historyContentIdTask(object):
  12. """
  13. 处理已经匹配过小程序的文章
  14. """
  15. def __init__(self, mysql_client):
  16. """
  17. :param mysql_client:
  18. """
  19. self.mysql_client = mysql_client
  20. self.article_text = Config().articleText
  21. self.article_video = Config().articleVideos
  22. self.article_crawler_video = Config().articleCrawlerVideos
  23. self.history_coroutines = Config().getConfigValue("historyArticleCoroutines")
  24. self.gh_id_dict = json.loads(Config().getConfigValue("testAccountLevel2"))
  25. async def getTaskList(self):
  26. """
  27. 获取任务
  28. :return:
  29. """
  30. select_sql1 = f"""
  31. SELECT
  32. ART.trace_id,
  33. ART.content_id,
  34. ART.flow_pool_level,
  35. ART.gh_id,
  36. ART.process_times
  37. FROM {self.article_video} ART
  38. JOIN (
  39. select content_id, count(1) as cnt
  40. from {self.article_crawler_video}
  41. where download_status = 2
  42. group by content_id
  43. ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
  44. WHERE ART.content_status = 0 and ART.process_times <= 3
  45. ORDER BY request_timestamp
  46. LIMIT {self.history_coroutines};
  47. """
  48. tasks = await self.mysql_client.asyncSelect(sql=select_sql1)
  49. task_obj_list = [
  50. {
  51. "trace_id": item[0],
  52. "content_id": item[1],
  53. "flow_pool_level": item[2],
  54. "gh_id": item[3],
  55. "process_times": item[4]
  56. } for item in tasks
  57. ]
  58. logging(
  59. code="9001",
  60. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  61. data=task_obj_list
  62. )
  63. return task_obj_list
  64. async def getVideoList(self, content_id):
  65. """
  66. content_id
  67. :return:
  68. """
  69. sql = f"""
  70. SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
  71. FROM {self.article_crawler_video}
  72. WHERE content_id = '{content_id}' and download_status = 2;
  73. """
  74. res_tuple = await self.mysql_client.asyncSelect(sql)
  75. if len(res_tuple) >= 3:
  76. return [
  77. {
  78. "platform": i[0],
  79. "play_count": i[1],
  80. "like_count": i[2],
  81. "video_oss_path": i[3],
  82. "cover_oss_path": i[4],
  83. "uid": i[5]
  84. }
  85. for i in res_tuple]
  86. else:
  87. return []
  88. async def getKimiTitle(self, content_id):
  89. """
  90. 获取 kimiTitle
  91. :param content_id:
  92. :return:
  93. """
  94. select_sql = f"""
  95. select kimi_title from {self.article_text} where content_id = '{content_id}';
  96. """
  97. res_tuple = await self.mysql_client.asyncSelect(select_sql)
  98. if res_tuple:
  99. return res_tuple[0][0]
  100. else:
  101. return False
  102. async def publishVideosToPq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
  103. """
  104. 发布至 pq
  105. :param process_times:
  106. :param trace_id:
  107. :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
  108. :param gh_id: 公众号 id ---> str
  109. :param kimi_title: kimi 标题 ---> str
  110. :param flow_pool_level: 流量池层级 ---> str
  111. :return:
  112. """
  113. # video_list = download_videos[:3]
  114. match flow_pool_level:
  115. case "autoArticlePoolLevel4":
  116. # 冷启层, 全量做
  117. video_list = shuffleList(download_videos)[:3]
  118. case "autoArticlePoolLevel3":
  119. # 次条,只针对具体账号做
  120. if self.gh_id_dict.get(gh_id):
  121. video_list = shuffleList(download_videos)[:3]
  122. else:
  123. video_list = download_videos[:3]
  124. case "autoArticlePoolLevel2":
  125. video_list = []
  126. case "autoArticlePoolLevel1":
  127. # 头条,先不做
  128. video_list = download_videos[:3]
  129. case _:
  130. print("未传流量池信息")
  131. video_list = download_videos[:3]
  132. L = []
  133. for video_obj in video_list:
  134. params = {
  135. "videoPath": video_obj['video_oss_path'],
  136. "uid": video_obj['uid'],
  137. "title": kimi_title
  138. }
  139. response = await publishToPQ(params)
  140. time.sleep(2)
  141. obj = {
  142. "uid": video_obj['uid'],
  143. "source": video_obj['platform'],
  144. "kimiTitle": kimi_title,
  145. "videoId": response['data']['id'],
  146. "videoCover": response['data']['shareImgPath'],
  147. "videoPath": response['data']['videoPath'],
  148. "videoOss": video_obj['video_oss_path'].split("/")[-1]
  149. }
  150. L.append(obj)
  151. update_sql = f"""
  152. UPDATE {self.article_video}
  153. SET content_status = %s, response = %s, process_times = %s
  154. WHERE trace_id = %s;
  155. """
  156. await self.mysql_client.asyncInsert(
  157. sql=update_sql,
  158. params=(2, json.dumps(L, ensure_ascii=False), process_times + 1, trace_id)
  159. )
  160. logging(
  161. code="9002",
  162. info="已经从历史文章更新",
  163. trace_id=trace_id
  164. )
  165. async def processTask(self, params):
  166. """
  167. 异步执行
  168. :param params:
  169. :return:
  170. """
  171. content_id = params['content_id']
  172. trace_id = params['trace_id']
  173. flow_pool_level = params['flow_pool_level']
  174. gh_id = params['gh_id']
  175. process_times = params['process_times']
  176. # 判断该篇文章是否存在未下架的视频,且判断是否有3条, 如果没有三条,则启动新抓取任务,后续优化点
  177. download_videos = await self.getVideoList(content_id=content_id)
  178. if download_videos:
  179. # 把状态修改为 4
  180. update_sql = f"""
  181. UPDATE {self.article_video}
  182. SET content_status = %s
  183. WHERE trace_id = %s;
  184. """
  185. await self.mysql_client.asyncInsert(
  186. sql=update_sql,
  187. params=(4, trace_id)
  188. )
  189. kimi_title = await self.getKimiTitle(content_id)
  190. if kimi_title:
  191. await self.publishVideosToPq(
  192. flow_pool_level=flow_pool_level,
  193. kimi_title=kimi_title,
  194. gh_id=gh_id,
  195. trace_id=trace_id,
  196. download_videos=download_videos,
  197. process_times=process_times
  198. )
  199. else:
  200. print("Kimi title 生成失败---后续加报警")
  201. else:
  202. pass
  203. async def deal(self):
  204. """
  205. 处理
  206. :return:
  207. """
  208. task_list = await self.getTaskList()
  209. logging(
  210. code="5002",
  211. info="History content_task Task Got {} this time".format(len(task_list)),
  212. function="History Contents Task"
  213. )
  214. if task_list:
  215. tasks = [self.processTask(params) for params in task_list]
  216. await asyncio.gather(*tasks)
  217. else:
  218. print("暂时未获得历史已存在文章")