new_contentId_task.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818
  1. """
  2. @author: luojunhui
  3. """
import json
import time
import asyncio
from typing import List, Dict, Optional

from applications.config import Config
from applications.const import new_content_id_task_const as NewContentIdTaskConst
from applications.log import logging
from applications.functions.common import shuffle_list
from applications.spider import search_videos_from_web
from applications.feishu import bot
from applications.functions.aigc import record_trace_id

from .utils import *
  16. class NewContentIdTask(object):
  17. """
  18. 不存在历史已经发布的文章的匹配流程
  19. """
  20. def __init__(self, long_articles_client, aigc_client):
  21. self.long_articles_client = long_articles_client
  22. self.aigc_client = aigc_client
  23. self.config = Config()
  24. self.article_match_video_table = self.config.article_match_video_table
  25. self.article_text_table = self.config.article_text_table
  26. self.article_crawler_video_table = self.config.article_crawler_video_table
  27. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  28. self.account_map = json.loads(self.config.get_config_value("accountMap"))
  29. self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
  30. async def get_tasks(self) -> List[Dict]:
  31. """
  32. 获取 task
  33. :return:
  34. """
  35. # 获取 process_times <= 3 且 content_status = 0 的任务
  36. select_sql = f"""
  37. SELECT
  38. t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
  39. FROM
  40. {self.article_match_video_table} t1
  41. LEFT JOIN (
  42. SELECT content_id, count(1) as cnt
  43. FROM {self.article_crawler_video_table}
  44. WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  45. AND is_illegal = {NewContentIdTaskConst.VIDEO_SAFE}
  46. GROUP BY content_id
  47. ) t2
  48. ON t1.content_id = t2.content_id
  49. WHERE
  50. t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
  51. AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  52. AND COALESCE(t2.cnt, 0) < {NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM}
  53. ORDER BY flow_pool_level, request_timestamp
  54. LIMIT {self.spider_coroutines};
  55. """
  56. tasks = await self.long_articles_client.async_select(select_sql)
  57. if tasks:
  58. return [
  59. {
  60. "trace_id": i[0],
  61. "content_id": i[1],
  62. "flow_pool_level": i[2],
  63. "gh_id": i[3],
  64. "process_times": i[4],
  65. "publish_flag": i[5]
  66. }
  67. for i in tasks
  68. ]
  69. else:
  70. return []
  71. async def set_tasks_status_fail(self) -> None:
  72. """
  73. 将 处理次数大约3次,且状态不为成功状态的(3, 4)的任务状态修改为失败
  74. """
  75. update_status_sql = f"""
  76. UPDATE
  77. {self.article_match_video_table}
  78. SET
  79. content_status = %s
  80. WHERE
  81. process_times > %s and content_status not in (%s, %s);
  82. """
  83. await self.long_articles_client.async_insert(
  84. update_status_sql,
  85. params=(
  86. NewContentIdTaskConst.TASK_FAIL_STATUS,
  87. NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
  88. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  89. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  90. )
  91. )
  92. async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
  93. """
  94. 将长时间处于中间状态的任务回滚
  95. """
  96. # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
  97. if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
  98. processing_status_tuple = (
  99. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  100. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  101. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  102. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  103. )
  104. elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  105. processing_status_tuple = (
  106. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  107. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  108. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
  109. )
  110. else:
  111. return
  112. select_processing_sql = f"""
  113. SELECT
  114. trace_id, content_status_update_time, process_times, content_status
  115. FROM
  116. {self.article_match_video_table}
  117. WHERE
  118. content_status in {processing_status_tuple}
  119. and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  120. and publish_flag = {publish_flag};
  121. """
  122. processing_articles = await self.long_articles_client.async_select(select_processing_sql)
  123. if processing_articles:
  124. processing_list = [
  125. {
  126. "trace_id": item[0],
  127. "content_status_update_time": item[1],
  128. "process_times": item[2],
  129. "content_status": item[3]
  130. }
  131. for item in processing_articles
  132. ]
  133. for obj in processing_list:
  134. if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
  135. # 认为该任务失败
  136. await self.roll_back_content_status_when_fails(
  137. process_times=obj['process_times'] + 1,
  138. trace_id=obj['trace_id'],
  139. ori_content_status=obj['content_status']
  140. )
  141. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  142. """
  143. :param new_content_status:
  144. :param trace_id:
  145. :param ori_content_status:
  146. :return:
  147. """
  148. update_sql = f"""
  149. UPDATE {self.article_match_video_table}
  150. SET content_status = %s, content_status_update_time = %s
  151. WHERE trace_id = %s and content_status = %s;
  152. """
  153. row_counts = await self.long_articles_client.async_insert(
  154. sql=update_sql,
  155. params=(
  156. new_content_status,
  157. int(time.time()),
  158. trace_id,
  159. ori_content_status
  160. )
  161. )
  162. return row_counts
  163. async def roll_back_content_status_when_fails(self, process_times, trace_id,
  164. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
  165. """
  166. 处理失败,回滚至初始状态,处理次数加 1
  167. :param process_times:
  168. :param trace_id:
  169. :param ori_content_status:
  170. :return:
  171. """
  172. update_article_sql = f"""
  173. UPDATE {self.article_match_video_table}
  174. SET
  175. content_status = %s,
  176. content_status_update_time = %s,
  177. process_times = %s
  178. WHERE trace_id = %s and content_status = %s;
  179. """
  180. await self.long_articles_client.async_insert(
  181. sql=update_article_sql,
  182. params=(
  183. NewContentIdTaskConst.TASK_INIT_STATUS,
  184. int(time.time()),
  185. process_times + 1,
  186. trace_id,
  187. ori_content_status
  188. )
  189. )
  190. async def judge_whether_same_content_id_is_processing(self, content_id):
  191. """
  192. 同一个 content_id 只需要处理一次
  193. :param content_id:
  194. :return:
  195. success: 4
  196. init: 0
  197. fail: 99
  198. """
  199. select_sql = f"""
  200. SELECT distinct content_status
  201. FROM {self.article_match_video_table}
  202. WHERE content_id = '{content_id}';
  203. """
  204. result = await self.long_articles_client.async_select(select_sql)
  205. if result:
  206. for item in result:
  207. content_status = item[0]
  208. # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
  209. if content_status in {
  210. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  211. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  212. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  213. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  214. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  215. }:
  216. return True
  217. return False
  218. else:
  219. return False
  220. async def get_source_content_id(self, new_content_id):
  221. """
  222. 通过content_id 查找源content_id,并且更新其内容
  223. """
  224. select_channel_id_sql = f"""
  225. SELECT channel_content_id
  226. FROM produce_plan_exe_record
  227. WHERE plan_exe_id = '{new_content_id}';
  228. """
  229. channel_content_id = await self.aigc_client.async_select(select_channel_id_sql)
  230. if channel_content_id:
  231. select_source_content_id_sql = f"""
  232. SELECT root_produce_content_id
  233. FROM article_pool_promotion_source
  234. WHERE channel_content_id = '{channel_content_id[0][0]}';
  235. """
  236. source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql)
  237. if source_content_id:
  238. return source_content_id[0][0]
  239. else:
  240. return
  241. else:
  242. return
  243. async def get_illegal_out_ids(self, content_id: str) -> List[str]:
  244. """
  245. 获取违规的外站视频id
  246. """
  247. select_sql = f"""
  248. SELECT platform, out_video_id
  249. FROM {self.article_crawler_video_table}
  250. WHERE content_id = '{content_id}' and is_illegal = {NewContentIdTaskConst.VIDEO_UNSAFE};
  251. """
  252. response = await self.long_articles_client.async_select(select_sql)
  253. if response:
  254. result = ["{}_{}".format(line[0], line[1]) for line in response]
  255. return result
  256. else:
  257. return []
  258. async def kimi_task(self, params):
  259. """
  260. 执行 kimi 任务
  261. :return:
  262. """
  263. trace_id = params['trace_id']
  264. if params.get("root_content_id"):
  265. kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
  266. if kimi_result:
  267. affected_rows = await self.update_content_status(
  268. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  269. trace_id=trace_id,
  270. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  271. )
  272. if affected_rows == 0:
  273. logging(
  274. code="6000",
  275. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  276. )
  277. return
  278. logging(
  279. code="8023",
  280. function="kimi_task",
  281. trace_id=trace_id,
  282. info="从root_content_id获取结果",
  283. data=params
  284. )
  285. return kimi_result
  286. else:
  287. params.pop('root_content_id', None)
  288. print(params)
  289. return await self.kimi_task(params)
  290. # 处理content_id
  291. content_id = params['content_id']
  292. process_times = params['process_times']
  293. kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  294. if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
  295. affected_rows = await self.update_content_status(
  296. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  297. trace_id=trace_id,
  298. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  299. )
  300. if affected_rows == 0:
  301. logging(
  302. code="6000",
  303. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  304. )
  305. return
  306. kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  307. return kimi_result
  308. elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
  309. logging(
  310. code="4000",
  311. info="long_articles_text表中未找到 content_id"
  312. )
  313. else:
  314. # 开始处理,讲 content_status 从 0 改为 101
  315. affected_rows = await self.update_content_status(
  316. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  317. trace_id=trace_id,
  318. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  319. )
  320. if affected_rows == 0:
  321. logging(
  322. code="6000",
  323. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  324. )
  325. return
  326. try:
  327. kimi_result = await generate_kimi_result(
  328. content_id=content_id,
  329. article_text_table=self.article_text_table,
  330. db_client=self.long_articles_client,
  331. safe_score=NewContentIdTaskConst.KIMI_SAFE_SCORE
  332. )
  333. await self.update_content_status(
  334. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  335. trace_id=trace_id,
  336. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  337. )
  338. return kimi_result
  339. except Exception as e:
  340. # kimi 任务处理失败
  341. update_kimi_sql = f"""
  342. UPDATE {self.article_text_table}
  343. SET
  344. kimi_status = %s
  345. WHERE content_id = %s
  346. """
  347. await self.long_articles_client.async_insert(
  348. sql=update_kimi_sql,
  349. params=(
  350. NewContentIdTaskConst.KIMI_FAIL_STATUS,
  351. content_id
  352. )
  353. )
  354. # 将状态由 101 回退为 0
  355. await self.roll_back_content_status_when_fails(
  356. process_times=process_times,
  357. trace_id=trace_id
  358. )
  359. return {}
  360. async def spider_task(self, params, kimi_result):
  361. """
  362. 爬虫任务
  363. :return:
  364. """
  365. trace_id = params['trace_id']
  366. content_id = params['content_id']
  367. process_times = params['process_times']
  368. gh_id = params['gh_id']
  369. if params.get("root_content_id"):
  370. # 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2
  371. update_rows = await update_crawler_table_with_exist_content_id(
  372. content_id=content_id,
  373. trace_id=trace_id,
  374. article_crawler_video_table=self.article_crawler_video_table,
  375. db_client=self.long_articles_client,
  376. root_content_id=params['root_content_id']
  377. )
  378. if update_rows:
  379. affected_rows = await self.update_content_status(
  380. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  381. trace_id=trace_id,
  382. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  383. )
  384. if affected_rows == 0:
  385. return
  386. logging(
  387. code="8024",
  388. function="spider_task",
  389. trace_id=trace_id,
  390. info="从root_content_id获取结果",
  391. data=params
  392. )
  393. return True
  394. else:
  395. params.pop("root_content_id", None)
  396. return await self.spider_task(params, kimi_result)
  397. download_video_exist_flag = await whether_downloaded_videos_exists(
  398. content_id=content_id,
  399. article_crawler_video_table=self.article_crawler_video_table,
  400. db_client=self.long_articles_client
  401. )
  402. if download_video_exist_flag:
  403. await self.update_content_status(
  404. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  405. trace_id=trace_id,
  406. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  407. )
  408. return True
  409. # 开始处理,将状态由 1 改成 101
  410. affected_rows = await self.update_content_status(
  411. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  412. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  413. trace_id=trace_id
  414. )
  415. if affected_rows == 0:
  416. logging(
  417. code="6000",
  418. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  419. )
  420. return False
  421. try:
  422. logging(
  423. code="spider_1001",
  424. info="开始执行搜索任务",
  425. trace_id=trace_id,
  426. data=kimi_result
  427. )
  428. search_videos_count = await search_videos_from_web(
  429. info={
  430. "ori_title": kimi_result['ori_title'],
  431. "kimi_summary": kimi_result['kimi_summary'],
  432. "kimi_keys": kimi_result['kimi_keys'],
  433. "trace_id": trace_id,
  434. "gh_id": gh_id,
  435. "content_id": content_id,
  436. "crawler_video_table": self.article_crawler_video_table
  437. },
  438. gh_id_map=self.account_map,
  439. db_client=self.long_articles_client
  440. )
  441. if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  442. # 表示爬虫任务执行成功, 将状态从 101 改为 2
  443. logging(
  444. code="spider_1002",
  445. info="搜索成功",
  446. trace_id=trace_id,
  447. data=kimi_result
  448. )
  449. await self.update_content_status(
  450. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  451. trace_id=trace_id,
  452. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  453. )
  454. return True
  455. else:
  456. logging(
  457. code="spider_1003",
  458. info="搜索失败",
  459. trace_id=trace_id,
  460. data=kimi_result
  461. )
  462. await self.roll_back_content_status_when_fails(
  463. process_times=process_times + 1,
  464. trace_id=trace_id
  465. )
  466. return False
  467. except Exception as e:
  468. await self.roll_back_content_status_when_fails(
  469. process_times=process_times + 1,
  470. trace_id=trace_id
  471. )
  472. print("爬虫处理失败: {}".format(e))
  473. return False
  474. async def etl_task(self, params):
  475. """
  476. download && upload videos
  477. :param params:
  478. :return:
  479. """
  480. trace_id = params['trace_id']
  481. content_id = params['content_id']
  482. process_times = params['process_times']
  483. # 判断视频是否已下载完成
  484. video_exist_flag = await whether_downloaded_videos_exists(
  485. content_id=content_id,
  486. article_crawler_video_table=self.article_crawler_video_table,
  487. db_client=self.long_articles_client
  488. )
  489. if video_exist_flag:
  490. affect_rows = await self.update_content_status(
  491. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  492. trace_id=trace_id,
  493. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  494. )
  495. if affect_rows == 0:
  496. logging(
  497. code="6000",
  498. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  499. )
  500. return False
  501. return True
  502. else:
  503. # 开始处理, 将文章状态修改为处理状态
  504. affected_rows = await self.update_content_status(
  505. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  506. trace_id=trace_id,
  507. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  508. )
  509. if affected_rows == 0:
  510. logging(
  511. code="6000",
  512. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  513. )
  514. return False
  515. # download videos
  516. illegal_videos = await self.get_illegal_out_ids(content_id)
  517. downloaded_count = await async_download_videos(
  518. trace_id=trace_id,
  519. content_id=content_id,
  520. article_crawler_video_table=self.article_crawler_video_table,
  521. db_client=self.long_articles_client,
  522. illegal_videos=illegal_videos
  523. )
  524. if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  525. await self.update_content_status(
  526. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  527. trace_id=trace_id,
  528. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  529. )
  530. return True
  531. else:
  532. await self.roll_back_content_status_when_fails(
  533. process_times=process_times + 1,
  534. trace_id=trace_id
  535. )
  536. return False
  537. async def publish_task(self, params, kimi_title):
  538. """
  539. 发布任务
  540. :param kimi_title:
  541. :param params:
  542. :return:
  543. """
  544. gh_id = params['gh_id']
  545. flow_pool_level = params['flow_pool_level']
  546. content_id = params['content_id']
  547. trace_id = params['trace_id']
  548. process_times = params['process_times']
  549. # 开始处理,将状态修改为操作状态
  550. affected_rows = await self.update_content_status(
  551. ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  552. trace_id=trace_id,
  553. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  554. )
  555. if affected_rows == 0:
  556. logging(
  557. code="6000",
  558. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  559. )
  560. return False
  561. try:
  562. download_videos = await get_downloaded_videos(
  563. content_id=content_id,
  564. article_crawler_video_table=self.article_crawler_video_table,
  565. db_client=self.long_articles_client
  566. )
  567. match flow_pool_level:
  568. case "autoArticlePoolLevel4":
  569. # 冷启层, 全量做
  570. video_list = shuffle_list(download_videos)[:3]
  571. case "autoArticlePoolLevel3":
  572. if self.gh_id_dict.get(gh_id):
  573. video_list = shuffle_list(download_videos)[:3]
  574. else:
  575. video_list = download_videos[:3]
  576. case "autoArticlePoolLevel2":
  577. # 次条,只针对具体账号做
  578. video_list = []
  579. case "autoArticlePoolLevel1":
  580. # 头条,先不做
  581. video_list = download_videos[:3]
  582. case _:
  583. video_list = download_videos[:3]
  584. # 将视频发布至票圈
  585. await publish_videos_to_piaoquan(
  586. video_list=video_list,
  587. kimi_title=kimi_title,
  588. process_times=process_times,
  589. trace_id=trace_id,
  590. db_client=self.long_articles_client,
  591. article_match_video_table=self.article_match_video_table
  592. )
  593. except Exception as e:
  594. await self.roll_back_content_status_when_fails(
  595. process_times=params['process_times'] + 1,
  596. trace_id=params['trace_id']
  597. )
  598. print(e)
  599. async def start_process(self, params):
  600. """
  601. 处理单篇文章
  602. :param params:
  603. :return:
  604. """
  605. kimi_result = await self.kimi_task(params)
  606. trace_id = params['trace_id']
  607. process_times = params['process_times']
  608. content_id = params['content_id']
  609. publish_flag = params['publish_flag']
  610. if kimi_result:
  611. # 等待 kimi 操作执行完成之后,开始执行 spider_task
  612. print("kimi success")
  613. logging(
  614. code=3001,
  615. info="kimi success",
  616. trace_id=trace_id
  617. )
  618. spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
  619. if spider_flag:
  620. # 等待爬虫执行完成后,开始执行 etl_task
  621. print("spider success")
  622. logging(
  623. code=3002,
  624. info="spider_success",
  625. trace_id=trace_id
  626. )
  627. etl_flag = await self.etl_task(params)
  628. if etl_flag:
  629. # 等待下载上传完成,执行发布任务
  630. print("etl success")
  631. logging(
  632. code="3003",
  633. info="etl_success",
  634. trace_id=trace_id
  635. )
  636. """
  637. todo 若新建计划,计划为设置托管,但接入账号又在配置账号中,仍会走托管逻辑,需考虑历史存量的处理
  638. 目前先对这两种情况都做托管操作
  639. """
  640. if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  641. logging(
  642. code="3013",
  643. info="不需要发布,长文系统托管发布",
  644. trace_id=trace_id
  645. )
  646. return
  647. else:
  648. try:
  649. await self.publish_task(params, kimi_result['kimi_title'])
  650. logging(
  651. code="3004",
  652. info="publish_success",
  653. trace_id=trace_id
  654. )
  655. await record_trace_id(
  656. trace_id=trace_id,
  657. status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
  658. )
  659. except Exception as e:
  660. logging(
  661. code="6004",
  662. info="publish 失败--{}".format(e),
  663. trace_id=params['trace_id']
  664. )
  665. else:
  666. logging(
  667. code="6003",
  668. info="ETL 处理失败",
  669. trace_id=params['trace_id']
  670. )
  671. else:
  672. logging(
  673. code="6002",
  674. info="爬虫处理失败",
  675. trace_id=params['trace_id']
  676. )
  677. else:
  678. logging(
  679. code="6001",
  680. info="kimi 处理失败",
  681. trace_id=trace_id
  682. )
  683. if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
  684. logging(
  685. code="6011",
  686. info="kimi处理次数达到上限, 放弃处理",
  687. trace_id=trace_id
  688. )
  689. # 将相同的content_id && content_status = 0的状态修改为kimi 失败状态
  690. update_sql = f"""
  691. UPDATE {self.article_match_video_table}
  692. SET content_status = %s
  693. WHERE content_id = %s and content_status = %s;
  694. """
  695. affected_rows = await self.long_articles_client.async_insert(
  696. sql=update_sql,
  697. params=(
  698. NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
  699. content_id,
  700. NewContentIdTaskConst.TASK_INIT_STATUS
  701. )
  702. )
  703. # 查询出该content_id所对应的标题
  704. select_sql = f"""
  705. SELECT article_title
  706. FROM {self.article_text_table}
  707. WHERE content_id = '{content_id}';
  708. """
  709. result = await self.long_articles_client.async_select(select_sql)
  710. bot(
  711. title="KIMI 处理失败",
  712. detail={
  713. "content_id": content_id,
  714. "affected_rows": affected_rows,
  715. "article_title": result[0][0]
  716. },
  717. mention=False
  718. )
  719. async def process_each_task(self, params):
  720. """
  721. 处理任务
  722. :return:
  723. """
  724. content_id = params['content_id']
  725. flow_pool_level = params['flow_pool_level']
  726. download_videos_exists_flag = await whether_downloaded_videos_exists(
  727. content_id=content_id,
  728. article_crawler_video_table=self.article_crawler_video_table,
  729. db_client=self.long_articles_client
  730. )
  731. if not download_videos_exists_flag:
  732. processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
  733. if processing_flag:
  734. logging(
  735. code="9001",
  736. info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
  737. )
  738. else:
  739. # 判断是否存在root_content_id
  740. if flow_pool_level != 'autoArticlePoolLevel4':
  741. # 如果查到根content_id, 采用根content_id的视频
  742. root_content_id = await self.get_source_content_id(content_id)
  743. if root_content_id:
  744. # 传参新增root_content_id
  745. params['root_content_id'] = root_content_id
  746. # 开始处理
  747. await self.start_process(params=params)
  748. else:
  749. print("存在已下载视频")
  750. async def deal(self) -> None:
  751. """
  752. function
  753. :return:
  754. """
  755. # 处理未托管的任务
  756. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
  757. # 处理托管任务
  758. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
  759. # 将处理次数大于3次且未成功的任务置为失败
  760. await self.set_tasks_status_fail()
  761. # 获取task_list
  762. task_list = await self.get_tasks()
  763. task_dict = {task['content_id']: task for task in task_list}
  764. process_list = list(task_dict.values())
  765. logging(
  766. code="5001",
  767. info="Match Task Got {} this time".format(len(process_list)),
  768. function="Publish Task"
  769. )
  770. # 处理process_list
  771. if process_list:
  772. a = time.time()
  773. tasks = [self.process_each_task(params) for params in process_list]
  774. await asyncio.gather(*tasks)
  775. b = time.time()
  776. print("处理时间: {} s".format(b - a))
  777. else:
  778. logging(
  779. code="9008",
  780. info="没有要处理的请求"
  781. )