# new_contentId_task.py
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. from typing import List, Dict
  8. from applications.config import Config
  9. from applications.const import new_content_id_task_const as NewContentIdTaskConst
  10. from applications.log import logging
  11. from applications.functions.common import shuffle_list
  12. from applications.spider import search_videos_from_web
  13. from applications.feishu import bot
  14. from applications.functions.aigc import record_trace_id
  15. from .utils import *
class NewContentIdTask(object):
    """
    Matching pipeline for articles that have no previously published history.
    (original: 不存在历史已经发布的文章的匹配流程)
    """

    def __init__(self, long_articles_client, aigc_client):
        # Async DB clients: one for the long-articles DB, one for the AIGC DB.
        self.long_articles_client = long_articles_client
        self.aigc_client = aigc_client
        self.config = Config()
        # Table names come from configuration so environments can differ.
        self.article_match_video_table = self.config.article_match_video_table
        self.article_text_table = self.config.article_text_table
        self.article_crawler_video_table = self.config.article_crawler_video_table
        # gh_id -> flag for level-2 test accounts; used by publish_task to decide shuffling.
        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
        # Account map forwarded to the spider.
        self.account_map = json.loads(self.config.get_config_value("accountMap"))
        # Batch size for one round of processing (also used as the SQL LIMIT).
        self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
  30. async def get_tasks(self) -> List[Dict]:
  31. """
  32. 获取 task
  33. :return:
  34. """
  35. # 获取 process_times <= 3 且 content_status = 0 的任务
  36. select_sql = f"""
  37. SELECT
  38. t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
  39. FROM
  40. {self.article_match_video_table} t1
  41. LEFT JOIN (
  42. SELECT content_id, count(1) as cnt
  43. FROM {self.article_crawler_video_table}
  44. WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  45. GROUP BY content_id
  46. ) t2
  47. ON t1.content_id = t2.content_id
  48. WHERE
  49. t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
  50. AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  51. AND t2.cnt IS NULL
  52. ORDER BY flow_pool_level, request_timestamp
  53. LIMIT {self.spider_coroutines};
  54. """
  55. tasks = await self.long_articles_client.async_select(select_sql)
  56. if tasks:
  57. return [
  58. {
  59. "trace_id": i[0],
  60. "content_id": i[1],
  61. "flow_pool_level": i[2],
  62. "gh_id": i[3],
  63. "process_times": i[4],
  64. "publish_flag": i[5]
  65. }
  66. for i in tasks
  67. ]
  68. else:
  69. return []
  70. async def set_tasks_status_fail(self) -> None:
  71. """
  72. 将 处理次数大约3次,且状态不为成功状态的(3, 4)的任务状态修改为失败
  73. """
  74. update_status_sql = f"""
  75. UPDATE
  76. {self.article_match_video_table}
  77. SET
  78. content_status = %s
  79. WHERE
  80. process_times > %s and content_status not in (%s, %s);
  81. """
  82. await self.long_articles_client.async_insert(
  83. update_status_sql,
  84. params=(
  85. NewContentIdTaskConst.TASK_FAIL_STATUS,
  86. NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
  87. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  88. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  89. )
  90. )
  91. async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
  92. """
  93. 将长时间处于中间状态的任务回滚
  94. """
  95. # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
  96. if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
  97. processing_status_tuple = (
  98. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  99. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  100. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  101. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  102. )
  103. elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  104. processing_status_tuple = (
  105. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  106. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  107. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
  108. )
  109. else:
  110. return
  111. select_processing_sql = f"""
  112. SELECT
  113. trace_id, content_status_update_time, process_times, content_status
  114. FROM
  115. {self.article_match_video_table}
  116. WHERE
  117. content_status in {processing_status_tuple}
  118. and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  119. and publish_flag = {publish_flag};
  120. """
  121. processing_articles = await self.long_articles_client.async_select(select_processing_sql)
  122. if processing_articles:
  123. processing_list = [
  124. {
  125. "trace_id": item[0],
  126. "content_status_update_time": item[1],
  127. "process_times": item[2],
  128. "content_status": item[3]
  129. }
  130. for item in processing_articles
  131. ]
  132. for obj in processing_list:
  133. if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
  134. # 认为该任务失败
  135. await self.roll_back_content_status_when_fails(
  136. process_times=obj['process_times'] + 1,
  137. trace_id=obj['trace_id'],
  138. ori_content_status=obj['content_status']
  139. )
  140. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  141. """
  142. :param new_content_status:
  143. :param trace_id:
  144. :param ori_content_status:
  145. :return:
  146. """
  147. update_sql = f"""
  148. UPDATE {self.article_match_video_table}
  149. SET content_status = %s, content_status_update_time = %s
  150. WHERE trace_id = %s and content_status = %s;
  151. """
  152. row_counts = await self.long_articles_client.async_insert(
  153. sql=update_sql,
  154. params=(
  155. new_content_status,
  156. int(time.time()),
  157. trace_id,
  158. ori_content_status
  159. )
  160. )
  161. return row_counts
    async def roll_back_content_status_when_fails(self, process_times, trace_id,
                                                  ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
        """
        Roll a failed task back to the init status and bump its retry counter.

        NOTE(review): this method stores ``process_times + 1`` itself, so
        callers should pass the *current* process_times.  Several call sites
        (spider_task, etl_task, publish_task, roll_back_unfinished_tasks)
        pass ``process_times + 1``, which double-increments — confirm whether
        that retry accounting is intended.

        :param process_times: current retry counter of the task
        :param trace_id: task identifier
        :param ori_content_status: status the row must currently hold
            (optimistic-lock guard; defaults to the processing status)
        :return: None
        """
        update_article_sql = f"""
            UPDATE {self.article_match_video_table}
            SET
                content_status = %s,
                content_status_update_time = %s,
                process_times = %s
            WHERE trace_id = %s and content_status = %s;
        """
        await self.long_articles_client.async_insert(
            sql=update_article_sql,
            params=(
                NewContentIdTaskConst.TASK_INIT_STATUS,
                int(time.time()),
                process_times + 1,  # retry counter is bumped here, not by callers
                trace_id,
                ori_content_status
            )
        )
  189. async def judge_whether_same_content_id_is_processing(self, content_id):
  190. """
  191. 同一个 content_id 只需要处理一次
  192. :param content_id:
  193. :return:
  194. success: 4
  195. init: 0
  196. fail: 99
  197. """
  198. select_sql = f"""
  199. SELECT distinct content_status
  200. FROM {self.article_match_video_table}
  201. WHERE content_id = '{content_id}';
  202. """
  203. result = await self.long_articles_client.async_select(select_sql)
  204. if result:
  205. for item in result:
  206. content_status = item[0]
  207. # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
  208. if content_status in {
  209. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  210. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  211. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  212. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  213. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  214. }:
  215. return True
  216. return False
  217. else:
  218. return False
  219. async def get_source_content_id(self, new_content_id):
  220. """
  221. 通过content_id 查找源content_id,并且更新其内容
  222. """
  223. select_channel_id_sql = f"""
  224. SELECT channel_content_id
  225. FROM produce_plan_exe_record
  226. WHERE plan_exe_id = '{new_content_id}';
  227. """
  228. channel_content_id = await self.aigc_client.async_select(select_channel_id_sql)
  229. if channel_content_id:
  230. select_source_content_id_sql = f"""
  231. SELECT root_produce_content_id
  232. FROM article_pool_promotion_source
  233. WHERE channel_content_id = '{channel_content_id[0][0]}';
  234. """
  235. source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql)
  236. if source_content_id:
  237. return source_content_id[0][0]
  238. else:
  239. return
  240. else:
  241. return
    async def kimi_task(self, params):
        """
        KIMI step: obtain the KIMI analysis result for the task's content.

        When ``root_content_id`` is present, the root content's existing
        result is reused.  Otherwise the result for ``content_id`` is fetched
        or computed, guarded by the compare-and-swap in update_content_status
        so only one worker processes a given task.

        :param params: task dict (trace_id, content_id, process_times,
            optionally root_content_id)
        :return: kimi result dict on success; ``{}`` on handled failure;
            ``None`` when the lock is lost or the text row is missing
        """
        trace_id = params['trace_id']
        if params.get("root_content_id"):
            # Reuse the root content's existing KIMI result.
            kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
            affected_rows = await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                # Another worker claimed the task first.
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return
            logging(
                code="8023",
                function="kimi_task",
                trace_id=trace_id,
                info="从root_content_id获取结果",
                data=params
            )
            return kimi_result
        # No root content: process by content_id.
        content_id = params['content_id']
        process_times = params['process_times']
        kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
        if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
            # Result already computed earlier: claim the task and fetch it.
            affected_rows = await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return
            kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
            return kimi_result
        elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
            # No row in the article text table for this content_id.
            logging(
                code="4000",
                info="long_articles_text表中未找到 content_id"
            )
        else:
            # Start processing: content_status 0 (init) -> 101 (processing).
            affected_rows = await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return
            try:
                # NOTE(review): presumably get_kimi_result also triggers the
                # KIMI computation when missing — confirm against .utils.
                kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
                await self.update_content_status(
                    new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
                )
                return kimi_result
            except Exception as e:
                # KIMI step failed: mark the text row as failed ...
                update_kimi_sql = f"""
                    UPDATE {self.article_text_table}
                    SET
                        kimi_status = %s
                    WHERE content_id = %s
                """
                await self.long_articles_client.async_insert(
                    sql=update_kimi_sql,
                    params=(
                        NewContentIdTaskConst.KIMI_FAIL_STATUS,
                        content_id
                    )
                )
                # ... and roll the task status back from 101 to 0 for retry.
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return {}
    async def spider_task(self, params, kimi_result):
        """
        Spider step: collect candidate videos for the task's content.

        When ``root_content_id`` is present, the root content's crawler rows
        are copied instead of searching the web.  Otherwise a web search runs
        and the task status advances via compare-and-swap.

        :param params: task dict (trace_id, content_id, process_times, gh_id,
            optionally root_content_id)
        :param kimi_result: dict providing ori_title / kimi_summary / kimi_keys
        :return: True on success, False on failure, None when the optimistic
            lock is lost in the root_content_id branch
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        gh_id = params['gh_id']
        if params.get("root_content_id"):
            # Reuse the root content's crawler videos; row status goes 1 -> 2.
            await update_crawler_table_with_exist_content_id(
                content_id=content_id,
                trace_id=trace_id,
                article_crawler_video_table=self.article_crawler_video_table,
                db_client=self.long_articles_client,
                root_content_id=params['root_content_id']
            )
            affected_rows = await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
            )
            if affected_rows == 0:
                # Lost the status lock to a competing worker.
                return
            logging(
                code="8024",
                function="spider_task",
                trace_id=trace_id,
                info="从root_content_id获取结果",
                data=params
            )
            return True
        # If enough videos are already downloaded, skip the search entirely.
        download_video_exist_flag = await whether_downloaded_videos_exists(
            content_id=content_id,
            article_crawler_video_table=self.article_crawler_video_table,
            db_client=self.long_articles_client
        )
        if download_video_exist_flag:
            await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
            )
            return True
        # Claim the task: status 1 (kimi finished) -> 101 (processing).
        affected_rows = await self.update_content_status(
            new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
            ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
            trace_id=trace_id
        )
        if affected_rows == 0:
            logging(
                code="6000",
                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
            )
            return False
        try:
            logging(
                code="spider_1001",
                info="开始执行搜索任务",
                trace_id=trace_id,
                data=kimi_result
            )
            search_videos_count = await search_videos_from_web(
                info={
                    "ori_title": kimi_result['ori_title'],
                    "kimi_summary": kimi_result['kimi_summary'],
                    "kimi_keys": kimi_result['kimi_keys'],
                    "trace_id": trace_id,
                    "gh_id": gh_id,
                    "content_id": content_id,
                    "crawler_video_table": self.article_crawler_video_table
                },
                gh_id_map=self.account_map,
                db_client=self.long_articles_client
            )
            if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
                # Search succeeded: status 101 -> 2 (spider finished).
                logging(
                    code="spider_1002",
                    info="搜索成功",
                    trace_id=trace_id,
                    data=kimi_result
                )
                await self.update_content_status(
                    new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
                )
                return True
            else:
                logging(
                    code="spider_1003",
                    info="搜索失败",
                    trace_id=trace_id,
                    data=kimi_result
                )
                # NOTE(review): roll_back_content_status_when_fails already
                # adds one to process_times; passing process_times + 1 here
                # advances the counter by two per failure — confirm intended.
                await self.roll_back_content_status_when_fails(
                    process_times=process_times + 1,
                    trace_id=trace_id
                )
                return False
        except Exception as e:
            # NOTE(review): same double-increment concern as above.
            await self.roll_back_content_status_when_fails(
                process_times=process_times + 1,
                trace_id=trace_id
            )
            print("爬虫处理失败: {}".format(e))
            return False
  444. async def etl_task(self, params):
  445. """
  446. download && upload videos
  447. :param params:
  448. :return:
  449. """
  450. trace_id = params['trace_id']
  451. content_id = params['content_id']
  452. process_times = params['process_times']
  453. # 判断视频是否已下载完成
  454. video_exist_flag = await whether_downloaded_videos_exists(
  455. content_id=content_id,
  456. article_crawler_video_table=self.article_crawler_video_table,
  457. db_client=self.long_articles_client
  458. )
  459. if video_exist_flag:
  460. affect_rows = await self.update_content_status(
  461. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  462. trace_id=trace_id,
  463. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  464. )
  465. if affect_rows == 0:
  466. logging(
  467. code="6000",
  468. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  469. )
  470. return False
  471. return True
  472. else:
  473. # 开始处理, 将文章状态修改为处理状态
  474. affected_rows = await self.update_content_status(
  475. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  476. trace_id=trace_id,
  477. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  478. )
  479. if affected_rows == 0:
  480. logging(
  481. code="6000",
  482. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  483. )
  484. return False
  485. # download videos
  486. downloaded_count = await async_download_videos(
  487. trace_id=trace_id,
  488. content_id=content_id,
  489. article_crawler_video_table=self.article_crawler_video_table,
  490. db_client=self.long_articles_client
  491. )
  492. if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  493. await self.update_content_status(
  494. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  495. trace_id=trace_id,
  496. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  497. )
  498. return True
  499. else:
  500. await self.roll_back_content_status_when_fails(
  501. process_times=process_times + 1,
  502. trace_id=trace_id
  503. )
  504. return False
    async def publish_task(self, params, kimi_title):
        """
        Publish step: pick videos by flow-pool level and push them to 票圈.

        :param kimi_title: title produced by the KIMI step
        :param params: task dict (gh_id, flow_pool_level, content_id,
            trace_id, process_times)
        :return: False when the optimistic lock is lost; otherwise None
        """
        gh_id = params['gh_id']
        flow_pool_level = params['flow_pool_level']
        content_id = params['content_id']
        trace_id = params['trace_id']
        process_times = params['process_times']
        # Claim the task: ETL-complete -> processing (optimistic lock).
        affected_rows = await self.update_content_status(
            ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
            trace_id=trace_id,
            new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
        )
        if affected_rows == 0:
            logging(
                code="6000",
                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
            )
            return False
        try:
            download_videos = await get_downloaded_videos(
                content_id=content_id,
                article_crawler_video_table=self.article_crawler_video_table,
                db_client=self.long_articles_client
            )
            # Video selection strategy depends on the flow-pool level.
            match flow_pool_level:
                case "autoArticlePoolLevel4":
                    # Cold-start pool: run for everything, shuffled top-3.
                    video_list = shuffle_list(download_videos)[:3]
                case "autoArticlePoolLevel3":
                    # Shuffle only for accounts flagged in the level-2 test dict.
                    if self.gh_id_dict.get(gh_id):
                        video_list = shuffle_list(download_videos)[:3]
                    else:
                        video_list = download_videos[:3]
                case "autoArticlePoolLevel2":
                    # Second-slot pool: handled only for specific accounts —
                    # currently publishes nothing.
                    video_list = []
                case "autoArticlePoolLevel1":
                    # Headline pool: not specialised yet, plain top-3.
                    video_list = download_videos[:3]
                case _:
                    video_list = download_videos[:3]
            # Publish the selected videos to 票圈.
            await publish_videos_to_piaoquan(
                video_list=video_list,
                kimi_title=kimi_title,
                process_times=process_times,
                trace_id=trace_id,
                db_client=self.long_articles_client,
                article_match_video_table=self.article_match_video_table
            )
        except Exception as e:
            # NOTE(review): the rollback helper increments process_times
            # itself; passing + 1 here advances the counter by two — confirm
            # intended.
            await self.roll_back_content_status_when_fails(
                process_times=params['process_times'] + 1,
                trace_id=params['trace_id']
            )
            print(e)
    async def start_process(self, params):
        """
        Run the full pipeline for a single article:
        kimi -> spider -> etl -> (optional) publish.

        Each stage only proceeds when the previous one succeeded.  Failures
        are logged; a KIMI failure that has reached the retry limit marks all
        init-status rows of the content_id as illegal and alerts via Feishu.

        :param params: task dict from get_tasks()
        :return: None
        """
        kimi_result = await self.kimi_task(params)
        trace_id = params['trace_id']
        process_times = params['process_times']
        content_id = params['content_id']
        publish_flag = params['publish_flag']
        if kimi_result:
            # KIMI finished -> run the spider step.
            print("kimi success")
            logging(
                code=3001,
                info="kimi success",
                trace_id=trace_id
            )
            spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
            if spider_flag:
                # Spider finished -> run the ETL (download/upload) step.
                print("spider success")
                logging(
                    code=3002,
                    info="spider_success",
                    trace_id=trace_id
                )
                etl_flag = await self.etl_task(params)
                if etl_flag:
                    # ETL finished -> publish, unless the task is hosted.
                    print("etl success")
                    logging(
                        code="3003",
                        info="etl_success",
                        trace_id=trace_id
                    )
                    # TODO: a newly created plan without hosting, whose account
                    # is nevertheless in the configured account list, still
                    # goes through the hosted flow; historical data needs a
                    # decision.  For now both cases are treated as hosted.
                    if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
                        logging(
                            code="3013",
                            info="不需要发布,长文系统托管发布",
                            trace_id=trace_id
                        )
                        return
                    else:
                        try:
                            await self.publish_task(params, kimi_result['kimi_title'])
                            logging(
                                code="3004",
                                info="publish_success",
                                trace_id=trace_id
                            )
                            await record_trace_id(
                                trace_id=trace_id,
                                status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
                            )
                        except Exception as e:
                            logging(
                                code="6004",
                                info="publish 失败--{}".format(e),
                                trace_id=params['trace_id']
                            )
                else:
                    logging(
                        code="6003",
                        info="ETL 处理失败",
                        trace_id=params['trace_id']
                    )
            else:
                logging(
                    code="6002",
                    info="爬虫处理失败",
                    trace_id=params['trace_id']
                )
        else:
            logging(
                code="6001",
                info="kimi 处理失败",
                trace_id=trace_id
            )
            if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
                # Retry limit reached: give up on this content_id entirely.
                logging(
                    code="6011",
                    info="kimi处理次数达到上限, 放弃处理",
                    trace_id=trace_id
                )
                # Mark every init-status row of this content_id as KIMI-illegal.
                update_sql = f"""
                    UPDATE {self.article_match_video_table}
                    SET content_status = %s
                    WHERE content_id = %s and content_status = %s;
                """
                affected_rows = await self.long_articles_client.async_insert(
                    sql=update_sql,
                    params=(
                        NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
                        content_id,
                        NewContentIdTaskConst.TASK_INIT_STATUS
                    )
                )
                # Look up the article title for the alert message.
                select_sql = f"""
                    SELECT article_title
                    FROM {self.article_text_table}
                    WHERE content_id = '{content_id}';
                """
                result = await self.long_articles_client.async_select(select_sql)
                # NOTE(review): result[0][0] raises IndexError if the text row
                # is missing — confirm the row always exists at this point.
                bot(
                    title="KIMI 处理失败",
                    detail={
                        "content_id": content_id,
                        "affected_rows": affected_rows,
                        "article_title": result[0][0]
                    },
                    mention=False
                )
  687. async def process_each_task(self, params):
  688. """
  689. 处理任务
  690. :return:
  691. """
  692. content_id = params['content_id']
  693. flow_pool_level = params['flow_pool_level']
  694. download_videos_exists_flag = await whether_downloaded_videos_exists(
  695. content_id=content_id,
  696. article_crawler_video_table=self.article_crawler_video_table,
  697. db_client=self.long_articles_client
  698. )
  699. if not download_videos_exists_flag:
  700. processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
  701. if processing_flag:
  702. logging(
  703. code="9001",
  704. info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
  705. )
  706. else:
  707. # 判断是否存在root_content_id
  708. if flow_pool_level != 'autoArticlePoolLevel4':
  709. # 如果查到根content_id, 采用根content_id的视频
  710. root_content_id = await self.get_source_content_id(content_id)
  711. if root_content_id:
  712. # 传参新增root_content_id
  713. params['root_content_id'] = root_content_id[0][0]
  714. # 开始处理
  715. await self.start_process(params=params)
  716. else:
  717. print("存在已下载视频")
  718. async def deal(self) -> None:
  719. """
  720. function
  721. :return:
  722. """
  723. # 处理未托管的任务
  724. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
  725. # 处理托管任务
  726. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
  727. # 将处理次数大于3次且未成功的任务置为失败
  728. await self.set_tasks_status_fail()
  729. # 获取task_list
  730. task_list = await self.get_tasks()
  731. task_dict = {task['content_id']: task for task in task_list}
  732. process_list = list(task_dict.values())
  733. logging(
  734. code="5001",
  735. info="Match Task Got {} this time".format(len(process_list)),
  736. function="Publish Task"
  737. )
  738. # 处理process_list
  739. if process_list:
  740. a = time.time()
  741. tasks = [self.process_each_task(params) for params in process_list]
  742. await asyncio.gather(*tasks)
  743. b = time.time()
  744. print("处理时间: {} s".format(b - a))
  745. else:
  746. logging(
  747. code="9008",
  748. info="没有要处理的请求"
  749. )