history_task.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. import traceback
  8. from applications.feishu import bot
  9. from applications.config import Config, HistoryContentIdTaskConst
  10. from applications.log import logging
  11. from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
  12. from applications.functions.common import shuffle_list
  13. from applications.functions.aigc import record_trace_id
  14. class historyContentIdTask(object):
  15. """
  16. 处理已经匹配过小程序的文章
  17. """
  18. def __init__(self, mysql_client, publish_flag):
  19. """
  20. :param mysql_client:
  21. """
  22. self.mysql_client = mysql_client
  23. self.config = Config()
  24. self.const = HistoryContentIdTaskConst()
  25. self.article_match_video_table = self.config.article_match_video_table
  26. self.article_text_table = self.config.article_text_table
  27. self.article_crawler_video_table = self.config.article_crawler_video_table
  28. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  29. self.history_coroutines = self.config.get_config_value("historyArticleCoroutines")
  30. self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
  31. self.publish_flag = publish_flag
  32. async def get_tasks(self):
  33. """
  34. 获取任务
  35. :return:
  36. """
  37. select_sql1 = f"""
  38. SELECT
  39. ART.trace_id,
  40. ART.content_id,
  41. ART.flow_pool_level,
  42. ART.gh_id,
  43. ART.process_times,
  44. ART.publish_flag
  45. FROM {self.article_match_video_table} ART
  46. JOIN (
  47. select content_id, count(1) as cnt
  48. from {self.article_crawler_video_table}
  49. where download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  50. group by content_id
  51. ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
  52. WHERE ART.content_status = {self.const.TASK_INIT_STATUS} and ART.process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
  53. AND ART.publish_flag = {self.publish_flag}
  54. ORDER BY ART.flow_pool_level, ART.request_timestamp
  55. LIMIT {self.history_coroutines};
  56. """
  57. tasks = await self.mysql_client.async_select(sql=select_sql1)
  58. task_obj_list = [
  59. {
  60. "trace_id": item[0],
  61. "content_id": item[1],
  62. "flow_pool_level": item[2],
  63. "gh_id": item[3],
  64. "process_times": item[4],
  65. "publish_flag": item[5]
  66. } for item in tasks
  67. ]
  68. logging(
  69. code="9001",
  70. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  71. data=task_obj_list
  72. )
  73. return task_obj_list
  74. async def get_video_list(self, content_id):
  75. """
  76. content_id
  77. :return:
  78. """
  79. sql = f"""
  80. SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
  81. FROM {self.article_crawler_video_table}
  82. WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  83. ORDER BY score DESC;
  84. """
  85. res_tuple = await self.mysql_client.async_select(sql)
  86. if len(res_tuple) >= 3:
  87. return [
  88. {
  89. "platform": i[0],
  90. "play_count": i[1],
  91. "like_count": i[2],
  92. "video_oss_path": i[3],
  93. "cover_oss_path": i[4],
  94. "uid": i[5]
  95. }
  96. for i in res_tuple
  97. ]
  98. else:
  99. return []
  100. async def get_kimi_title(self, content_id):
  101. """
  102. 获取 kimiTitle
  103. :param content_id:
  104. :return:
  105. """
  106. select_sql = f"""
  107. select kimi_title from {self.article_text_table} where content_id = '{content_id}';
  108. """
  109. res_tuple = await self.mysql_client.async_select(select_sql)
  110. if res_tuple:
  111. return res_tuple[0][0]
  112. else:
  113. return False
  114. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  115. """
  116. :param new_content_status:
  117. :param trace_id:
  118. :param ori_content_status:
  119. :return:
  120. """
  121. update_sql = f"""
  122. UPDATE {self.article_match_video_table}
  123. SET content_status = %s, content_status_update_time = %s
  124. WHERE trace_id = %s and content_status = %s;
  125. """
  126. row_counts = await self.mysql_client.async_insert(
  127. sql=update_sql,
  128. params=(
  129. new_content_status,
  130. int(time.time()),
  131. trace_id,
  132. ori_content_status
  133. )
  134. )
  135. return row_counts
  136. async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
  137. """
  138. 发布至 pq
  139. :param process_times:
  140. :param trace_id:
  141. :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
  142. :param gh_id: 公众号 id ---> str
  143. :param kimi_title: kimi 标题 ---> str
  144. :param flow_pool_level: 流量池层级 ---> str
  145. :return:
  146. """
  147. match flow_pool_level:
  148. case "autoArticlePoolLevel4":
  149. # 冷启层, 全量做
  150. video_list = shuffle_list(download_videos)[:3]
  151. case "autoArticlePoolLevel3":
  152. # 次条,只针对具体账号做
  153. if self.gh_id_dict.get(gh_id):
  154. video_list = shuffle_list(download_videos)[:3]
  155. else:
  156. video_list = download_videos[:3]
  157. case "autoArticlePoolLevel2":
  158. video_list = []
  159. case "autoArticlePoolLevel1":
  160. # 头条,先不做
  161. video_list = download_videos[:3]
  162. case _:
  163. print("未传流量池信息")
  164. video_list = download_videos[:3]
  165. L = []
  166. for video_obj in video_list:
  167. params = {
  168. "videoPath": video_obj['video_oss_path'],
  169. "uid": video_obj['uid'],
  170. "title": kimi_title
  171. }
  172. publish_response = await publish_to_pq(params)
  173. video_id = publish_response['data']['id']
  174. response = await get_pq_video_detail(video_id)
  175. # time.sleep(2)
  176. obj = {
  177. "uid": video_obj['uid'],
  178. "source": video_obj['platform'],
  179. "kimiTitle": kimi_title,
  180. "videoId": response['data'][0]['id'],
  181. "videoCover": response['data'][0]['shareImgPath'],
  182. "videoPath": response['data'][0]['videoPath'],
  183. "videoOss": video_obj['video_oss_path']
  184. }
  185. logging(
  186. code="history1006",
  187. info="视频已经发布到 pq",
  188. trace_id=trace_id,
  189. data=obj
  190. )
  191. L.append(obj)
  192. update_sql = f"""
  193. UPDATE {self.article_match_video_table}
  194. SET content_status = %s, response = %s, process_times = %s
  195. WHERE trace_id = %s and content_status = %s;
  196. """
  197. await self.mysql_client.async_insert(
  198. sql=update_sql,
  199. params=(
  200. self.const.TASK_PUBLISHED_STATUS,
  201. json.dumps(L, ensure_ascii=False),
  202. process_times + 1,
  203. trace_id,
  204. self.const.TASK_PROCESSING_STATUS
  205. )
  206. )
  207. logging(
  208. code="history1007",
  209. info="已经更文章状态为已发布",
  210. trace_id=trace_id,
  211. data=L
  212. )
  213. await record_trace_id(trace_id=trace_id, status=self.const.RECORD_SUCCESS_TRACE_ID_CODE)
  214. async def roll_back_content_status_when_fails(self, process_times, trace_id):
  215. """
  216. 处理失败,回滚至初始状态,处理次数加 1
  217. :param process_times:
  218. :param trace_id:
  219. :return:
  220. """
  221. update_article_sql = f"""
  222. UPDATE {self.article_match_video_table}
  223. SET
  224. content_status = %s,
  225. content_status_update_time = %s,
  226. process_times = %s
  227. WHERE trace_id = %s and content_status = %s;
  228. """
  229. await self.mysql_client.async_insert(
  230. sql=update_article_sql,
  231. params=(
  232. self.const.TASK_INIT_STATUS,
  233. int(time.time()),
  234. process_times + 1,
  235. trace_id,
  236. self.const.TASK_PROCESSING_STATUS
  237. )
  238. )
  239. async def check_title_whether_exit(self, content_id):
  240. """
  241. 校验文章是标题是否晋升 or 退场
  242. :return:
  243. """
  244. sql = f"""
  245. SELECT lat.article_title, cstp.status
  246. FROM long_articles_text lat
  247. JOIN cold_start_title_pool cstp ON lat.article_title = cstp.title
  248. WHERE lat.content_id = '{content_id}';
  249. """
  250. result = await self.mysql_client.async_select(sql)
  251. if result:
  252. status = result[0][1]
  253. if status in {self.const.UP_LEVEL_STATUS, self.const.TITLE_EXIT_STATUS}:
  254. return True
  255. else:
  256. return False
  257. else:
  258. return False
  259. async def check_title_category(self, content_id, gh_id, trace_id) -> bool:
  260. """
  261. 判断该文章的品类是否属于该账号的品类
  262. :param trace_id:
  263. :param content_id:
  264. :param gh_id:
  265. :return:
  266. """
  267. bad_category_list = self.account_negative_category.get(gh_id, [])
  268. logging(
  269. code="history1101",
  270. info="该账号的 negative 类型列表",
  271. trace_id=trace_id,
  272. data=bad_category_list
  273. )
  274. if bad_category_list:
  275. sql = f"""
  276. SELECT category
  277. FROM article_category
  278. WHERE produce_content_id = '{content_id}';
  279. """
  280. result = await self.mysql_client.async_select(sql)
  281. if result:
  282. category = result[0][0]
  283. logging(
  284. code="history1102",
  285. info="文章的品类-{}".format(category),
  286. trace_id=trace_id
  287. )
  288. if category in bad_category_list:
  289. return True
  290. return False
  291. async def process_task(self, params):
  292. """
  293. 异步执行
  294. :param params:
  295. :return:
  296. """
  297. content_id = params['content_id']
  298. trace_id = params['trace_id']
  299. flow_pool_level = params['flow_pool_level']
  300. gh_id = params['gh_id']
  301. process_times = params['process_times']
  302. publish_flag = params['publish_flag']
  303. if flow_pool_level == "autoArticlePoolLevel4":
  304. # 校验文章是否属于该账号的negative 类型
  305. negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id,
  306. trace_id=trace_id)
  307. if negative_category_status:
  308. # 修改状态为品类不匹配状态
  309. logging(
  310. code="history1002",
  311. info="文章属于该账号的negative 类型",
  312. trace_id=trace_id
  313. )
  314. affected_rows = await self.update_content_status(
  315. trace_id=trace_id,
  316. new_content_status=self.const.MISMATCH_STATUS,
  317. ori_content_status=self.const.TASK_INIT_STATUS
  318. )
  319. logging(
  320. code="history1003",
  321. info="已经修改该文章状态为 品类不匹配状态",
  322. trace_id=trace_id
  323. )
  324. if affected_rows == 0:
  325. print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
  326. return
  327. await record_trace_id(trace_id=trace_id, status=self.const.RECORD_FAIL_TRACE_ID_CODE)
  328. return
  329. # 校验文章是否晋升 or 退场
  330. exit_status = await self.check_title_whether_exit(content_id)
  331. if exit_status:
  332. # 修改状态为退出状态
  333. logging(
  334. code="history1004",
  335. info="文章已经晋升 or 退场",
  336. trace_id=trace_id
  337. )
  338. affected_rows = await self.update_content_status(
  339. trace_id=trace_id,
  340. new_content_status=self.const.EXIT_STATUS,
  341. ori_content_status=self.const.TASK_INIT_STATUS
  342. )
  343. logging(
  344. code="history1005",
  345. info="已经修改该文章状态为 退出状态",
  346. trace_id=trace_id
  347. )
  348. if affected_rows == 0:
  349. print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
  350. return
  351. await record_trace_id(trace_id=trace_id, status=self.const.RECORD_FAIL_TRACE_ID_CODE)
  352. return
  353. download_videos = await self.get_video_list(content_id=content_id)
  354. # time.sleep(3)
  355. if download_videos:
  356. # 修改状态为执行状态,获取该任务的锁
  357. affected_rows = await self.update_content_status(
  358. trace_id=trace_id,
  359. new_content_status=self.const.TASK_PROCESSING_STATUS,
  360. ori_content_status=self.const.TASK_INIT_STATUS
  361. )
  362. if affected_rows == 0:
  363. print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
  364. return
  365. if publish_flag == self.const.DO_NOT_NEED_PUBLISH:
  366. logging(
  367. code="3013",
  368. info="长文系统托管, 不进行发布",
  369. trace_id=trace_id
  370. )
  371. # 把状态改为3
  372. await self.update_content_status(
  373. trace_id=trace_id,
  374. new_content_status=self.const.TASK_ETL_COMPLETE_STATUS,
  375. ori_content_status=self.const.TASK_PROCESSING_STATUS
  376. )
  377. return
  378. try:
  379. kimi_title = await self.get_kimi_title(content_id)
  380. await self.publish_videos_to_pq(
  381. flow_pool_level=flow_pool_level,
  382. kimi_title=kimi_title,
  383. gh_id=gh_id,
  384. trace_id=trace_id,
  385. download_videos=download_videos,
  386. process_times=process_times
  387. )
  388. except Exception as e:
  389. logging(
  390. code="history1008",
  391. info="history task 在发布的时候出现异常, error = {}".format(e),
  392. trace_id=trace_id,
  393. data={
  394. "error": str(e),
  395. "traceback": traceback.format_exc()
  396. }
  397. )
  398. bot(
  399. title="history task failed",
  400. detail={
  401. "trace_id": trace_id,
  402. "error": str(e)
  403. },
  404. mention=False
  405. )
  406. await self.roll_back_content_status_when_fails(
  407. trace_id=trace_id,
  408. process_times=process_times
  409. )
  410. else:
  411. return
  412. async def deal(self):
  413. """
  414. 处理
  415. :return:
  416. """
  417. task_list = await self.get_tasks()
  418. logging(
  419. code="history1001",
  420. info="History content_task Task Got {} this time".format(len(task_list)),
  421. function="History Contents Task {}".format(self.publish_flag)
  422. )
  423. if task_list:
  424. a = time.time()
  425. tasks = [self.process_task(params) for params in task_list]
  426. await asyncio.gather(*tasks)
  427. b = time.time()
  428. print("{} s 内处理了{}个任务".format(b - a, len(task_list)))
  429. else:
  430. print("暂时未获得历史已存在文章")