analyze_prompt.py 18 KB


  1. # -*- coding: utf-8 -*-
  2. import socket
  3. import os
  4. import time
  5. import uuid
  6. import threading
  7. import pandas as pd
  8. import requests
  9. from requests.adapters import HTTPAdapter
  10. import google.generativeai as genai
  11. import orjson
  12. from google.generativeai.types import HarmBlockThreshold, HarmCategory
  13. from pandas import ExcelWriter
  14. from prompt.prompt import (
  15. VIDEO_TOPIC_ANALYSIS_PROMPT,
  16. VIDEO_TEXT_EXTRACTION_PROMPT,
  17. VIDEO_SEGMENT_ANALYSIS_PROMPT,
  18. HOOK_EXTRACTION_PROMPT
  19. )
  20. # =================== 环境配置 ===================
  21. os.environ.update({
  22. "GENAI_UPLOAD_CHUNK_SIZE": "5242880",
  23. "GENAI_UPLOAD_TIMEOUT": "300",
  24. "HTTP_PROXY": "http://127.0.0.1:7890",
  25. "HTTPS_PROXY": "http://127.0.0.1:7890"
  26. })
  27. # =================== 网络配置 ===================
  28. _original_getaddrinfo = socket.getaddrinfo
  29. def _new_getaddrinfo(*args, **kwargs):
  30. return [res for res in _original_getaddrinfo(*args, **kwargs) if res[0] == socket.AF_INET]
  31. socket.getaddrinfo = _new_getaddrinfo
  32. # =================== 常量配置 ===================
  33. CACHE_DIR = './video_cache/'
  34. API_KEYS = ["AIzaSyBGPYEc9F3FoDEqwlaVBxUHsNdkxmR_sl0"]
  35. RESULT_EXCEL = '视频分析报告.xlsx'
  36. PROXY_CONFIG = {"http": "http://127.0.0.1:7890", "https": "http://127.0.0.1:7890"}
  37. # =================== 初始化配置 ===================
  38. os.makedirs(CACHE_DIR, exist_ok=True)
  39. # =================== 核心类定义 ===================
  40. class GoogleVideoAnalyzer:
  41. def __init__(self):
  42. self.current_api_key = API_KEYS[0]
  43. self._stop_event = threading.Event()
  44. self.session = self._create_proxied_session()
  45. genai.configure(api_key=self.current_api_key, transport='rest')
  46. def _create_proxied_session(self):
  47. """创建带代理配置的会话"""
  48. session = requests.Session()
  49. session.proxies = PROXY_CONFIG
  50. session.verify = False
  51. adapter = HTTPAdapter(max_retries=3, pool_connections=30, pool_maxsize=10)
  52. session.mount('https://', adapter)
  53. session.mount('http/', adapter)
  54. # 增强超时处理
  55. original_send = session.send
  56. def new_send(request, **kwargs):
  57. kwargs.setdefault('timeout', (10, 30))
  58. return original_send(request, **kwargs)
  59. session.send = new_send
  60. return session
  61. def _validate_video_file(self, path: str):
  62. """视频文件验证增强"""
  63. if not os.path.exists(path):
  64. raise FileNotFoundError(f"视频文件不存在: {path}")
  65. if os.path.getsize(path) == 0:
  66. raise ValueError("空文件无法上传")
  67. if not path.lower().endswith('.mp4'):
  68. raise ValueError("仅支持MP4格式文件")
  69. if os.path.getsize(path) > 100 * 1024 * 1024: # 100MB限制
  70. raise ValueError("视频文件超过100MB限制")
  71. def _safe_upload(self, video_path: str):
  72. """安全上传实现(增强重试机制)"""
  73. self._validate_video_file(video_path)
  74. video = None
  75. retry_count = 0
  76. max_retries = 3
  77. while retry_count < max_retries:
  78. try:
  79. print(f'[上传] 开始上传 | 文件大小: {os.path.getsize(video_path)//1024}KB')
  80. video = genai.upload_file(path=video_path, mime_type='video/mp4')
  81. while True:
  82. current_state = video.state.name
  83. print(f"[状态] {current_state} | 进度: {getattr(video, 'progress', 0)}%")
  84. if current_state == 'ACTIVE':
  85. return video
  86. elif current_state == 'FAILED':
  87. raise Exception("云端处理失败")
  88. elif self._stop_event.is_set():
  89. raise KeyboardInterrupt("用户中断上传")
  90. time.sleep(10)
  91. video = genai.get_file(name=video.name)
  92. except Exception as e:
  93. retry_count += 1
  94. if video:
  95. genai.delete_file(name=video.name)
  96. if retry_count >= max_retries:
  97. raise Exception(f"上传失败(已重试{max_retries}次): {str(e)}")
  98. print(f"[重试] 上传失败,第{retry_count}次重试...")
  99. time.sleep(5)
  100. def _download_video(self, video_url: str) -> str:
  101. """增强版视频下载(强制完整性校验+断点续传)"""
  102. file_path = os.path.join(CACHE_DIR, f'{uuid.uuid4()}.mp4')
  103. temp_file = None
  104. retry_count = 0
  105. max_retries = 3
  106. downloaded = 0
  107. while retry_count < max_retries:
  108. try:
  109. # 获取文件总大小(带重试)
  110. with self.session.head(video_url, timeout=10) as head_resp:
  111. head_resp.raise_for_status()
  112. total_size = int(head_resp.headers.get('content-length', 0))
  113. if total_size == 0:
  114. raise ValueError("服务器未返回有效文件大小")
  115. # 支持断点续传
  116. if os.path.exists(file_path):
  117. downloaded = os.path.getsize(file_path)
  118. headers = {'Range': f'bytes={downloaded}-'}
  119. else:
  120. headers = {}
  121. with self.session.get(
  122. video_url,
  123. stream=True,
  124. timeout=30,
  125. headers=headers
  126. ) as response:
  127. response.raise_for_status()
  128. # 验证范围请求响应
  129. if downloaded > 0 and response.status_code != 206:
  130. raise ConnectionError("服务器不支持断点续传")
  131. mode = 'ab' if downloaded > 0 else 'wb'
  132. temp_file = open(file_path, mode)
  133. for chunk in response.iter_content(chunk_size=8192):
  134. if self._stop_event.is_set():
  135. raise KeyboardInterrupt("用户中断下载")
  136. if chunk:
  137. temp_file.write(chunk)
  138. downloaded += len(chunk)
  139. progress = downloaded/total_size*100
  140. print(f"\r[下载] 进度: {progress:.1f}% | {downloaded//1024}KB/{total_size//1024}KB",
  141. end='', flush=True)
  142. # 强制完整性校验
  143. if downloaded != total_size:
  144. raise IOError(f"下载不完整({downloaded}/{total_size}字节)")
  145. return file_path
  146. except Exception as e:
  147. retry_count += 1
  148. if retry_count >= max_retries:
  149. if os.path.exists(file_path):
  150. os.remove(file_path)
  151. raise Exception(f"下载失败(重试{max_retries}次): {str(e)}")
  152. print(f"\n[重试] 下载中断,第{retry_count}次重试...")
  153. time.sleep(5)
  154. finally:
  155. if temp_file and not temp_file.closed:
  156. temp_file.close()
  157. print("\n[下载] 文件句柄已安全关闭")
  158. def _analyze_content(self, video, prompt):
  159. """增强版内容分析"""
  160. model = genai.GenerativeModel(
  161. model_name='gemini-2.0-flash',
  162. generation_config=genai.GenerationConfig(
  163. response_mime_type='application/json',
  164. temperature=0.3,
  165. max_output_tokens=20480
  166. ),
  167. safety_settings={
  168. HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_NONE,
  169. HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_NONE,
  170. }
  171. )
  172. try:
  173. response = model.generate_content(
  174. contents=[video, prompt],
  175. request_options={'timeout': 300}
  176. )
  177. if hasattr(response, '_error') and response._error:
  178. raise Exception(f"生成错误: {response._error}")
  179. result = orjson.loads(response.text.strip())
  180. print(f"[视频分析] 响应: {result}")
  181. if not isinstance(result, dict):
  182. raise ValueError("响应格式错误:非字典结构")
  183. return result
  184. except orjson.JSONDecodeError:
  185. raise Exception("响应解析失败,非JSON格式")
  186. except Exception as e:
  187. raise Exception(f"分析失败: {str(e)}")
  188. def _generate_hooks(self, video, hook_prompt, analysis_data):
  189. """钩子内容生成专用方法(完整修复版)"""
  190. try:
  191. # 1. 准备格式化参数(包含空值保护和类型转换)
  192. format_args = {
  193. "summary": str(analysis_data.get("视频选题与要点理解", {}) or "无相关内容"),
  194. "detail": str(analysis_data.get("视频完整文本提取", {}) or "无相关内容"),
  195. "timeline": str(analysis_data.get("视频分段与时间点分析", {}) or "无相关内容")
  196. }
  197. # 2. 打印调试信息
  198. print(f"[DEBUG] 分析数据类型验证:")
  199. print(f"- 选题理解类型:{type(analysis_data.get('视频选题与要点理解'))}")
  200. print(f"- 文本提取类型:{type(analysis_data.get('视频完整文本提取'))}")
  201. print(f"- 分段分析类型:{type(analysis_data.get('视频分段与时间点分析'))}")
  202. # 3. 执行模板替换(关键修复点)
  203. formatted_prompt = hook_prompt.format(**format_args)
  204. print(f"[SUCCESS] 模板替换完成,新Prompt长度:{len(formatted_prompt)}")
  205. print(f"[DEBUG] 格式化Prompt预览(前500字符):\n{formatted_prompt[:500]}...")
  206. # 4. 模型调用(修复参数传递)
  207. model = genai.GenerativeModel(
  208. model_name='gemini-2.0-flash',
  209. generation_config=genai.GenerationConfig(
  210. response_mime_type='application/json',
  211. temperature=0.5,
  212. max_output_tokens=4096
  213. )
  214. )
  215. # 5. 发送格式化后的prompt(关键修复点)
  216. response = model.generate_content(
  217. contents=[video, formatted_prompt], # 使用格式化后的内容
  218. request_options={'timeout': 600}
  219. )
  220. print(f"[响应原始数据] 长度:{len(response.text)}字符")
  221. # 6. 响应预处理(解决单引号问题)
  222. clean_text = response.text.replace("'", "\"") # 替换单引号
  223. clean_text = clean_text.replace("\n", "") # 去除换行符
  224. print(f"[响应清洗后] 预览:{clean_text[:200]}...")
  225. # 7. 严格JSON验证
  226. try:
  227. result = orjson.loads(clean_text)
  228. if not isinstance(result, list):
  229. raise ValueError("响应应为JSON数组")
  230. # 字段完整性验证
  231. required_fields = {
  232. "需求排序序号", "需求详细query", "需求分类",
  233. "推测出该点需求的原因", "需求钩子话术", "需求钩子出现时间"
  234. }
  235. for idx, item in enumerate(result):
  236. missing = required_fields - set(item.keys())
  237. if missing:
  238. raise ValueError(f"第{idx+1}个对象缺失字段:{missing}")
  239. if len(item["需求钩子话术"]) > 11:
  240. raise ValueError(f"第{idx+1}个话术超长:'{item['需求钩子话术']}'")
  241. return result
  242. except orjson.JSONDecodeError as e:
  243. error_msg = f"JSON解析失败:{str(e)}\n原始响应:{clean_text[:500]}"
  244. raise ValueError(error_msg)
  245. except KeyError as e:
  246. print(f"!! 关键错误:模板变量 {e} 未定义,请检查Excel占位符")
  247. return {"error": f"模板变量 {e} 缺失"}
  248. except ValueError as e:
  249. print(f"!! 数据验证失败:{str(e)}")
  250. return {"error": str(e), "type": "DATA_VALIDATION"}
  251. except Exception as e:
  252. import traceback
  253. error_detail = f"""
  254. === 未捕获异常 ===
  255. 类型:{type(e)}
  256. 信息:{str(e)}
  257. 追踪:
  258. {traceback.format_exc()}
  259. """
  260. print(error_detail)
  261. return {"error": "未知异常"}
  262. def cancel_operation(self):
  263. """操作中止"""
  264. self._stop_event.set()
  265. print("[系统] 正在终止操作...")
  266. def analyze(self, video_url: str, prompts: list):
  267. """增强版分析流程"""
  268. self._stop_event.clear()
  269. video_path = None
  270. try:
  271. print(f"\n[下载] 开始下载 {video_url}")
  272. video_path = self._download_video(video_url)
  273. print("[上传] 启动云端处理")
  274. video = self._safe_upload(video_path)
  275. analysis_data = {}
  276. for prompt in prompts[:3]:
  277. print(f"[分析] 正在执行: {prompt['name']}")
  278. try:
  279. result = self._analyze_content(video, prompt['content'])
  280. analysis_data[prompt['name']] = result
  281. except Exception as e:
  282. analysis_data[prompt['name']] = {
  283. "error": str(e),
  284. "error_type": type(e).__name__
  285. }
  286. hook_result = {}
  287. if len(prompts) >=4:
  288. hook_prompt = prompts[3]
  289. print(f"[钩子生成] 正在执行: {hook_prompt['name']}")
  290. try:
  291. hook_result = self._generate_hooks(video, hook_prompt['content'], analysis_data)
  292. print("钩子提取完成")
  293. except Exception as e:
  294. print(e)
  295. hook_result = {
  296. "error": str(e),
  297. "error_type": type(e).__name__
  298. }
  299. return {
  300. "基础分析": analysis_data,
  301. "钩子提取": hook_result
  302. }
  303. finally:
  304. if video_path and os.path.exists(video_path):
  305. os.remove(video_path)
  306. # =================== 数据处理 ===================
  307. def load_prompts():
  308. """从prompt.py加载Prompt"""
  309. try:
  310. print("\n[初始化] 从prompt.py加载Prompt")
  311. prompts = [
  312. {
  313. "name": "视频选题与要点理解",
  314. "content": VIDEO_TOPIC_ANALYSIS_PROMPT
  315. },
  316. {
  317. "name": "视频完整文本提取",
  318. "content": VIDEO_TEXT_EXTRACTION_PROMPT
  319. },
  320. {
  321. "name": "视频分段与时间点分析",
  322. "content": VIDEO_SEGMENT_ANALYSIS_PROMPT
  323. },
  324. {
  325. "name": "钩子提取",
  326. "content": HOOK_EXTRACTION_PROMPT
  327. }
  328. ]
  329. print(f"[成功] 加载 {len(prompts)} 个Prompt")
  330. return prompts
  331. except Exception as e:
  332. raise Exception(f"加载Prompt失败: {str(e)}")
  333. def process_video_data():
  334. """增强版数据处理"""
  335. try:
  336. prompts = load_prompts()
  337. video_df = pd.read_excel('0517.xlsx', engine='openpyxl').iloc[19:] # 从第20个视频开始
  338. analyzer = GoogleVideoAnalyzer()
  339. results = []
  340. import signal
  341. signal.signal(signal.SIGINT, lambda s,f: analyzer.cancel_operation())
  342. for idx, row in video_df.iterrows():
  343. video_id = row['videoid']
  344. video_url = f"http://visionularcdn.yishihui.com/{row['transcode_video_path'].replace('mp4/', 'mp4')}"
  345. record = {
  346. "视频ID": video_id,
  347. "播放量": row.get('播放次数', 'N/A'),
  348. "视频标题": row.get('视频标题', 'N/A'),
  349. "视频地址": video_url,
  350. "状态": "成功"
  351. }
  352. try:
  353. print(f"\n{'='*30} 处理视频 {idx+1}/{len(video_df)} {'='*30}")
  354. analysis = analyzer.analyze(video_url, prompts)
  355. for prompt in prompts[:3]:
  356. record[prompt['name']] = str(analysis["基础分析"].get(prompt['name'], {}))
  357. record["钩子提取"] = str(analysis.get("钩子提取", {}))
  358. except Exception as e:
  359. record.update({
  360. "状态": "失败",
  361. "错误类型": type(e).__name__,
  362. "错误详情": str(e)
  363. })
  364. finally:
  365. results.append(record)
  366. pd.DataFrame(results).to_excel(RESULT_EXCEL, index=False)
  367. with ExcelWriter(RESULT_EXCEL, engine='openpyxl') as writer:
  368. df_results = pd.DataFrame(results)
  369. df_results.to_excel(writer, index=False)
  370. worksheet = writer.sheets['Sheet1']
  371. for col in worksheet.columns:
  372. max_len = max(len(str(cell.value)) for cell in col)
  373. worksheet.column_dimensions[col[0].column_letter].width = min(max_len + 2, 50)
  374. print(f"\n{'='*30}\n报告已生成: {os.path.abspath(RESULT_EXCEL)}")
  375. except Exception as e:
  376. print(f"\n{'!'*30} 系统级错误 {'!'*30}\n{str(e)}")
  377. # =================== 执行入口 ===================
  378. if __name__ == '__main__':
  379. print("=== 视频分析系统启动 ===")
  380. print(f"工作目录: {os.getcwd()}")
  381. try:
  382. test_resp = requests.get("https://www.google.com",
  383. proxies=PROXY_CONFIG,
  384. timeout=10,
  385. verify=False)
  386. print(f"[网络] 连接测试成功 ({test_resp.status_code})")
  387. except Exception as e:
  388. print(f"[网络] 连接测试失败: {str(e)}")
  389. exit(1)
  390. start_time = time.time()
  391. try:
  392. process_video_data()
  393. except KeyboardInterrupt:
  394. print("\n[中断] 用户主动终止程序")
  395. finally:
  396. print(f"总运行时间: {time.time()-start_time:.1f}秒")