""" @author: luojunhui """ import json import uuid import time import random import hashlib import urllib.parse from applications.log import logging from applications.const import server_const from applications.functions.forward_request import forward_requests class Response(object): """ Response """ def __init__(self, params, mysql_client, config): """ Response 接口 """ self.trace_id = None self.mini_program_type = None self.mysql_client = mysql_client self.params = params self.article_match_video_table = config.article_match_video_table self.mini_program_map = json.loads(config.get_config_value("miniMap")) def check_params(self): """ 请求参数校验 :return: """ try: self.mini_program_type = self.params['miniprogramUseType'] self.trace_id = self.params['traceId'] if self.mini_program_type in [ server_const.DAILY_CODE, server_const.TOULIU_CODE, server_const.WECOM_CODE, server_const.DAITOU_CODE, server_const.COOPERATION_CODE ]: return None else: return { "error": "params error", "message": "mini_program_type error", "info": self.params } except Exception as e: return { "error": "params error", "message": str(e), "info": self.params } async def get_videos_result(self): """ 获取结果 :return: """ select_sql = f""" SELECT gh_id, content_status, response, process_times FROM {self.article_match_video_table} WHERE trace_id = '{self.trace_id}'; """ info_tuple = await self.mysql_client.async_select(select_sql) gh_id, content_status, response, process_times = info_tuple[0] return { "gh_id": gh_id, "content_status": content_status, "response": response, "process_times": process_times } def create_gzh_path(self, video_id, shared_uid, gh_id): """ :param gh_id: 公众号账号的gh_id :param video_id: 视频 id :param shared_uid: 分享 id """ def generate_source_id(): """ generate_source_id :return: """ timestamp = str(int(time.time() * 1000)) random_str = str(random.randint(1000, 9999)) hash_input = f"{timestamp}-{random_str}" return hashlib.md5(hash_input.encode()).hexdigest() root_share_id = str(uuid.uuid4()) match self.mini_program_type: case server_const.DAILY_CODE: source_id = "{}_{}".format(server_const.DAILY_PREFIX, generate_source_id()) case server_const.TOULIU_CODE: source_id = "{}_{}_{}".format(server_const.TOULIU_PREFIX, gh_id, generate_source_id()) case server_const.WECOM_CODE: source_id = "{}_{}".format(server_const.WECOM_PREFIX, generate_source_id()) case server_const.DAITOU_CODE: source_id = "{}_{}_{}".format(server_const.DAITOU_PREFIX, gh_id, generate_source_id()) case server_const.COOPERATION_CODE: source_id = "{}_{}".format(server_const.COOPERATION_PREFIX, generate_source_id()) url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}" return ( root_share_id, source_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}", ) async def generate_single_card(self, index, gh_id, mini_id, item): """ 生成单个分享卡片 :param item: 单个视频结果 :param mini_id: 小程序 appType :param gh_id: 公众号 id :param index: 视频位置 :return: """ str_mini_id = str(mini_id) mini_info = self.mini_program_map[str_mini_id] avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name'] root_share_id, root_source_id, production_path = self.create_gzh_path( video_id=item['videoId'], shared_uid=item['uid'], gh_id=gh_id ) logging( code="1002", info="root_share_id --{}, productionPath -- {}".format( root_share_id, production_path ), function="process", trace_id=self.trace_id, ) result = { "productionCover": item['videoCover'], "productionName": item['kimiTitle'], "programAvatar": avatar, "programId": app_id, "programName": app_name, "source": item['source'], "rootShareId": root_share_id, "productionPath": production_path, "videoUrl": item['videoPath'], "mini_id": mini_id } if index == 1: result['paragraphPosition'] = 0.01 else: position = (index - 1) * 0.25 result['paragraphPosition'] = position item['rootSourceId'] = root_source_id return result, item async def generate_cards(self, result): """ 生成返回卡片 :return: """ gh_id = result['gh_id'] response = json.loads(result['response']) card_list = [] new_item_list = [] for index, item in enumerate(response, 1): card, new_item = await self.generate_single_card(index, gh_id, server_const.DEFAULT_APP_ID, item) card_list.append(card) new_item_list.append(new_item) return card_list, new_item_list async def job(self): """ 执行方法 :return: """ video_result = await self.get_videos_result() status_code = video_result.get('content_status') process_times = video_result.get('process_times') match status_code: case server_const.TASK_INIT_CODE: if process_times > server_const.TASK_MAX_PROCESS_TIMES: result = { "traceId": self.trace_id, "code": server_const.TASK_INIT_CODE, "error": "匹配失败,处理超过{}次".format(server_const.TASK_MAX_PROCESS_TIMES) } else: result = { "traceId": self.trace_id, "code": server_const.TASK_INIT_CODE, "message": "该请求还没处理" } return result case server_const.TASK_KIMI_FINISHED_CODE: return { "traceId": self.trace_id, "code": server_const.TASK_KIMI_FINISHED_CODE, "message": "已经执行完kimi" } case server_const.TASK_SPIDER_FINISHED_CODE: return { "traceId": self.trace_id, "code": server_const.TASK_SPIDER_FINISHED_CODE, "message": "已经执行完爬虫" } case server_const.TASK_ETL_FINISHED_CODE: return { "traceId": self.trace_id, "code": server_const.TASK_ETL_FINISHED_CODE, "message": "已经执行完 etl" } case server_const.TASK_PUBLISHED_CODE: # 修改任务状态为处理中 update_sql = f""" UPDATE {self.article_match_video_table} SET success_status = %s WHERE success_status = %s and trace_id = %s; """ affected_rows = await self.mysql_client.async_insert( sql=update_sql, params=( server_const.REQUEST_PROCESSING_TASK, server_const.REQUEST_INIT_STATUS, self.trace_id ) ) if affected_rows == 0: return { "traceId": self.trace_id, "info": "并发任务抢占锁失败", "message": "该 trace_id 正在处理中或者已经处理完成" } card_list, new_items = await self.generate_cards(result=video_result) update_sql = f""" UPDATE {self.article_match_video_table} SET response = %s, success_status = %s WHERE trace_id = %s and success_status = %s; """ await self.mysql_client.async_insert( sql=update_sql, params=( json.dumps(new_items, ensure_ascii=False), server_const.REQUEST_SUCCESS_STATUS, self.trace_id, server_const.REQUEST_PROCESSING_TASK ) ) return { "traceId": self.trace_id, "miniprogramList": card_list } case server_const.TASK_ILLEGAL_CODE: return { "traceId": self.trace_id, "code": server_const.TASK_ILLEGAL_CODE, "error": "该文章被kimi识别为高风险文章,不处理" } case server_const.TASK_BAD_CATEGORY_CODE: return { "traceId": self.trace_id, "code": server_const.TASK_BAD_CATEGORY_CODE, "error": "该文章品类不符合这个账号,不做冷启动处理" } case server_const.TASK_EXIT_CODE: return { "traceId": self.trace_id, "code": server_const.TASK_EXIT_CODE, "error": "该文章已经退场or晋级, 不再冷启处理" } case server_const.TASK_FAIL_CODE: return { "traceId": self.trace_id, "code": server_const.TASK_FAIL_CODE, "error": "该任务执行失败" } case server_const.TASK_PROCESSING_CODE: return { "traceId": self.trace_id, "code": server_const.TASK_PROCESSING_CODE, "message": "该任务正在执行中" } async def check_trace_id(self): """ check trace id 是否存在与系统中 """ select_sql = f""" SELECT trace_id FROM {self.article_match_video_table} WHERE trace_id = '{self.trace_id}'; """ response = await self.mysql_client.async_select(select_sql) if response: return True else: return False async def deal(self): """ api process starts from here :return: """ params_error = self.check_params() if params_error: return params_error else: trace_id_exist_flag = await self.check_trace_id() if trace_id_exist_flag: return await self.job() else: response = await forward_requests( params={ "traceId": self.trace_id, "miniprogramUseType": self.mini_program_type }, api="recall_videos" ) return response