123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467 |
- # -*- coding: utf-8 -*-
- import socket
- import os
- import time
- import uuid
- import threading
- import pandas as pd
- import requests
- from requests.adapters import HTTPAdapter
- import google.generativeai as genai
- import orjson
- from google.generativeai.types import HarmBlockThreshold, HarmCategory
- from pandas import ExcelWriter
- from prompt.prompt import (
- VIDEO_TOPIC_ANALYSIS_PROMPT,
- VIDEO_TEXT_EXTRACTION_PROMPT,
- VIDEO_SEGMENT_ANALYSIS_PROMPT,
- HOOK_EXTRACTION_PROMPT
- )
- # =================== 环境配置 ===================
- os.environ.update({
- "GENAI_UPLOAD_CHUNK_SIZE": "5242880",
- "GENAI_UPLOAD_TIMEOUT": "300",
- "HTTP_PROXY": "http://127.0.0.1:7890",
- "HTTPS_PROXY": "http://127.0.0.1:7890"
- })
- # =================== 网络配置 ===================
- _original_getaddrinfo = socket.getaddrinfo
- def _new_getaddrinfo(*args, **kwargs):
- return [res for res in _original_getaddrinfo(*args, **kwargs) if res[0] == socket.AF_INET]
- socket.getaddrinfo = _new_getaddrinfo
- # =================== 常量配置 ===================
- CACHE_DIR = './video_cache/'
- API_KEYS = ["AIzaSyBGPYEc9F3FoDEqwlaVBxUHsNdkxmR_sl0"]
- RESULT_EXCEL = '视频分析报告.xlsx'
- PROXY_CONFIG = {"http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890"}
- # =================== 初始化配置 ===================
- os.makedirs(CACHE_DIR, exist_ok=True)
- # =================== 核心类定义 ===================
- class GoogleVideoAnalyzer:
- def __init__(self):
- self.current_api_key = API_KEYS[0]
- self._stop_event = threading.Event()
- self.session = self._create_proxied_session()
- genai.configure(api_key=self.current_api_key, transport='rest')
- def _create_proxied_session(self):
- """创建带代理配置的会话"""
- session = requests.Session()
- session.proxies = PROXY_CONFIG
- session.verify = False
- adapter = HTTPAdapter(max_retries=3, pool_connections=30, pool_maxsize=10)
- session.mount('https://', adapter)
- session.mount('http/', adapter)
-
- # 增强超时处理
- original_send = session.send
- def new_send(request, **kwargs):
- kwargs.setdefault('timeout', (10, 30))
- return original_send(request, **kwargs)
- session.send = new_send
-
- return session
- def _validate_video_file(self, path: str):
- """视频文件验证增强"""
- if not os.path.exists(path):
- raise FileNotFoundError(f"视频文件不存在: {path}")
- if os.path.getsize(path) == 0:
- raise ValueError("空文件无法上传")
- if not path.lower().endswith('.mp4'):
- raise ValueError("仅支持MP4格式文件")
- if os.path.getsize(path) > 100 * 1024 * 1024: # 100MB限制
- raise ValueError("视频文件超过100MB限制")
- def _safe_upload(self, video_path: str):
- """安全上传实现(增强重试机制)"""
- self._validate_video_file(video_path)
- video = None
- retry_count = 0
- max_retries = 3
-
- while retry_count < max_retries:
- try:
- print(f'[上传] 开始上传 | 文件大小: {os.path.getsize(video_path)//1024}KB')
- video = genai.upload_file(path=video_path, mime_type='video/mp4')
-
- while True:
- current_state = video.state.name
- print(f"[状态] {current_state} | 进度: {getattr(video, 'progress', 0)}%")
- if current_state == 'ACTIVE':
- return video
- elif current_state == 'FAILED':
- raise Exception("云端处理失败")
- elif self._stop_event.is_set():
- raise KeyboardInterrupt("用户中断上传")
- time.sleep(10)
- video = genai.get_file(name=video.name)
-
- except Exception as e:
- retry_count += 1
- if video:
- genai.delete_file(name=video.name)
- if retry_count >= max_retries:
- raise Exception(f"上传失败(已重试{max_retries}次): {str(e)}")
- print(f"[重试] 上传失败,第{retry_count}次重试...")
- time.sleep(5)
- def _download_video(self, video_url: str) -> str:
- """增强版视频下载(强制完整性校验+断点续传)"""
- file_path = os.path.join(CACHE_DIR, f'{uuid.uuid4()}.mp4')
- temp_file = None
- retry_count = 0
- max_retries = 3
- downloaded = 0
-
- while retry_count < max_retries:
- try:
- # 获取文件总大小(带重试)
- with self.session.head(video_url, timeout=10) as head_resp:
- head_resp.raise_for_status()
- total_size = int(head_resp.headers.get('content-length', 0))
- if total_size == 0:
- raise ValueError("服务器未返回有效文件大小")
- # 支持断点续传
- if os.path.exists(file_path):
- downloaded = os.path.getsize(file_path)
- headers = {'Range': f'bytes={downloaded}-'}
- else:
- headers = {}
- with self.session.get(
- video_url,
- stream=True,
- timeout=30,
- headers=headers
- ) as response:
- response.raise_for_status()
-
- # 验证范围请求响应
- if downloaded > 0 and response.status_code != 206:
- raise ConnectionError("服务器不支持断点续传")
- mode = 'ab' if downloaded > 0 else 'wb'
- temp_file = open(file_path, mode)
-
- for chunk in response.iter_content(chunk_size=8192):
- if self._stop_event.is_set():
- raise KeyboardInterrupt("用户中断下载")
- if chunk:
- temp_file.write(chunk)
- downloaded += len(chunk)
- progress = downloaded/total_size*100
- print(f"\r[下载] 进度: {progress:.1f}% | {downloaded//1024}KB/{total_size//1024}KB",
- end='', flush=True)
- # 强制完整性校验
- if downloaded != total_size:
- raise IOError(f"下载不完整({downloaded}/{total_size}字节)")
-
- return file_path
- except Exception as e:
- retry_count += 1
- if retry_count >= max_retries:
- if os.path.exists(file_path):
- os.remove(file_path)
- raise Exception(f"下载失败(重试{max_retries}次): {str(e)}")
- print(f"\n[重试] 下载中断,第{retry_count}次重试...")
- time.sleep(5)
-
- finally:
- if temp_file and not temp_file.closed:
- temp_file.close()
- print("\n[下载] 文件句柄已安全关闭")
- def _analyze_content(self, video, prompt):
- """增强版内容分析"""
- model = genai.GenerativeModel(
- model_name='gemini-2.0-flash',
- generation_config=genai.GenerationConfig(
- response_mime_type='application/json',
- temperature=0.3,
- max_output_tokens=20480
- ),
- safety_settings={
- HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
- HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
- }
- )
-
- try:
- response = model.generate_content(
- contents=[video, prompt],
- request_options={'timeout': 300}
- )
-
- if hasattr(response, '_error') and response._error:
- raise Exception(f"生成错误: {response._error}")
-
- result = orjson.loads(response.text.strip())
- print(f"[视频分析] 响应: {result}")
- if not isinstance(result, dict):
- raise ValueError("响应格式错误:非字典结构")
-
- return result
- except orjson.JSONDecodeError:
- raise Exception("响应解析失败,非JSON格式")
- except Exception as e:
- raise Exception(f"分析失败: {str(e)}")
- def _generate_hooks(self, video, hook_prompt, analysis_data):
- """钩子内容生成专用方法(完整修复版)"""
- try:
- # 1. 准备格式化参数(包含空值保护和类型转换)
- format_args = {
- "summary": str(analysis_data.get("视频选题与要点理解", {}) or "无相关内容"),
- "detail": str(analysis_data.get("视频完整文本提取", {}) or "无相关内容"),
- "timeline": str(analysis_data.get("视频分段与时间点分析", {}) or "无相关内容")
- }
- # 2. 打印调试信息
- print(f"[DEBUG] 分析数据类型验证:")
- print(f"- 选题理解类型:{type(analysis_data.get('视频选题与要点理解'))}")
- print(f"- 文本提取类型:{type(analysis_data.get('视频完整文本提取'))}")
- print(f"- 分段分析类型:{type(analysis_data.get('视频分段与时间点分析'))}")
- # 3. 执行模板替换(关键修复点)
- formatted_prompt = hook_prompt.format(**format_args)
- print(f"[SUCCESS] 模板替换完成,新Prompt长度:{len(formatted_prompt)}")
- print(f"[DEBUG] 格式化Prompt预览(前500字符):\n{formatted_prompt[:500]}...")
- # 4. 模型调用(修复参数传递)
- model = genai.GenerativeModel(
- model_name='gemini-2.0-flash',
- generation_config=genai.GenerationConfig(
- response_mime_type='application/json',
- temperature=0.5,
- max_output_tokens=4096
- )
- )
-
- # 5. 发送格式化后的prompt(关键修复点)
- response = model.generate_content(
- contents=[video, formatted_prompt], # 使用格式化后的内容
- request_options={'timeout': 600}
- )
- print(f"[响应原始数据] 长度:{len(response.text)}字符")
- # 6. 响应预处理(解决单引号问题)
- clean_text = response.text.replace("'", "\"") # 替换单引号
- clean_text = clean_text.replace("\n", "") # 去除换行符
- print(f"[响应清洗后] 预览:{clean_text[:200]}...")
- # 7. 严格JSON验证
- try:
- result = orjson.loads(clean_text)
- if not isinstance(result, list):
- raise ValueError("响应应为JSON数组")
-
- # 字段完整性验证
- required_fields = {
- "需求排序序号", "需求详细query", "需求分类",
- "推测出该点需求的原因", "需求钩子话术", "需求钩子出现时间"
- }
-
- for idx, item in enumerate(result):
- missing = required_fields - set(item.keys())
- if missing:
- raise ValueError(f"第{idx+1}个对象缺失字段:{missing}")
- if len(item["需求钩子话术"]) > 11:
- raise ValueError(f"第{idx+1}个话术超长:'{item['需求钩子话术']}'")
-
- return result
-
- except orjson.JSONDecodeError as e:
- error_msg = f"JSON解析失败:{str(e)}\n原始响应:{clean_text[:500]}"
- raise ValueError(error_msg)
- except KeyError as e:
- print(f"!! 关键错误:模板变量 {e} 未定义,请检查Excel占位符")
- return {"error": f"模板变量 {e} 缺失"}
- except ValueError as e:
- print(f"!! 数据验证失败:{str(e)}")
- return {"error": str(e), "type": "DATA_VALIDATION"}
- except Exception as e:
- import traceback
- error_detail = f"""
- === 未捕获异常 ===
- 类型:{type(e)}
- 信息:{str(e)}
- 追踪:
- {traceback.format_exc()}
- """
- print(error_detail)
- return {"error": "未知异常"}
- def cancel_operation(self):
- """操作中止"""
- self._stop_event.set()
- print("[系统] 正在终止操作...")
- def analyze(self, video_url: str, prompts: list):
- """增强版分析流程"""
- self._stop_event.clear()
- video_path = None
-
- try:
- print(f"\n[下载] 开始下载 {video_url}")
- video_path = self._download_video(video_url)
-
- print("[上传] 启动云端处理")
- video = self._safe_upload(video_path)
-
- analysis_data = {}
- for prompt in prompts[:3]:
- print(f"[分析] 正在执行: {prompt['name']}")
- try:
- result = self._analyze_content(video, prompt['content'])
- analysis_data[prompt['name']] = result
- except Exception as e:
- analysis_data[prompt['name']] = {
- "error": str(e),
- "error_type": type(e).__name__
- }
-
- hook_result = {}
- if len(prompts) >=4:
- hook_prompt = prompts[3]
- print(f"[钩子生成] 正在执行: {hook_prompt['name']}")
- try:
- hook_result = self._generate_hooks(video, hook_prompt['content'], analysis_data)
- print("钩子提取完成")
- except Exception as e:
- print(e)
- hook_result = {
- "error": str(e),
- "error_type": type(e).__name__
- }
-
- return {
- "基础分析": analysis_data,
- "钩子提取": hook_result
- }
-
- finally:
- if video_path and os.path.exists(video_path):
- os.remove(video_path)
- # =================== 数据处理 ===================
- def load_prompts():
- """从prompt.py加载Prompt"""
- try:
- print("\n[初始化] 从prompt.py加载Prompt")
-
- prompts = [
- {
- "name": "视频选题与要点理解",
- "content": VIDEO_TOPIC_ANALYSIS_PROMPT
- },
- {
- "name": "视频完整文本提取",
- "content": VIDEO_TEXT_EXTRACTION_PROMPT
- },
- {
- "name": "视频分段与时间点分析",
- "content": VIDEO_SEGMENT_ANALYSIS_PROMPT
- },
- {
- "name": "钩子提取",
- "content": HOOK_EXTRACTION_PROMPT
- }
- ]
-
- print(f"[成功] 加载 {len(prompts)} 个Prompt")
- return prompts
-
- except Exception as e:
- raise Exception(f"加载Prompt失败: {str(e)}")
- def process_video_data():
- """增强版数据处理"""
- try:
- prompts = load_prompts()
- video_df = pd.read_excel('0517.xlsx', engine='openpyxl').iloc[18:] # 从第19个视频开始
- analyzer = GoogleVideoAnalyzer()
- results = []
-
- import signal
- signal.signal(signal.SIGINT, lambda s,f: analyzer.cancel_operation())
- for idx, row in video_df.iterrows():
- video_id = row['videoid']
- video_url = f"http://visionularcdn.yishihui.com/{row['transcode_video_path'].replace('mp4/', 'mp4')}"
-
- record = {
- "视频ID": video_id,
- "播放量": row.get('播放次数', 'N/A'),
- "视频标题": row.get('视频标题', 'N/A'),
- "视频地址": video_url,
- "状态": "成功"
- }
-
- try:
- print(f"\n{'='*30} 处理视频 {idx+1}/{len(video_df)} {'='*30}")
- analysis = analyzer.analyze(video_url, prompts)
-
- for prompt in prompts[:3]:
- record[prompt['name']] = str(analysis["基础分析"].get(prompt['name'], {}))
-
- record["钩子提取"] = str(analysis.get("钩子提取", {}))
-
- except Exception as e:
- record.update({
- "状态": "失败",
- "错误类型": type(e).__name__,
- "错误详情": str(e)
- })
-
- finally:
- results.append(record)
- pd.DataFrame(results).to_excel(RESULT_EXCEL, index=False)
- with ExcelWriter(RESULT_EXCEL, engine='openpyxl') as writer:
- df_results = pd.DataFrame(results)
- df_results.to_excel(writer, index=False)
-
- worksheet = writer.sheets['Sheet1']
- for col in worksheet.columns:
- max_len = max(len(str(cell.value)) for cell in col)
- worksheet.column_dimensions[col[0].column_letter].width = min(max_len + 2, 50)
-
- print(f"\n{'='*30}\n报告已生成: {os.path.abspath(RESULT_EXCEL)}")
- except Exception as e:
- print(f"\n{'!'*30} 系统级错误 {'!'*30}\n{str(e)}")
- # =================== 执行入口 ===================
- if __name__ == '__main__':
- print("=== 视频分析系统启动 ===")
- print(f"工作目录: {os.getcwd()}")
-
- try:
- test_resp = requests.get("https://www.google.com",
- proxies=PROXY_CONFIG,
- timeout=10,
- verify=False)
- print(f"[网络] 连接测试成功 ({test_resp.status_code})")
- except Exception as e:
- print(f"[网络] 连接测试失败: {str(e)}")
- exit(1)
-
- start_time = time.time()
- try:
- process_video_data()
- except KeyboardInterrupt:
- print("\n[中断] 用户主动终止程序")
- finally:
- print(f"总运行时间: {time.time()-start_time:.1f}秒")
|