||
- import pandas as pd
- import requests
- import logging
- from typing import Dict, Any
- import json
- import time
- import os
- import tempfile
- from PIL import Image
- import google.generativeai as genai
- import shutil
- from urllib.parse import urlparse
- from openpyxl import Workbook, load_workbook
- from openpyxl.drawing.image import Image as ExcelImage
- from openpyxl.utils.dataframe import dataframe_to_rows
- # 配置日志
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(levelname)s - %(message)s',
- handlers=[
- logging.FileHandler('analysis_results.log', encoding='utf-8'),
- logging.StreamHandler()
- ]
- )
- logger = logging.getLogger(__name__)
- class ImageAnalyzer:
- """图片内容分析器"""
-
- def __init__(self, api_key: str = None, images_dir: str = "images"):
- """初始化分析器"""
- self.session = requests.Session()
- self.session.headers.update({
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
- })
-
- # 创建图片存储目录
- self.images_dir = images_dir
- if not os.path.exists(self.images_dir):
- os.makedirs(self.images_dir)
- logger.info(f"创建图片存储目录: {self.images_dir}")
-
- # 初始化 Gemini API
- if api_key:
- genai.configure(api_key=api_key)
- self.model = genai.GenerativeModel('gemini-2.5-flash')
- else:
- # 尝试从环境变量获取 API key
- api_key = 'AIzaSyAkt1l9Kw1CQgHFzTpla0vgt0OE53fr-BI'
- if api_key:
- genai.configure(api_key=api_key)
- self.model = genai.GenerativeModel('gemini-2.5-flash')
- else:
- logger.warning("未找到 Gemini API key,将使用模拟分析")
- self.model = None
-
- def analyze_image_content(self, image_url: str, title: str) -> Dict[str, Any]:
- """
- 分析图片内容和标题的匹配度
-
- Args:
- image_url: 图片链接
- title: 分享标题
-
- Returns:
- 分析结果字典,包含保存的图片路径
- """
- try:
- # 下载并保存图片
- saved_image_path = self._download_and_save_image(image_url)
-
- if self.model:
- # 使用 Gemini 进行真实分析
- analysis_result = self._gemini_analysis(saved_image_path, title)
- else:
- # 使用模拟分析
- analysis_result = self._mock_llm_analysis(image_url, title)
-
- # 添加保存的图片路径到结果中
- analysis_result['saved_image_path'] = saved_image_path
- return analysis_result
- except Exception as e:
- logger.error(f"分析图片时出错: {e}")
- return {
- 'error': str(e),
- 'image_url': image_url,
- 'title': title,
- 'analysis': '分析失败',
- 'saved_image_path': None
- }
-
- def _download_and_save_image(self, image_url: str) -> str:
- """
- 下载图片并保存到images目录
-
- Args:
- image_url: 图片链接
-
- Returns:
- 保存的图片文件路径
- """
- try:
- response = self.session.get(image_url, timeout=30)
- response.raise_for_status()
-
- # 从URL中提取文件名
- parsed_url = urlparse(image_url)
- filename = os.path.basename(parsed_url.path)
- if not filename or '.' not in filename:
- # 如果没有文件名或扩展名,生成一个
- filename = f"image_{int(time.time())}.jpg"
-
- # 确保文件名是唯一的
- base_name, ext = os.path.splitext(filename)
- counter = 1
- while os.path.exists(os.path.join(self.images_dir, filename)):
- filename = f"{base_name}_{counter}{ext}"
- counter += 1
-
- # 保存到images目录
- saved_path = os.path.join(self.images_dir, filename)
-
- with open(saved_path, 'wb') as f:
- f.write(response.content)
-
- # 验证图片格式
- try:
- with Image.open(saved_path) as img:
- img.verify()
- logger.info(f"图片已保存到: {saved_path}")
- return saved_path
- except Exception:
- os.unlink(saved_path)
- raise ValueError("下载的文件不是有效的图片格式")
-
- except Exception as e:
- logger.error(f"下载图片失败: {e}")
- raise
-
- def _gemini_analysis(self, image_path: str, title: str) -> Dict[str, Any]:
- """
- 使用 Gemini 2.5 模型分析图片内容
-
- Args:
- image_path: 本地图片文件路径
- title: 分享标题
-
- Returns:
- 分析结果字典
- """
- try:
- # 构建分析提示词
- with open('prompt/single_analysis.md', 'r', encoding='utf-8') as f:
- prompt_template = f.read()
-
- # 将标题插入到提示词中
- prompt = prompt_template.replace('{title}', title)
-
- # 使用 PIL 打开图片并调用 Gemini API
- with Image.open(image_path) as pil_image:
- # 调用 Gemini API - 使用 PIL Image 对象
- response = self.model.generate_content([prompt, pil_image])
-
- # 解析响应
- try:
- # 尝试解析 JSON 响应
- result_text = response.text.strip()
- if result_text.startswith('```json'):
- result_text = result_text[7:-3]
- elif result_text.startswith('```'):
- result_text = result_text[3:-3]
-
- analysis_result = json.loads(result_text)
- except json.JSONDecodeError:
- # 如果无法解析为JSON,使用文本响应
- analysis_result = {
- 'video_id': f'video_{int(time.time())}',
- '标题分析': {
- '标题语气': '未知',
- '标题长度': '未知',
- '标题行动号召强度': 0,
- '标题紧急感强度': 0
- },
- '分享图分析': {
- '构图完整性': '未知',
- '人物数量': '未知',
- '人物年龄特征': '未知',
- '色彩对比度': '未知',
- '视觉复杂度': '未知',
- '怀旧元素强度': 0,
- '图文一致性': 0,
- '情感共鸣潜力': '未知',
- '好奇心激发程度': 0,
- '视觉焦点明确性': '未知',
- '图片文字可读性': '未知',
- '整体协调性': '未知'
- },
- '成功特征总结': '无法解析为结构化结果',
- 'raw_response': response.text
- }
-
- return analysis_result
-
- except Exception as e:
- logger.error(f"Gemini 分析失败: {e}")
- raise
-
- def _mock_llm_analysis(self, image_url: str, title: str) -> Dict[str, Any]:
- """
- 模拟LLM分析过程
- 实际使用时需要替换为真实的LLM API调用
- """
- # 模拟分析延迟
- time.sleep(0.1)
-
- # 模拟的分析结果
- analysis = {
- 'image_content_description': f'模拟分析:图片内容与标题"{title}"相关',
- 'title_relevance_score': 7,
- 'match_analysis': '模拟匹配分析结果',
- 'keywords_extracted': ['模拟', '关键词'],
- 'sentiment': '积极',
- 'confidence': 6
- }
-
- return analysis
- def save_single_result_to_excel(result: dict, output_file: str, is_first_row: bool = False):
- """
- 将单行分析结果保存到Excel文件,并嵌入图片
-
- Args:
- result: 单行分析结果
- output_file: 输出文件路径
- is_first_row: 是否为第一行(需要设置列标题)
- """
- try:
- # 如果文件已存在,加载它;否则创建新的工作簿
- if os.path.exists(output_file):
- wb = load_workbook(output_file)
- ws = wb.active
- else:
- wb = Workbook()
- ws = wb.active
- ws.title = "分析结果"
- logger.info(f"创建新的Excel文件: {output_file}")
-
- if is_first_row:
- # 设置列标题
- headers = list(result.keys())
- for col, header in enumerate(headers, 1):
- ws.cell(row=1, column=col, value=header)
-
- # 设置图片列的宽度
- image_col_index = headers.index('图片文件路径') + 1 if '图片文件路径' in headers else None
- if image_col_index:
- ws.column_dimensions[chr(64 + image_col_index)].width = 20
-
- # 计算新行的位置
- new_row = ws.max_row + 1
-
- # 添加数据
- for col_idx, (key, value) in enumerate(result.items(), 1):
- ws.cell(row=new_row, column=col_idx, value=value)
-
- # 设置行高以适应图片
- ws.row_dimensions[new_row].height = 120
-
- # 如果存在图片文件,则嵌入图片
- image_path = result.get('图片文件路径', '')
- if image_path and os.path.exists(image_path):
- try:
- # 创建Excel图片对象
- img = ExcelImage(image_path)
-
- # 调整图片大小
- img.width = 150
- img.height = 100
-
- # 找到图片列的位置
- headers = list(result.keys())
- image_col_index = headers.index('图片文件路径') + 1 if '图片文件路径' in headers else None
-
- if image_col_index:
- # 将图片添加到指定单元格
- cell_ref = f"{chr(64 + image_col_index)}{new_row}"
- ws.add_image(img, cell_ref)
- logger.info(f"已将图片嵌入到单元格 {cell_ref}")
-
- except Exception as e:
- logger.warning(f"嵌入图片失败 {image_path}: {e}")
-
- # 保存工作簿
- wb.save(output_file)
- logger.info(f"✅ 第 {result.get('原始行索引', '?') + 1} 行结果已保存到: {output_file}")
-
- except Exception as e:
- logger.error(f"保存单行结果失败: {e}")
- raise
- def save_results_with_images(results: list, output_file: str):
- """
- 将分析结果保存到Excel文件,并嵌入图片(批量保存)
-
- Args:
- results: 分析结果列表
- output_file: 输出文件路径
- """
- try:
- # 如果文件已存在,加载它;否则创建新的工作簿
- if os.path.exists(output_file):
- wb = load_workbook(output_file)
- ws = wb.active
- logger.info(f"加载已存在的Excel文件: {output_file}")
- else:
- wb = Workbook()
- ws = wb.active
- ws.title = "分析结果"
- logger.info(f"创建新的Excel文件: {output_file}")
-
- # 清空现有内容
- ws.delete_rows(1, ws.max_row)
-
- if not results:
- logger.warning("没有结果需要保存")
- wb.save(output_file)
- logger.info(f"已保存空的Excel文件: {output_file}")
- return
-
- # 设置列标题
- headers = list(results[0].keys())
- for col, header in enumerate(headers, 1):
- ws.cell(row=1, column=col, value=header)
-
- # 设置图片列的宽度和行高
- image_col_index = headers.index('图片文件路径') + 1 if '图片文件路径' in headers else None
- if image_col_index:
- ws.column_dimensions[chr(64 + image_col_index)].width = 20 # 设置列宽
-
- # 添加数据行
- for row_idx, result in enumerate(results, 2):
- for col_idx, (key, value) in enumerate(result.items(), 1):
- ws.cell(row=row_idx, column=col_idx, value=value)
-
- # 设置行高以适应图片
- ws.row_dimensions[row_idx].height = 120
-
- # 如果存在图片文件,则嵌入图片
- image_path = result.get('图片文件路径', '')
- if image_path and os.path.exists(image_path):
- try:
- # 创建Excel图片对象
- img = ExcelImage(image_path)
-
- # 调整图片大小
- img.width = 150
- img.height = 100
-
- # 将图片添加到指定单元格
- cell_ref = f"{chr(64 + image_col_index)}{row_idx}"
- ws.add_image(img, cell_ref)
-
- logger.info(f"已将图片嵌入到单元格 {cell_ref}")
-
- except Exception as e:
- logger.warning(f"嵌入图片失败 {image_path}: {e}")
-
- # 保存工作簿
- wb.save(output_file)
- logger.info(f"✅ 结果已保存到: {output_file}")
- logger.info(f"✅ 文件大小: {os.path.getsize(output_file)} 字节")
-
- except Exception as e:
- logger.error(f"保存Excel文件失败: {e}")
- raise
- def read_excel_file(file_path: str) -> pd.DataFrame:
- """
- 读取Excel文件
-
- Args:
- file_path: Excel文件路径
-
- Returns:
- DataFrame对象
- """
- try:
- df = pd.read_excel(file_path)
- logger.info(f"成功读取Excel文件: {file_path}")
- logger.info(f"数据形状: {df.shape}")
- logger.info(f"列名: {list(df.columns)}")
- return df
- except Exception as e:
- logger.error(f"读取Excel文件失败: {e}")
- raise
- def process_single_row(row_data: pd.Series, analyzer: ImageAnalyzer, row_index: int) -> Dict[str, Any]:
- """
- 处理单行数据
-
- Args:
- row_data: 行数据
- analyzer: 分析器实例
- row_index: 行索引
-
- Returns:
- 包含分析结果的字典
- """
- try:
- # 获取第三列(图片链接)和第四列(分享标题)
- # 注意:pandas的列索引从0开始,所以第三列是索引2,第四列是索引3
- image_url = row_data.iloc[2] if len(row_data) > 2 else None
- title = row_data.iloc[3] if len(row_data) > 3 else None
-
- logger.info(f"处理第 {row_index + 1} 行数据:")
- logger.info(f" 图片链接: {image_url}")
- logger.info(f" 分享标题: {title}")
-
- # 检查数据有效性
- if pd.isna(image_url) or pd.isna(title):
- logger.warning(f"第 {row_index + 1} 行数据不完整,跳过分析")
- return None
-
- # 进行LLM分析
- analysis_result = analyzer.analyze_image_content(str(image_url), str(title))
-
- # 输出分析结果
- logger.info(f"第 {row_index + 1} 行分析结果:{analysis_result}")
-
- if 'error' in analysis_result:
- logger.error(f"第 {row_index + 1} 行分析出错: {analysis_result['error']}")
- return None
-
- # 构建完整的结果记录
- result_record = {
- '原始行索引': row_index,
- '图片文件路径': analysis_result.get('saved_image_path', ''),
- '原始图片链接': str(image_url),
- '分享标题': str(title),
- }
-
- # 添加标题分析结果
- title_analysis = analysis_result.get('标题分析', {})
- result_record.update({
- '标题语气': title_analysis.get('标题语气', ''),
- '标题长度': title_analysis.get('标题长度', ''),
- '标题行动号召强度': title_analysis.get('标题行动号召强度', 0),
- '标题紧急感强度': title_analysis.get('标题紧急感强度', 0),
- })
-
- # 添加分享图分析结果
- image_analysis = analysis_result.get('分享图分析', {})
- result_record.update({
- '构图主体完整性': image_analysis.get('构图主体完整性', ''),
- '有无人物': image_analysis.get('有无人物', ''),
- '人物数量': image_analysis.get('人物数量', ''),
- '人物年龄特征': image_analysis.get('人物年龄特征', ''),
- '图片上有和标题关联的贴图文字': image_analysis.get('图片上有和标题关联的贴图文字', ''),
- '色彩对比度': image_analysis.get('色彩对比度', ''),
- '视觉复杂度': image_analysis.get('视觉复杂度', ''),
- '怀旧元素强度': image_analysis.get('怀旧元素强度', 0),
- '图文一致性': image_analysis.get('图文一致性', 0),
- '情感共鸣潜力': image_analysis.get('情感共鸣潜力', ''),
- '好奇心激发程度': image_analysis.get('好奇心激发程度', 0),
- '视觉焦点明确性': image_analysis.get('视觉焦点明确性', ''),
- '整体清晰度': image_analysis.get('整体清晰度', 0),
- })
-
- # 添加成功特征总结
- success_features = analysis_result.get('成功特征总结', '')
- # 如果是列表,转换为字符串格式
- if isinstance(success_features, list):
- result_record['成功特征总结'] = ', '.join(success_features)
- else:
- result_record['成功特征总结'] = success_features
-
- logger.info("-" * 80) # 分隔线
- return result_record
-
- except Exception as e:
- logger.error(f"处理第 {row_index + 1} 行数据时出错: {e}")
- return None
- def main():
- """主函数"""
- try:
- # 获取 Gemini API key
- api_key = 'AIzaSyAkt1l9Kw1CQgHFzTpla0vgt0OE53fr-BI'
- if not api_key:
- logger.warning("未设置 GEMINI_API_KEY 环境变量,将使用模拟分析")
- logger.info("要使用真实的 Gemini 分析,请设置环境变量:")
- logger.info("export GEMINI_API_KEY='your_api_key_here'")
-
- # 初始化分析器
- analyzer = ImageAnalyzer(api_key)
-
- # 读取Excel文件
- excel_file_path = "高ROS2/罕见画面原始数据.xlsx"
- df = read_excel_file(excel_file_path)
-
- # 准备结果存储文件
- output_file = "高ROS2/罕见画面-高ROS-维度分析.xlsx"
-
- # 检查结果文件是否存在
- if os.path.exists(output_file):
- logger.info(f"结果文件 {output_file} 已存在,将重新创建")
- os.remove(output_file)
- logger.info(f"已删除旧的结果文件: {output_file}")
- else:
- logger.info(f"结果文件 {output_file} 不存在,将创建新文件")
-
- # 立即创建一个空的Excel文件
- wb = Workbook()
- ws = wb.active
- ws.title = "分析结果"
- wb.save(output_file)
- logger.info(f"已创建新的Excel文件: {output_file}")
-
- logger.info(f"开始逐行处理数据,总共 {len(df)} 行...")
-
- # 从第17行开始处理数据(跳过前16行,索引0-15)
- start_row = 0 # 第17行的索引是16
- if len(df) > start_row:
- logger.info(f"从第 {start_row + 1} 行开始处理,总共需要处理 {len(df) - start_row} 行数据...")
- for index, row in df.iterrows():
- if index < start_row:
- continue # 跳过前16行
-
- logger.info(f"正在处理第 {index + 1}/{len(df)} 行数据...")
- result = process_single_row(row, analyzer, index)
- if result:
- # 立即保存这一行的结果
- is_first_row = (index == start_row) # 第一行需要设置列标题
- save_single_result_to_excel(result, output_file, is_first_row)
- logger.info(f"第 {index + 1} 行数据处理并保存完成!")
- else:
- logger.warning(f"第 {index + 1} 行数据处理失败,跳过")
- else:
- logger.warning(f"Excel文件中数据行数不足,需要至少 {start_row + 1} 行数据")
- # 即使没有数据,也创建一个空的Excel文件
- save_results_with_images([], output_file)
-
- logger.info(f"✅ 所有数据处理完成,结果已保存到: {output_file}")
-
- except Exception as e:
- logger.error(f"程序执行出错: {e}")
- if __name__ == "__main__":
- main()
|