# new_contentId_task.py
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. from typing import List, Dict
  8. from applications.config import Config
  9. from applications.const import new_content_id_task_const as NewContentIdTaskConst
  10. from applications.log import logging
  11. from applications.functions.common import shuffle_list
  12. from applications.spider import search_videos_from_web
  13. from applications.feishu import bot
  14. from applications.functions.aigc import record_trace_id
  15. from .utils import *
  16. class NewContentIdTask(object):
  17. """
  18. 不存在历史已经发布的文章的匹配流程
  19. """
  20. def __init__(self, long_articles_client, aigc_client):
  21. self.long_articles_client = long_articles_client
  22. self.aigc_client = aigc_client
  23. self.config = Config()
  24. self.article_match_video_table = self.config.article_match_video_table
  25. self.article_text_table = self.config.article_text_table
  26. self.article_crawler_video_table = self.config.article_crawler_video_table
  27. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  28. self.account_map = json.loads(self.config.get_config_value("accountMap"))
  29. self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
  30. async def get_tasks(self) -> List[Dict]:
  31. """
  32. 获取 task
  33. :return:
  34. """
  35. # 获取 process_times <= 3 且 content_status = 0 的任务
  36. select_sql = f"""
  37. SELECT
  38. t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
  39. FROM
  40. {self.article_match_video_table} t1
  41. LEFT JOIN (
  42. SELECT content_id, count(1) as cnt
  43. FROM {self.article_crawler_video_table}
  44. WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  45. AND is_illegal = {NewContentIdTaskConst.VIDEO_SAFE}
  46. GROUP BY content_id
  47. ) t2
  48. ON t1.content_id = t2.content_id
  49. WHERE
  50. t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
  51. AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  52. AND COALESCE(t2.cnt, 0) < {NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM}
  53. ORDER BY flow_pool_level, request_timestamp
  54. LIMIT {self.spider_coroutines};
  55. """
  56. tasks = await self.long_articles_client.async_select(select_sql)
  57. if tasks:
  58. return [
  59. {
  60. "trace_id": i[0],
  61. "content_id": i[1],
  62. "flow_pool_level": i[2],
  63. "gh_id": i[3],
  64. "process_times": i[4],
  65. "publish_flag": i[5]
  66. }
  67. for i in tasks
  68. ]
  69. else:
  70. return []
  71. async def set_tasks_status_fail(self) -> None:
  72. """
  73. 将 处理次数大约3次,且状态不为成功状态的(3, 4)的任务状态修改为失败
  74. """
  75. update_status_sql = f"""
  76. UPDATE
  77. {self.article_match_video_table}
  78. SET
  79. content_status = %s
  80. WHERE
  81. process_times > %s and content_status not in (%s, %s);
  82. """
  83. await self.long_articles_client.async_insert(
  84. update_status_sql,
  85. params=(
  86. NewContentIdTaskConst.TASK_FAIL_STATUS,
  87. NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
  88. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  89. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  90. )
  91. )
  92. async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
  93. """
  94. 将长时间处于中间状态的任务回滚
  95. """
  96. # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
  97. if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
  98. processing_status_tuple = (
  99. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  100. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  101. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  102. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  103. )
  104. elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  105. processing_status_tuple = (
  106. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  107. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  108. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
  109. )
  110. else:
  111. return
  112. select_processing_sql = f"""
  113. SELECT
  114. trace_id, content_status_update_time, process_times, content_status
  115. FROM
  116. {self.article_match_video_table}
  117. WHERE
  118. content_status in {processing_status_tuple}
  119. and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  120. and publish_flag = {publish_flag};
  121. """
  122. processing_articles = await self.long_articles_client.async_select(select_processing_sql)
  123. if processing_articles:
  124. processing_list = [
  125. {
  126. "trace_id": item[0],
  127. "content_status_update_time": item[1],
  128. "process_times": item[2],
  129. "content_status": item[3]
  130. }
  131. for item in processing_articles
  132. ]
  133. for obj in processing_list:
  134. if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
  135. # 认为该任务失败
  136. await self.roll_back_content_status_when_fails(
  137. process_times=obj['process_times'] + 1,
  138. trace_id=obj['trace_id'],
  139. ori_content_status=obj['content_status']
  140. )
  141. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  142. """
  143. :param new_content_status:
  144. :param trace_id:
  145. :param ori_content_status:
  146. :return:
  147. """
  148. update_sql = f"""
  149. UPDATE {self.article_match_video_table}
  150. SET content_status = %s, content_status_update_time = %s
  151. WHERE trace_id = %s and content_status = %s;
  152. """
  153. row_counts = await self.long_articles_client.async_insert(
  154. sql=update_sql,
  155. params=(
  156. new_content_status,
  157. int(time.time()),
  158. trace_id,
  159. ori_content_status
  160. )
  161. )
  162. return row_counts
  163. async def roll_back_content_status_when_fails(self, process_times, trace_id,
  164. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
  165. """
  166. 处理失败,回滚至初始状态,处理次数加 1
  167. :param process_times:
  168. :param trace_id:
  169. :param ori_content_status:
  170. :return:
  171. """
  172. update_article_sql = f"""
  173. UPDATE {self.article_match_video_table}
  174. SET
  175. content_status = %s,
  176. content_status_update_time = %s,
  177. process_times = %s
  178. WHERE trace_id = %s and content_status = %s;
  179. """
  180. await self.long_articles_client.async_insert(
  181. sql=update_article_sql,
  182. params=(
  183. NewContentIdTaskConst.TASK_INIT_STATUS,
  184. int(time.time()),
  185. process_times + 1,
  186. trace_id,
  187. ori_content_status
  188. )
  189. )
  190. async def judge_whether_same_content_id_is_processing(self, content_id):
  191. """
  192. 同一个 content_id 只需要处理一次
  193. :param content_id:
  194. :return:
  195. success: 4
  196. init: 0
  197. fail: 99
  198. """
  199. select_sql = f"""
  200. SELECT distinct content_status
  201. FROM {self.article_match_video_table}
  202. WHERE content_id = '{content_id}';
  203. """
  204. result = await self.long_articles_client.async_select(select_sql)
  205. if result:
  206. for item in result:
  207. content_status = item[0]
  208. # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
  209. if content_status in {
  210. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  211. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  212. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  213. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  214. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  215. }:
  216. return True
  217. return False
  218. else:
  219. return False
  220. async def get_source_content_id(self, new_content_id):
  221. """
  222. 通过content_id 查找源content_id,并且更新其内容
  223. """
  224. select_channel_id_sql = f"""
  225. SELECT channel_content_id
  226. FROM produce_plan_exe_record
  227. WHERE plan_exe_id = '{new_content_id}';
  228. """
  229. channel_content_id = await self.aigc_client.async_select(select_channel_id_sql)
  230. if channel_content_id:
  231. select_source_content_id_sql = f"""
  232. SELECT root_produce_content_id
  233. FROM article_pool_promotion_source
  234. WHERE channel_content_id = '{channel_content_id[0][0]}';
  235. """
  236. source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql)
  237. if source_content_id:
  238. return source_content_id[0][0]
  239. else:
  240. return
  241. else:
  242. return
  243. async def get_illegal_out_ids(self, content_id: str) -> List[str]:
  244. """
  245. 获取违规的外站视频id
  246. """
  247. select_sql = f"""
  248. SELECT platform, out_video_id
  249. FROM {self.article_crawler_video_table}
  250. WHERE content_id = '{content_id}' and is_illegal = {NewContentIdTaskConst.VIDEO_UNSAFE};
  251. """
  252. response = await self.long_articles_client.async_select(select_sql)
  253. if response:
  254. result = ["{}_{}".format(line[0], line[1]) for line in response]
  255. return result
  256. else:
  257. return []
  258. async def kimi_task(self, params):
  259. """
  260. 执行 kimi 任务
  261. :return:
  262. """
  263. trace_id = params['trace_id']
  264. if params.get("root_content_id"):
  265. kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
  266. if kimi_result:
  267. affected_rows = await self.update_content_status(
  268. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  269. trace_id=trace_id,
  270. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  271. )
  272. if affected_rows == 0:
  273. logging(
  274. code="6000",
  275. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  276. )
  277. return
  278. logging(
  279. code="8023",
  280. function="kimi_task",
  281. trace_id=trace_id,
  282. info="从root_content_id获取结果",
  283. data=params
  284. )
  285. return kimi_result
  286. else:
  287. params.pop('root_content_id', None)
  288. print(params)
  289. return await self.kimi_task(params)
  290. # 处理content_id
  291. content_id = params['content_id']
  292. process_times = params['process_times']
  293. kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  294. if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
  295. affected_rows = await self.update_content_status(
  296. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  297. trace_id=trace_id,
  298. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  299. )
  300. if affected_rows == 0:
  301. logging(
  302. code="6000",
  303. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  304. )
  305. return
  306. kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  307. return kimi_result
  308. elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
  309. logging(
  310. code="4000",
  311. info="long_articles_text表中未找到 content_id"
  312. )
  313. else:
  314. # 开始处理,讲 content_status 从 0 改为 101
  315. affected_rows = await self.update_content_status(
  316. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  317. trace_id=trace_id,
  318. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  319. )
  320. if affected_rows == 0:
  321. logging(
  322. code="6000",
  323. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  324. )
  325. return
  326. try:
  327. kimi_result = await generate_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  328. await self.update_content_status(
  329. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  330. trace_id=trace_id,
  331. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  332. )
  333. return kimi_result
  334. except Exception as e:
  335. # kimi 任务处理失败
  336. update_kimi_sql = f"""
  337. UPDATE {self.article_text_table}
  338. SET
  339. kimi_status = %s
  340. WHERE content_id = %s
  341. """
  342. await self.long_articles_client.async_insert(
  343. sql=update_kimi_sql,
  344. params=(
  345. NewContentIdTaskConst.KIMI_FAIL_STATUS,
  346. content_id
  347. )
  348. )
  349. # 将状态由 101 回退为 0
  350. await self.roll_back_content_status_when_fails(
  351. process_times=process_times,
  352. trace_id=trace_id
  353. )
  354. return {}
  355. async def spider_task(self, params, kimi_result):
  356. """
  357. 爬虫任务
  358. :return:
  359. """
  360. trace_id = params['trace_id']
  361. content_id = params['content_id']
  362. process_times = params['process_times']
  363. gh_id = params['gh_id']
  364. if params.get("root_content_id"):
  365. # 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2
  366. update_rows = await update_crawler_table_with_exist_content_id(
  367. content_id=content_id,
  368. trace_id=trace_id,
  369. article_crawler_video_table=self.article_crawler_video_table,
  370. db_client=self.long_articles_client,
  371. root_content_id=params['root_content_id']
  372. )
  373. if update_rows:
  374. affected_rows = await self.update_content_status(
  375. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  376. trace_id=trace_id,
  377. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  378. )
  379. if affected_rows == 0:
  380. return
  381. logging(
  382. code="8024",
  383. function="spider_task",
  384. trace_id=trace_id,
  385. info="从root_content_id获取结果",
  386. data=params
  387. )
  388. return True
  389. else:
  390. params.pop("root_content_id", None)
  391. return await self.spider_task(params, kimi_result)
  392. download_video_exist_flag = await whether_downloaded_videos_exists(
  393. content_id=content_id,
  394. article_crawler_video_table=self.article_crawler_video_table,
  395. db_client=self.long_articles_client
  396. )
  397. if download_video_exist_flag:
  398. await self.update_content_status(
  399. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  400. trace_id=trace_id,
  401. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  402. )
  403. return True
  404. # 开始处理,将状态由 1 改成 101
  405. affected_rows = await self.update_content_status(
  406. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  407. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  408. trace_id=trace_id
  409. )
  410. if affected_rows == 0:
  411. logging(
  412. code="6000",
  413. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  414. )
  415. return False
  416. try:
  417. logging(
  418. code="spider_1001",
  419. info="开始执行搜索任务",
  420. trace_id=trace_id,
  421. data=kimi_result
  422. )
  423. search_videos_count = await search_videos_from_web(
  424. info={
  425. "ori_title": kimi_result['ori_title'],
  426. "kimi_summary": kimi_result['kimi_summary'],
  427. "kimi_keys": kimi_result['kimi_keys'],
  428. "trace_id": trace_id,
  429. "gh_id": gh_id,
  430. "content_id": content_id,
  431. "crawler_video_table": self.article_crawler_video_table
  432. },
  433. gh_id_map=self.account_map,
  434. db_client=self.long_articles_client
  435. )
  436. if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  437. # 表示爬虫任务执行成功, 将状态从 101 改为 2
  438. logging(
  439. code="spider_1002",
  440. info="搜索成功",
  441. trace_id=trace_id,
  442. data=kimi_result
  443. )
  444. await self.update_content_status(
  445. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  446. trace_id=trace_id,
  447. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  448. )
  449. return True
  450. else:
  451. logging(
  452. code="spider_1003",
  453. info="搜索失败",
  454. trace_id=trace_id,
  455. data=kimi_result
  456. )
  457. await self.roll_back_content_status_when_fails(
  458. process_times=process_times + 1,
  459. trace_id=trace_id
  460. )
  461. return False
  462. except Exception as e:
  463. await self.roll_back_content_status_when_fails(
  464. process_times=process_times + 1,
  465. trace_id=trace_id
  466. )
  467. print("爬虫处理失败: {}".format(e))
  468. return False
  469. async def etl_task(self, params):
  470. """
  471. download && upload videos
  472. :param params:
  473. :return:
  474. """
  475. trace_id = params['trace_id']
  476. content_id = params['content_id']
  477. process_times = params['process_times']
  478. # 判断视频是否已下载完成
  479. video_exist_flag = await whether_downloaded_videos_exists(
  480. content_id=content_id,
  481. article_crawler_video_table=self.article_crawler_video_table,
  482. db_client=self.long_articles_client
  483. )
  484. if video_exist_flag:
  485. affect_rows = await self.update_content_status(
  486. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  487. trace_id=trace_id,
  488. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  489. )
  490. if affect_rows == 0:
  491. logging(
  492. code="6000",
  493. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  494. )
  495. return False
  496. return True
  497. else:
  498. # 开始处理, 将文章状态修改为处理状态
  499. affected_rows = await self.update_content_status(
  500. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  501. trace_id=trace_id,
  502. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  503. )
  504. if affected_rows == 0:
  505. logging(
  506. code="6000",
  507. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  508. )
  509. return False
  510. # download videos
  511. illegal_videos = await self.get_illegal_out_ids(content_id)
  512. downloaded_count = await async_download_videos(
  513. trace_id=trace_id,
  514. content_id=content_id,
  515. article_crawler_video_table=self.article_crawler_video_table,
  516. db_client=self.long_articles_client,
  517. illegal_videos=illegal_videos
  518. )
  519. if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  520. await self.update_content_status(
  521. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  522. trace_id=trace_id,
  523. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  524. )
  525. return True
  526. else:
  527. await self.roll_back_content_status_when_fails(
  528. process_times=process_times + 1,
  529. trace_id=trace_id
  530. )
  531. return False
  532. async def publish_task(self, params, kimi_title):
  533. """
  534. 发布任务
  535. :param kimi_title:
  536. :param params:
  537. :return:
  538. """
  539. gh_id = params['gh_id']
  540. flow_pool_level = params['flow_pool_level']
  541. content_id = params['content_id']
  542. trace_id = params['trace_id']
  543. process_times = params['process_times']
  544. # 开始处理,将状态修改为操作状态
  545. affected_rows = await self.update_content_status(
  546. ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  547. trace_id=trace_id,
  548. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  549. )
  550. if affected_rows == 0:
  551. logging(
  552. code="6000",
  553. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  554. )
  555. return False
  556. try:
  557. download_videos = await get_downloaded_videos(
  558. content_id=content_id,
  559. article_crawler_video_table=self.article_crawler_video_table,
  560. db_client=self.long_articles_client
  561. )
  562. match flow_pool_level:
  563. case "autoArticlePoolLevel4":
  564. # 冷启层, 全量做
  565. video_list = shuffle_list(download_videos)[:3]
  566. case "autoArticlePoolLevel3":
  567. if self.gh_id_dict.get(gh_id):
  568. video_list = shuffle_list(download_videos)[:3]
  569. else:
  570. video_list = download_videos[:3]
  571. case "autoArticlePoolLevel2":
  572. # 次条,只针对具体账号做
  573. video_list = []
  574. case "autoArticlePoolLevel1":
  575. # 头条,先不做
  576. video_list = download_videos[:3]
  577. case _:
  578. video_list = download_videos[:3]
  579. # 将视频发布至票圈
  580. await publish_videos_to_piaoquan(
  581. video_list=video_list,
  582. kimi_title=kimi_title,
  583. process_times=process_times,
  584. trace_id=trace_id,
  585. db_client=self.long_articles_client,
  586. article_match_video_table=self.article_match_video_table
  587. )
  588. except Exception as e:
  589. await self.roll_back_content_status_when_fails(
  590. process_times=params['process_times'] + 1,
  591. trace_id=params['trace_id']
  592. )
  593. print(e)
  594. async def start_process(self, params):
  595. """
  596. 处理单篇文章
  597. :param params:
  598. :return:
  599. """
  600. kimi_result = await self.kimi_task(params)
  601. trace_id = params['trace_id']
  602. process_times = params['process_times']
  603. content_id = params['content_id']
  604. publish_flag = params['publish_flag']
  605. if kimi_result:
  606. # 等待 kimi 操作执行完成之后,开始执行 spider_task
  607. print("kimi success")
  608. logging(
  609. code=3001,
  610. info="kimi success",
  611. trace_id=trace_id
  612. )
  613. spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
  614. if spider_flag:
  615. # 等待爬虫执行完成后,开始执行 etl_task
  616. print("spider success")
  617. logging(
  618. code=3002,
  619. info="spider_success",
  620. trace_id=trace_id
  621. )
  622. etl_flag = await self.etl_task(params)
  623. if etl_flag:
  624. # 等待下载上传完成,执行发布任务
  625. print("etl success")
  626. logging(
  627. code="3003",
  628. info="etl_success",
  629. trace_id=trace_id
  630. )
  631. """
  632. todo 若新建计划,计划为设置托管,但接入账号又在配置账号中,仍会走托管逻辑,需考虑历史存量的处理
  633. 目前先对这两种情况都做托管操作
  634. """
  635. if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  636. logging(
  637. code="3013",
  638. info="不需要发布,长文系统托管发布",
  639. trace_id=trace_id
  640. )
  641. return
  642. else:
  643. try:
  644. await self.publish_task(params, kimi_result['kimi_title'])
  645. logging(
  646. code="3004",
  647. info="publish_success",
  648. trace_id=trace_id
  649. )
  650. await record_trace_id(
  651. trace_id=trace_id,
  652. status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
  653. )
  654. except Exception as e:
  655. logging(
  656. code="6004",
  657. info="publish 失败--{}".format(e),
  658. trace_id=params['trace_id']
  659. )
  660. else:
  661. logging(
  662. code="6003",
  663. info="ETL 处理失败",
  664. trace_id=params['trace_id']
  665. )
  666. else:
  667. logging(
  668. code="6002",
  669. info="爬虫处理失败",
  670. trace_id=params['trace_id']
  671. )
  672. else:
  673. logging(
  674. code="6001",
  675. info="kimi 处理失败",
  676. trace_id=trace_id
  677. )
  678. if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
  679. logging(
  680. code="6011",
  681. info="kimi处理次数达到上限, 放弃处理",
  682. trace_id=trace_id
  683. )
  684. # 将相同的content_id && content_status = 0的状态修改为kimi 失败状态
  685. update_sql = f"""
  686. UPDATE {self.article_match_video_table}
  687. SET content_status = %s
  688. WHERE content_id = %s and content_status = %s;
  689. """
  690. affected_rows = await self.long_articles_client.async_insert(
  691. sql=update_sql,
  692. params=(
  693. NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
  694. content_id,
  695. NewContentIdTaskConst.TASK_INIT_STATUS
  696. )
  697. )
  698. # 查询出该content_id所对应的标题
  699. select_sql = f"""
  700. SELECT article_title
  701. FROM {self.article_text_table}
  702. WHERE content_id = '{content_id}';
  703. """
  704. result = await self.long_articles_client.async_select(select_sql)
  705. bot(
  706. title="KIMI 处理失败",
  707. detail={
  708. "content_id": content_id,
  709. "affected_rows": affected_rows,
  710. "article_title": result[0][0]
  711. },
  712. mention=False
  713. )
  714. async def process_each_task(self, params):
  715. """
  716. 处理任务
  717. :return:
  718. """
  719. content_id = params['content_id']
  720. flow_pool_level = params['flow_pool_level']
  721. download_videos_exists_flag = await whether_downloaded_videos_exists(
  722. content_id=content_id,
  723. article_crawler_video_table=self.article_crawler_video_table,
  724. db_client=self.long_articles_client
  725. )
  726. if not download_videos_exists_flag:
  727. processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
  728. if processing_flag:
  729. logging(
  730. code="9001",
  731. info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
  732. )
  733. else:
  734. # 判断是否存在root_content_id
  735. if flow_pool_level != 'autoArticlePoolLevel4':
  736. # 如果查到根content_id, 采用根content_id的视频
  737. root_content_id = await self.get_source_content_id(content_id)
  738. if root_content_id:
  739. # 传参新增root_content_id
  740. params['root_content_id'] = root_content_id
  741. # 开始处理
  742. await self.start_process(params=params)
  743. else:
  744. print("存在已下载视频")
  745. async def deal(self) -> None:
  746. """
  747. function
  748. :return:
  749. """
  750. # 处理未托管的任务
  751. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
  752. # 处理托管任务
  753. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
  754. # 将处理次数大于3次且未成功的任务置为失败
  755. await self.set_tasks_status_fail()
  756. # 获取task_list
  757. task_list = await self.get_tasks()
  758. task_dict = {task['content_id']: task for task in task_list}
  759. process_list = list(task_dict.values())
  760. logging(
  761. code="5001",
  762. info="Match Task Got {} this time".format(len(process_list)),
  763. function="Publish Task"
  764. )
  765. # 处理process_list
  766. if process_list:
  767. a = time.time()
  768. tasks = [self.process_each_task(params) for params in process_list]
  769. await asyncio.gather(*tasks)
  770. b = time.time()
  771. print("处理时间: {} s".format(b - a))
  772. else:
  773. logging(
  774. code="9008",
  775. info="没有要处理的请求"
  776. )