""" @author: luojunhui 对请求进行操作 """ import os from applications.log import logging from applications.functions.ask_kimi import ask_kimi from applications.match_alg import best_choice from applications.functions.common import * class ProcessParams(object): """ Params Analysis """ def __init__(self, t_id): self.trace_id = t_id def get_params(self, data): """ "accountName": "公众号名称", "content": "文章正文", "title": "文章标题", "cover": "封面链接" :param data: :return: title """ logging( code="1002", info="处理请求参数", function="get_params", trace_id=self.trace_id, data=data ) return data def ask_kimi_and_save_to_local(self, title): """ save file to local :param title: :return: """ save_path = os.path.join(os.getcwd(), 'applications', 'static', "titles", "{}.json".format(title)) if os.path.exists(save_path): logging( code="1002", info="该 video 信息已经挖掘完成---{}".format(title), function="ask_kimi_and_save_to_local", trace_id=self.trace_id, ) return else: os.makedirs(os.path.dirname(save_path), exist_ok=True) if not title: result = {} else: result = ask_kimi(title) logging( code="1002", info="kimi-result", data=result, trace_id=self.trace_id, function="ask_kimi_and_save_to_local" ) with open(save_path, "w", encoding="utf-8") as f: f.write(json.dumps(result, ensure_ascii=False)) def deal(self, data): """执行代码""" params = self.get_params(data) title = params['title'] # account_name = params['accountName'] # ghId = params['ghId'] title_p = os.path.join(os.getcwd(), 'applications', 'static', "titles", "{}.json".format(title)) if os.path.exists(title_p): logging( code="1002", info="该标题已经被 kimi 处理过,跳过请求 kimi 操作--- {}".format(title), function="process", trace_id=self.trace_id ) else: self.ask_kimi_and_save_to_local(title) with open(title_p, encoding="utf-8") as f: params_obj = json.loads(f.read()) best_video_id = best_choice( params_obj=params_obj, trace_id=self.trace_id, request_param=params ) logging( code="1002", info="best video_id --{}".format(best_video_id), function="process", trace_id=self.trace_id ) if best_video_id: print(best_video_id) response = request_for_info(best_video_id) productionCover = response['data'][0]['shareImgPath'] productionName = response["data"][0]['title'] videoUrl = response['data'][0]['videoPath'] user_id = response['data'][0]['user']['uid'] programAvatar = "/static/logo.png" programId = "wx69c36def517d687a" programName = "票圈最惊奇" source = "Web" root_share_id, productionPath = create_gzh_path(video_id=best_video_id, shared_uid=user_id) logging( code="1002", info="root_share_id --{}, productionPath -- {}".format(root_share_id, productionPath), function="process", trace_id=self.trace_id ) result = { "productionCover": productionCover, "productionName": productionName, "programAvatar": programAvatar, "programId": programId, "programName": programName, "source": source, "rootShareId": root_share_id, "productionPath": productionPath, "videoUrl": videoUrl } logging( code="2000", info="统计 root_share_id && video_id", function="process", trace_id=self.trace_id, data={ "rootShareId": root_share_id, "videoId": best_video_id } ) else: result = { "productionCover": None, "productionName": None, "programAvatar": None, "programId": None, "programName": None, "source": None, "rootShareId": None, "productionPath": None, "videoUrl": None } logging( code="1002", info="返回结果", function="process", data=result, trace_id=self.trace_id ) return result