""" @author: luojunhui 对请求进行操作 """ import os import json import uuid import requests import urllib.parse from datetime import datetime, timedelta from applications.log import logging from applications.functions.ask_kimi import ask_kimi from applications.functions.calculate import title_mix from applications.functions.auto_white import auto_white def create_gzh_path(video_id, shared_uid): """ :param video_id: 视频 id :param shared_uid: 分享 id """ root_share_id = str(uuid.uuid4()) url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}" # 自动把 root_share_id 加入到白名单 auto_white(root_share_id) return root_share_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}" def request_for_info(video_id): """ 请求数据 :param video_id: :return: """ url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo" data = { "videoIdList": [video_id] } header = { "Content-Type": "application/json", } response = requests.post(url, headers=header, data=json.dumps(data)) return response.json() def choose_video(result): """ :param result: 计算出来的结果 :return: uid, video_id """ # 判断 score score1, score2 = result['s1_score'], result['s2_score'] if score1 == 0 and score2 == 0: return None, None elif score1 == 0 and score2 > 0: return result['s2_uid'], result['s2_vid'] elif score1 > 0 and score2 == 0: return result['s1_uid'], result['s1_vid'] elif score1 > 0 and score2 > 0: return result['s1_uid'], result['s1_vid'] class ProcessParams(object): """ Params Analysis """ def __init__(self, t_id): self.trace_id = t_id def get_params(self, data): """ "accountName": "公众号名称", "content": "文章正文", "title": "文章标题", "cover": "封面链接" :param data: :return: title """ logging( code="1002", info="处理请求参数", function="get_params", trace_id=self.trace_id, data=data ) return data def ask_kimi_and_save_to_local(self, title): """ save file to local :param title: :return: """ save_path = os.path.join(os.getcwd(), 'applications', 'static', "titles", "{}.json".format(title)) if os.path.exists(save_path): logging( code="1002", info="该 video 信息已经挖掘完成---{}".format(title), function="ask_kimi_and_save_to_local", trace_id=self.trace_id, ) return else: os.makedirs(os.path.dirname(save_path), exist_ok=True) if not title: result = {} else: result = ask_kimi(title) logging( code="1002", info="kimi-result", data=result, trace_id=self.trace_id, function="ask_kimi_and_save_to_local" ) with open(save_path, "w", encoding="utf-8") as f: f.write(json.dumps(result, ensure_ascii=False)) def process(self, data): """执行代码""" today = datetime.today() yesterday = today - timedelta(days=1) yesterday_str = yesterday.strftime("%Y%m%d") logging( code="1002", info="昨日的时间戳是---{}".format(yesterday_str), function="process", trace_id=self.trace_id, ) params = self.get_params(data) title = params['title'] account_name = params['accountName'] title_p = os.path.join(os.getcwd(), 'applications', 'static', "titles", "{}.json".format(title)) if os.path.exists(title_p): logging( code="1002", info="该标题已经被 kimi 处理过,跳过请求 kimi 操作--- {}".format(title), function="process", trace_id=self.trace_id ) result = title_mix(title_p=title_p, dt=yesterday_str, trace_id=self.trace_id) else: self.ask_kimi_and_save_to_local(title) result = title_mix(title_p=title_p, dt=yesterday_str, trace_id=self.trace_id) uid, video_id = choose_video(result) logging( code="1002", info="best video_id --{}".format(video_id), function="process", trace_id=self.trace_id ) if video_id and uid: root_share_id, productionPath = create_gzh_path(video_id=video_id, shared_uid=uid) logging( code="1002", info="root_share_id --{}, productionPath -- {}".format(root_share_id, productionPath), function="process", trace_id=self.trace_id ) response = request_for_info(video_id) productionCover = response['data'][0]['shareImgPath'] productionName = response["data"][0]['title'] videoUrl = response['data'][0]['videoPath'] programAvatar = "/static/logo.png" programId = "wx69c36def517d687a" programName = "票圈最惊奇" source = "Web" result = { "productionCover": productionCover, "productionName": productionName, "programAvatar": programAvatar, "programId": programId, "programName": programName, "source": source, "rootShareId": root_share_id, "productionPath": productionPath, "videoUrl": videoUrl } logging( code="2000", info="统计 root_share_id && video_id", function="process", trace_id=self.trace_id, data={ "rootShareId": root_share_id, "videoId": video_id } ) else: result = { "productionCover": None, "productionName": None, "programAvatar": None, "programId": None, "programName": None, "source": None, "rootShareId": None, "productionPath": None, "videoUrl": None } logging( code="1002", info="返回结果", function="process", data=result, trace_id=self.trace_id ) return result