""" @author: luojunhui """ import os import json import uuid import requests import urllib.parse from applications.functions.auto_white import auto_white from applications.functions.mysql import select, select_sensitive_words from applications.functions.ask_kimi import ask_kimi from applications.log import logging def sensitive_flag(title): """ 判断标题是否命中过滤词 :param title: :return: """ sensitive_words = select_sensitive_words() for word in sensitive_words: if word in title: # title = title.replace(word, "*") return False return True def ask_kimi_and_save_to_local(info_tuple): """ save file to local :return: """ title, trace_id, save_path = info_tuple[0], info_tuple[1], info_tuple[2] if os.path.exists(save_path): logging( code="2001", info="该 video 信息已经挖掘完成---{}".format(title), function="ask_kimi_and_save_to_local", trace_id=trace_id, ) else: os.makedirs(os.path.dirname(save_path), exist_ok=True) if not title: result = {} else: result = ask_kimi(title) logging( code="2001", info="kimi-result", data=result, trace_id=trace_id, function="ask_kimi_and_save_to_local" ) with open(save_path, "w", encoding="utf-8") as f: f.write(json.dumps(result, ensure_ascii=False)) def create_gzh_path(video_id, shared_uid): """ :param video_id: 视频 id :param shared_uid: 分享 id """ root_share_id = str(uuid.uuid4()) url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}" # 自动把 root_share_id 加入到白名单 auto_white(root_share_id) return root_share_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}" def request_for_info(video_id): """ 请求数据 :param video_id: :return: """ url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo" data = { "videoIdList": [video_id] } header = { "Content-Type": "application/json", } response = requests.post(url, headers=header, data=json.dumps(data)) return response.json() def choose_video(result): """ :param result: 计算出来的结果 :return: uid, video_id """ score1 = result['s1_score'] if score1 > 0: return result['s1_uid'], result['s1_vid'] else: return None, None def find_videos_in_mysql(trace_id): """ 通过 trace_id去 pq_spider_mysql 搜索视频 :param trace_id: :return: """ sql = "select video_id, video_title from crawler_video where out_user_id = '{}' limit 10;".format(trace_id) out_video_list = select(sql=sql) if len(out_video_list) > 0: vid_list = [i[0] for i in out_video_list if i[0] != 0] vid_list = [vid_list[-1]] dir_path = os.path.join(os.getcwd(), 'applications', 'static', "out_videos") os.makedirs(os.path.dirname(dir_path), exist_ok=True) done_list = os.listdir(dir_path) process_list = [ ( i[1], trace_id, os.path.join(dir_path, "{}.json".format(i[0])) ) for i in out_video_list if not "{}.json".format(i[0]) in done_list ] if process_list: ask_kimi_and_save_to_local(process_list[0]) logging( code="2003", trace_id=trace_id, info="recall_search_list", function="find_videos_in_mysql", data=vid_list ) return { "search_videos": "success", "trace_id": trace_id, "video_list": vid_list } else: return { "search_videos": "failed", "trace_id": trace_id, "video_list": [] } def clean_title(strings): """ :param strings: :return: """ return ( strings.strip() .replace("\n", "") .replace("/", "") .replace("\r", "") .replace("#", "") .replace(".", "。") .replace("\\", "") .replace("&NBSP", "") .replace(":", "") .replace("*", "") .replace("?", "") .replace("?", "") .replace('"', "") .replace("<", "") .replace(">", "") .replace("|", "") .replace(" ", "") .replace('"', "") .replace("'", "") )