history_task.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. import traceback
  8. from applications.feishu import bot
  9. from applications.config import Config
  10. from applications.log import logging
  11. from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
  12. from applications.functions.common import shuffle_list
  13. from applications.functions.aigc import record_trace_id
  14. class historyContentIdTask(object):
  15. """
  16. 处理已经匹配过小程序的文章
  17. """
  18. TASK_PROCESSING_STATUS = 101
  19. EXIT_STATUS = 97
  20. MISMATCH_STATUS = 96
  21. TASK_INIT_STATUS = 0
  22. TASK_PUBLISHED_STATUS = 4
  23. RECORD_SUCCESS_TRACE_ID_CODE = 2
  24. RECORD_FAIL_TRACE_ID_CODE = 3
  25. def __init__(self, mysql_client):
  26. """
  27. :param mysql_client:
  28. """
  29. self.mysql_client = mysql_client
  30. self.config = Config()
  31. self.article_match_video_table = self.config.article_match_video_table
  32. self.article_text_table = self.config.article_text_table
  33. self.article_crawler_video_table = self.config.article_crawler_video_table
  34. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  35. self.history_coroutines = self.config.get_config_value("historyArticleCoroutines")
  36. self.account_negative_category = json.loads(self.config.get_config_value("account_negative_category"))
  37. async def get_tasks(self):
  38. """
  39. 获取任务
  40. :return:
  41. """
  42. select_sql1 = f"""
  43. SELECT
  44. ART.trace_id,
  45. ART.content_id,
  46. ART.flow_pool_level,
  47. ART.gh_id,
  48. ART.process_times
  49. FROM {self.article_match_video_table} ART
  50. JOIN (
  51. select content_id, count(1) as cnt
  52. from {self.article_crawler_video_table}
  53. where download_status = 2
  54. group by content_id
  55. ) VID on ART.content_id = VID.content_id and VID.cnt >= 3
  56. WHERE ART.content_status = 0 and ART.process_times <= 3
  57. ORDER BY ART.flow_pool_level, ART.request_timestamp
  58. LIMIT {self.history_coroutines};
  59. """
  60. tasks = await self.mysql_client.async_select(sql=select_sql1)
  61. task_obj_list = [
  62. {
  63. "trace_id": item[0],
  64. "content_id": item[1],
  65. "flow_pool_level": item[2],
  66. "gh_id": item[3],
  67. "process_times": item[4]
  68. } for item in tasks
  69. ]
  70. logging(
  71. code="9001",
  72. info="本次任务获取到 {} 条视频".format(len(task_obj_list)),
  73. data=task_obj_list
  74. )
  75. return task_obj_list
  76. async def get_video_list(self, content_id):
  77. """
  78. content_id
  79. :return:
  80. """
  81. sql = f"""
  82. SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
  83. FROM {self.article_crawler_video_table}
  84. WHERE content_id = '{content_id}' and download_status = 2
  85. ORDER BY score DESC;
  86. """
  87. res_tuple = await self.mysql_client.async_select(sql)
  88. if len(res_tuple) >= 3:
  89. return [
  90. {
  91. "platform": i[0],
  92. "play_count": i[1],
  93. "like_count": i[2],
  94. "video_oss_path": i[3],
  95. "cover_oss_path": i[4],
  96. "uid": i[5]
  97. }
  98. for i in res_tuple
  99. ]
  100. else:
  101. return []
  102. async def get_kimi_title(self, content_id):
  103. """
  104. 获取 kimiTitle
  105. :param content_id:
  106. :return:
  107. """
  108. select_sql = f"""
  109. select kimi_title from {self.article_text_table} where content_id = '{content_id}';
  110. """
  111. res_tuple = await self.mysql_client.async_select(select_sql)
  112. if res_tuple:
  113. return res_tuple[0][0]
  114. else:
  115. return False
  116. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  117. """
  118. :param new_content_status:
  119. :param trace_id:
  120. :param ori_content_status:
  121. :return:
  122. """
  123. update_sql = f"""
  124. UPDATE {self.article_match_video_table}
  125. SET content_status = %s, content_status_update_time = %s
  126. WHERE trace_id = %s and content_status = %s;
  127. """
  128. row_counts = await self.mysql_client.async_insert(
  129. sql=update_sql,
  130. params=(
  131. new_content_status,
  132. int(time.time()),
  133. trace_id,
  134. ori_content_status
  135. )
  136. )
  137. return row_counts
  138. async def publish_videos_to_pq(self, trace_id, flow_pool_level, kimi_title, gh_id, download_videos, process_times):
  139. """
  140. 发布至 pq
  141. :param process_times:
  142. :param trace_id:
  143. :param download_videos: 已下载的视频---> list [{}, {}, {}.... ]
  144. :param gh_id: 公众号 id ---> str
  145. :param kimi_title: kimi 标题 ---> str
  146. :param flow_pool_level: 流量池层级 ---> str
  147. :return:
  148. """
  149. match flow_pool_level:
  150. case "autoArticlePoolLevel4":
  151. # 冷启层, 全量做
  152. video_list = shuffle_list(download_videos)[:3]
  153. case "autoArticlePoolLevel3":
  154. # 次条,只针对具体账号做
  155. if self.gh_id_dict.get(gh_id):
  156. video_list = shuffle_list(download_videos)[:3]
  157. else:
  158. video_list = download_videos[:3]
  159. case "autoArticlePoolLevel2":
  160. video_list = []
  161. case "autoArticlePoolLevel1":
  162. # 头条,先不做
  163. video_list = download_videos[:3]
  164. case _:
  165. print("未传流量池信息")
  166. video_list = download_videos[:3]
  167. L = []
  168. for video_obj in video_list:
  169. params = {
  170. "videoPath": video_obj['video_oss_path'],
  171. "uid": video_obj['uid'],
  172. "title": kimi_title
  173. }
  174. publish_response = await publish_to_pq(params)
  175. video_id = publish_response['data']['id']
  176. response = await get_pq_video_detail(video_id)
  177. # time.sleep(2)
  178. obj = {
  179. "uid": video_obj['uid'],
  180. "source": video_obj['platform'],
  181. "kimiTitle": kimi_title,
  182. "videoId": response['data'][0]['id'],
  183. "videoCover": response['data'][0]['shareImgPath'],
  184. "videoPath": response['data'][0]['videoPath'],
  185. "videoOss": video_obj['video_oss_path']
  186. }
  187. logging(
  188. code="history1006",
  189. info="视频已经发布到 pq",
  190. trace_id=trace_id,
  191. data=obj
  192. )
  193. L.append(obj)
  194. update_sql = f"""
  195. UPDATE {self.article_match_video_table}
  196. SET content_status = %s, response = %s, process_times = %s
  197. WHERE trace_id = %s and content_status = %s;
  198. """
  199. await self.mysql_client.async_insert(
  200. sql=update_sql,
  201. params=(
  202. self.TASK_PUBLISHED_STATUS,
  203. json.dumps(L, ensure_ascii=False),
  204. process_times + 1,
  205. trace_id,
  206. self.TASK_PROCESSING_STATUS
  207. )
  208. )
  209. logging(
  210. code="history1007",
  211. info="已经更文章状态为已发布",
  212. trace_id=trace_id,
  213. data=L
  214. )
  215. await record_trace_id(trace_id=trace_id, status=self.RECORD_SUCCESS_TRACE_ID_CODE)
  216. async def roll_back_content_status_when_fails(self, process_times, trace_id):
  217. """
  218. 处理失败,回滚至初始状态,处理次数加 1
  219. :param process_times:
  220. :param trace_id:
  221. :return:
  222. """
  223. update_article_sql = f"""
  224. UPDATE {self.article_match_video_table}
  225. SET
  226. content_status = %s,
  227. content_status_update_time = %s,
  228. process_times = %s
  229. WHERE trace_id = %s and content_status = %s;
  230. """
  231. await self.mysql_client.async_insert(
  232. sql=update_article_sql,
  233. params=(
  234. self.TASK_INIT_STATUS,
  235. int(time.time()),
  236. process_times + 1,
  237. trace_id,
  238. self.TASK_PROCESSING_STATUS
  239. )
  240. )
  241. async def check_title_whether_exit(self, content_id):
  242. """
  243. 校验文章是标题是否晋升 or 退场
  244. :return:
  245. """
  246. UP_LEVEL_STATUS = 1
  247. TITLE_EXIT_STATUS = -1
  248. sql = f"""
  249. SELECT lat.article_title, cstp.status
  250. FROM long_articles_text lat
  251. JOIN cold_start_title_pool cstp ON lat.article_title = cstp.title
  252. WHERE lat.content_id = '{content_id}';
  253. """
  254. result = await self.mysql_client.async_select(sql)
  255. if result:
  256. status = result[0][1]
  257. if status in {UP_LEVEL_STATUS, TITLE_EXIT_STATUS}:
  258. return True
  259. else:
  260. return False
  261. else:
  262. return False
  263. async def check_title_category(self, content_id, gh_id, trace_id) -> bool:
  264. """
  265. 判断该文章的品类是否属于该账号的品类
  266. :param trace_id:
  267. :param content_id:
  268. :param gh_id:
  269. :return:
  270. """
  271. bad_category_list = self.account_negative_category.get(gh_id, [])
  272. logging(
  273. code="history1101",
  274. info="该账号的 negative 类型列表",
  275. trace_id=trace_id,
  276. data=bad_category_list
  277. )
  278. if bad_category_list:
  279. sql = f"""
  280. SELECT category
  281. FROM article_category
  282. WHERE produce_content_id = '{content_id}';
  283. """
  284. result = await self.mysql_client.async_select(sql)
  285. if result:
  286. category = result[0][0]
  287. logging(
  288. code="history1102",
  289. info="文章的品类-{}".format(category),
  290. trace_id=trace_id
  291. )
  292. if category in bad_category_list:
  293. return True
  294. return False
  295. async def process_task(self, params):
  296. """
  297. 异步执行
  298. :param params:
  299. :return:
  300. """
  301. content_id = params['content_id']
  302. trace_id = params['trace_id']
  303. flow_pool_level = params['flow_pool_level']
  304. gh_id = params['gh_id']
  305. if flow_pool_level == "autoArticlePoolLevel4":
  306. # 校验文章是否属于该账号的negative 类型
  307. negative_category_status = await self.check_title_category(content_id=content_id, gh_id=gh_id, trace_id=trace_id)
  308. if negative_category_status:
  309. # 修改状态为品类不匹配状态
  310. logging(
  311. code="history1002",
  312. info="文章属于该账号的negative 类型",
  313. trace_id=trace_id
  314. )
  315. affected_rows = await self.update_content_status(
  316. trace_id=trace_id,
  317. new_content_status=self.MISMATCH_STATUS,
  318. ori_content_status=self.TASK_INIT_STATUS
  319. )
  320. logging(
  321. code="history1003",
  322. info="已经修改该文章状态为 品类不匹配状态",
  323. trace_id=trace_id
  324. )
  325. if affected_rows == 0:
  326. print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
  327. return
  328. await record_trace_id(trace_id=trace_id, status=self.RECORD_FAIL_TRACE_ID_CODE)
  329. return
  330. # 校验文章是否晋升 or 退场
  331. exit_status = await self.check_title_whether_exit(content_id)
  332. if exit_status:
  333. # 修改状态为退出状态
  334. logging(
  335. code="history1004",
  336. info="文章已经晋升 or 退场",
  337. trace_id=trace_id
  338. )
  339. affected_rows = await self.update_content_status(
  340. trace_id=trace_id,
  341. new_content_status=self.EXIT_STATUS,
  342. ori_content_status=self.TASK_INIT_STATUS
  343. )
  344. logging(
  345. code="history1005",
  346. info="已经修改该文章状态为 退出状态",
  347. trace_id=trace_id
  348. )
  349. if affected_rows == 0:
  350. print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
  351. return
  352. await record_trace_id(trace_id=trace_id, status=self.RECORD_FAIL_TRACE_ID_CODE)
  353. return
  354. gh_id = params['gh_id']
  355. process_times = params['process_times']
  356. download_videos = await self.get_video_list(content_id=content_id)
  357. # time.sleep(3)
  358. if download_videos:
  359. # 修改状态为执行状态,获取该任务的锁
  360. affected_rows = await self.update_content_status(
  361. trace_id=trace_id,
  362. new_content_status=self.TASK_PROCESSING_STATUS,
  363. ori_content_status=self.TASK_INIT_STATUS
  364. )
  365. if affected_rows == 0:
  366. print("修改行数为 0,多个进程抢占同一个 task, 抢占失败,进程退出")
  367. return
  368. try:
  369. kimi_title = await self.get_kimi_title(content_id)
  370. await self.publish_videos_to_pq(
  371. flow_pool_level=flow_pool_level,
  372. kimi_title=kimi_title,
  373. gh_id=gh_id,
  374. trace_id=trace_id,
  375. download_videos=download_videos,
  376. process_times=process_times
  377. )
  378. except Exception as e:
  379. logging(
  380. code="history1008",
  381. info="history task 在发布的时候出现异常, error = {}".format(e),
  382. trace_id=trace_id,
  383. data={
  384. "error": str(e),
  385. "traceback": traceback.format_exc()
  386. }
  387. )
  388. bot(
  389. title="history task failed",
  390. detail={
  391. "trace_id": trace_id,
  392. "error": str(e)
  393. },
  394. mention=False
  395. )
  396. await self.roll_back_content_status_when_fails(
  397. trace_id=trace_id,
  398. process_times=process_times
  399. )
  400. else:
  401. return
  402. async def deal(self):
  403. """
  404. 处理
  405. :return:
  406. """
  407. task_list = await self.get_tasks()
  408. logging(
  409. code="history1001",
  410. info="History content_task Task Got {} this time".format(len(task_list)),
  411. function="History Contents Task"
  412. )
  413. if task_list:
  414. a = time.time()
  415. tasks = [self.process_task(params) for params in task_list]
  416. await asyncio.gather(*tasks)
  417. b = time.time()
  418. print("{} s 内处理了{}个任务".format(b - a, len(task_list)))
  419. else:
  420. print("暂时未获得历史已存在文章")