瀏覽代碼

新版本视频分析和trigger产出

jihuaqiang 1 月之前
父節點
當前提交
1dbf76c165
共有 4 個文件被更改,包括 398 次插入11 次删除
  1. 133 0
      utils/coze_hook.py
  2. 181 0
      utils/google_ai_analyze.py
  3. 73 0
      utils/google_ai_prompt.py
  4. 11 11
      workers/video_insight_consumption_work.py

+ 133 - 0
utils/coze_hook.py

@@ -0,0 +1,133 @@
+import requests
+import json
+
+class CozeHook(object):
+    def __init__(self):
+        self.url = "https://api.coze.cn/v1/workflow/run"
+        self.headers = {
+            "Content-Type": "application/json",
+            "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
+        }
+        self.hook_id = "7507245138873450535"
+    def call_coze_api(self, summary, timeline):
+        url = self.url
+        headers = self.headers
+        payload = {
+            "workflow_id": self.hook_id,
+            "parameters": {
+                "summary": summary,
+                "timeline": timeline
+            }
+        }
+        response = requests.post(url, json=payload, headers=headers, timeout=600)
+        response.raise_for_status()
+        return response.json()
+
+    def extract_fields_from_response(self, resp):
+        import re
+        
+        # Define patterns at the function level
+        JSON_PATTERNS = [
+            r"```json\\n(.*?)```",  # 转义的换行
+            r"```json\n(.*?)```",   # 普通换行
+            r"```(.*?)```",         # 无语言标记
+            r"\{.*\}"               # 直接JSON对象
+        ]
+        
+        def try_unescape_json_string(s):
+            # 递归反序列化所有层级的转义JSON字符串
+            for _ in range(3):  # 最多尝试3层
+                if isinstance(s, str):
+                    try:
+                        s2 = json.loads(s)
+                        # 如果反序列化后类型有变化,继续递归
+                        if type(s2) != str:
+                            s = s2
+                        else:
+                            break
+                    except Exception as e:
+                        print(f"JSON反序列化失败: {str(e)}")
+                        break
+                else:
+                    break
+            return s
+
+        def extract_json_from_string(s):
+            """Helper function to extract and parse JSON from a string"""
+            if not isinstance(s, str):
+                return s
+                
+            # First try direct JSON parsing
+            try:
+                return json.loads(s)
+            except json.JSONDecodeError:
+                pass
+                
+            # Then try each pattern
+            for pattern in JSON_PATTERNS:
+                json_str = re.search(pattern, s, re.DOTALL)
+                if json_str:
+                    try:
+                        content = json_str.group(1)
+                        return json.loads(content)
+                    except Exception as e:
+                        print(f"使用模式 {pattern} 解析失败: {str(e)}")
+                        continue
+            return s
+
+        try:
+            data = resp.get("data")
+            if not data:
+                print("响应中没有data字段")
+                return ("", "", "")
+                
+            
+            # First parse the outer JSON structure
+            try:
+                data = json.loads(data)
+            except json.JSONDecodeError as e:
+                print(f"解析外层data失败: {str(e)}")
+                return ("", "", "")
+                
+            # Then handle the output field
+            output = data.get("output")
+            if not output:
+                print("data中没有output字段")
+                return ("", "", "")
+                
+            print(f"\n原始output字段: {output}")
+            output = extract_json_from_string(output)
+            
+            if isinstance(output, str):
+                print("output解析后仍为字符串")
+                return ("", "", "")
+                
+
+            if isinstance(output, dict):
+                # 按优先级检查不同的字段名
+                if "需求列表" in output:
+                    demand_list = output["需求列表"]
+                elif "questions" in output:
+                    demand_list = output["questions"]
+                elif "interactive_questions" in output:
+                    demand_list = output["interactive_questions"]
+                else:
+                    print("output中没有找到需求列表、questions或interactive_questions字段")
+                    return []
+            else:
+                demand_list = output
+
+            if not demand_list or not isinstance(demand_list, list):
+                print(f"需求列表无效: {demand_list}")
+
+            return demand_list
+
+        except Exception as e:
+            print(f"解析返回数据出错: {str(e)}")
+            print(f"原始响应: {json.dumps(resp, ensure_ascii=False, indent=2)}")
+        return []
+    
+    def run(self, summary, timeline):
+        resp = self.call_coze_api(summary, timeline)
+        list = self.extract_fields_from_response(resp)
+        return list

+ 181 - 0
utils/google_ai_analyze.py

