# new_contentId_task.py
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. import traceback
  8. from typing import List, Dict
  9. from applications.config import Config
  10. from applications.const import new_content_id_task_const as NewContentIdTaskConst
  11. from applications.log import logging
  12. from applications.functions.common import shuffle_list
  13. from applications.spider import search_videos_from_web
  14. from applications.feishu import bot
  15. from applications.functions.aigc import record_trace_id
  16. from .utils import *
  17. class NewContentIdTask(object):
  18. """
  19. 不存在历史已经发布的文章的匹配流程
  20. """
  21. def __init__(self, long_articles_client, aigc_client):
  22. self.long_articles_client = long_articles_client
  23. self.aigc_client = aigc_client
  24. self.config = Config()
  25. self.article_match_video_table = self.config.article_match_video_table
  26. self.article_text_table = self.config.article_text_table
  27. self.article_crawler_video_table = self.config.article_crawler_video_table
  28. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  29. self.account_map = json.loads(self.config.get_config_value("accountMap"))
  30. self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
  31. async def get_tasks(self) -> List[Dict]:
  32. """
  33. 获取 task
  34. :return:
  35. """
  36. # 获取 process_times <= 3 且 content_status = 0 的任务
  37. select_sql = f"""
  38. SELECT
  39. t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
  40. FROM
  41. {self.article_match_video_table} t1
  42. LEFT JOIN (
  43. SELECT content_id, count(1) as cnt
  44. FROM {self.article_crawler_video_table}
  45. WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  46. GROUP BY content_id
  47. ) t2
  48. ON t1.content_id = t2.content_id
  49. WHERE
  50. t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
  51. AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  52. AND t2.cnt IS NULL
  53. ORDER BY flow_pool_level, request_timestamp
  54. LIMIT {self.spider_coroutines};
  55. """
  56. tasks = await self.long_articles_client.async_select(select_sql)
  57. if tasks:
  58. return [
  59. {
  60. "trace_id": i[0],
  61. "content_id": i[1],
  62. "flow_pool_level": i[2],
  63. "gh_id": i[3],
  64. "process_times": i[4],
  65. "publish_flag": i[5]
  66. }
  67. for i in tasks
  68. ]
  69. else:
  70. return []
  71. async def set_tasks_status_fail(self) -> None:
  72. """
  73. 将 处理次数大约3次,且状态不为成功状态的(3, 4)的任务状态修改为失败
  74. """
  75. update_status_sql = f"""
  76. UPDATE
  77. {self.article_match_video_table}
  78. SET
  79. content_status = %s
  80. WHERE
  81. process_times > %s and content_status not in (%s, %s);
  82. """
  83. await self.long_articles_client.async_insert(
  84. update_status_sql,
  85. params=(
  86. NewContentIdTaskConst.TASK_FAIL_STATUS,
  87. NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
  88. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  89. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  90. )
  91. )
  92. async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
  93. """
  94. 将长时间处于中间状态的任务回滚
  95. """
  96. # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
  97. if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
  98. processing_status_tuple = (
  99. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  100. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  101. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  102. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  103. )
  104. elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  105. processing_status_tuple = (
  106. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  107. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  108. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
  109. )
  110. else:
  111. return
  112. select_processing_sql = f"""
  113. SELECT
  114. trace_id, content_status_update_time, process_times, content_status
  115. FROM
  116. {self.article_match_video_table}
  117. WHERE
  118. content_status in {processing_status_tuple}
  119. and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  120. and publish_flag = {publish_flag};
  121. """
  122. processing_articles = await self.long_articles_client.async_select(select_processing_sql)
  123. if processing_articles:
  124. processing_list = [
  125. {
  126. "trace_id": item[0],
  127. "content_status_update_time": item[1],
  128. "process_times": item[2],
  129. "content_status": item[3]
  130. }
  131. for item in processing_articles
  132. ]
  133. for obj in processing_list:
  134. if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
  135. # 认为该任务失败
  136. await self.roll_back_content_status_when_fails(
  137. process_times=obj['process_times'] + 1,
  138. trace_id=obj['trace_id'],
  139. ori_content_status=obj['content_status']
  140. )
  141. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  142. """
  143. :param new_content_status:
  144. :param trace_id:
  145. :param ori_content_status:
  146. :return:
  147. """
  148. update_sql = f"""
  149. UPDATE {self.article_match_video_table}
  150. SET content_status = %s, content_status_update_time = %s
  151. WHERE trace_id = %s and content_status = %s;
  152. """
  153. row_counts = await self.long_articles_client.async_insert(
  154. sql=update_sql,
  155. params=(
  156. new_content_status,
  157. int(time.time()),
  158. trace_id,
  159. ori_content_status
  160. )
  161. )
  162. return row_counts
  163. async def roll_back_content_status_when_fails(self, process_times, trace_id,
  164. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
  165. """
  166. 处理失败,回滚至初始状态,处理次数加 1
  167. :param process_times:
  168. :param trace_id:
  169. :param ori_content_status:
  170. :return:
  171. """
  172. update_article_sql = f"""
  173. UPDATE {self.article_match_video_table}
  174. SET
  175. content_status = %s,
  176. content_status_update_time = %s,
  177. process_times = %s
  178. WHERE trace_id = %s and content_status = %s;
  179. """
  180. await self.long_articles_client.async_insert(
  181. sql=update_article_sql,
  182. params=(
  183. NewContentIdTaskConst.TASK_INIT_STATUS,
  184. int(time.time()),
  185. process_times + 1,
  186. trace_id,
  187. ori_content_status
  188. )
  189. )
  190. async def judge_whether_same_content_id_is_processing(self, content_id):
  191. """
  192. 同一个 content_id 只需要处理一次
  193. :param content_id:
  194. :return:
  195. success: 4
  196. init: 0
  197. fail: 99
  198. """
  199. select_sql = f"""
  200. SELECT distinct content_status
  201. FROM {self.article_match_video_table}
  202. WHERE content_id = '{content_id}';
  203. """
  204. result = await self.long_articles_client.async_select(select_sql)
  205. if result:
  206. for item in result:
  207. content_status = item[0]
  208. # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
  209. if content_status in {
  210. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  211. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  212. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  213. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  214. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  215. }:
  216. return True
  217. return False
  218. else:
  219. return False
  220. async def get_source_content_id(self, new_content_id):
  221. """
  222. 通过content_id 查找源content_id,并且更新其内容
  223. """
  224. select_channel_id_sql = f"""
  225. SELECT channel_content_id
  226. FROM produce_plan_exe_record
  227. WHERE plan_exe_id = '{new_content_id}';
  228. """
  229. channel_content_id = await self.aigc_client.async_select(select_channel_id_sql)
  230. if channel_content_id:
  231. select_source_content_id_sql = f"""
  232. SELECT root_produce_content_id
  233. FROM article_pool_promotion_source
  234. WHERE channel_content_id = '{channel_content_id[0][0]}';
  235. """
  236. source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql)
  237. if source_content_id:
  238. return source_content_id[0][0]
  239. else:
  240. return
  241. else:
  242. return
  243. async def kimi_task(self, params):
  244. """
  245. 执行 kimi 任务
  246. :return:
  247. """
  248. trace_id = params['trace_id']
  249. if params.get("root_content_id"):
  250. kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
  251. if kimi_result:
  252. affected_rows = await self.update_content_status(
  253. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  254. trace_id=trace_id,
  255. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  256. )
  257. if affected_rows == 0:
  258. logging(
  259. code="6000",
  260. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  261. )
  262. return
  263. logging(
  264. code="8023",
  265. function="kimi_task",
  266. trace_id=trace_id,
  267. info="从root_content_id获取结果",
  268. data=params
  269. )
  270. return kimi_result
  271. else:
  272. params.pop('root_content_id', None)
  273. print(params)
  274. return await self.kimi_task(params)
  275. # 处理content_id
  276. content_id = params['content_id']
  277. process_times = params['process_times']
  278. kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  279. if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
  280. affected_rows = await self.update_content_status(
  281. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  282. trace_id=trace_id,
  283. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  284. )
  285. if affected_rows == 0:
  286. logging(
  287. code="6000",
  288. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  289. )
  290. return
  291. kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  292. return kimi_result
  293. elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
  294. logging(
  295. code="4000",
  296. info="long_articles_text表中未找到 content_id"
  297. )
  298. else:
  299. # 开始处理,讲 content_status 从 0 改为 101
  300. affected_rows = await self.update_content_status(
  301. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  302. trace_id=trace_id,
  303. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  304. )
  305. if affected_rows == 0:
  306. logging(
  307. code="6000",
  308. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  309. )
  310. return
  311. try:
  312. kimi_result = await generate_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  313. await self.update_content_status(
  314. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  315. trace_id=trace_id,
  316. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  317. )
  318. return kimi_result
  319. except Exception as e:
  320. # kimi 任务处理失败
  321. error = traceback.format_exc()
  322. print(error)
  323. update_kimi_sql = f"""
  324. UPDATE {self.article_text_table}
  325. SET
  326. kimi_status = %s
  327. WHERE content_id = %s
  328. """
  329. await self.long_articles_client.async_insert(
  330. sql=update_kimi_sql,
  331. params=(
  332. NewContentIdTaskConst.KIMI_FAIL_STATUS,
  333. content_id
  334. )
  335. )
  336. # 将状态由 101 回退为 0
  337. await self.roll_back_content_status_when_fails(
  338. process_times=process_times,
  339. trace_id=trace_id
  340. )
  341. return {}
  342. async def spider_task(self, params, kimi_result):
  343. """
  344. 爬虫任务
  345. :return:
  346. """
  347. trace_id = params['trace_id']
  348. content_id = params['content_id']
  349. process_times = params['process_times']
  350. gh_id = params['gh_id']
  351. if params.get("root_content_id"):
  352. # 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2
  353. update_rows = await update_crawler_table_with_exist_content_id(
  354. content_id=content_id,
  355. trace_id=trace_id,
  356. article_crawler_video_table=self.article_crawler_video_table,
  357. db_client=self.long_articles_client,
  358. root_content_id=params['root_content_id']
  359. )
  360. if update_rows:
  361. affected_rows = await self.update_content_status(
  362. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  363. trace_id=trace_id,
  364. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  365. )
  366. if affected_rows == 0:
  367. return
  368. logging(
  369. code="8024",
  370. function="spider_task",
  371. trace_id=trace_id,
  372. info="从root_content_id获取结果",
  373. data=params
  374. )
  375. return True
  376. else:
  377. params.pop("root_content_id", None)
  378. return await self.spider_task(params, kimi_result)
  379. download_video_exist_flag = await whether_downloaded_videos_exists(
  380. content_id=content_id,
  381. article_crawler_video_table=self.article_crawler_video_table,
  382. db_client=self.long_articles_client
  383. )
  384. if download_video_exist_flag:
  385. await self.update_content_status(
  386. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  387. trace_id=trace_id,
  388. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  389. )
  390. return True
  391. # 开始处理,将状态由 1 改成 101
  392. affected_rows = await self.update_content_status(
  393. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  394. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  395. trace_id=trace_id
  396. )
  397. if affected_rows == 0:
  398. logging(
  399. code="6000",
  400. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  401. )
  402. return False
  403. try:
  404. logging(
  405. code="spider_1001",
  406. info="开始执行搜索任务",
  407. trace_id=trace_id,
  408. data=kimi_result
  409. )
  410. search_videos_count = await search_videos_from_web(
  411. info={
  412. "ori_title": kimi_result['ori_title'],
  413. "kimi_summary": kimi_result['kimi_summary'],
  414. "kimi_keys": kimi_result['kimi_keys'],
  415. "trace_id": trace_id,
  416. "gh_id": gh_id,
  417. "content_id": content_id,
  418. "crawler_video_table": self.article_crawler_video_table
  419. },
  420. gh_id_map=self.account_map,
  421. db_client=self.long_articles_client
  422. )
  423. if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  424. # 表示爬虫任务执行成功, 将状态从 101 改为 2
  425. logging(
  426. code="spider_1002",
  427. info="搜索成功",
  428. trace_id=trace_id,
  429. data=kimi_result
  430. )
  431. await self.update_content_status(
  432. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  433. trace_id=trace_id,
  434. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  435. )
  436. return True
  437. else:
  438. logging(
  439. code="spider_1003",
  440. info="搜索失败",
  441. trace_id=trace_id,
  442. data=kimi_result
  443. )
  444. await self.roll_back_content_status_when_fails(
  445. process_times=process_times + 1,
  446. trace_id=trace_id
  447. )
  448. return False
  449. except Exception as e:
  450. await self.roll_back_content_status_when_fails(
  451. process_times=process_times + 1,
  452. trace_id=trace_id
  453. )
  454. print("爬虫处理失败: {}".format(e))
  455. return False
  456. async def etl_task(self, params):
  457. """
  458. download && upload videos
  459. :param params:
  460. :return:
  461. """
  462. trace_id = params['trace_id']
  463. content_id = params['content_id']
  464. process_times = params['process_times']
  465. # 判断视频是否已下载完成
  466. video_exist_flag = await whether_downloaded_videos_exists(
  467. content_id=content_id,
  468. article_crawler_video_table=self.article_crawler_video_table,
  469. db_client=self.long_articles_client
  470. )
  471. if video_exist_flag:
  472. affect_rows = await self.update_content_status(
  473. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  474. trace_id=trace_id,
  475. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  476. )
  477. if affect_rows == 0:
  478. logging(
  479. code="6000",
  480. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  481. )
  482. return False
  483. return True
  484. else:
  485. # 开始处理, 将文章状态修改为处理状态
  486. affected_rows = await self.update_content_status(
  487. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  488. trace_id=trace_id,
  489. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  490. )
  491. if affected_rows == 0:
  492. logging(
  493. code="6000",
  494. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  495. )
  496. return False
  497. # download videos
  498. downloaded_count = await async_download_videos(
  499. trace_id=trace_id,
  500. content_id=content_id,
  501. article_crawler_video_table=self.article_crawler_video_table,
  502. db_client=self.long_articles_client
  503. )
  504. if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  505. await self.update_content_status(
  506. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  507. trace_id=trace_id,
  508. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  509. )
  510. return True
  511. else:
  512. await self.roll_back_content_status_when_fails(
  513. process_times=process_times + 1,
  514. trace_id=trace_id
  515. )
  516. return False
  517. async def publish_task(self, params, kimi_title):
  518. """
  519. 发布任务
  520. :param kimi_title:
  521. :param params:
  522. :return:
  523. """
  524. gh_id = params['gh_id']
  525. flow_pool_level = params['flow_pool_level']
  526. content_id = params['content_id']
  527. trace_id = params['trace_id']
  528. process_times = params['process_times']
  529. # 开始处理,将状态修改为操作状态
  530. affected_rows = await self.update_content_status(
  531. ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  532. trace_id=trace_id,
  533. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  534. )
  535. if affected_rows == 0:
  536. logging(
  537. code="6000",
  538. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  539. )
  540. return False
  541. try:
  542. download_videos = await get_downloaded_videos(
  543. content_id=content_id,
  544. article_crawler_video_table=self.article_crawler_video_table,
  545. db_client=self.long_articles_client
  546. )
  547. match flow_pool_level:
  548. case "autoArticlePoolLevel4":
  549. # 冷启层, 全量做
  550. video_list = shuffle_list(download_videos)[:3]
  551. case "autoArticlePoolLevel3":
  552. if self.gh_id_dict.get(gh_id):
  553. video_list = shuffle_list(download_videos)[:3]
  554. else:
  555. video_list = download_videos[:3]
  556. case "autoArticlePoolLevel2":
  557. # 次条,只针对具体账号做
  558. video_list = []
  559. case "autoArticlePoolLevel1":
  560. # 头条,先不做
  561. video_list = download_videos[:3]
  562. case _:
  563. video_list = download_videos[:3]
  564. # 将视频发布至票圈
  565. await publish_videos_to_piaoquan(
  566. video_list=video_list,
  567. kimi_title=kimi_title,
  568. process_times=process_times,
  569. trace_id=trace_id,
  570. db_client=self.long_articles_client,
  571. article_match_video_table=self.article_match_video_table
  572. )
  573. except Exception as e:
  574. await self.roll_back_content_status_when_fails(
  575. process_times=params['process_times'] + 1,
  576. trace_id=params['trace_id']
  577. )
  578. print(e)
  579. async def start_process(self, params):
  580. """
  581. 处理单篇文章
  582. :param params:
  583. :return:
  584. """
  585. kimi_result = await self.kimi_task(params)
  586. trace_id = params['trace_id']
  587. process_times = params['process_times']
  588. content_id = params['content_id']
  589. publish_flag = params['publish_flag']
  590. if kimi_result:
  591. # 等待 kimi 操作执行完成之后,开始执行 spider_task
  592. print("kimi success")
  593. logging(
  594. code=3001,
  595. info="kimi success",
  596. trace_id=trace_id
  597. )
  598. spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
  599. if spider_flag:
  600. # 等待爬虫执行完成后,开始执行 etl_task
  601. print("spider success")
  602. logging(
  603. code=3002,
  604. info="spider_success",
  605. trace_id=trace_id
  606. )
  607. etl_flag = await self.etl_task(params)
  608. if etl_flag:
  609. # 等待下载上传完成,执行发布任务
  610. print("etl success")
  611. logging(
  612. code="3003",
  613. info="etl_success",
  614. trace_id=trace_id
  615. )
  616. """
  617. todo 若新建计划,计划为设置托管,但接入账号又在配置账号中,仍会走托管逻辑,需考虑历史存量的处理
  618. 目前先对这两种情况都做托管操作
  619. """
  620. if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  621. logging(
  622. code="3013",
  623. info="不需要发布,长文系统托管发布",
  624. trace_id=trace_id
  625. )
  626. return
  627. else:
  628. try:
  629. await self.publish_task(params, kimi_result['kimi_title'])
  630. logging(
  631. code="3004",
  632. info="publish_success",
  633. trace_id=trace_id
  634. )
  635. await record_trace_id(
  636. trace_id=trace_id,
  637. status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
  638. )
  639. except Exception as e:
  640. logging(
  641. code="6004",
  642. info="publish 失败--{}".format(e),
  643. trace_id=params['trace_id']
  644. )
  645. else:
  646. logging(
  647. code="6003",
  648. info="ETL 处理失败",
  649. trace_id=params['trace_id']
  650. )
  651. else:
  652. logging(
  653. code="6002",
  654. info="爬虫处理失败",
  655. trace_id=params['trace_id']
  656. )
  657. else:
  658. logging(
  659. code="6001",
  660. info="kimi 处理失败",
  661. trace_id=trace_id
  662. )
  663. if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
  664. logging(
  665. code="6011",
  666. info="kimi处理次数达到上限, 放弃处理",
  667. trace_id=trace_id
  668. )
  669. # 将相同的content_id && content_status = 0的状态修改为kimi 失败状态
  670. update_sql = f"""
  671. UPDATE {self.article_match_video_table}
  672. SET content_status = %s
  673. WHERE content_id = %s and content_status = %s;
  674. """
  675. affected_rows = await self.long_articles_client.async_insert(
  676. sql=update_sql,
  677. params=(
  678. NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
  679. content_id,
  680. NewContentIdTaskConst.TASK_INIT_STATUS
  681. )
  682. )
  683. # 查询出该content_id所对应的标题
  684. select_sql = f"""
  685. SELECT article_title
  686. FROM {self.article_text_table}
  687. WHERE content_id = '{content_id}';
  688. """
  689. result = await self.long_articles_client.async_select(select_sql)
  690. bot(
  691. title="KIMI 处理失败",
  692. detail={
  693. "content_id": content_id,
  694. "affected_rows": affected_rows,
  695. "article_title": result[0][0]
  696. },
  697. mention=False
  698. )
  699. async def process_each_task(self, params):
  700. """
  701. 处理任务
  702. :return:
  703. """
  704. content_id = params['content_id']
  705. flow_pool_level = params['flow_pool_level']
  706. download_videos_exists_flag = await whether_downloaded_videos_exists(
  707. content_id=content_id,
  708. article_crawler_video_table=self.article_crawler_video_table,
  709. db_client=self.long_articles_client
  710. )
  711. if not download_videos_exists_flag:
  712. processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
  713. if processing_flag:
  714. logging(
  715. code="9001",
  716. info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
  717. )
  718. else:
  719. # 判断是否存在root_content_id
  720. if flow_pool_level != 'autoArticlePoolLevel4':
  721. # 如果查到根content_id, 采用根content_id的视频
  722. root_content_id = await self.get_source_content_id(content_id)
  723. if root_content_id:
  724. # 传参新增root_content_id
  725. params['root_content_id'] = root_content_id
  726. # 开始处理
  727. await self.start_process(params=params)
  728. else:
  729. print("存在已下载视频")
  730. async def deal(self) -> None:
  731. """
  732. function
  733. :return:
  734. """
  735. # 处理未托管的任务
  736. # await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
  737. # 处理托管任务
  738. # await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
  739. # 将处理次数大于3次且未成功的任务置为失败
  740. # await self.set_tasks_status_fail()
  741. # 获取task_list
  742. task_list = await self.get_tasks()
  743. task_dict = {task['content_id']: task for task in task_list}
  744. process_list = list(task_dict.values())
  745. logging(
  746. code="5001",
  747. info="Match Task Got {} this time".format(len(process_list)),
  748. function="Publish Task"
  749. )
  750. # 处理process_list
  751. if process_list:
  752. a = time.time()
  753. tasks = [self.process_each_task(params) for params in process_list]
  754. await asyncio.gather(*tasks)
  755. b = time.time()
  756. print("处理时间: {} s".format(b - a))
  757. else:
  758. logging(
  759. code="9008",
  760. info="没有要处理的请求"
  761. )