|
- """
- @author: luojunhui
- """
- import os
- import json
- import uuid
- import requests
- import pymysql
- import urllib.parse
- from openai import OpenAI
- from applications.functions.log import logging
- class Functions(object):
- """
- 通用工具代码
- """
- # 敏感词逻辑
- @classmethod
- def sensitive_flag(cls, title):
- """
- 判断标题是否命中过滤词
- :param title:
- :return:
- """
- sensitive_words = MySQLServer().select_sensitive_words()
- for word in sensitive_words:
- if word in title:
- # title = title.replace(word, "*")
- return False
- return True
- # 自动加入白名单逻辑
- @classmethod
- def auto_white(cls, root_share_id):
- """
- 自动加入白名单, 保证公众号百分百出广告
- :param root_share_id:
- :return:
- """
- def get_cookie():
- """
- 获取 cookie
- :return:
- """
- url = "https://admin.piaoquantv.com/manager/login?account=luojunhui&passWd=e10adc3949ba59abbe56e057f20f883e&muid=7"
- payload = {}
- headers = {
- 'accept': 'application/json, text/plain, */*',
- 'accept-language': 'en',
- 'priority': 'u=1, i',
- 'sec-ch-ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
- 'sec-ch-ua-mobile': '?0',
- 'sec-ch-ua-platform': '"macOS"',
- 'sec-fetch-dest': 'empty',
- 'sec-fetch-mode': 'cors',
- 'sec-fetch-site': 'same-origin',
- 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
- }
- response = requests.request("GET", url, headers=headers, data=payload)
- return response.cookies.values()[0]
- url = "https://admin.piaoquantv.com/manager/ad/own/white/rootShare/save"
- dd = {
- "rootShareId": root_share_id,
- "commit": "算法自动加入白名单--"
- }
- payload = json.dumps(dd)
- cookie = get_cookie()
- headers = {
- 'accept': 'application/json',
- 'accept-language': 'en',
- 'content-type': 'application/json;',
- 'cookie': "SESSION=" + cookie,
- 'origin': 'https://admin.piaoquantv.com',
- 'priority': 'u=1, i',
- 'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36'
- }
- response = requests.request("POST", url, headers=headers, data=payload)
- return response.json()['content']
- # 创建公众号分享卡片
- @classmethod
- def create_gzh_path(cls, video_id, shared_uid):
- """
- :param video_id: 视频 id
- :param shared_uid: 分享 id
- """
- root_share_id = str(uuid.uuid4())
- url = f"pages/user-videos?id={video_id}&su={shared_uid}&fromGzh=1&rootShareId={root_share_id}&shareId={root_share_id}"
- # 自动把 root_share_id 加入到白名单
- cls.auto_white(root_share_id)
- return root_share_id, f"pages/category?jumpPage={urllib.parse.quote(url, safe='')}"
- # 从票圈请求视频
- @classmethod
- def request_for_info(cls, video_id):
- """
- 请求数据
- :param video_id:
- :return:
- """
- url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/batchSelectVideoInfo"
- data = {
- "videoIdList": [video_id]
- }
- header = {
- "Content-Type": "application/json",
- }
- response = requests.post(url, headers=header, data=json.dumps(data))
- return response.json()
- # 清理标题
- @classmethod
- def clean_title(cls, strings):
- """
- :param strings:
- :return:
- """
- return (
- strings.strip()
- .replace("\n", "")
- .replace("/", "")
- .replace("\r", "")
- .replace("#", "")
- .replace(".", "。")
- .replace("\\", "")
- .replace("&NBSP", "")
- .replace(":", "")
- .replace("*", "")
- .replace("?", "")
- .replace("?", "")
- .replace('"', "")
- .replace("<", "")
- .replace(">", "")
- .replace("|", "")
- .replace(" ", "")
- .replace('"', "")
- .replace("'", "")
- )
- class MySQLServer(object):
- """
- MySql 服务
- """
- @classmethod
- def select_download_videos(cls, trace_id):
- """
- 查询
- :param trace_id:
- :return:
- """
- sql = "select video_id, video_title from crawler_video where out_user_id = '{}' limit 5;".format(trace_id)
- connection = pymysql.connect(
- host="rm-bp1159bu17li9hi94.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址
- port=3306, # 端口号
- user="crawler", # mysql用户名
- passwd="crawler123456@", # mysql用户登录密码
- db="piaoquan-crawler", # 数据库名
- charset="utf8mb4" # 如果数据库里面的文本是utf8编码的,charset指定是utf8
- )
- cursor = connection.cursor()
- cursor.execute(sql)
- out_video_list = cursor.fetchall()
- if len(out_video_list) > 0:
- vid_list = [i[0] for i in out_video_list if i[0] != 0]
- vid_list = [vid_list[0]]
- # dir_path = os.path.join(os.getcwd(), 'applications', 'static', "out_videos")
- # os.makedirs(os.path.dirname(dir_path), exist_ok=True)
- # done_list = os.listdir(dir_path)
- # process_list = [
- # (
- # i[1],
- # trace_id,
- # os.path.join(dir_path, "{}.json".format(i[0]))
- # ) for i in out_video_list if not "{}.json".format(i[0]) in done_list
- # ]
- # if process_list:
- # ask_kimi_and_save_to_local(process_list[0])
- logging(
- code="2003",
- trace_id=trace_id,
- info="recall_search_list",
- function="find_videos_in_mysql",
- data=vid_list
- )
- return {
- "search_videos": "success",
- "trace_id": trace_id,
- "video_list": vid_list
- }
- else:
- return {
- "search_videos": "failed",
- "trace_id": trace_id,
- "video_list": []
- }
- @classmethod
- def select_pq_videos(cls):
- """
- 查询
- :return: info_list
- """
- connection = pymysql.connect(
- host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址
- port=3306, # 端口号
- user="wx2016_longvideo", # mysql用户名
- passwd="wx2016_longvideoP@assword1234", # mysql用户登录密码
- db="incentive", # 数据库名
- charset="utf8mb4" # 如果数据库里面的文本是utf8编码的,charset指定是utf8
- )
- sql = "select video_id, key_words, search_keys, extra_keys from video_content"
- cursor = connection.cursor()
- cursor.execute(sql)
- data = cursor.fetchall()
- result = [
- {
- "video_id": line[0],
- "key_words": json.loads(line[1]),
- "search_keys": json.loads(line[2]),
- "extra_keys": json.loads(line[3]),
- }
- for line in data
- ]
- return result
- # 敏感词
- @classmethod
- def select_sensitive_words(cls):
- """
- sensitive words
- :return:
- """
- connection = pymysql.connect(
- host="rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com", # 数据库IP地址,内网地址
- port=3306, # 端口号
- user="wx2016_longvideo", # mysql用户名
- passwd="wx2016_longvideoP@assword1234", # mysql用户登录密码
- db="longvideo", # 数据库名
- charset="utf8mb4" # 如果数据库里面的文本是utf8编码的,charset指定是utf8
- )
- sql = "select `keyword` from wx_sensitive_word where `data_status` = 0"
- cursor = connection.cursor()
- cursor.execute(sql)
- data = cursor.fetchall()
- result = [line[0] for line in data]
- return result
- class KimiServer(object):
- """
- Kimi Server
- """
- @classmethod
- def ask_kimi(cls, question):
- """
- Ask Kimi for information
- :param question: tiny text
- :return: "{}"
- """
- single_title_prompt = """
- 我会给你一个视频标题,需要你帮我用你所学的知识来帮我分析出以下信息,信息我都写到 json 里面了
- {
- "key_words": [], # 返回三个关键词
- "search_keys": [], # 标题可能的搜索关键词,返回 3 个
- "extra_keys": [], # 关心这个视频的用户还会关心哪些关键词, 返回 3 个
- "theme": 标题的主题, 用一个词概括
- }
- 只需要返回一个 json,key 和上面的一样,
- 我给你的标题是:
- """
- client = OpenAI(
- api_key='sk-tz1VaKqksTzk0F8HxlU4YVGwj7oa1g0c0puGNUZrdn9MDtzm',
- base_url="https://api.moonshot.cn/v1"
- )
- chat_completion = client.chat.completions.create(
- messages=[
- {
- "role": "user",
- "content": single_title_prompt + question,
- }
- ],
- model="moonshot-v1-8k",
- )
- response = chat_completion.choices[0].message.content.replace('```json', '').replace('```', '')
- try:
- response = json.loads(response)
- return response
- except:
- return {}
- @classmethod
- def ask_kimi_and_save_to_local(cls, info_tuple):
- """
- save file to local
- :return:
- """
- title, trace_id, save_path = info_tuple[0], info_tuple[1], info_tuple[2]
- if os.path.exists(save_path):
- logging(
- code="2001",
- info="该 video 信息已经挖掘完成---{}".format(title),
- function="ask_kimi_and_save_to_local",
- trace_id=trace_id,
- )
- else:
- os.makedirs(os.path.dirname(save_path), exist_ok=True)
- if not title:
- result = {}
- else:
- result = cls.ask_kimi(title)
- logging(
- code="2001",
- info="kimi-result",
- data=result,
- trace_id=trace_id,
- function="ask_kimi_and_save_to_local"
- )
- with open(save_path, "w", encoding="utf-8") as f:
- f.write(json.dumps(result, ensure_ascii=False))
|