""" @author: luojunhui """ import json import uuid import time import random import hashlib import urllib.parse from applications.functions.log import logging class Response(object): """ Response """ def __init__(self, params, mysql_client, config): """ Response 接口 """ self.trace_id = None self.mini_program_type = None self.mysql_client = mysql_client self.params = params self.article_videos = config.articleVideos self.mini_map = json.loads(config.getConfigValue("miniMap")) def checkParams(self): """ 请求参数校验 :return: """ try: self.mini_program_type = self.params['miniprogramUseType'] self.trace_id = self.params['traceId'] return None except Exception as e: return { "error": "params error", "msg": str(e), "info": self.params } async def getVideosResult(self): """ 获取结果 :return: """ select_sql = f""" SELECT gh_id, content_status, response, process_times FROM {self.article_videos} WHERE trace_id = '{self.trace_id}'; """ info_tuple = await self.mysql_client.asyncSelect(select_sql) gh_id, content_status, response, process_times = info_tuple[0] return { "ghId": gh_id, "contentStatus": content_status, "response": json.loads(response), "processTimes": process_times } def createGzhPath(self, video_id, shared_uid, gh_id): """ :param gh_id: 公众号账号的gh_id :param video_id: 视频 id :param shared_uid: 分享 id """ def generate_source_id(): """ generate_source_id :return: """ timestamp = str(int(time.time() * 1000)) random_str = str(random.randint(1000, 9999)) hash_input = f"{timestamp}-{random_str}" return hashlib.md5(hash_input.encode()).hexdigest() root_share_id = str(uuid.uuid4()) if self.mini_program_type == 2: source_id = ( "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id() ) elif self.mini_program_type == 1: source_id = "longArticles_" + generate_source_id() elif self.mini_program_type == 3: source_id = "WeCom_" + generate_source_id() else: source_id = "Error mini_program_type {}".format(self.mini_program_type) url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}" # 自动把 root_share_id 加入到白名单 # auto_white(root_share_id) return ( root_share_id, source_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}", ) async def generateCard(self, index, gh_id, mini_id, item): """ 生成单个分享卡片 :param item: 单个视频结果 :param mini_id: 小程序 appType :param gh_id: 公众号 id :param index: 视频位置 :return: """ str_mini_id = str(mini_id) mini_info = self.mini_map[str_mini_id] avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name'] root_share_id, root_source_id, production_path = self.createGzhPath( video_id=item['videoId'], shared_uid=item['uid'], gh_id=gh_id ) logging( code="1002", info="root_share_id --{}, productionPath -- {}".format( root_share_id, production_path ), function="process", trace_id=self.trace_id, ) result = { "productionCover": item['videoCover'], "productionName": item['kimiTitle'], "programAvatar": avatar, "programId": app_id, "programName": app_name, "source": item['source'], "rootShareId": root_share_id, "productionPath": production_path, "videoUrl": item['videoPath'], "mini_id": mini_id, "paragraphPosition": index * 0.25 } if index == 1: result['paragraphPosition'] = 0.01 item['rootSourceId'] = root_source_id return result, item async def generateCards(self, result): """ 生成返回卡片 :return: """ gh_id = result['ghId'] response = result['response'] match self.mini_program_type: case 1: L = [] new_item_list = [] for index, item in enumerate(response, 1): random_num = random.randint(1, 10) if random_num in [1, 2, 3, 4, 5, 6]: mini_id = 25 elif random_num in [7, 8]: mini_id = 29 else: mini_id = 31 card, new_item = await self.generateCard(index, gh_id, mini_id, item) L.append(card) new_item_list.append(new_item) return L, new_item_list case 2: L = [] new_item_list = [] for index, item in enumerate(response, 1): card, new_item = await self.generateCard(index, gh_id, 33, item) L.append(card) new_item_list.append(new_item) return L, new_item_list case 3: L = [] new_item_list = [] for index, item in enumerate(response, 1): card, new_item = await self.generateCard(index, gh_id, 27, item) L.append(card) new_item_list.append(card) return L, new_item_list async def job(self): """ 执行方法 :return: """ response = await self.getVideosResult() status_code = response.get('contentStatus') process_times = response.get('processTimes') match status_code: case 0: if process_times > 3: result = { "traceId": self.trace_id, "code": 0, "error": "匹配失败,处理超过 3 次" } else: result = { "traceId": self.trace_id, "code": 0, "Message": "该请求还没处理" } return result case 1: return { "traceId": self.trace_id, "code": 1, "Message": "该请求正在处理中" } case 2: card_list, new_items = await self.generateCards(result=response) update_sql = f""" UPDATE {self.article_videos} SET response = %s, success_status = %s WHERE trace_id = %s; """ await self.mysql_client.asyncInsert( sql=update_sql, params=(json.dumps(new_items, ensure_ascii=False), 1, self.trace_id) ) return {"traceId": self.trace_id, "miniprogramList": card_list} case 3: return { "traceId": self.trace_id, "code": 3, "error": "匹配失败,超过三次" } async def deal(self): """ api process starts from here :return: """ params_error = self.checkParams() if params_error: return params_error else: return await self.job()