# new_contentId_task.py
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. import random
  8. from typing import List, Dict
  9. from applications.config import Config
  10. from applications.const import new_content_id_task_const as NewContentIdTaskConst
  11. from applications.log import logging
  12. from applications.functions.common import shuffle_list
  13. from applications.spider import search_videos_from_web
  14. from applications.feishu import bot
  15. from applications.functions.aigc import record_trace_id
  16. from .utils import *
  17. class NewContentIdTask(object):
  18. """
  19. 不存在历史已经发布的文章的匹配流程
  20. """
  21. def __init__(self, long_articles_client, aigc_client):
  22. self.long_articles_client = long_articles_client
  23. self.aigc_client = aigc_client
  24. self.config = Config()
  25. self.article_match_video_table = self.config.article_match_video_table
  26. self.article_text_table = self.config.article_text_table
  27. self.article_crawler_video_table = self.config.article_crawler_video_table
  28. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  29. self.account_map = json.loads(self.config.get_config_value("accountMap"))
  30. self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
  31. async def get_tasks(self) -> List[Dict]:
  32. """
  33. 获取 task
  34. :return:
  35. """
  36. # 获取 process_times <= 3 且 content_status = 0 的任务
  37. select_sql = f"""
  38. SELECT
  39. t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
  40. FROM
  41. {self.article_match_video_table} t1
  42. LEFT JOIN (
  43. SELECT content_id, count(1) as cnt
  44. FROM {self.article_crawler_video_table}
  45. WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  46. AND is_illegal = {NewContentIdTaskConst.VIDEO_SAFE}
  47. GROUP BY content_id
  48. ) t2
  49. ON t1.content_id = t2.content_id
  50. WHERE
  51. t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
  52. AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  53. AND COALESCE(t2.cnt, 0) < {NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM}
  54. ORDER BY flow_pool_level, request_timestamp
  55. LIMIT {self.spider_coroutines};
  56. """
  57. tasks = await self.long_articles_client.async_select(select_sql)
  58. if tasks:
  59. return [
  60. {
  61. "trace_id": i[0],
  62. "content_id": i[1],
  63. "flow_pool_level": i[2],
  64. "gh_id": i[3],
  65. "process_times": i[4],
  66. "publish_flag": i[5]
  67. }
  68. for i in tasks
  69. ]
  70. else:
  71. return []
  72. async def set_tasks_status_fail(self) -> None:
  73. """
  74. 将 处理次数大约3次,且状态不为成功状态的(3, 4)的任务状态修改为失败
  75. """
  76. update_status_sql = f"""
  77. UPDATE
  78. {self.article_match_video_table}
  79. SET
  80. content_status = %s
  81. WHERE
  82. process_times > %s and content_status not in (%s, %s);
  83. """
  84. await self.long_articles_client.async_insert(
  85. update_status_sql,
  86. params=(
  87. NewContentIdTaskConst.TASK_FAIL_STATUS,
  88. NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
  89. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  90. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  91. )
  92. )
  93. async def roll_back_unfinished_tasks(self, publish_flag: int) -> None:
  94. """
  95. 将长时间处于中间状态的任务回滚
  96. """
  97. # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
  98. if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
  99. processing_status_tuple = (
  100. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  101. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  102. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  103. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  104. )
  105. elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  106. processing_status_tuple = (
  107. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  108. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  109. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
  110. )
  111. else:
  112. return
  113. select_processing_sql = f"""
  114. SELECT
  115. trace_id, content_status_update_time, process_times, content_status
  116. FROM
  117. {self.article_match_video_table}
  118. WHERE
  119. content_status in {processing_status_tuple}
  120. and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  121. and publish_flag = {publish_flag};
  122. """
  123. processing_articles = await self.long_articles_client.async_select(select_processing_sql)
  124. if processing_articles:
  125. processing_list = [
  126. {
  127. "trace_id": item[0],
  128. "content_status_update_time": item[1],
  129. "process_times": item[2],
  130. "content_status": item[3]
  131. }
  132. for item in processing_articles
  133. ]
  134. for obj in processing_list:
  135. if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
  136. # 认为该任务失败
  137. await self.roll_back_content_status_when_fails(
  138. process_times=obj['process_times'] + 1,
  139. trace_id=obj['trace_id'],
  140. ori_content_status=obj['content_status']
  141. )
  142. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  143. """
  144. :param new_content_status:
  145. :param trace_id:
  146. :param ori_content_status:
  147. :return:
  148. """
  149. update_sql = f"""
  150. UPDATE {self.article_match_video_table}
  151. SET content_status = %s, content_status_update_time = %s
  152. WHERE trace_id = %s and content_status = %s;
  153. """
  154. row_counts = await self.long_articles_client.async_insert(
  155. sql=update_sql,
  156. params=(
  157. new_content_status,
  158. int(time.time()),
  159. trace_id,
  160. ori_content_status
  161. )
  162. )
  163. return row_counts
  164. async def roll_back_content_status_when_fails(self, process_times, trace_id,
  165. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
  166. """
  167. 处理失败,回滚至初始状态,处理次数加 1
  168. :param process_times:
  169. :param trace_id:
  170. :param ori_content_status:
  171. :return:
  172. """
  173. update_article_sql = f"""
  174. UPDATE {self.article_match_video_table}
  175. SET
  176. content_status = %s,
  177. content_status_update_time = %s,
  178. process_times = %s
  179. WHERE trace_id = %s and content_status = %s;
  180. """
  181. await self.long_articles_client.async_insert(
  182. sql=update_article_sql,
  183. params=(
  184. NewContentIdTaskConst.TASK_INIT_STATUS,
  185. int(time.time()),
  186. process_times + 1,
  187. trace_id,
  188. ori_content_status
  189. )
  190. )
  191. async def judge_whether_same_content_id_is_processing(self, content_id):
  192. """
  193. 同一个 content_id 只需要处理一次
  194. :param content_id:
  195. :return:
  196. success: 4
  197. init: 0
  198. fail: 99
  199. """
  200. select_sql = f"""
  201. SELECT distinct content_status
  202. FROM {self.article_match_video_table}
  203. WHERE content_id = '{content_id}';
  204. """
  205. result = await self.long_articles_client.async_select(select_sql)
  206. if result:
  207. for item in result:
  208. content_status = item[0]
  209. # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
  210. if content_status in {
  211. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  212. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  213. # NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  214. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  215. # NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  216. }:
  217. return True
  218. return False
  219. else:
  220. return False
  221. async def get_source_content_id(self, new_content_id):
  222. """
  223. 通过content_id 查找源content_id,并且更新其内容
  224. """
  225. select_channel_id_sql = f"""
  226. SELECT channel_content_id
  227. FROM produce_plan_exe_record
  228. WHERE plan_exe_id = '{new_content_id}';
  229. """
  230. channel_content_id = await self.aigc_client.async_select(select_channel_id_sql)
  231. if channel_content_id:
  232. select_source_content_id_sql = f"""
  233. SELECT root_produce_content_id
  234. FROM article_pool_promotion_source
  235. WHERE channel_content_id = '{channel_content_id[0][0]}';
  236. """
  237. source_content_id = await self.long_articles_client.async_select(select_source_content_id_sql)
  238. if source_content_id:
  239. return source_content_id[0][0]
  240. else:
  241. return
  242. else:
  243. return
  244. async def get_illegal_out_ids(self, content_id: str) -> List[str]:
  245. """
  246. 获取违规的外站视频id
  247. """
  248. select_sql = f"""
  249. SELECT platform, out_video_id
  250. FROM {self.article_crawler_video_table}
  251. WHERE content_id = '{content_id}' and is_illegal = {NewContentIdTaskConst.VIDEO_UNSAFE};
  252. """
  253. response = await self.long_articles_client.async_select(select_sql)
  254. if response:
  255. result = ["{}_{}".format(line[0], line[1]) for line in response]
  256. return result
  257. else:
  258. return []
    async def kimi_task(self, params):
        """
        Run the Kimi (LLM) processing step for a single task.

        Resolution order:
          1. A Kimi result already exists for this content_id -> mark the
             task KIMI_FINISHED and return the cached result.
          2. Text row missing (ARTICLE_TEXT_TABLE_ERROR) -> log, return None.
          3. A root_content_id is supplied -> reuse the root's Kimi result
             and copy it onto this content_id; if the root has no result,
             drop the root id and retry from scratch.
          4. Otherwise generate a fresh Kimi result under the optimistic
             status lock; on error flag kimi_status failed, roll the task
             back to init, and return {}.

        :param params: task dict (trace_id, content_id, process_times,
            optional root_content_id)
        :return: Kimi result dict on success, {} on generation failure,
            None when the status lock is lost or the text row is missing
        """
        trace_id = params['trace_id']
        # Work on this task's content_id.
        content_id = params['content_id']
        process_times = params['process_times']
        kimi_status_code = await get_kimi_status(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
        if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
            # Result already generated earlier: just advance the task status.
            affected_rows = await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                # Another worker grabbed the status lock first.
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return
            kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
            return kimi_result
        elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
            # No row for this content_id in the text table; nothing to do.
            logging(
                code="4000",
                info="long_articles_text表中未找到 content_id"
            )
        else:
            # Check whether a root_content_id exists whose result can be reused.
            if params.get("root_content_id"):
                kimi_result = await get_kimi_result(content_id=params['root_content_id'],
                                                    article_text_table=self.article_text_table,
                                                    db_client=self.long_articles_client)
                if kimi_result:
                    affected_rows = await self.update_content_status(
                        new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                        trace_id=trace_id,
                        ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
                    )
                    if affected_rows == 0:
                        logging(
                            code="6000",
                            info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                        )
                        return
                    # Copy the root content's Kimi result onto this content_id.
                    content_id = params['content_id']
                    update_count = await update_kimi_status(
                        kimi_info=kimi_result,
                        content_id=content_id,
                        db_client=self.long_articles_client,
                        article_text_table=self.article_text_table,
                        success_status=NewContentIdTaskConst.KIMI_SUCCESS_STATUS,
                        init_status=NewContentIdTaskConst.KIMI_INIT_STATUS
                    )
                    if update_count == 0:
                        logging(
                            code="6000",
                            info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                        )
                        return
                    logging(
                        code="8023",
                        function="kimi_task",
                        trace_id=trace_id,
                        info="从root_content_id获取结果",
                        data=params
                    )
                    return kimi_result
                else:
                    # Root has no usable result: drop it and process normally.
                    params.pop('root_content_id', None)
                    return await self.kimi_task(params)
            # Start processing: move content_status from init (0) to processing (101).
            affected_rows = await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return
            try:
                kimi_result = await generate_kimi_result(
                    content_id=content_id,
                    article_text_table=self.article_text_table,
                    db_client=self.long_articles_client,
                    safe_score=NewContentIdTaskConst.KIMI_SAFE_SCORE
                )
                await self.update_content_status(
                    new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
                )
                return kimi_result
            except Exception as e:
                # Kimi generation failed: flag the text row as failed ...
                update_kimi_sql = f"""
                    UPDATE {self.article_text_table}
                    SET
                        kimi_status = %s
                    WHERE content_id = %s
                """
                await self.long_articles_client.async_insert(
                    sql=update_kimi_sql,
                    params=(
                        NewContentIdTaskConst.KIMI_FAIL_STATUS,
                        content_id
                    )
                )
                # ... and roll the task status back from 101 to 0.
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return {}
    async def spider_task(self, params, kimi_result):
        """
        Run the video-search (spider) step for a single task.

        Resolution order:
          1. Videos already downloaded for this content_id -> mark the task
             SPIDER_FINISHED and return True.
          2. A root_content_id is supplied -> copy its crawler rows onto this
             content_id; if nothing can be copied, drop the root id and retry.
          3. Otherwise run the web search; success moves the task to
             SPIDER_FINISHED, failure rolls it back to init.

        :param params: task dict (trace_id, content_id, process_times, gh_id,
            optional root_content_id)
        :param kimi_result: Kimi output (ori_title / kimi_summary / kimi_keys)
            used to build the search request
        :return: True on success, False on failure, None when the optimistic
            status lock is lost on the root-reuse path
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        gh_id = params['gh_id']
        download_video_exist_flag = await whether_downloaded_videos_exists(
            content_id=content_id,
            article_crawler_video_table=self.article_crawler_video_table,
            db_client=self.long_articles_client
        )
        if download_video_exist_flag:
            # Videos already present: skip searching entirely.
            await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
            )
            return True
        # Check whether a root_content_id exists whose videos can be reused.
        if params.get("root_content_id"):
            # Copy the root content's crawler rows for this content_id
            # (record status 1 -> 2).
            update_rows = await update_crawler_table_with_exist_content_id(
                content_id=content_id,
                trace_id=trace_id,
                article_crawler_video_table=self.article_crawler_video_table,
                db_client=self.long_articles_client,
                root_content_id=params['root_content_id']
            )
            if update_rows:
                affected_rows = await self.update_content_status(
                    new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
                )
                if affected_rows == 0:
                    # Another worker grabbed the status lock first.
                    return
                logging(
                    code="8024",
                    function="spider_task",
                    trace_id=trace_id,
                    info="从root_content_id获取结果",
                    data=params
                )
                return True
            else:
                # Nothing copied: drop the root id and search normally.
                params.pop("root_content_id", None)
                return await self.spider_task(params, kimi_result)
        # Start processing: move content_status from 1 to 101.
        affected_rows = await self.update_content_status(
            new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
            ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
            trace_id=trace_id
        )
        if affected_rows == 0:
            logging(
                code="6000",
                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
            )
            return False
        try:
            logging(
                code="spider_1001",
                info="开始执行搜索任务",
                trace_id=trace_id,
                data=kimi_result
            )
            search_videos_count = await search_videos_from_web(
                info={
                    "ori_title": kimi_result['ori_title'],
                    "kimi_summary": kimi_result['kimi_summary'],
                    "kimi_keys": kimi_result['kimi_keys'],
                    "trace_id": trace_id,
                    "gh_id": gh_id,
                    "content_id": content_id,
                    "crawler_video_table": self.article_crawler_video_table
                },
                gh_id_map=self.account_map,
                db_client=self.long_articles_client
            )
            if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
                # Spider succeeded: move status from 101 to 2.
                logging(
                    code="spider_1002",
                    info="搜索成功",
                    trace_id=trace_id,
                    data=kimi_result
                )
                await self.update_content_status(
                    new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
                )
                return True
            else:
                logging(
                    code="spider_1003",
                    info="搜索失败",
                    trace_id=trace_id,
                    data=kimi_result
                )
                # NOTE(review): roll_back_content_status_when_fails already
                # adds 1 to process_times, so process_times + 1 here bumps
                # the counter by 2 -- confirm whether that is intended.
                await self.roll_back_content_status_when_fails(
                    process_times=process_times + 1,
                    trace_id=trace_id
                )
                return False
        except Exception as e:
            await self.roll_back_content_status_when_fails(
                process_times=process_times + 1,
                trace_id=trace_id
            )
            print("爬虫处理失败: {}".format(e))
            return False
    async def etl_task(self, params):
        """
        Download the matched videos and upload them (ETL step).

        If enough safe videos are already downloaded the task jumps straight
        to ETL_COMPLETE; otherwise videos are downloaded (excluding known
        illegal ones) and the task is completed or rolled back depending on
        how many downloads succeed.

        :param params: task dict (trace_id, content_id, process_times)
        :return: True on success, False on lock loss or download failure
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        # Check whether the videos were already downloaded earlier.
        video_exist_flag = await whether_downloaded_videos_exists(
            content_id=content_id,
            article_crawler_video_table=self.article_crawler_video_table,
            db_client=self.long_articles_client
        )
        if video_exist_flag:
            affect_rows = await self.update_content_status(
                ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
            )
            if affect_rows == 0:
                # Another worker grabbed the status lock first.
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return False
            return True
        else:
            # Start processing: mark the task as in progress.
            affected_rows = await self.update_content_status(
                ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return False
            # Download videos, skipping ones previously flagged illegal.
            illegal_videos = await self.get_illegal_out_ids(content_id)
            downloaded_count = await async_download_videos(
                trace_id=trace_id,
                content_id=content_id,
                article_crawler_video_table=self.article_crawler_video_table,
                db_client=self.long_articles_client,
                illegal_videos=illegal_videos
            )
            if downloaded_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
                await self.update_content_status(
                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
                    trace_id=trace_id,
                    new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
                )
                return True
            else:
                # NOTE(review): the rollback helper itself increments
                # process_times, so process_times + 1 here results in +2 --
                # confirm whether that is intended.
                await self.roll_back_content_status_when_fails(
                    process_times=process_times + 1,
                    trace_id=trace_id
                )
                return False
    async def publish_task(self, params, kimi_title):
        """
        Publish downloaded videos to Piaoquan for one task.

        Selects up to three videos according to the flow pool level, then
        calls publish_videos_to_piaoquan; on any exception the task is rolled
        back toward the init status.

        :param kimi_title: title produced by the Kimi step
        :param params: task dict (gh_id, flow_pool_level, content_id,
            trace_id, process_times)
        :return: None on completion, False when the status lock is lost
        """
        gh_id = params['gh_id']
        flow_pool_level = params['flow_pool_level']
        content_id = params['content_id']
        trace_id = params['trace_id']
        process_times = params['process_times']
        # Start processing: mark the task as in progress.
        affected_rows = await self.update_content_status(
            ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
            trace_id=trace_id,
            new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
        )
        if affected_rows == 0:
            # Another worker grabbed the status lock first.
            logging(
                code="6000",
                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
            )
            return False
        try:
            download_videos = await get_downloaded_videos(
                content_id=content_id,
                article_crawler_video_table=self.article_crawler_video_table,
                db_client=self.long_articles_client
            )
            match flow_pool_level:
                case "autoArticlePoolLevel4":
                    # Cold-start pool: shuffle all candidates, take three.
                    video_list = shuffle_list(download_videos)[:3]
                case "autoArticlePoolLevel3":
                    # Shuffle only for accounts on the test whitelist.
                    if self.gh_id_dict.get(gh_id):
                        video_list = shuffle_list(download_videos)[:3]
                    else:
                        video_list = download_videos[:3]
                case "autoArticlePoolLevel2":
                    # Second-slot pool: only specific accounts; publish nothing here.
                    video_list = []
                case "autoArticlePoolLevel1":
                    # Headline pool: no special handling yet.
                    video_list = download_videos[:3]
                case _:
                    video_list = download_videos[:3]
            # Publish the selected videos to Piaoquan.
            await publish_videos_to_piaoquan(
                video_list=video_list,
                kimi_title=kimi_title,
                process_times=process_times,
                trace_id=trace_id,
                db_client=self.long_articles_client,
                article_match_video_table=self.article_match_video_table
            )
        except Exception as e:
            # NOTE(review): the rollback helper itself increments
            # process_times, so + 1 here results in +2 -- confirm intent.
            await self.roll_back_content_status_when_fails(
                process_times=params['process_times'] + 1,
                trace_id=params['trace_id']
            )
            print(e)
  619. async def record_for_audit(self, content_id):
  620. """
  621. 视频下载成功后,记录到audit表中
  622. """
  623. audit_account = chr(random.randint(97, 106))
  624. insert_sql = f"""
  625. INSERT IGNORE INTO long_articles_title_audit
  626. (content_id, create_timestamp, audit_account)
  627. VALUES
  628. (%s, %s, %s);
  629. """
  630. await self.long_articles_client.async_insert(
  631. sql=insert_sql,
  632. params=(
  633. content_id,
  634. int(time.time() * 1000),
  635. audit_account
  636. )
  637. )
    async def start_process(self, params):
        """
        Drive the full pipeline for one article:
        kimi -> spider -> etl -> (optional) publish.

        Each stage runs only when the previous one succeeded; failures are
        logged. When Kimi keeps failing past the retry budget, rows with the
        same content_id still in init are flagged KIMI_ILLEGAL and a Feishu
        alert is sent.

        :param params: task dict
        :return: None
        """
        print("start process")
        kimi_result = await self.kimi_task(params)
        trace_id = params['trace_id']
        process_times = params['process_times']
        content_id = params['content_id']
        publish_flag = params['publish_flag']
        if kimi_result:
            # Kimi step done; run the spider step next.
            print("kimi success")
            logging(
                code=3001,
                info="kimi success",
                trace_id=trace_id
            )
            spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
            if spider_flag:
                # Spider step done; run the ETL step next.
                print("spider success")
                logging(
                    code=3002,
                    info="spider_success",
                    trace_id=trace_id
                )
                etl_flag = await self.etl_task(params)
                if etl_flag:
                    # Download/upload done; publish if required.
                    print("etl success")
                    logging(
                        code="3003",
                        info="etl_success",
                        trace_id=trace_id
                    )
                    # ETL succeeded: queue the title for audit.
                    await self.record_for_audit(content_id)
                    if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
                        # Publishing is handled by the long-articles system.
                        logging(
                            code="3013",
                            info="不需要发布,长文系统托管发布",
                            trace_id=trace_id
                        )
                        return
                    else:
                        try:
                            await self.publish_task(params, kimi_result['kimi_title'])
                            logging(
                                code="3004",
                                info="publish_success",
                                trace_id=trace_id
                            )
                            await record_trace_id(
                                trace_id=trace_id,
                                status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
                            )
                        except Exception as e:
                            logging(
                                code="6004",
                                info="publish 失败--{}".format(e),
                                trace_id=params['trace_id']
                            )
                else:
                    logging(
                        code="6003",
                        info="ETL 处理失败",
                        trace_id=params['trace_id']
                    )
            else:
                logging(
                    code="6002",
                    info="爬虫处理失败",
                    trace_id=params['trace_id']
                )
        else:
            logging(
                code="6001",
                info="kimi 处理失败",
                trace_id=trace_id
            )
            if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
                logging(
                    code="6011",
                    info="kimi处理次数达到上限, 放弃处理",
                    trace_id=trace_id
                )
                # Flag rows with the same content_id still in the init status
                # as kimi-illegal so they are not retried.
                update_sql = f"""
                    UPDATE {self.article_match_video_table}
                    SET content_status = %s
                    WHERE content_id = %s and content_status = %s;
                """
                affected_rows = await self.long_articles_client.async_insert(
                    sql=update_sql,
                    params=(
                        NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
                        content_id,
                        NewContentIdTaskConst.TASK_INIT_STATUS
                    )
                )
                # Look up the article title for the alert message.
                select_sql = f"""
                    SELECT article_title
                    FROM {self.article_text_table}
                    WHERE content_id = '{content_id}';
                """
                result = await self.long_articles_client.async_select(select_sql)
                bot(
                    title="KIMI 处理失败",
                    detail={
                        "content_id": content_id,
                        "affected_rows": affected_rows,
                        "article_title": result[0][0]
                    },
                    mention=False
                )
  757. async def process_each_task(self, params):
  758. """
  759. 处理任务
  760. :return:
  761. """
  762. print(json.dumps(params, ensure_ascii=False, indent=4))
  763. content_id = params['content_id']
  764. flow_pool_level = params['flow_pool_level']
  765. download_videos_exists_flag = await whether_downloaded_videos_exists(
  766. content_id=content_id,
  767. article_crawler_video_table=self.article_crawler_video_table,
  768. db_client=self.long_articles_client
  769. )
  770. print("开始处理")
  771. if not download_videos_exists_flag:
  772. processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
  773. if processing_flag:
  774. print("processing success")
  775. logging(
  776. code="9001",
  777. info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
  778. )
  779. else:
  780. # 判断是否存在root_content_id
  781. if flow_pool_level != 'autoArticlePoolLevel4':
  782. # 如果查到根content_id, 采用根content_id的视频
  783. root_content_id = await self.get_source_content_id(content_id)
  784. if root_content_id:
  785. # 传参新增root_content_id
  786. params['root_content_id'] = root_content_id
  787. # 开始处理
  788. await self.start_process(params=params)
  789. else:
  790. print("存在已下载视频")
  791. async def deal(self) -> None:
  792. """
  793. function
  794. :return:
  795. """
  796. # 处理未托管的任务
  797. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
  798. # 处理托管任务
  799. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
  800. # 将处理次数大于3次且未成功的任务置为失败
  801. await self.set_tasks_status_fail()
  802. # 获取task_list
  803. task_list = await self.get_tasks()
  804. task_dict = {task['content_id']: task for task in task_list}
  805. process_list = list(task_dict.values())
  806. logging(
  807. code="5001",
  808. info="Match Task Got {} this time".format(len(process_list)),
  809. function="Publish Task"
  810. )
  811. # 处理process_list
  812. if process_list:
  813. a = time.time()
  814. tasks = [self.process_each_task(params) for params in process_list]
  815. await asyncio.gather(*tasks)
  816. b = time.time()
  817. print("处理时间: {} s".format(b - a))
  818. else:
  819. logging(
  820. code="9008",
  821. info="没有要处理的请求"
  822. )