# new_contentId_task.py
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. from typing import List, Dict
  8. from applications.config import Config
  9. from applications.config.const import new_content_id_task as NewContentIdTaskConst
  10. from applications.log import logging
  11. from applications.functions.common import shuffle_list
  12. from applications.spider import search_videos_from_web
  13. from applications.feishu import bot
  14. from applications.functions.aigc import record_trace_id
  15. from .utils import *
  16. class NewContentIdTask(object):
  17. """
  18. 不存在历史已经发布的文章的匹配流程
  19. """
  20. def __init__(self, long_articles_client, aigc_client):
  21. self.long_articles_client = long_articles_client
  22. self.aigc_client = aigc_client
  23. self.config = Config()
  24. self.article_match_video_table = self.config.article_match_video_table
  25. self.article_text_table = self.config.article_text_table
  26. self.article_crawler_video_table = self.config.article_crawler_video_table
  27. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  28. self.account_map = json.loads(self.config.get_config_value("accountMap"))
  29. self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
  30. async def get_tasks(self) -> List[Dict]:
  31. """
  32. 获取 task
  33. :return:
  34. """
  35. # 获取 process_times <= 3 且 content_status = 0 的任务
  36. select_sql = f"""
  37. SELECT
  38. t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
  39. FROM
  40. {self.article_match_video_table} t1
  41. LEFT JOIN (
  42. SELECT content_id, count(1) as cnt
  43. FROM {self.article_crawler_video_table}
  44. WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  45. GROUP BY content_id
  46. ) t2
  47. ON t1.content_id = t2.content_id
  48. WHERE
  49. t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
  50. AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  51. AND t2.cnt IS NULL
  52. ORDER BY flow_pool_level, request_timestamp
  53. LIMIT {self.spider_coroutines};
  54. """
  55. tasks = await self.long_articles_client.async_select(select_sql)
  56. if tasks:
  57. return [
  58. {
  59. "trace_id": i[0],
  60. "content_id": i[1],
  61. "flow_pool_level": i[2],
  62. "gh_id": i[3],
  63. "process_times": i[4],
  64. "publish_flag": i[5]
  65. }
  66. for i in tasks
  67. ]
  68. else:
  69. return []
  70. async def set_tasks_status_fail(self) -> None:
  71. """
  72. 将 处理次数大约3次,且状态不为成功状态的(3, 4)的任务状态修改为失败
  73. """
  74. update_status_sql = f"""
  75. UPDATE
  76. {self.article_match_video_table}
  77. SET
  78. content_status = %s
  79. WHERE
  80. process_times > %s and content_status not in (%s, %s);
  81. """
  82. await self.long_articles_client.async_insert(
  83. update_status_sql,
  84. params=(
  85. NewContentIdTaskConst.TASK_FAIL_STATUS,
  86. NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
  87. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  88. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  89. )
  90. )
  91. async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
  92. """
  93. 将长时间处于中间状态的任务回滚
  94. """
  95. # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
  96. if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
  97. processing_status_tuple = (
  98. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  99. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  100. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  101. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  102. )
  103. elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  104. processing_status_tuple = (
  105. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  106. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  107. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
  108. )
  109. else:
  110. return
  111. select_processing_sql = f"""
  112. SELECT
  113. trace_id, content_status_update_time, process_times, content_status
  114. FROM
  115. {self.article_match_video_table}
  116. WHERE
  117. content_status in {processing_status_tuple}
  118. and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  119. and publish_flag = {publish_flag};
  120. """
  121. processing_articles = await self.long_articles_client.async_select(select_processing_sql)
  122. if processing_articles:
  123. processing_list = [
  124. {
  125. "trace_id": item[0],
  126. "content_status_update_time": item[1],
  127. "process_times": item[2],
  128. "content_status": item[3]
  129. }
  130. for item in processing_articles
  131. ]
  132. for obj in processing_list:
  133. if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
  134. # 认为该任务失败
  135. await self.roll_back_content_status_when_fails(
  136. process_times=obj['process_times'] + 1,
  137. trace_id=obj['trace_id'],
  138. ori_content_status=obj['content_status']
  139. )
  140. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  141. """
  142. :param new_content_status:
  143. :param trace_id:
  144. :param ori_content_status:
  145. :return:
  146. """
  147. update_sql = f"""
  148. UPDATE {self.article_match_video_table}
  149. SET content_status = %s, content_status_update_time = %s
  150. WHERE trace_id = %s and content_status = %s;
  151. """
  152. row_counts = await self.long_articles_client.async_insert(
  153. sql=update_sql,
  154. params=(
  155. new_content_status,
  156. int(time.time()),
  157. trace_id,
  158. ori_content_status
  159. )
  160. )
  161. return row_counts
  162. async def roll_back_content_status_when_fails(self, process_times, trace_id,
  163. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
  164. """
  165. 处理失败,回滚至初始状态,处理次数加 1
  166. :param process_times:
  167. :param trace_id:
  168. :param ori_content_status:
  169. :return:
  170. """
  171. update_article_sql = f"""
  172. UPDATE {self.article_match_video_table}
  173. SET
  174. content_status = %s,
  175. content_status_update_time = %s,
  176. process_times = %s
  177. WHERE trace_id = %s and content_status = %s;
  178. """
  179. await self.long_articles_client.async_insert(
  180. sql=update_article_sql,
  181. params=(
  182. NewContentIdTaskConst.TASK_INIT_STATUS,
  183. int(time.time()),
  184. process_times + 1,
  185. trace_id,
  186. ori_content_status
  187. )
  188. )
  189. async def judge_whether_same_content_id_is_processing(self, content_id):
  190. """
  191. 同一个 content_id 只需要处理一次
  192. :param content_id:
  193. :return:
  194. success: 4
  195. init: 0
  196. fail: 99
  197. """
  198. select_sql = f"""
  199. SELECT distinct content_status
  200. FROM {self.article_match_video_table}
  201. WHERE content_id = '{content_id}';
  202. """
  203. result = await self.long_articles_client.async_select(select_sql)
  204. if result:
  205. for item in result:
  206. content_status = item[0]
  207. # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
  208. if content_status in {
  209. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  210. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  211. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  212. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  213. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  214. }:
  215. return True
  216. return False
  217. else:
  218. return False
  219. async def get_source_content_id(self, new_content_id):
  220. """
  221. 通过content_id 查找源content_id,并且更新其内容
  222. """
  223. select_channel_id_sql = f"""
  224. SELECT channel_content_id
  225. FROM produce_plan_exe_record
  226. WHERE plan_exe_id = '{new_content_id}';
  227. """
  228. channel_content_id = await self.aigc_client.async_select(select_channel_id_sql)
  229. if channel_content_id:
  230. select_source_content_id_sql = f"""
  231. SELECT root_produce_content_id
  232. FROM article_pool_promotion_source
  233. WHERE channel_content_id = '{channel_content_id[0][0]}';
  234. """
  235. source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql)
  236. if source_content_id:
  237. return source_content_id[0][0]
  238. else:
  239. return
  240. else:
  241. return
  242. async def kimi_task(self, params):
  243. """
  244. 执行 kimi 任务
  245. :return:
  246. """
  247. trace_id = params['trace_id']
  248. if params.get("root_content_id"):
  249. kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
  250. affected_rows = await self.update_content_status(
  251. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  252. trace_id=trace_id,
  253. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  254. )
  255. if affected_rows == 0:
  256. logging(
  257. code="6000",
  258. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  259. )
  260. return
  261. return kimi_result
  262. # 处理content_id
  263. content_id = params['content_id']
  264. process_times = params['process_times']
  265. kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  266. if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
  267. affected_rows = await self.update_content_status(
  268. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  269. trace_id=trace_id,
  270. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  271. )
  272. if affected_rows == 0:
  273. logging(
  274. code="6000",
  275. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  276. )
  277. return
  278. kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  279. return kimi_result
  280. elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
  281. logging(
  282. code="4000",
  283. info="long_articles_text表中未找到 content_id"
  284. )
  285. else:
  286. # 开始处理,讲 content_status 从 0 改为 101
  287. affected_rows = await self.update_content_status(
  288. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  289. trace_id=trace_id,
  290. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  291. )
  292. if affected_rows == 0:
  293. logging(
  294. code="6000",
  295. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  296. )
  297. return
  298. try:
  299. kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  300. await self.update_content_status(
  301. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  302. trace_id=trace_id,
  303. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  304. )
  305. return kimi_result
  306. except Exception as e:
  307. # kimi 任务处理失败
  308. update_kimi_sql = f"""
  309. UPDATE {self.article_text_table}
  310. SET
  311. kimi_status = %s
  312. WHERE content_id = %s
  313. """
  314. await self.long_articles_client.async_insert(
  315. sql=update_kimi_sql,
  316. params=(
  317. NewContentIdTaskConst.KIMI_FAIL_STATUS,
  318. content_id
  319. )
  320. )
  321. # 将状态由 101 回退为 0
  322. await self.roll_back_content_status_when_fails(
  323. process_times=process_times,
  324. trace_id=trace_id
  325. )
  326. return {}
  327. async def spider_task(self, params, kimi_result):
  328. """
  329. 爬虫任务
  330. :return:
  331. """
  332. trace_id = params['trace_id']
  333. content_id = params['content_id']
  334. process_times = params['process_times']
  335. gh_id = params['gh_id']
  336. if params.get("root_content_id"):
  337. # 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2
  338. await update_crawler_table_with_exist_content_id(
  339. content_id=content_id,
  340. trace_id=trace_id,
  341. article_crawler_video_table=self.article_crawler_video_table,
  342. db_client=self.long_articles_client,
  343. root_content_id=params['root_content_id']
  344. )
  345. affected_rows = await self.update_content_status(
  346. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  347. trace_id=trace_id,
  348. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  349. )
  350. if affected_rows == 0:
  351. return
  352. return True
  353. download_video_exist_flag = await whether_downloaded_videos_exists(
  354. content_id=content_id,
  355. article_crawler_video_table=self.article_crawler_video_table,
  356. db_client=self.long_articles_client
  357. )
  358. if download_video_exist_flag:
  359. await self.update_content_status(
  360. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  361. trace_id=trace_id,
  362. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  363. )
  364. return True
  365. # 开始处理,将状态由 1 改成 101
  366. affected_rows = await self.update_content_status(
  367. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  368. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  369. trace_id=trace_id
  370. )
  371. if affected_rows == 0:
  372. logging(
  373. code="6000",
  374. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  375. )
  376. return False
  377. try:
  378. logging(
  379. code="spider_1001",
  380. info="开始执行搜索任务",
  381. trace_id=trace_id,
  382. data=kimi_result
  383. )
  384. search_videos_count = await search_videos_from_web(
  385. info={
  386. "ori_title": kimi_result['ori_title'],
  387. "kimi_summary": kimi_result['kimi_summary'],
  388. "kimi_keys": kimi_result['kimi_keys'],
  389. "trace_id": trace_id,
  390. "gh_id": gh_id,
  391. "content_id": content_id,
  392. "crawler_video_table": self.article_crawler_video_table
  393. },
  394. gh_id_map=self.account_map,
  395. db_client=self.long_articles_client
  396. )
  397. if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  398. # 表示爬虫任务执行成功, 将状态从 101 改为 2
  399. logging(
  400. code="spider_1002",
  401. info="搜索成功",
  402. trace_id=trace_id,
  403. data=kimi_result
  404. )
  405. await self.update_content_status(
  406. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  407. trace_id=trace_id,
  408. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  409. )
  410. return True
  411. else:
  412. logging(
  413. code="spider_1003",
  414. info="搜索失败",
  415. trace_id=trace_id,
  416. data=kimi_result
  417. )
  418. await self.roll_back_content_status_when_fails(
  419. process_times=process_times + 1,
  420. trace_id=trace_id
  421. )
  422. return False
  423. except Exception as e:
  424. await self.roll_back_content_status_when_fails(
  425. process_times=process_times + 1,
  426. trace_id=trace_id
  427. )
  428. print("爬虫处理失败: {}".format(e))
  429. return False
  430. async def etl_task(self, params):
  431. """
  432. download && upload videos
  433. :param params:
  434. :return:
  435. """
  436. trace_id = params['trace_id']
  437. content_id = params['content_id']
  438. process_times = params['process_times']
  439. # 判断视频是否已下载完成
  440. video_exist_flag = await whether_downloaded_videos_exists(
  441. content_id=content_id,
  442. article_crawler_video_table=self.article_crawler_video_table,
  443. db_client=self.long_articles_client
  444. )
  445. if video_exist_flag:
  446. affect_rows = await self.update_content_status(
  447. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  448. trace_id=trace_id,
  449. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  450. )
  451. if affect_rows == 0:
  452. logging(
  453. code="6000",
  454. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  455. )
  456. return False
  457. return True
  458. else:
  459. # 开始处理, 将文章状态修改为处理状态
  460. affected_rows = await self.update_content_status(
  461. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  462. trace_id=trace_id,
  463. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  464. )
  465. if affected_rows == 0:
  466. logging(
  467. code="6000",
  468. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  469. )
  470. return False
  471. # download videos
  472. downloaded_count = await async_download_videos(
  473. trace_id=trace_id,
  474. content_id=content_id,
  475. article_crawler_video_table=self.article_crawler_video_table,
  476. db_client=self.long_articles_client
  477. )
  478. if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  479. await self.update_content_status(
  480. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  481. trace_id=trace_id,
  482. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  483. )
  484. return True
  485. else:
  486. await self.roll_back_content_status_when_fails(
  487. process_times=process_times + 1,
  488. trace_id=trace_id
  489. )
  490. return False
  491. async def publish_task(self, params, kimi_title):
  492. """
  493. 发布任务
  494. :param kimi_title:
  495. :param params:
  496. :return:
  497. """
  498. gh_id = params['gh_id']
  499. flow_pool_level = params['flow_pool_level']
  500. content_id = params['content_id']
  501. trace_id = params['trace_id']
  502. process_times = params['process_times']
  503. # 开始处理,将状态修改为操作状态
  504. affected_rows = await self.update_content_status(
  505. ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  506. trace_id=trace_id,
  507. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  508. )
  509. if affected_rows == 0:
  510. logging(
  511. code="6000",
  512. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  513. )
  514. return False
  515. try:
  516. download_videos = await get_downloaded_videos(
  517. content_id=content_id,
  518. article_crawler_video_table=self.article_crawler_video_table,
  519. db_client=self.long_articles_client
  520. )
  521. match flow_pool_level:
  522. case "autoArticlePoolLevel4":
  523. # 冷启层, 全量做
  524. video_list = shuffle_list(download_videos)[:3]
  525. case "autoArticlePoolLevel3":
  526. if self.gh_id_dict.get(gh_id):
  527. video_list = shuffle_list(download_videos)[:3]
  528. else:
  529. video_list = download_videos[:3]
  530. case "autoArticlePoolLevel2":
  531. # 次条,只针对具体账号做
  532. video_list = []
  533. case "autoArticlePoolLevel1":
  534. # 头条,先不做
  535. video_list = download_videos[:3]
  536. case _:
  537. video_list = download_videos[:3]
  538. # 将视频发布至票圈
  539. await publish_videos_to_piaoquan(
  540. video_list=video_list,
  541. kimi_title=kimi_title,
  542. process_times=process_times,
  543. trace_id=trace_id,
  544. db_client=self.long_articles_client,
  545. article_match_video_table=self.article_match_video_table
  546. )
  547. except Exception as e:
  548. await self.roll_back_content_status_when_fails(
  549. process_times=params['process_times'] + 1,
  550. trace_id=params['trace_id']
  551. )
  552. print(e)
  553. async def start_process(self, params):
  554. """
  555. 处理单篇文章
  556. :param params:
  557. :return:
  558. """
  559. kimi_result = await self.kimi_task(params)
  560. trace_id = params['trace_id']
  561. process_times = params['process_times']
  562. content_id = params['content_id']
  563. publish_flag = params['publish_flag']
  564. if kimi_result:
  565. # 等待 kimi 操作执行完成之后,开始执行 spider_task
  566. print("kimi success")
  567. logging(
  568. code=3001,
  569. info="kimi success",
  570. trace_id=trace_id
  571. )
  572. spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
  573. if spider_flag:
  574. # 等待爬虫执行完成后,开始执行 etl_task
  575. print("spider success")
  576. logging(
  577. code=3002,
  578. info="spider_success",
  579. trace_id=trace_id
  580. )
  581. etl_flag = await self.etl_task(params)
  582. if etl_flag:
  583. # 等待下载上传完成,执行发布任务
  584. print("etl success")
  585. logging(
  586. code="3003",
  587. info="etl_success",
  588. trace_id=trace_id
  589. )
  590. """
  591. todo 若新建计划,计划为设置托管,但接入账号又在配置账号中,仍会走托管逻辑,需考虑历史存量的处理
  592. 目前先对这两种情况都做托管操作
  593. """
  594. if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  595. logging(
  596. code="3013",
  597. info="不需要发布,长文系统托管发布",
  598. trace_id=trace_id
  599. )
  600. return
  601. else:
  602. try:
  603. await self.publish_task(params, kimi_result['kimi_title'])
  604. logging(
  605. code="3004",
  606. info="publish_success",
  607. trace_id=trace_id
  608. )
  609. await record_trace_id(
  610. trace_id=trace_id,
  611. status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
  612. )
  613. except Exception as e:
  614. logging(
  615. code="6004",
  616. info="publish 失败--{}".format(e),
  617. trace_id=params['trace_id']
  618. )
  619. else:
  620. logging(
  621. code="6003",
  622. info="ETL 处理失败",
  623. trace_id=params['trace_id']
  624. )
  625. else:
  626. logging(
  627. code="6002",
  628. info="爬虫处理失败",
  629. trace_id=params['trace_id']
  630. )
  631. else:
  632. logging(
  633. code="6001",
  634. info="kimi 处理失败",
  635. trace_id=trace_id
  636. )
  637. if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
  638. logging(
  639. code="6011",
  640. info="kimi处理次数达到上限, 放弃处理",
  641. trace_id=trace_id
  642. )
  643. # 将相同的content_id && content_status = 0的状态修改为kimi 失败状态
  644. update_sql = f"""
  645. UPDATE {self.article_match_video_table}
  646. SET content_status = %s
  647. WHERE content_id = %s and content_status = %s;
  648. """
  649. affected_rows = await self.long_articles_client.async_insert(
  650. sql=update_sql,
  651. params=(
  652. NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
  653. content_id,
  654. NewContentIdTaskConst.TASK_INIT_STATUS
  655. )
  656. )
  657. # 查询出该content_id所对应的标题
  658. select_sql = f"""
  659. SELECT article_title
  660. FROM {self.article_text_table}
  661. WHERE content_id = '{content_id}';
  662. """
  663. result = await self.long_articles_client.async_select(select_sql)
  664. bot(
  665. title="KIMI 处理失败",
  666. detail={
  667. "content_id": content_id,
  668. "affected_rows": affected_rows,
  669. "article_title": result[0][0]
  670. },
  671. mention=False
  672. )
  673. async def process_each_task(self, params):
  674. """
  675. 处理任务
  676. :return:
  677. """
  678. content_id = params['content_id']
  679. flow_pool_level = params['flow_pool_level']
  680. download_videos_exists_flag = await whether_downloaded_videos_exists(
  681. content_id=content_id,
  682. article_crawler_video_table=self.article_crawler_video_table,
  683. db_client=self.long_articles_client
  684. )
  685. if not download_videos_exists_flag:
  686. processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
  687. if processing_flag:
  688. logging(
  689. code="9001",
  690. info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
  691. )
  692. else:
  693. # 判断是否存在root_content_id
  694. if flow_pool_level != 'autoArticlePoolLevel4':
  695. # 如果查到根content_id, 采用根content_id的视频
  696. root_content_id = await self.get_source_content_id(content_id)
  697. if root_content_id:
  698. # 传参新增root_content_id
  699. params['root_content_id'] = root_content_id[0][0]
  700. # 开始处理
  701. await self.start_process(params=params)
  702. else:
  703. print("存在已下载视频")
  704. async def deal(self) -> None:
  705. """
  706. function
  707. :return:
  708. """
  709. # 处理未托管的任务
  710. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
  711. # 处理托管任务
  712. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
  713. # 将处理次数大于3次且未成功的任务置为失败
  714. await self.set_tasks_status_fail()
  715. # 获取task_list
  716. task_list = await self.get_tasks()
  717. task_dict = {task['content_id']: task for task in task_list}
  718. process_list = list(task_dict.values())
  719. logging(
  720. code="5001",
  721. info="Match Task Got {} this time".format(len(process_list)),
  722. function="Publish Task"
  723. )
  724. # 处理process_list
  725. if process_list:
  726. a = time.time()
  727. tasks = [self.process_each_task(params) for params in process_list]
  728. await asyncio.gather(*tasks)
  729. b = time.time()
  730. print("处理时间: {} s".format(b - a))
  731. else:
  732. logging(
  733. code="9008",
  734. info="没有要处理的请求"
  735. )