@@ -0,0 +1,181 @@
+import os
+import time
+import uuid
+from typing import  Optional
+
+import google.generativeai as genai
+import orjson
+import requests
+from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
+from loguru import logger
+from utils.coze_hook import CozeHook
+
+from utils.google_ai_prompt import VIDEO_TOPIC_ANALYSIS_PROMPT, VIDEO_SEGMENT_ANALYSIS_PROMPT
+# from utils.feishu_data import Material
+
+CACHE_DIR = os.path.join(os.getcwd(), 'video_cache')
+
+# CACHE_DIR = '/Users/z/Downloads/'
+# PROXY_ADDR = 'http://localhost:1081'
+# os.environ['http_proxy'] = PROXY_ADDR
+# os.environ['https_proxy'] = PROXY_ADDR
+def load_prompts():
+    """从prompt.py加载Prompt"""
+    try:
+        print("\n[初始化] 从prompt.py加载Prompt")
+        
+        prompts = [
+            {
+                "name": "视频选题与要点理解",
+                "content": VIDEO_TOPIC_ANALYSIS_PROMPT
+            },
+            {
+                "name": "视频分段与时间点分析",
+                "content": VIDEO_SEGMENT_ANALYSIS_PROMPT
+            }
+        ]
+            
+        print(f"[成功] 加载 {len(prompts)} 个Prompt")
+        return prompts
+        
+    except Exception as e:
+        raise Exception(f"加载Prompt失败: {str(e)}")
+    
+class GoogleAI(object):
+
+    @classmethod
+    def download_video(cls, video_link: str) -> Optional[str]:
+        file_path = os.path.join(CACHE_DIR, f'{str(uuid.uuid4())}.mp4')
+        try:
+            # 确保缓存目录存在
+            try:
+                os.makedirs(CACHE_DIR, exist_ok=True)
+            except Exception as e:
+                error_info = {
+                    "error_type": type(e).__name__,
+                    "error_message": str(e),
+                    "cache_dir": CACHE_DIR,
+                    "current_dir": os.getcwd(),
+                    "dir_exists": os.path.exists(CACHE_DIR),
+                    "dir_permissions": oct(os.stat(os.path.dirname(CACHE_DIR)).st_mode)[-3:] if os.path.exists(os.path.dirname(CACHE_DIR)) else "N/A"
+                }
+                error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8')
+                logger.error(f'[内容分析] 创建缓存目录失败: {error_json}')
+                return None
+            
+            for _ in range(3):
+                try:
+                    response = requests.get(url=video_link, timeout=60)
+                    print(f"response content: {file_path}")
+                    if response.status_code == 200:
+                        try:
+                            with open(file_path, 'wb') as f:
+                                f.write(response.content)
+                            logger.info(f'[内容分析] 视频链接: {video_link}, 存储地址: {file_path}')
+                        except Exception as e:
+                            error_info = {
+                                "error_type": type(e).__name__,
+                                "error_message": str(e),
+                                "file_path": file_path,
+                                "content_length": len(response.content) if response.content else 0
+                            }
+                            error_json = orjson.dumps(error_info, option=orjson.OPT_INDENT_2).decode('utf-8')
+                            logger.error(f'[内容分析] 视频保存失败: {error_json}')
+                            return None
+                        return file_path
+                except Exception:
+                    time.sleep(1)
+                    continue
+        except Exception:
+            logger.error(f'[内容分析] 创建缓存目录失败')
+            return None
+      
+    def _analyze_content(self, video, prompt):
+        """增强版内容分析"""
+        model = genai.GenerativeModel(
+            model_name='gemini-2.0-flash',
+            generation_config=genai.GenerationConfig(
+                response_mime_type='application/json',
+                temperature=0.3,
+                max_output_tokens=20480
+            ),
+            safety_settings={
+                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
+                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
+            }
+        )
+        
+        try:
+            response = model.generate_content(
+                contents=[video, prompt],
+                request_options={'timeout': 300}
+            )
+            
+            if hasattr(response, '_error') and response._error:
+                raise Exception(f"生成错误: {response._error}")
+                
+            result = orjson.loads(response.text.strip())
+            print(f"[视频分析] 响应: {result}")
+            if not isinstance(result, dict):
+                raise ValueError("响应格式错误:非字典结构")
+                
+            return result
+        except orjson.JSONDecodeError:
+            raise Exception("响应解析失败,非JSON格式")
+        except Exception as e:
+            raise Exception(f"分析失败: {str(e)}")
+
+    @classmethod
+    def run(cls, api_key, video_url):
+        print(f"api_key:{api_key},video_url:{video_url}")
+        video_path = None
+        try:
+            genai.configure(api_key=api_key)
+            video_path = cls.download_video(video_link=video_url)
+            if not video_path:
+                logger.error(f'[内容分析] 视频下载失败, 跳过任务')
+                os.remove(video_path)
+                logger.info(f"[内容分析] 文件已删除: {video_path}")
+                return "[异常] 视频下载失败",""
+
+            video = genai.upload_file(path=video_path, mime_type='video/mp4')
+            while video.state.name == 'PROCESSING':
+                time.sleep(1)
+                video = genai.get_file(name=video.name)
+            if video.state.name != 'ACTIVE':
+                genai.delete_file(name=video.name)
+                os.remove(video_path)
+                return "[异常] 上传视频失败", ""
+            
+            prompts = load_prompts()
+            analysis_data = {}
+            for prompt in prompts[:3]:
+                print(f"[分析] 正在执行: {prompt['name']}")
+                try:
+                    result = cls._analyze_content(video, prompt['content'])
+                    analysis_data[prompt['name']] = result
+                except Exception as e:
+                    analysis_data[prompt['name']] = {
+                        "error": str(e),
+                        "error_type": type(e).__name__
+                    }
+            print(f"[分析] 所有分析完成, 结果: {analysis_data}")
+
+            coze_hook = CozeHook()
+            demand_list = coze_hook.run(analysis_data["视频选题与要点理解"], analysis_data["视频分段与时间点分析"])
+            print(f"[分析] 所有分析完成, 结果: {demand_list}")
+
+            genai.delete_file(name=video.name)
+            os.remove(video_path)
+            return analysis_data, demand_list
+        except Exception as e:
+            logger.error(f"[内容分析] 处理异常,异常信息{e}")
+            os.remove(video_path)
+            return f"[异常] {e}",""
+
+
+if __name__ == '__main__':
+    ai = GoogleAI()
+    ai.run("AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk",
+                 "http://rescdn.yishihui.com/jq_oss/video/2025012215472528213")
+

