|
@@ -4,6 +4,7 @@
|
|
|
import json
|
|
|
import time
|
|
|
import asyncio
|
|
|
+import traceback
|
|
|
|
|
|
from typing import List, Dict
|
|
|
|
|
@@ -261,25 +262,30 @@ class NewContentIdTask(object):
|
|
|
trace_id = params['trace_id']
|
|
|
if params.get("root_content_id"):
|
|
|
kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
|
|
|
- affected_rows = await self.update_content_status(
|
|
|
- new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
|
- trace_id=trace_id,
|
|
|
- ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
|
|
|
- )
|
|
|
- if affected_rows == 0:
|
|
|
+ if kimi_result:
|
|
|
+ affected_rows = await self.update_content_status(
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
|
+ trace_id=trace_id,
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
|
|
|
+ )
|
|
|
+ if affected_rows == 0:
|
|
|
+ logging(
|
|
|
+ code="6000",
|
|
|
+ info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
|
|
|
+ )
|
|
|
+ return
|
|
|
logging(
|
|
|
- code="6000",
|
|
|
- info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
|
|
|
+ code="8023",
|
|
|
+ function="kimi_task",
|
|
|
+ trace_id=trace_id,
|
|
|
+ info="从root_content_id获取结果",
|
|
|
+ data=params
|
|
|
)
|
|
|
- return
|
|
|
- logging(
|
|
|
- code="8023",
|
|
|
- function="kimi_task",
|
|
|
- trace_id=trace_id,
|
|
|
- info="从root_content_id获取结果",
|
|
|
- data=params
|
|
|
- )
|
|
|
- return kimi_result
|
|
|
+ return kimi_result
|
|
|
+ else:
|
|
|
+ params.pop('root_content_id', None)
|
|
|
+ print(params)
|
|
|
+ return await self.kimi_task(params)
|
|
|
|
|
|
# 处理content_id
|
|
|
content_id = params['content_id']
|
|
@@ -321,7 +327,7 @@ class NewContentIdTask(object):
|
|
|
)
|
|
|
return
|
|
|
try:
|
|
|
- kimi_result = await get_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
|
|
|
+ kimi_result = await generate_kimi_result(content_id=content_id, article_text_table=self.article_text_table, db_client=self.long_articles_client)
|
|
|
await self.update_content_status(
|
|
|
new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
|
trace_id=trace_id,
|
|
@@ -330,6 +336,8 @@ class NewContentIdTask(object):
|
|
|
return kimi_result
|
|
|
except Exception as e:
|
|
|
# kimi 任务处理失败
|
|
|
+ error = traceback.format_exc()
|
|
|
+ print(error)
|
|
|
update_kimi_sql = f"""
|
|
|
UPDATE {self.article_text_table}
|
|
|
SET
|
|
@@ -361,28 +369,32 @@ class NewContentIdTask(object):
|
|
|
gh_id = params['gh_id']
|
|
|
if params.get("root_content_id"):
|
|
|
# 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2
|
|
|
- await update_crawler_table_with_exist_content_id(
|
|
|
+ update_rows = await update_crawler_table_with_exist_content_id(
|
|
|
content_id=content_id,
|
|
|
trace_id=trace_id,
|
|
|
article_crawler_video_table=self.article_crawler_video_table,
|
|
|
db_client=self.long_articles_client,
|
|
|
root_content_id=params['root_content_id']
|
|
|
)
|
|
|
- affected_rows = await self.update_content_status(
|
|
|
- new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
|
- trace_id=trace_id,
|
|
|
- ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
|
|
|
- )
|
|
|
- if affected_rows == 0:
|
|
|
- return
|
|
|
- logging(
|
|
|
- code="8024",
|
|
|
- function="spider_task",
|
|
|
- trace_id=trace_id,
|
|
|
- info="从root_content_id获取结果",
|
|
|
- data=params
|
|
|
- )
|
|
|
- return True
|
|
|
+ if update_rows:
|
|
|
+ affected_rows = await self.update_content_status(
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
|
+ trace_id=trace_id,
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
|
|
|
+ )
|
|
|
+ if affected_rows == 0:
|
|
|
+ return
|
|
|
+ logging(
|
|
|
+ code="8024",
|
|
|
+ function="spider_task",
|
|
|
+ trace_id=trace_id,
|
|
|
+ info="从root_content_id获取结果",
|
|
|
+ data=params
|
|
|
+ )
|
|
|
+ return True
|
|
|
+ else:
|
|
|
+ params.pop("root_content_id", None)
|
|
|
+ return await self.spider_task(params, kimi_result)
|
|
|
|
|
|
download_video_exist_flag = await whether_downloaded_videos_exists(
|
|
|
content_id=content_id,
|
|
@@ -741,7 +753,7 @@ class NewContentIdTask(object):
|
|
|
root_content_id = await self.get_source_content_id(content_id)
|
|
|
if root_content_id:
|
|
|
# 传参新增root_content_id
|
|
|
- params['root_content_id'] = root_content_id[0][0]
|
|
|
+ params['root_content_id'] = root_content_id
|
|
|
# 开始处理
|
|
|
await self.start_process(params=params)
|
|
|
else:
|
|
@@ -753,13 +765,13 @@ class NewContentIdTask(object):
|
|
|
:return:
|
|
|
"""
|
|
|
# 处理未托管的任务
|
|
|
- await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
|
|
|
+ # await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
|
|
|
|
|
|
# 处理托管任务
|
|
|
- await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
|
|
|
+ # await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
|
|
|
|
|
|
# 将处理次数大于3次且未成功的任务置为失败
|
|
|
- await self.set_tasks_status_fail()
|
|
|
+ # await self.set_tasks_status_fail()
|
|
|
|
|
|
# 获取task_list
|
|
|
task_list = await self.get_tasks()
|