history_task.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. import traceback
  8. from applications.feishu import bot
  9. from applications.config import Config
  10. from applications.const import HistoryContentIdTaskConst
  11. from applications.log import logging
  12. from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
  13. from applications.functions.common import shuffle_list
  14. from applications.functions.aigc import record_trace_id
  15. class historyContentIdTask(object):
  16. """
  17. 处理已经匹配过小程序的文章
  18. """
  19. def __init__(self, mysql_client, publish_flag):
  20. """
  21. :param mysql_client:
  22. """
  23. self.mysql_client = mysql_client
  24. self.config = Config()
  25. self.const = HistoryContentIdTaskConst()
  26. self.article_match_video_table = self.config.article_match_video_table
  27. self.article_text_table = self.config.article_text_table
  28. self.article_crawler_video_table = self.config.article_crawler_video_table
  29. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  30. self.history_coroutines = self.config.get_config_value("historyArticleCoroutines")
  31. self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
  32. self.publish_flag = publish_flag
  33. async def get_tasks(self):
  34. """
  35. 获取任务
  36. :return:
  37. """
  38. select_sql1 = f"""
  39. SELECT
  40. ART.trace_id,
  41. ART.content_id,
  42. ART.flow_pool_level,
  43. ART.gh_id,
  44. ART.process_times,
  45. ART.publish_flag
  46. FROM {self.article_match_video_table} ART
  47. JOIN (
  48. select content_id, count(1) as cnt
  49. from {self.article_crawler_video_table}
  50. where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  51. group by content_id
  52. ) VID on ART.content_id = VID.content_id and VID.cnt >= {self.const.MIN_VIDEO_NUM}
  53. WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
  54. AND ART.publish_flag = {self.publish_flag}
  55. -- ORDER BY ART.flow_pool_level, ART.request_timestamp
  56. LIMIT {self.history_coroutines};
  57. """
  58. tasks = await self.mysql_client.async_select(sql=select_sql1)
  59. task_obj_list = [
  60. {
  61. "trace_id": item[0],
  62. "content_id": item[1],
  63. "flow_pool_level": item[2],
  64. "gh_id": item[3],
  65. "process_times": item[4],
  66. "publish_flag": item[5]
  67. } for item in tasks
  68. ]
  69. logging(
  70. code="9001",
  71. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  72. data=task_obj_list
  73. )
  74. return task_obj_list
  75. async def get_video_list(self, content_id):
  76. """
  77. content_id
  78. :return:
  79. """
  80. sql = f"""
  81. SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
  82. FROM {self.article_crawler_video_table}
  83. WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  84. ORDER BY score DESC;
  85. """
  86. res_tuple = await self.mysql_client.async_select(sql)
  87. if len(res_tuple) >= 3:
  88. return [
  89. {
  90. "platform": i[0],
  91. "play_count": i[1],
  92. "like_count": i[2],
  93. "video_oss_path": i[3],
  94. "cover_oss_path": i[4],
  95. "uid": i[5]
  96. }
  97. for i in res_tuple
  98. ]
  99. else:
  100. return []
  101. async def get_kimi_title(self, content_id):
  102. """
  103. 获取 kimiTitle
  104. :param content_id:
  105. :return:
  106. """
  107. select_sql = f"""
  108. select kimi_title from {self.article_text_table} where content_id = '{content_id}';
  109. """
  110. res_tuple = await self.mysql_client.async_select(select_sql)
  111. if res_tuple:
  112. return res_tuple[0][0]
  113. else:
  114. return False
  115. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  116. """
  117. :param new_content_status:
  118. :param trace_id:
  119. :param ori_content_status:
  120. :return:
  121. """
  122. update_sql = f"""
  123. UPDATE {self.article_match_video_table}
  124. SET content_status = %s, content_status_update_time = %s
  125. WHERE trace_id = %s and content_status = %s;
  126. """
  127. row_counts = await self.mysql_client.async_insert(
  128. sql=update_sql,
  129. params=(
  130. new_content_status,
  131. int(time.time()),
  132. trace_id,
  133. ori_content_status
  134. )
  135. )
  136. return row_counts
  137. async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
  138. """
  139. 发布至 pq
  140. :param process_times:
  141. :param trace_id:
  142. :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
  143. :param gh_id: 公众号 id ---> str
  144. :param kimi_title: kimi 标题 ---> str
  145. :param flow_pool_level: 流量池层级 ---> str
  146. :return:
  147. """
  148. match flow_pool_level:
  149. case "autoArticlePoolLevel4":
  150. # 冷启层, 全量做
  151. video_list = shuffle_list(download_videos)[:3]
  152. case "autoArticlePoolLevel3":
  153. # 次条,只针对具体账号做
  154. if self.gh_id_dict.get(gh_id):
  155. video_list = shuffle_list(download_videos)[:3]
  156. else:
  157. video_list = download_videos[:3]
  158. case "autoArticlePoolLevel2":
  159. video_list = []
  160. case "autoArticlePoolLevel1":
  161. # 头条,先不做
  162. video_list = download_videos[:3]
  163. case _:
  164. print("未传流量池信息")
  165. video_list = download_videos[:3]
  166. L = []
  167. for video_obj in video_list:
  168. params = {
  169. "videoPath": video_obj['video_oss_path'],
  170. "uid": video_obj['uid'],
  171. "title": kimi_title
  172. }
  173. publish_response = await publish_to_pq(params)
  174. video_id = publish_response['data']['id']
  175. response = await get_pq_video_detail(video_id)
  176. # time.sleep(2)
  177. obj = {
  178. "uid": video_obj['uid'],
  179. "source": video_obj['platform'],
  180. "kimiTitle": kimi_title,
  181. "videoId": response['data'][0]['id'],
  182. "videoCover": response['data'][0]['shareImgPath'],
  183. "videoPath": response['data'][0]['videoPath'],
  184. "videoOss": video_obj['video_oss_path']
  185. }
  186. logging(
  187. code="history1006",
  188. info="视频已经发布到 pq",
  189. trace_id=trace_id,
  190. data=obj
  191. )
  192. L.append(obj)
  193. update_sql = f"""
  194. UPDATE {self.article_match_video_table}
  195. SET content_status = %s, response = %s, process_times = %s
  196. WHERE trace_id = %s and content_status = %s;
  197. """
  198. await self.mysql_client.async_insert(
  199. sql=update_sql,
  200. params=(
  201. self.const.TASK_PUBLISHED_STATUS,
  202. json.dumps(L, ensure_ascii=False),
  203. process_times + 1,
  204. trace_id,
  205. self.const.TASK_PROCESSING_STATUS
  206. )
  207. )
  208. logging(
  209. code="history1007",
  210. info="已经更文章状态为已发布",
  211. trace_id=trace_id,
  212. data=L
  213. )
  214. await record_trace_id(trace_id=trace_id, status=self.const.RECORD_SUCCESS_TRACE_ID_CODE)
  215. async def roll_back_content_status_when_fails(self, process_times, trace_id):
  216. """
  217. 处理失败,回滚至初始状态,处理次数加 1
  218. :param process_times:
  219. :param trace_id:
  220. :return:
  221. """
  222. update_article_sql = f"""
  223. UPDATE {self.article_match_video_table}
  224. SET
  225. content_status = %s,
  226. content_status_update_time = %s,
  227. process_times = %s
  228. WHERE trace_id = %s and content_status = %s;
  229. """
  230. await self.mysql_client.async_insert(
  231. sql=update_article_sql,
  232. params=(
  233. self.const.TASK_INIT_STATUS,
  234. int(time.time()),
  235. process_times + 1,
  236. trace_id,
  237. self.const.TASK_PROCESSING_STATUS
  238. )
  239. )
  240. async def check_title_whether_exit(self, content_id):
  241. """
  242. 校验文章是标题是否晋升 or 退场
  243. :return:
  244. """
  245. sql = f"""
  246. SELECT lat.article_title, cstp.status
  247. FROM long_articles_text lat
  248. JOIN cold_start_title_pool cstp ON lat.article_title = cstp.title
  249. WHERE lat.content_id = '{content_id}';
  250. """
  251. result = await self.mysql_client.async_select(sql)
  252. if result:
  253. status = result[0][1]
  254. if status in {self.const.UP_LEVEL_STATUS, self.const.TITLE_EXIT_STATUS}:
  255. return True
  256. else:
  257. return False
  258. else:
  259. return False
  260. async def check_title_category(self, content_id, gh_id, trace_id) -> bool:
  261. """
  262. 判断该文章的品类是否属于该账号的品类
  263. :param trace_id:
  264. :param content_id:
  265. :param gh_id:
  266. :return:
  267. """
  268. bad_category_list = self.account_negative_category.get(gh_id, [])
  269. logging(
  270. code="history1101",
  271. info="该账号的 negative 类型列表",
  272. trace_id=trace_id,
  273. data=bad_category_list
  274. )
  275. if bad_category_list:
  276. sql = f"""
  277. SELECT category
  278. FROM article_category
  279. WHERE produce_content_id = '{content_id}';
  280. """
  281. result = await self.mysql_client.async_select(sql)
  282. if result:
  283. category = result[0][0]
  284. logging(
  285. code="history1102",
  286. info="文章的品类-{}".format(category),
  287. trace_id=trace_id
  288. )
  289. if category in bad_category_list:
  290. return True
  291. return False
  292. async def process_task(self, params):
  293. """
  294. 异步执行
  295. :param params:
  296. :return:
  297. """
  298. content_id = params['content_id']
  299. trace_id = params['trace_id']
  300. flow_pool_level = params['flow_pool_level']
  301. gh_id = params['gh_id']
  302. process_times = params['process_times']
  303. publish_flag = params['publish_flag']
  304. if flow_pool_level == "autoArticlePoolLevel4":
  305. # 校验文章是否属于该账号的negative 类型
  306. negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id,
  307. trace_id=trace_id)
  308. if negative_category_status:
  309. # 修改状态为品类不匹配状态
  310. logging(
  311. code="history1002",
  312. info="文章属于该账号的negative 类型",
  313. trace_id=trace_id
  314. )
  315. affected_rows = await self.update_content_status(
  316. trace_id=trace_id,
  317. new_content_status=self.const.MISMATCH_STATUS,
  318. ori_content_status=self.const.TASK_INIT_STATUS
  319. )
  320. logging(
  321. code="history1003",
  322. info="已经修改该文章状态为 品类不匹配状态",
  323. trace_id=trace_id
  324. )
  325. if affected_rows == 0:
  326. print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
  327. return
  328. await record_trace_id(trace_id=trace_id, status=self.const.RECORD_FAIL_TRACE_ID_CODE)
  329. return
  330. # 校验文章是否晋升 or 退场
  331. exit_status = await self.check_title_whether_exit(content_id)
  332. if exit_status:
  333. # 修改状态为退出状态
  334. logging(
  335. code="history1004",
  336. info="文章已经晋升 or 退场",
  337. trace_id=trace_id
  338. )
  339. affected_rows = await self.update_content_status(
  340. trace_id=trace_id,
  341. new_content_status=self.const.EXIT_STATUS,
  342. ori_content_status=self.const.TASK_INIT_STATUS
  343. )
  344. logging(
  345. code="history1005",
  346. info="已经修改该文章状态为 退出状态",
  347. trace_id=trace_id
  348. )
  349. if affected_rows == 0:
  350. print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
  351. return
  352. await record_trace_id(trace_id=trace_id, status=self.const.RECORD_FAIL_TRACE_ID_CODE)
  353. return
  354. download_videos = await self.get_video_list(content_id=content_id)
  355. # time.sleep(3)
  356. if download_videos:
  357. # 修改状态为执行状态,获取该任务的锁
  358. affected_rows = await self.update_content_status(
  359. trace_id=trace_id,
  360. new_content_status=self.const.TASK_PROCESSING_STATUS,
  361. ori_content_status=self.const.TASK_INIT_STATUS
  362. )
  363. if affected_rows == 0:
  364. print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
  365. return
  366. if publish_flag == self.const.DO_NOT_NEED_PUBLISH:
  367. logging(
  368. code="3013",
  369. info="长文系统托管, 不进行发布",
  370. trace_id=trace_id
  371. )
  372. # 把状态改为3
  373. await self.update_content_status(
  374. trace_id=trace_id,
  375. new_content_status=self.const.TASK_ETL_COMPLETE_STATUS,
  376. ori_content_status=self.const.TASK_PROCESSING_STATUS
  377. )
  378. return
  379. try:
  380. kimi_title = await self.get_kimi_title(content_id)
  381. await self.publish_videos_to_pq(
  382. flow_pool_level=flow_pool_level,
  383. kimi_title=kimi_title,
  384. gh_id=gh_id,
  385. trace_id=trace_id,
  386. download_videos=download_videos,
  387. process_times=process_times
  388. )
  389. except Exception as e:
  390. logging(
  391. code="history1008",
  392. info="history task 在发布的时候出现异常, error = {}".format(e),
  393. trace_id=trace_id,
  394. data={
  395. "error": str(e),
  396. "traceback": traceback.format_exc()
  397. }
  398. )
  399. bot(
  400. title="history task failed",
  401. detail={
  402. "trace_id": trace_id,
  403. "error": str(e)
  404. },
  405. mention=False
  406. )
  407. await self.roll_back_content_status_when_fails(
  408. trace_id=trace_id,
  409. process_times=process_times
  410. )
  411. else:
  412. return
  413. async def deal(self):
  414. """
  415. 处理
  416. :return:
  417. """
  418. task_list = await self.get_tasks()
  419. logging(
  420. code="history1001",
  421. info="History content_task Task Got {} this time".format(len(task_list)),
  422. function="History Contents Task {}".format(self.publish_flag)
  423. )
  424. if task_list:
  425. a = time.time()
  426. tasks = [self.process_task(params) for params in task_list]
  427. await asyncio.gather(*tasks)
  428. b = time.time()
  429. print("{} s 内处理了{}个任务".format(b - a, len(task_list)))
  430. else:
  431. print("暂时未获得历史已存在文章")