jihuaqiang 1 month ago
Commit
6c9ca81a39

BIN
.DS_Store


BIN
0517.xlsx


BIN
__pycache__/analyze_prompt.cpython-313.pyc


+ 467 - 0
analyze_prompt.py

@@ -0,0 +1,467 @@
+# -*- coding: utf-8 -*-
+import socket
+import os
+import time
+import uuid
+import threading
+import pandas as pd
+import requests
+from requests.adapters import HTTPAdapter
+import google.generativeai as genai
+import orjson
+from google.generativeai.types import HarmBlockThreshold, HarmCategory
+from pandas import ExcelWriter
+from prompt.prompt import (
+    VIDEO_TOPIC_ANALYSIS_PROMPT,
+    VIDEO_TEXT_EXTRACTION_PROMPT,
+    VIDEO_SEGMENT_ANALYSIS_PROMPT,
+    HOOK_EXTRACTION_PROMPT
+)
+
+# =================== Environment configuration ===================
+os.environ.update({
+    "GENAI_UPLOAD_CHUNK_SIZE": "5242880",
+    "GENAI_UPLOAD_TIMEOUT": "300",
+    "HTTP_PROXY": "http://127.0.0.1:7890",
+    "HTTPS_PROXY": "http://127.0.0.1:7890"
+})
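+# Note: the GENAI_UPLOAD_* variables are presumably read by the upload stack
+# (chunk size in bytes / timeout in seconds); the proxy variables route all
+# HTTP(S) traffic through a local proxy expected at 127.0.0.1:7890.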
+
+# =================== Network configuration ===================
+_original_getaddrinfo = socket.getaddrinfo
+
+def _new_getaddrinfo(*args, **kwargs):
+    # Force IPv4: drop AF_INET6 results so sockets never attempt IPv6 through the proxy
+    return [res for res in _original_getaddrinfo(*args, **kwargs) if res[0] == socket.AF_INET]
+
+socket.getaddrinfo = _new_getaddrinfo
+
+# =================== Constants ===================
+CACHE_DIR = './video_cache/'
+API_KEYS = ["AIzaSyBGPYEc9F3FoDEqwlaVBxUHsNdkxmR_sl0"]
+RESULT_EXCEL = '视频分析报告.xlsx'
+PROXY_CONFIG = {"http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890"}
+
+# =================== Initialization ===================
+os.makedirs(CACHE_DIR, exist_ok=True)
+
+# =================== Core class ===================
+class GoogleVideoAnalyzer:
+    def __init__(self):
+        self.current_api_key = API_KEYS[0]
+        self._stop_event = threading.Event()
+        self.session = self._create_proxied_session()
+        genai.configure(api_key=self.current_api_key, transport='rest')
+
+    def _create_proxied_session(self):
+        """Create a requests session preconfigured with the proxy."""
+        session = requests.Session()
+        session.proxies = PROXY_CONFIG
+        session.verify = False
+        adapter = HTTPAdapter(max_retries=3, pool_connections=30, pool_maxsize=10)
+        session.mount('https://', adapter)
+        session.mount('http://', adapter)
+        
+        # Apply a default (connect, read) timeout to every request
+        original_send = session.send
+        def new_send(request, **kwargs):
+            kwargs.setdefault('timeout', (10, 30))
+            return original_send(request, **kwargs)
+        session.send = new_send
+        
+        return session
+
+    def _validate_video_file(self, path: str):
+        """Validate the local video file before upload."""
+        if not os.path.exists(path):
+            raise FileNotFoundError(f"Video file not found: {path}")
+        if os.path.getsize(path) == 0:
+            raise ValueError("Cannot upload an empty file")
+        if not path.lower().endswith('.mp4'):
+            raise ValueError("Only MP4 files are supported")
+        if os.path.getsize(path) > 100 * 1024 * 1024:  # 100 MB limit
+            raise ValueError("Video file exceeds the 100 MB limit")
+
+    def _safe_upload(self, video_path: str):
+        """Upload with retries, polling the cloud processing state."""
+        self._validate_video_file(video_path)
+        video = None
+        retry_count = 0
+        max_retries = 3
+        
+        while retry_count < max_retries:
+            try:
+                print(f'[Upload] starting | file size: {os.path.getsize(video_path)//1024}KB')
+                video = genai.upload_file(path=video_path, mime_type='video/mp4')
+                
+                while True:
+                    current_state = video.state.name
+                    print(f"[State] {current_state} | progress: {getattr(video, 'progress', 0)}%")
+                    if current_state == 'ACTIVE':
+                        return video
+                    elif current_state == 'FAILED':
+                        raise Exception("Cloud-side processing failed")
+                    elif self._stop_event.is_set():
+                        raise KeyboardInterrupt("Upload cancelled by user")
+                    time.sleep(10)
+                    video = genai.get_file(name=video.name)
+                    
+            except Exception as e:
+                retry_count += 1
+                if video:
+                    genai.delete_file(name=video.name)
+                if retry_count >= max_retries:
+                    raise Exception(f"Upload failed after {max_retries} retries: {str(e)}")
+                print(f"[Retry] upload failed, attempt {retry_count} of {max_retries}...")
+                time.sleep(5)
+
+    def _download_video(self, video_url: str) -> str:
+        """Download with forced integrity check and HTTP Range resume."""
+        file_path = os.path.join(CACHE_DIR, f'{uuid.uuid4()}.mp4')
+        temp_file = None
+        retry_count = 0
+        max_retries = 3
+        downloaded = 0
+        
+        while retry_count < max_retries:
+            try:
+                # Fetch the total file size via HEAD (inside the retry loop)
+                with self.session.head(video_url, timeout=10) as head_resp:
+                    head_resp.raise_for_status()
+                    total_size = int(head_resp.headers.get('content-length', 0))
+                    if total_size == 0:
+                        raise ValueError("Server did not return a valid content-length")
+
+                # Resume a partial download with a Range header
+                if os.path.exists(file_path):
+                    downloaded = os.path.getsize(file_path)
+                    headers = {'Range': f'bytes={downloaded}-'}
+                else:
+                    headers = {}
+
+                with self.session.get(
+                    video_url, 
+                    stream=True, 
+                    timeout=30,
+                    headers=headers
+                ) as response:
+                    response.raise_for_status()
+                    
+                    # A resumed request must be answered with 206 Partial Content
+                    if downloaded > 0 and response.status_code != 206:
+                        raise ConnectionError("Server does not support resumable downloads")
+
+                    mode = 'ab' if downloaded > 0 else 'wb'
+                    temp_file = open(file_path, mode)
+                    
+                    for chunk in response.iter_content(chunk_size=8192):
+                        if self._stop_event.is_set():
+                            raise KeyboardInterrupt("Download cancelled by user")
+                        if chunk:
+                            temp_file.write(chunk)
+                            downloaded += len(chunk)
+                            progress = downloaded/total_size*100
+                            print(f"\r[Download] progress: {progress:.1f}% | {downloaded//1024}KB/{total_size//1024}KB",
+                                end='', flush=True)
+
+                    # Enforce completeness: the byte count must match content-length
+                    if downloaded != total_size:
+                        raise IOError(f"Incomplete download ({downloaded}/{total_size} bytes)")
+                    
+                    return file_path
+
+            except Exception as e:
+                retry_count += 1
+                if retry_count >= max_retries:
+                    if os.path.exists(file_path):
+                        os.remove(file_path)
+                    raise Exception(f"Download failed after {max_retries} retries: {str(e)}")
+                print(f"\n[Retry] download interrupted, attempt {retry_count} of {max_retries}...")
+                time.sleep(5)
+                
+            finally:
+                if temp_file and not temp_file.closed:
+                    temp_file.close()
+                    print("\n[Download] file handle closed")
+
+    def _analyze_content(self, video, prompt):
+        """Run a single analysis prompt against the uploaded video."""
+        model = genai.GenerativeModel(
+            model_name='gemini-2.0-flash',
+            generation_config=genai.GenerationConfig(
+                response_mime_type='application/json',
+                temperature=0.3,
+                max_output_tokens=20480
+            ),
+            safety_settings={
+                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
+                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
+            }
+        )
+        
+        try:
+            response = model.generate_content(
+                contents=[video, prompt],
+                request_options={'timeout': 300}
+            )
+            
+            if hasattr(response, '_error') and response._error:
+                raise Exception(f"Generation error: {response._error}")
+                
+            result = orjson.loads(response.text.strip())
+            print(f"[Analysis] response: {result}")
+            if not isinstance(result, dict):
+                raise ValueError("Malformed response: expected a JSON object")
+                
+            return result
+        except orjson.JSONDecodeError:
+            raise Exception("Failed to parse response: not valid JSON")
+        except Exception as e:
+            raise Exception(f"Analysis failed: {str(e)}")
+
+    def _generate_hooks(self, video, hook_prompt, analysis_data):
+        """Generate hook content from the combined analysis results."""
+        try:
+            # 1. Build the format arguments (with empty-value fallbacks and str coercion)
+            format_args = {
+                "summary": str(analysis_data.get("视频选题与要点理解", {}) or "无相关内容"),
+                "detail": str(analysis_data.get("视频完整文本提取", {}) or "无相关内容"),
+                "timeline": str(analysis_data.get("视频分段与时间点分析", {}) or "无相关内容")
+            }
+
+            # 2. Debug: report the type of each analysis section
+            print("[DEBUG] analysis data types:")
+            print(f"- topic summary: {type(analysis_data.get('视频选题与要点理解'))}")
+            print(f"- text extraction: {type(analysis_data.get('视频完整文本提取'))}")
+            print(f"- segment analysis: {type(analysis_data.get('视频分段与时间点分析'))}")
+
+            # 3. Fill the prompt template with the analysis results
+            formatted_prompt = hook_prompt.format(**format_args)
+            print(f"[SUCCESS] template filled, prompt length: {len(formatted_prompt)}")
+            print(f"[DEBUG] prompt preview (first 500 chars):\n{formatted_prompt[:500]}...")
+
+            # 4. Model call
+            model = genai.GenerativeModel(
+                model_name='gemini-2.0-flash',
+                generation_config=genai.GenerationConfig(
+                    response_mime_type='application/json',
+                    temperature=0.5,
+                    max_output_tokens=4096
+                )
+            )
+            
+            # 5. Send the formatted prompt together with the video
+            response = model.generate_content(
+                contents=[video, formatted_prompt],  # the filled template, not the raw one
+                request_options={'timeout': 600}
+            )
+            print(f"[Raw response] length: {len(response.text)} chars")
+
+            # 6. Preprocess the response (naive fix for single-quoted JSON)
+            clean_text = response.text.replace("'", "\"")  # swap single quotes for double
+            clean_text = clean_text.replace("\n", "")      # strip newlines
+            print(f"[Cleaned response] preview: {clean_text[:200]}...")
+
+            # 7. Strict JSON validation
+            try:
+                result = orjson.loads(clean_text)
+                if not isinstance(result, list):
+                    raise ValueError("Response should be a JSON array")
+                
+                # Field completeness check
+                required_fields = {
+                    "需求排序序号", "需求详细query", "需求分类",
+                    "推测出该点需求的原因", "需求钩子话术", "需求钩子出现时间"
+                }
+                
+                for idx, item in enumerate(result):
+                    missing = required_fields - set(item.keys())
+                    if missing:
+                        raise ValueError(f"Item {idx+1} is missing fields: {missing}")
+                    if len(item["需求钩子话术"]) > 11:
+                        raise ValueError(f"Item {idx+1} hook line exceeds 11 characters: '{item['需求钩子话术']}'")
+                    
+                return result
+                
+            except orjson.JSONDecodeError as e:
+                error_msg = f"JSON parse failed: {str(e)}\nraw response: {clean_text[:500]}"
+                raise ValueError(error_msg)
+
+        except KeyError as e:
+            print(f"!! Critical: template variable {e} is undefined - check the prompt placeholders")
+            return {"error": f"missing template variable {e}"}
+        except ValueError as e:
+            print(f"!! Data validation failed: {str(e)}")
+            return {"error": str(e), "type": "DATA_VALIDATION"}
+        except Exception as e:
+            import traceback
+            error_detail = f"""
+            === Unhandled exception ===
+            type: {type(e)}
+            message: {str(e)}
+            traceback:
+            {traceback.format_exc()}
+            """
+            print(error_detail)
+            return {"error": "unknown exception"}
+
+    def cancel_operation(self):
+        """Signal any running download/upload loops to stop."""
+        self._stop_event.set()
+        print("[System] aborting current operation...")
+
+    def analyze(self, video_url: str, prompts: list):
+        """Full pipeline: download, upload, run analyses, then generate hooks."""
+        self._stop_event.clear()
+        video_path = None
+
+        try:
+            print(f"\n[Download] fetching {video_url}")
+            video_path = self._download_video(video_url)
+
+            print("[Upload] starting cloud processing")
+            video = self._safe_upload(video_path)
+
+            analysis_data = {}
+            for prompt in prompts[:3]:
+                print(f"[Analysis] running: {prompt['name']}")
+                try:
+                    result = self._analyze_content(video, prompt['content'])
+                    analysis_data[prompt['name']] = result
+                except Exception as e:
+                    analysis_data[prompt['name']] = {
+                        "error": str(e),
+                        "error_type": type(e).__name__
+                    }
+
+            hook_result = {}
+            if len(prompts) >= 4:
+                hook_prompt = prompts[3]
+                print(f"[Hook generation] running: {hook_prompt['name']}")
+                try:
+                    hook_result = self._generate_hooks(video, hook_prompt['content'], analysis_data)
+                    print("Hook extraction finished")
+                except Exception as e:
+                    print(e)
+                    hook_result = {
+                        "error": str(e),
+                        "error_type": type(e).__name__
+                    }
+
+            return {
+                "基础分析": analysis_data,
+                "钩子提取": hook_result
+            }
+
+        finally:
+            if video_path and os.path.exists(video_path):
+                os.remove(video_path)
+
+# =================== Data processing ===================
+def load_prompts():
+    """Load the prompt definitions from prompt.py."""
+    try:
+        print("\n[Init] loading prompts from prompt.py")
+        
+        prompts = [
+            {
+                "name": "视频选题与要点理解",
+                "content": VIDEO_TOPIC_ANALYSIS_PROMPT
+            },
+            {
+                "name": "视频完整文本提取",
+                "content": VIDEO_TEXT_EXTRACTION_PROMPT
+            },
+            {
+                "name": "视频分段与时间点分析",
+                "content": VIDEO_SEGMENT_ANALYSIS_PROMPT
+            },
+            {
+                "name": "钩子提取",
+                "content": HOOK_EXTRACTION_PROMPT
+            }
+        ]
+            
+        print(f"[OK] loaded {len(prompts)} prompts")
+        return prompts
+        
+    except Exception as e:
+        raise Exception(f"Failed to load prompts: {str(e)}")
+
+def process_video_data():
+    """Iterate over the source spreadsheet and analyze each video."""
+    try:
+        prompts = load_prompts()
+        video_df = pd.read_excel('0517.xlsx', engine='openpyxl').iloc[18:]  # skip the first 18 rows; resume from video 19
+        analyzer = GoogleVideoAnalyzer()
+        results = []
+        
+        import signal
+        signal.signal(signal.SIGINT, lambda s,f: analyzer.cancel_operation())
+
+        for pos, (_, row) in enumerate(video_df.iterrows(), start=1):
+            video_id = row['videoid']
+            video_url = f"http://visionularcdn.yishihui.com/{row['transcode_video_path'].replace('mp4/', 'mp4')}"
+            
+            record = {
+                "视频ID": video_id,
+                "播放量": row.get('播放次数', 'N/A'),
+                "视频标题": row.get('视频标题', 'N/A'),
+                "视频地址": video_url,
+                "状态": "成功"
+            }
+            
+            try:
+                print(f"\n{'='*30} Processing video {pos}/{len(video_df)} {'='*30}")
+                analysis = analyzer.analyze(video_url, prompts)
+                
+                for prompt in prompts[:3]:
+                    record[prompt['name']] = str(analysis["基础分析"].get(prompt['name'], {}))
+                
+                record["钩子提取"] = str(analysis.get("钩子提取", {}))
+                
+            except Exception as e:
+                record.update({
+                    "状态": "失败",
+                    "错误类型": type(e).__name__,
+                    "错误详情": str(e)
+                })
+                
+            finally:
+                results.append(record)
+                pd.DataFrame(results).to_excel(RESULT_EXCEL, index=False)
+
+        with ExcelWriter(RESULT_EXCEL, engine='openpyxl') as writer:
+            df_results = pd.DataFrame(results)
+            df_results.to_excel(writer, index=False)
+            
+            worksheet = writer.sheets['Sheet1']
+            for col in worksheet.columns:
+                max_len = max(len(str(cell.value)) for cell in col)
+                worksheet.column_dimensions[col[0].column_letter].width = min(max_len + 2, 50)
+                
+        print(f"\n{'='*30}\nReport generated: {os.path.abspath(RESULT_EXCEL)}")
+
+    except Exception as e:
+        print(f"\n{'!'*30} System-level error {'!'*30}\n{str(e)}")
+
+# =================== Entry point ===================
+if __name__ == '__main__':
+    print("=== Video analysis system starting ===")
+    print(f"Working directory: {os.getcwd()}")
+    
+    try:
+        test_resp = requests.get("https://www.google.com", 
+                               proxies=PROXY_CONFIG,
+                               timeout=10,
+                               verify=False)
+        print(f"[Network] connectivity test OK ({test_resp.status_code})")
+    except Exception as e:
+        print(f"[Network] connectivity test failed: {str(e)}")
+        exit(1)
+    
+    start_time = time.time()
+    try:
+        process_video_data()
+    except KeyboardInterrupt:
+        print("\n[Interrupted] terminated by user")
+    finally:
+        print(f"Total runtime: {time.time()-start_time:.1f}s")
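
A minimal driver sketch for the pipeline above, assuming the local proxy on 127.0.0.1:7890 is running and API_KEYS holds a valid Gemini key; the video URL is a placeholder, not a real asset:

    from analyze_prompt import GoogleVideoAnalyzer, load_prompts

    analyzer = GoogleVideoAnalyzer()
    prompts = load_prompts()                # three analysis prompts + the hook prompt
    report = analyzer.analyze("http://example.com/sample.mp4", prompts)
    print(report["基础分析"].keys())         # one entry per analysis prompt
    print(report["钩子提取"])                # hook list, or {"error": ...} on failure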

+ 273 - 0
get_trigger2.py

@@ -0,0 +1,273 @@
+# Read 视频分析报告1.xlsx, call the Coze workflow, and collect the demand list
+import pandas as pd
+import orjson
+import os
+from analyze_prompt import GoogleVideoAnalyzer
+from prompt.prompt import HOOK_EXTRACTION_PROMPT_V2
+import requests
+import json
+from datetime import datetime
+import re
+
+def get_first_row_and_call_coze():
+    
+    try:
+        # Read the Excel report
+        print(f"[{datetime.now()}] reading Excel file...")
+        df = pd.read_excel('视频分析报告1.xlsx')
+        
+        # First data row
+        first_row = df.iloc[0]
+        
+        # Pull columns F (summary), H (timeline) and D (video URL) by position
+        summary = first_row.iloc[5] if pd.notna(first_row.iloc[5]) else "{}"
+        timeline = first_row.iloc[7] if pd.notna(first_row.iloc[7]) else "{}"
+        video_url = first_row.iloc[3] if pd.notna(first_row.iloc[3]) else ""
+        
+        # Assemble the workflow parameters
+        data = {
+            "summary": summary,
+            "timeline": timeline
+        }
+        
+        # Call the Coze workflow
+        url = "https://api.coze.cn/v1/workflow/run"
+        headers = {
+            "Content-Type": "application/json",
+            "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
+        }
+        
+        payload = {
+            "workflow_id": "7507245138873450535",
+            "parameters": data
+        }
+        
+        print(f"[{datetime.now()}] calling Coze workflow...")
+        print(f"Request URL: {url}")
+        print(f"Request headers: {headers}")
+        print(f"Request payload: {json.dumps(payload, ensure_ascii=False, indent=2)}")
+        
+        response = requests.post(url, json=payload, headers=headers, timeout=60)
+        
+        # Log response status and headers
+        print(f"\nResponse status: {response.status_code}")
+        print(f"Response headers: {dict(response.headers)}")
+        
+        # Read the response body
+        try:
+            response_text = response.text
+            print(f"Response body: {response_text}")
+            
+            # Bail out on an empty body
+            if not response_text.strip():
+                print("Warning: empty response body")
+                return None
+                
+            # Parse JSON
+            try:
+                result = response.json()
+                print(f"\n[{datetime.now()}] Coze workflow result:")
+                print(json.dumps(result, ensure_ascii=False, indent=2))
+                return result
+            except json.JSONDecodeError as je:
+                print(f"JSON decode error: {str(je)}")
+                print(f"Raw response body: {response_text}")
+                return None
+                
+        except Exception as e:
+            print(f"Error reading response body: {str(e)}")
+            return None
+            
+    except pd.errors.EmptyDataError:
+        print("Error: Excel file is empty")
+    except requests.exceptions.RequestException as e:
+        print(f"Request error: {str(e)}")
+        if hasattr(e, 'response') and e.response is not None:
+            print(f"Error response status: {e.response.status_code}")
+            print(f"Error response body: {e.response.text}")
+    except Exception as e:
+        print(f"Unknown error: {str(e)}")
+    return None
+
+def call_coze_api(summary, timeline):
+    url = "https://api.coze.cn/v1/workflow/run"
+    headers = {
+        "Content-Type": "application/json",
+        "Authorization": "Bearer pat_ddPm5K5tCKXU2zH1NChGHoOKGOSECyzxmXPEcrtJg52UcCIndRAfiWRRxCH95pdB"
+    }
+    payload = {
+        "workflow_id": "7507245138873450535",
+        "parameters": {
+            "summary": summary,
+            "timeline": timeline
+        }
+    }
+    response = requests.post(url, json=payload, headers=headers, timeout=600)
+    response.raise_for_status()
+    return response.json()
+
+def extract_fields_from_response(resp):
+    
+    # Define patterns at the function level
+    JSON_PATTERNS = [
+        r"```json\\n(.*?)```",  # 转义的换行
+        r"```json\n(.*?)```",   # 普通换行
+        r"```(.*?)```",         # 无语言标记
+        r"\{.*\}"               # 直接JSON对象
+    ]
+    
+    def try_unescape_json_string(s):
+        # Recursively decode nested JSON-encoded strings (at most 3 levels)
+        for _ in range(3):
+            if isinstance(s, str):
+                try:
+                    s2 = json.loads(s)
+                    # Keep going only while decoding still changes the type
+                    if not isinstance(s2, str):
+                        s = s2
+                    else:
+                        break
+                except Exception as e:
+                    print(f"JSON unescape failed: {str(e)}")
+                    break
+            else:
+                break
+        return s
+
+    def extract_json_from_string(s):
+        """Helper function to extract and parse JSON from a string"""
+        if not isinstance(s, str):
+            return s
+            
+        # First try direct JSON parsing
+        try:
+            return json.loads(s)
+        except json.JSONDecodeError:
+            pass
+            
+        # Then try each pattern
+        for pattern in JSON_PATTERNS:
+            json_str = re.search(pattern, s, re.DOTALL)
+            if json_str:
+                try:
+                    content = json_str.group(1)
+                    return json.loads(content)
+                except Exception as e:
+                    print(f"Pattern {pattern} failed to parse: {str(e)}")
+                    continue
+        return s
+
+    try:
+        data = resp.get("data")
+        if not data:
+            print("Response has no data field")
+            return ("", "", "")
+            
+        
+        # First parse the outer JSON structure
+        try:
+            data = json.loads(data)
+        except json.JSONDecodeError as e:
+            print(f"Failed to parse outer data field: {str(e)}")
+            return ("", "", "")
+            
+        # Then handle the output field
+        output = data.get("output")
+        if not output:
+            print("data has no output field")
+            return ("", "", "")
+            
+        print(f"\nRaw output field: {output}")
+        output = extract_json_from_string(output)
+        
+        if isinstance(output, str):
+            print("output is still a string after parsing")
+            return ("", "", "")
+            
+
+        if isinstance(output, dict):
+            if "需求列表" in output:
+                demand_list = output["需求列表"]
+            elif "questions" in output:
+                demand_list = output["questions"]
+            else:
+                print("output contains neither 需求列表 nor questions")
+                return ("", "", "")
+        else:
+            demand_list = output
+
+        if not demand_list or not isinstance(demand_list, list):
+            print(f"Invalid demand list: {demand_list}")
+            return ("", "", "")
+
+        times = []
+        queries = []
+        hooks = []
+        
+        for item in demand_list:
+            if not isinstance(item, dict):
+                print(f"Skipping non-dict item: {item}")
+                continue
+                
+            time = item.get("需求钩子出现时间", "")
+            query = item.get("需求详细query", "")
+            hook = item.get("需求钩子话术", "")
+            
+            if not all([time, query, hook]):
+                print(f"Skipping incomplete item: {item}")
+                continue
+                
+            # Handle time format
+            if time == "end":
+                time = "视频结束"
+            elif "-" in time:
+                time = time.split("-")[0]  # Take the start time
+                
+            times.append(time)
+            queries.append(query)
+            hooks.append(hook)
+
+        if not times:
+            print("No valid demand items extracted")
+            return ("", "", "")
+
+        return ("\n".join(times), "\n".join(queries), "\n".join(hooks))
+
+    except Exception as e:
+        print(f"Error parsing response data: {str(e)}")
+        print(f"Raw response: {json.dumps(resp, ensure_ascii=False, indent=2)}")
+    return ("", "", "")
+
+def process_excel(input_excel):
+    df = pd.read_excel(input_excel)
+    for idx, row in df.iterrows():
+        summary = row.iloc[5] if pd.notna(row.iloc[5]) else "{}"
+        timeline = row.iloc[7] if pd.notna(row.iloc[7]) else "{}"
+        try:
+            print(f"Processing row {idx+1}")
+            resp = call_coze_api(summary, timeline)
+            time, query, hook = extract_fields_from_response(resp)
+            df.iat[idx, 9] = time     # column J (index 9): time
+            df.iat[idx, 10] = query   # column K (index 10): query
+            df.iat[idx, 11] = hook    # column L (index 11): hook
+            print(f"Row {idx+1} done")
+            print(hook)
+        except Exception as e:
+            print(f"Row {idx+1} failed: {e}")
+            df.iat[idx, 9] = "error"
+            df.iat[idx, 10] = "error"
+            df.iat[idx, 11] = "error"
+        # Save after every row so an interruption loses no work
+        df.to_excel(input_excel, index=False)
+        print(f"Saved through row {idx+1}")
+    df.to_excel(input_excel, index=False)
+    print("All rows processed and saved.")
+
+if __name__ == "__main__":
+    process_excel("视频分析报告1_拆分钩子.xlsx")  # process the source file in place
+
+
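
For reference, a hedged sketch of the nested response shape that extract_fields_from_response() unwraps; the field values are illustrative, not taken from a real workflow run:

    resp = {
        "code": 0,
        "data": '{"output": "```json\\n[{\\"需求钩子出现时间\\": \\"00:01:10\\", '
                '\\"需求详细query\\": \\"这道菜用什么火候最好?\\", '
                '\\"需求钩子话术\\": \\"火候秘诀在这里\\"}]\\n```"}'
    }
    times, queries, hooks = extract_fields_from_response(resp)
    # -> three newline-joined strings, one line per demand item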

+ 38 - 0
parse_gen_data.py

@@ -0,0 +1,38 @@
+import pandas as pd
+import ast
+
+# Read the Excel report
+input_file = '视频分析报告1.xlsx'
+df = pd.read_excel(input_file)
+
+# Column index 8 (the 9th column) holds the stringified "钩子提取" list
+def extract_fields(hook_col):
+    times, queries, hooks = [], [], []
+    try:
+        # The cell stores a Python-literal list, so parse it with ast.literal_eval
+        items = ast.literal_eval(hook_col) if isinstance(hook_col, str) else []
+        for item in items:
+            if isinstance(item, dict):
+                times.append(item.get('需求钩子出现时间', ''))
+                queries.append(item.get('需求详细query', ''))
+                hooks.append(item.get('需求钩子话术', ''))
+    except Exception:
+        pass
+    return '\n'.join(times), '\n'.join(queries), '\n'.join(hooks)
+
+# Build three new columns
+new_times, new_queries, new_hooks = [], [], []
+for val in df.iloc[:, 8]:
+    t, q, h = extract_fields(val)
+    new_times.append(t)
+    new_queries.append(q)
+    new_hooks.append(h)
+
+df.insert(11, 'time', new_times)
+df.insert(12, 'query', new_queries)
+df.insert(13, 'hook', new_hooks)
+
+# Save as a new file
+output_file = '视频分析报告1_拆分钩子.xlsx'
+df.to_excel(output_file, index=False)
+print(f'Saved to {output_file}')
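
The "钩子提取" cells are written by process_video_data() as str() of a Python list, which is why ast.literal_eval (not json.loads) is used above. A hedged sample cell, with made-up values:

    cell = ("[{'需求钩子出现时间': '00:00:45', "
            "'需求详细query': '这种野菜真的能降血压吗?', "
            "'需求钩子话术': '野菜功效揭秘'}]")
    print(extract_fields(cell))
    # -> ('00:00:45', '这种野菜真的能降血压吗?', '野菜功效揭秘')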

+ 76 - 0
process_trigger.py

@@ -0,0 +1,76 @@
+import pandas as pd
+import orjson
+import os
+
+def process_trigger_results(json_data):
+    """
+    Process the trigger results from the JSON data and write to Excel.
+    
+    Args:
+        json_data (str): JSON string containing the trigger results
+    """
+    try:
+        # Parse the JSON data
+        data = orjson.loads(json_data)
+        
+        # Extract the output array from the data
+        if isinstance(data, dict) and 'data' in data:
+            output_data = orjson.loads(data['data'])
+            if isinstance(output_data, dict) and 'output' in output_data:
+                output_array = orjson.loads(output_data['output'])
+            else:
+                raise ValueError("Invalid data structure: 'output' not found in data")
+        else:
+            raise ValueError("Invalid data structure: 'data' not found in root")
+        
+        # Create a list to store the extracted fields
+        results = []
+        
+        # Extract required fields from each item
+        for item in output_array:
+            result = {
+                '需求钩子出现时间': item.get('需求钩子出现时间', ''),
+                '需求详细query': item.get('需求详细query', ''),
+                '需求钩子话术': item.get('需求钩子话术', '')
+            }
+            results.append(result)
+        
+        # Create DataFrame
+        df = pd.DataFrame(results)
+        
+        # Write to Excel
+        output_file = 'trigger_result.xlsx'
+        df.to_excel(output_file, index=False)
+        
+        print(f"Successfully wrote {len(results)} records to {output_file}")
+        
+    except Exception as e:
+        print(f"Error processing trigger results: {str(e)}")
+        raise
+
+def process_file(input_file):
+    """
+    Process a file containing JSON data and write results to Excel.
+    
+    Args:
+        input_file (str): Path to the input file containing JSON data
+    """
+    try:
+        # Read the input file
+        with open(input_file, 'r', encoding='utf-8') as f:
+            json_data = f.read()
+        
+        # Process the data
+        process_trigger_results(json_data)
+        
+    except Exception as e:
+        print(f"Error processing file {input_file}: {str(e)}")
+        raise
+
+if __name__ == '__main__':
+    # Example usage
+    input_file = 'trigger_response.json'  # Replace with your input file path
+    if os.path.exists(input_file):
+        process_file(input_file)
+    else:
+        print(f"Input file {input_file} not found") 
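
The triple orjson.loads() above implies trigger_response.json carries JSON nested as strings at two levels. A hedged illustration of that shape (the values are made up; the committed file's diff is suppressed below):

    {"code": 0, "data": "{\"output\": \"[{\\\"需求钩子出现时间\\\": \\\"00:01:10\\\", \\\"需求详细query\\\": \\\"...\\\", \\\"需求钩子话术\\\": \\\"...\\\"}]\"}"}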

BIN
prompt/__pycache__/prompt.cpython-313.pyc


+ 163 - 0
prompt/prompt.py

@@ -0,0 +1,163 @@
+# Video topic and key-point analysis
+
+VIDEO_TOPIC_ANALYSIS_PROMPT = '''# 任务说明:
+你是一位短视频结构化分析专家。请严格按以下要求处理输入视频:
+
+# 分析规范:
+1. 使用标准JSON格式输出,遵循以下规则:
+   - 所有键名使用英文双引号包裹
+   - 字符串值使用中文双引号""
+   - 禁止换行符、Markdown符号
+   - 数值类目用字符串表示
+
+2. 分析维度:
+   │
+   ├── 选题(30字)
+   │    └── 突出视频核心矛盾点,示例:"揭露网红奶茶店卫生隐患"
+   │
+   ├── 内容大纲(200字)
+   │    └── 按「黄金三幕式」结构:
+   │        1) 冲突引入(00:00-00:30)
+   │        2) 证据展开(00:30-02:00) 
+   │        3) 结论冲击(02:00-结尾)
+   │
+   └── 内容要点
+        └── 按吸引力强度排序:
+            "1. 现场实拍过期原料特写镜头(00:01:45)
+             2. 员工偷拍工作流程(00:03:20)
+             3. 专家访谈数据对比(00:04:10)"
+
+# 输出示例:
+{
+  "选题": "揭秘网红零食代工黑幕",
+  "内容大纲": "视频首先展示代工厂合规车间环境(00:00:15-00:01:30),随后突袭检查发现原料过期问题(00:01:45-00:03:20),最后通过员工采访揭露生产日期篡改流程(00:04:10-00:05:50)",
+  "内容要点": [
+    "1. 车间环境与原料仓库的视觉反差",
+    "2. 特写镜头展示虫蛀原料袋(00:02:15)",
+    "3. 隐蔽摄像头拍摄的灌装过程(00:04:30)"
+  ]
+}
+
+请现在开始分析:'''
+
+VIDEO_TEXT_EXTRACTION_PROMPT = '''# 结构化文本提取指令
+你是一位专业视频文本转录员,请严格按以下要求处理:
+
+## 输入规范:
+视频文件:需分析完整音画内容
+
+## 处理规则:
+1. **完整性要求**:
+   - 提取所有中文文本(含口播/字幕/图形文字/背景对话)
+   - 保留重复内容(如广告slogan重复出现)
+   - 时间戳精确到毫秒(例:00:01:23.456)
+
+2. **格式规范**:
+   ├── 禁用Markdown符号
+   ├── 键名用英文双引号包裹
+   ├── 字符串值用中文双引号""
+   └── 时间区间格式:HH:MM:SS.msmsms
+
+## 输出示例:
+{
+  "视频文本内容": [
+    {
+      "文本序号": "文本1",
+      "时间区间": "00:00:01.230 - 00:00:03.780",
+      "文本内容": "【开场字幕】揭秘网红产品背后的真相"
+    },
+    {
+      "文本序号": "文本2",
+      "时间区间": "00:00:04.500 - 00:00:06.200",
+      "文本内容": "旁白:我们随机购买了市面销量Top3的产品"
+    }
+  ]
+}
+
+## 特殊处理:
+- 背景模糊文字标注[模糊]
+- 重叠文本用「」分隔
+- 非中文内容标注[非中文字幕]
+
+立即开始分析:'''
+
+VIDEO_SEGMENT_ANALYSIS_PROMPT = '''# 任务说明:你是一位短视频分析专家。请根据输入视频内容,从整体结构出发,分析视频的分段逻辑与关键时间点。
+
+# 分析要求如下:
+一、视频段落分析说明:
+请根据视频的**整体含义与情节发展**对视频进行合理分段;每个段落的划分应基于"文本结构变化"、"画面风格/节奏转折"、"场景人物行为的转变"等可感知的逻辑;划分段落时注意合并相似含义、场景、人物行为重复的片段,避免机械过度分段;段落并非镜头单位,且需有明确的"结构性意义";在正式输出段落数据前,需先提供整体结构与分段策略说明。
+
+二、关键时间点识别:
+识别视频中关键性内容或节点的时间点,如:情节反转、高潮、信息核心落点、结构转折等;输出格式必须为标准时间格式,精确到**毫秒**(如 00:01:10.234)。
+
+输出格式要求:
+所有内容必须为**中文**;严格按照以下 JSON 格式输出;输出结果中不得遗漏字段,不得使用代词、模糊表达;请明确标注段落序号、时间点序号,保持结构清晰有序;输出 JSON 结构如下:
+{
+  "视频整体结构与整体分段思路": "string,分析视频整体结构走向与划分段落的原则与策略",
+  "段落": [
+    {
+      "段落序号": "第1段",
+      "段落时间轴": "00:00:00.000 - 00:00:30.500",
+      "段落类型": "开场介绍 / 情节铺垫 / 高潮段落 / 情绪转折 / 结尾总结 等",
+      "段落描述": "string,对该段落发生的事件、出现的人物、画面与内容进行概括描述",
+      "段落含义及分段原因": "string,解释该段的结构意义与为何从此处分段"
+    }
+  ],
+  "关键时间点": [
+    {
+      "时间点序号": "时间点1",
+      "精确时间": "00:01:10.234",
+      "时间点描述": "string,描述该时间点对应的事件或结构意义"
+    }
+  ]
+}'''
+
+HOOK_EXTRACTION_PROMPT = '''# 任务角色说明:
+你是一个面向老年人用户的视频平台智能AI助手,当前任务是基于视频内容,模拟中老年人在观看视频过程中的真实观看心理和信息需求,生成具体、真实、有好奇心驱动的互动问题,并配置相应的钩子话术和出现时间。
+
+# 输入数据结构:
+视频选题与要点理解对应内容:{summary}
+视频完整文本提取对应内容:{detail}
+视频分段与时间点分析:{timeline}
+
+# 任务目标:
+你需要基于以上视频分析信息,从中老年用户的观看视角出发,构思他们可能在观看该视频过程中**产生的具体信息需求问题**,并对每个问题进行详细说明。输出应包括以下维度:
+1. 需求详细query:清晰、具体、真实的问题,不泛泛而谈,必须与视频选题和视频内容关键点强相关;
+2. 需求分类:如 旅游类 / 健康类 / 情感类 / 科普类 / 家庭类 / 技巧类 / 新闻类 / 生活常识类等;
+3. 推测出该点需求的原因:从用户的情感、场景、注意力轨迹出发,说明为什么该问题会引起兴趣;
+4. 需求钩子话术:在用户观看视频过程中弹出的引导文案,**11字内**,需具有吸引力和点击驱动性;
+5. 需求钩子出现时间:该钩子弹出时的时间点,需**精确到秒**,格式为:hh:mm:ss
+6. 需求排序序号:请根据"用户兴趣强度"与"视频关键程度"对问题进行排序,按兴趣优先级编号(非时间顺序)
+
+输出格式要求:
+- 所有字段都必须有值;
+- 输出内容必须为**中文**;
+- 时间格式为严格标准格式:hh:mm:ss;
+- 输出结果必须为标准 JSON;
+- 输出字段中的内容必须使用**双引号**,不能使用单引号;
+- 严禁输出与任务无关的内容(如提示语、注释、范例等);'''
+
+HOOK_EXTRACTION_PROMPT_V2 = '''# 任务角色说明:
+你是一个面向老年人用户的视频平台智能AI助手,当前任务是基于视频内容,模拟中老年人在观看视频过程中的真实观看心理和信息需求,生成具体、真实、有好奇心驱动的互动问题,并配置相应的钩子话术和出现时间。
+
+# 输入数据结构:
+视频选题与要点理解对应内容:{summary}
+视频完整文本提取对应内容:{detail}
+视频分段与时间点分析:{timeline}
+
+# 任务目标:
+你需要基于以上视频分析信息,从中老年用户的观看视角出发,构思他们可能在观看该视频过程中**产生的具体信息需求问题**,并对每个问题进行详细说明。输出应包括以下维度:
+1. 需求详细query:清晰、具体、真实的问题,不泛泛而谈,必须与视频选题强相关;
+2. 需求分类:如 旅游类 / 健康类 / 情感类 / 科普类 / 家庭类 / 技巧类 / 新闻类 / 生活常识类等;
+3. 推测出该点需求的原因:从用户的情感、场景、注意力轨迹出发,说明为什么该问题会引起兴趣;
+4. 需求钩子话术:在用户观看视频过程中弹出的引导文案,**11字内**,需具有吸引力和点击驱动性;
+5. 需求钩子出现时间:该钩子弹出时的时间点,需**精确到秒**,格式为:hh:mm:ss
+6. 需求排序序号:请根据"用户兴趣强度"与"视频关键程度"对问题进行排序,按兴趣优先级编号(非时间顺序)
+
+输出格式要求:
+- 所有字段都必须有值;
+- 输出内容必须为**中文**;
+- 时间格式为严格标准格式:hh:mm:ss;
+- 输出结果必须为标准 JSON;
+- 输出字段中的内容必须使用**双引号**,不能使用单引号;
+- 严禁输出与任务无关的内容(如提示语、注释、范例等);'''
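
A sketch of how analyze_prompt.py fills the hook template. str.format() works here only because these two hook prompts contain no literal braces besides the three placeholders; a prompt embedding a JSON example (like the three prompts above) would need {{...}} escaping or str.replace() instead. The values shown are illustrative:

    filled = HOOK_EXTRACTION_PROMPT.format(
        summary="{'选题': '揭秘网红零食代工黑幕', ...}",
        detail="{'视频文本内容': [...]}",
        timeline="{'段落': [...]}",
    )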

+ 5 - 0
requirements.txt

@@ -0,0 +1,5 @@
+google-generativeai==0.8.3
+requests==2.32.3
+orjson==3.10.13
+pandas==2.2.1
+openpyxl==3.1.2

File diff suppressed because it is too large
+ 3 - 0
trigger_response.json


BIN
trigger_result.xlsx


BIN
video_cache/2fa823dd-3b9c-423c-baa1-a9f60b760c89.mp4


BIN
video_cache/58326116-c07c-4024-bec3-82a785447c7c.mp4


BIN
video_cache/d6262118-9507-40d4-be24-2b2274867955.mp4


BIN
~$视频分析报告1_拆分钩子.xlsx


BIN
视频分析报告.xlsx


BIN
视频分析报告1_拆分钩子.xlsx


Some files were not shown because too many files changed in this diff