single_analyze.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551
  1. import pandas as pd
  2. import requests
  3. import logging
  4. from typing import Dict, Any
  5. import json
  6. import time
  7. import os
  8. import tempfile
  9. from PIL import Image
  10. import google.generativeai as genai
  11. import shutil
  12. from urllib.parse import urlparse
  13. from openpyxl import Workbook, load_workbook
  14. from openpyxl.drawing.image import Image as ExcelImage
  15. from openpyxl.utils.dataframe import dataframe_to_rows
  16. # 配置日志
  17. logging.basicConfig(
  18. level=logging.INFO,
  19. format='%(asctime)s - %(levelname)s - %(message)s',
  20. handlers=[
  21. logging.FileHandler('analysis_results.log', encoding='utf-8'),
  22. logging.StreamHandler()
  23. ]
  24. )
  25. logger = logging.getLogger(__name__)
  26. class ImageAnalyzer:
  27. """图片内容分析器"""
  28. def __init__(self, api_key: str = None, images_dir: str = "images"):
  29. """初始化分析器"""
  30. self.session = requests.Session()
  31. self.session.headers.update({
  32. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
  33. })
  34. # 创建图片存储目录
  35. self.images_dir = images_dir
  36. if not os.path.exists(self.images_dir):
  37. os.makedirs(self.images_dir)
  38. logger.info(f"创建图片存储目录: {self.images_dir}")
  39. # 初始化 Gemini API
  40. if api_key:
  41. genai.configure(api_key=api_key)
  42. self.model = genai.GenerativeModel('gemini-2.5-flash')
  43. else:
  44. # 尝试从环境变量获取 API key
  45. api_key = 'AIzaSyAkt1l9Kw1CQgHFzTpla0vgt0OE53fr-BI'
  46. if api_key:
  47. genai.configure(api_key=api_key)
  48. self.model = genai.GenerativeModel('gemini-2.5-flash')
  49. else:
  50. logger.warning("未找到 Gemini API key,将使用模拟分析")
  51. self.model = None
  52. def analyze_image_content(self, image_url: str, title: str) -> Dict[str, Any]:
  53. """
  54. 分析图片内容和标题的匹配度
  55. Args:
  56. image_url: 图片链接
  57. title: 分享标题
  58. Returns:
  59. 分析结果字典,包含保存的图片路径
  60. """
  61. try:
  62. # 下载并保存图片
  63. saved_image_path = self._download_and_save_image(image_url)
  64. if self.model:
  65. # 使用 Gemini 进行真实分析
  66. analysis_result = self._gemini_analysis(saved_image_path, title)
  67. else:
  68. # 使用模拟分析
  69. analysis_result = self._mock_llm_analysis(image_url, title)
  70. # 添加保存的图片路径到结果中
  71. analysis_result['saved_image_path'] = saved_image_path
  72. return analysis_result
  73. except Exception as e:
  74. logger.error(f"分析图片时出错: {e}")
  75. return {
  76. 'error': str(e),
  77. 'image_url': image_url,
  78. 'title': title,
  79. 'analysis': '分析失败',
  80. 'saved_image_path': None
  81. }
  82. def _download_and_save_image(self, image_url: str) -> str:
  83. """
  84. 下载图片并保存到images目录
  85. Args:
  86. image_url: 图片链接
  87. Returns:
  88. 保存的图片文件路径
  89. """
  90. try:
  91. response = self.session.get(image_url, timeout=30)
  92. response.raise_for_status()
  93. # 从URL中提取文件名
  94. parsed_url = urlparse(image_url)
  95. filename = os.path.basename(parsed_url.path)
  96. if not filename or '.' not in filename:
  97. # 如果没有文件名或扩展名,生成一个
  98. filename = f"image_{int(time.time())}.jpg"
  99. # 确保文件名是唯一的
  100. base_name, ext = os.path.splitext(filename)
  101. counter = 1
  102. while os.path.exists(os.path.join(self.images_dir, filename)):
  103. filename = f"{base_name}_{counter}{ext}"
  104. counter += 1
  105. # 保存到images目录
  106. saved_path = os.path.join(self.images_dir, filename)
  107. with open(saved_path, 'wb') as f:
  108. f.write(response.content)
  109. # 验证图片格式
  110. try:
  111. with Image.open(saved_path) as img:
  112. img.verify()
  113. logger.info(f"图片已保存到: {saved_path}")
  114. return saved_path
  115. except Exception:
  116. os.unlink(saved_path)
  117. raise ValueError("下载的文件不是有效的图片格式")
  118. except Exception as e:
  119. logger.error(f"下载图片失败: {e}")
  120. raise
  121. def _gemini_analysis(self, image_path: str, title: str) -> Dict[str, Any]:
  122. """
  123. 使用 Gemini 2.5 模型分析图片内容
  124. Args:
  125. image_path: 本地图片文件路径
  126. title: 分享标题
  127. Returns:
  128. 分析结果字典
  129. """
  130. try:
  131. # 构建分析提示词
  132. with open('prompt/single_analysis.md', 'r', encoding='utf-8') as f:
  133. prompt_template = f.read()
  134. # 将标题插入到提示词中
  135. prompt = prompt_template.replace('{title}', title)
  136. # 使用 PIL 打开图片并调用 Gemini API
  137. with Image.open(image_path) as pil_image:
  138. # 调用 Gemini API - 使用 PIL Image 对象
  139. response = self.model.generate_content([prompt, pil_image])
  140. # 解析响应
  141. try:
  142. # 尝试解析 JSON 响应
  143. result_text = response.text.strip()
  144. if result_text.startswith('```json'):
  145. result_text = result_text[7:-3]
  146. elif result_text.startswith('```'):
  147. result_text = result_text[3:-3]
  148. analysis_result = json.loads(result_text)
  149. except json.JSONDecodeError:
  150. # 如果无法解析为JSON,使用文本响应
  151. analysis_result = {
  152. 'video_id': f'video_{int(time.time())}',
  153. '标题分析': {
  154. '标题语气': '未知',
  155. '标题长度': '未知',
  156. '标题行动号召强度': 0,
  157. '标题紧急感强度': 0
  158. },
  159. '分享图分析': {
  160. '构图完整性': '未知',
  161. '人物数量': '未知',
  162. '人物年龄特征': '未知',
  163. '色彩对比度': '未知',
  164. '视觉复杂度': '未知',
  165. '怀旧元素强度': 0,
  166. '图文一致性': 0,
  167. '情感共鸣潜力': '未知',
  168. '好奇心激发程度': 0,
  169. '视觉焦点明确性': '未知',
  170. '图片文字可读性': '未知',
  171. '整体协调性': '未知'
  172. },
  173. '成功特征总结': '无法解析为结构化结果',
  174. 'raw_response': response.text
  175. }
  176. return analysis_result
  177. except Exception as e:
  178. logger.error(f"Gemini 分析失败: {e}")
  179. raise
  180. def _mock_llm_analysis(self, image_url: str, title: str) -> Dict[str, Any]:
  181. """
  182. 模拟LLM分析过程
  183. 实际使用时需要替换为真实的LLM API调用
  184. """
  185. # 模拟分析延迟
  186. time.sleep(0.1)
  187. # 模拟的分析结果
  188. analysis = {
  189. 'image_content_description': f'模拟分析:图片内容与标题"{title}"相关',
  190. 'title_relevance_score': 7,
  191. 'match_analysis': '模拟匹配分析结果',
  192. 'keywords_extracted': ['模拟', '关键词'],
  193. 'sentiment': '积极',
  194. 'confidence': 6
  195. }
  196. return analysis
  197. def save_single_result_to_excel(result: dict, output_file: str, is_first_row: bool = False):
  198. """
  199. 将单行分析结果保存到Excel文件,并嵌入图片
  200. Args:
  201. result: 单行分析结果
  202. output_file: 输出文件路径
  203. is_first_row: 是否为第一行(需要设置列标题)
  204. """
  205. try:
  206. # 如果文件已存在,加载它;否则创建新的工作簿
  207. if os.path.exists(output_file):
  208. wb = load_workbook(output_file)
  209. ws = wb.active
  210. else:
  211. wb = Workbook()
  212. ws = wb.active
  213. ws.title = "分析结果"
  214. logger.info(f"创建新的Excel文件: {output_file}")
  215. if is_first_row:
  216. # 设置列标题
  217. headers = list(result.keys())
  218. for col, header in enumerate(headers, 1):
  219. ws.cell(row=1, column=col, value=header)
  220. # 设置图片列的宽度
  221. image_col_index = headers.index('图片文件路径') + 1 if '图片文件路径' in headers else None
  222. if image_col_index:
  223. ws.column_dimensions[chr(64 + image_col_index)].width = 20
  224. # 计算新行的位置
  225. new_row = ws.max_row + 1
  226. # 添加数据
  227. for col_idx, (key, value) in enumerate(result.items(), 1):
  228. ws.cell(row=new_row, column=col_idx, value=value)
  229. # 设置行高以适应图片
  230. ws.row_dimensions[new_row].height = 120
  231. # 如果存在图片文件,则嵌入图片
  232. image_path = result.get('图片文件路径', '')
  233. if image_path and os.path.exists(image_path):
  234. try:
  235. # 创建Excel图片对象
  236. img = ExcelImage(image_path)
  237. # 调整图片大小
  238. img.width = 150
  239. img.height = 100
  240. # 找到图片列的位置
  241. headers = list(result.keys())
  242. image_col_index = headers.index('图片文件路径') + 1 if '图片文件路径' in headers else None
  243. if image_col_index:
  244. # 将图片添加到指定单元格
  245. cell_ref = f"{chr(64 + image_col_index)}{new_row}"
  246. ws.add_image(img, cell_ref)
  247. logger.info(f"已将图片嵌入到单元格 {cell_ref}")
  248. except Exception as e:
  249. logger.warning(f"嵌入图片失败 {image_path}: {e}")
  250. # 保存工作簿
  251. wb.save(output_file)
  252. logger.info(f"✅ 第 {result.get('原始行索引', '?') + 1} 行结果已保存到: {output_file}")
  253. except Exception as e:
  254. logger.error(f"保存单行结果失败: {e}")
  255. raise
  256. def save_results_with_images(results: list, output_file: str):
  257. """
  258. 将分析结果保存到Excel文件,并嵌入图片(批量保存)
  259. Args:
  260. results: 分析结果列表
  261. output_file: 输出文件路径
  262. """
  263. try:
  264. # 如果文件已存在,加载它;否则创建新的工作簿
  265. if os.path.exists(output_file):
  266. wb = load_workbook(output_file)
  267. ws = wb.active
  268. logger.info(f"加载已存在的Excel文件: {output_file}")
  269. else:
  270. wb = Workbook()
  271. ws = wb.active
  272. ws.title = "分析结果"
  273. logger.info(f"创建新的Excel文件: {output_file}")
  274. # 清空现有内容
  275. ws.delete_rows(1, ws.max_row)
  276. if not results:
  277. logger.warning("没有结果需要保存")
  278. wb.save(output_file)
  279. logger.info(f"已保存空的Excel文件: {output_file}")
  280. return
  281. # 设置列标题
  282. headers = list(results[0].keys())
  283. for col, header in enumerate(headers, 1):
  284. ws.cell(row=1, column=col, value=header)
  285. # 设置图片列的宽度和行高
  286. image_col_index = headers.index('图片文件路径') + 1 if '图片文件路径' in headers else None
  287. if image_col_index:
  288. ws.column_dimensions[chr(64 + image_col_index)].width = 20 # 设置列宽
  289. # 添加数据行
  290. for row_idx, result in enumerate(results, 2):
  291. for col_idx, (key, value) in enumerate(result.items(), 1):
  292. ws.cell(row=row_idx, column=col_idx, value=value)
  293. # 设置行高以适应图片
  294. ws.row_dimensions[row_idx].height = 120
  295. # 如果存在图片文件,则嵌入图片
  296. image_path = result.get('图片文件路径', '')
  297. if image_path and os.path.exists(image_path):
  298. try:
  299. # 创建Excel图片对象
  300. img = ExcelImage(image_path)
  301. # 调整图片大小
  302. img.width = 150
  303. img.height = 100
  304. # 将图片添加到指定单元格
  305. cell_ref = f"{chr(64 + image_col_index)}{row_idx}"
  306. ws.add_image(img, cell_ref)
  307. logger.info(f"已将图片嵌入到单元格 {cell_ref}")
  308. except Exception as e:
  309. logger.warning(f"嵌入图片失败 {image_path}: {e}")
  310. # 保存工作簿
  311. wb.save(output_file)
  312. logger.info(f"✅ 结果已保存到: {output_file}")
  313. logger.info(f"✅ 文件大小: {os.path.getsize(output_file)} 字节")
  314. except Exception as e:
  315. logger.error(f"保存Excel文件失败: {e}")
  316. raise
  317. def read_excel_file(file_path: str) -> pd.DataFrame:
  318. """
  319. 读取Excel文件
  320. Args:
  321. file_path: Excel文件路径
  322. Returns:
  323. DataFrame对象
  324. """
  325. try:
  326. df = pd.read_excel(file_path)
  327. logger.info(f"成功读取Excel文件: {file_path}")
  328. logger.info(f"数据形状: {df.shape}")
  329. logger.info(f"列名: {list(df.columns)}")
  330. return df
  331. except Exception as e:
  332. logger.error(f"读取Excel文件失败: {e}")
  333. raise
  334. def process_single_row(row_data: pd.Series, analyzer: ImageAnalyzer, row_index: int) -> Dict[str, Any]:
  335. """
  336. 处理单行数据
  337. Args:
  338. row_data: 行数据
  339. analyzer: 分析器实例
  340. row_index: 行索引
  341. Returns:
  342. 包含分析结果的字典
  343. """
  344. try:
  345. # 获取第三列(图片链接)和第四列(分享标题)
  346. # 注意:pandas的列索引从0开始,所以第三列是索引2,第四列是索引3
  347. image_url = row_data.iloc[2] if len(row_data) > 2 else None
  348. title = row_data.iloc[3] if len(row_data) > 3 else None
  349. logger.info(f"处理第 {row_index + 1} 行数据:")
  350. logger.info(f" 图片链接: {image_url}")
  351. logger.info(f" 分享标题: {title}")
  352. # 检查数据有效性
  353. if pd.isna(image_url) or pd.isna(title):
  354. logger.warning(f"第 {row_index + 1} 行数据不完整,跳过分析")
  355. return None
  356. # 进行LLM分析
  357. analysis_result = analyzer.analyze_image_content(str(image_url), str(title))
  358. # 输出分析结果
  359. logger.info(f"第 {row_index + 1} 行分析结果:{analysis_result}")
  360. if 'error' in analysis_result:
  361. logger.error(f"第 {row_index + 1} 行分析出错: {analysis_result['error']}")
  362. return None
  363. # 构建完整的结果记录
  364. result_record = {
  365. '原始行索引': row_index,
  366. '图片文件路径': analysis_result.get('saved_image_path', ''),
  367. '原始图片链接': str(image_url),
  368. '分享标题': str(title),
  369. }
  370. # 添加标题分析结果
  371. title_analysis = analysis_result.get('标题分析', {})
  372. result_record.update({
  373. '标题语气': title_analysis.get('标题语气', ''),
  374. '标题长度': title_analysis.get('标题长度', ''),
  375. '标题行动号召强度': title_analysis.get('标题行动号召强度', 0),
  376. '标题紧急感强度': title_analysis.get('标题紧急感强度', 0),
  377. })
  378. # 添加分享图分析结果
  379. image_analysis = analysis_result.get('分享图分析', {})
  380. result_record.update({
  381. '构图主体完整性': image_analysis.get('构图主体完整性', ''),
  382. '有无人物': image_analysis.get('有无人物', ''),
  383. '人物数量': image_analysis.get('人物数量', ''),
  384. '人物年龄特征': image_analysis.get('人物年龄特征', ''),
  385. '图片上有和标题关联的贴图文字': image_analysis.get('图片上有和标题关联的贴图文字', ''),
  386. '色彩对比度': image_analysis.get('色彩对比度', ''),
  387. '视觉复杂度': image_analysis.get('视觉复杂度', ''),
  388. '怀旧元素强度': image_analysis.get('怀旧元素强度', 0),
  389. '图文一致性': image_analysis.get('图文一致性', 0),
  390. '情感共鸣潜力': image_analysis.get('情感共鸣潜力', ''),
  391. '好奇心激发程度': image_analysis.get('好奇心激发程度', 0),
  392. '视觉焦点明确性': image_analysis.get('视觉焦点明确性', ''),
  393. '整体清晰度': image_analysis.get('整体清晰度', 0),
  394. })
  395. # 添加成功特征总结
  396. success_features = analysis_result.get('成功特征总结', '')
  397. # 如果是列表,转换为字符串格式
  398. if isinstance(success_features, list):
  399. result_record['成功特征总结'] = ', '.join(success_features)
  400. else:
  401. result_record['成功特征总结'] = success_features
  402. logger.info("-" * 80) # 分隔线
  403. return result_record
  404. except Exception as e:
  405. logger.error(f"处理第 {row_index + 1} 行数据时出错: {e}")
  406. return None
  407. def main():
  408. """主函数"""
  409. try:
  410. # 获取 Gemini API key
  411. api_key = 'AIzaSyAkt1l9Kw1CQgHFzTpla0vgt0OE53fr-BI'
  412. if not api_key:
  413. logger.warning("未设置 GEMINI_API_KEY 环境变量,将使用模拟分析")
  414. logger.info("要使用真实的 Gemini 分析,请设置环境变量:")
  415. logger.info("export GEMINI_API_KEY='your_api_key_here'")
  416. # 初始化分析器
  417. analyzer = ImageAnalyzer(api_key)
  418. # 读取Excel文件
  419. excel_file_path = "高ROS2/罕见画面原始数据.xlsx"
  420. df = read_excel_file(excel_file_path)
  421. # 准备结果存储文件
  422. output_file = "高ROS2/罕见画面-高ROS-维度分析.xlsx"
  423. # 检查结果文件是否存在
  424. if os.path.exists(output_file):
  425. logger.info(f"结果文件 {output_file} 已存在,将重新创建")
  426. os.remove(output_file)
  427. logger.info(f"已删除旧的结果文件: {output_file}")
  428. else:
  429. logger.info(f"结果文件 {output_file} 不存在,将创建新文件")
  430. # 立即创建一个空的Excel文件
  431. wb = Workbook()
  432. ws = wb.active
  433. ws.title = "分析结果"
  434. wb.save(output_file)
  435. logger.info(f"已创建新的Excel文件: {output_file}")
  436. logger.info(f"开始逐行处理数据,总共 {len(df)} 行...")
  437. # 从第17行开始处理数据(跳过前16行,索引0-15)
  438. start_row = 0 # 第17行的索引是16
  439. if len(df) > start_row:
  440. logger.info(f"从第 {start_row + 1} 行开始处理,总共需要处理 {len(df) - start_row} 行数据...")
  441. for index, row in df.iterrows():
  442. if index < start_row:
  443. continue # 跳过前16行
  444. logger.info(f"正在处理第 {index + 1}/{len(df)} 行数据...")
  445. result = process_single_row(row, analyzer, index)
  446. if result:
  447. # 立即保存这一行的结果
  448. is_first_row = (index == start_row) # 第一行需要设置列标题
  449. save_single_result_to_excel(result, output_file, is_first_row)
  450. logger.info(f"第 {index + 1} 行数据处理并保存完成!")
  451. else:
  452. logger.warning(f"第 {index + 1} 行数据处理失败,跳过")
  453. else:
  454. logger.warning(f"Excel文件中数据行数不足,需要至少 {start_row + 1} 行数据")
  455. # 即使没有数据,也创建一个空的Excel文件
  456. save_results_with_images([], output_file)
  457. logger.info(f"✅ 所有数据处理完成,结果已保存到: {output_file}")
  458. except Exception as e:
  459. logger.error(f"程序执行出错: {e}")
  460. if __name__ == "__main__":
  461. main()