newContentIdTask.py
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. import asyncio
  7. from applications.config import Config
  8. from applications.log import logging
  9. from applications.functions.pqFunctions import publish_to_pq
  10. from applications.functions.common import shuffle_list
  11. from applications.functions.kimi import KimiServer
  12. from applications.spider import search_videos_from_web
  13. from applications.etl_function import *


class NewContentIdTask(object):
    """
    Matching flow for articles whose content_id has no previously published match.
    """
    TASK_INIT_STATUS = 0             # waiting to be processed
    TASK_KIMI_FINISHED_STATUS = 1    # kimi summary / keywords ready
    TASK_SPIDER_FINISHED_STATUS = 2  # candidate videos crawled
    TASK_ETL_FINISHED_STATUS = 3     # videos downloaded and uploaded to OSS
    TASK_PUBLISHED_STATUS = 4        # videos published
    TASK_PROCESSING_STATUS = 101     # a worker currently holds the task
    TASK_FAIL_STATUS = 99            # gave up after exceeding the retry budget
    ARTICLE_TEXT_TABLE_ERROR = 98    # content_id missing from the article text table
    TASK_MAX_PROCESS_TIMES = 3       # retry budget per task
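
    # Task lifecycle, as implied by the statuses above:
    #
    #   0 (init) -> 1 (kimi done) -> 2 (spider done) -> 3 (etl done) -> 4 (published)
    #
    # While any stage runs, the row is parked at 101 (processing), which doubles as
    # a lock: each stage first flips the status to 101 via update_content_status,
    # and only the worker whose UPDATE actually matched a row proceeds. On failure
    # the row rolls back to 0 with process_times incremented; once process_times
    # exceeds TASK_MAX_PROCESS_TIMES, get_tasks marks the row 99 (failed) and it is
    # never picked up again.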

    def __init__(self, mysql_client):
        self.mysql_client = mysql_client
        self.config = Config()
        self.article_match_video_table = self.config.article_match_video_table
        self.article_text_table = self.config.article_text_table
        self.article_crawler_video_table = self.config.article_crawler_video_table
        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
        self.account_map = json.loads(self.config.get_config_value("accountMap"))
        self.spider_coroutines = self.config.get_config_value("spiderCoroutines")

    async def get_tasks(self):
        """
        Fetch a batch of tasks to process.
        :return:
        """
        # Find tasks stuck in the processing status. If one has been there for more
        # than an hour, treat that run as failed and roll the task back to init
        # (roll_back_content_status_when_fails also increments process_times).
        select_processing_sql = f"""
            SELECT
                trace_id, content_status_update_time, process_times
            FROM
                {self.article_match_video_table}
            WHERE
                content_status = {self.TASK_PROCESSING_STATUS}
                AND process_times <= {self.TASK_MAX_PROCESS_TIMES};
        """
        processing_articles = await self.mysql_client.async_select(select_processing_sql)
        if processing_articles:
            processing_list = [
                {
                    "trace_id": item[0],
                    "content_status_update_time": item[1],
                    "process_times": item[2]
                }
                for item in processing_articles
            ]
            for obj in processing_list:
                if int(time.time()) - obj['content_status_update_time'] >= 3600:
                    # Consider this run failed; pass the raw process_times because
                    # the rollback helper adds one itself.
                    await self.roll_back_content_status_when_fails(
                        process_times=obj['process_times'],
                        trace_id=obj['trace_id']
                    )
        # Mark tasks whose process_times exceeds the retry budget, and which are
        # not yet published, as failed.
        update_status_sql = f"""
            UPDATE
                {self.article_match_video_table}
            SET
                content_status = %s
            WHERE
                process_times > %s AND content_status != %s;
        """
        await self.mysql_client.async_insert(
            update_status_sql,
            params=(
                self.TASK_FAIL_STATUS,
                self.TASK_MAX_PROCESS_TIMES,
                self.TASK_PUBLISHED_STATUS
            )
        )
        # Fetch tasks still in the init status whose process_times is within budget.
        select_sql = f"""
            SELECT
                trace_id, content_id, flow_pool_level, gh_id, process_times
            FROM
                {self.article_match_video_table}
            WHERE
                content_status = {self.TASK_INIT_STATUS}
                AND process_times <= {self.TASK_MAX_PROCESS_TIMES}
            LIMIT {self.spider_coroutines};
        """
        tasks = await self.mysql_client.async_select(select_sql)
        if tasks:
            return [
                {
                    "trace_id": i[0],
                    "content_id": i[1],
                    "flow_pool_level": i[2],
                    "gh_id": i[3],
                    "process_times": i[4]
                }
                for i in tasks
            ]
        else:
            return []

    async def get_video_list(self, content_id):
        """
        Check whether this article already has matched videos from history,
        i.e. at least three successfully downloaded videos.
        :param content_id:
        :return:
        """
        sql = f"""
            SELECT id
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}' AND download_status = 2;
        """
        res_tuple = await self.mysql_client.async_select(sql)
        return len(res_tuple) >= 3

    async def update_content_status(self, new_content_status, trace_id, ori_content_status):
        """
        Compare-and-swap the task status: the row is updated only if its current
        status still equals ori_content_status.
        :param new_content_status:
        :param trace_id:
        :param ori_content_status:
        :return: number of affected rows (0 means another worker owns the task)
        """
        update_sql = f"""
            UPDATE {self.article_match_video_table}
            SET content_status = %s, content_status_update_time = %s
            WHERE trace_id = %s AND content_status = %s;
        """
        row_counts = await self.mysql_client.async_insert(
            sql=update_sql,
            params=(
                new_content_status,
                int(time.time()),
                trace_id,
                ori_content_status
            )
        )
        return row_counts
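
    # update_content_status is the pipeline's only locking primitive. Because the
    # WHERE clause pins the expected current status, two workers racing on the
    # same trace_id effectively both issue:
    #
    #   UPDATE ... SET content_status = 101 WHERE trace_id = X AND content_status = 0;
    #
    # and only one of them gets an affected-row count of 1. The loser sees 0 and
    # must return without touching the task, which is why every stage checks the
    # returned row count before doing any work.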

    async def roll_back_content_status_when_fails(self, process_times, trace_id):
        """
        On failure, roll the task back to the init status and increment
        process_times by one.
        :param process_times:
        :param trace_id:
        :return:
        """
        update_article_sql = f"""
            UPDATE {self.article_match_video_table}
            SET
                content_status = %s,
                content_status_update_time = %s,
                process_times = %s
            WHERE trace_id = %s AND content_status = %s;
        """
        await self.mysql_client.async_insert(
            sql=update_article_sql,
            params=(
                self.TASK_INIT_STATUS,
                int(time.time()),
                process_times + 1,
                trace_id,
                self.TASK_PROCESSING_STATUS
            )
        )

    async def judge_whether_same_content_id_is_processing(self, content_id):
        """
        Each content_id only needs to be processed once: if any row with this
        content_id is in an intermediate status, another worker is already on it.
        :param content_id:
        :return:
        """
        select_sql = f"""
            SELECT DISTINCT content_status
            FROM {self.article_match_video_table}
            WHERE content_id = '{content_id}';
        """
        result = await self.mysql_client.async_select(select_sql)
        if result:
            for item in result:
                content_status = item[0]
                if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS}:
                    return True
            return False
        else:
            return False

    async def get_downloaded_videos(self, content_id):
        """
        Fetch the successfully downloaded videos for this content_id,
        best-scored first.
        :return:
        """
        sql = f"""
            SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}' AND download_status = 2
            ORDER BY score DESC;
        """
        res_tuple = await self.mysql_client.async_select(sql)
        return [
            {
                "platform": i[0],
                "play_count": i[1],
                "like_count": i[2],
                "video_oss_path": i[3],
                "cover_oss_path": i[4],
                "uid": i[5]
            }
            for i in res_tuple
        ]

    async def get_kimi_status(self, content_id):
        """
        Fetch the kimi processing status for a content_id.
        :return:
        """
        select_sql = f"""
            SELECT kimi_status
            FROM {self.article_text_table}
            WHERE content_id = '{content_id}';
        """
        response = await self.mysql_client.async_select(select_sql)
        if response:
            kimi_status = response[0][0]
            return kimi_status
        else:
            return self.ARTICLE_TEXT_TABLE_ERROR

    async def kimi_task(self, params):
        """
        Run the kimi stage: summarize the article and extract title/keywords.
        :return:
        """
        KIMI_SUCCESS_STATUS = 1
        KIMI_FAIL_STATUS = 2
        content_id = params['content_id']
        trace_id = params['trace_id']
        process_times = params['process_times']
        kimi_status_code = await self.get_kimi_status(content_id=content_id)
        if kimi_status_code == KIMI_SUCCESS_STATUS:
            # kimi info already exists for this content_id; just advance the status.
            affected_rows = await self.update_content_status(
                new_content_status=self.TASK_KIMI_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=self.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="Multiple workers raced for this task's status lock; lost the race, return"
                )
                return
            get_kimi_sql = f"""
                SELECT article_title, kimi_title, kimi_summary, kimi_keys
                FROM {self.article_text_table}
                WHERE content_id = '{content_id}';
            """
            kimi_info = await self.mysql_client.async_select(get_kimi_sql)
            return {
                "kimi_title": kimi_info[0][1],
                "ori_title": kimi_info[0][0],
                "kimi_summary": kimi_info[0][2],
                "kimi_keys": json.loads(kimi_info[0][3])
            }
        elif kimi_status_code == self.ARTICLE_TEXT_TABLE_ERROR:
            logging(
                code="4000",
                info="content_id not found in the long_articles_text table"
            )
        else:
            # Start processing: flip content_status from 0 to 101.
            affected_rows = await self.update_content_status(
                new_content_status=self.TASK_PROCESSING_STATUS,
                trace_id=trace_id,
                ori_content_status=self.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="Multiple workers raced for this task's status lock; lost the race, return"
                )
                return
            K = KimiServer()
            try:
                select_sql = f"""
                    SELECT article_title, article_text
                    FROM {self.article_text_table}
                    WHERE content_id = '{content_id}';
                """
                res = await self.mysql_client.async_select(select_sql)
                article_obj = {
                    "article_title": res[0][0],
                    "article_text": res[0][1],
                    "content_id": content_id
                }
                kimi_info = await K.search_kimi_schedule(params=article_obj)
                kimi_title = kimi_info['k_title']
                content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
                content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
                update_kimi_sql = f"""
                    UPDATE {self.article_text_table}
                    SET
                        kimi_title = %s,
                        kimi_summary = %s,
                        kimi_keys = %s,
                        kimi_status = %s
                    WHERE content_id = %s;
                """
                await self.mysql_client.async_insert(
                    sql=update_kimi_sql,
                    params=(kimi_title, content_title, content_keys, KIMI_SUCCESS_STATUS, content_id)
                )
                await self.update_content_status(
                    new_content_status=self.TASK_KIMI_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=self.TASK_PROCESSING_STATUS
                )
                return {
                    "kimi_title": kimi_title,
                    "ori_title": article_obj['article_title'],
                    "kimi_summary": content_title,
                    "kimi_keys": kimi_info['content_keys']
                }
            except Exception as e:
                # The kimi stage failed: record the failure on the text table ...
                update_kimi_sql = f"""
                    UPDATE {self.article_text_table}
                    SET kimi_status = %s
                    WHERE content_id = %s;
                """
                await self.mysql_client.async_insert(
                    sql=update_kimi_sql,
                    params=(
                        KIMI_FAIL_STATUS,
                        content_id
                    )
                )
                # ... and roll content_status back from 101 to 0.
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return {}

    async def spider_task(self, params, kimi_result):
        """
        Run the crawler stage: search the web for candidate videos.
        :return:
        """
        SPIDER_INIT_STATUS = 1
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        gh_id = params['gh_id']
        select_sql = f"""
            SELECT count(id)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}';
        """
        count_tuple = await self.mysql_client.async_select(select_sql)
        counts = count_tuple[0][0]
        if counts >= 3:
            # Enough candidate videos were already crawled; mark the stage done.
            await self.update_content_status(
                new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=SPIDER_INIT_STATUS
            )
            return True
        # Start processing: flip content_status from 1 to 101.
        affected_rows = await self.update_content_status(
            new_content_status=self.TASK_PROCESSING_STATUS,
            ori_content_status=SPIDER_INIT_STATUS,
            trace_id=trace_id
        )
        if affected_rows == 0:
            logging(
                code="6000",
                info="Multiple workers raced for this task's status lock; lost the race, return"
            )
            return False
        try:
            search_videos_count = await search_videos_from_web(
                info={
                    "ori_title": kimi_result['ori_title'],
                    "kimi_summary": kimi_result['kimi_summary'],
                    "kimi_keys": kimi_result['kimi_keys'],
                    "trace_id": trace_id,
                    "gh_id": gh_id,
                    "content_id": content_id,
                    "crawler_video_table": self.article_crawler_video_table
                },
                gh_id_map=self.account_map,
                db_client=self.mysql_client
            )
            if search_videos_count >= 3:
                # The crawler found enough videos: flip status from 101 to 2.
                await self.update_content_status(
                    new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=self.TASK_PROCESSING_STATUS
                )
                return True
            else:
                # The rollback helper increments process_times itself.
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return False
        except Exception as e:
            await self.roll_back_content_status_when_fails(
                process_times=process_times,
                trace_id=trace_id
            )
            print("Spider stage failed: {}".format(e))
            return False

    async def etl_task(self, params):
        """
        Download crawled videos and upload them to OSS.
        :param params:
        :return:
        """
        VIDEO_DOWNLOAD_SUCCESS_STATUS = 2
        VIDEO_DOWNLOAD_FAIL_STATUS = 3
        ETL_TASK_INIT_STATUS = 2
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        # Check whether three videos have already been downloaded.
        select_sql = f"""
            SELECT count(id)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}' AND download_status = {VIDEO_DOWNLOAD_SUCCESS_STATUS};
        """
        video_count_tuple = await self.mysql_client.async_select(select_sql)
        video_count = video_count_tuple[0][0]
        if video_count >= 3:
            affect_rows = await self.update_content_status(
                ori_content_status=ETL_TASK_INIT_STATUS,
                trace_id=trace_id,
                new_content_status=self.TASK_ETL_FINISHED_STATUS
            )
            if affect_rows == 0:
                logging(
                    code="6000",
                    info="Multiple workers raced for this task's status lock; lost the race, return"
                )
                return False
            return True
        else:
            # Start processing: move the task into the processing status.
            affected_rows = await self.update_content_status(
                ori_content_status=ETL_TASK_INIT_STATUS,
                trace_id=trace_id,
                new_content_status=self.TASK_PROCESSING_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="Multiple workers raced for this task's status lock; lost the race, return"
                )
                return False
            select_sql = f"""
                SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
                FROM {self.article_crawler_video_table}
                WHERE content_id = '{content_id}' AND download_status != {VIDEO_DOWNLOAD_SUCCESS_STATUS}
                ORDER BY score DESC;
            """
            videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
            downloaded_count = 0
            for line in videos_need_to_download_tuple:
                # Use a separate dict here; the original shadowed `params`, which
                # broke the rollback below (video rows carry no 'process_times' key).
                video_params = {
                    "id": line[0],
                    "video_id": line[1],
                    "platform": line[2],
                    "video_title": line[3],
                    "video_url": line[4],
                    "cover_url": line[5],
                    "user_id": line[6],
                    "trace_id": line[7]
                }
                try:
                    local_video_path, local_cover_path = generate_video_path(
                        video_params['platform'],
                        video_params['video_id']
                    )
                    # Download the video ...
                    file_path = await download_video(
                        file_path=local_video_path,
                        platform=video_params['platform'],
                        video_url=video_params['video_url']
                    )
                    # ... and its cover image.
                    cover_path = await download_cover(
                        file_path=local_cover_path,
                        platform=video_params['platform'],
                        cover_url=video_params['cover_url']
                    )
                    oss_video = await upload_to_oss(
                        local_video_path=file_path,
                        download_type="video"
                    )
                    if cover_path:
                        oss_cover = await upload_to_oss(
                            local_video_path=cover_path,
                            download_type="image"
                        )
                    else:
                        oss_cover = None
                    update_sql = f"""
                        UPDATE {self.article_crawler_video_table}
                        SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
                        WHERE id = %s;
                    """
                    await self.mysql_client.async_insert(
                        sql=update_sql,
                        params=(
                            oss_video,
                            oss_cover,
                            VIDEO_DOWNLOAD_SUCCESS_STATUS,
                            video_params['id']
                        )
                    )
                    downloaded_count += 1
                    # Three videos are enough (the original checked `> 3` here,
                    # inconsistently with the `>= 3` check after the loop).
                    if downloaded_count >= 3:
                        await self.update_content_status(
                            ori_content_status=self.TASK_PROCESSING_STATUS,
                            trace_id=trace_id,
                            new_content_status=self.TASK_ETL_FINISHED_STATUS
                        )
                        return True
                except Exception as e:
                    update_sql = f"""
                        UPDATE {self.article_crawler_video_table}
                        SET download_status = %s
                        WHERE id = %s;
                    """
                    await self.mysql_client.async_insert(
                        sql=update_sql,
                        params=(VIDEO_DOWNLOAD_FAIL_STATUS, video_params['id'])
                    )
            if downloaded_count >= 3:
                await self.update_content_status(
                    ori_content_status=self.TASK_PROCESSING_STATUS,
                    trace_id=trace_id,
                    new_content_status=self.TASK_ETL_FINISHED_STATUS
                )
                return True
            else:
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return False
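
    # download_status codes on the crawler-video table, as used by etl_task:
    #   2 = downloaded and uploaded to OSS successfully
    #   3 = the last download/upload attempt failed
    # etl_task re-selects every row whose status is not 2 (so previously failed
    # rows are retried on the next pass) and stops as soon as three rows for the
    # content_id reach status 2.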

    async def publish_task(self, params, kimi_title):
        """
        Publish the matched videos.
        :param kimi_title:
        :param params:
        :return:
        """
        PUBLISH_DEFAULT_STATUS = 3
        gh_id = params['gh_id']
        flow_pool_level = params['flow_pool_level']
        content_id = params['content_id']
        trace_id = params['trace_id']
        process_times = params['process_times']
        # Start processing: move the task into the processing status.
        affected_rows = await self.update_content_status(
            ori_content_status=PUBLISH_DEFAULT_STATUS,
            trace_id=trace_id,
            new_content_status=self.TASK_PROCESSING_STATUS
        )
        if affected_rows == 0:
            logging(
                code="6000",
                info="Multiple workers raced for this task's status lock; lost the race, return"
            )
            return False
        try:
            download_videos = await self.get_downloaded_videos(content_id)
            match flow_pool_level:
                case "autoArticlePoolLevel4":
                    # Cold-start pool: publish for all accounts, shuffled.
                    video_list = shuffle_list(download_videos)[:3]
                case "autoArticlePoolLevel3":
                    if self.gh_id_dict.get(gh_id):
                        video_list = shuffle_list(download_videos)[:3]
                    else:
                        video_list = download_videos[:3]
                case "autoArticlePoolLevel2":
                    # Second slot: only enabled for specific accounts, none yet.
                    video_list = []
                case "autoArticlePoolLevel1":
                    # Headline slot: no special handling for now.
                    video_list = download_videos[:3]
                case _:
                    video_list = download_videos[:3]
            published = []
            for video_obj in video_list:
                publish_params = {
                    "videoPath": video_obj['video_oss_path'],
                    "uid": video_obj['uid'],
                    "title": kimi_title
                }
                response = await publish_to_pq(publish_params)
                published.append({
                    "uid": video_obj['uid'],
                    "source": video_obj['platform'],
                    "kimiTitle": kimi_title,
                    "videoId": response['data']['id'],
                    "videoCover": response['data']['shareImgPath'],
                    "videoPath": response['data']['videoPath'],
                    "videoOss": video_obj['video_oss_path']
                })
            update_sql = f"""
                UPDATE {self.article_match_video_table}
                SET content_status = %s, response = %s, process_times = %s
                WHERE trace_id = %s and content_status = %s;
            """
            # Flip the task from processing to published.
            await self.mysql_client.async_insert(
                sql=update_sql,
                params=(
                    self.TASK_PUBLISHED_STATUS,
                    json.dumps(published, ensure_ascii=False),
                    process_times + 1,
                    trace_id,
                    self.TASK_PROCESSING_STATUS
                )
            )
        except Exception as e:
            await self.roll_back_content_status_when_fails(
                process_times=process_times,
                trace_id=trace_id
            )
            print(e)

    async def start_process(self, params):
        """
        Process a single article through all stages.
        :param params:
        :return:
        """
        # Step 1: run the kimi stage.
        kimi_result = await self.kimi_task(params)
        trace_id = params['trace_id']
        if kimi_result:
            # Once kimi has finished, run the spider stage.
            print("kimi success")
            logging(
                code=3001,
                info="kimi success",
                trace_id=trace_id
            )
            spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
            if spider_flag:
                # Once the crawler has finished, run the ETL stage.
                print("spider success")
                logging(
                    code=3002,
                    info="spider_success",
                    trace_id=trace_id
                )
                etl_flag = await self.etl_task(params)
                if etl_flag:
                    # Once download/upload has finished, publish.
                    print("etl success")
                    logging(
                        code="3003",
                        info="etl_success",
                        trace_id=trace_id
                    )
                    try:
                        await self.publish_task(params, kimi_result['kimi_title'])
                        logging(
                            code="3004",
                            info="publish_success",
                            trace_id=trace_id
                        )
                    except Exception as e:
                        logging(
                            code="6004",
                            info="publish failed--{}".format(e),
                            trace_id=trace_id
                        )
                else:
                    logging(
                        code="6003",
                        info="ETL stage failed",
                        trace_id=trace_id
                    )
            else:
                logging(
                    code="6002",
                    info="spider stage failed",
                    trace_id=trace_id
                )
        else:
            logging(
                code="6001",
                info="kimi stage failed",
                trace_id=trace_id
            )

    async def process_task(self, params):
        """
        Process one task.
        :return:
        """
        content_id = params['content_id']
        has_downloaded_videos = await self.get_video_list(content_id)
        if not has_downloaded_videos:
            # Before starting, check whether the same content_id is already in flight.
            processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
            if processing_flag:
                logging(
                    code="9001",
                    info="This content_id is already being processed; skipping this task"
                )
            else:
                await self.start_process(params=params)
        else:
            print("Downloaded videos already exist for this content_id")

    async def deal(self):
        """
        Entry point: fetch tasks, deduplicate by content_id, and process them concurrently.
        :return:
        """
        task_list = await self.get_tasks()
        print(task_list)
        # Deduplicate tasks by content_id.
        task_dict = {}
        for task in task_list:
            task_dict[task['content_id']] = task
        process_list = list(task_dict.values())
        logging(
            code="5001",
            info="Match Task Got {} this time".format(len(task_list)),
            function="Publish Task"
        )
        if task_list:
            tasks = [self.process_task(params) for params in process_list]
            await asyncio.gather(*tasks)
        else:
            logging(
                code="9008",
                info="No tasks to process"
            )
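

# A minimal driver sketch (an addition for illustration, not part of the original
# file): any object exposing the async_select / async_insert coroutines used above
# can serve as mysql_client; the concrete client class is project-specific. The
# polling interval below is an illustrative default.
async def run_new_content_id_task(mysql_client, interval_seconds=60):
    """
    Run scheduling rounds forever: fetch a batch, process it, sleep, repeat.
    """
    task = NewContentIdTask(mysql_client)
    while True:
        await task.deal()
        await asyncio.sleep(interval_seconds)

# Usage, given a suitable client instance:
#   asyncio.run(run_new_content_id_task(client))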