+ 73 - 0
utils/google_ai_prompt.py

@@ -0,0 +1,73 @@
+# 视频选题与要点理解
+
+VIDEO_TOPIC_ANALYSIS_PROMPT = '''# 任务说明:
+你是一位短视频结构化分析专家。请严格按以下要求处理输入视频:
+
+# 分析规范:
+1. 使用标准JSON格式输出,遵循以下规则:
+   - 所有键名使用英文双引号包裹
+   - 字符串值使用中文双引号""
+   - 禁止换行符、Markdown符号
+   - 数值类目用字符串表示
+
+2. 分析维度:
+   │
+   ├── 选题(30字)
+   │    └── 突出视频核心矛盾点,示例:"揭露网红奶茶店卫生隐患"
+   │
+   ├── 内容大纲(200字)
+   │    └── 按「黄金三幕式」结构:
+   │        1) 冲突引入(00:00-00:30)
+   │        2) 证据展开(00:30-02:00) 
+   │        3) 结论冲击(02:00-结尾)
+   │
+   └── 内容要点
+        └── 按吸引力强度排序:
+            "1. 现场实拍过期原料特写镜头(00:01:45)
+             2. 员工偷拍工作流程(00:03:20)
+             3. 专家访谈数据对比(00:04:10)"
+
+# 输出示例:
+{
+  "选题": "揭秘网红零食代工黑幕",
+  "内容大纲": "视频首先展示代工厂合规车间环境(00:00:15-00:01:30),随后突袭检查发现原料过期问题(00:01:45-00:03:20),最后通过员工采访揭露生产日期篡改流程(00:04:10-00:05:50)",
+  "内容要点": [
+    "1. 车间环境与原料仓库的视觉反差",
+    "2. 特写镜头展示虫蛀原料袋(00:02:15)",
+    "3. 隐蔽摄像头拍摄的灌装过程(00:04:30)"
+  ]
+}
+
+请现在开始分析:'''
+
+# timeline
+VIDEO_SEGMENT_ANALYSIS_PROMPT = '''# 任务说明:你是一位短视频分析专家。请根据输入视频内容,从整体结构出发,分析视频的分段逻辑与关键时间点。
+
+# 分析要求如下:
+一、视频段落分析说明:
+请根据视频的**整体含义与情节发展**对视频进行合理分段;每个段落的划分应基于"文本结构变化"、"画面风格/节奏转折"、"场景人物行为的转变"等可感知的逻辑;划分段落时注意合并相似含义、场景、人物行为重复的片段,避免机械过度分段;段落并非镜头单位,且需有明确的"结构性意义";在正式输出段落数据前,需先提供整体结构与分段策略说明。
+
+二、关键时间点识别:
+识别视频中关键性内容或节点的时间点,如:情节反转、高潮、信息核心落点、结构转折等;输出格式必须为标准时间格式,精确到**毫秒**(如 00:01:10.234)。
+
+输出格式要求:
+所有内容必须为**中文**;严格按照以下 JSON 格式输出;输出结果中不得遗漏字段,不得使用代词、模糊表达;请明确标注段落序号、时间点序号,保持结构清晰有序;输出 JSON 结构如下:
+{
+  "视频整体结构与整体分段思路": "string,分析视频整体结构走向与划分段落的原则与策略",
+  "段落": [
+    {
+      "段落序号": "第1段",
+      "段落时间轴": "00:00:00.000 - 00:00:30.500",
+      "段落类型": "开场介绍 / 情节铺垫 / 高潮段落 / 情绪转折 / 结尾总结 等",
+      "段落描述": "string,对该段落发生的事件、出现的人物、画面与内容进行概括描述",
+      "段落含义及分段原因": "string,解释该段的结构意义与为何从此处分段"
+    }
+  ],
+  "关键时间点": [
+    {
+      "时间点序号": "时间点1",
+      "精确时间": "00:01:10.234",
+      "时间点描述": "string,描述该时间点对应的事件或结构意义"
+    }
+  ]
+}'''

