""" @author: luojunhui """ import json import uuid import time import random import hashlib import urllib.parse from applications.log import logging class Response(object): """ Response """ REQUEST_INIT_STATUS = 0 REQUEST_SUCCESS_STATUS = 1 REQUEST_PROCESSING_TASK = 101 TASK_MAX_PROCESS_TIMES = 3 def __init__(self, params, mysql_client, config): """ Response 接口 """ self.trace_id = None self.mini_program_type = None self.mysql_client = mysql_client self.params = params self.article_match_video_table = config.article_match_video_table self.mini_program_map = json.loads(config.get_config_value("miniMap")) def check_params(self): """ 请求参数校验 :return: """ try: self.mini_program_type = self.params['miniprogramUseType'] self.trace_id = self.params['traceId'] return None except Exception as e: return { "error": "params error", "message": str(e), "info": self.params } async def get_videos_result(self): """ 获取结果 :return: """ select_sql = f""" SELECT gh_id, content_status, response, process_times FROM {self.article_match_video_table} WHERE trace_id = '{self.trace_id}'; """ info_tuple = await self.mysql_client.async_select(select_sql) gh_id, content_status, response, process_times = info_tuple[0] return { "gh_id": gh_id, "content_status": content_status, "response": response, "process_times": process_times } def create_gzh_path(self, video_id, shared_uid, gh_id): """ :param gh_id: 公众号账号的gh_id :param video_id: 视频 id :param shared_uid: 分享 id """ def generate_source_id(): """ generate_source_id :return: """ timestamp = str(int(time.time() * 1000)) random_str = str(random.randint(1000, 9999)) hash_input = f"{timestamp}-{random_str}" return hashlib.md5(hash_input.encode()).hexdigest() root_share_id = str(uuid.uuid4()) if self.mini_program_type == 2: source_id = ( "touliu_tencentGzhArticle_{}_".format(gh_id) + generate_source_id() ) elif self.mini_program_type == 1: source_id = "longArticles_" + generate_source_id() elif self.mini_program_type == 3: source_id = "WeCom_" + generate_source_id() else: source_id = "Error mini_program_type {}".format(self.mini_program_type) url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}&rootSourceId={source_id}" # 自动把 root_share_id 加入到白名单 # auto_white(root_share_id) return ( root_share_id, source_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}", ) async def generate_single_card(self, index, gh_id, mini_id, item): """ 生成单个分享卡片 :param item: 单个视频结果 :param mini_id: 小程序 appType :param gh_id: 公众号 id :param index: 视频位置 :return: """ str_mini_id = str(mini_id) mini_info = self.mini_program_map[str_mini_id] avatar, app_id, app_name = mini_info['avatar'], mini_info['id'], mini_info['name'] root_share_id, root_source_id, production_path = self.create_gzh_path( video_id=item['videoId'], shared_uid=item['uid'], gh_id=gh_id ) logging( code="1002", info="root_share_id --{}, productionPath -- {}".format( root_share_id, production_path ), function="process", trace_id=self.trace_id, ) result = { "productionCover": item['videoCover'], "productionName": item['kimiTitle'], "programAvatar": avatar, "programId": app_id, "programName": app_name, "source": item['source'], "rootShareId": root_share_id, "productionPath": production_path, "videoUrl": item['videoPath'], "mini_id": mini_id, "paragraphPosition": index * 0.25 } if index == 1: result['paragraphPosition'] = 0.01 item['rootSourceId'] = root_source_id return result, item async def generate_cards(self, result): """ 生成返回卡片 :return: """ gh_id = result['gh_id'] response = json.loads(result['response']) long_articles_mini_program_id = 25 touliu_mini_program_id = 33 we_com_mini_program_id = 27 match self.mini_program_type: case 1: L = [] new_item_list = [] for index, item in enumerate(response, 1): # random_num = random.randint(1, 10) # if random_num in [1, 2, 3, 4, 5, 6]: # long_articles_mini_program_id = 25 # elif random_num in [7, 8]: # long_articles_mini_program_id = 29 # else: # long_articles_mini_program_id = 31 card, new_item = await self.generate_single_card(index, gh_id, long_articles_mini_program_id, item) L.append(card) new_item_list.append(new_item) return L, new_item_list case 2: L = [] new_item_list = [] for index, item in enumerate(response, 1): card, new_item = await self.generate_single_card(index, gh_id, touliu_mini_program_id, item) L.append(card) new_item_list.append(new_item) return L, new_item_list case 3: L = [] new_item_list = [] for index, item in enumerate(response, 1): card, new_item = await self.generate_single_card(index, gh_id, we_com_mini_program_id, item) L.append(card) new_item_list.append(card) return L, new_item_list async def job(self): """ 执行方法 :return: """ response = await self.get_videos_result() status_code = response.get('content_status') process_times = response.get('process_times') match status_code: case 0: if process_times > self.TASK_MAX_PROCESS_TIMES: result = { "traceId": self.trace_id, "code": 0, "error": "匹配失败,处理超过 3 次" } else: result = { "traceId": self.trace_id, "code": 0, "message": "该请求还没处理" } return result case 1: return { "traceId": self.trace_id, "code": 1, "message": "已经执行完kimi" } case 2: return { "traceId": self.trace_id, "code": 2, "message": "已经执行完爬虫" } case 3: return { "traceId": self.trace_id, "code": 3, "message": "已经执行完 etl" } case 4: # 修改任务状态为处理中 update_sql = f""" UPDATE {self.article_match_video_table} SET success_status = %s WHERE success_status = %s and trace_id = %s; """ affected_rows = await self.mysql_client.async_insert( sql=update_sql, params=( self.REQUEST_PROCESSING_TASK, self.REQUEST_INIT_STATUS, self.trace_id ) ) if affected_rows == 0: return { "traceId": self.trace_id, "info": "并发任务抢占锁失败", "message": "该 trace_id 正在处理中或者已经处理完成" } card_list, new_items = await self.generate_cards(result=response) update_sql = f""" UPDATE {self.article_match_video_table} SET response = %s, success_status = %s WHERE trace_id = %s and success_status = %s; """ await self.mysql_client.async_insert( sql=update_sql, params=( json.dumps(new_items, ensure_ascii=False), self.REQUEST_SUCCESS_STATUS, self.trace_id, self.REQUEST_PROCESSING_TASK ) ) return { "traceId": self.trace_id, "miniprogramList": card_list } case 99: return { "traceId": self.trace_id, "code": 99, "error": "该任务执行失败" } case 101: return { "traceId": self.trace_id, "code": 101, "message": "该任务正在执行中" } async def deal(self): """ api process starts from here :return: """ params_error = self.check_params() if params_error: return params_error else: return await self.job()