|
@@ -220,9 +220,9 @@ class NewContentIdTask(object):
|
|
|
if content_status in {
|
|
|
NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
|
NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
|
- NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
|
|
|
+ # NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
|
|
|
NewContentIdTaskConst.TASK_PROCESSING_STATUS,
|
|
|
- NewContentIdTaskConst.TASK_PUBLISHED_STATUS
|
|
|
+ # NewContentIdTaskConst.TASK_PUBLISHED_STATUS
|
|
|
}:
|
|
|
return True
|
|
|
return False
|
|
@@ -276,50 +276,6 @@ class NewContentIdTask(object):
|
|
|
:return:
|
|
|
"""
|
|
|
trace_id = params['trace_id']
|
|
|
- if params.get("root_content_id"):
|
|
|
- kimi_result = await get_kimi_result(content_id=params['root_content_id'], article_text_table=self.article_text_table, db_client=self.long_articles_client)
|
|
|
- if kimi_result:
|
|
|
- affected_rows = await self.update_content_status(
|
|
|
- new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
|
- trace_id=trace_id,
|
|
|
- ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
|
|
|
- )
|
|
|
- if affected_rows == 0:
|
|
|
- logging(
|
|
|
- code="6000",
|
|
|
- info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
|
|
|
- )
|
|
|
- return
|
|
|
-
|
|
|
- # 将root_content_id的kimi结果更新到content_id
|
|
|
- content_id = params['content_id']
|
|
|
- update_count = await update_kimi_status(
|
|
|
- kimi_info=kimi_result,
|
|
|
- content_id=content_id,
|
|
|
- db_client=self.long_articles_client,
|
|
|
- article_text_table=self.article_text_table,
|
|
|
- success_status=NewContentIdTaskConst.KIMI_SUCCESS_STATUS,
|
|
|
- init_status=NewContentIdTaskConst.KIMI_INIT_STATUS
|
|
|
- )
|
|
|
-
|
|
|
- if update_count == 0:
|
|
|
- logging(
|
|
|
- code="6000",
|
|
|
- info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
|
|
|
- )
|
|
|
- return
|
|
|
- logging(
|
|
|
- code="8023",
|
|
|
- function="kimi_task",
|
|
|
- trace_id=trace_id,
|
|
|
- info="从root_content_id获取结果",
|
|
|
- data=params
|
|
|
- )
|
|
|
- return kimi_result
|
|
|
- else:
|
|
|
- params.pop('root_content_id', None)
|
|
|
- print(params)
|
|
|
- return await self.kimi_task(params)
|
|
|
|
|
|
# 处理content_id
|
|
|
content_id = params['content_id']
|
|
@@ -348,6 +304,53 @@ class NewContentIdTask(object):
|
|
|
)
|
|
|
|
|
|
else:
|
|
|
+ # 校验是否存在root_content_id
|
|
|
+ if params.get("root_content_id"):
|
|
|
+ kimi_result = await get_kimi_result(content_id=params['root_content_id'],
|
|
|
+ article_text_table=self.article_text_table,
|
|
|
+ db_client=self.long_articles_client)
|
|
|
+ if kimi_result:
|
|
|
+ affected_rows = await self.update_content_status(
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
|
+ trace_id=trace_id,
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
|
|
|
+ )
|
|
|
+ if affected_rows == 0:
|
|
|
+ logging(
|
|
|
+ code="6000",
|
|
|
+ info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
|
|
|
+ )
|
|
|
+ return
|
|
|
+
|
|
|
+ # 将root_content_id的kimi结果更新到content_id
|
|
|
+ content_id = params['content_id']
|
|
|
+ update_count = await update_kimi_status(
|
|
|
+ kimi_info=kimi_result,
|
|
|
+ content_id=content_id,
|
|
|
+ db_client=self.long_articles_client,
|
|
|
+ article_text_table=self.article_text_table,
|
|
|
+ success_status=NewContentIdTaskConst.KIMI_SUCCESS_STATUS,
|
|
|
+ init_status=NewContentIdTaskConst.KIMI_INIT_STATUS
|
|
|
+ )
|
|
|
+
|
|
|
+ if update_count == 0:
|
|
|
+ logging(
|
|
|
+ code="6000",
|
|
|
+ info="多个进程抢占同一个任务的执行状态锁,抢占失败,return"
|
|
|
+ )
|
|
|
+ return
|
|
|
+ logging(
|
|
|
+ code="8023",
|
|
|
+ function="kimi_task",
|
|
|
+ trace_id=trace_id,
|
|
|
+ info="从root_content_id获取结果",
|
|
|
+ data=params
|
|
|
+ )
|
|
|
+ return kimi_result
|
|
|
+ else:
|
|
|
+ params.pop('root_content_id', None)
|
|
|
+ return await self.kimi_task(params)
|
|
|
+
|
|
|
# 开始处理,讲 content_status 从 0 改为 101
|
|
|
affected_rows = await self.update_content_status(
|
|
|
new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
|
|
@@ -404,6 +407,20 @@ class NewContentIdTask(object):
|
|
|
content_id = params['content_id']
|
|
|
process_times = params['process_times']
|
|
|
gh_id = params['gh_id']
|
|
|
+
|
|
|
+ download_video_exist_flag = await whether_downloaded_videos_exists(
|
|
|
+ content_id=content_id,
|
|
|
+ article_crawler_video_table=self.article_crawler_video_table,
|
|
|
+ db_client=self.long_articles_client
|
|
|
+ )
|
|
|
+ if download_video_exist_flag:
|
|
|
+ await self.update_content_status(
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
|
+ trace_id=trace_id,
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
|
|
|
+ )
|
|
|
+ return True
|
|
|
+ # 判断是否存在root_content_id
|
|
|
if params.get("root_content_id"):
|
|
|
# 从爬虫表获取root_content_id的视频,并且写入爬虫表,将记录状态由1 --> 2
|
|
|
update_rows = await update_crawler_table_with_exist_content_id(
|
|
@@ -433,19 +450,6 @@ class NewContentIdTask(object):
|
|
|
params.pop("root_content_id", None)
|
|
|
return await self.spider_task(params, kimi_result)
|
|
|
|
|
|
- download_video_exist_flag = await whether_downloaded_videos_exists(
|
|
|
- content_id=content_id,
|
|
|
- article_crawler_video_table=self.article_crawler_video_table,
|
|
|
- db_client=self.long_articles_client
|
|
|
- )
|
|
|
- if download_video_exist_flag:
|
|
|
- await self.update_content_status(
|
|
|
- new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
|
- trace_id=trace_id,
|
|
|
- ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
|
|
|
- )
|
|
|
- return True
|
|
|
-
|
|
|
# 开始处理,将状态由 1 改成 101
|
|
|
affected_rows = await self.update_content_status(
|
|
|
new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
|
|
@@ -644,12 +648,31 @@ class NewContentIdTask(object):
|
|
|
)
|
|
|
print(e)
|
|
|
|
|
|
async def record_for_audit(self, content_id):
    """
    Record a content_id in the title-audit table after its videos have
    been downloaded successfully.

    Issues a single ``INSERT IGNORE`` into ``long_articles_title_audit``
    through ``self.long_articles_client.async_insert``, storing the content
    id together with the current time in epoch milliseconds.

    :param content_id: value written to the ``content_id`` column
    :return: None; whatever ``async_insert`` returns is discarded
    """
    # Plain (non-f) string on purpose: nothing is interpolated here, both
    # values are bound by the client through the %s placeholders.
    # NOTE(review): INSERT IGNORE presumably relies on a unique key on
    # content_id to make repeated calls a no-op -- confirm against the schema.
    insert_sql = """
        INSERT IGNORE INTO long_articles_title_audit
            (content_id, create_timestamp)
        VALUES
            (%s, %s);
    """
    # time.time() is in seconds; the column is filled with milliseconds.
    create_timestamp = int(time.time() * 1000)
    await self.long_articles_client.async_insert(
        sql=insert_sql,
        params=(content_id, create_timestamp),
    )
|
|
|
+
|
|
|
async def start_process(self, params):
|
|
|
"""
|
|
|
处理单篇文章
|
|
|
:param params:
|
|
|
:return:
|
|
|
"""
|
|
|
+ print("start process")
|
|
|
kimi_result = await self.kimi_task(params)
|
|
|
trace_id = params['trace_id']
|
|
|
process_times = params['process_times']
|
|
@@ -681,10 +704,9 @@ class NewContentIdTask(object):
|
|
|
info="etl_success",
|
|
|
trace_id=trace_id
|
|
|
)
|
|
|
- """
|
|
|
- todo 若新建计划,计划为设置托管,但接入账号又在配置账号中,仍会走托管逻辑,需考虑历史存量的处理
|
|
|
- 目前先对这两种情况都做托管操作
|
|
|
- """
|
|
|
+ # ETL下载成功,记录审核
|
|
|
+ await self.record_for_audit(content_id)
|
|
|
+
|
|
|
if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
|
|
|
logging(
|
|
|
code="3013",
|
|
@@ -770,6 +792,7 @@ class NewContentIdTask(object):
|
|
|
处理任务
|
|
|
:return:
|
|
|
"""
|
|
|
+ print(json.dumps(params, ensure_ascii=False, indent=4))
|
|
|
content_id = params['content_id']
|
|
|
flow_pool_level = params['flow_pool_level']
|
|
|
|
|
@@ -778,9 +801,11 @@ class NewContentIdTask(object):
|
|
|
article_crawler_video_table=self.article_crawler_video_table,
|
|
|
db_client=self.long_articles_client
|
|
|
)
|
|
|
+ print("开始处理")
|
|
|
if not download_videos_exists_flag:
|
|
|
processing_flag = await self.judge_whether_same_content_id_is_processing(content_id)
|
|
|
if processing_flag:
|
|
|
+ print("processing success")
|
|
|
logging(
|
|
|
code="9001",
|
|
|
info="该 content id 正在处理中, 跳过此任务--{}".format(content_id)
|
|
@@ -821,7 +846,6 @@ class NewContentIdTask(object):
|
|
|
info="Match Task Got {} this time".format(len(process_list)),
|
|
|
function="Publish Task"
|
|
|
)
|
|
|
-
|
|
|
# 处理process_list
|
|
|
if process_list:
|
|
|
a = time.time()
|