# history_task.py
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. import traceback
  8. from applications.feishu import bot
  9. from applications.config import Config
  10. from applications.log import logging
  11. from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
  12. from applications.functions.common import shuffle_list


class historyContentIdTask(object):
    """
    Handle articles that have already been matched to mini-program videos.
    """
    TASK_PROCESSING_STATUS = 101
    EXIT_STATUS = 97
    MISMATCH_STATUS = 96
    TASK_INIT_STATUS = 0
    TASK_PUBLISHED_STATUS = 4

    def __init__(self, mysql_client):
        """
        :param mysql_client: async MySQL client used for all database access
        """
        self.mysql_client = mysql_client
        self.config = Config()
        self.article_match_video_table = self.config.article_match_video_table
        self.article_text_table = self.config.article_text_table
        self.article_crawler_video_table = self.config.article_crawler_video_table
        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
        self.history_coroutines = self.config.get_config_value("historyArticleCoroutines")
        self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
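
    # For reference, the rough shapes of the config values loaded above, inferred from how
    # they are used in this class; the keys and sample values are illustrative assumptions,
    # not the real configuration:
    #
    #     testAccountLevel2          -> {"gh_xxxx": 1, ...}           # truthy per-account flag (shuffle on level 3)
    #     historyArticleCoroutines   -> 10                            # batch size used in the LIMIT clause
    #     account_negative_category  -> {"gh_xxxx": ["cat_a", ...]}   # categories to reject per account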

    async def get_tasks(self):
        """
        Fetch a batch of pending tasks: articles still in the initial status that already
        have at least three successfully downloaded videos.
        :return: list of task dicts
        """
        select_sql1 = f"""
            SELECT
                ART.trace_id,
                ART.content_id,
                ART.flow_pool_level,
                ART.gh_id,
                ART.process_times
            FROM {self.article_match_video_table} ART
            JOIN (
                SELECT content_id, count(1) AS cnt
                FROM {self.article_crawler_video_table}
                WHERE download_status = 2
                GROUP BY content_id
            ) VID ON ART.content_id = VID.content_id AND VID.cnt >= 3
            WHERE ART.content_status = 0 AND ART.process_times <= 3
            ORDER BY ART.flow_pool_level, ART.request_timestamp
            LIMIT {self.history_coroutines};
        """
        tasks = await self.mysql_client.async_select(sql=select_sql1)
        task_obj_list = [
            {
                "trace_id": item[0],
                "content_id": item[1],
                "flow_pool_level": item[2],
                "gh_id": item[3],
                "process_times": item[4]
            } for item in tasks
        ]
        logging(
            code="9001",
            info="Fetched {} tasks this round".format(len(task_obj_list)),
            data=task_obj_list
        )
        return task_obj_list

    async def get_video_list(self, content_id):
        """
        Fetch the downloaded videos for a content_id, ordered by score.
        :param content_id:
        :return: list of video dicts, or an empty list if fewer than three are available
        """
        sql = f"""
            SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}' AND download_status = 2
            ORDER BY score DESC;
        """
        res_tuple = await self.mysql_client.async_select(sql)
        if len(res_tuple) >= 3:
            return [
                {
                    "platform": i[0],
                    "play_count": i[1],
                    "like_count": i[2],
                    "video_oss_path": i[3],
                    "cover_oss_path": i[4],
                    "uid": i[5]
                }
                for i in res_tuple
            ]
        else:
            return []

    async def get_kimi_title(self, content_id):
        """
        Fetch the Kimi-generated title for a content_id.
        :param content_id:
        :return: the title string, or False if none exists
        """
        select_sql = f"""
            SELECT kimi_title FROM {self.article_text_table} WHERE content_id = '{content_id}';
        """
        res_tuple = await self.mysql_client.async_select(select_sql)
        if res_tuple:
            return res_tuple[0][0]
        else:
            return False

    async def update_content_status(self, new_content_status, trace_id, ori_content_status):
        """
        Conditionally update content_status: the row is only changed if it is still in
        ori_content_status, so the returned row count doubles as an optimistic lock.
        :param new_content_status:
        :param trace_id:
        :param ori_content_status:
        :return: number of affected rows
        """
        update_sql = f"""
            UPDATE {self.article_match_video_table}
            SET content_status = %s, content_status_update_time = %s
            WHERE trace_id = %s AND content_status = %s;
        """
        row_counts = await self.mysql_client.async_insert(
            sql=update_sql,
            params=(
                new_content_status,
                int(time.time()),
                trace_id,
                ori_content_status
            )
        )
        return row_counts
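
    # The conditional UPDATE above is how workers claim or transition a task: callers treat
    # a zero row count as "another worker got there first". A minimal sketch of the calling
    # pattern (it mirrors process_task below, shown here only for illustration):
    #
    #     affected_rows = await self.update_content_status(
    #         trace_id=trace_id,
    #         new_content_status=self.TASK_PROCESSING_STATUS,
    #         ori_content_status=self.TASK_INIT_STATUS
    #     )
    #     if affected_rows == 0:
    #         return  # lost the race for this trace_id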

    async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
        """
        Publish the selected videos to pq.
        :param process_times:
        :param trace_id:
        :param download_videos: downloaded videos ---> list [{}, {}, {} ...]
        :param gh_id: official account id ---> str
        :param kimi_title: Kimi title ---> str
        :param flow_pool_level: flow pool level ---> str
        :return:
        """
        match flow_pool_level:
            case "autoArticlePoolLevel4":
                # cold-start tier: shuffle for every account
                video_list = shuffle_list(download_videos)[:3]
            case "autoArticlePoolLevel3":
                # second slot: only shuffle for the configured accounts
                if self.gh_id_dict.get(gh_id):
                    video_list = shuffle_list(download_videos)[:3]
                else:
                    video_list = download_videos[:3]
            case "autoArticlePoolLevel2":
                video_list = []
            case "autoArticlePoolLevel1":
                # headline slot: not handled yet, keep original order
                video_list = download_videos[:3]
            case _:
                print("No flow pool level provided")
                video_list = download_videos[:3]
        published_videos = []
        for video_obj in video_list:
            params = {
                "videoPath": video_obj['video_oss_path'],
                "uid": video_obj['uid'],
                "title": kimi_title
            }
            publish_response = await publish_to_pq(params)
            video_id = publish_response['data']['id']
            response = await get_pq_video_detail(video_id)
            # time.sleep(2)
            obj = {
                "uid": video_obj['uid'],
                "source": video_obj['platform'],
                "kimiTitle": kimi_title,
                "videoId": response['data'][0]['id'],
                "videoCover": response['data'][0]['shareImgPath'],
                "videoPath": response['data'][0]['videoPath'],
                "videoOss": video_obj['video_oss_path']
            }
            logging(
                code="history1006",
                info="Video published to pq",
                trace_id=trace_id,
                data=obj
            )
            published_videos.append(obj)
        update_sql = f"""
            UPDATE {self.article_match_video_table}
            SET content_status = %s, response = %s, process_times = %s
            WHERE trace_id = %s AND content_status = %s;
        """
        await self.mysql_client.async_insert(
            sql=update_sql,
            params=(
                self.TASK_PUBLISHED_STATUS,
                json.dumps(published_videos, ensure_ascii=False),
                process_times + 1,
                trace_id,
                self.TASK_PROCESSING_STATUS
            )
        )
        logging(
            code="history1007",
            info="Article status updated to published",
            trace_id=trace_id,
            data=published_videos
        )

    async def roll_back_content_status_when_fails(self, process_times, trace_id):
        """
        On failure, roll the task back to the initial status and increment process_times.
        :param process_times:
        :param trace_id:
        :return:
        """
        update_article_sql = f"""
            UPDATE {self.article_match_video_table}
            SET
                content_status = %s,
                content_status_update_time = %s,
                process_times = %s
            WHERE trace_id = %s AND content_status = %s;
        """
        await self.mysql_client.async_insert(
            sql=update_article_sql,
            params=(
                self.TASK_INIT_STATUS,
                int(time.time()),
                process_times + 1,
                trace_id,
                self.TASK_PROCESSING_STATUS
            )
        )

    async def check_title_whether_exit(self, content_id):
        """
        Check whether the article's title has been promoted or retired from the title pool.
        :return: True if the title has been promoted or has exited
        """
        UP_LEVEL_STATUS = 1
        TITLE_EXIT_STATUS = -1
        sql = f"""
            SELECT lat.article_title, cstp.status
            FROM long_articles_text lat
            JOIN cold_start_title_pool cstp ON lat.article_title = cstp.title
            WHERE lat.content_id = '{content_id}';
        """
        result = await self.mysql_client.async_select(sql)
        if not result:
            return False
        status = result[0][1]
        return status in {UP_LEVEL_STATUS, TITLE_EXIT_STATUS}

    async def check_title_category(self, content_id, gh_id, trace_id) -> bool:
        """
        Check whether the article's category is one of the account's negative categories.
        :param trace_id:
        :param content_id:
        :param gh_id:
        :return: True if the category should be rejected for this account
        """
        bad_category_list = self.account_negative_category.get(gh_id, [])
        logging(
            code="history1101",
            info="Negative category list for this account",
            trace_id=trace_id,
            data=bad_category_list
        )
        if bad_category_list:
            sql = f"""
                SELECT category
                FROM article_category
                WHERE produce_content_id = '{content_id}';
            """
            result = await self.mysql_client.async_select(sql)
            if result:
                category = result[0][0]
                logging(
                    code="history1102",
                    info="Article category: {}".format(category),
                    trace_id=trace_id
                )
                if category in bad_category_list:
                    return True
        return False

    async def process_task(self, params):
        """
        Process a single task asynchronously.
        :param params:
        :return:
        """
        content_id = params['content_id']
        trace_id = params['trace_id']
        flow_pool_level = params['flow_pool_level']
        gh_id = params['gh_id']
        process_times = params['process_times']
        if flow_pool_level == "autoArticlePoolLevel4":
            # Check whether the article falls into one of this account's negative categories
            negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id, trace_id=trace_id)
            if negative_category_status:
                # Mark the task as category mismatch
                logging(
                    code="history1002",
                    info="Article belongs to a negative category for this account",
                    trace_id=trace_id
                )
                affected_rows = await self.update_content_status(
                    trace_id=trace_id,
                    new_content_status=self.MISMATCH_STATUS,
                    ori_content_status=self.TASK_INIT_STATUS
                )
                logging(
                    code="history1003",
                    info="Article status updated to category mismatch",
                    trace_id=trace_id
                )
                if affected_rows == 0:
                    print("No rows updated: another worker claimed this task first, exiting")
                    return
            # Check whether the article's title has been promoted or retired
            exit_status = await self.check_title_whether_exit(content_id)
            if exit_status:
                # Mark the task as exited
                logging(
                    code="history1004",
                    info="Article title has been promoted or retired",
                    trace_id=trace_id
                )
                affected_rows = await self.update_content_status(
                    trace_id=trace_id,
                    new_content_status=self.EXIT_STATUS,
                    ori_content_status=self.TASK_INIT_STATUS
                )
                logging(
                    code="history1005",
                    info="Article status updated to exited",
                    trace_id=trace_id
                )
                if affected_rows == 0:
                    print("No rows updated: another worker claimed this task first, exiting")
                    return
        download_videos = await self.get_video_list(content_id=content_id)
        # time.sleep(3)
        if download_videos:
            # Claim the task by switching it to the processing status (optimistic lock)
            affected_rows = await self.update_content_status(
                trace_id=trace_id,
                new_content_status=self.TASK_PROCESSING_STATUS,
                ori_content_status=self.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                print("No rows updated: another worker claimed this task first, exiting")
                return
            try:
                kimi_title = await self.get_kimi_title(content_id)
                await self.publish_videos_to_pq(
                    flow_pool_level=flow_pool_level,
                    kimi_title=kimi_title,
                    gh_id=gh_id,
                    trace_id=trace_id,
                    download_videos=download_videos,
                    process_times=process_times
                )
            except Exception as e:
                logging(
                    code="history1008",
                    info="history task failed while publishing, error = {}".format(e),
                    trace_id=trace_id,
                    data={
                        "error": str(e),
                        "traceback": traceback.format_exc()
                    }
                )
                bot(
                    title="history task failed",
                    detail={
                        "trace_id": trace_id,
                        "error": str(e)
                    },
                    mention=False
                )
                await self.roll_back_content_status_when_fails(
                    trace_id=trace_id,
                    process_times=process_times
                )
        else:
            return

    async def deal(self):
        """
        Entry point: fetch the task batch and process the tasks concurrently.
        :return:
        """
        task_list = await self.get_tasks()
        logging(
            code="history1001",
            info="History content task fetched {} tasks this time".format(len(task_list)),
            function="History Contents Task"
        )
        if task_list:
            start_time = time.time()
            tasks = [self.process_task(params) for params in task_list]
            await asyncio.gather(*tasks)
            end_time = time.time()
            print("Processed {} tasks in {:.2f} s".format(len(task_list), end_time - start_time))
        else:
            print("No historical articles to process right now")
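

# A minimal usage sketch, assuming an async MySQL client that exposes the async_select /
# async_insert methods used above; "AsyncMySQLClient", its import path, and init_pool()
# are placeholders, not part of this module:
#
#     if __name__ == "__main__":
#         from applications.db import AsyncMySQLClient  # hypothetical import path
#
#         async def main():
#             mysql_client = AsyncMySQLClient()
#             await mysql_client.init_pool()            # assumed setup step
#             task = historyContentIdTask(mysql_client)
#             await task.deal()
#
#         asyncio.run(main())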