""" @author: luojunhui """ import os import json import uuid import requests import pymysql import urllib.parse from openai import OpenAI from applications.functions.log import logging class Functions(object): """ 通用工具代码 """ # 敏感词逻辑 @classmethod def sensitive_flag(cls, title): """ 判断标题是否命中过滤词 :param title: :return: """ sensitive_words = MySQLServer().select_sensitive_words() for word in sensitive_words: if word in title: # title = title.replace(word, "*") return False return True # 自动加入白名单逻辑 @classmethod def auto_white(cls, root_share_id): """ 自动加入白名单, 保证公众号百分百出广告 :param root_share_id: :return: """ def get_cookie(): """ 获取 cookie :return: """ url = "https://admin.piaoquantv.com/manager/login?account=luojunhui&passWd=e10adc3949ba59abbe56e057f20f883e&muid=7" payload = {} headers = { 'accept': 'application/json, text/plain, */*', 'accept-language': 'en', 'priority': 'u=1, i', 'sec-ch-ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': '"macOS"', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36' } response = requests.request("GET", url, headers=headers, data=payload) return response.cookies.values()[0] url = "https://admin.piaoquantv.com/manager/ad/own/white/rootShare/save" dd = { "rootShareId": root_share_id, "commit": "算法自动加入白名单--" } payload = json.dumps(dd) cookie = get_cookie() headers = { 'accept': 'application/json', 'accept-language': 'en', 'content-type': 'application/json;', 'cookie': "SESSION=" + cookie, 'origin': 'https://admin.piaoquantv.com', 'priority': 'u=1, i', 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36' } response = requests.request("POST", url, headers=headers, data=payload) return response.json()['content'] # 创建公众号分享卡片 @classmethod def create_gzh_path(cls, video_id, shared_uid): """ :param video_id: 视频 id :param shared_uid: 分享 id """ root_share_id = str(uuid.uuid4()) url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}" # 自动把 root_share_id 加入到白名单 cls.auto_white(root_share_id) return root_share_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}" # 从票圈请求视频 @classmethod def request_for_info(cls, video_id): """ 请求数据 :param video_id: :return: """ url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo" data = { "videoIdList": [video_id] } header = { "Content-Type": "application/json", } response = requests.post(url, headers=header, data=json.dumps(data)) return response.json() # 清理标题 @classmethod def clean_title(cls, strings): """ :param strings: :return: """ return ( strings.strip() .replace("\n", "") .replace("/", "") .replace("\r", "") .replace("#", "") .replace(".", "。") .replace("\\", "") .replace("&NBSP", "") .replace(":", "") .replace("*", "") .replace("?", "") .replace("?", "") .replace('"', "") .replace("<", "") .replace(">", "") .replace("|", "") .replace(" ", "") .replace('"', "") .replace("'", "") ) class MySQLServer(object): """ MySql 服务 """ @classmethod def select_download_videos(cls, trace_id): """ 查询 :param trace_id: :return: """ sql = "select video_id, video_title from crawler_video where out_user_id = '{}' limit 5;".format(trace_id) connection = pymysql.connect( host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址 port=3306, # 端口号 user="crawler", # mysql用户名 passwd="crawler123456@", # mysql用户登录密码 db="piaoquan-crawler", # 数据库名 charset="utf8mb4" # 如果数据库里面的文本是utf8编码的,charset指定是utf8 ) cursor = connection.cursor() cursor.execute(sql) out_video_list = cursor.fetchall() if len(out_video_list) > 0: vid_list = [i[0] for i in out_video_list if i[0] != 0] vid_list = [vid_list[0]] # dir_path = os.path.join(os.getcwd(), 'applications', 'static', "out_videos") # os.makedirs(os.path.dirname(dir_path), exist_ok=True) # done_list = os.listdir(dir_path) # process_list = [ # ( # i[1], # trace_id, # os.path.join(dir_path, "{}.json".format(i[0])) # ) for i in out_video_list if not "{}.json".format(i[0]) in done_list # ] # if process_list: # ask_kimi_and_save_to_local(process_list[0]) logging( code="2003", trace_id=trace_id, info="recall_search_list", function="find_videos_in_mysql", data=vid_list ) return { "search_videos": "success", "trace_id": trace_id, "video_list": vid_list } else: return { "search_videos": "failed", "trace_id": trace_id, "video_list": [] } @classmethod def select_pq_videos(cls): """ 查询 :return: info_list """ connection = pymysql.connect( host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址 port=3306, # 端口号 user="wx2016_longvideo", # mysql用户名 passwd="wx2016_longvideoP@assword1234", # mysql用户登录密码 db="incentive", # 数据库名 charset="utf8mb4" # 如果数据库里面的文本是utf8编码的,charset指定是utf8 ) sql = "select video_id, key_words, search_keys, extra_keys from video_content" cursor = connection.cursor() cursor.execute(sql) data = cursor.fetchall() result = [ { "video_id": line[0], "key_words": json.loads(line[1]), "search_keys": json.loads(line[2]), "extra_keys": json.loads(line[3]), } for line in data ] return result # 敏感词 @classmethod def select_sensitive_words(cls): """ sensitive words :return: """ connection = pymysql.connect( host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址 port=3306, # 端口号 user="wx2016_longvideo", # mysql用户名 passwd="wx2016_longvideoP@assword1234", # mysql用户登录密码 db="longvideo", # 数据库名 charset="utf8mb4" # 如果数据库里面的文本是utf8编码的,charset指定是utf8 ) sql = "select `keyword` from wx_sensitive_word where `data_status` = 0" cursor = connection.cursor() cursor.execute(sql) data = cursor.fetchall() result = [line[0] for line in data] return result class KimiServer(object): """ Kimi Server """ @classmethod def ask_kimi(cls, question): """ Ask Kimi for information :param question: tiny text :return: "{}" """ single_title_prompt = """ 我会给你一个视频标题,需要你帮我用你所学的知识来帮我分析出以下信息,信息我都写到 json 里面了 { "key_words": [], # 返回三个关键词 "search_keys": [], # 标题可能的搜索关键词,返回 3 个 "extra_keys": [], # 关心这个视频的用户还会关心哪些关键词, 返回 3 个 "theme": 标题的主题, 用一个词概括 } 只需要返回一个 json,key 和上面的一样, 我给你的标题是: """ client = OpenAI( api_key='sk-tz1VaKqksTzk0F8HxlU4YVGwj7oa1g0c0puGNUZrdn9MDtzm', base_url="https://api.moonshot.cn/v1" ) chat_completion = client.chat.completions.create( messages=[ { "role": "user", "content": single_title_prompt + question, } ], model="moonshot-v1-8k", ) response = chat_completion.choices[0].message.content.replace('```json', '').replace('```', '') try: response = json.loads(response) return response except: return {} @classmethod def ask_kimi_and_save_to_local(cls, info_tuple): """ save file to local :return: """ title, trace_id, save_path = info_tuple[0], info_tuple[1], info_tuple[2] if os.path.exists(save_path): logging( code="2001", info="该 video 信息已经挖掘完成---{}".format(title), function="ask_kimi_and_save_to_local", trace_id=trace_id, ) else: os.makedirs(os.path.dirname(save_path), exist_ok=True) if not title: result = {} else: result = cls.ask_kimi(title) logging( code="2001", info="kimi-result", data=result, trace_id=trace_id, function="ask_kimi_and_save_to_local" ) with open(save_path, "w", encoding="utf-8") as f: f.write(json.dumps(result, ensure_ascii=False))