  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. from applications.config import Config, NewContentIdTaskConst
  7. from applications.log import logging
  8. from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
  9. from applications.functions.common import shuffle_list
  10. from applications.functions.kimi import KimiServer
  11. from applications.spider import search_videos_from_web
  12. from applications.etl_function import *
  13. from applications.feishu import bot
  14. from applications.functions.aigc import record_trace_id


class NewContentIdTask(object):
    """
    Matching pipeline for articles that have no previously published history
    """

    def __init__(self, mysql_client):
        self.mysql_client = mysql_client
        self.config = Config()
        self.article_match_video_table = self.config.article_match_video_table
        self.article_text_table = self.config.article_text_table
        self.article_crawler_video_table = self.config.article_crawler_video_table
        self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
        self.account_map = json.loads(self.config.get_config_value("accountMap"))
        self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
        self.const = NewContentIdTaskConst()
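
    # Content-status lifecycle, as far as the inline comments in this file state it
    # (the authoritative values live in NewContentIdTaskConst):
    #   TASK_INIT_STATUS (0) -> TASK_KIMI_FINISHED_STATUS (1)
    #   -> TASK_SPIDER_FINISHED_STATUS (2) -> TASK_ETL_COMPLETE_STATUS
    #   -> TASK_PUBLISHED_STATUS (4)
    # TASK_PROCESSING_STATUS (101) is a transient lock held while any step runs,
    # and TASK_FAIL_STATUS (99) marks tasks that exhausted TASK_MAX_PROCESS_TIMES.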

    async def get_tasks(self):
        """
        Fetch tasks to process
        :return:
        """
        # Find tasks stuck in the processing status: if one has stayed there
        # longer than 1h (TASK_PROCESSING_TIMEOUT), treat it as failed and roll
        # it back to init with process_times + 1
        select_processing_sql = f"""
            SELECT
                trace_id, content_status_update_time, process_times
            FROM
                {self.article_match_video_table}
            WHERE
                content_status = {self.const.TASK_PROCESSING_STATUS}
                and process_times <= {self.const.TASK_MAX_PROCESS_TIMES};
        """
        processing_articles = await self.mysql_client.async_select(select_processing_sql)
        if processing_articles:
            processing_list = [
                {
                    "trace_id": item[0],
                    "content_status_update_time": item[1],
                    "process_times": item[2]
                }
                for item in processing_articles
            ]
            for obj in processing_list:
                if int(time.time()) - obj['content_status_update_time'] >= self.const.TASK_PROCESSING_TIMEOUT:
                    # consider the task failed; the rollback helper adds the +1 itself
                    await self.roll_back_content_status_when_fails(
                        process_times=obj['process_times'],
                        trace_id=obj['trace_id']
                    )
        # Mark tasks with process_times > TASK_MAX_PROCESS_TIMES that are not yet
        # ETL-complete or published as failed (the filter columns need an index)
        update_status_sql = f"""
            UPDATE
                {self.article_match_video_table}
            SET
                content_status = %s
            WHERE
                process_times > %s and content_status not in (%s, %s);
        """
        await self.mysql_client.async_insert(
            update_status_sql,
            params=(
                self.const.TASK_FAIL_STATUS,
                self.const.TASK_MAX_PROCESS_TIMES,
                self.const.TASK_ETL_COMPLETE_STATUS,
                self.const.TASK_PUBLISHED_STATUS
            )
        )
        # Fetch tasks with process_times <= TASK_MAX_PROCESS_TIMES and content_status = init
        select_sql = f"""
            SELECT
                trace_id, content_id, flow_pool_level, gh_id, process_times, publish_flag
            FROM
                {self.article_match_video_table}
            WHERE
                content_status = {self.const.TASK_INIT_STATUS}
                and process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
            ORDER BY flow_pool_level, request_timestamp
            LIMIT {self.spider_coroutines};
        """
        tasks = await self.mysql_client.async_select(select_sql)
        if tasks:
            return [
                {
                    "trace_id": i[0],
                    "content_id": i[1],
                    "flow_pool_level": i[2],
                    "gh_id": i[3],
                    "process_times": i[4],
                    "publish_flag": i[5]
                }
                for i in tasks
            ]
        else:
            return []

    async def get_video_list(self, content_id):
        """
        Check whether this article already has enough matched, downloaded videos
        :param content_id:
        :return:
        """
        sql = f"""
            SELECT id
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
        """
        res_tuple = await self.mysql_client.async_select(sql)
        return len(res_tuple) >= self.const.MIN_MATCH_VIDEO_NUM

    async def update_content_status(self, new_content_status, trace_id, ori_content_status):
        """
        Compare-and-swap the task status: the UPDATE only matches while the row
        is still in ori_content_status, so concurrent workers cannot both claim it
        :param new_content_status:
        :param trace_id:
        :param ori_content_status:
        :return: number of affected rows (0 means another worker won the race)
        """
        update_sql = f"""
            UPDATE {self.article_match_video_table}
            SET content_status = %s, content_status_update_time = %s
            WHERE trace_id = %s and content_status = %s;
        """
        row_counts = await self.mysql_client.async_insert(
            sql=update_sql,
            params=(
                new_content_status,
                int(time.time()),
                trace_id,
                ori_content_status
            )
        )
        return row_counts
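
    # A minimal sketch of the lock pattern used throughout this class
    # (names as above; the zero-rows branch is how losers of the race bail out):
    #
    #     affected_rows = await self.update_content_status(
    #         new_content_status=self.const.TASK_PROCESSING_STATUS,
    #         trace_id=trace_id,
    #         ori_content_status=self.const.TASK_INIT_STATUS
    #     )
    #     if affected_rows == 0:
    #         return  # another worker already holds this task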

    async def roll_back_content_status_when_fails(self, process_times, trace_id):
        """
        On failure, roll the task back to the init status and increment
        process_times by 1 (the +1 happens here, so callers pass the current value)
        :param process_times:
        :param trace_id:
        :return:
        """
        update_article_sql = f"""
            UPDATE {self.article_match_video_table}
            SET
                content_status = %s,
                content_status_update_time = %s,
                process_times = %s
            WHERE trace_id = %s and content_status = %s;
        """
        await self.mysql_client.async_insert(
            sql=update_article_sql,
            params=(
                self.const.TASK_INIT_STATUS,
                int(time.time()),
                process_times + 1,
                trace_id,
                self.const.TASK_PROCESSING_STATUS
            )
        )

    async def judge_whether_same_content_id_is_processing(self, content_id):
        """
        The same content_id only needs to be processed once
        :param content_id:
        :return:
        success: 4
        init: 0
        fail: 99
        todo: decide whether a content_id that already failed should be skipped for good
        """
        select_sql = f"""
            SELECT distinct content_status
            FROM {self.article_match_video_table}
            WHERE content_id = '{content_id}';
        """
        result = await self.mysql_client.async_select(select_sql)
        if result:
            for item in result:
                content_status = item[0]
                # any in-flight or finished status means the content_id is taken
                if content_status in {
                    self.const.TASK_KIMI_FINISHED_STATUS,
                    self.const.TASK_SPIDER_FINISHED_STATUS,
                    self.const.TASK_ETL_COMPLETE_STATUS,
                    self.const.TASK_PROCESSING_STATUS,
                    self.const.TASK_PUBLISHED_STATUS
                }:
                    return True
            return False
        else:
            return False
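
    # Several trace_ids can share one content_id (the same article pushed to
    # several accounts); the check above lets only one of them drive the
    # pipeline, while the rest later reuse its crawled videos and Kimi result.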

    async def get_downloaded_videos(self, content_id):
        """
        Fetch successfully downloaded videos, best score first
        :return:
        """
        sql = f"""
            SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
            ORDER BY score DESC;
        """
        res_tuple = await self.mysql_client.async_select(sql)
        return [
            {
                "platform": i[0],
                "play_count": i[1],
                "like_count": i[2],
                "video_oss_path": i[3],
                "cover_oss_path": i[4],
                "uid": i[5]
            }
            for i in res_tuple
        ]

    async def get_kimi_status(self, content_id):
        """
        Fetch the Kimi status for a content_id
        :return:
        """
        select_sql = f"""
            select kimi_status
            from {self.article_text_table}
            where content_id = '{content_id}';
        """
        response = await self.mysql_client.async_select(select_sql)
        if response:
            kimi_status = response[0][0]
            return kimi_status
        else:
            return self.const.ARTICLE_TEXT_TABLE_ERROR
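
    # kimi_status lives on the text table, keyed by content_id, so once one
    # trace_id has paid for the Kimi call, every later task with the same
    # content_id reuses the stored title/summary/keys instead of calling again.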

    async def kimi_task(self, params):
        """
        Run the Kimi step
        :return:
        """
        content_id = params['content_id']
        trace_id = params['trace_id']
        process_times = params['process_times']
        kimi_status_code = await self.get_kimi_status(content_id=content_id)
        if kimi_status_code == self.const.KIMI_SUCCESS_STATUS:
            affected_rows = await self.update_content_status(
                new_content_status=self.const.TASK_KIMI_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=self.const.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return
            get_kimi_sql = f"""
                SELECT article_title, kimi_title, kimi_summary, kimi_keys
                FROM {self.article_text_table}
                WHERE content_id = '{content_id}';
            """
            kimi_info = await self.mysql_client.async_select(get_kimi_sql)
            return {
                "kimi_title": kimi_info[0][1],
                "ori_title": kimi_info[0][0],
                "kimi_summary": kimi_info[0][2],
                "kimi_keys": json.loads(kimi_info[0][3])
            }
        elif kimi_status_code == self.const.ARTICLE_TEXT_TABLE_ERROR:
            logging(
                code="4000",
                info="long_articles_text表中未找到 content_id"
            )
        else:
            # Start processing: move content_status from init (0) to processing (101)
            affected_rows = await self.update_content_status(
                new_content_status=self.const.TASK_PROCESSING_STATUS,
                trace_id=trace_id,
                ori_content_status=self.const.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return
            K = KimiServer()
            try:
                select_sql = f"""
                    select article_title, article_text
                    from {self.article_text_table}
                    where content_id = '{content_id}';
                """
                res = await self.mysql_client.async_select(select_sql)
                article_obj = {
                    "article_title": res[0][0],
                    "article_text": res[0][1],
                    "content_id": content_id
                }
                kimi_info = await K.search_kimi_schedule(params=article_obj)
                kimi_title = kimi_info['k_title']
                # content_title is persisted as kimi_summary below
                content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
                content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
                update_kimi_sql = f"""
                    UPDATE {self.article_text_table}
                    SET
                        kimi_title = %s,
                        kimi_summary = %s,
                        kimi_keys = %s,
                        kimi_status = %s
                    WHERE content_id = %s;
                """
                await self.mysql_client.async_insert(
                    sql=update_kimi_sql,
                    params=(kimi_title, content_title, content_keys, self.const.KIMI_SUCCESS_STATUS, content_id)
                )
                await self.update_content_status(
                    new_content_status=self.const.TASK_KIMI_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=self.const.TASK_PROCESSING_STATUS
                )
                return {
                    "kimi_title": kimi_title,
                    "ori_title": article_obj['article_title'],
                    "kimi_summary": content_title,
                    "kimi_keys": kimi_info['content_keys']
                }
            except Exception as e:
                # The Kimi step failed: flag the text row as failed ...
                update_kimi_sql = f"""
                    UPDATE {self.article_text_table}
                    SET kimi_status = %s
                    WHERE content_id = %s;
                """
                await self.mysql_client.async_insert(
                    sql=update_kimi_sql,
                    params=(
                        self.const.KIMI_FAIL_STATUS,
                        content_id
                    )
                )
                # ... and roll the task back from processing (101) to init (0)
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return {}
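
    # kimi_task has three outcomes: reuse a stored Kimi result (status already
    # success), bail out when the text row is missing, or call KimiServer and
    # persist title/summary/keys before advancing the task. The empty dict from
    # the except branch is what makes start_process treat the step as failed.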

    async def spider_task(self, params, kimi_result):
        """
        Run the crawler step
        :return:
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        gh_id = params['gh_id']
        select_sql = f"""
            SELECT count(id)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}'
            AND download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
        """
        count_tuple = await self.mysql_client.async_select(select_sql)
        counts = count_tuple[0][0]
        if counts >= self.const.MIN_MATCH_VIDEO_NUM:
            # enough videos already exist for this content_id: skip crawling
            await self.update_content_status(
                new_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=self.const.TASK_KIMI_FINISHED_STATUS
            )
            return True
        # Start processing: move content_status from kimi-finished (1) to processing (101)
        affected_rows = await self.update_content_status(
            new_content_status=self.const.TASK_PROCESSING_STATUS,
            ori_content_status=self.const.TASK_KIMI_FINISHED_STATUS,
            trace_id=trace_id
        )
        if affected_rows == 0:
            logging(
                code="6000",
                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
            )
            return False
        try:
            logging(
                code="spider_1001",
                info="开始执行搜索任务",
                trace_id=trace_id,
                data=kimi_result
            )
            search_videos_count = await search_videos_from_web(
                info={
                    "ori_title": kimi_result['ori_title'],
                    "kimi_summary": kimi_result['kimi_summary'],
                    "kimi_keys": kimi_result['kimi_keys'],
                    "trace_id": trace_id,
                    "gh_id": gh_id,
                    "content_id": content_id,
                    "crawler_video_table": self.article_crawler_video_table
                },
                gh_id_map=self.account_map,
                db_client=self.mysql_client
            )
            if search_videos_count >= self.const.MIN_MATCH_VIDEO_NUM:
                # Crawl succeeded: move content_status from processing (101) to spider-finished (2)
                logging(
                    code="spider_1002",
                    info="搜索成功",
                    trace_id=trace_id,
                    data=kimi_result
                )
                await self.update_content_status(
                    new_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=self.const.TASK_PROCESSING_STATUS
                )
                return True
            else:
                logging(
                    code="spider_1003",
                    info="搜索失败",
                    trace_id=trace_id,
                    data=kimi_result
                )
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return False
        except Exception as e:
            await self.roll_back_content_status_when_fails(
                process_times=process_times,
                trace_id=trace_id
            )
            print("爬虫处理失败: {}".format(e))
            return False
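
    # search_videos_from_web is expected to write candidate rows into
    # article_crawler_video_table and return how many it found; reaching
    # MIN_MATCH_VIDEO_NUM is what lets the ETL step assemble a full match.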

    async def etl_task(self, params):
        """
        download && upload videos
        :param params:
        :return:
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        # Check whether enough (MIN_MATCH_VIDEO_NUM) videos have already been downloaded
        select_sql = f"""
            select count(id)
            from {self.article_crawler_video_table}
            where content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
        """
        video_count_tuple = await self.mysql_client.async_select(select_sql)
        video_count = video_count_tuple[0][0]
        if video_count >= self.const.MIN_MATCH_VIDEO_NUM:
            affect_rows = await self.update_content_status(
                ori_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                new_content_status=self.const.TASK_ETL_COMPLETE_STATUS
            )
            if affect_rows == 0:
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return False
            return True
        else:
            # Start processing: move the task into the processing status
            affected_rows = await self.update_content_status(
                ori_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                new_content_status=self.const.TASK_PROCESSING_STATUS
            )
            if affected_rows == 0:
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return False
            select_sql = f"""
                SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
                FROM {self.article_crawler_video_table}
                WHERE content_id = '{content_id}' and download_status != {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
                ORDER BY score DESC;
            """
            videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
            downloaded_count = 0
            for line in videos_need_to_download_tuple:
                # note: do not name this dict `params`, that would shadow the argument
                video_item = {
                    "id": line[0],
                    "video_id": line[1],
                    "platform": line[2],
                    "video_title": line[3],
                    "video_url": line[4],
                    "cover_url": line[5],
                    "user_id": line[6],
                    "trace_id": line[7]
                }
                try:
                    local_video_path, local_cover_path = generate_video_path(video_item['platform'], video_item['video_id'])
                    # download the video
                    file_path = await download_video(
                        file_path=local_video_path,
                        platform=video_item['platform'],
                        video_url=video_item['video_url']
                    )
                    if not file_path:
                        # The download failed: skip the upload and mark this row as failed
                        update_sql = f"""
                            UPDATE {self.article_crawler_video_table}
                            SET download_status = %s
                            WHERE id = %s;
                        """
                        await self.mysql_client.async_insert(
                            sql=update_sql,
                            params=(self.const.VIDEO_DOWNLOAD_FAIL_STATUS, video_item['id'])
                        )
                        logging(
                            code="etl_1001",
                            info="etl_下载视频失败",
                            trace_id=trace_id,
                            function="etl_task"
                        )
                    else:
                        # download the cover
                        cover_path = await download_cover(
                            file_path=local_cover_path,
                            platform=video_item['platform'],
                            cover_url=video_item['cover_url']
                        )
                        # upload the video to OSS
                        oss_video = await upload_to_oss(
                            local_video_path=file_path,
                            download_type="video"
                        )
                        # upload the cover to OSS
                        if cover_path:
                            oss_cover = await upload_to_oss(
                                local_video_path=cover_path,
                                download_type="image"
                            )
                        else:
                            oss_cover = None
                        # mark this row as successfully downloaded
                        update_sql = f"""
                            UPDATE {self.article_crawler_video_table}
                            SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
                            WHERE id = %s;
                        """
                        await self.mysql_client.async_insert(
                            sql=update_sql,
                            params=(
                                oss_video,
                                oss_cover,
                                self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
                                video_item['id']
                            )
                        )
                        downloaded_count += 1
                        logging(
                            code="etl_1002",
                            info="etl_视频下载成功",
                            trace_id=trace_id,
                            function="etl_task"
                        )
                        # Once more than MIN_MATCH_VIDEO_NUM videos are downloaded,
                        # exit the loop early and mark the ETL step complete
                        if downloaded_count > self.const.MIN_MATCH_VIDEO_NUM:
                            await self.update_content_status(
                                ori_content_status=self.const.TASK_PROCESSING_STATUS,
                                trace_id=trace_id,
                                new_content_status=self.const.TASK_ETL_COMPLETE_STATUS
                            )
                            return True
                except Exception as e:
                    update_sql = f"""
                        UPDATE {self.article_crawler_video_table}
                        SET download_status = %s
                        WHERE id = %s;
                    """
                    await self.mysql_client.async_insert(
                        sql=update_sql,
                        params=(self.const.VIDEO_DOWNLOAD_FAIL_STATUS, video_item['id'])
                    )
                    logging(
                        code="etl_1001",
                        info="etl_下载视频失败",
                        trace_id=trace_id,
                        function="etl_task"
                    )
            if downloaded_count >= self.const.MIN_MATCH_VIDEO_NUM:
                await self.update_content_status(
                    ori_content_status=self.const.TASK_PROCESSING_STATUS,
                    trace_id=trace_id,
                    new_content_status=self.const.TASK_ETL_COMPLETE_STATUS
                )
                return True
            else:
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return False
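
    # Per-video ETL flow: generate local paths -> download video -> download
    # cover -> upload both to OSS -> flip download_status. A failure only marks
    # the single row failed; the task itself rolls back only when fewer than
    # MIN_MATCH_VIDEO_NUM videos made it through.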

    async def publish_task(self, params, kimi_title):
        """
        Publish step
        :param kimi_title:
        :param params:
        :return:
        """
        gh_id = params['gh_id']
        flow_pool_level = params['flow_pool_level']
        content_id = params['content_id']
        trace_id = params['trace_id']
        process_times = params['process_times']
        # Start processing: move content_status from etl-complete to processing
        affected_rows = await self.update_content_status(
            ori_content_status=self.const.TASK_ETL_COMPLETE_STATUS,
            trace_id=trace_id,
            new_content_status=self.const.TASK_PROCESSING_STATUS
        )
        if affected_rows == 0:
            logging(
                code="6000",
                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
            )
            return False
        try:
            download_videos = await self.get_downloaded_videos(content_id)
            match flow_pool_level:
                case "autoArticlePoolLevel4":
                    # cold-start tier: run for all accounts, shuffled
                    video_list = shuffle_list(download_videos)[:3]
                case "autoArticlePoolLevel3":
                    # shuffle only for accounts in the level-2 test config
                    if self.gh_id_dict.get(gh_id):
                        video_list = shuffle_list(download_videos)[:3]
                    else:
                        video_list = download_videos[:3]
                case "autoArticlePoolLevel2":
                    # second-slot tier: only for specific accounts, so nothing here
                    video_list = []
                case "autoArticlePoolLevel1":
                    # headline tier: no special handling yet
                    video_list = download_videos[:3]
                case _:
                    video_list = download_videos[:3]
            L = []
            for video_obj in video_list:
                # note: do not name this dict `params`, the except block below
                # still needs the function argument's values
                publish_params = {
                    "videoPath": video_obj['video_oss_path'],
                    "uid": video_obj['uid'],
                    "title": kimi_title
                }
                publish_response = await publish_to_pq(publish_params)
                video_id = publish_response['data']['id']
                response = await get_pq_video_detail(video_id)
                obj = {
                    "uid": video_obj['uid'],
                    "source": video_obj['platform'],
                    "kimiTitle": kimi_title,
                    "videoId": response['data'][0]['id'],
                    "videoCover": response['data'][0]['shareImgPath'],
                    "videoPath": response['data'][0]['videoPath'],
                    "videoOss": video_obj['video_oss_path']
                }
                L.append(obj)
            update_sql = f"""
                UPDATE {self.article_match_video_table}
                SET content_status = %s, response = %s, process_times = %s
                WHERE trace_id = %s and content_status = %s;
            """
            # move from processing to published
            await self.mysql_client.async_insert(
                sql=update_sql,
                params=(
                    self.const.TASK_PUBLISHED_STATUS,
                    json.dumps(L, ensure_ascii=False),
                    process_times + 1,
                    trace_id,
                    self.const.TASK_PROCESSING_STATUS
                )
            )
        except Exception as e:
            await self.roll_back_content_status_when_fails(
                process_times=process_times,
                trace_id=trace_id
            )
            print(e)
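
    # The `response` column ends up holding a JSON array like the following
    # (values illustrative; the field names come from the publish loop above):
    # [
    #     {
    #         "uid": 123,
    #         "source": "douyin",
    #         "kimiTitle": "...",
    #         "videoId": 456,
    #         "videoCover": "https://.../cover.jpg",
    #         "videoPath": "...",
    #         "videoOss": "oss://.../video.mp4"
    #     }
    # ]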

    async def start_process(self, params):
        """
        Process a single article: kimi -> spider -> etl -> publish
        :param params:
        :return:
        """
        # step 1: run the Kimi step
        # time.sleep(5)  # test the wait when multiple processes grab the same task
        kimi_result = await self.kimi_task(params)
        trace_id = params['trace_id']
        process_times = params['process_times']
        content_id = params['content_id']
        gh_id = params['gh_id']
        publish_flag = params['publish_flag']
        print(kimi_result)
        if kimi_result:
            # step 2: once Kimi is done, run the spider step
            print("kimi success")
            logging(
                code=3001,
                info="kimi success",
                trace_id=trace_id
            )
            spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
            if spider_flag:
                # step 3: once the crawl is done, run the ETL step
                print("spider success")
                logging(
                    code=3002,
                    info="spider_success",
                    trace_id=trace_id
                )
                etl_flag = await self.etl_task(params)
                if etl_flag:
                    # step 4: once download/upload is done, publish
                    print("etl success")
                    logging(
                        code="3003",
                        info="etl_success",
                        trace_id=trace_id
                    )
                    """
                    todo: a newly created plan may not be set to managed publishing while its
                    account is still in the managed-account config, in which case the managed
                    flow still runs; existing data needs a migration plan.
                    For now both cases go through the managed flow.
                    """
                    if publish_flag == self.const.DO_NOT_NEED_PUBLISH:
                        logging(
                            code="3013",
                            info="不需要发布,长文系统托管发布",
                            trace_id=trace_id
                        )
                        return
                    else:
                        try:
                            await self.publish_task(params, kimi_result['kimi_title'])
                            logging(
                                code="3004",
                                info="publish_success",
                                trace_id=trace_id
                            )
                            await record_trace_id(
                                trace_id=trace_id,
                                status=self.const.RECORD_SUCCESS_TRACE_ID_CODE
                            )
                        except Exception as e:
                            logging(
                                code="6004",
                                info="publish 失败--{}".format(e),
                                trace_id=trace_id
                            )
                else:
                    logging(
                        code="6003",
                        info="ETL 处理失败",
                        trace_id=trace_id
                    )
            else:
                logging(
                    code="6002",
                    info="爬虫处理失败",
                    trace_id=trace_id
                )
        else:
            logging(
                code="6001",
                info="kimi 处理失败",
                trace_id=trace_id
            )
            if process_times >= self.const.TASK_MAX_PROCESS_TIMES:
                logging(
                    code="6011",
                    info="kimi处理次数达到上限, 放弃处理",
                    trace_id=trace_id
                )
                # Set every row with the same content_id still in the init status
                # to the Kimi-failure status, so the content_id is not retried
                update_sql = f"""
                    UPDATE {self.article_match_video_table}
                    SET content_status = %s
                    WHERE content_id = %s and content_status = %s;
                """
                affected_rows = await self.mysql_client.async_insert(
                    sql=update_sql,
                    params=(
                        self.const.KIMI_ILLEGAL_STATUS,
                        content_id,
                        self.const.TASK_INIT_STATUS
                    )
                )
                bot(
                    title="KIMI 处理失败",
                    detail={
                        "content_id": content_id,
                        "affected_rows": affected_rows
                    }
                )
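
    # Failure handling above: when Kimi keeps failing past TASK_MAX_PROCESS_TIMES,
    # all init rows sharing the content_id are flagged KIMI_ILLEGAL_STATUS in one
    # UPDATE and a Feishu alert is sent, so no other trace_id re-enters the same
    # dead end.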

    async def process_task(self, params):
        """
        Process one task
        :return:
        """
        content_id = params['content_id']
        download_videos = await self.get_video_list(content_id)
        if not download_videos:
            # before starting, check whether the same content_id is already in flight
            processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
            if processing_flag:
                logging(
                    code="9001",
                    info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
                )
            else:
                await self.start_process(params=params)
        else:
            print("存在已下载视频")

    async def deal(self):
        """
        Entry point: fetch tasks, dedupe by content_id, and process them concurrently
        :return:
        """
        task_list = await self.get_tasks()
        # dedupe by content_id (last task per content_id wins)
        task_dict = {task['content_id']: task for task in task_list}
        process_list = list(task_dict.values())
        logging(
            code="5001",
            info="Match Task Got {} this time".format(len(process_list)),
            function="Publish Task"
        )
        if task_list:
            total_task = len(process_list)
            print(process_list)
            a = time.time()
            print("开始处理,一共{}个任务".format(total_task))
            tasks = [self.process_task(params) for params in process_list]
            await asyncio.gather(*tasks)
            b = time.time()
            print("处理时间: {} s".format(b - a))
        else:
            logging(
                code="9008",
                info="没有要处理的请求"
            )
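

# A minimal usage sketch (assuming an async MySQL client that exposes the
# async_select/async_insert coroutines used above; the client name and its
# construction are illustrative, not part of this module):
#
#     import asyncio
#
#     async def main():
#         mysql_client = AsyncMySQLClient()  # hypothetical client
#         await NewContentIdTask(mysql_client).deal()
#
#     if __name__ == '__main__':
#         asyncio.run(main())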