new_contentId_task.py 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924
  1. """
  2. @author: luojunhui
  3. """
  4. import json
  5. import time
  6. from applications.config import Config
  7. from applications.config.const import new_content_id_task as NewContentIdTaskConst
  8. from applications.log import logging
  9. from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
  10. from applications.functions.common import shuffle_list
  11. from applications.functions.kimi import KimiServer
  12. from applications.spider import search_videos_from_web
  13. from applications.etl_function import *
  14. from applications.feishu import bot
  15. from applications.functions.aigc import record_trace_id
  16. class NewContentIdTask(object):
  17. """
  18. 不存在历史已经发布的文章的匹配流程
  19. """
  20. def __init__(self, mysql_client):
  21. self.mysql_client = mysql_client
  22. self.config = Config()
  23. self.article_match_video_table = self.config.article_match_video_table
  24. self.article_text_table = self.config.article_text_table
  25. self.article_crawler_video_table = self.config.article_crawler_video_table
  26. self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
  27. self.account_map = json.loads(self.config.get_config_value("accountMap"))
  28. self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
  29. async def get_tasks(self):
  30. """
  31. 获取 task
  32. :return:
  33. """
  34. # 处理未托管的任务
  35. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
  36. # 处理托管任务
  37. await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
  38. # 将 process_times > 3 且状态不为 4 的任务的状态修改为失败, 判断条件需要加上索引
  39. update_status_sql = f"""
  40. UPDATE
  41. {self.article_match_video_table}
  42. SET
  43. content_status = %s
  44. WHERE
  45. process_times > %s and content_status not in (%s, %s);
  46. """
  47. await self.mysql_client.async_insert(
  48. update_status_sql,
  49. params=(
  50. NewContentIdTaskConst.TASK_FAIL_STATUS,
  51. NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
  52. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  53. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  54. )
  55. )
  56. # 获取 process_times <= 3 且 content_status = 0 的任务
  57. select_sql = f"""
  58. SELECT
  59. t1.trace_id, t1.content_id, t1.flow_pool_level, t1.gh_id, t1.process_times, t1.publish_flag
  60. FROM
  61. {self.article_match_video_table} t1
  62. LEFT JOIN (
  63. SELECT content_id, count(1) as cnt
  64. FROM {self.article_crawler_video_table}
  65. WHERE download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  66. GROUP BY content_id
  67. ) t2
  68. ON t1.content_id = t2.content_id
  69. WHERE
  70. t1.content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
  71. AND t1.process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  72. AND t2.cnt IS NULL
  73. ORDER BY flow_pool_level, request_timestamp
  74. LIMIT {self.spider_coroutines};
  75. """
  76. tasks = await self.mysql_client.async_select(select_sql)
  77. if tasks:
  78. return [
  79. {
  80. "trace_id": i[0],
  81. "content_id": i[1],
  82. "flow_pool_level": i[2],
  83. "gh_id": i[3],
  84. "process_times": i[4],
  85. "publish_flag": i[5]
  86. }
  87. for i in tasks
  88. ]
  89. else:
  90. return []
  91. async def roll_back_unfinished_tasks(self, publish_flag):
  92. """
  93. 将长时间处于中间状态的任务回滚
  94. """
  95. # 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
  96. if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
  97. processing_status_tuple = (
  98. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  99. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  100. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  101. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  102. )
  103. elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
  104. processing_status_tuple = (
  105. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  106. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  107. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
  108. )
  109. else:
  110. return
  111. select_processing_sql = f"""
  112. SELECT
  113. trace_id, content_status_update_time, process_times, content_status
  114. FROM
  115. {self.article_match_video_table}
  116. WHERE
  117. content_status in {processing_status_tuple}
  118. and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
  119. and publish_flag = {publish_flag};
  120. """
  121. processing_articles = await self.mysql_client.async_select(select_processing_sql)
  122. if processing_articles:
  123. processing_list = [
  124. {
  125. "trace_id": item[0],
  126. "content_status_update_time": item[1],
  127. "process_times": item[2],
  128. "content_status": item[3]
  129. }
  130. for item in processing_articles
  131. ]
  132. for obj in processing_list:
  133. if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
  134. # 认为该任务失败
  135. await self.roll_back_content_status_when_fails(
  136. process_times=obj['process_times'] + 1,
  137. trace_id=obj['trace_id'],
  138. ori_content_status=obj['content_status']
  139. )
  140. async def get_video_list(self, content_id):
  141. """
  142. 判断该文章是否存在历史匹配视频
  143. :param content_id
  144. :return:
  145. """
  146. sql = f"""
  147. SELECT id
  148. FROM {self.article_crawler_video_table}
  149. WHERE content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
  150. """
  151. res_tuple = await self.mysql_client.async_select(sql)
  152. if len(res_tuple) >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  153. return True
  154. else:
  155. return False
  156. async def update_content_status(self, new_content_status, trace_id, ori_content_status):
  157. """
  158. :param new_content_status:
  159. :param trace_id:
  160. :param ori_content_status:
  161. :return:
  162. """
  163. update_sql = f"""
  164. UPDATE {self.article_match_video_table}
  165. SET content_status = %s, content_status_update_time = %s
  166. WHERE trace_id = %s and content_status = %s;
  167. """
  168. row_counts = await self.mysql_client.async_insert(
  169. sql=update_sql,
  170. params=(
  171. new_content_status,
  172. int(time.time()),
  173. trace_id,
  174. ori_content_status
  175. )
  176. )
  177. return row_counts
  178. async def roll_back_content_status_when_fails(self, process_times, trace_id, ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
  179. """
  180. 处理失败,回滚至初始状态,处理次数加 1
  181. :param process_times:
  182. :param trace_id:
  183. :param ori_content_status:
  184. :return:
  185. """
  186. update_article_sql = f"""
  187. UPDATE {self.article_match_video_table}
  188. SET
  189. content_status = %s,
  190. content_status_update_time = %s,
  191. process_times = %s
  192. WHERE trace_id = %s and content_status = %s;
  193. """
  194. await self.mysql_client.async_insert(
  195. sql=update_article_sql,
  196. params=(
  197. NewContentIdTaskConst.TASK_INIT_STATUS,
  198. int(time.time()),
  199. process_times + 1,
  200. trace_id,
  201. ori_content_status
  202. )
  203. )
  204. async def judge_whether_same_content_id_is_processing(self, content_id):
  205. """
  206. 同一个 content_id 只需要处理一次
  207. :param content_id:
  208. :return:
  209. success: 4
  210. init: 0
  211. fail: 99
  212. todo: 存在处理失败的content_id是否需要不再处理
  213. """
  214. select_sql = f"""
  215. SELECT distinct content_status
  216. FROM {self.article_match_video_table}
  217. WHERE content_id = '{content_id}';
  218. """
  219. result = await self.mysql_client.async_select(select_sql)
  220. if result:
  221. for item in result:
  222. content_status = item[0]
  223. # if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
  224. if content_status in {
  225. NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
  226. NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  227. NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  228. NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  229. NewContentIdTaskConst.TASK_PUBLISHED_STATUS
  230. }:
  231. return True
  232. return False
  233. else:
  234. return False
  235. async def get_downloaded_videos(self, content_id):
  236. """
  237. 获取已下载的视频
  238. :return:
  239. """
  240. sql = f"""
  241. SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
  242. FROM {self.article_crawler_video_table}
  243. WHERE content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
  244. ORDER BY score DESC;
  245. """
  246. res_tuple = await self.mysql_client.async_select(sql)
  247. return [
  248. {
  249. "platform": i[0],
  250. "play_count": i[1],
  251. "like_count": i[2],
  252. "video_oss_path": i[3],
  253. "cover_oss_path": i[4],
  254. "uid": i[5]
  255. }
  256. for i in res_tuple
  257. ]
  258. async def get_kimi_status(self, content_id):
  259. """
  260. 通过 content_id 获取kimi info
  261. :return:
  262. """
  263. select_sql = f"""
  264. select kimi_status
  265. from {self.article_text_table}
  266. where content_id = '{content_id}';
  267. """
  268. response = await self.mysql_client.async_select(select_sql)
  269. if response:
  270. kimi_status = response[0][0]
  271. return kimi_status
  272. else:
  273. return NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR
    async def kimi_task(self, params):
        """
        Run the kimi stage for one task.

        Flow:
          * If the text table already marks kimi as successful, CAS the task
            status INIT -> KIMI_FINISHED and return the stored kimi fields.
          * If the text table has no row for the content_id, log and fall
            through (implicitly returns None).
          * Otherwise CAS INIT -> PROCESSING, call KimiServer, persist the
            result to the text table, CAS PROCESSING -> KIMI_FINISHED and
            return the fields.  On any exception the text row is marked
            KIMI_FAIL and the task is rolled back to INIT.

        :param params: task dict; reads 'content_id', 'trace_id', 'process_times'
        :return: dict with kimi_title / ori_title / kimi_summary / kimi_keys on
                 success, {} on kimi failure, None otherwise
        """
        content_id = params['content_id']
        trace_id = params['trace_id']
        process_times = params['process_times']
        kimi_status_code = await self.get_kimi_status(content_id=content_id)
        if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
            affected_rows = await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                # Another worker won the race for this task's status lock.
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return
            get_kimi_sql = f"""
                SELECT article_title, kimi_title, kimi_summary, kimi_keys
                FROM {self.article_text_table}
                WHERE content_id = '{content_id}';
            """
            kimi_info = await self.mysql_client.async_select(get_kimi_sql)
            return {
                "kimi_title": kimi_info[0][1],
                "ori_title": kimi_info[0][0],
                "kimi_summary": kimi_info[0][2],
                "kimi_keys": json.loads(kimi_info[0][3])
            }
        elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
            # Text table has no row for this content_id.
            logging(
                code="4000",
                info="long_articles_text表中未找到 content_id"
            )
        else:
            # Start processing: move content_status from 0 (init) to 101 (processing).
            affected_rows = await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
            )
            if affected_rows == 0:
                # Another worker won the race for this task's status lock.
                logging(
                    code="6000",
                    info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
                )
                return
            K = KimiServer()
            try:
                select_sql = f"""
                    select article_title, article_text
                    from {self.article_text_table}
                    where content_id = '{content_id}'
                """
                res = await self.mysql_client.async_select(select_sql)
                article_obj = {
                    "article_title": res[0][0],
                    "article_text": res[0][1],
                    "content_id": content_id
                }
                kimi_info = await K.search_kimi_schedule(params=article_obj)
                kimi_title = kimi_info['k_title']
                # Strip quote characters from the generated title/summary.
                content_title = kimi_info['content_title'].replace("'", "").replace('"', "")
                content_keys = json.dumps(kimi_info['content_keys'], ensure_ascii=False)
                update_kimi_sql = f"""
                    UPDATE {self.article_text_table}
                    SET
                        kimi_title = %s,
                        kimi_summary = %s,
                        kimi_keys = %s,
                        kimi_status = %s
                    WHERE content_id = %s;"""
                await self.mysql_client.async_insert(
                    sql=update_kimi_sql,
                    params=(
                        kimi_title, content_title, content_keys, NewContentIdTaskConst.KIMI_SUCCESS_STATUS, params['content_id'])
                )
                await self.update_content_status(
                    new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
                )
                return {
                    "kimi_title": kimi_title,
                    "ori_title": article_obj['article_title'],
                    "kimi_summary": content_title,
                    "kimi_keys": kimi_info['content_keys']
                }
            except Exception as e:
                # Kimi stage failed: persist the failure on the text row first.
                update_kimi_sql = f"""
                    UPDATE {self.article_text_table}
                    SET
                        kimi_status = %s
                    WHERE content_id = %s
                """
                await self.mysql_client.async_insert(
                    sql=update_kimi_sql,
                    params=(
                        NewContentIdTaskConst.KIMI_FAIL_STATUS,
                        content_id
                    )
                )
                # Roll the task status back from 101 (processing) to 0 (init).
                await self.roll_back_content_status_when_fails(
                    process_times=process_times,
                    trace_id=trace_id
                )
                return {}
    async def spider_task(self, params, kimi_result):
        """
        Run the web-search (spider) stage for one task.

        If enough videos are already downloaded, marks the stage finished and
        returns early.  Otherwise claims the task (KIMI_FINISHED -> PROCESSING)
        and searches the web; on success moves to SPIDER_FINISHED, on failure
        rolls the task back to init.

        :param params: task dict; reads 'trace_id', 'content_id',
                       'process_times', 'gh_id'
        :param kimi_result: dict produced by kimi_task
                            (ori_title / kimi_summary / kimi_keys)
        :return: True when enough videos were found, False otherwise
        """
        trace_id = params['trace_id']
        content_id = params['content_id']
        process_times = params['process_times']
        gh_id = params['gh_id']
        select_sql = f"""
            SELECT count(id)
            FROM {self.article_crawler_video_table}
            WHERE content_id = '{content_id}'
            AND download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
        """
        count_tuple = await self.mysql_client.async_select(select_sql)
        counts = count_tuple[0][0]
        if counts >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
            # Enough videos already downloaded: skip searching.
            await self.update_content_status(
                new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                trace_id=trace_id,
                ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
            )
            return True
        # Start processing: move status from 1 (kimi finished) to 101 (processing).
        affected_rows = await self.update_content_status(
            new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
            ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
            trace_id=trace_id
        )
        if affected_rows == 0:
            # Another worker won the race for this task's status lock.
            logging(
                code="6000",
                info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
            )
            return False
        try:
            logging(
                code="spider_1001",
                info="开始执行搜索任务",
                trace_id=trace_id,
                data=kimi_result
            )
            search_videos_count = await search_videos_from_web(
                info={
                    "ori_title": kimi_result['ori_title'],
                    "kimi_summary": kimi_result['kimi_summary'],
                    "kimi_keys": kimi_result['kimi_keys'],
                    "trace_id": trace_id,
                    "gh_id": gh_id,
                    "content_id": content_id,
                    "crawler_video_table": self.article_crawler_video_table
                },
                gh_id_map=self.account_map,
                db_client=self.mysql_client
            )
            if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
                # Spider succeeded: move status from 101 (processing) to 2
                # (spider finished).
                logging(
                    code="spider_1002",
                    info="搜索成功",
                    trace_id=trace_id,
                    data=kimi_result
                )
                await self.update_content_status(
                    new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
                    trace_id=trace_id,
                    ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
                )
                return True
            else:
                # Not enough results: roll back to init for a retry.
                logging(
                    code="spider_1003",
                    info="搜索失败",
                    trace_id=trace_id,
                    data=kimi_result
                )
                await self.roll_back_content_status_when_fails(
                    process_times=process_times + 1,
                    trace_id=trace_id
                )
                return False
        except Exception as e:
            await self.roll_back_content_status_when_fails(
                process_times=process_times + 1,
                trace_id=trace_id
            )
            print("爬虫处理失败: {}".format(e))
            return False
  476. async def etl_task(self, params):
  477. """
  478. download && upload videos
  479. :param params:
  480. :return:
  481. """
  482. trace_id = params['trace_id']
  483. content_id = params['content_id']
  484. process_times = params['process_times']
  485. # 判断是否有三条已经下载完成的视频
  486. select_sql = f"""
  487. select count(id)
  488. from {self.article_crawler_video_table}
  489. where content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
  490. """
  491. video_count_tuple = await self.mysql_client.async_select(select_sql)
  492. video_count = video_count_tuple[0][0]
  493. if video_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  494. affect_rows = await self.update_content_status(
  495. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  496. trace_id=trace_id,
  497. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  498. )
  499. if affect_rows == 0:
  500. logging(
  501. code="6000",
  502. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  503. )
  504. return False
  505. return True
  506. else:
  507. # 开始处理, 将文章状态修改为处理状态
  508. affected_rows = await self.update_content_status(
  509. ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
  510. trace_id=trace_id,
  511. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  512. )
  513. if affected_rows == 0:
  514. logging(
  515. code="6000",
  516. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  517. )
  518. return False
  519. select_sql = f"""
  520. SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
  521. FROM {self.article_crawler_video_table}
  522. WHERE content_id = '{content_id}' and download_status != {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  523. ORDER BY score DESC;
  524. """
  525. videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
  526. downloaded_count = 0
  527. for line in videos_need_to_download_tuple:
  528. params = {
  529. "id": line[0],
  530. "video_id": line[1],
  531. "platform": line[2],
  532. "video_title": line[3],
  533. "video_url": line[4],
  534. "cover_url": line[5],
  535. "user_id": line[6],
  536. "trace_id": line[7]
  537. }
  538. try:
  539. local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
  540. # download videos
  541. file_path = await download_video(
  542. file_path=local_video_path,
  543. platform=params['platform'],
  544. video_url=params['video_url']
  545. )
  546. if not file_path:
  547. # 说明视频下载失败,无需上传该视频, 将该条记录设置为失败状态
  548. update_sql = f"""
  549. UPDATE {self.article_crawler_video_table}
  550. SET download_status = %s
  551. WHERE id = %s;
  552. """
  553. await self.mysql_client.async_insert(
  554. sql=update_sql,
  555. params=(NewContentIdTaskConst.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
  556. )
  557. logging(
  558. code="etl_1001",
  559. info="etl_下载视频失败",
  560. trace_id=trace_id,
  561. function="etl_task"
  562. )
  563. else:
  564. # download cover
  565. cover_path = await download_cover(
  566. file_path=local_cover_path,
  567. platform=params['platform'],
  568. cover_url=params['cover_url']
  569. )
  570. # upload video to oss
  571. oss_video = await upload_to_oss(
  572. local_video_path=file_path,
  573. download_type="video"
  574. )
  575. # upload cover to oss
  576. if cover_path:
  577. oss_cover = await upload_to_oss(
  578. local_video_path=cover_path,
  579. download_type="image"
  580. )
  581. else:
  582. oss_cover = None
  583. # change status to success
  584. update_sql = f"""
  585. UPDATE {self.article_crawler_video_table}
  586. SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
  587. WHERE id = %s;
  588. """
  589. await self.mysql_client.async_insert(
  590. sql=update_sql,
  591. params=(
  592. oss_video,
  593. oss_cover,
  594. NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS,
  595. params['id']
  596. )
  597. )
  598. downloaded_count += 1
  599. logging(
  600. code="etl_1002",
  601. info="etl_视频下载成功",
  602. trace_id=trace_id,
  603. function="etl_task"
  604. )
  605. # 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
  606. if downloaded_count > NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
  607. await self.update_content_status(
  608. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  609. trace_id=trace_id,
  610. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  611. )
  612. return True
  613. except Exception as e:
  614. update_sql = f"""
  615. UPDATE {self.article_crawler_video_table}
  616. SET download_status = %s
  617. WHERE id = %s;
  618. """
  619. await self.mysql_client.async_insert(
  620. sql=update_sql,
  621. params=(NewContentIdTaskConst.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
  622. )
  623. logging(
  624. code="etl_1001",
  625. info="etl_下载视频失败",
  626. trace_id=trace_id,
  627. function="etl_task"
  628. )
  629. if downloaded_count >= 3:
  630. await self.update_content_status(
  631. ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
  632. trace_id=trace_id,
  633. new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
  634. )
  635. return True
  636. else:
  637. await self.roll_back_content_status_when_fails(
  638. process_times=process_times + 1,
  639. trace_id=trace_id
  640. )
  641. return False
  642. async def publish_task(self, params, kimi_title):
  643. """
  644. 发布任务
  645. :param kimi_title:
  646. :param params:
  647. :return:
  648. """
  649. gh_id = params['gh_id']
  650. flow_pool_level = params['flow_pool_level']
  651. content_id = params['content_id']
  652. trace_id = params['trace_id']
  653. process_times = params['process_times']
  654. # 开始处理,将状态修改为操作状态
  655. affected_rows = await self.update_content_status(
  656. ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
  657. trace_id=trace_id,
  658. new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
  659. )
  660. if affected_rows == 0:
  661. logging(
  662. code="6000",
  663. info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
  664. )
  665. return False
  666. try:
  667. download_videos = await self.get_downloaded_videos(content_id)
  668. match flow_pool_level:
  669. case "autoArticlePoolLevel4":
  670. # 冷启层, 全量做
  671. video_list = shuffle_list(download_videos)[:3]
  672. case "autoArticlePoolLevel3":
  673. if self.gh_id_dict.get(gh_id):
  674. video_list = shuffle_list(download_videos)[:3]
  675. else:
  676. video_list = download_videos[:3]
  677. case "autoArticlePoolLevel2":
  678. # 次条,只针对具体账号做
  679. video_list = []
  680. case "autoArticlePoolLevel1":
  681. # 头条,先不做
  682. video_list = download_videos[:3]
  683. case _:
  684. video_list = download_videos[:3]
  685. L = []
  686. for video_obj in video_list:
  687. params = {
  688. "videoPath": video_obj['video_oss_path'],
  689. "uid": video_obj['uid'],
  690. "title": kimi_title
  691. }
  692. publish_response = await publish_to_pq(params)
  693. video_id = publish_response['data']['id']
  694. response = await get_pq_video_detail(video_id)
  695. obj = {
  696. "uid": video_obj['uid'],
  697. "source": video_obj['platform'],
  698. "kimiTitle": kimi_title,
  699. "videoId": response['data'][0]['id'],
  700. "videoCover": response['data'][0]['shareImgPath'],
  701. "videoPath": response['data'][0]['videoPath'],
  702. "videoOss": video_obj['video_oss_path']
  703. }
  704. L.append(obj)
  705. update_sql = f"""
  706. UPDATE {self.article_match_video_table}
  707. SET content_status = %s, response = %s, process_times = %s
  708. WHERE trace_id = %s and content_status = %s;
  709. """
  710. # 从操作中状态修改为已发布状态
  711. await self.mysql_client.async_insert(
  712. sql=update_sql,
  713. params=(
  714. NewContentIdTaskConst.TASK_PUBLISHED_STATUS,
  715. json.dumps(L, ensure_ascii=False),
  716. process_times + 1,
  717. trace_id,
  718. NewContentIdTaskConst.TASK_PROCESSING_STATUS
  719. )
  720. )
  721. except Exception as e:
  722. await self.roll_back_content_status_when_fails(
  723. process_times=params['process_times'] + 1,
  724. trace_id=params['trace_id']
  725. )
  726. print(e)
    async def start_process(self, params):
        """
        Process a single article end to end: kimi -> spider -> etl -> publish.

        Each stage runs only when the previous one reports success; failures
        are logged.  When the kimi stage fails and process_times has reached
        the limit, all init-status rows for the content_id are marked
        KIMI_ILLEGAL and a feishu bot alert is sent.

        :param params: task dict; reads trace_id / content_id / gh_id /
                       process_times / publish_flag
        :return:
        """
        # step1: run the kimi stage
        # time.sleep(5) # test delay for multiple workers grabbing the same task
        kimi_result = await self.kimi_task(params)
        trace_id = params['trace_id']
        process_times = params['process_times']
        content_id = params['content_id']
        gh_id = params['gh_id']
        publish_flag = params['publish_flag']
        print(kimi_result)
        if kimi_result:
            # Kimi finished: run the spider stage next.
            print("kimi success")
            logging(
                code=3001,
                info="kimi success",
                trace_id=trace_id
            )
            spider_flag = await self.spider_task(params=params, kimi_result=kimi_result)
            if spider_flag:
                # Spider finished: run the etl stage next.
                print("spider success")
                logging(
                    code=3002,
                    info="spider_success",
                    trace_id=trace_id
                )
                etl_flag = await self.etl_task(params)
                if etl_flag:
                    # Downloads and uploads done: run the publish stage.
                    print("etl success")
                    logging(
                        code="3003",
                        info="etl_success",
                        trace_id=trace_id
                    )
                    """
                    todo: a newly created plan may be set to hosted publishing while
                    its account is also in the configured account list; such tasks
                    would still take the hosted path.  Handling of historical data
                    needs consideration — for now both cases are treated as hosted.
                    """
                    if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
                        # Hosted task: the long-article system publishes it.
                        logging(
                            code="3013",
                            info="不需要发布,长文系统托管发布",
                            trace_id=trace_id
                        )
                        return
                    else:
                        try:
                            await self.publish_task(params, kimi_result['kimi_title'])
                            logging(
                                code="3004",
                                info="publish_success",
                                trace_id=trace_id
                            )
                            await record_trace_id(
                                trace_id=trace_id,
                                status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
                            )
                        except Exception as e:
                            logging(
                                code="6004",
                                info="publish 失败--{}".format(e),
                                trace_id=params['trace_id']
                            )
                else:
                    logging(
                        code="6003",
                        info="ETL 处理失败",
                        trace_id=params['trace_id']
                    )
            else:
                logging(
                    code="6002",
                    info="爬虫处理失败",
                    trace_id=params['trace_id']
                )
        else:
            logging(
                code="6001",
                info="kimi 处理失败",
                trace_id=trace_id
            )
            if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
                # Kimi retry limit reached: give up on this content_id.
                logging(
                    code="6011",
                    info="kimi处理次数达到上限, 放弃处理",
                    trace_id=trace_id
                )
                # Mark every init-status row with the same content_id as kimi-illegal.
                update_sql = f"""
                    UPDATE {self.article_match_video_table}
                    SET content_status = %s
                    WHERE content_id = %s and content_status = %s;
                """
                affected_rows = await self.mysql_client.async_insert(
                    sql=update_sql,
                    params=(
                        NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
                        content_id,
                        NewContentIdTaskConst.TASK_INIT_STATUS
                    )
                )
                # Look up the article title for the alert message.
                select_sql = f"""
                    SELECT article_title
                    FROM {self.article_text_table}
                    WHERE content_id = '{content_id}';
                """
                result = await self.mysql_client.async_select(select_sql)
                bot(
                    title="KIMI 处理失败",
                    detail={
                        "content_id": content_id,
                        "affected_rows": affected_rows,
                        "article_title": result[0][0]
                    },
                    mention=False
                )
  851. async def process_task(self, params):
  852. """
  853. 处理任务
  854. :return:
  855. """
  856. content_id = params['content_id']
  857. download_videos = await self.get_video_list(content_id)
  858. if not download_videos:
  859. # 开始处理, 判断是否有相同的文章 id 正在处理
  860. processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
  861. if processing_flag:
  862. logging(
  863. code="9001",
  864. info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
  865. )
  866. else:
  867. await self.start_process(params=params)
  868. else:
  869. print("存在已下载视频")
  870. async def deal(self):
  871. """
  872. function
  873. :return:
  874. """
  875. task_list = await self.get_tasks()
  876. task_dict = {}
  877. # 对 content_id去重
  878. for task in task_list:
  879. key = task['content_id']
  880. task_dict[key] = task
  881. process_list = []
  882. for item in task_dict:
  883. process_list.append(task_dict[item])
  884. logging(
  885. code="5001",
  886. info="Match Task Got {} this time".format(len(process_list)),
  887. function="Publish Task"
  888. )
  889. if task_list:
  890. total_task = len(process_list)
  891. print(process_list)
  892. a = time.time()
  893. print("开始处理,一共{}个任务".format(total_task))
  894. tasks = [self.process_task(params) for params in process_list]
  895. await asyncio.gather(*tasks)
  896. b = time.time()
  897. print("处理时间: {} s".format(b - a))
  898. else:
  899. logging(
  900. code="9008",
  901. info="没有要处理的请求"
  902. )