# new_contentId_task.py
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. from typing import List, Dict
  8. from applications.config import Config
  9. from applications.const import new_content_id_task_const as NewContentIdTaskConst
  10. from applications.log import logging
  11. from applications.functions.common import shuffle_list
  12. from applications.spider import search_videos_from_web
  13. from applications.feishu import bot
  14. from applications.functions.aigc import record_trace_id
  15. from .utils import *
  16. class NewContentIdTask(object):
  17. """
  18. 不存在历史已经发布的文章的匹配流程
  19. """
  20. def __init__(self, long_articles_client, aigc_client):
  21. self.long_articles_client = long_articles_client
  22. self.aigc_client = aigc_client
  23. self.config = Config()
  24. self.article_match_video_table = self.config.article_match_video_table
  25. self.article_text_table = self.config.article_text_table
  26. self.article_crawler_video_table = self.config.article_crawler_video_table
  27. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  28. self.account_map = json.loads(self.config.get_config_value("accountMap"))
  29. self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
  30. async def get_tasks(self) -> List[Dict]:
  31. """
  32. 获取 task
  33. :return:
  34. """
  35. # 获取 process_times <= 3 且 content_status = 0 的任务
  36. select_sql = f"""
  37. SELECT
  38. t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
  39. FROM
  40. {self.article_match_video_table} t1
  41. LEFT JOIN (
  42. SELECT content_id, count(1) as cnt
  43. FROM {self.article_crawler_video_table}
  44. WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  45. GROUP BY content_id
  46. ) t2
  47. ON t1.content_id = t2.content_id
  48. WHERE
  49. t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
  50. AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  51. AND t2.cnt IS NULL
  52. ORDER BY flow_pool_level, request_timestamp
  53. LIMIT {self.spider_coroutines};
  54. """
  55. tasks = await self.long_articles_client.async_select(select_sql)
  56. if tasks:
  57. return [
  58. {
  59. "trace_id": i[0],
  60. "content_id": i[1],
  61. "flow_pool_level": i[2],
  62. "gh_id": i[3],
  63. "process_times": i[4],
  64. "publish_flag": i[5]
  65. }
  66. for i in tasks
  67. ]
  68. else:
  69. return []
  70. async def set_tasks_status_fail(self) -> None:
  71. """
  72. 将 处理次数大约3次,且状态不为成功状态的(3, 4)的任务状态修改为失败
  73. """
  74. update_status_sql = f"""
  75. UPDATE
  76. {self.article_match_video_table}
  77. SET
  78. content_status = %s
  79. WHERE
  80. process_times > %s and content_status not in (%s, %s);
  81. """
  82. await self.long_articles_client.async_insert(
  83. update_status_sql,
  84. params=(
  85. NewContentIdTaskConst.TASK_FAIL_STATUS,
  86. NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
  87. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  88. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  89. )
  90. )
  91. async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
  92. """
  93. 将长时间处于中间状态的任务回滚
  94. """
  95. # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
  96. if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
  97. processing_status_tuple = (
  98. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  99. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  100. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  101. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  102. )
  103. elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  104. processing_status_tuple = (
  105. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  106. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  107. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
  108. )
  109. else:
  110. return
  111. select_processing_sql = f"""
  112. SELECT
  113. trace_id, content_status_update_time, process_times, content_status
  114. FROM
  115. {self.article_match_video_table}
  116. WHERE
  117. content_status in {processing_status_tuple}
  118. and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  119. and publish_flag = {publish_flag};
  120. """
  121. processing_articles = await self.long_articles_client.async_select(select_processing_sql)
  122. if processing_articles:
  123. processing_list = [
  124. {
  125. "trace_id": item[0],
  126. "content_status_update_time": item[1],
  127. "process_times": item[2],
  128. "content_status": item[3]
  129. }
  130. for item in processing_articles
  131. ]
  132. for obj in processing_list:
  133. if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
  134. # 认为该任务失败
  135. await self.roll_back_content_status_when_fails(
  136. process_times=obj['process_times'] + 1,
  137. trace_id=obj['trace_id'],
  138. ori_content_status=obj['content_status']
  139. )
  140. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  141. """
  142. :param new_content_status:
  143. :param trace_id:
  144. :param ori_content_status:
  145. :return:
  146. """
  147. update_sql = f"""
  148. UPDATE {self.article_match_video_table}
  149. SET content_status = %s, content_status_update_time = %s
  150. WHERE trace_id = %s and content_status = %s;
  151. """
  152. row_counts = await self.long_articles_client.async_insert(
  153. sql=update_sql,
  154. params=(
  155. new_content_status,
  156. int(time.time()),
  157. trace_id,
  158. ori_content_status
  159. )
  160. )
  161. return row_counts
  162. async def roll_back_content_status_when_fails(self, process_times, trace_id,
  163. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
  164. """
  165. 处理失败,回滚至初始状态,处理次数加 1
  166. :param process_times:
  167. :param trace_id:
  168. :param ori_content_status:
  169. :return:
  170. """
  171. update_article_sql = f"""
  172. UPDATE {self.article_match_video_table}
  173. SET
  174. content_status = %s,
  175. content_status_update_time = %s,
  176. process_times = %s
  177. WHERE trace_id = %s and content_status = %s;
  178. """
  179. await self.long_articles_client.async_insert(
  180. sql=update_article_sql,
  181. params=(
  182. NewContentIdTaskConst.TASK_INIT_STATUS,
  183. int(time.time()),
  184. process_times + 1,
  185. trace_id,
  186. ori_content_status
  187. )
  188. )
  189. async def judge_whether_same_content_id_is_processing(self, content_id):
  190. """
  191. 同一个 content_id 只需要处理一次
  192. :param content_id:
  193. :return:
  194. success: 4
  195. init: 0
  196. fail: 99
  197. """
  198. select_sql = f"""
  199. SELECT distinct content_status
  200. FROM {self.article_match_video_table}
  201. WHERE content_id = '{content_id}';
  202. """
  203. result = await self.long_articles_client.async_select(select_sql)
  204. if result:
  205. for item in result:
  206. content_status = item[0]
  207. # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
  208. if content_status in {
  209. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  210. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  211. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  212. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  213. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  214. }:
  215. return True
  216. return False
  217. else:
  218. return False
  219. async def get_source_content_id(self, new_content_id):
  220. """
  221. 通过content_id 查找源content_id,并且更新其内容
  222. """
  223. select_channel_id_sql = f"""
  224. SELECT channel_content_id
  225. FROM produce_plan_exe_record
  226. WHERE plan_exe_id = '{new_content_id}';
  227. """
  228. channel_content_id = await self.aigc_client.async_select(select_channel_id_sql)
  229. if channel_content_id:
  230. select_source_content_id_sql = f"""
  231. SELECT root_produce_content_id
  232. FROM article_pool_promotion_source
  233. WHERE channel_content_id = '{channel_content_id[0][0]}';
  234. """
  235. source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql)
  236. if source_content_id:
  237. return source_content_id[0][0]
  238. else:
  239. return
  240. else:
  241. return
  242. async def kimi_task(self, params):
  243. """
  244. 执行 kimi 任务
  245. :return:
  246. """
  247. trace_id = params['trace_id']
  248. if params.get("root_content_id"):
  249. kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
  250. if kimi_result:
  251. affected_rows = await self.update_content_status(
  252. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  253. trace_id=trace_id,
  254. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  255. )
  256. if affected_rows == 0:
  257. logging(
  258. code="6000",
  259. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  260. )
  261. return
  262. logging(
  263. code="8023",
  264. function="kimi_task",
  265. trace_id=trace_id,
  266. info="从root_content_id获取结果",
  267. data=params
  268. )
  269. return kimi_result
  270. else:
  271. params.pop('root_content_id', None)
  272. print(params)
  273. return await self.kimi_task(params)
  274. # 处理content_id
  275. content_id = params['content_id']
  276. process_times = params['process_times']
  277. kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  278. if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
  279. affected_rows = await self.update_content_status(
  280. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  281. trace_id=trace_id,
  282. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  283. )
  284. if affected_rows == 0:
  285. logging(
  286. code="6000",
  287. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  288. )
  289. return
  290. kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  291. return kimi_result
  292. elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
  293. logging(
  294. code="4000",
  295. info="long_articles_text表中未找到 content_id"
  296. )
  297. else:
  298. # 开始处理,讲 content_status 从 0 改为 101
  299. affected_rows = await self.update_content_status(
  300. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  301. trace_id=trace_id,
  302. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  303. )
  304. if affected_rows == 0:
  305. logging(
  306. code="6000",
  307. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  308. )
  309. return
  310. try:
  311. kimi_result = await generate_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  312. await self.update_content_status(
  313. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  314. trace_id=trace_id,
  315. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  316. )
  317. return kimi_result
  318. except Exception as e:
  319. # kimi 任务处理失败
  320. update_kimi_sql = f"""
  321. UPDATE {self.article_text_table}
  322. SET
  323. kimi_status = %s
  324. WHERE content_id = %s
  325. """
  326. await self.long_articles_client.async_insert(
  327. sql=update_kimi_sql,
  328. params=(
  329. NewContentIdTaskConst.KIMI_FAIL_STATUS,
  330. content_id
  331. )
  332. )
  333. # 将状态由 101 回退为 0
  334. await self.roll_back_content_status_when_fails(
  335. process_times=process_times,
  336. trace_id=trace_id
  337. )
  338. return {}
    async def spider_task(self, params, kimi_result):
        """
        Run the crawler step for one task.

        With ``root_content_id``: copy the already-crawled videos of the root
        content into this content's crawler rows; otherwise search the web for
        videos matching the kimi summary/keywords.  On success the task moves
        kimi-finished -> spider-finished (through the processing status for
        the web-search path); on failure it is rolled back for retry.

        :param params: task dict (trace_id, content_id, process_times, gh_id, ...)
        :param kimi_result: dict with ori_title / kimi_summary / kimi_keys
        :return: True on success, False (or None on a lost status race) otherwise
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        gh_id = params['gh_id']
        if params.get("root_content_id"):
            # reuse root_content_id's videos: copy them into the crawler table
            # and flip the record status 1 -> 2
            update_rows = await update_crawler_table_with_exist_content_id(
                content_id=content_id,
                trace_id=trace_id,
                article_crawler_video_table=self.article_crawler_video_table,
                db_client=self.long_articles_client,
                root_content_id=params['root_content_id']
            )
            if update_rows:
                affected_rows = await self.update_content_status(
                    new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
                )
                if affected_rows == 0:
                    # lost the optimistic-lock race.  NOTE(review): this path
                    # returns None while sibling failure paths return False;
                    # callers only truth-test the result, so behavior matches.
                    return
                logging(
                    code="8024",
                    function="spider_task",
                    trace_id=trace_id,
                    info="从root_content_id获取结果",
                    data=params
                )
                return True
            else:
                # nothing copied: drop the root id and fall back to web search
                params.pop("root_content_id", None)
                return await self.spider_task(params, kimi_result)
        # videos already downloaded for this content_id: skip the search
        download_video_exist_flag = await whether_downloaded_videos_exists(
            content_id=content_id,
            article_crawler_video_table=self.article_crawler_video_table,
            db_client=self.long_articles_client
        )
        if download_video_exist_flag:
            await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
            )
            return True
        # start processing: move status kimi-finished -> processing
        affected_rows = await self.update_content_status(
            new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
            ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
            trace_id=trace_id
        )
        if affected_rows == 0:
            logging(
                code="6000",
                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
            )
            return False
        try:
            logging(
                code="spider_1001",
                info="开始执行搜索任务",
                trace_id=trace_id,
                data=kimi_result
            )
            search_videos_count = await search_videos_from_web(
                info={
                    "ori_title": kimi_result['ori_title'],
                    "kimi_summary": kimi_result['kimi_summary'],
                    "kimi_keys": kimi_result['kimi_keys'],
                    "trace_id": trace_id,
                    "gh_id": gh_id,
                    "content_id": content_id,
                    "crawler_video_table": self.article_crawler_video_table
                },
                gh_id_map=self.account_map,
                db_client=self.long_articles_client
            )
            if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
                # search succeeded: move status processing -> spider-finished
                logging(
                    code="spider_1002",
                    info="搜索成功",
                    trace_id=trace_id,
                    data=kimi_result
                )
                await self.update_content_status(
                    new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
                )
                return True
            else:
                logging(
                    code="spider_1003",
                    info="搜索失败",
                    trace_id=trace_id,
                    data=kimi_result
                )
                # NOTE(review): roll_back_content_status_when_fails adds +1 to
                # process_times itself, so this increments the counter by 2 —
                # confirm whether that is intended
                await self.roll_back_content_status_when_fails(
                    process_times=process_times + 1,
                    trace_id=trace_id
                )
                return False
        except Exception as e:
            await self.roll_back_content_status_when_fails(
                process_times=process_times + 1,
                trace_id=trace_id
            )
            print("爬虫处理失败: {}".format(e))
            return False
  453. async def etl_task(self, params):
  454. """
  455. download && upload videos
  456. :param params:
  457. :return:
  458. """
  459. trace_id = params['trace_id']
  460. content_id = params['content_id']
  461. process_times = params['process_times']
  462. # 判断视频是否已下载完成
  463. video_exist_flag = await whether_downloaded_videos_exists(
  464. content_id=content_id,
  465. article_crawler_video_table=self.article_crawler_video_table,
  466. db_client=self.long_articles_client
  467. )
  468. if video_exist_flag:
  469. affect_rows = await self.update_content_status(
  470. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  471. trace_id=trace_id,
  472. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  473. )
  474. if affect_rows == 0:
  475. logging(
  476. code="6000",
  477. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  478. )
  479. return False
  480. return True
  481. else:
  482. # 开始处理, 将文章状态修改为处理状态
  483. affected_rows = await self.update_content_status(
  484. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  485. trace_id=trace_id,
  486. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  487. )
  488. if affected_rows == 0:
  489. logging(
  490. code="6000",
  491. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  492. )
  493. return False
  494. # download videos
  495. downloaded_count = await async_download_videos(
  496. trace_id=trace_id,
  497. content_id=content_id,
  498. article_crawler_video_table=self.article_crawler_video_table,
  499. db_client=self.long_articles_client
  500. )
  501. if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  502. await self.update_content_status(
  503. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  504. trace_id=trace_id,
  505. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  506. )
  507. return True
  508. else:
  509. await self.roll_back_content_status_when_fails(
  510. process_times=process_times + 1,
  511. trace_id=trace_id
  512. )
  513. return False
  514. async def publish_task(self, params, kimi_title):
  515. """
  516. 发布任务
  517. :param kimi_title:
  518. :param params:
  519. :return:
  520. """
  521. gh_id = params['gh_id']
  522. flow_pool_level = params['flow_pool_level']
  523. content_id = params['content_id']
  524. trace_id = params['trace_id']
  525. process_times = params['process_times']
  526. # 开始处理,将状态修改为操作状态
  527. affected_rows = await self.update_content_status(
  528. ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  529. trace_id=trace_id,
  530. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  531. )
  532. if affected_rows == 0:
  533. logging(
  534. code="6000",
  535. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  536. )
  537. return False
  538. try:
  539. download_videos = await get_downloaded_videos(
  540. content_id=content_id,
  541. article_crawler_video_table=self.article_crawler_video_table,
  542. db_client=self.long_articles_client
  543. )
  544. match flow_pool_level:
  545. case "autoArticlePoolLevel4":
  546. # 冷启层, 全量做
  547. video_list = shuffle_list(download_videos)[:3]
  548. case "autoArticlePoolLevel3":
  549. if self.gh_id_dict.get(gh_id):
  550. video_list = shuffle_list(download_videos)[:3]
  551. else:
  552. video_list = download_videos[:3]
  553. case "autoArticlePoolLevel2":
  554. # 次条,只针对具体账号做
  555. video_list = []
  556. case "autoArticlePoolLevel1":
  557. # 头条,先不做
  558. video_list = download_videos[:3]
  559. case _:
  560. video_list = download_videos[:3]
  561. # 将视频发布至票圈
  562. await publish_videos_to_piaoquan(
  563. video_list=video_list,
  564. kimi_title=kimi_title,
  565. process_times=process_times,
  566. trace_id=trace_id,
  567. db_client=self.long_articles_client,
  568. article_match_video_table=self.article_match_video_table
  569. )
  570. except Exception as e:
  571. await self.roll_back_content_status_when_fails(
  572. process_times=params['process_times'] + 1,
  573. trace_id=params['trace_id']
  574. )
  575. print(e)
  576. async def start_process(self, params):
  577. """
  578. 处理单篇文章
  579. :param params:
  580. :return:
  581. """
  582. kimi_result = await self.kimi_task(params)
  583. trace_id = params['trace_id']
  584. process_times = params['process_times']
  585. content_id = params['content_id']
  586. publish_flag = params['publish_flag']
  587. if kimi_result:
  588. # 等待 kimi 操作执行完成之后,开始执行 spider_task
  589. print("kimi success")
  590. logging(
  591. code=3001,
  592. info="kimi success",
  593. trace_id=trace_id
  594. )
  595. spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
  596. if spider_flag:
  597. # 等待爬虫执行完成后,开始执行 etl_task
  598. print("spider success")
  599. logging(
  600. code=3002,
  601. info="spider_success",
  602. trace_id=trace_id
  603. )
  604. etl_flag = await self.etl_task(params)
  605. if etl_flag:
  606. # 等待下载上传完成,执行发布任务
  607. print("etl success")
  608. logging(
  609. code="3003",
  610. info="etl_success",
  611. trace_id=trace_id
  612. )
  613. """
  614. todo 若新建计划,计划为设置托管,但接入账号又在配置账号中,仍会走托管逻辑,需考虑历史存量的处理
  615. 目前先对这两种情况都做托管操作
  616. """
  617. if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  618. logging(
  619. code="3013",
  620. info="不需要发布,长文系统托管发布",
  621. trace_id=trace_id
  622. )
  623. return
  624. else:
  625. try:
  626. await self.publish_task(params, kimi_result['kimi_title'])
  627. logging(
  628. code="3004",
  629. info="publish_success",
  630. trace_id=trace_id
  631. )
  632. await record_trace_id(
  633. trace_id=trace_id,
  634. status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
  635. )
  636. except Exception as e:
  637. logging(
  638. code="6004",
  639. info="publish 失败--{}".format(e),
  640. trace_id=params['trace_id']
  641. )
  642. else:
  643. logging(
  644. code="6003",
  645. info="ETL 处理失败",
  646. trace_id=params['trace_id']
  647. )
  648. else:
  649. logging(
  650. code="6002",
  651. info="爬虫处理失败",
  652. trace_id=params['trace_id']
  653. )
  654. else:
  655. logging(
  656. code="6001",
  657. info="kimi 处理失败",
  658. trace_id=trace_id
  659. )
  660. if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
  661. logging(
  662. code="6011",
  663. info="kimi处理次数达到上限, 放弃处理",
  664. trace_id=trace_id
  665. )
  666. # 将相同的content_id && content_status = 0的状态修改为kimi 失败状态
  667. update_sql = f"""
  668. UPDATE {self.article_match_video_table}
  669. SET content_status = %s
  670. WHERE content_id = %s and content_status = %s;
  671. """
  672. affected_rows = await self.long_articles_client.async_insert(
  673. sql=update_sql,
  674. params=(
  675. NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
  676. content_id,
  677. NewContentIdTaskConst.TASK_INIT_STATUS
  678. )
  679. )
  680. # 查询出该content_id所对应的标题
  681. select_sql = f"""
  682. SELECT article_title
  683. FROM {self.article_text_table}
  684. WHERE content_id = '{content_id}';
  685. """
  686. result = await self.long_articles_client.async_select(select_sql)
  687. bot(
  688. title="KIMI 处理失败",
  689. detail={
  690. "content_id": content_id,
  691. "affected_rows": affected_rows,
  692. "article_title": result[0][0]
  693. },
  694. mention=False
  695. )
  696. async def process_each_task(self, params):
  697. """
  698. 处理任务
  699. :return:
  700. """
  701. content_id = params['content_id']
  702. flow_pool_level = params['flow_pool_level']
  703. download_videos_exists_flag = await whether_downloaded_videos_exists(
  704. content_id=content_id,
  705. article_crawler_video_table=self.article_crawler_video_table,
  706. db_client=self.long_articles_client
  707. )
  708. if not download_videos_exists_flag:
  709. processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
  710. if processing_flag:
  711. logging(
  712. code="9001",
  713. info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
  714. )
  715. else:
  716. # 判断是否存在root_content_id
  717. if flow_pool_level != 'autoArticlePoolLevel4':
  718. # 如果查到根content_id, 采用根content_id的视频
  719. root_content_id = await self.get_source_content_id(content_id)
  720. if root_content_id:
  721. # 传参新增root_content_id
  722. params['root_content_id'] = root_content_id
  723. # 开始处理
  724. await self.start_process(params=params)
  725. else:
  726. print("存在已下载视频")
  727. async def deal(self) -> None:
  728. """
  729. function
  730. :return:
  731. """
  732. # 处理未托管的任务
  733. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
  734. # 处理托管任务
  735. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
  736. # 将处理次数大于3次且未成功的任务置为失败
  737. await self.set_tasks_status_fail()
  738. # 获取task_list
  739. task_list = await self.get_tasks()
  740. task_dict = {task['content_id']: task for task in task_list}
  741. process_list = list(task_dict.values())
  742. logging(
  743. code="5001",
  744. info="Match Task Got {} this time".format(len(process_list)),
  745. function="Publish Task"
  746. )
  747. # 处理process_list
  748. if process_list:
  749. a = time.time()
  750. tasks = [self.process_each_task(params) for params in process_list]
  751. await asyncio.gather(*tasks)
  752. b = time.time()
  753. print("处理时间: {} s".format(b - a))
  754. else:
  755. logging(
  756. code="9008",
  757. info="没有要处理的请求"
  758. )