new_contentId_task.py 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. from typing import List, Dict
  8. from applications.config import Config
  9. from applications.const import new_content_id_task_const as NewContentIdTaskConst
  10. from applications.log import logging
  11. from applications.functions.common import shuffle_list
  12. from applications.spider import search_videos_from_web
  13. from applications.feishu import bot
  14. from applications.functions.aigc import record_trace_id
  15. from .utils import *
class NewContentIdTask(object):
    """
    Matching pipeline for contents that have no previously published article.
    """
    def __init__(self, long_articles_client, aigc_client):
        # Async DB client for the long-articles database (async_select / async_insert).
        self.long_articles_client = long_articles_client
        # Async DB client for the AIGC platform database.
        self.aigc_client = aigc_client
        self.config = Config()
        # Table names resolved from config: task/match table, article text table,
        # and the crawled-videos table.
        self.article_match_video_table = self.config.article_match_video_table
        self.article_text_table = self.config.article_text_table
        self.article_crawler_video_table = self.config.article_crawler_video_table
        # gh_id lookup loaded from the "testAccountLevel2" config value; used in
        # publish_task to decide whether to shuffle the video list.
        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
        # gh_id -> account mapping passed to the web-search spider.
        self.account_map = json.loads(self.config.get_config_value("accountMap"))
        # Max number of tasks fetched per round (used as the SQL LIMIT).
        self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
  30. async def get_tasks(self) -> List[Dict]:
  31. """
  32. 获取 task
  33. :return:
  34. """
  35. # 获取 process_times <= 3 且 content_status = 0 的任务
  36. select_sql = f"""
  37. SELECT
  38. t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
  39. FROM
  40. {self.article_match_video_table} t1
  41. LEFT JOIN (
  42. SELECT content_id, count(1) as cnt
  43. FROM {self.article_crawler_video_table}
  44. WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  45. AND is_illegal = {NewContentIdTaskConst.VIDEO_SAFE}
  46. GROUP BY content_id
  47. ) t2
  48. ON t1.content_id = t2.content_id
  49. WHERE
  50. t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
  51. AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  52. AND COALESCE(t2.cnt, 0) < {NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM}
  53. ORDER BY flow_pool_level, request_timestamp
  54. LIMIT {self.spider_coroutines};
  55. """
  56. tasks = await self.long_articles_client.async_select(select_sql)
  57. if tasks:
  58. return [
  59. {
  60. "trace_id": i[0],
  61. "content_id": i[1],
  62. "flow_pool_level": i[2],
  63. "gh_id": i[3],
  64. "process_times": i[4],
  65. "publish_flag": i[5]
  66. }
  67. for i in tasks
  68. ]
  69. else:
  70. return []
  71. async def set_tasks_status_fail(self) -> None:
  72. """
  73. 将 处理次数大约3次,且状态不为成功状态的(3, 4)的任务状态修改为失败
  74. """
  75. update_status_sql = f"""
  76. UPDATE
  77. {self.article_match_video_table}
  78. SET
  79. content_status = %s
  80. WHERE
  81. process_times > %s and content_status not in (%s, %s);
  82. """
  83. await self.long_articles_client.async_insert(
  84. update_status_sql,
  85. params=(
  86. NewContentIdTaskConst.TASK_FAIL_STATUS,
  87. NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
  88. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  89. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  90. )
  91. )
  92. async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
  93. """
  94. 将长时间处于中间状态的任务回滚
  95. """
  96. # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
  97. if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
  98. processing_status_tuple = (
  99. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  100. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  101. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  102. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  103. )
  104. elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  105. processing_status_tuple = (
  106. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  107. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  108. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
  109. )
  110. else:
  111. return
  112. select_processing_sql = f"""
  113. SELECT
  114. trace_id, content_status_update_time, process_times, content_status
  115. FROM
  116. {self.article_match_video_table}
  117. WHERE
  118. content_status in {processing_status_tuple}
  119. and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  120. and publish_flag = {publish_flag};
  121. """
  122. processing_articles = await self.long_articles_client.async_select(select_processing_sql)
  123. if processing_articles:
  124. processing_list = [
  125. {
  126. "trace_id": item[0],
  127. "content_status_update_time": item[1],
  128. "process_times": item[2],
  129. "content_status": item[3]
  130. }
  131. for item in processing_articles
  132. ]
  133. for obj in processing_list:
  134. if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
  135. # 认为该任务失败
  136. await self.roll_back_content_status_when_fails(
  137. process_times=obj['process_times'] + 1,
  138. trace_id=obj['trace_id'],
  139. ori_content_status=obj['content_status']
  140. )
  141. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  142. """
  143. :param new_content_status:
  144. :param trace_id:
  145. :param ori_content_status:
  146. :return:
  147. """
  148. update_sql = f"""
  149. UPDATE {self.article_match_video_table}
  150. SET content_status = %s, content_status_update_time = %s
  151. WHERE trace_id = %s and content_status = %s;
  152. """
  153. row_counts = await self.long_articles_client.async_insert(
  154. sql=update_sql,
  155. params=(
  156. new_content_status,
  157. int(time.time()),
  158. trace_id,
  159. ori_content_status
  160. )
  161. )
  162. return row_counts
    async def roll_back_content_status_when_fails(self, process_times, trace_id,
                                                  ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
        """
        Roll a failed task back to INIT and bump its retry counter.
        :param process_times: current number of processing attempts; the row is
            updated to process_times + 1 — the increment happens HERE, so
            callers must pass the un-incremented value
        :param trace_id: task identifier
        :param ori_content_status: expected current status (optimistic lock);
            the update only applies if the row is still in this status
        :return:
        """
        update_article_sql = f"""
            UPDATE {self.article_match_video_table}
            SET
                content_status = %s,
                content_status_update_time = %s,
                process_times = %s
            WHERE trace_id = %s and content_status = %s;
        """
        await self.long_articles_client.async_insert(
            sql=update_article_sql,
            params=(
                NewContentIdTaskConst.TASK_INIT_STATUS,
                int(time.time()),
                process_times + 1,
                trace_id,
                ori_content_status
            )
        )
  190. async def judge_whether_same_content_id_is_processing(self, content_id):
  191. """
  192. 同一个 content_id 只需要处理一次
  193. :param content_id:
  194. :return:
  195. success: 4
  196. init: 0
  197. fail: 99
  198. """
  199. select_sql = f"""
  200. SELECT distinct content_status
  201. FROM {self.article_match_video_table}
  202. WHERE content_id = '{content_id}';
  203. """
  204. result = await self.long_articles_client.async_select(select_sql)
  205. if result:
  206. for item in result:
  207. content_status = item[0]
  208. # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
  209. if content_status in {
  210. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  211. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  212. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  213. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  214. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  215. }:
  216. return True
  217. return False
  218. else:
  219. return False
  220. async def get_source_content_id(self, new_content_id):
  221. """
  222. 通过content_id 查找源content_id,并且更新其内容
  223. """
  224. select_channel_id_sql = f"""
  225. SELECT channel_content_id
  226. FROM produce_plan_exe_record
  227. WHERE plan_exe_id = '{new_content_id}';
  228. """
  229. channel_content_id = await self.aigc_client.async_select(select_channel_id_sql)
  230. if channel_content_id:
  231. select_source_content_id_sql = f"""
  232. SELECT root_produce_content_id
  233. FROM article_pool_promotion_source
  234. WHERE channel_content_id = '{channel_content_id[0][0]}';
  235. """
  236. source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql)
  237. if source_content_id:
  238. return source_content_id[0][0]
  239. else:
  240. return
  241. else:
  242. return
  243. async def get_illegal_out_ids(self, content_id: str) -> List[str]:
  244. """
  245. 获取违规的外站视频id
  246. """
  247. select_sql = f"""
  248. SELECT platform, out_video_id
  249. FROM {self.article_crawler_video_table}
  250. WHERE content_id = '{content_id}' and is_illegal = {NewContentIdTaskConst.VIDEO_UNSAFE};
  251. """
  252. response = await self.long_articles_client.async_select(select_sql)
  253. if response:
  254. result = ["{}_{}".format(line[0], line[1]) for line in response]
  255. return result
  256. else:
  257. return []
  258. async def kimi_task(self, params):
  259. """
  260. 执行 kimi 任务
  261. :return:
  262. """
  263. trace_id = params['trace_id']
  264. if params.get("root_content_id"):
  265. kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
  266. if kimi_result:
  267. affected_rows = await self.update_content_status(
  268. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  269. trace_id=trace_id,
  270. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  271. )
  272. if affected_rows == 0:
  273. logging(
  274. code="6000",
  275. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  276. )
  277. return
  278. # 将root_content_id的kimi结果更新到content_id
  279. content_id = params['content_id']
  280. update_count = await update_kimi_status(
  281. kimi_info=kimi_result,
  282. content_id=content_id,
  283. db_client=self.long_articles_client,
  284. article_text_table=self.article_text_table,
  285. success_status=NewContentIdTaskConst.KIMI_SUCCESS_STATUS,
  286. init_status=NewContentIdTaskConst.KIMI_INIT_STATUS
  287. )
  288. if update_count == 0:
  289. logging(
  290. code="6000",
  291. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  292. )
  293. return
  294. logging(
  295. code="8023",
  296. function="kimi_task",
  297. trace_id=trace_id,
  298. info="从root_content_id获取结果",
  299. data=params
  300. )
  301. return kimi_result
  302. else:
  303. params.pop('root_content_id', None)
  304. print(params)
  305. return await self.kimi_task(params)
  306. # 处理content_id
  307. content_id = params['content_id']
  308. process_times = params['process_times']
  309. kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  310. if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
  311. affected_rows = await self.update_content_status(
  312. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  313. trace_id=trace_id,
  314. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  315. )
  316. if affected_rows == 0:
  317. logging(
  318. code="6000",
  319. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  320. )
  321. return
  322. kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  323. return kimi_result
  324. elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
  325. logging(
  326. code="4000",
  327. info="long_articles_text表中未找到 content_id"
  328. )
  329. else:
  330. # 开始处理,讲 content_status 从 0 改为 101
  331. affected_rows = await self.update_content_status(
  332. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  333. trace_id=trace_id,
  334. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  335. )
  336. if affected_rows == 0:
  337. logging(
  338. code="6000",
  339. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  340. )
  341. return
  342. try:
  343. kimi_result = await generate_kimi_result(
  344. content_id=content_id,
  345. article_text_table=self.article_text_table,
  346. db_client=self.long_articles_client,
  347. safe_score=NewContentIdTaskConst.KIMI_SAFE_SCORE
  348. )
  349. await self.update_content_status(
  350. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  351. trace_id=trace_id,
  352. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  353. )
  354. return kimi_result
  355. except Exception as e:
  356. # kimi 任务处理失败
  357. update_kimi_sql = f"""
  358. UPDATE {self.article_text_table}
  359. SET
  360. kimi_status = %s
  361. WHERE content_id = %s
  362. """
  363. await self.long_articles_client.async_insert(
  364. sql=update_kimi_sql,
  365. params=(
  366. NewContentIdTaskConst.KIMI_FAIL_STATUS,
  367. content_id
  368. )
  369. )
  370. # 将状态由 101 回退为 0
  371. await self.roll_back_content_status_when_fails(
  372. process_times=process_times,
  373. trace_id=trace_id
  374. )
  375. return {}
  376. async def spider_task(self, params, kimi_result):
  377. """
  378. 爬虫任务
  379. :return:
  380. """
  381. trace_id = params['trace_id']
  382. content_id = params['content_id']
  383. process_times = params['process_times']
  384. gh_id = params['gh_id']
  385. if params.get("root_content_id"):
  386. # 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2
  387. update_rows = await update_crawler_table_with_exist_content_id(
  388. content_id=content_id,
  389. trace_id=trace_id,
  390. article_crawler_video_table=self.article_crawler_video_table,
  391. db_client=self.long_articles_client,
  392. root_content_id=params['root_content_id']
  393. )
  394. if update_rows:
  395. affected_rows = await self.update_content_status(
  396. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  397. trace_id=trace_id,
  398. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  399. )
  400. if affected_rows == 0:
  401. return
  402. logging(
  403. code="8024",
  404. function="spider_task",
  405. trace_id=trace_id,
  406. info="从root_content_id获取结果",
  407. data=params
  408. )
  409. return True
  410. else:
  411. params.pop("root_content_id", None)
  412. return await self.spider_task(params, kimi_result)
  413. download_video_exist_flag = await whether_downloaded_videos_exists(
  414. content_id=content_id,
  415. article_crawler_video_table=self.article_crawler_video_table,
  416. db_client=self.long_articles_client
  417. )
  418. if download_video_exist_flag:
  419. await self.update_content_status(
  420. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  421. trace_id=trace_id,
  422. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  423. )
  424. return True
  425. # 开始处理,将状态由 1 改成 101
  426. affected_rows = await self.update_content_status(
  427. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  428. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  429. trace_id=trace_id
  430. )
  431. if affected_rows == 0:
  432. logging(
  433. code="6000",
  434. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  435. )
  436. return False
  437. try:
  438. logging(
  439. code="spider_1001",
  440. info="开始执行搜索任务",
  441. trace_id=trace_id,
  442. data=kimi_result
  443. )
  444. search_videos_count = await search_videos_from_web(
  445. info={
  446. "ori_title": kimi_result['ori_title'],
  447. "kimi_summary": kimi_result['kimi_summary'],
  448. "kimi_keys": kimi_result['kimi_keys'],
  449. "trace_id": trace_id,
  450. "gh_id": gh_id,
  451. "content_id": content_id,
  452. "crawler_video_table": self.article_crawler_video_table
  453. },
  454. gh_id_map=self.account_map,
  455. db_client=self.long_articles_client
  456. )
  457. if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  458. # 表示爬虫任务执行成功, 将状态从 101 改为 2
  459. logging(
  460. code="spider_1002",
  461. info="搜索成功",
  462. trace_id=trace_id,
  463. data=kimi_result
  464. )
  465. await self.update_content_status(
  466. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  467. trace_id=trace_id,
  468. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  469. )
  470. return True
  471. else:
  472. logging(
  473. code="spider_1003",
  474. info="搜索失败",
  475. trace_id=trace_id,
  476. data=kimi_result
  477. )
  478. await self.roll_back_content_status_when_fails(
  479. process_times=process_times + 1,
  480. trace_id=trace_id
  481. )
  482. return False
  483. except Exception as e:
  484. await self.roll_back_content_status_when_fails(
  485. process_times=process_times + 1,
  486. trace_id=trace_id
  487. )
  488. print("爬虫处理失败: {}".format(e))
  489. return False
  490. async def etl_task(self, params):
  491. """
  492. download && upload videos
  493. :param params:
  494. :return:
  495. """
  496. trace_id = params['trace_id']
  497. content_id = params['content_id']
  498. process_times = params['process_times']
  499. # 判断视频是否已下载完成
  500. video_exist_flag = await whether_downloaded_videos_exists(
  501. content_id=content_id,
  502. article_crawler_video_table=self.article_crawler_video_table,
  503. db_client=self.long_articles_client
  504. )
  505. if video_exist_flag:
  506. affect_rows = await self.update_content_status(
  507. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  508. trace_id=trace_id,
  509. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  510. )
  511. if affect_rows == 0:
  512. logging(
  513. code="6000",
  514. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  515. )
  516. return False
  517. return True
  518. else:
  519. # 开始处理, 将文章状态修改为处理状态
  520. affected_rows = await self.update_content_status(
  521. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  522. trace_id=trace_id,
  523. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  524. )
  525. if affected_rows == 0:
  526. logging(
  527. code="6000",
  528. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  529. )
  530. return False
  531. # download videos
  532. illegal_videos = await self.get_illegal_out_ids(content_id)
  533. downloaded_count = await async_download_videos(
  534. trace_id=trace_id,
  535. content_id=content_id,
  536. article_crawler_video_table=self.article_crawler_video_table,
  537. db_client=self.long_articles_client,
  538. illegal_videos=illegal_videos
  539. )
  540. if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  541. await self.update_content_status(
  542. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  543. trace_id=trace_id,
  544. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  545. )
  546. return True
  547. else:
  548. await self.roll_back_content_status_when_fails(
  549. process_times=process_times + 1,
  550. trace_id=trace_id
  551. )
  552. return False
  553. async def publish_task(self, params, kimi_title):
  554. """
  555. 发布任务
  556. :param kimi_title:
  557. :param params:
  558. :return:
  559. """
  560. gh_id = params['gh_id']
  561. flow_pool_level = params['flow_pool_level']
  562. content_id = params['content_id']
  563. trace_id = params['trace_id']
  564. process_times = params['process_times']
  565. # 开始处理,将状态修改为操作状态
  566. affected_rows = await self.update_content_status(
  567. ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  568. trace_id=trace_id,
  569. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  570. )
  571. if affected_rows == 0:
  572. logging(
  573. code="6000",
  574. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  575. )
  576. return False
  577. try:
  578. download_videos = await get_downloaded_videos(
  579. content_id=content_id,
  580. article_crawler_video_table=self.article_crawler_video_table,
  581. db_client=self.long_articles_client
  582. )
  583. match flow_pool_level:
  584. case "autoArticlePoolLevel4":
  585. # 冷启层, 全量做
  586. video_list = shuffle_list(download_videos)[:3]
  587. case "autoArticlePoolLevel3":
  588. if self.gh_id_dict.get(gh_id):
  589. video_list = shuffle_list(download_videos)[:3]
  590. else:
  591. video_list = download_videos[:3]
  592. case "autoArticlePoolLevel2":
  593. # 次条,只针对具体账号做
  594. video_list = []
  595. case "autoArticlePoolLevel1":
  596. # 头条,先不做
  597. video_list = download_videos[:3]
  598. case _:
  599. video_list = download_videos[:3]
  600. # 将视频发布至票圈
  601. await publish_videos_to_piaoquan(
  602. video_list=video_list,
  603. kimi_title=kimi_title,
  604. process_times=process_times,
  605. trace_id=trace_id,
  606. db_client=self.long_articles_client,
  607. article_match_video_table=self.article_match_video_table
  608. )
  609. except Exception as e:
  610. await self.roll_back_content_status_when_fails(
  611. process_times=params['process_times'] + 1,
  612. trace_id=params['trace_id']
  613. )
  614. print(e)
  615. async def start_process(self, params):
  616. """
  617. 处理单篇文章
  618. :param params:
  619. :return:
  620. """
  621. kimi_result = await self.kimi_task(params)
  622. trace_id = params['trace_id']
  623. process_times = params['process_times']
  624. content_id = params['content_id']
  625. publish_flag = params['publish_flag']
  626. if kimi_result:
  627. # 等待 kimi 操作执行完成之后,开始执行 spider_task
  628. print("kimi success")
  629. logging(
  630. code=3001,
  631. info="kimi success",
  632. trace_id=trace_id
  633. )
  634. spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
  635. if spider_flag:
  636. # 等待爬虫执行完成后,开始执行 etl_task
  637. print("spider success")
  638. logging(
  639. code=3002,
  640. info="spider_success",
  641. trace_id=trace_id
  642. )
  643. etl_flag = await self.etl_task(params)
  644. if etl_flag:
  645. # 等待下载上传完成,执行发布任务
  646. print("etl success")
  647. logging(
  648. code="3003",
  649. info="etl_success",
  650. trace_id=trace_id
  651. )
  652. """
  653. todo 若新建计划,计划为设置托管,但接入账号又在配置账号中,仍会走托管逻辑,需考虑历史存量的处理
  654. 目前先对这两种情况都做托管操作
  655. """
  656. if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  657. logging(
  658. code="3013",
  659. info="不需要发布,长文系统托管发布",
  660. trace_id=trace_id
  661. )
  662. return
  663. else:
  664. try:
  665. await self.publish_task(params, kimi_result['kimi_title'])
  666. logging(
  667. code="3004",
  668. info="publish_success",
  669. trace_id=trace_id
  670. )
  671. await record_trace_id(
  672. trace_id=trace_id,
  673. status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
  674. )
  675. except Exception as e:
  676. logging(
  677. code="6004",
  678. info="publish 失败--{}".format(e),
  679. trace_id=params['trace_id']
  680. )
  681. else:
  682. logging(
  683. code="6003",
  684. info="ETL 处理失败",
  685. trace_id=params['trace_id']
  686. )
  687. else:
  688. logging(
  689. code="6002",
  690. info="爬虫处理失败",
  691. trace_id=params['trace_id']
  692. )
  693. else:
  694. logging(
  695. code="6001",
  696. info="kimi 处理失败",
  697. trace_id=trace_id
  698. )
  699. if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
  700. logging(
  701. code="6011",
  702. info="kimi处理次数达到上限, 放弃处理",
  703. trace_id=trace_id
  704. )
  705. # 将相同的content_id && content_status = 0的状态修改为kimi 失败状态
  706. update_sql = f"""
  707. UPDATE {self.article_match_video_table}
  708. SET content_status = %s
  709. WHERE content_id = %s and content_status = %s;
  710. """
  711. affected_rows = await self.long_articles_client.async_insert(
  712. sql=update_sql,
  713. params=(
  714. NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
  715. content_id,
  716. NewContentIdTaskConst.TASK_INIT_STATUS
  717. )
  718. )
  719. # 查询出该content_id所对应的标题
  720. select_sql = f"""
  721. SELECT article_title
  722. FROM {self.article_text_table}
  723. WHERE content_id = '{content_id}';
  724. """
  725. result = await self.long_articles_client.async_select(select_sql)
  726. bot(
  727. title="KIMI 处理失败",
  728. detail={
  729. "content_id": content_id,
  730. "affected_rows": affected_rows,
  731. "article_title": result[0][0]
  732. },
  733. mention=False
  734. )
  735. async def process_each_task(self, params):
  736. """
  737. 处理任务
  738. :return:
  739. """
  740. content_id = params['content_id']
  741. flow_pool_level = params['flow_pool_level']
  742. download_videos_exists_flag = await whether_downloaded_videos_exists(
  743. content_id=content_id,
  744. article_crawler_video_table=self.article_crawler_video_table,
  745. db_client=self.long_articles_client
  746. )
  747. if not download_videos_exists_flag:
  748. processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
  749. if processing_flag:
  750. logging(
  751. code="9001",
  752. info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
  753. )
  754. else:
  755. # 判断是否存在root_content_id
  756. if flow_pool_level != 'autoArticlePoolLevel4':
  757. # 如果查到根content_id, 采用根content_id的视频
  758. root_content_id = await self.get_source_content_id(content_id)
  759. if root_content_id:
  760. # 传参新增root_content_id
  761. params['root_content_id'] = root_content_id
  762. # 开始处理
  763. await self.start_process(params=params)
  764. else:
  765. print("存在已下载视频")
  766. async def deal(self) -> None:
  767. """
  768. function
  769. :return:
  770. """
  771. # 处理未托管的任务
  772. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
  773. # 处理托管任务
  774. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
  775. # 将处理次数大于3次且未成功的任务置为失败
  776. await self.set_tasks_status_fail()
  777. # 获取task_list
  778. task_list = await self.get_tasks()
  779. task_dict = {task['content_id']: task for task in task_list}
  780. process_list = list(task_dict.values())
  781. logging(
  782. code="5001",
  783. info="Match Task Got {} this time".format(len(process_list)),
  784. function="Publish Task"
  785. )
  786. # 处理process_list
  787. if process_list:
  788. a = time.time()
  789. tasks = [self.process_each_task(params) for params in process_list]
  790. await asyncio.gather(*tasks)
  791. b = time.time()
  792. print("处理时间: {} s".format(b - a))
  793. else:
  794. logging(
  795. code="9008",
  796. info="没有要处理的请求"
  797. )