+ 11 - 11
workers/video_insight_consumption_work.py

@@ -11,7 +11,7 @@ logger.add("/app/logs/consumption.log", rotation="10 MB")
 sys.path.append('/app')
 
 from utils.aliyun_log import AliyunLogger
-from utils.google_ai_studio import GoogleAI
+from utils.google_ai_analyze import GoogleAI
 from utils.piaoquan import PQ
 from utils.redis import RedisHelper, content_video_data
 from utils.mysql_db import MysqlHelper
@@ -56,16 +56,16 @@ class ConsumptionRecommend(object):
         api_key = os.getenv("VIDEO_INSIGHT_GEMINI_API_KEY")
         # api_key = 'AIzaSyBFLCKMLX-Pf1iXoC2e_rMDLbNhNG23vTk'
         logger.info(f"[处理] 使用的API_KEY:{api_key}")
-        text,text1 = GoogleAI.run(api_key, video_url)
-        if "[异常]" in text:
+        analysis_data, demand_list = GoogleAI.run(api_key, video_url)
+        if "[异常]" in analysis_data:
             content_video_data(json.dumps(task))
         # Parse JSON data
-        data = json.loads(orjson.dumps(text).decode())
+        data = json.loads(orjson.dumps(demand_list).decode())
         # Generate SQL insert statement
         sql = """
         INSERT INTO video_demand_analysis (
             video_id, video_link, video_title, content_type,
-            demand_order, demand_score, user_demand, demand_category,
+            demand_order, demand_score, user_demand, demand_category, demand_time,
             demand_reason, product_hook, hook_time, hook_desc,
             hook_type, landing_desc, landing_type, platform_case, reasoning_process,audit_status,audit_desc
         ) VALUES
@@ -86,7 +86,7 @@ class ConsumptionRecommend(object):
 
             value = f"""(
                 {video_id}, '{link}', '{video_title}', NULL,
-                '{entry.get('需求排序序号', '')}', '{entry.get('需求强烈程度分值', '')}', '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}',
+                '{entry.get('需求排序序号', '')}', '{entry.get('需求强烈程度分值', '')}', '{entry.get('用户具体的需求描述', '')}', '{entry.get('需求分类', '')}', '{entry.get('需求钩子出现时间', '')}',
                 '{entry.get('推测出该点需求的原因', '')}', '{entry.get('需求详细query', '')}', '', '{entry.get('需求钩子话术', '')}',
                 '', '{entry.get('落地方案形态描述', '')}', '{entry.get('落地方案类型', '')}', '', '','{audit_status}','{audit_desc}'
             )"""
@@ -97,15 +97,15 @@ class ConsumptionRecommend(object):
         logger.info(f"{sql}")
         MysqlHelper.update_values(sql)
 
-        logger.info(f"[处理] text写入数据库成功")
+        logger.info(f"[处理] 需求列表写入数据库成功")
 
 
         # Parse JSON data
-        data = json.loads(orjson.dumps(text1).decode())
+        data = json.loads(orjson.dumps(analysis_data).decode())
         # Generate SQL insert statement
         sql = """
         INSERT INTO video_demand_score (
-            video_id, video_link, video_title, demand_score,reason
+            video_id, video_link, video_title, analysis_summary, analysis_timeline
         ) VALUES
         """
         # Add values for each entry
@@ -113,7 +113,7 @@ class ConsumptionRecommend(object):
         link = f"""https://admin.piaoquantv.com/cms/post-detail/{video_id}/detail"""
         entry = data
         value = f"""(
-            {video_id}, '{link}', '{video_title}', '{entry.get('需求强烈程度', )}', '{entry.get('理由', '')}'
+            {video_id}, '{link}', '{video_title}', '{entry.get('视频选题与要点理解', )}', '{entry.get('视频分段与时间点分析', '')}'
         )"""
         values.append(value)
         # Combine SQL statement and values
@@ -122,7 +122,7 @@ class ConsumptionRecommend(object):
         logger.info(f"{sql}")
         MysqlHelper.update_values(sql)
 
-        logger.info(f"[处理] text1写入数据库成功")
+        logger.info(f"[处理] 视频分析结果写入数据库成功")
 
 
 async def run():