# -*- coding: utf-8 -*- import socket import os import time import uuid import threading import pandas as pd import requests from requests.adapters import HTTPAdapter import google.generativeai as genai import orjson from google.generativeai.types import HarmBlockThreshold, HarmCategory from pandas import ExcelWriter from prompt.prompt import ( VIDEO_TOPIC_ANALYSIS_PROMPT, VIDEO_TEXT_EXTRACTION_PROMPT, VIDEO_SEGMENT_ANALYSIS_PROMPT, HOOK_EXTRACTION_PROMPT ) # =================== 环境配置 =================== os.environ.update({ "GENAI_UPLOAD_CHUNK_SIZE": "5242880", "GENAI_UPLOAD_TIMEOUT": "300", "HTTP_PROXY": "http://127.0.0.1:7890", "HTTPS_PROXY": "http://127.0.0.1:7890" }) # =================== 网络配置 =================== _original_getaddrinfo = socket.getaddrinfo def _new_getaddrinfo(*args, **kwargs): return [res for res in _original_getaddrinfo(*args, **kwargs) if res[0] == socket.AF_INET] socket.getaddrinfo = _new_getaddrinfo # =================== 常量配置 =================== CACHE_DIR = './video_cache/' API_KEYS = ["AIzaSyBGPYEc9F3FoDEqwlaVBxUHsNdkxmR_sl0"] RESULT_EXCEL = '视频分析报告.xlsx' PROXY_CONFIG = {"http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890"} # =================== 初始化配置 =================== os.makedirs(CACHE_DIR, exist_ok=True) # =================== 核心类定义 =================== class GoogleVideoAnalyzer: def __init__(self): self.current_api_key = API_KEYS[0] self._stop_event = threading.Event() self.session = self._create_proxied_session() genai.configure(api_key=self.current_api_key, transport='rest') def _create_proxied_session(self): """创建带代理配置的会话""" session = requests.Session() session.proxies = PROXY_CONFIG session.verify = False adapter = HTTPAdapter(max_retries=3, pool_connections=30, pool_maxsize=10) session.mount('https://', adapter) session.mount('http/', adapter) # 增强超时处理 original_send = session.send def new_send(request, **kwargs): kwargs.setdefault('timeout', (10, 30)) return original_send(request, **kwargs) session.send = new_send return session def _validate_video_file(self, path: str): """视频文件验证增强""" if not os.path.exists(path): raise FileNotFoundError(f"视频文件不存在: {path}") if os.path.getsize(path) == 0: raise ValueError("空文件无法上传") if not path.lower().endswith('.mp4'): raise ValueError("仅支持MP4格式文件") if os.path.getsize(path) > 100 * 1024 * 1024: # 100MB限制 raise ValueError("视频文件超过100MB限制") def _safe_upload(self, video_path: str): """安全上传实现(增强重试机制)""" self._validate_video_file(video_path) video = None retry_count = 0 max_retries = 3 while retry_count < max_retries: try: print(f'[上传] 开始上传 | 文件大小: {os.path.getsize(video_path)//1024}KB') video = genai.upload_file(path=video_path, mime_type='video/mp4') while True: current_state = video.state.name print(f"[状态] {current_state} | 进度: {getattr(video, 'progress', 0)}%") if current_state == 'ACTIVE': return video elif current_state == 'FAILED': raise Exception("云端处理失败") elif self._stop_event.is_set(): raise KeyboardInterrupt("用户中断上传") time.sleep(10) video = genai.get_file(name=video.name) except Exception as e: retry_count += 1 if video: genai.delete_file(name=video.name) if retry_count >= max_retries: raise Exception(f"上传失败(已重试{max_retries}次): {str(e)}") print(f"[重试] 上传失败,第{retry_count}次重试...") time.sleep(5) def _download_video(self, video_url: str) -> str: """增强版视频下载(强制完整性校验+断点续传)""" file_path = os.path.join(CACHE_DIR, f'{uuid.uuid4()}.mp4') temp_file = None retry_count = 0 max_retries = 3 downloaded = 0 while retry_count < max_retries: try: # 获取文件总大小(带重试) with self.session.head(video_url, timeout=10) as head_resp: head_resp.raise_for_status() total_size = int(head_resp.headers.get('content-length', 0)) if total_size == 0: raise ValueError("服务器未返回有效文件大小") # 支持断点续传 if os.path.exists(file_path): downloaded = os.path.getsize(file_path) headers = {'Range': f'bytes={downloaded}-'} else: headers = {} with self.session.get( video_url, stream=True, timeout=30, headers=headers ) as response: response.raise_for_status() # 验证范围请求响应 if downloaded > 0 and response.status_code != 206: raise ConnectionError("服务器不支持断点续传") mode = 'ab' if downloaded > 0 else 'wb' temp_file = open(file_path, mode) for chunk in response.iter_content(chunk_size=8192): if self._stop_event.is_set(): raise KeyboardInterrupt("用户中断下载") if chunk: temp_file.write(chunk) downloaded += len(chunk) progress = downloaded/total_size*100 print(f"\r[下载] 进度: {progress:.1f}% | {downloaded//1024}KB/{total_size//1024}KB", end='', flush=True) # 强制完整性校验 if downloaded != total_size: raise IOError(f"下载不完整({downloaded}/{total_size}字节)") return file_path except Exception as e: retry_count += 1 if retry_count >= max_retries: if os.path.exists(file_path): os.remove(file_path) raise Exception(f"下载失败(重试{max_retries}次): {str(e)}") print(f"\n[重试] 下载中断,第{retry_count}次重试...") time.sleep(5) finally: if temp_file and not temp_file.closed: temp_file.close() print("\n[下载] 文件句柄已安全关闭") def _analyze_content(self, video, prompt): """增强版内容分析""" model = genai.GenerativeModel( model_name='gemini-2.0-flash', generation_config=genai.GenerationConfig( response_mime_type='application/json', temperature=0.3, max_output_tokens=20480 ), safety_settings={ HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE, HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE, } ) try: response = model.generate_content( contents=[video, prompt], request_options={'timeout': 300} ) if hasattr(response, '_error') and response._error: raise Exception(f"生成错误: {response._error}") result = orjson.loads(response.text.strip()) print(f"[视频分析] 响应: {result}") if not isinstance(result, dict): raise ValueError("响应格式错误:非字典结构") return result except orjson.JSONDecodeError: raise Exception("响应解析失败,非JSON格式") except Exception as e: raise Exception(f"分析失败: {str(e)}") def _generate_hooks(self, video, hook_prompt, analysis_data): """钩子内容生成专用方法(完整修复版)""" try: # 1. 准备格式化参数(包含空值保护和类型转换) format_args = { "summary": str(analysis_data.get("视频选题与要点理解", {}) or "无相关内容"), "detail": str(analysis_data.get("视频完整文本提取", {}) or "无相关内容"), "timeline": str(analysis_data.get("视频分段与时间点分析", {}) or "无相关内容") } # 2. 打印调试信息 print(f"[DEBUG] 分析数据类型验证:") print(f"- 选题理解类型:{type(analysis_data.get('视频选题与要点理解'))}") print(f"- 文本提取类型:{type(analysis_data.get('视频完整文本提取'))}") print(f"- 分段分析类型:{type(analysis_data.get('视频分段与时间点分析'))}") # 3. 执行模板替换(关键修复点) formatted_prompt = hook_prompt.format(**format_args) print(f"[SUCCESS] 模板替换完成,新Prompt长度:{len(formatted_prompt)}") print(f"[DEBUG] 格式化Prompt预览(前500字符):\n{formatted_prompt[:500]}...") # 4. 模型调用(修复参数传递) model = genai.GenerativeModel( model_name='gemini-2.0-flash', generation_config=genai.GenerationConfig( response_mime_type='application/json', temperature=0.5, max_output_tokens=4096 ) ) # 5. 发送格式化后的prompt(关键修复点) response = model.generate_content( contents=[video, formatted_prompt], # 使用格式化后的内容 request_options={'timeout': 600} ) print(f"[响应原始数据] 长度:{len(response.text)}字符") # 6. 响应预处理(解决单引号问题) clean_text = response.text.replace("'", "\"") # 替换单引号 clean_text = clean_text.replace("\n", "") # 去除换行符 print(f"[响应清洗后] 预览:{clean_text[:200]}...") # 7. 严格JSON验证 try: result = orjson.loads(clean_text) if not isinstance(result, list): raise ValueError("响应应为JSON数组") # 字段完整性验证 required_fields = { "需求排序序号", "需求详细query", "需求分类", "推测出该点需求的原因", "需求钩子话术", "需求钩子出现时间" } for idx, item in enumerate(result): missing = required_fields - set(item.keys()) if missing: raise ValueError(f"第{idx+1}个对象缺失字段:{missing}") if len(item["需求钩子话术"]) > 11: raise ValueError(f"第{idx+1}个话术超长:'{item['需求钩子话术']}'") return result except orjson.JSONDecodeError as e: error_msg = f"JSON解析失败:{str(e)}\n原始响应:{clean_text[:500]}" raise ValueError(error_msg) except KeyError as e: print(f"!! 关键错误:模板变量 {e} 未定义,请检查Excel占位符") return {"error": f"模板变量 {e} 缺失"} except ValueError as e: print(f"!! 数据验证失败:{str(e)}") return {"error": str(e), "type": "DATA_VALIDATION"} except Exception as e: import traceback error_detail = f""" === 未捕获异常 === 类型:{type(e)} 信息:{str(e)} 追踪: {traceback.format_exc()} """ print(error_detail) return {"error": "未知异常"} def cancel_operation(self): """操作中止""" self._stop_event.set() print("[系统] 正在终止操作...") def analyze(self, video_url: str, prompts: list): """增强版分析流程""" self._stop_event.clear() video_path = None try: print(f"\n[下载] 开始下载 {video_url}") video_path = self._download_video(video_url) print("[上传] 启动云端处理") video = self._safe_upload(video_path) analysis_data = {} for prompt in prompts[:3]: print(f"[分析] 正在执行: {prompt['name']}") try: result = self._analyze_content(video, prompt['content']) analysis_data[prompt['name']] = result except Exception as e: analysis_data[prompt['name']] = { "error": str(e), "error_type": type(e).__name__ } hook_result = {} if len(prompts) >=4: hook_prompt = prompts[3] print(f"[钩子生成] 正在执行: {hook_prompt['name']}") try: hook_result = self._generate_hooks(video, hook_prompt['content'], analysis_data) print("钩子提取完成") except Exception as e: print(e) hook_result = { "error": str(e), "error_type": type(e).__name__ } return { "基础分析": analysis_data, "钩子提取": hook_result } finally: if video_path and os.path.exists(video_path): os.remove(video_path) # =================== 数据处理 =================== def load_prompts(): """从prompt.py加载Prompt""" try: print("\n[初始化] 从prompt.py加载Prompt") prompts = [ { "name": "视频选题与要点理解", "content": VIDEO_TOPIC_ANALYSIS_PROMPT }, { "name": "视频完整文本提取", "content": VIDEO_TEXT_EXTRACTION_PROMPT }, { "name": "视频分段与时间点分析", "content": VIDEO_SEGMENT_ANALYSIS_PROMPT }, { "name": "钩子提取", "content": HOOK_EXTRACTION_PROMPT } ] print(f"[成功] 加载 {len(prompts)} 个Prompt") return prompts except Exception as e: raise Exception(f"加载Prompt失败: {str(e)}") def process_video_data(): """增强版数据处理""" try: prompts = load_prompts() video_df = pd.read_excel('0517.xlsx', engine='openpyxl').iloc[18:] # 从第19个视频开始 analyzer = GoogleVideoAnalyzer() results = [] import signal signal.signal(signal.SIGINT, lambda s,f: analyzer.cancel_operation()) for idx, row in video_df.iterrows(): video_id = row['videoid'] video_url = f"http://visionularcdn.yishihui.com/{row['transcode_video_path'].replace('mp4/', 'mp4')}" record = { "视频ID": video_id, "播放量": row.get('播放次数', 'N/A'), "视频标题": row.get('视频标题', 'N/A'), "视频地址": video_url, "状态": "成功" } try: print(f"\n{'='*30} 处理视频 {idx+1}/{len(video_df)} {'='*30}") analysis = analyzer.analyze(video_url, prompts) for prompt in prompts[:3]: record[prompt['name']] = str(analysis["基础分析"].get(prompt['name'], {})) record["钩子提取"] = str(analysis.get("钩子提取", {})) except Exception as e: record.update({ "状态": "失败", "错误类型": type(e).__name__, "错误详情": str(e) }) finally: results.append(record) pd.DataFrame(results).to_excel(RESULT_EXCEL, index=False) with ExcelWriter(RESULT_EXCEL, engine='openpyxl') as writer: df_results = pd.DataFrame(results) df_results.to_excel(writer, index=False) worksheet = writer.sheets['Sheet1'] for col in worksheet.columns: max_len = max(len(str(cell.value)) for cell in col) worksheet.column_dimensions[col[0].column_letter].width = min(max_len + 2, 50) print(f"\n{'='*30}\n报告已生成: {os.path.abspath(RESULT_EXCEL)}") except Exception as e: print(f"\n{'!'*30} 系统级错误 {'!'*30}\n{str(e)}") # =================== 执行入口 =================== if __name__ == '__main__': print("=== 视频分析系统启动 ===") print(f"工作目录: {os.getcwd()}") try: test_resp = requests.get("https://www.google.com", proxies=PROXY_CONFIG, timeout=10, verify=False) print(f"[网络] 连接测试成功 ({test_resp.status_code})") except Exception as e: print(f"[网络] 连接测试失败: {str(e)}") exit(1) start_time = time.time() try: process_video_data() except KeyboardInterrupt: print("\n[中断] 用户主动终止程序") finally: print(f"总运行时间: {time.time()-start_time:.1f}秒")