
video insight

丁云鹏 committed 1 month ago
parent commit c74cef6fc6

+ 14 - 0
Dockerfile

@@ -0,0 +1,14 @@
+FROM python:3.11-slim
+
+WORKDIR /app
+
+COPY . .
+
+ENV TZ=Asia/Shanghai
+
+RUN apt-get update && apt-get install -y --no-install-recommends curl jq \
+    && apt-get clean && rm -rf /var/lib/apt/lists/* \
+    && pip install --no-cache-dir -r requirements.txt \
+    && mkdir -p /app/cache
+    && mkdir -p /app/cache
+
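+# No ENTRYPOINT here: each service sets its own entrypoint in docker-compose.yml.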
+#ENTRYPOINT ["python", "/app/job.py"]

+ 1 - 0
README.md

@@ -0,0 +1 @@
+Video Insight (视频洞察)

+ 29 - 0
docker-compose.yml

@@ -0,0 +1,29 @@
+services:
+  select:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    image: content_job
+    container_name: content_worker1
+    restart: unless-stopped
+    environment:
+      - ENV=prod
+    networks:
+      - content_net
+    entrypoint: "python /app/workers/select_work.py"
+  consumption:
+    image: content_job
+    restart: unless-stopped
+    env_file:
+      - product.env
+    volumes:
+      - ./sh:/app/sh
+      - /var/run/docker.sock:/var/run/docker.sock:ro
+    networks:
+      - content_net
+    deploy:
+      replicas: 2
+    entrypoint: sh /app/start.sh
+networks:
+  content_net:
+    name: content_net

+ 5 - 0
product.env

@@ -0,0 +1,5 @@
+ENV=prod
+GEMINI_API_KEY_1=AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk
+GEMINI_API_KEY_2=AIzaSyBGPYEc9F3FoDEqwlaVBxUHsNdkxmR_sl0
+
+

+ 13 - 0
requirements.txt

@@ -0,0 +1,13 @@
+aliyun-log-python-sdk==0.9.12
+google-generativeai==0.8.3
+loguru==0.7.2
+odps==3.5.1
+opencv-python==4.10.0.84
+redis==5.1.1
+requests==2.32.3
+schedule==1.2.2
+pymysql==1.0.2
+lark-oapi==1.4.8
+orjson==3.10.13
+oss2==2.19.1
+apscheduler==3.11.0

+ 8 - 0
start.sh

@@ -0,0 +1,8 @@
+#!/bin/sh
+
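+# Query the Docker socket (mounted read-only in docker-compose.yml) for this
+# container's metadata, then take the trailing "-<n>" of its Compose-assigned
+# name (e.g. content_job-consumption-2) as the replica index.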
+export CONTAINER_INFO="$(curl -s --unix-socket /var/run/docker.sock http://docker/containers/$HOSTNAME/json)"
+export CONTAINER_INDEX="$(echo "$CONTAINER_INFO" | jq '.Name' | sed 's/^"\(.*\)"$/\1/' | awk -F'-' '{print $NF}')"
+echo "export GEMINI_API_KEY=$(eval echo \$"GEMINI_API_KEY_${CONTAINER_INDEX}")" >> /root/.bashrc
+. /root/.bashrc
+
+python /app/workers/consumption_work.py

+ 0 - 0
utils/__init__.py


+ 60 - 0
utils/aliyun_log.py

@@ -0,0 +1,60 @@
+# -*- coding: utf-8 -*-
+"""
+公共方法,包含:生成log / 删除log
+"""
+from typing import Optional
+from loguru import logger
+
+from aliyun.log import PutLogsRequest, LogClient, LogItem
+
+
+proxies = {"http": None, "https": None}
+
+
+class AliyunLogger:
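+    """Writes structured log entries to Aliyun SLS."""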
+
+    # Write a log entry to Aliyun SLS
+    @staticmethod
+    def logging(video_id: str, data: Optional[str] = None):
+        """
+        Write a log entry to Aliyun SLS.
+        Dev project: https://sls.console.aliyun.com/lognext/project/crawler-log-dev/logsearch/crawler-log-dev
+        Prod project: https://sls.console.aliyun.com/lognext/project/crawler-log-prod/logsearch/crawler-log-prod
+        """
+        accessKeyId = "LTAIWYUujJAm7CbH"
+        accessKey = "RfSjdiWwED1sGFlsjXv0DlfTnZTG1P"
+
+        project = "crawler-log-prod"
+        logstore = "temp"
+        endpoint = "cn-hangzhou.log.aliyuncs.com"
+        try:
+            contents = [
+                ("video_id", video_id),
+                ("data", data),
+            ]
+            # Create the LogClient instance
+            client = LogClient(endpoint, accessKeyId, accessKey)
+            log_group = []
+            log_item = LogItem()
+            log_item.set_contents(contents)
+            log_group.append(log_item)
+            # Send the log entry
+            request = PutLogsRequest(
+                project=project,
+                logstore=logstore,
+                topic="",
+                source="",
+                logitems=log_group,
+                compress=False,
+            )
+
+            client.put_logs(request)
+        except Exception as e:
+            logger.error(f'[+] Failed to write log to Aliyun: {e}')

+ 52 - 0
utils/common_log.py

@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# @Time: 2023/12/26
+"""
+公共方法,包含:生成log / 删除log / 下载方法 / 删除 weixinzhishu_chlsfiles / 过滤词库 / 保存视频信息至本地 txt / 翻译 / ffmpeg
+"""
+import os
+import sys
+
+sys.path.append(os.getcwd())
+from datetime import date, timedelta
+from datetime import datetime
+from loguru import logger
+
+proxies = {"http": None, "https": None}
+
+
+class Common:
+    # Current time, <class 'datetime.datetime'>, e.g. 2022-04-14 20:13:51.244472
+    now = datetime.now()
+    # Yesterday, <class 'str'>, e.g. "2022-04-13"
+    yesterday = (date.today() + timedelta(days=-1)).strftime("%Y-%m-%d")
+    # Today, <class 'datetime.date'>, e.g. 2022-04-14
+    today = date.today()
+    # Tomorrow, <class 'str'>, e.g. "2022-04-15"
+    tomorrow = (date.today() + timedelta(days=1)).strftime("%Y-%m-%d")
+
+    # Build a per-type daily file logger with loguru
+    @staticmethod
+    def logger(log_type):
+        """
+        Build a file logger with loguru: one file per log_type per day.
+        """
+        try:
+            # Log directory
+            log_dir = f"./logs/{log_type}/"
+            log_path = os.getcwd() + os.sep + log_dir
+            if not os.path.isdir(log_path):
+                os.makedirs(log_path)
+            # Log file name
+            log_name = f"{log_type}-{datetime.now().date().strftime('%Y-%m-%d')}.log"
+
+            # Do not also print to the console
+            logger.remove(handler_id=None)
+            # File sink: rotate at midnight, keep 10 days, thread-safe via enqueue
+            logger.add(os.path.join(log_dir, log_name), level="INFO", rotation="00:00", retention="10 days", enqueue=True)
+
+            return logger
+        except Exception as e:
+            # Do not recurse into Common.logger() while reporting a logger failure
+            logger.error(f"Failed to initialize logger: {e}")
+            return None
+
+

+ 22 - 0
utils/feishu_data.py

@@ -0,0 +1,22 @@
+import time
+
+from utils.feishu_utils import Feishu
+
+
+class Material:
+
+    @classmethod
+    def feishu_list(cls):
+        """Fetch the first (mark, prompt) row from the Feishu sheet, retrying up to 3 times."""
+        for _ in range(3):
+            data = Feishu.get_values_batch("U3jrs64cxhJ40Dt4VmXcS0TKnkf", "cb7QdW")
+            for row in data[0:]:
+                mark = row[0]
+                prompt = row[1]
+                if mark and prompt:
+                    return mark, prompt
+                time.sleep(5)
+        return None, None
+
+if __name__ == '__main__':
+    mark, prompt = Material.feishu_list()
+    print(mark, prompt)

+ 411 - 0
utils/feishu_utils.py

@@ -0,0 +1,411 @@
+# -*- coding: utf-8 -*-
+"""
+飞书表配置: token 鉴权 / 增删改查 / 机器人报警
+"""
+import json
+import os
+import sys
+import requests
+import urllib3
+from loguru import logger
+
+sys.path.append(os.getcwd())
+
+proxies = {"http": None, "https": None}
+
+
+class Feishu:
+    """
+    Edit Feishu cloud spreadsheets
+    """
+    succinct_url = "https://w42nne6hzg.feishu.cn/sheets/"
+    # Resolve the spreadsheet token for a crawler alias
+    @classmethod
+    def spreadsheettoken(cls, crawler):
+        if crawler == "summary":
+            return "KsoMsyP2ghleM9tzBfmcEEXBnXg"
+        else:
+            return crawler
+
+    # Fetch a Feishu tenant_access_token
+    @classmethod
+    def get_token(cls):
+        """
+        获取飞书api token
+        :return:
+        """
+        url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"
+        post_data = {"app_id": "cli_a13ad2afa438d00b",  # 这里账号密码是发布应用的后台账号及密码
+                     "app_secret": "4tK9LY9VbiQlY5umhE42dclBFo6t4p5O"}
+
+        try:
+            urllib3.disable_warnings()
+            response = requests.post(url=url, data=post_data, proxies=proxies, verify=False)
+            tenant_access_token = response.json()["tenant_access_token"]
+            return tenant_access_token
+        except Exception as e:
+            logger.error("获取飞书 api token 异常:{}", e)
+
+    # Fetch spreadsheet metadata
+    @classmethod
+    def get_metainfo(cls, crawler):
+        """
+        Fetch spreadsheet metadata.
+        :return:
+        """
+        try:
+            get_metainfo_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
+                               + cls.spreadsheettoken(crawler) + "/metainfo"
+
+            headers = {
+                "Authorization": "Bearer " + cls.get_token(),
+                "Content-Type": "application/json; charset=utf-8"
+            }
+            params = {
+                "extFields": "protectedRange",  # 额外返回的字段,extFields=protectedRange时返回保护行列信息
+                "user_id_type": "open_id"  # 返回的用户id类型,可选open_id,union_id
+            }
+            urllib3.disable_warnings()
+            r = requests.get(url=get_metainfo_url, headers=headers, params=params, proxies=proxies, verify=False)
+            response = json.loads(r.content.decode("utf8"))
+            return response
+        except Exception as e:
+            logger.error("获取表格元数据异常:{}", e)
+
+    # Read all values from a worksheet
+    @classmethod
+    def get_values_batch(cls, crawler, sheetid):
+        """
+        Read all values from a worksheet.
+        :param crawler: which crawler's spreadsheet
+        :param sheetid: which worksheet
+        :return: all values
+        """
+        try:
+            get_values_batch_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
+                                   + cls.spreadsheettoken(crawler) + "/values_batch_get"
+            headers = {
+                "Authorization": "Bearer " + cls.get_token(),
+                "Content-Type": "application/json; charset=utf-8"
+            }
+            params = {
+                "ranges": sheetid,
+                "valueRenderOption": "ToString",
+                "dateTimeRenderOption": "",
+                "user_id_type": "open_id"
+            }
+            urllib3.disable_warnings()
+            r = requests.get(url=get_values_batch_url, headers=headers, params=params, proxies=proxies, verify=False)
+            response = json.loads(r.content.decode("utf8"))
+            values = response["data"]["valueRanges"][0]["values"]
+            return values
+        except Exception as e:
+            logger.error("读取工作表所有数据异常:{}", e)
+
+    # Insert rows or columns into a worksheet
+    @classmethod
+    def insert_columns(cls, crawler, sheetid, majordimension, startindex, endindex):
+        """
+        Insert rows or columns into a worksheet.
+        :param crawler: which crawler's spreadsheet
+        :param sheetid: which worksheet
+        :param majordimension: ROWS or COLUMNS
+        :param startindex: start position
+        :param endindex: end position
+        """
+        try:
+            insert_columns_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
+                                 + cls.spreadsheettoken(crawler) + "/insert_dimension_range"
+            headers = {
+                "Authorization": "Bearer " + cls.get_token(),
+                "Content-Type": "application/json; charset=utf-8"
+            }
+            body = {
+                "dimension": {
+                    "sheetId": sheetid,
+                    "majorDimension": majordimension,  # 默认 ROWS ,可选 ROWS、COLUMNS
+                    "startIndex": startindex,  # 开始的位置
+                    "endIndex": endindex  # 结束的位置
+                },
+                "inheritStyle": "AFTER"  # BEFORE 或 AFTER,不填为不继承 style
+            }
+
+            urllib3.disable_warnings()
+            r = requests.post(url=insert_columns_url, headers=headers, json=body, proxies=proxies, verify=False)
+            logger.info("插入行或列:{}", r.json()["msg"])
+        except Exception as e:
+            logger.error("插入行或列异常:{}", e)
+
+    # Write values
+    @classmethod
+    def update_values(cls, crawler, sheetid, ranges, values):
+        """
+        Write values into a worksheet.
+        :param crawler: which crawler's spreadsheet
+        :param sheetid: which worksheet
+        :param ranges: cell range
+        :param values: values to write, as a list
+        """
+        try:
+            update_values_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
+                                + cls.spreadsheettoken(crawler) + "/values_batch_update"
+            headers = {
+                "Authorization": "Bearer " + cls.get_token(),
+                "Content-Type": "application/json; charset=utf-8"
+            }
+            body = {
+                "valueRanges": [
+                    {
+                        "range": sheetid + "!" + ranges,
+                        "values": values
+                    },
+                ],
+            }
+            urllib3.disable_warnings()
+            r = requests.post(url=update_values_url, headers=headers, json=body, proxies=proxies, verify=False)
+            logger.info("写入数据:{}", r.json()["msg"])
+        except Exception as e:
+            logger.error("写入数据异常:{}", e)
+
+    # Merge cells
+    @classmethod
+    def merge_cells(cls, crawler, sheetid, ranges):
+        """
+        Merge cells.
+        :param crawler: which crawler's spreadsheet
+        :param sheetid: which worksheet
+        :param ranges: cell range to merge
+        """
+        try:
+            merge_cells_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
+                              + cls.spreadsheettoken(crawler) + "/merge_cells"
+            headers = {
+                "Authorization": "Bearer " + cls.get_token(),
+                "Content-Type": "application/json; charset=utf-8"
+            }
+
+            body = {
+                "range": sheetid + "!" + ranges,
+                "mergeType": "MERGE_ROWS"
+            }
+            urllib3.disable_warnings()
+            r = requests.post(url=merge_cells_url, headers=headers, json=body, proxies=proxies, verify=False)
+            logger.info("合并单元格:{}", r.json()["msg"])
+        except Exception as e:
+            logger.error("合并单元格异常:{}", e)
+
+    # Read a cell range
+    @classmethod
+    def get_range_value(cls, crawler, sheetid, cell):
+        """
+        Read the contents of a cell range.
+        :param crawler: which crawler's spreadsheet
+        :param sheetid: which worksheet
+        :param cell: which cell range
+        :return: cell contents
+        """
+        try:
+            get_range_value_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
+                                  + cls.spreadsheettoken(crawler) + "/values/" + sheetid + "!" + cell
+            headers = {
+                "Authorization": "Bearer " + cls.get_token(),
+                "Content-Type": "application/json; charset=utf-8"
+            }
+            params = {
+                "valueRenderOption": "FormattedValue",
+
+                # dateTimeRenderOption=FormattedString formats dates and times per their cell format (numbers are not formatted) and returns the formatted string
+                "dateTimeRenderOption": "",
+
+                # user id type to return: open_id or union_id
+                "user_id_type": "open_id"
+            }
+            urllib3.disable_warnings()
+            r = requests.get(url=get_range_value_url, headers=headers, params=params, proxies=proxies, verify=False)
+            return r.json()["data"]["valueRange"]["values"][0]
+        except Exception as e:
+            logger.error("Failed to read cell range: {}", e)
+
+    # Fetch all worksheet content as a flat list
+    @classmethod
+    def get_sheet_content(cls, crawler, sheet_id):
+        try:
+            sheet = Feishu.get_values_batch(crawler, sheet_id)
+            content_list = []
+            for x in sheet:
+                for y in x:
+                    if y is None:
+                        pass
+                    else:
+                        content_list.append(y)
+            return content_list
+        except Exception as e:
+            logger.error(f'get_sheet_content:{e}\n')
+
+    # Delete rows or columns (ROWS or COLUMNS)
+    @classmethod
+    def dimension_range(cls, log_type, crawler, sheetid, major_dimension, startindex, endindex):
+        """
+        Delete rows or columns.
+        :param log_type: log path
+        :param crawler: which crawler's spreadsheet
+        :param sheetid: which worksheet
+        :param major_dimension: defaults to ROWS; ROWS or COLUMNS
+        :param startindex: start position
+        :param endindex: end position
+        :return:
+        """
+        try:
+            dimension_range_url = "https://open.feishu.cn/open-apis/sheets/v2/spreadsheets/" \
+                                  + cls.spreadsheettoken(crawler) + "/dimension_range"
+            headers = {
+                "Authorization": "Bearer " + cls.get_token(),
+                "Content-Type": "application/json; charset=utf-8"
+            }
+            body = {
+                "dimension": {
+                    "sheetId": sheetid,
+                    "majorDimension": major_dimension,
+                    "startIndex": startindex,
+                    "endIndex": endindex
+                }
+            }
+            urllib3.disable_warnings()
+            r = requests.delete(url=dimension_range_url, headers=headers, json=body, proxies=proxies, verify=False)
+            logger.info("删除视频数据:{}", r.json()["msg"])
+        except Exception as e:
+            logger.error("删除视频数据异常:{}", e)
+
+    # Look up a user's open_id by name
+    @classmethod
+    def get_userid(cls, username):
+        try:
+            url = "https://open.feishu.cn/open-apis/user/v1/batch_get_id?"
+            headers = {
+                "Authorization": "Bearer " + cls.get_token(),
+                "Content-Type": "application/json; charset=utf-8"
+            }
+            name_phone_dict = {
+                "xinxin": "15546206651",
+                "muxinyi": "13699208058",
+                "wangxueke": "13513479926",
+                "yuzhuoyi": "18624010360",
+                "luojunhui": "18801281360",
+                "fanjun": "15200827642",
+                "zhangyong": "17600025055",
+                'liukunyu': "18810931977"
+            }
+            username = name_phone_dict.get(username)
+
+            data = {"mobiles": [username]}
+            urllib3.disable_warnings()
+            r = requests.get(url=url, headers=headers, params=data, verify=False, proxies=proxies)
+            open_id = r.json()["data"]["mobile_users"][username][0]["open_id"]
+
+            return open_id
+        except Exception as e:
+            logger.error(f"get_userid异常:{e}\n")
+
+    # Feishu alert bot
+    @classmethod
+    def bot(cls, log_type, crawler, text, mark_name):
+        try:
+
+            headers = {'Content-Type': 'application/json'}
+            if crawler == "机器自动改造消息通知":
+                url = "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703"
+                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=bc154d"
+                users = f"<at id=" + str(cls.get_userid(log_type)) + f">{mark_name}</at>"
+            elif crawler == "快手关键词搜索":
+                url = "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703"
+                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=U1gySe"
+                users = "".join([f'<at id="{cls.get_userid(type)}">{name}</at>' for type, name in
+                                 zip(log_type, mark_name)])
+            else:
+                url = "https://open.feishu.cn/open-apis/bot/v2/hook/7928f182-08c1-4c4d-b2f7-82e10c93ca80"
+                sheet_url = "https://w42nne6hzg.feishu.cn/sheets/KsoMsyP2ghleM9tzBfmcEEXBnXg?sheet=bc154d"
+                users = f"<at id=" + str(cls.get_userid(log_type)) + f">{mark_name}</at>"
+            data = json.dumps({
+                "msg_type": "interactive",
+                "card": {
+                    "config": {
+                        "wide_screen_mode": True,
+                        "enable_forward": True
+                    },
+                    "elements": [{
+                        "tag": "div",
+                        "text": {
+                            "content": users + text,
+                            "tag": "lark_md"
+                        }
+                    }, {
+                        "actions": [{
+                            "tag": "button",
+                            "text": {
+                                "content": "详情,点击~~~~~",
+                                "tag": "lark_md"
+                            },
+                            "url": sheet_url,
+                            "type": "default",
+                            "value": {}
+                        }],
+                        "tag": "action"
+                    }],
+                    "header": {
+                        "title": {
+                            "content": "📣消息提醒",
+                            "tag": "plain_text"
+                        }
+                    }
+                }
+            })
+            urllib3.disable_warnings()
+            r = requests.post(url, headers=headers, data=data, verify=False, proxies=proxies)
+            logger.info(f'Bot message sent: {r.status_code}, {text}')
+        except Exception as e:
+            logger.error(f"bot异常:{e}\n")
+
+    # Feishu bot: completion notice for the rework plan
+    @classmethod
+    def finish_bot(cls, text, url, content):
+        try:
+            headers = {'Content-Type': 'application/json'}
+            data = json.dumps({
+                "msg_type": "interactive",
+                "card": {
+                    "config": {
+                        "wide_screen_mode": True,
+                        "enable_forward": True
+                    },
+                    "elements": [{
+                        "tag": "div",
+                        "text": {
+                            "content": text,
+                            "tag": "lark_md"
+                        }
+                    }],
+                    "header": {
+                        "title": {
+                            "content": content,
+                            "tag": "plain_text"
+                        }
+                    }
+                }
+            })
+            urllib3.disable_warnings()
+            r = requests.post(url, headers=headers, data=data, verify=False, proxies=proxies)
+            logger.info(f'Bot message sent: {r.status_code}, {text}')
+        except Exception as e:
+            logger.error(f"finish_bot failed: {e}\n")
+
+
+if __name__ == "__main__":
+    Feishu.bot('recommend', '抖音', '测试: 抖音cookie失效,请及时更换')
+

+ 90 - 0
utils/google_ai_studio.py

@@ -0,0 +1,90 @@
+import os
+import time
+import uuid
+from typing import Optional
+
+import google.generativeai as genai
+import orjson
+import requests
+from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
+from loguru import logger
+
+from utils.feishu_data import Material
+
+CACHE_DIR = '/app/cache/'
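+# Matches the cache directory created in the Dockerfile (RUN ... mkdir -p /app/cache)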
+# CACHE_DIR = '/Users/z/Downloads/'
+# PROXY_ADDR = 'http://localhost:1081'
+# os.environ['http_proxy'] = PROXY_ADDR
+# os.environ['https_proxy'] = PROXY_ADDR
+
+class GoogleAI(object):
+
+    @classmethod
+    def download_video(cls, video_link: str) -> Optional[str]:
+        file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
+        for _ in range(3):
+            try:
+                response = requests.get(url=video_link, timeout=60)
+                if response.status_code == 200:
+                    with open(file_path, 'wb') as f:
+                        f.write(response.content)
+                    logger.info(f'[Content analysis] video link: {video_link}, saved to: {file_path}')
+                    return file_path
+            except Exception:
+                time.sleep(1)
+                continue
+        return None
+
+    @classmethod
+    def run(cls, api_key, video_url):
+        video_path = None
+        try:
+            genai.configure(api_key=api_key)
+            video_path = cls.download_video(video_link=video_url)
+            if not video_path:
+                # Nothing was downloaded, so there is no local file to clean up
+                logger.error('[Content analysis] video download failed, skipping task')
+                return "[异常] 视频下载失败"
+
+            video = genai.upload_file(path=video_path, mime_type='video/mp4')
+            while video.state.name == 'PROCESSING':
+                time.sleep(1)
+                video = genai.get_file(name=video.name)
+            if video.state.name != 'ACTIVE':
+                genai.delete_file(name=video.name)
+                os.remove(video_path)
+                return "[异常] 上传视频失败"
+            model = genai.GenerativeModel(
+                model_name='gemini-1.5-flash',
+                generation_config=genai.GenerationConfig(response_mime_type='application/json'),
+                safety_settings={
+                    HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_NONE,
+                },
+            )
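+            # response_mime_type='application/json' asks Gemini to return JSON, which orjson parses below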
+            mark, prompt = Material.feishu_list()  # prompt is used below; fetch it from the Feishu sheet
+            response = model.generate_content(
+                contents=[
+                    video,
+                    f'{prompt}',
+                ],
+                stream=False,
+                request_options={
+                    'timeout': 600,
+                },
+            )
+            text = orjson.loads(response.text.strip())
+            genai.delete_file(name=video.name)
+            os.remove(video_path)
+            return text
+        except Exception as e:
+            logger.error(f"[Content analysis] processing failed: {e}")
+            if video_path and os.path.exists(video_path):
+                os.remove(video_path)
+            return f"[异常] {e}"
+
+
+if __name__ == '__main__':
+    result = GoogleAI.run("AIzaSyA_dXKbcW8s0fFIoo89rtPRWU34pJry7mU",
+                          "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213")
+    print(result)
+

+ 66 - 0
utils/mysql_db.py

@@ -0,0 +1,66 @@
+# -*- coding: utf-8 -*-
+"""
+数据库连接及操作
+"""
+import pymysql
+
+class MysqlHelper:
+    @classmethod
+    def connect_mysql(cls):
+        # Create a Connection object representing one database connection
+        connection = pymysql.connect(
+            host="rm-bp1jjv3jv98133plv285-vpc-rw.mysql.rds.aliyuncs.com",  # database address (VPC-internal)
+            port=3306,  # port
+            user="wx2016_longvideo",  # mysql user
+            passwd="wx2016_longvideoP@assword1234",  # mysql password
+            db="longvideo",  # database name
+            # utf8 charset, matching the utf8-encoded text in the database
+            charset="utf8")
+        return connection
+
+    @classmethod
+    def get_values(cls, sql, params=None):
+        try:
+            # Connect to the database
+            connect = cls.connect_mysql()
+            # Get a Cursor object
+            mysql = connect.cursor()
+
+            if params:
+                # params were supplied
+                mysql.execute(sql, params)
+            else:
+                # no params
+                mysql.execute(sql)
+            # fetchall returns a tuple of tuples, one per row
+            data = mysql.fetchall()
+
+            # Close the connection
+            connect.close()
+
+            # Return the query result (a tuple)
+            return data
+        except Exception as e:
+            print(f"get_values failed: {e}\n")
+
+    @classmethod
+    def update_values(cls, sql):
+        # Connect to the database
+        connect = cls.connect_mysql()
+        # Get a Cursor object
+        mysql = connect.cursor()
+        try:
+            # Execute the sql statement
+            res = mysql.execute(sql)
+            # commit is required, otherwise writes are not persisted
+            connect.commit()
+            return res
+        except Exception as e:
+            # Roll back on error
+            connect.rollback()
+        # Close the connection
+        connect.close()
+
+
+
+

+ 38 - 0
utils/odps_data.py

@@ -0,0 +1,38 @@
+import json
+from odps import ODPS
+
+# ODPS service configuration
+ODPS_CONFIG = {
+    'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+    'ACCESSID': 'LTAIWYUujJAm7CbH',
+    'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+    # NOTE: no project name appeared in this file; replace this placeholder with the real MaxCompute project
+    'PROJECT': 'your_maxcompute_project',
+}
+
+
+class OdpsDataCount:
+    @classmethod
+    def get_data_count(cls, sql):
+        odps = ODPS(
+            access_id=ODPS_CONFIG['ACCESSID'],
+            secret_access_key=ODPS_CONFIG['ACCESSKEY'],
+            project=ODPS_CONFIG['PROJECT'],
+            endpoint=ODPS_CONFIG['ENDPOINT']
+        )
+        data_values = []
+        try:
+            with odps.execute_sql(sql).open_reader() as reader:
+                for row in reader:
+                    data_values.append(json.dumps({"video_id": row[0]}, ensure_ascii=False))
+        except Exception as e:
+            print(f"An error occurred: {e}")
+            return data_values
+        return data_values
+
+    @classmethod
+    def main(cls, sql):
+        data_count = cls.get_data_count(sql=sql)
+        print(len(data_count))
+        return data_count
+
+if __name__ == '__main__':
+    # Example invocation; pass a real query such as the one built in workers/select_work.py
+    OdpsDataCount.main("select clickobjectid as video_id from user_share_log limit 10")

+ 197 - 0
utils/piaoquan.py

@@ -0,0 +1,197 @@
+
+import requests
+from urllib.parse import urlencode
+import json
+
+
+
+class PQ:
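+    """Thin wrapper around the Piaoquan video APIs: upload, base info, tags, and multi-title updates."""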
+
+    @classmethod
+    def install_tj_pq(cls, video_id, new_video_path, new_title, n_id, cover_path):
+        url = "https://longvideoapi.piaoquantv.com/longvideoapi/crawler/video/send?muid=999"
+        payload = {
+            'loginUid': n_id,
+            'oldVideoReRecommendVideoId': video_id,
+            'videoPath': new_video_path,
+            'coverImgPath': cover_path,
+            'appType': 999000,
+            'viewStatus': 1,
+            'versionCode': 100,
+            'fileExtensions': 'mp4',
+            'videoFromScene': 1,
+            'title': new_title,
+            'descr': "",
+            'copyType': 2
+        }
+        headers = {
+            'User-Agent': 'PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0',
+            'cookie': 'JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78; JSESSIONID=3538C8F690744960BC2B4F02B4A3B1E4',
+            'referer': 'http://appspeed.piaoquantv.com',
+            'token': '524a8bc871dbb0f4d4717895083172ab37c02d2f',
+            'accept-language': 'zh-CN,zh-Hans;q=0.9',
+            'Content-Type': 'application/x-www-form-urlencoded'
+        }
+
+        response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
+        data = response.json()
+        code = data["code"]
+        if code == 0:
+            new_video_id = data["data"]["id"]
+            print(new_video_id)
+            return new_video_id
+
+    """
+    新生成视频上传到对应账号下
+    """
+    @classmethod
+    def insert_piaoquantv(cls, new_video_path, new_title, n_id, cover_path):
+        url = "https://videopre.piaoquantv.com/longvideoapi/crawler/video/send?muid=999"
+        headers = {
+            'User-Agent': 'PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0',
+            'cookie': 'JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78',
+            'referer': 'http://appspeed.piaoquantv.com',
+            'token': '524a8bc871dbb0f4d4717895083172ab37c02d2f',
+            'accept-language': 'zh-CN,zh-Hans;q=0.9',
+            'Content-Type': 'application/x-www-form-urlencoded'
+        }
+        payload = {
+            'deviceToken': '9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408',
+            'fileExtensions': 'MP4',
+            'loginUid': n_id,
+            'networkType': 'Wi-Fi',
+            'platform': 'iOS',
+            'requestId': 'fb972cbd4f390afcfd3da1869cd7d001',
+            'sessionId': '362290597725ce1fa870d7be4f46dcc2',
+            'subSessionId': '362290597725ce1fa870d7be4f46dcc2',
+            'title': new_title,
+            'token': '524a8bc871dbb0f4d4717895083172ab37c02d2f',
+            'uid': n_id,
+            'versionCode': '486',
+            'versionName': '3.4.12',
+            'videoFromScene': '1',
+            'videoPath': new_video_path,
+            'viewStatus': '1',
+            'coverImgPath' : cover_path
+        }
+        encoded_payload = urlencode(payload)
+        response = requests.request("POST", url, headers=headers, data=encoded_payload, timeout=30)
+        data = response.json()
+        code = data["code"]
+        if code == 0:
+            new_video_id = data["data"]["id"]
+            print(new_video_id)
+            return new_video_id
+        return None
+
+    @classmethod
+    def get_pq_oss(cls,video_id):
+        try:
+            url = "https://longvideoapi.piaoquantv.com/longvideoapi/openapi/video/getBaseInfo"
+
+            payload = json.dumps({
+                "videoId": int(video_id)
+            })
+            headers = {
+                'Content-Type': 'application/json',
+                'Cookie': 'JSESSIONID=658158EABFCF6AC9B9BB0D8B61897A88'
+            }
+            for i in range(3):
+                response = requests.request("POST", url, headers=headers, data=payload, timeout=30)
+                response = response.json()
+                code = response['code']
+                if code == 0:
+                    data = response['data']
+                    video_path = data["videoPath"]
+
+                    return video_path
+            return None
+        except Exception as e:
+            return None
+
+    @classmethod
+    def video_tag(cls, pq_id: str, tag: str):
+        url = "https://admin.piaoquantv.com/manager/video/tag/addVideoTags"
+
+        payload = json.dumps({
+            "videoId": pq_id,
+            "tagNames": tag
+        })
+        headers = {
+            'Content-Type': 'application/json'
+        }
+
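+        # Fire-and-forget: the response is not checked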
+        requests.request("POST", url, headers=headers, data=payload)
+
+
+    @classmethod
+    def get_pd_id(cls, video_id: str):
+        """获取封面id"""
+        url = "https://admin.piaoquantv.com/manager/video/multiTitleV2/listV2?muid=999"
+
+        payload = json.dumps({
+            "videoId": video_id,
+            "range": "4h"
+        })
+        headers = {
+            'accept': 'application/json',
+            'accept-language': 'zh-CN,zh;q=0.9',
+            'cache-control': 'no-cache',
+            'content-type': 'application/json',
+            'cookie': 'SESSION=YjVlOTI0ZWMtN2JkMy00MWIyLTk1NWItNmY5NTFlYjgxNjAy',
+            'origin': 'https://admin.piaoquantv.com',
+            'pragma': 'no-cache',
+            'priority': 'u=1, i',
+            'sec-ch-ua-mobile': '?0',
+            'sec-ch-ua-platform': '"macOS"',
+            'sec-fetch-dest': 'empty',
+            'sec-fetch-mode': 'cors',
+            'sec-fetch-site': 'same-origin'
+        }
+        response = requests.request("POST", url, headers=headers, data=payload)
+        response = response.json()
+        code = response['code']
+        if code != 0:
+            return None
+        pq_id = response['content'][0]['id']
+        return pq_id
+
+
+    @classmethod
+    def update_pq_title(cls ,video_id: str, new_title: str):
+        pq_id = cls.get_pd_id(video_id)
+        if not pq_id:
+            return
+        url = "https://admin.piaoquantv.com/manager/video/multiTitleV2/update?muid=999"
+        payload = json.dumps([
+            {
+                "id": pq_id,
+                "title": new_title,
+                "videoId": video_id,
+                "distributionWeight": 1000,
+                "shareWeight": 1000
+            }
+        ])
+        headers = {
+            'accept': 'application/json',
+            'accept-language': 'zh-CN,zh;q=0.9',
+            'content-type': 'application/json',
+            'cookie': 'SESSION=YjVlOTI0ZWMtN2JkMy00MWIyLTk1NWItNmY5NTFlYjgxNjAy',
+            'origin': 'https://admin.piaoquantv.com',
+            'priority': 'u=1, i',
+            'sec-ch-ua-mobile': '?0',
+            'sec-ch-ua-platform': '"macOS"',
+            'sec-fetch-dest': 'empty',
+            'sec-fetch-mode': 'cors',
+            'sec-fetch-site': 'same-origin',
+        }
+
+        response = requests.request("POST", url, headers=headers, data=payload)
+        response = response.json()
+        code = response['code']
+        return code
+
+
+
+if __name__ == '__main__':
+    PQ.get_pq_oss(47377130)

+ 37 - 0
utils/redis.py

@@ -0,0 +1,37 @@
+import redis
+
+
+class RedisHelper(object):
+    # Connection pool shared by every RedisHelper instance
+    _pool: redis.ConnectionPool = None
+
+    @classmethod
+    def _get_pool(cls) -> redis.ConnectionPool:
+        # Lazily create the single shared pool
+        if cls._pool is None:
+            cls._pool = redis.ConnectionPool(
+                host="r-bp1mb0v08fqi4hjffupd.redis.rds.aliyuncs.com",  # external address
+                port=6379,
+                db=0,
+                password="Wqsd@2019",
+                max_connections=100)
+        return cls._pool
+
+    def get_client(self) -> redis.Redis:
+        return redis.Redis(connection_pool=self._get_pool())
+
+    def close(self):
+        if self._pool:
+            self._pool.disconnect(inuse_connections=True)
+
+def content_video_data(ret):
+    """Push a video whose analysis failed back onto the Redis queue."""
+    task = "task:video_insight"
+    helper = RedisHelper()
+    client = helper.get_client()
+    client.rpush(task, ret)

+ 16 - 0
utils/sql_help.py

@@ -0,0 +1,16 @@
+from utils.mysql_db import MysqlHelper
+from datetime import datetime
+
+
+class sqlCollect:
+
+    @classmethod
+    def select_ad_list(cls):
+        """Query creatives updated since the start of today."""
+        # Current date and time
+        current_date = datetime.now()
+        formatted_date = current_date.strftime("%Y-%m-%d")
+        sql = """SELECT `ad_id`, `creative_code`, `creative_title`, `material_address`, `click_button_text`, `creative_logo_address`, `update_time` FROM `creative` WHERE `update_time` >= %s"""
+        data = MysqlHelper.get_values(sql, (formatted_date,))
+        if data:
+            return data
+        return None

+ 0 - 0
workers/__init__.py


+ 69 - 0
workers/consumption_work.py

@@ -0,0 +1,69 @@
+import asyncio
+import json
+import os
+import sys
+import orjson
+from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from apscheduler.triggers.interval import IntervalTrigger
+from loguru import logger
+sys.path.append('/app')
+
+from utils.aliyun_log import AliyunLogger
+from utils.google_ai_studio import GoogleAI
+from utils.piaoquan import PQ
+from utils.redis import RedisHelper, content_video_data
+
+
+
+class ConsumptionRecommend(object):
+    @classmethod
+    async def run(cls):
+        logger.info(f"[处理] 开始获取redis数据")
+
+        task = RedisHelper().get_client().rpop(name = 'task:video_insight')
+
+        if not task:
+            logger.info('[处理] 无待执行的扫描任务')
+            return
+        task = orjson.loads(task)
+        logger.info(f"[处理] 获取redis数据{task}")
+        video_id = task['video_id']
+        logger.info(f"[处理] 开始获取原视频OSS地址")
+        video_path = PQ.get_pq_oss(video_id)
+        if not video_path:
+            return
+        logger.info(f"[处理] 获取原视频OSS地址,视频链接:{video_path}")
+        video_url = f"http://rescdn.yishihui.com/{video_path}"
+        logger.info(f"[处理] 开始分析视频")
+        api_key = os.getenv("GEMINI_API_KEY")
+        # api_key = 'AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk'
+        logger.info(f"[处理] 使用的API_KEY:{api_key}")
+        text = GoogleAI.run(api_key, video_url)
+        if "[异常]" in text:
+            content_video_data(json.dumps(task))
+        AliyunLogger.logging(str(video_id), orjson.dumps(text).decode())
+        logger.info(f"[处理] 写入日志成功")
+
+
+async def run():
+    scheduler = AsyncIOScheduler()
+    try:
+        logger.info("[Process] starting scheduler")
+        scheduler.add_job(ConsumptionRecommend.run, trigger=IntervalTrigger(minutes=2))  # run every 2 minutes
+        scheduler.start()
+        await asyncio.Event().wait()
+    except KeyboardInterrupt:
+        pass
+    except Exception as e:
+        logger.error(f"[Process] scheduler failed to start: {e}")
+    finally:
+        scheduler.shutdown()
+
+
+if __name__ == '__main__':
+    # asyncio.run(ConsumptionRecommend.run())
+    asyncio.run(run())

+ 36 - 0
workers/select_work.py

@@ -0,0 +1,36 @@
+import datetime
+import sys
+import time
+
+import schedule
+from loguru import logger
+sys.path.append('/app')
+
+from utils.odps_data import OdpsDataCount
+from utils.redis import RedisHelper
+
+def requirement_insight():
+    """Video demand-point insight"""
+    try:
+        # Use the previous day's partition
+        dt = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
+        logger.info("Video demand-point insight")
+        redis_task = "task:video_insight"
+        sql = f'select clickobjectid as video_id from user_share_log where dt = {dt} and topic = "click" group by clickobjectid order by count(distinct machinecode) desc limit 100'
+        data = OdpsDataCount.main(sql)
+        if not data:
+            return
+        RedisHelper().get_client().rpush(redis_task, *data)
+        logger.info(f"[R] wrote {len(data)} rows to Redis")
+    except Exception as e:
+        logger.error(f"[R] failed to write to Redis: {e}")
+
+def schedule_tasks():
+    schedule.every().day.at("01:00").do(requirement_insight)
+
+
+if __name__ == "__main__":
+    schedule_tasks()  # 调用任务调度函数
+    while True:
+        schedule.run_pending()
+        time.sleep(1)  # 每秒钟检查一次