|
@@ -4,7 +4,8 @@
|
|
import json
|
|
import json
|
|
import time
|
|
import time
|
|
|
|
|
|
-from applications.config import Config, NewContentIdTaskConst
|
|
|
|
|
|
+from applications.config import Config
|
|
|
|
+from applications.config.const import new_content_id_task as NewContentIdTaskConst
|
|
from applications.log import logging
|
|
from applications.log import logging
|
|
from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
|
|
from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
|
|
from applications.functions.common import shuffle_list
|
|
from applications.functions.common import shuffle_list
|
|
@@ -29,7 +30,6 @@ class NewContentIdTask(object):
|
|
self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
|
|
self.gh_id_dict = json.loads(self.config.get_config_value("testAccountLevel2"))
|
|
self.account_map = json.loads(self.config.get_config_value("accountMap"))
|
|
self.account_map = json.loads(self.config.get_config_value("accountMap"))
|
|
self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
|
|
self.spider_coroutines = self.config.get_config_value("spiderCoroutines")
|
|
- self.const = NewContentIdTaskConst()
|
|
|
|
|
|
|
|
async def get_tasks(self):
|
|
async def get_tasks(self):
|
|
"""
|
|
"""
|
|
@@ -37,10 +37,10 @@ class NewContentIdTask(object):
|
|
:return:
|
|
:return:
|
|
"""
|
|
"""
|
|
# 处理未托管的任务
|
|
# 处理未托管的任务
|
|
- await self.roll_back_unfinished_tasks(publish_flag=self.const.NEED_PUBLISH)
|
|
|
|
|
|
+ await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.NEED_PUBLISH)
|
|
|
|
|
|
# 处理托管任务
|
|
# 处理托管任务
|
|
- await self.roll_back_unfinished_tasks(publish_flag=self.const.DO_NOT_NEED_PUBLISH)
|
|
|
|
|
|
+ await self.roll_back_unfinished_tasks(publish_flag=NewContentIdTaskConst.DO_NOT_NEED_PUBLISH)
|
|
# 将 process_times > 3 且状态不为 4 的任务的状态修改为失败, 判断条件需要加上索引
|
|
# 将 process_times > 3 且状态不为 4 的任务的状态修改为失败, 判断条件需要加上索引
|
|
update_status_sql = f"""
|
|
update_status_sql = f"""
|
|
UPDATE
|
|
UPDATE
|
|
@@ -53,10 +53,10 @@ class NewContentIdTask(object):
|
|
await self.mysql_client.async_insert(
|
|
await self.mysql_client.async_insert(
|
|
update_status_sql,
|
|
update_status_sql,
|
|
params=(
|
|
params=(
|
|
- self.const.TASK_FAIL_STATUS,
|
|
|
|
- self.const.TASK_MAX_PROCESS_TIMES,
|
|
|
|
- self.const.TASK_ETL_COMPLETE_STATUS,
|
|
|
|
- self.const.TASK_PUBLISHED_STATUS
|
|
|
|
|
|
+ NewContentIdTaskConst.TASK_FAIL_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES,
|
|
|
|
+ NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_PUBLISHED_STATUS
|
|
)
|
|
)
|
|
)
|
|
)
|
|
# 获取 process_times <= 3 且 content_status = 0 的任务
|
|
# 获取 process_times <= 3 且 content_status = 0 的任务
|
|
@@ -66,8 +66,8 @@ class NewContentIdTask(object):
|
|
FROM
|
|
FROM
|
|
{self.article_match_video_table}
|
|
{self.article_match_video_table}
|
|
WHERE
|
|
WHERE
|
|
- content_status = {self.const.TASK_INIT_STATUS}
|
|
|
|
- and process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
|
|
|
|
|
|
+ content_status = {NewContentIdTaskConst.TASK_INIT_STATUS}
|
|
|
|
+ and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
|
|
ORDER BY flow_pool_level, request_timestamp
|
|
ORDER BY flow_pool_level, request_timestamp
|
|
LIMIT {self.spider_coroutines};
|
|
LIMIT {self.spider_coroutines};
|
|
"""
|
|
"""
|
|
@@ -92,18 +92,18 @@ class NewContentIdTask(object):
|
|
将长时间处于中间状态的任务回滚
|
|
将长时间处于中间状态的任务回滚
|
|
"""
|
|
"""
|
|
# 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
|
|
# 获取 content_status 为 处理中 的任务,判断时间, 如果超过 1h 则,则将它改为 0, process_times + 1
|
|
- if publish_flag == self.const.NEED_PUBLISH:
|
|
|
|
|
|
+ if publish_flag == NewContentIdTaskConst.NEED_PUBLISH:
|
|
processing_status_tuple = (
|
|
processing_status_tuple = (
|
|
- self.const.TASK_PROCESSING_STATUS,
|
|
|
|
- self.const.TASK_KIMI_FINISHED_STATUS,
|
|
|
|
- self.const.TASK_SPIDER_FINISHED_STATUS,
|
|
|
|
- self.const.TASK_ETL_COMPLETE_STATUS
|
|
|
|
|
|
+ NewContentIdTaskConst.TASK_PROCESSING_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
|
|
)
|
|
)
|
|
- elif publish_flag == self.const.DO_NOT_NEED_PUBLISH:
|
|
|
|
|
|
+ elif publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
|
|
processing_status_tuple = (
|
|
processing_status_tuple = (
|
|
- self.const.TASK_PROCESSING_STATUS,
|
|
|
|
- self.const.TASK_KIMI_FINISHED_STATUS,
|
|
|
|
- self.const.TASK_SPIDER_FINISHED_STATUS
|
|
|
|
|
|
+ NewContentIdTaskConst.TASK_PROCESSING_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS
|
|
)
|
|
)
|
|
else:
|
|
else:
|
|
return
|
|
return
|
|
@@ -114,7 +114,7 @@ class NewContentIdTask(object):
|
|
{self.article_match_video_table}
|
|
{self.article_match_video_table}
|
|
WHERE
|
|
WHERE
|
|
content_status in {processing_status_tuple}
|
|
content_status in {processing_status_tuple}
|
|
- and process_times <= {self.const.TASK_MAX_PROCESS_TIMES}
|
|
|
|
|
|
+ and process_times <= {NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES}
|
|
and publish_flag = {publish_flag};
|
|
and publish_flag = {publish_flag};
|
|
"""
|
|
"""
|
|
processing_articles = await self.mysql_client.async_select(select_processing_sql)
|
|
processing_articles = await self.mysql_client.async_select(select_processing_sql)
|
|
@@ -129,7 +129,7 @@ class NewContentIdTask(object):
|
|
for item in processing_articles
|
|
for item in processing_articles
|
|
]
|
|
]
|
|
for obj in processing_list:
|
|
for obj in processing_list:
|
|
- if int(time.time()) - obj['content_status_update_time'] >= self.const.TASK_PROCESSING_TIMEOUT:
|
|
|
|
|
|
+ if int(time.time()) - obj['content_status_update_time'] >= NewContentIdTaskConst.TASK_PROCESSING_TIMEOUT:
|
|
# 认为该任务失败
|
|
# 认为该任务失败
|
|
await self.roll_back_content_status_when_fails(
|
|
await self.roll_back_content_status_when_fails(
|
|
process_times=obj['process_times'] + 1,
|
|
process_times=obj['process_times'] + 1,
|
|
@@ -146,10 +146,10 @@ class NewContentIdTask(object):
|
|
sql = f"""
|
|
sql = f"""
|
|
SELECT id
|
|
SELECT id
|
|
FROM {self.article_crawler_video_table}
|
|
FROM {self.article_crawler_video_table}
|
|
- WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
|
|
|
|
|
|
+ WHERE content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
|
|
"""
|
|
"""
|
|
res_tuple = await self.mysql_client.async_select(sql)
|
|
res_tuple = await self.mysql_client.async_select(sql)
|
|
- if len(res_tuple) >= self.const.MIN_MATCH_VIDEO_NUM:
|
|
|
|
|
|
+ if len(res_tuple) >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
|
|
return True
|
|
return True
|
|
else:
|
|
else:
|
|
return False
|
|
return False
|
|
@@ -177,7 +177,7 @@ class NewContentIdTask(object):
|
|
)
|
|
)
|
|
return row_counts
|
|
return row_counts
|
|
|
|
|
|
- async def roll_back_content_status_when_fails(self, process_times, trace_id, ori_content_status=None):
|
|
|
|
|
|
+ async def roll_back_content_status_when_fails(self, process_times, trace_id, ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS):
|
|
"""
|
|
"""
|
|
处理失败,回滚至初始状态,处理次数加 1
|
|
处理失败,回滚至初始状态,处理次数加 1
|
|
:param process_times:
|
|
:param process_times:
|
|
@@ -185,9 +185,6 @@ class NewContentIdTask(object):
|
|
:param ori_content_status:
|
|
:param ori_content_status:
|
|
:return:
|
|
:return:
|
|
"""
|
|
"""
|
|
- if not ori_content_status:
|
|
|
|
- ori_content_status = self.const.TASK_PROCESSING_STATUS
|
|
|
|
-
|
|
|
|
update_article_sql = f"""
|
|
update_article_sql = f"""
|
|
UPDATE {self.article_match_video_table}
|
|
UPDATE {self.article_match_video_table}
|
|
SET
|
|
SET
|
|
@@ -199,7 +196,7 @@ class NewContentIdTask(object):
|
|
await self.mysql_client.async_insert(
|
|
await self.mysql_client.async_insert(
|
|
sql=update_article_sql,
|
|
sql=update_article_sql,
|
|
params=(
|
|
params=(
|
|
- self.const.TASK_INIT_STATUS,
|
|
|
|
|
|
+ NewContentIdTaskConst.TASK_INIT_STATUS,
|
|
int(time.time()),
|
|
int(time.time()),
|
|
process_times + 1,
|
|
process_times + 1,
|
|
trace_id,
|
|
trace_id,
|
|
@@ -228,11 +225,11 @@ class NewContentIdTask(object):
|
|
content_status = item[0]
|
|
content_status = item[0]
|
|
# if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
|
|
# if content_status not in {self.TASK_INIT_STATUS, self.TASK_PUBLISHED_STATUS} :
|
|
if content_status in {
|
|
if content_status in {
|
|
- self.const.TASK_KIMI_FINISHED_STATUS,
|
|
|
|
- self.const.TASK_SPIDER_FINISHED_STATUS,
|
|
|
|
- self.const.TASK_ETL_COMPLETE_STATUS,
|
|
|
|
- self.const.TASK_PROCESSING_STATUS,
|
|
|
|
- self.const.TASK_PUBLISHED_STATUS
|
|
|
|
|
|
+ NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_PROCESSING_STATUS,
|
|
|
|
+ NewContentIdTaskConst.TASK_PUBLISHED_STATUS
|
|
}:
|
|
}:
|
|
return True
|
|
return True
|
|
return False
|
|
return False
|
|
@@ -247,7 +244,7 @@ class NewContentIdTask(object):
|
|
sql = f"""
|
|
sql = f"""
|
|
SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
|
|
SELECT platform, play_count, like_count, video_oss_path, cover_oss_path, user_id
|
|
FROM {self.article_crawler_video_table}
|
|
FROM {self.article_crawler_video_table}
|
|
- WHERE content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
|
|
|
|
|
|
+ WHERE content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
|
|
ORDER BY score DESC;
|
|
ORDER BY score DESC;
|
|
"""
|
|
"""
|
|
res_tuple = await self.mysql_client.async_select(sql)
|
|
res_tuple = await self.mysql_client.async_select(sql)
|
|
@@ -278,7 +275,7 @@ class NewContentIdTask(object):
|
|
kimi_status = response[0][0]
|
|
kimi_status = response[0][0]
|
|
return kimi_status
|
|
return kimi_status
|
|
else:
|
|
else:
|
|
- return self.const.ARTICLE_TEXT_TABLE_ERROR
|
|
|
|
|
|
+ return NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR
|
|
|
|
|
|
async def kimi_task(self, params):
|
|
async def kimi_task(self, params):
|
|
"""
|
|
"""
|
|
@@ -290,11 +287,11 @@ class NewContentIdTask(object):
|
|
process_times = params['process_times']
|
|
process_times = params['process_times']
|
|
kimi_status_code = await self.get_kimi_status(content_id=content_id)
|
|
kimi_status_code = await self.get_kimi_status(content_id=content_id)
|
|
|
|
|
|
- if kimi_status_code == self.const.KIMI_SUCCESS_STATUS:
|
|
|
|
|
|
+ if kimi_status_code == NewContentIdTaskConst.KIMI_SUCCESS_STATUS:
|
|
affected_rows = await self.update_content_status(
|
|
affected_rows = await self.update_content_status(
|
|
- new_content_status=self.const.TASK_KIMI_FINISHED_STATUS,
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- ori_content_status=self.const.TASK_INIT_STATUS
|
|
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
|
|
)
|
|
)
|
|
if affected_rows == 0:
|
|
if affected_rows == 0:
|
|
logging(
|
|
logging(
|
|
@@ -314,7 +311,7 @@ class NewContentIdTask(object):
|
|
"kimi_summary": kimi_info[0][2],
|
|
"kimi_summary": kimi_info[0][2],
|
|
"kimi_keys": json.loads(kimi_info[0][3])
|
|
"kimi_keys": json.loads(kimi_info[0][3])
|
|
}
|
|
}
|
|
- elif kimi_status_code == self.const.ARTICLE_TEXT_TABLE_ERROR:
|
|
|
|
|
|
+ elif kimi_status_code == NewContentIdTaskConst.ARTICLE_TEXT_TABLE_ERROR:
|
|
logging(
|
|
logging(
|
|
code="4000",
|
|
code="4000",
|
|
info="long_articles_text表中未找到 content_id"
|
|
info="long_articles_text表中未找到 content_id"
|
|
@@ -322,9 +319,9 @@ class NewContentIdTask(object):
|
|
else:
|
|
else:
|
|
# 开始处理,讲 content_status 从 0 改为 101
|
|
# 开始处理,讲 content_status 从 0 改为 101
|
|
affected_rows = await self.update_content_status(
|
|
affected_rows = await self.update_content_status(
|
|
- new_content_status=self.const.TASK_PROCESSING_STATUS,
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- ori_content_status=self.const.TASK_INIT_STATUS
|
|
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_INIT_STATUS
|
|
)
|
|
)
|
|
if affected_rows == 0:
|
|
if affected_rows == 0:
|
|
logging(
|
|
logging(
|
|
@@ -360,12 +357,12 @@ class NewContentIdTask(object):
|
|
await self.mysql_client.async_insert(
|
|
await self.mysql_client.async_insert(
|
|
sql=update_kimi_sql,
|
|
sql=update_kimi_sql,
|
|
params=(
|
|
params=(
|
|
- kimi_title, content_title, content_keys, self.const.KIMI_SUCCESS_STATUS, params['content_id'])
|
|
|
|
|
|
+ kimi_title, content_title, content_keys, NewContentIdTaskConst.KIMI_SUCCESS_STATUS, params['content_id'])
|
|
)
|
|
)
|
|
await self.update_content_status(
|
|
await self.update_content_status(
|
|
- new_content_status=self.const.TASK_KIMI_FINISHED_STATUS,
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- ori_content_status=self.const.TASK_PROCESSING_STATUS
|
|
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
|
|
)
|
|
)
|
|
return {
|
|
return {
|
|
"kimi_title": kimi_title,
|
|
"kimi_title": kimi_title,
|
|
@@ -384,7 +381,7 @@ class NewContentIdTask(object):
|
|
await self.mysql_client.async_insert(
|
|
await self.mysql_client.async_insert(
|
|
sql=update_kimi_sql,
|
|
sql=update_kimi_sql,
|
|
params=(
|
|
params=(
|
|
- self.const.KIMI_FAIL_STATUS,
|
|
|
|
|
|
+ NewContentIdTaskConst.KIMI_FAIL_STATUS,
|
|
content_id
|
|
content_id
|
|
)
|
|
)
|
|
)
|
|
)
|
|
@@ -408,21 +405,21 @@ class NewContentIdTask(object):
|
|
SELECT count(id)
|
|
SELECT count(id)
|
|
FROM {self.article_crawler_video_table}
|
|
FROM {self.article_crawler_video_table}
|
|
WHERE content_id = '{content_id}'
|
|
WHERE content_id = '{content_id}'
|
|
- AND download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
|
|
|
|
|
|
+ AND download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
|
|
"""
|
|
"""
|
|
count_tuple = await self.mysql_client.async_select(select_sql)
|
|
count_tuple = await self.mysql_client.async_select(select_sql)
|
|
counts = count_tuple[0][0]
|
|
counts = count_tuple[0][0]
|
|
- if counts >= self.const.MIN_MATCH_VIDEO_NUM:
|
|
|
|
|
|
+ if counts >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
|
|
await self.update_content_status(
|
|
await self.update_content_status(
|
|
- new_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- ori_content_status=self.const.TASK_KIMI_FINISHED_STATUS
|
|
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS
|
|
)
|
|
)
|
|
return True
|
|
return True
|
|
# 开始处理,将状态由 1 改成 101
|
|
# 开始处理,将状态由 1 改成 101
|
|
affected_rows = await self.update_content_status(
|
|
affected_rows = await self.update_content_status(
|
|
- new_content_status=self.const.TASK_PROCESSING_STATUS,
|
|
|
|
- ori_content_status=self.const.TASK_KIMI_FINISHED_STATUS,
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_KIMI_FINISHED_STATUS,
|
|
trace_id=trace_id
|
|
trace_id=trace_id
|
|
)
|
|
)
|
|
if affected_rows == 0:
|
|
if affected_rows == 0:
|
|
@@ -451,7 +448,7 @@ class NewContentIdTask(object):
|
|
gh_id_map=self.account_map,
|
|
gh_id_map=self.account_map,
|
|
db_client=self.mysql_client
|
|
db_client=self.mysql_client
|
|
)
|
|
)
|
|
- if search_videos_count >= self.const.MIN_MATCH_VIDEO_NUM:
|
|
|
|
|
|
+ if search_videos_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
|
|
# 表示爬虫任务执行成功, 将状态从 101 改为 2
|
|
# 表示爬虫任务执行成功, 将状态从 101 改为 2
|
|
logging(
|
|
logging(
|
|
code="spider_1002",
|
|
code="spider_1002",
|
|
@@ -460,9 +457,9 @@ class NewContentIdTask(object):
|
|
data=kimi_result
|
|
data=kimi_result
|
|
)
|
|
)
|
|
await self.update_content_status(
|
|
await self.update_content_status(
|
|
- new_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- ori_content_status=self.const.TASK_PROCESSING_STATUS
|
|
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
|
|
)
|
|
)
|
|
return True
|
|
return True
|
|
else:
|
|
else:
|
|
@@ -498,15 +495,15 @@ class NewContentIdTask(object):
|
|
select_sql = f"""
|
|
select_sql = f"""
|
|
select count(id)
|
|
select count(id)
|
|
from {self.article_crawler_video_table}
|
|
from {self.article_crawler_video_table}
|
|
- where content_id = '{content_id}' and download_status = {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS};
|
|
|
|
|
|
+ where content_id = '{content_id}' and download_status = {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS};
|
|
"""
|
|
"""
|
|
video_count_tuple = await self.mysql_client.async_select(select_sql)
|
|
video_count_tuple = await self.mysql_client.async_select(select_sql)
|
|
video_count = video_count_tuple[0][0]
|
|
video_count = video_count_tuple[0][0]
|
|
- if video_count >= self.const.MIN_MATCH_VIDEO_NUM:
|
|
|
|
|
|
+ if video_count >= NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
|
|
affect_rows = await self.update_content_status(
|
|
affect_rows = await self.update_content_status(
|
|
- ori_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
|
|
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- new_content_status=self.const.TASK_ETL_COMPLETE_STATUS
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
|
|
)
|
|
)
|
|
if affect_rows == 0:
|
|
if affect_rows == 0:
|
|
logging(
|
|
logging(
|
|
@@ -518,9 +515,9 @@ class NewContentIdTask(object):
|
|
else:
|
|
else:
|
|
# 开始处理, 将文章状态修改为处理状态
|
|
# 开始处理, 将文章状态修改为处理状态
|
|
affected_rows = await self.update_content_status(
|
|
affected_rows = await self.update_content_status(
|
|
- ori_content_status=self.const.TASK_SPIDER_FINISHED_STATUS,
|
|
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_SPIDER_FINISHED_STATUS,
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- new_content_status=self.const.TASK_PROCESSING_STATUS
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
|
|
)
|
|
)
|
|
if affected_rows == 0:
|
|
if affected_rows == 0:
|
|
logging(
|
|
logging(
|
|
@@ -531,7 +528,7 @@ class NewContentIdTask(object):
|
|
select_sql = f"""
|
|
select_sql = f"""
|
|
SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
|
|
SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
|
|
FROM {self.article_crawler_video_table}
|
|
FROM {self.article_crawler_video_table}
|
|
- WHERE content_id = '{content_id}' and download_status != {self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
|
|
|
|
|
|
+ WHERE content_id = '{content_id}' and download_status != {NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS}
|
|
ORDER BY score DESC;
|
|
ORDER BY score DESC;
|
|
"""
|
|
"""
|
|
videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
|
|
videos_need_to_download_tuple = await self.mysql_client.async_select(select_sql)
|
|
@@ -564,7 +561,7 @@ class NewContentIdTask(object):
|
|
"""
|
|
"""
|
|
await self.mysql_client.async_insert(
|
|
await self.mysql_client.async_insert(
|
|
sql=update_sql,
|
|
sql=update_sql,
|
|
- params=(self.const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
|
|
|
|
|
|
+ params=(NewContentIdTaskConst.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
|
|
)
|
|
)
|
|
logging(
|
|
logging(
|
|
code="etl_1001",
|
|
code="etl_1001",
|
|
@@ -604,7 +601,7 @@ class NewContentIdTask(object):
|
|
params=(
|
|
params=(
|
|
oss_video,
|
|
oss_video,
|
|
oss_cover,
|
|
oss_cover,
|
|
- self.const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
|
|
|
|
|
|
+ NewContentIdTaskConst.VIDEO_DOWNLOAD_SUCCESS_STATUS,
|
|
params['id']
|
|
params['id']
|
|
)
|
|
)
|
|
)
|
|
)
|
|
@@ -616,11 +613,11 @@ class NewContentIdTask(object):
|
|
function="etl_task"
|
|
function="etl_task"
|
|
)
|
|
)
|
|
# 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
|
|
# 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
|
|
- if downloaded_count > self.const.MIN_MATCH_VIDEO_NUM:
|
|
|
|
|
|
+ if downloaded_count > NewContentIdTaskConst.MIN_MATCH_VIDEO_NUM:
|
|
await self.update_content_status(
|
|
await self.update_content_status(
|
|
- ori_content_status=self.const.TASK_PROCESSING_STATUS,
|
|
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- new_content_status=self.const.TASK_ETL_COMPLETE_STATUS
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
|
|
)
|
|
)
|
|
return True
|
|
return True
|
|
except Exception as e:
|
|
except Exception as e:
|
|
@@ -631,7 +628,7 @@ class NewContentIdTask(object):
|
|
"""
|
|
"""
|
|
await self.mysql_client.async_insert(
|
|
await self.mysql_client.async_insert(
|
|
sql=update_sql,
|
|
sql=update_sql,
|
|
- params=(self.const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
|
|
|
|
|
|
+ params=(NewContentIdTaskConst.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
|
|
)
|
|
)
|
|
logging(
|
|
logging(
|
|
code="etl_1001",
|
|
code="etl_1001",
|
|
@@ -641,9 +638,9 @@ class NewContentIdTask(object):
|
|
)
|
|
)
|
|
if downloaded_count >= 3:
|
|
if downloaded_count >= 3:
|
|
await self.update_content_status(
|
|
await self.update_content_status(
|
|
- ori_content_status=self.const.TASK_PROCESSING_STATUS,
|
|
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS,
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- new_content_status=self.const.TASK_ETL_COMPLETE_STATUS
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS
|
|
)
|
|
)
|
|
return True
|
|
return True
|
|
else:
|
|
else:
|
|
@@ -667,9 +664,9 @@ class NewContentIdTask(object):
|
|
process_times = params['process_times']
|
|
process_times = params['process_times']
|
|
# 开始处理,将状态修改为操作状态
|
|
# 开始处理,将状态修改为操作状态
|
|
affected_rows = await self.update_content_status(
|
|
affected_rows = await self.update_content_status(
|
|
- ori_content_status=self.const.TASK_ETL_COMPLETE_STATUS,
|
|
|
|
|
|
+ ori_content_status=NewContentIdTaskConst.TASK_ETL_COMPLETE_STATUS,
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- new_content_status=self.const.TASK_PROCESSING_STATUS
|
|
|
|
|
|
+ new_content_status=NewContentIdTaskConst.TASK_PROCESSING_STATUS
|
|
)
|
|
)
|
|
if affected_rows == 0:
|
|
if affected_rows == 0:
|
|
logging(
|
|
logging(
|
|
@@ -725,11 +722,11 @@ class NewContentIdTask(object):
|
|
await self.mysql_client.async_insert(
|
|
await self.mysql_client.async_insert(
|
|
sql=update_sql,
|
|
sql=update_sql,
|
|
params=(
|
|
params=(
|
|
- self.const.TASK_PUBLISHED_STATUS,
|
|
|
|
|
|
+ NewContentIdTaskConst.TASK_PUBLISHED_STATUS,
|
|
json.dumps(L, ensure_ascii=False),
|
|
json.dumps(L, ensure_ascii=False),
|
|
process_times + 1,
|
|
process_times + 1,
|
|
trace_id,
|
|
trace_id,
|
|
- self.const.TASK_PROCESSING_STATUS
|
|
|
|
|
|
+ NewContentIdTaskConst.TASK_PROCESSING_STATUS
|
|
)
|
|
)
|
|
)
|
|
)
|
|
except Exception as e:
|
|
except Exception as e:
|
|
@@ -784,7 +781,7 @@ class NewContentIdTask(object):
|
|
todo 若新建计划,计划为设置托管,但接入账号又在配置账号中,仍会走托管逻辑,需考虑历史存量的处理
|
|
todo 若新建计划,计划为设置托管,但接入账号又在配置账号中,仍会走托管逻辑,需考虑历史存量的处理
|
|
目前先对这两种情况都做托管操作
|
|
目前先对这两种情况都做托管操作
|
|
"""
|
|
"""
|
|
- if publish_flag == self.const.DO_NOT_NEED_PUBLISH:
|
|
|
|
|
|
+ if publish_flag == NewContentIdTaskConst.DO_NOT_NEED_PUBLISH:
|
|
logging(
|
|
logging(
|
|
code="3013",
|
|
code="3013",
|
|
info="不需要发布,长文系统托管发布",
|
|
info="不需要发布,长文系统托管发布",
|
|
@@ -801,7 +798,7 @@ class NewContentIdTask(object):
|
|
)
|
|
)
|
|
await record_trace_id(
|
|
await record_trace_id(
|
|
trace_id=trace_id,
|
|
trace_id=trace_id,
|
|
- status=self.const.RECORD_SUCCESS_TRACE_ID_CODE
|
|
|
|
|
|
+ status=NewContentIdTaskConst.RECORD_SUCCESS_TRACE_ID_CODE
|
|
)
|
|
)
|
|
except Exception as e:
|
|
except Exception as e:
|
|
logging(
|
|
logging(
|
|
@@ -827,7 +824,7 @@ class NewContentIdTask(object):
|
|
info="kimi 处理失败",
|
|
info="kimi 处理失败",
|
|
trace_id=trace_id
|
|
trace_id=trace_id
|
|
)
|
|
)
|
|
- if process_times >= self.const.TASK_MAX_PROCESS_TIMES:
|
|
|
|
|
|
+ if process_times >= NewContentIdTaskConst.TASK_MAX_PROCESS_TIMES:
|
|
logging(
|
|
logging(
|
|
code="6011",
|
|
code="6011",
|
|
info="kimi处理次数达到上限, 放弃处理",
|
|
info="kimi处理次数达到上限, 放弃处理",
|
|
@@ -842,9 +839,9 @@ class NewContentIdTask(object):
|
|
affected_rows = await self.mysql_client.async_insert(
|
|
affected_rows = await self.mysql_client.async_insert(
|
|
sql=update_sql,
|
|
sql=update_sql,
|
|
params=(
|
|
params=(
|
|
- self.const.KIMI_ILLEGAL_STATUS,
|
|
|
|
|
|
+ NewContentIdTaskConst.KIMI_ILLEGAL_STATUS,
|
|
content_id,
|
|
content_id,
|
|
- self.const.TASK_INIT_STATUS
|
|
|
|
|
|
+ NewContentIdTaskConst.TASK_INIT_STATUS
|
|
)
|
|
)
|
|
)
|
|
)
|
|
bot(
|
|
bot(
|