new_contentId_task.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. from typing import List, Dict
  8. from applications.config import Config
  9. from applications.const import new_content_id_task_const as NewContentIdTaskConst
  10. from applications.log import logging
  11. from applications.functions.common import shuffle_list
  12. from applications.spider import search_videos_from_web
  13. from applications.feishu import bot
  14. from applications.functions.aigc import record_trace_id
  15. from .utils import *
  16. class NewContentIdTask(object):
  17. """
  18. 不存在历史已经发布的文章的匹配流程
  19. """
  def __init__(self, long_articles_client, aigc_client):
      """
      Keep the two database clients and read table names and account settings from Config.

      :param long_articles_client: client this class uses for the long-articles tables
      :param aigc_client: client this class uses to query produce_plan_exe_record
      """
      self.long_articles_client = long_articles_client
      self.aigc_client = aigc_client

      settings = Config()
      self.config = settings

      # Table names interpolated into the SQL statements built by this class.
      self.article_match_video_table = settings.article_match_video_table
      self.article_text_table = settings.article_text_table
      self.article_crawler_video_table = settings.article_crawler_video_table

      # These two config values are JSON text and are decoded once here.
      self.gh_id_dict = json.loads(settings.get_config_value("testAccountLevel2"))
      self.account_map = json.loads(settings.get_config_value("accountMap"))

      # Used as the LIMIT of the task query in get_tasks.
      self.spider_coroutines = settings.get_config_value("spiderCoroutines")
  async def get_tasks(self) -> List[Dict]:
      """
      Fetch the next batch of pending match tasks.

      A row qualifies when it is still in the initial status, its process_times is
      within TASK_MAX_PROCESS_TIMES, and its content_id has fewer than
      MIN_MATCH_VIDEO_NUM crawler videos that are both downloaded and marked safe.
      Rows are ordered by flow_pool_level, then request_timestamp, and the batch
      size is capped by ``self.spider_coroutines``.

      :return: one dict per row, or an empty list when nothing qualifies
      """
      columns = (
          "trace_id",
          "content_id",
          "flow_pool_level",
          "gh_id",
          "process_times",
          "publish_flag",
      )
      query = f"""
          SELECT
              t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
          FROM
              {self.article_match_video_table} t1
          LEFT JOIN (
              SELECT content_id, count(1) as cnt
              FROM {self.article_crawler_video_table}
              WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
                  AND is_illegal = {NewContentIdTaskConst.VIDEO_SAFE}
              GROUP BY content_id
          ) t2
          ON t1.content_id = t2.content_id
          WHERE
              t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
              AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
              AND COALESCE(t2.cnt, 0) < {NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM}
          ORDER BY flow_pool_level, request_timestamp
          LIMIT {self.spider_coroutines};
      """
      rows = await self.long_articles_client.async_select(query)
      if not rows:
          return []

      # Map each positional column of the SELECT onto its key.
      return [
          {name: row[position] for position, name in enumerate(columns)}
          for row in rows
      ]
  async def set_tasks_status_fail(self) -> None:
      """
      Mark exhausted tasks as failed.

      Every row whose process_times is greater than TASK_MAX_PROCESS_TIMES and whose
      status is neither TASK_ETL_COMPLETE_STATUS nor TASK_PUBLISHED_STATUS is set to
      TASK_FAIL_STATUS.
      """
      statement = f"""
          UPDATE
              {self.article_match_video_table}
          SET
              content_status = %s
          WHERE
              process_times > %s and content_status not in (%s, %s);
      """
      # Order follows the placeholders: new status, attempt limit, then the two
      # statuses that must be left alone.
      bind_values = (
          NewContentIdTaskConst.TASK_FAIL_STATUS,
          NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
          NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
          NewContentIdTaskConst.TASK_PUBLISHED_STATUS,
      )
      await self.long_articles_client.async_insert(statement, params=bind_values)
  async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
      """
      Reset tasks that have been sitting in an intermediate status for too long.

      Rows with the given publish_flag whose status is one of the intermediate
      statuses and whose last status change is at least TASK_PROCESSING_TIMEOUT
      seconds old are handed to roll_back_content_status_when_fails, which puts
      them back to the initial status and charges one attempt.

      :param publish_flag: NEED_PUBLISH or DO_NOT_NEED_PUBLISH; for NEED_PUBLISH
          the ETL-complete status is also treated as intermediate. Any other value
          makes the method return without touching the database.
      """
      if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
          processing_status_tuple = (
              NewContentIdTaskConst.TASK_PROCESSING_STATUS,
              NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
              NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
              NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
          )
      elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
          processing_status_tuple = (
              NewContentIdTaskConst.TASK_PROCESSING_STATUS,
              NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
              NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
          )
      else:
          return

      # The tuple's repr ("(a, b, c)") doubles as the SQL IN list.
      select_processing_sql = f"""
          SELECT
              trace_id, content_status_update_time, process_times, content_status
          FROM
              {self.article_match_video_table}
          WHERE
              content_status in {processing_status_tuple}
              and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
              and publish_flag = {publish_flag};
      """
      processing_articles = await self.long_articles_client.async_select(select_processing_sql)
      if not processing_articles:
          return

      now = int(time.time())
      for trace_id, status_updated_at, process_times, content_status in processing_articles:
          if now - status_updated_at < NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
              continue
          # Treat the task as failed. The callee already adds 1 to process_times,
          # so the current value is passed through; passing process_times + 1
          # here charged two attempts for a single timeout.
          await self.roll_back_content_status_when_fails(
              process_times=process_times,
              trace_id=trace_id,
              ori_content_status=content_status,
          )
  async def update_content_status(self, new_content_status, trace_id, ori_content_status):
      """
      Move one task from ``ori_content_status`` to ``new_content_status``.

      The UPDATE only matches while the row is still in ``ori_content_status``; the
      status timestamp is set to the current time in seconds.

      :param new_content_status: status to write
      :param trace_id: task to update
      :param ori_content_status: status the row must currently have
      :return: the value returned by async_insert (callers in this class compare it
          with 0 to detect that no row was changed)
      """
      statement = f"""
          UPDATE {self.article_match_video_table}
          SET content_status = %s, content_status_update_time = %s
          WHERE trace_id = %s and content_status = %s;
      """
      bind_values = (
          new_content_status,
          int(time.time()),
          trace_id,
          ori_content_status,
      )
      return await self.long_articles_client.async_insert(sql=statement, params=bind_values)
  async def roll_back_content_status_when_fails(self, process_times, trace_id,
                                                ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
      """
      Put a failed task back to the initial status and charge one attempt.

      The stored process_times becomes ``process_times + 1``: the increment happens
      here, on top of whatever value the caller passes in.

      :param process_times: attempt count to increment and store
      :param trace_id: task to roll back
      :param ori_content_status: status the row must currently have for the UPDATE
          to match; defaults to the processing status
      :return: None
      """
      statement = f"""
          UPDATE {self.article_match_video_table}
          SET
              content_status = %s,
              content_status_update_time = %s,
              process_times = %s
          WHERE trace_id = %s and content_status = %s;
      """
      rolled_back_at = int(time.time())
      next_process_times = process_times + 1
      await self.long_articles_client.async_insert(
          sql=statement,
          params=(
              NewContentIdTaskConst.TASK_INIT_STATUS,
              rolled_back_at,
              next_process_times,
              trace_id,
              ori_content_status,
          ),
      )
  async def judge_whether_same_content_id_is_processing(self, content_id):
      """
      Tell whether another task with the same content_id is currently in flight.

      A content_id counts as in flight when any of its rows is in the processing,
      kimi-finished or spider-finished status. The ETL-complete and published
      statuses are deliberately not part of that set.

      :param content_id: content id to look up
      :return: True when at least one row is in flight, otherwise False
      """
      in_flight_statuses = {
          NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
          NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
          NewContentIdTaskConst.TASK_PROCESSING_STATUS,
      }
      query = f"""
          SELECT distinct content_status
          FROM {self.article_match_video_table}
          WHERE content_id = '{content_id}';
      """
      status_rows = await self.long_articles_client.async_select(query)
      if not status_rows:
          return False
      return any(row[0] in in_flight_statuses for row in status_rows)
  async def get_source_content_id(self, new_content_id):
      """
      Resolve the root content id behind ``new_content_id``.

      Two lookups are chained: produce_plan_exe_record (via the aigc client) maps
      plan_exe_id to channel_content_id, then article_pool_promotion_source (via
      the long-articles client) maps channel_content_id to root_produce_content_id.

      :param new_content_id: value matched against plan_exe_id
      :return: the root content id, or None when either lookup returns no rows
      """
      channel_query = f"""
          SELECT channel_content_id
          FROM produce_plan_exe_record
          WHERE plan_exe_id = '{new_content_id}';
      """
      channel_rows = await self.aigc_client.async_select(channel_query)
      if not channel_rows:
          return None

      source_query = f"""
          SELECT root_produce_content_id
          FROM article_pool_promotion_source
          WHERE channel_content_id = '{channel_rows[0][0]}';
      """
      source_rows = await self.long_articles_client.async_select(source_query)
      if not source_rows:
          return None
      return source_rows[0][0]
  async def get_illegal_out_ids(self, content_id: str) -> List[str]:
      """
      List the crawler videos of ``content_id`` that are flagged as unsafe.

      :param content_id: content id whose crawler rows are inspected
      :return: one "<platform>_<out_video_id>" string per flagged row; an empty
          list when there is none
      """
      query = f"""
          SELECT platform, out_video_id
          FROM {self.article_crawler_video_table}
          WHERE content_id = '{content_id}' and is_illegal = {NewContentIdTaskConst.VIDEO_UNSAFE};
      """
      flagged_rows = await self.long_articles_client.async_select(query)
      if not flagged_rows:
          return []

      illegal_ids = []
      for row in flagged_rows:
          illegal_ids.append("{}_{}".format(row[0], row[1]))
      return illegal_ids
  async def kimi_task(self, params):
      """
      Run the kimi stage for one task.

      Paths, in the order they are tried:
        1. kimi already succeeded for this content_id: move the task from the
           initial status to kimi-finished and return the stored result.
        2. the content_id is missing from the article text table: log code 4000
           and return None.
        3. params carries root_content_id: reuse the root's kimi result when it
           exists (copying it onto this content_id); when the root has none,
           remove root_content_id from params and run this method again.
        4. otherwise: mark the task as processing, generate a fresh result and
           move the task to kimi-finished.

      :param params: task dict; reads trace_id, content_id, process_times and the
          optional root_content_id (which path 3 may remove from the dict)
      :return: the kimi result on success; None when a status update matched no
          row or the text row is missing; {} when generation raised
      """
      trace_id = params['trace_id']
      content_id = params['content_id']
      process_times = params['process_times']

      def log_lock_lost():
          # Another worker changed the row first, so our guarded UPDATE matched nothing.
          logging(
              code="6000",
              info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
          )

      kimi_status_code = await get_kimi_status(
          content_id=content_id,
          article_text_table=self.article_text_table,
          db_client=self.long_articles_client
      )

      # Path 1: a kimi result is already stored for this content_id.
      if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
          moved = await self.update_content_status(
              new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
              trace_id=trace_id,
              ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
          )
          if moved == 0:
              log_lock_lost()
              return None
          return await get_kimi_result(
              content_id=content_id,
              article_text_table=self.article_text_table,
              db_client=self.long_articles_client
          )

      # Path 2: nothing to work with.
      if kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
          logging(
              code="4000",
              info="long_articles_text表中未找到 content_id"
          )
          return None

      # Path 3: try to inherit the result of the root content id.
      if params.get("root_content_id"):
          inherited = await get_kimi_result(
              content_id=params['root_content_id'],
              article_text_table=self.article_text_table,
              db_client=self.long_articles_client
          )
          if not inherited:
              # The root has no result: forget it and take the regular path.
              params.pop('root_content_id', None)
              return await self.kimi_task(params)

          moved = await self.update_content_status(
              new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
              trace_id=trace_id,
              ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
          )
          if moved == 0:
              log_lock_lost()
              return None

          # Copy the root's kimi result onto this content_id.
          copied = await update_kimi_status(
              kimi_info=inherited,
              content_id=content_id,
              db_client=self.long_articles_client,
              article_text_table=self.article_text_table,
              success_status=NewContentIdTaskConst.KIMI_SUCCESS_STATUS,
              init_status=NewContentIdTaskConst.KIMI_INIT_STATUS
          )
          if copied == 0:
              log_lock_lost()
              return None

          logging(
              code="8023",
              function="kimi_task",
              trace_id=trace_id,
              info="从root_content_id获取结果",
              data=params
          )
          return inherited

      # Path 4: claim the task (initial -> processing) and generate a result.
      claimed = await self.update_content_status(
          new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
          trace_id=trace_id,
          ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
      )
      if claimed == 0:
          log_lock_lost()
          return None

      try:
          generated = await generate_kimi_result(
              content_id=content_id,
              article_text_table=self.article_text_table,
              db_client=self.long_articles_client,
              safe_score=NewContentIdTaskConst.KIMI_SAFE_SCORE
          )
          await self.update_content_status(
              new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
              trace_id=trace_id,
              ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
          )
          return generated
      except Exception:
          # Generation failed: flag the text row, then release the task
          # (processing -> initial, one attempt charged by the callee).
          mark_failed_sql = f"""
              UPDATE {self.article_text_table}
              SET
                  kimi_status = %s
              WHERE content_id = %s
          """
          await self.long_articles_client.async_insert(
              sql=mark_failed_sql,
              params=(
                  NewContentIdTaskConst.KIMI_FAIL_STATUS,
                  content_id
              )
          )
          await self.roll_back_content_status_when_fails(
              process_times=process_times,
              trace_id=trace_id
          )
          return {}
  async def spider_task(self, params, kimi_result):
      """
      Run the crawler stage for one task.

      Paths, in the order they are tried:
        1. downloaded videos already exist for the content_id: move the task from
           kimi-finished to spider-finished and report success.
        2. params carries root_content_id: copy the root's crawler rows onto this
           content_id; when nothing was copied, remove root_content_id from params
           and run this method again.
        3. otherwise: mark the task as processing, search the web, and succeed
           when at least MIN_MATCH_VIDEO_NUM videos were found.

      :param params: task dict; reads trace_id, content_id, process_times, gh_id
          and the optional root_content_id (which path 2 may remove)
      :param kimi_result: dict providing ori_title, kimi_summary and kimi_keys
      :return: True when the stage succeeded, False otherwise
      """
      trace_id = params['trace_id']
      content_id = params['content_id']
      process_times = params['process_times']
      gh_id = params['gh_id']

      # Path 1: nothing to crawl.
      download_video_exist_flag = await whether_downloaded_videos_exists(
          content_id=content_id,
          article_crawler_video_table=self.article_crawler_video_table,
          db_client=self.long_articles_client
      )
      if download_video_exist_flag:
          await self.update_content_status(
              new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
              trace_id=trace_id,
              ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
          )
          return True

      # Path 2: reuse the crawler rows of the root content id.
      if params.get("root_content_id"):
          update_rows = await update_crawler_table_with_exist_content_id(
              content_id=content_id,
              trace_id=trace_id,
              article_crawler_video_table=self.article_crawler_video_table,
              db_client=self.long_articles_client,
              root_content_id=params['root_content_id']
          )
          if not update_rows:
              # The root had nothing usable: forget it and take the regular path.
              params.pop("root_content_id", None)
              return await self.spider_task(params, kimi_result)

          affected_rows = await self.update_content_status(
              new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
              trace_id=trace_id,
              ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
          )
          if affected_rows == 0:
              # Explicit False (was a bare return) so every failure path returns
              # the same type.
              return False
          logging(
              code="8024",
              function="spider_task",
              trace_id=trace_id,
              info="从root_content_id获取结果",
              data=params
          )
          return True

      # Path 3: claim the task (kimi-finished -> processing) and search.
      affected_rows = await self.update_content_status(
          new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
          ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
          trace_id=trace_id
      )
      if affected_rows == 0:
          logging(
              code="6000",
              info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
          )
          return False

      try:
          logging(
              code="spider_1001",
              info="开始执行搜索任务",
              trace_id=trace_id,
              data=kimi_result
          )
          search_videos_count = await search_videos_from_web(
              info={
                  "ori_title": kimi_result['ori_title'],
                  "kimi_summary": kimi_result['kimi_summary'],
                  "kimi_keys": kimi_result['kimi_keys'],
                  "trace_id": trace_id,
                  "gh_id": gh_id,
                  "content_id": content_id,
                  "crawler_video_table": self.article_crawler_video_table
              },
              gh_id_map=self.account_map,
              db_client=self.long_articles_client
          )
          if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
              # Enough videos found: processing -> spider-finished.
              logging(
                  code="spider_1002",
                  info="搜索成功",
                  trace_id=trace_id,
                  data=kimi_result
              )
              await self.update_content_status(
                  new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                  trace_id=trace_id,
                  ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
              )
              return True

          logging(
              code="spider_1003",
              info="搜索失败",
              trace_id=trace_id,
              data=kimi_result
          )
          # The callee adds 1 to process_times itself; passing process_times + 1
          # here charged two attempts for one failure.
          await self.roll_back_content_status_when_fails(
              process_times=process_times,
              trace_id=trace_id
          )
          return False
      except Exception as e:
          await self.roll_back_content_status_when_fails(
              process_times=process_times,
              trace_id=trace_id
          )
          print("爬虫处理失败: {}".format(e))
          return False
  async def etl_task(self, params):
      """
      Download and upload the videos found for one task.

      When downloaded videos already exist the task moves straight from
      spider-finished to ETL-complete. Otherwise the task is marked as processing,
      the videos are downloaded (skipping the ones flagged illegal), and the stage
      succeeds when at least MIN_MATCH_VIDEO_NUM downloads completed.

      :param params: task dict; reads trace_id, content_id and process_times
      :return: True when the stage succeeded, False otherwise
      """
      trace_id = params['trace_id']
      content_id = params['content_id']
      process_times = params['process_times']

      # Were the videos already downloaded?
      video_exist_flag = await whether_downloaded_videos_exists(
          content_id=content_id,
          article_crawler_video_table=self.article_crawler_video_table,
          db_client=self.long_articles_client
      )
      if video_exist_flag:
          affect_rows = await self.update_content_status(
              ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
              trace_id=trace_id,
              new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
          )
          if affect_rows == 0:
              logging(
                  code="6000",
                  info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
              )
              return False
          return True

      # Claim the task: spider-finished -> processing.
      affected_rows = await self.update_content_status(
          ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
          trace_id=trace_id,
          new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
      )
      if affected_rows == 0:
          logging(
              code="6000",
              info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
          )
          return False

      # Download, telling the downloader which videos are flagged illegal.
      illegal_videos = await self.get_illegal_out_ids(content_id)
      downloaded_count = await async_download_videos(
          trace_id=trace_id,
          content_id=content_id,
          article_crawler_video_table=self.article_crawler_video_table,
          db_client=self.long_articles_client,
          illegal_videos=illegal_videos
      )
      if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
          await self.update_content_status(
              ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
              trace_id=trace_id,
              new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
          )
          return True

      # Too few downloads. The callee adds 1 to process_times itself; passing
      # process_times + 1 here charged two attempts for one failure.
      await self.roll_back_content_status_when_fails(
          process_times=process_times,
          trace_id=trace_id
      )
      return False
  556. async def publish_task(self, params, kimi_title):
  557. """
  558. 发布任务
  559. :param kimi_title:
  560. :param params:
  561. :return:
  562. """
  563. gh_id = params['gh_id']
  564. flow_pool_level = params['flow_pool_level']
  565. content_id = params['content_id']
  566. trace_id = params['trace_id']
  567. process_times = params['process_times']
  568. # 开始处理,将状态修改为操作状态
  569. affected_rows = await self.update_content_status(
  570. ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  571. trace_id=trace_id,
  572. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  573. )
  574. if affected_rows == 0:
  575. logging(
  576. code="6000",
  577. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  578. )
  579. return False
  580. try:
  581. download_videos = await get_downloaded_videos(
  582. content_id=content_id,
  583. article_crawler_video_table=self.article_crawler_video_table,
  584. db_client=self.long_articles_client
  585. )
  586. match flow_pool_level:
  587. case "autoArticlePoolLevel4":
  588. # 冷启层, 全量做
  589. video_list = shuffle_list(download_videos)[:3]
  590. case "autoArticlePoolLevel3":
  591. if self.gh_id_dict.get(gh_id):
  592. video_list = shuffle_list(download_videos)[:3]
  593. else:
  594. video_list = download_videos[:3]
  595. case "autoArticlePoolLevel2":
  596. # 次条,只针对具体账号做
  597. video_list = []
  598. case "autoArticlePoolLevel1":
  599. # 头条,先不做
  600. video_list = download_videos[:3]
  601. case _:
  602. video_list = download_videos[:3]
  603. # 将视频发布至票圈
  604. await publish_videos_to_piaoquan(
  605. video_list=video_list,
  606. kimi_title=kimi_title,
  607. process_times=process_times,
  608. trace_id=trace_id,
  609. db_client=self.long_articles_client,
  610. article_match_video_table=self.article_match_video_table
  611. )
  612. except Exception as e:
  613. await self.roll_back_content_status_when_fails(
  614. process_times=params['process_times'] + 1,
  615. trace_id=params['trace_id']
  616. )
  617. print(e)
  async def record_for_audit(self, content_id):
      """
      Register ``content_id`` in long_articles_title_audit.

      Uses INSERT IGNORE and stores the current time in milliseconds as
      create_timestamp.

      :param content_id: content id to register
      """
      statement = """
          INSERT IGNORE INTO long_articles_title_audit
              (content_id, create_timestamp)
          VALUES
              (%s, %s);
      """
      created_at_ms = int(time.time() * 1000)
      await self.long_articles_client.async_insert(
          sql=statement,
          params=(content_id, created_at_ms),
      )
  async def start_process(self, params):
      """
      Drive one task through kimi -> spider -> ETL -> publish.

      Each stage runs only when the previous one reported success; the first
      failing stage is logged and ends the run. After a successful ETL the
      content_id is recorded for audit. Tasks flagged DO_NOT_NEED_PUBLISH stop
      there; the others are published and, on success, their trace id is recorded.

      When the kimi stage fails and process_times has reached
      TASK_MAX_PROCESS_TIMES, every initial-status row of the same content_id is
      set to KIMI_ILLEGAL_STATUS and a bot notification is sent.

      :param params: task dict; reads trace_id, process_times, content_id and
          publish_flag, and is passed on to the stage methods
      :return: None
      """
      print("start process")
      kimi_result = await self.kimi_task(params)
      trace_id = params['trace_id']
      process_times = params['process_times']
      content_id = params['content_id']
      publish_flag = params['publish_flag']

      # ---- kimi stage failed -------------------------------------------------
      if not kimi_result:
          logging(
              code="6001",
              info="kimi 处理失败",
              trace_id=trace_id
          )
          if process_times < NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
              return

          logging(
              code="6011",
              info="kimi处理次数达到上限, 放弃处理",
              trace_id=trace_id
          )
          # Give up on every initial-status row that shares this content_id.
          update_sql = f"""
              UPDATE {self.article_match_video_table}
              SET content_status = %s
              WHERE content_id = %s and content_status = %s;
          """
          affected_rows = await self.long_articles_client.async_insert(
              sql=update_sql,
              params=(
                  NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
                  content_id,
                  NewContentIdTaskConst.TASK_INIT_STATUS
              )
          )
          # Look up the title for the notification.
          select_sql = f"""
              SELECT article_title
              FROM {self.article_text_table}
              WHERE content_id = '{content_id}';
          """
          result = await self.long_articles_client.async_select(select_sql)
          # The text row can be missing (kimi_task logs code 4000 for that case);
          # indexing an empty result used to raise before the alert was sent.
          article_title = result[0][0] if result else None
          bot(
              title="KIMI 处理失败",
              detail={
                  "content_id": content_id,
                  "affected_rows": affected_rows,
                  "article_title": article_title
              },
              mention=False
          )
          return

      # ---- spider stage --------------------------------------------------------
      print("kimi success")
      logging(
          code=3001,
          info="kimi success",
          trace_id=trace_id
      )
      spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
      if not spider_flag:
          logging(
              code="6002",
              info="爬虫处理失败",
              trace_id=params['trace_id']
          )
          return

      # ---- ETL stage -----------------------------------------------------------
      print("spider success")
      logging(
          code=3002,
          info="spider_success",
          trace_id=trace_id
      )
      etl_flag = await self.etl_task(params)
      if not etl_flag:
          logging(
              code="6003",
              info="ETL 处理失败",
              trace_id=params['trace_id']
          )
          return

      print("etl success")
      logging(
          code="3003",
          info="etl_success",
          trace_id=trace_id
      )
      # Downloads succeeded: register the content for audit.
      await self.record_for_audit(content_id)

      # ---- publish stage -------------------------------------------------------
      if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
          logging(
              code="3013",
              info="不需要发布,长文系统托管发布",
              trace_id=trace_id
          )
          return

      try:
          published = await self.publish_task(params, kimi_result['kimi_title'])
          if published is False:
              # publish_task returns False when it could not take the status
              # lock (it logs code 6000 itself); nothing was published, so
              # success must not be reported.
              return
          logging(
              code="3004",
              info="publish_success",
              trace_id=trace_id
          )
          await record_trace_id(
              trace_id=trace_id,
              status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
          )
      except Exception as e:
          logging(
              code="6004",
              info="publish 失败--{}".format(e),
              trace_id=params['trace_id']
          )
  async def process_each_task(self, params):
      """
      Decide whether one task needs processing and, if so, start it.

      The task is skipped when downloaded videos already exist for its content_id
      or when another task with the same content_id is in flight. For every pool
      level except autoArticlePoolLevel4 the root content id is looked up and, when
      found, added to params as root_content_id before processing starts.

      :param params: task dict; reads content_id and flow_pool_level and may gain
          a root_content_id key
      :return: None
      """
      print(json.dumps(params, ensure_ascii=False, indent=4))
      content_id = params['content_id']
      flow_pool_level = params['flow_pool_level']

      already_downloaded = await whether_downloaded_videos_exists(
          content_id=content_id,
          article_crawler_video_table=self.article_crawler_video_table,
          db_client=self.long_articles_client
      )
      print("开始处理")
      if already_downloaded:
          print("存在已下载视频")
          return

      if await self.judge_whether_same_content_id_is_processing(content_id):
          print("processing success")
          logging(
              code="9001",
              info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
          )
          return

      # Prefer the videos of the root content id when one can be found.
      if flow_pool_level != 'autoArticlePoolLevel4':
          root_content_id = await self.get_source_content_id(content_id)
          if root_content_id:
              params['root_content_id'] = root_content_id

      await self.start_process(params=params)
  async def deal(self) -> None:
      """
      Entry point of one scheduling round.

      Steps:
        1. roll back tasks stuck in an intermediate status, first for
           NEED_PUBLISH and then for DO_NOT_NEED_PUBLISH tasks;
        2. mark tasks that exceeded the attempt limit as failed;
        3. fetch the pending tasks, keep one task per content_id and process
           them concurrently.

      :return: None
      """
      await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
      await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
      await self.set_tasks_status_fail()

      task_list = await self.get_tasks()
      # One task per content_id: a later row replaces an earlier one with the same id.
      task_dict = {task['content_id']: task for task in task_list}
      process_list = list(task_dict.values())
      logging(
          code="5001",
          info="Match Task Got {} this time".format(len(process_list)),
          function="Publish Task"
      )

      if not process_list:
          logging(
              code="9008",
              info="没有要处理的请求"
          )
          return

      started_at = time.time()
      # Process the whole batch. The previous process_list[:1] slice ran only the
      # first task, although the query limit, the logged count and gather() all
      # assume every fetched task is handled.
      await asyncio.gather(*(self.process_each_task(params) for params in process_list))
      print("处理时间: {} s".format(time.time() - started_at))
  816. logging(
  817. code="9008",
  818. info="没有要处理的请求"
  819. )