|
@@ -15,6 +15,10 @@ class Response(object):
|
|
|
"""
|
|
|
Response
|
|
|
"""
|
|
|
+ REQUEST_INIT_STATUS = 0
|
|
|
+ REQUEST_SUCCESS_STATUS = 1
|
|
|
+ REQUEST_PROCESSING_TASK = 101
|
|
|
+ TASK_MAX_PROCESS_TIMES = 3
|
|
|
|
|
|
def __init__(self, params, mysql_client, config):
|
|
|
"""
|
|
@@ -25,7 +29,7 @@ class Response(object):
|
|
|
self.mysql_client = mysql_client
|
|
|
self.params = params
|
|
|
self.article_match_video_table = config.article_match_video_table
|
|
|
- self.mini_program_map = json.loads(config.getConfigValue("miniMap"))
|
|
|
+ self.mini_program_map = json.loads(config.get_config_value("miniMap"))
|
|
|
|
|
|
def check_params(self):
|
|
|
"""
|
|
@@ -49,17 +53,18 @@ class Response(object):
|
|
|
:return:
|
|
|
"""
|
|
|
select_sql = f"""
|
|
|
- SELECT gh_id, content_status, response, process_times
|
|
|
+ SELECT gh_id, content_status, response, process_times, success_status
|
|
|
FROM {self.article_match_video_table}
|
|
|
WHERE trace_id = '{self.trace_id}';
|
|
|
"""
|
|
|
info_tuple = await self.mysql_client.async_select(select_sql)
|
|
|
- gh_id, content_status, response, process_times = info_tuple[0]
|
|
|
+ gh_id, content_status, response, process_times, success_status = info_tuple[0]
|
|
|
return {
|
|
|
- "ghId": gh_id,
|
|
|
- "contentStatus": content_status,
|
|
|
+ "gh_id": gh_id,
|
|
|
+ "content_status": content_status,
|
|
|
"response": response,
|
|
|
- "processTimes": process_times
|
|
|
+ "process_times": process_times,
|
|
|
+ "success_status": success_status
|
|
|
}
|
|
|
|
|
|
def create_gzh_path(self, video_id, shared_uid, gh_id):
|
|
@@ -147,7 +152,7 @@ class Response(object):
|
|
|
生成返回卡片
|
|
|
:return:
|
|
|
"""
|
|
|
- gh_id = result['ghId']
|
|
|
+ gh_id = result['gh_id']
|
|
|
response = json.loads(result['response'])
|
|
|
touliu_mini_program_id = 33
|
|
|
we_com_mini_program_id = 27
|
|
@@ -190,11 +195,18 @@ class Response(object):
|
|
|
:return:
|
|
|
"""
|
|
|
response = await self.get_videos_result()
|
|
|
- status_code = response.get('contentStatus')
|
|
|
- process_times = response.get('processTimes')
|
|
|
+ status_code = response.get('content_status')
|
|
|
+ process_times = response.get('process_times')
|
|
|
+ success_status = response.get('success_status')
|
|
|
+ if success_status == 1:
|
|
|
+ return {
|
|
|
+ "traceId": self.trace_id,
|
|
|
+ "info": "请联系管理员",
|
|
|
+ "error": "该请求已经被请求过一次"
|
|
|
+ }
|
|
|
match status_code:
|
|
|
case 0:
|
|
|
- if process_times > 3:
|
|
|
+ if process_times > self.TASK_MAX_PROCESS_TIMES:
|
|
|
result = {
|
|
|
"traceId": self.trace_id,
|
|
|
"code": 0,
|
|
@@ -211,28 +223,72 @@ class Response(object):
|
|
|
return {
|
|
|
"traceId": self.trace_id,
|
|
|
"code": 1,
|
|
|
- "Message": "该请求正在处理中"
|
|
|
+ "Message": "已经执行完kimi"
|
|
|
}
|
|
|
case 2:
|
|
|
+ return {
|
|
|
+ "traceId": self.trace_id,
|
|
|
+ "code": 2,
|
|
|
+ "Message": "已经执行完爬虫"
|
|
|
+ }
|
|
|
+ case 3:
|
|
|
+ return {
|
|
|
+ "traceId": self.trace_id,
|
|
|
+ "code": 3,
|
|
|
+ "error": "已经执行完 etl"
|
|
|
+ }
|
|
|
+ case 4:
|
|
|
+ # 修改任务状态为处理中
|
|
|
+ update_sql = f"""
|
|
|
+ UPDATE {self.article_match_video_table}
|
|
|
+ SET success_status = %s
|
|
|
+ WHERE success_status = %s and trace_id = %s;
|
|
|
+ """
|
|
|
+ affected_rows = await self.mysql_client.async_insert(
|
|
|
+ sql=update_sql,
|
|
|
+ params=(
|
|
|
+ self.REQUEST_PROCESSING_TASK,
|
|
|
+ self.REQUEST_INIT_STATUS,
|
|
|
+ self.trace_id
|
|
|
+ )
|
|
|
+ )
|
|
|
+ if affected_rows == 0:
|
|
|
+ return {
|
|
|
+ "traceId": self.trace_id,
|
|
|
+ "info": "并发任务抢占锁失败",
|
|
|
+ "error": "该 trace_id 正在处理中或者已经处理完成"
|
|
|
+ }
|
|
|
card_list, new_items = await self.generate_cards(result=response)
|
|
|
update_sql = f"""
|
|
|
- UPDATE {self.article_match_video_table}
|
|
|
- SET response = %s, success_status = %s
|
|
|
- WHERE trace_id = %s;
|
|
|
+ UPDATE {self.article_match_video_table}
|
|
|
+ SET response = %s, success_status = %s
|
|
|
+ WHERE trace_id = %s and success_status = %s;
|
|
|
"""
|
|
|
await self.mysql_client.async_insert(
|
|
|
sql=update_sql,
|
|
|
- params=(json.dumps(new_items, ensure_ascii=False), 1, self.trace_id)
|
|
|
+ params=(
|
|
|
+ json.dumps(new_items, ensure_ascii=False),
|
|
|
+ self.REQUEST_SUCCESS_STATUS,
|
|
|
+ self.trace_id,
|
|
|
+ self.REQUEST_PROCESSING_TASK
|
|
|
+ )
|
|
|
)
|
|
|
- return {"traceId": self.trace_id, "miniprogramList": card_list}
|
|
|
- case 3:
|
|
|
return {
|
|
|
"traceId": self.trace_id,
|
|
|
- "code": 3,
|
|
|
- "error": "匹配失败,超过三次"
|
|
|
+ "miniprogramList": card_list
|
|
|
+ }
|
|
|
+ case 99:
|
|
|
+ return {
|
|
|
+ "traceId": self.trace_id,
|
|
|
+ "code": 99,
|
|
|
+ "error": "该任务执行失败"
|
|
|
+ }
|
|
|
+ case 101:
|
|
|
+ return {
|
|
|
+ "traceId": self.trace_id,
|
|
|
+ "code": 101,
|
|
|
+ "error": "该任务正在执行中"
|
|
|
}
|
|
|
- case 4:
|
|
|
- return {}
|
|
|
|
|
|
async def deal(self):
|
|
|
"""
|