# new_contentId_task.py
"""
@author: luojunhui
"""
import json
import time
import asyncio
import random
from typing import List, Dict

from applications.config import Config
from applications.const import new_content_id_task_const as NewContentIdTaskConst
from applications.feishu import bot
from applications.functions.aigc import record_trace_id
from applications.functions.common import shuffle_list
from applications.log import logging
from applications.spider import search_videos_from_web

from .utils import *
class NewContentIdTask(object):
    """
    Matching pipeline for articles that have no previously published history.
    """
    def __init__(self, long_articles_client, aigc_client):
        # async DB client for the long-articles database
        self.long_articles_client = long_articles_client
        # async DB client for the AIGC production database
        self.aigc_client = aigc_client
        self.config = Config()
        # table: article <-> video matching tasks (one row per trace_id)
        self.article_match_video_table = self.config.article_match_video_table
        # table: article text and kimi processing results
        self.article_text_table = self.config.article_text_table
        # table: videos crawled per content_id
        self.article_crawler_video_table = self.config.article_crawler_video_table
        # gh accounts whose level-3 publishes get a shuffled video list (see publish_task)
        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
        self.account_map = json.loads(self.config.get_config_value("accountMap"))
        # number of tasks fetched / processed concurrently per run
        self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
  31. async def get_tasks(self) -> List[Dict]:
  32. """
  33. 获取 task
  34. :return:
  35. """
  36. # 获取 process_times <= 3 且 content_status = 0 的任务
  37. select_sql = f"""
  38. SELECT
  39. t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
  40. FROM
  41. {self.article_match_video_table} t1
  42. LEFT JOIN (
  43. SELECT content_id, count(1) as cnt
  44. FROM {self.article_crawler_video_table}
  45. WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  46. GROUP BY content_id
  47. ) t2
  48. ON t1.content_id = t2.content_id
  49. WHERE
  50. t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
  51. AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  52. AND COALESCE(t2.cnt, 0) < {NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM}
  53. ORDER BY flow_pool_level, request_timestamp
  54. LIMIT {self.spider_coroutines};
  55. """
  56. tasks = await self.long_articles_client.async_select(select_sql)
  57. if tasks:
  58. return [
  59. {
  60. "trace_id": i[0],
  61. "content_id": i[1],
  62. "flow_pool_level": i[2],
  63. "gh_id": i[3],
  64. "process_times": i[4],
  65. "publish_flag": i[5]
  66. }
  67. for i in tasks
  68. ]
  69. else:
  70. return []
  71. async def set_tasks_status_fail(self) -> None:
  72. """
  73. 将 处理次数大约3次,且状态不为成功状态的(3, 4)的任务状态修改为失败
  74. """
  75. update_status_sql = f"""
  76. UPDATE
  77. {self.article_match_video_table}
  78. SET
  79. content_status = %s
  80. WHERE
  81. process_times > %s and content_status not in (%s, %s);
  82. """
  83. await self.long_articles_client.async_insert(
  84. update_status_sql,
  85. params=(
  86. NewContentIdTaskConst.TASK_FAIL_STATUS,
  87. NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
  88. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  89. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  90. )
  91. )
  92. async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
  93. """
  94. 将长时间处于中间状态的任务回滚
  95. """
  96. # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
  97. if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
  98. processing_status_tuple = (
  99. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  100. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  101. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  102. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  103. )
  104. elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  105. processing_status_tuple = (
  106. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  107. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  108. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
  109. )
  110. else:
  111. return
  112. select_processing_sql = f"""
  113. SELECT
  114. trace_id, content_status_update_time, process_times, content_status
  115. FROM
  116. {self.article_match_video_table}
  117. WHERE
  118. content_status in {processing_status_tuple}
  119. and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  120. and publish_flag = {publish_flag};
  121. """
  122. processing_articles = await self.long_articles_client.async_select(select_processing_sql)
  123. if processing_articles:
  124. processing_list = [
  125. {
  126. "trace_id": item[0],
  127. "content_status_update_time": item[1],
  128. "process_times": item[2],
  129. "content_status": item[3]
  130. }
  131. for item in processing_articles
  132. ]
  133. for obj in processing_list:
  134. if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
  135. # 认为该任务失败
  136. await self.roll_back_content_status_when_fails(
  137. process_times=obj['process_times'] + 1,
  138. trace_id=obj['trace_id'],
  139. ori_content_status=obj['content_status']
  140. )
  141. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  142. """
  143. :param new_content_status:
  144. :param trace_id:
  145. :param ori_content_status:
  146. :return:
  147. """
  148. update_sql = f"""
  149. UPDATE {self.article_match_video_table}
  150. SET content_status = %s, content_status_update_time = %s
  151. WHERE trace_id = %s and content_status = %s;
  152. """
  153. row_counts = await self.long_articles_client.async_insert(
  154. sql=update_sql,
  155. params=(
  156. new_content_status,
  157. int(time.time()),
  158. trace_id,
  159. ori_content_status
  160. )
  161. )
  162. return row_counts
    async def roll_back_content_status_when_fails(self, process_times, trace_id,
                                                  ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
        """
        Roll a failed task back to the initial status and bump its retry counter.

        NOTE: this method itself adds 1 to *process_times* (see the params tuple
        below), so callers should pass the CURRENT retry count, not a
        pre-incremented one.
        :param process_times: current number of processing attempts
        :param trace_id: task identifier
        :param ori_content_status: status the row must currently hold
            (optimistic lock; defaults to the processing status)
        :return: None
        """
        update_article_sql = f"""
            UPDATE {self.article_match_video_table}
            SET
                content_status = %s,
                content_status_update_time = %s,
                process_times = %s
            WHERE trace_id = %s and content_status = %s;
        """
        await self.long_articles_client.async_insert(
            sql=update_article_sql,
            params=(
                NewContentIdTaskConst.TASK_INIT_STATUS,
                int(time.time()),
                # the retry counter is incremented HERE, by contract
                process_times + 1,
                trace_id,
                ori_content_status
            )
        )
  190. async def judge_whether_same_content_id_is_processing(self, content_id):
  191. """
  192. 同一个 content_id 只需要处理一次
  193. :param content_id:
  194. :return:
  195. success: 4
  196. init: 0
  197. fail: 99
  198. """
  199. select_sql = f"""
  200. SELECT distinct content_status
  201. FROM {self.article_match_video_table}
  202. WHERE content_id = '{content_id}';
  203. """
  204. result = await self.long_articles_client.async_select(select_sql)
  205. if result:
  206. for item in result:
  207. content_status = item[0]
  208. # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
  209. if content_status in {
  210. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  211. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  212. # NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  213. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  214. # NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  215. }:
  216. return True
  217. return False
  218. else:
  219. return False
  220. async def get_source_content_id(self, new_content_id):
  221. """
  222. 通过content_id 查找源content_id,并且更新其内容
  223. """
  224. select_channel_id_sql = f"""
  225. SELECT channel_content_id
  226. FROM produce_plan_exe_record
  227. WHERE plan_exe_id = '{new_content_id}';
  228. """
  229. channel_content_id = await self.aigc_client.async_select(select_channel_id_sql)
  230. if channel_content_id:
  231. select_source_content_id_sql = f"""
  232. SELECT root_produce_content_id
  233. FROM article_pool_promotion_source
  234. WHERE channel_content_id = '{channel_content_id[0][0]}';
  235. """
  236. source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql)
  237. if source_content_id:
  238. return source_content_id[0][0]
  239. else:
  240. return
  241. else:
  242. return
  243. async def get_illegal_out_ids(self, content_id: str) -> List[str]:
  244. """
  245. 获取违规的外站视频id
  246. """
  247. select_sql = f"""
  248. SELECT platform, out_video_id
  249. FROM {self.article_crawler_video_table}
  250. WHERE content_id = '{content_id}' and is_illegal = {NewContentIdTaskConst.VIDEO_UNSAFE};
  251. """
  252. response = await self.long_articles_client.async_select(select_sql)
  253. if response:
  254. result = ["{}_{}".format(line[0], line[1]) for line in response]
  255. return result
  256. else:
  257. return []
  258. async def kimi_task(self, params):
  259. """
  260. 执行 kimi 任务
  261. :return:
  262. """
  263. trace_id = params['trace_id']
  264. # 处理content_id
  265. content_id = params['content_id']
  266. process_times = params['process_times']
  267. kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  268. if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
  269. affected_rows = await self.update_content_status(
  270. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  271. trace_id=trace_id,
  272. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  273. )
  274. if affected_rows == 0:
  275. logging(
  276. code="6000",
  277. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  278. )
  279. return
  280. kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
  281. return kimi_result
  282. elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
  283. logging(
  284. code="4000",
  285. info="long_articles_text表中未找到 content_id"
  286. )
  287. else:
  288. # 校验是否存在root_content_id
  289. if params.get("root_content_id"):
  290. kimi_result = await get_kimi_result(content_id=params['root_content_id'],
  291. article_text_table=self.article_text_table,
  292. db_client=self.long_articles_client)
  293. if kimi_result:
  294. affected_rows = await self.update_content_status(
  295. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  296. trace_id=trace_id,
  297. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  298. )
  299. if affected_rows == 0:
  300. logging(
  301. code="6000",
  302. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  303. )
  304. return
  305. # 将root_content_id的kimi结果更新到content_id
  306. content_id = params['content_id']
  307. update_count = await update_kimi_status(
  308. kimi_info=kimi_result,
  309. content_id=content_id,
  310. db_client=self.long_articles_client,
  311. article_text_table=self.article_text_table,
  312. success_status=NewContentIdTaskConst.KIMI_SUCCESS_STATUS,
  313. init_status=NewContentIdTaskConst.KIMI_INIT_STATUS
  314. )
  315. if update_count == 0:
  316. logging(
  317. code="6000",
  318. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  319. )
  320. return
  321. logging(
  322. code="8023",
  323. function="kimi_task",
  324. trace_id=trace_id,
  325. info="从root_content_id获取结果",
  326. data=params
  327. )
  328. return kimi_result
  329. else:
  330. params.pop('root_content_id', None)
  331. return await self.kimi_task(params)
  332. # 开始处理,讲 content_status 从 0 改为 101
  333. affected_rows = await self.update_content_status(
  334. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  335. trace_id=trace_id,
  336. ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
  337. )
  338. if affected_rows == 0:
  339. logging(
  340. code="6000",
  341. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  342. )
  343. return
  344. try:
  345. kimi_result = await generate_kimi_result(
  346. content_id=content_id,
  347. article_text_table=self.article_text_table,
  348. db_client=self.long_articles_client,
  349. safe_score=NewContentIdTaskConst.KIMI_SAFE_SCORE
  350. )
  351. await self.update_content_status(
  352. new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  353. trace_id=trace_id,
  354. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  355. )
  356. return kimi_result
  357. except Exception as e:
  358. # kimi 任务处理失败
  359. update_kimi_sql = f"""
  360. UPDATE {self.article_text_table}
  361. SET
  362. kimi_status = %s
  363. WHERE content_id = %s
  364. """
  365. await self.long_articles_client.async_insert(
  366. sql=update_kimi_sql,
  367. params=(
  368. NewContentIdTaskConst.KIMI_FAIL_STATUS,
  369. content_id
  370. )
  371. )
  372. # 将状态由 101 回退为 0
  373. await self.roll_back_content_status_when_fails(
  374. process_times=process_times,
  375. trace_id=trace_id
  376. )
  377. return {}
  378. async def spider_task(self, params, kimi_result):
  379. """
  380. 爬虫任务
  381. :return:
  382. """
  383. trace_id = params['trace_id']
  384. content_id = params['content_id']
  385. process_times = params['process_times']
  386. gh_id = params['gh_id']
  387. download_video_exist_flag = await whether_downloaded_videos_exists(
  388. content_id=content_id,
  389. article_crawler_video_table=self.article_crawler_video_table,
  390. db_client=self.long_articles_client
  391. )
  392. if download_video_exist_flag:
  393. await self.update_content_status(
  394. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  395. trace_id=trace_id,
  396. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  397. )
  398. return True
  399. # 判断是否存在root_content_id
  400. if params.get("root_content_id"):
  401. # 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2
  402. update_rows = await update_crawler_table_with_exist_content_id(
  403. content_id=content_id,
  404. trace_id=trace_id,
  405. article_crawler_video_table=self.article_crawler_video_table,
  406. db_client=self.long_articles_client,
  407. root_content_id=params['root_content_id']
  408. )
  409. if update_rows:
  410. affected_rows = await self.update_content_status(
  411. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  412. trace_id=trace_id,
  413. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
  414. )
  415. if affected_rows == 0:
  416. return
  417. logging(
  418. code="8024",
  419. function="spider_task",
  420. trace_id=trace_id,
  421. info="从root_content_id获取结果",
  422. data=params
  423. )
  424. return True
  425. else:
  426. params.pop("root_content_id", None)
  427. return await self.spider_task(params, kimi_result)
  428. # 开始处理,将状态由 1 改成 101
  429. affected_rows = await self.update_content_status(
  430. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  431. ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  432. trace_id=trace_id
  433. )
  434. if affected_rows == 0:
  435. logging(
  436. code="6000",
  437. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  438. )
  439. return False
  440. try:
  441. logging(
  442. code="spider_1001",
  443. info="开始执行搜索任务",
  444. trace_id=trace_id,
  445. data=kimi_result
  446. )
  447. search_videos_count = await search_videos_from_web(
  448. info={
  449. "ori_title": kimi_result['ori_title'],
  450. "kimi_summary": kimi_result['kimi_summary'],
  451. "kimi_keys": kimi_result['kimi_keys'],
  452. "trace_id": trace_id,
  453. "gh_id": gh_id,
  454. "content_id": content_id,
  455. "crawler_video_table": self.article_crawler_video_table
  456. },
  457. gh_id_map=self.account_map,
  458. db_client=self.long_articles_client
  459. )
  460. if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  461. # 表示爬虫任务执行成功, 将状态从 101 改为 2
  462. logging(
  463. code="spider_1002",
  464. info="搜索成功",
  465. trace_id=trace_id,
  466. data=kimi_result
  467. )
  468. await self.update_content_status(
  469. new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  470. trace_id=trace_id,
  471. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  472. )
  473. return True
  474. else:
  475. logging(
  476. code="spider_1003",
  477. info="搜索失败",
  478. trace_id=trace_id,
  479. data=kimi_result
  480. )
  481. await self.roll_back_content_status_when_fails(
  482. process_times=process_times + 1,
  483. trace_id=trace_id
  484. )
  485. return False
  486. except Exception as e:
  487. await self.roll_back_content_status_when_fails(
  488. process_times=process_times + 1,
  489. trace_id=trace_id
  490. )
  491. print("爬虫处理失败: {}".format(e))
  492. return False
  493. async def etl_task(self, params):
  494. """
  495. download && upload videos
  496. :param params:
  497. :return:
  498. """
  499. trace_id = params['trace_id']
  500. content_id = params['content_id']
  501. process_times = params['process_times']
  502. # 判断视频是否已下载完成
  503. video_exist_flag = await whether_downloaded_videos_exists(
  504. content_id=content_id,
  505. article_crawler_video_table=self.article_crawler_video_table,
  506. db_client=self.long_articles_client
  507. )
  508. if video_exist_flag:
  509. affect_rows = await self.update_content_status(
  510. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  511. trace_id=trace_id,
  512. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  513. )
  514. if affect_rows == 0:
  515. logging(
  516. code="6000",
  517. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  518. )
  519. return False
  520. return True
  521. else:
  522. # 开始处理, 将文章状态修改为处理状态
  523. affected_rows = await self.update_content_status(
  524. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  525. trace_id=trace_id,
  526. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  527. )
  528. if affected_rows == 0:
  529. logging(
  530. code="6000",
  531. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  532. )
  533. return False
  534. # download videos
  535. illegal_videos = await self.get_illegal_out_ids(content_id)
  536. downloaded_count = await async_download_videos(
  537. trace_id=trace_id,
  538. content_id=content_id,
  539. article_crawler_video_table=self.article_crawler_video_table,
  540. db_client=self.long_articles_client,
  541. illegal_videos=illegal_videos
  542. )
  543. if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  544. await self.update_content_status(
  545. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  546. trace_id=trace_id,
  547. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  548. )
  549. return True
  550. else:
  551. await self.roll_back_content_status_when_fails(
  552. process_times=process_times + 1,
  553. trace_id=trace_id
  554. )
  555. return False
  556. async def publish_task(self, params, kimi_title):
  557. """
  558. 发布任务
  559. :param kimi_title:
  560. :param params:
  561. :return:
  562. """
  563. gh_id = params['gh_id']
  564. flow_pool_level = params['flow_pool_level']
  565. content_id = params['content_id']
  566. trace_id = params['trace_id']
  567. process_times = params['process_times']
  568. # 开始处理,将状态修改为操作状态
  569. affected_rows = await self.update_content_status(
  570. ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  571. trace_id=trace_id,
  572. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  573. )
  574. if affected_rows == 0:
  575. logging(
  576. code="6000",
  577. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  578. )
  579. return False
  580. try:
  581. download_videos = await get_downloaded_videos(
  582. content_id=content_id,
  583. article_crawler_video_table=self.article_crawler_video_table,
  584. db_client=self.long_articles_client
  585. )
  586. match flow_pool_level:
  587. case "autoArticlePoolLevel4":
  588. # 冷启层, 全量做
  589. video_list = shuffle_list(download_videos)[:3]
  590. case "autoArticlePoolLevel3":
  591. if self.gh_id_dict.get(gh_id):
  592. video_list = shuffle_list(download_videos)[:3]
  593. else:
  594. video_list = download_videos[:3]
  595. case "autoArticlePoolLevel2":
  596. # 次条,只针对具体账号做
  597. video_list = []
  598. case "autoArticlePoolLevel1":
  599. # 头条,先不做
  600. video_list = download_videos[:3]
  601. case _:
  602. video_list = download_videos[:3]
  603. # 将视频发布至票圈
  604. await publish_videos_to_piaoquan(
  605. video_list=video_list,
  606. kimi_title=kimi_title,
  607. process_times=process_times,
  608. trace_id=trace_id,
  609. db_client=self.long_articles_client,
  610. article_match_video_table=self.article_match_video_table
  611. )
  612. except Exception as e:
  613. await self.roll_back_content_status_when_fails(
  614. process_times=params['process_times'] + 1,
  615. trace_id=params['trace_id']
  616. )
  617. print(e)
  618. async def record_for_audit(self, content_id):
  619. """
  620. 视频下载成功后,记录到audit表中
  621. """
  622. audit_account = chr(random.randint(97, 106))
  623. insert_sql = f"""
  624. INSERT IGNORE INTO long_articles_title_audit
  625. (content_id, create_timestamp, audit_account)
  626. VALUES
  627. (%s, %s, %s);
  628. """
  629. await self.long_articles_client.async_insert(
  630. sql=insert_sql,
  631. params=(
  632. content_id,
  633. int(time.time() * 1000),
  634. audit_account
  635. )
  636. )
  637. async def start_process(self, params):
  638. """
  639. 处理单篇文章
  640. :param params:
  641. :return:
  642. """
  643. print("start process")
  644. kimi_result = await self.kimi_task(params)
  645. trace_id = params['trace_id']
  646. process_times = params['process_times']
  647. content_id = params['content_id']
  648. publish_flag = params['publish_flag']
  649. if kimi_result:
  650. # 等待 kimi 操作执行完成之后,开始执行 spider_task
  651. print("kimi success")
  652. logging(
  653. code=3001,
  654. info="kimi success",
  655. trace_id=trace_id
  656. )
  657. spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
  658. if spider_flag:
  659. # 等待爬虫执行完成后,开始执行 etl_task
  660. print("spider success")
  661. logging(
  662. code=3002,
  663. info="spider_success",
  664. trace_id=trace_id
  665. )
  666. etl_flag = await self.etl_task(params)
  667. if etl_flag:
  668. # 等待下载上传完成,执行发布任务
  669. print("etl success")
  670. logging(
  671. code="3003",
  672. info="etl_success",
  673. trace_id=trace_id
  674. )
  675. # ETL下载成功,记录审核
  676. await self.record_for_audit(content_id)
  677. if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  678. logging(
  679. code="3013",
  680. info="不需要发布,长文系统托管发布",
  681. trace_id=trace_id
  682. )
  683. return
  684. else:
  685. try:
  686. await self.publish_task(params, kimi_result['kimi_title'])
  687. logging(
  688. code="3004",
  689. info="publish_success",
  690. trace_id=trace_id
  691. )
  692. await record_trace_id(
  693. trace_id=trace_id,
  694. status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
  695. )
  696. except Exception as e:
  697. logging(
  698. code="6004",
  699. info="publish 失败--{}".format(e),
  700. trace_id=params['trace_id']
  701. )
  702. else:
  703. logging(
  704. code="6003",
  705. info="ETL 处理失败",
  706. trace_id=params['trace_id']
  707. )
  708. else:
  709. logging(
  710. code="6002",
  711. info="爬虫处理失败",
  712. trace_id=params['trace_id']
  713. )
  714. else:
  715. logging(
  716. code="6001",
  717. info="kimi 处理失败",
  718. trace_id=trace_id
  719. )
  720. if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
  721. logging(
  722. code="6011",
  723. info="kimi处理次数达到上限, 放弃处理",
  724. trace_id=trace_id
  725. )
  726. # 将相同的content_id && content_status = 0的状态修改为kimi 失败状态
  727. update_sql = f"""
  728. UPDATE {self.article_match_video_table}
  729. SET content_status = %s
  730. WHERE content_id = %s and content_status = %s;
  731. """
  732. affected_rows = await self.long_articles_client.async_insert(
  733. sql=update_sql,
  734. params=(
  735. NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
  736. content_id,
  737. NewContentIdTaskConst.TASK_INIT_STATUS
  738. )
  739. )
  740. # 查询出该content_id所对应的标题
  741. select_sql = f"""
  742. SELECT article_title
  743. FROM {self.article_text_table}
  744. WHERE content_id = '{content_id}';
  745. """
  746. result = await self.long_articles_client.async_select(select_sql)
  747. bot(
  748. title="KIMI 处理失败",
  749. detail={
  750. "content_id": content_id,
  751. "affected_rows": affected_rows,
  752. "article_title": result[0][0]
  753. },
  754. mention=False
  755. )
  756. async def process_each_task(self, params):
  757. """
  758. 处理任务
  759. :return:
  760. """
  761. print(json.dumps(params, ensure_ascii=False, indent=4))
  762. content_id = params['content_id']
  763. flow_pool_level = params['flow_pool_level']
  764. download_videos_exists_flag = await whether_downloaded_videos_exists(
  765. content_id=content_id,
  766. article_crawler_video_table=self.article_crawler_video_table,
  767. db_client=self.long_articles_client
  768. )
  769. print("开始处理")
  770. if not download_videos_exists_flag:
  771. processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
  772. if processing_flag:
  773. print("processing success")
  774. logging(
  775. code="9001",
  776. info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
  777. )
  778. else:
  779. # 判断是否存在root_content_id
  780. if flow_pool_level != 'autoArticlePoolLevel4':
  781. # 如果查到根content_id, 采用根content_id的视频
  782. root_content_id = await self.get_source_content_id(content_id)
  783. if root_content_id:
  784. # 传参新增root_content_id
  785. params['root_content_id'] = root_content_id
  786. # 开始处理
  787. await self.start_process(params=params)
  788. else:
  789. print("存在已下载视频")
  790. async def deal(self) -> None:
  791. """
  792. function
  793. :return:
  794. """
  795. # 处理未托管的任务
  796. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
  797. # 处理托管任务
  798. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
  799. # 将处理次数大于3次且未成功的任务置为失败
  800. await self.set_tasks_status_fail()
  801. # 获取task_list
  802. task_list = await self.get_tasks()
  803. task_dict = {task['content_id']: task for task in task_list}
  804. process_list = list(task_dict.values())
  805. logging(
  806. code="5001",
  807. info="Match Task Got {} this time".format(len(process_list)),
  808. function="Publish Task"
  809. )
  810. # 处理process_list
  811. if process_list:
  812. a = time.time()
  813. tasks = [self.process_each_task(params) for params in process_list]
  814. await asyncio.gather(*tasks)
  815. b = time.time()
  816. print("处理时间: {} s".format(b - a))
  817. else:
  818. logging(
  819. code="9008",
  820. info="没有要处理的请求"
  821. )