#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 内容识别脚本 主要功能: 1. 从数据库中拉取一条 recognition_status = 0 的数据 : 2. 解析 formatted_content 中的图片和视频 3. 调用独立的图文识别和视频识别模块 4. 将识别结果更新到数据库 """ import os import json import time import sys import argparse from typing import Dict, Any, List, Optional from dotenv import load_dotenv from datetime import datetime # 导入自定义模块 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.mysql_db import MysqlHelper from content_indentify.image_identifier import ImageIdentifier from content_indentify.video_identifier import VideoIdentifier from utils.logging_config import get_logger class ContentIdentifier: def __init__(self): # 加载环境变量 load_dotenv() # 设置日志 self.logger = get_logger('ContentIdentifier') # 初始化数据库连接 self.db = MysqlHelper() # 初始化识别模块 self.image_identifier = ImageIdentifier() self.video_identifier = VideoIdentifier() def get_unprocessed_record(self) -> Optional[Dict[str, Any]]: """从数据库获取一条未处理的数据""" sql = """ SELECT id, formatted_content FROM knowledge_search_content WHERE recognition_status = 0 LIMIT 1 """ try: result = self.db.get_values(sql) if result and len(result) > 0: record = result[0] # 检查返回的字段数量 if len(record) >= 3: return { 'id': record[0], 'formatted_content': record[1], 'channel_content_id': record[2] } elif len(record) == 2: # 如果没有channel_content_id字段,使用id作为默认值 return { 'id': record[0], 'formatted_content': record[1], 'channel_content_id': record[0] # 使用id作为默认值 } else: self.logger.error(f"数据库返回字段数量异常: {len(record)}, 期望至少2个字段") return None return None except Exception as e: self.logger.error(f"获取未处理记录失败: {e}") return None def parse_formatted_content(self, formatted_content: str) -> Dict[str, Any]: """解析 formatted_content JSON 字符串""" try: if isinstance(formatted_content, str): return json.loads(formatted_content) elif isinstance(formatted_content, dict): return formatted_content else: raise ValueError(f"不支持的数据类型: {type(formatted_content)}") except json.JSONDecodeError as e: self.logger.error(f"解析 formatted_content JSON 失败: {e}") raise def process_content_recognition(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]: """处理内容识别,调用独立的识别模块""" self.logger.info("开始内容识别处理...") # 图片识别 image_result = self.image_identifier.process_images(formatted_content) # 视频识别 video_result = self.video_identifier.process_videos(formatted_content) # 整合结果 recognition_result = { 'image_analysis': image_result, 'video_analysis': video_result } self.logger.info(f"识别结果: {recognition_result}") return recognition_result def update_multimodal_recognition(self, record_id: int, recognition_result: Dict[str, Any]) -> bool: """更新数据库中的 multimodal_recognition 字段""" try: # 将结果转换为JSON字符串,并处理换行符问题 result_json = json.dumps(recognition_result, ensure_ascii=False) # 将换行符替换为 \n 字符串,确保JSON可以被正确解析 result_json = result_json.replace('\n', '\\n').replace('\r', '\\r') # 构建更新SQL - 使用参数化查询避免换行符问题 sql = "UPDATE knowledge_search_content SET multimodal_recognition = %s, updated_at = NOW(), recognition_status = 2 WHERE id = %s" params = (result_json, record_id) # 执行更新 result = self.db.update_values(sql, params) if result is not None: self.logger.info(f"已更新记录 {record_id} 的 multimodal_recognition 字段") return True else: self.logger.error(f"更新记录 {record_id} 失败") return False except Exception as e: self.logger.error(f"更新数据库失败: {e}") return False def process_single_record(self) -> bool: """处理单条记录""" try: # 获取未处理的记录 record = self.get_unprocessed_record() if not record: self.logger.warning("没有找到未处理的记录") return False self.logger.info(f"开始处理记录 ID: {record['id']}, 内容ID: {record['channel_content_id']}") # self.logger.info(f" 多模态识别: {record['multimodal_recognition'][:300]}...") # 先设置这条记录的 recognition_status = 1 self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 3 WHERE id = {record['id']}") # 解析 formatted_content formatted_content = self.parse_formatted_content(record['formatted_content']) # 提取基本信息,处理 null 值 title = formatted_content.get('title') or '' content = formatted_content.get('body_text') or '' channel = formatted_content.get('channel') or '' images = formatted_content.get('image_url_list') or [] videos = formatted_content.get('video_url_list') or [] author = formatted_content.get('channel_account_name') or '' like_count = formatted_content.get('like_count') or 0 collect_count = formatted_content.get('collect_count') or 0 comment_count = formatted_content.get('comment_count') or 0 view_count = formatted_content.get('view_count') or 0 publish_time = formatted_content.get('publish_time') or '' update_timestamp = formatted_content.get('update_timestamp') or '' content_link = formatted_content.get('content_link') or '' content_id = formatted_content.get('channel_content_id') or '' # 调用内容识别处理 recognition_result = self.process_content_recognition(formatted_content) # 构建完整的识别结果 complete_result = { 'id': record['id'], 'channel': channel, 'title': title, 'content': content, 'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []), 'videos': recognition_result.get('video_analysis', {}), 'meta': { 'author': author, 'like_count': like_count, 'collect_count': collect_count, 'comment_count': comment_count, 'view_count': view_count, 'publish_time': publish_time, 'update_timestamp': update_timestamp, 'content_link': content_link, 'content_id': content_id, } } # 更新数据库 success = self.update_multimodal_recognition(record['id'], complete_result) if success: self.logger.info(f"记录 {record['id']} 处理完成") return True else: self.logger.error(f"记录 {record['id']} 处理失败") return False except Exception as e: self.logger.error(f"处理记录失败: {e}") return False def process_all_records(self, max_records: int = 10): """处理多条记录""" self.logger.info(f"开始批量处理,最多处理 {max_records} 条记录") processed_count = 0 success_count = 0 for i in range(max_records): self.logger.info(f"\n--- 处理第 {i+1}/{max_records} 条记录 ---") if self.process_single_record(): success_count += 1 else: self.logger.warning("没有更多记录需要处理,结束批量处理") break processed_count += 1 # 添加延迟避免API限制 time.sleep(2) self.logger.info(f"\n批量处理完成!总共处理 {processed_count} 条记录,成功 {success_count} 条") def process_continuous(self, max_records: int = None, delay_seconds: int = 2): """连续处理记录,直到没有更多记录或达到最大数量限制""" self.logger.info("启动连续处理模式...") self.logger.info("系统将自动处理数据库中的记录,一条完成后自动处理下一条") self.logger.info(f"处理间隔: {delay_seconds} 秒") if max_records: self.logger.info(f"最大处理数量: {max_records} 条") else: self.logger.info("无数量限制,将处理所有可用记录") self.logger.info("按 Ctrl+C 可以随时停止处理") self.logger.info("-" * 60) processed_count = 0 success_count = 0 consecutive_failures = 0 max_consecutive_failures = 3 # 连续失败3次后停止 try: while True: # 检查是否达到最大数量限制 if max_records and processed_count >= max_records: self.logger.info(f"\n已达到最大处理数量限制 ({max_records} 条),停止处理") break self.logger.info(f"\n--- 处理第 {processed_count + 1} 条记录 ---") self.logger.info(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") # 处理单条记录 if self.process_single_record(): success_count += 1 consecutive_failures = 0 # 重置连续失败计数 self.logger.info(f"✅ 记录处理成功 (成功: {success_count}, 失败: {processed_count - success_count + 1})") else: consecutive_failures += 1 self.logger.warning(f"❌ 记录处理失败 (成功: {success_count}, 失败: {processed_count - success_count + 1})") # 检查连续失败次数 if consecutive_failures >= max_consecutive_failures: self.logger.warning(f"\n⚠️ 连续失败 {max_consecutive_failures} 次,可能没有更多记录需要处理") self.logger.info("停止连续处理") break processed_count += 1 # 检查是否还有更多记录 remaining_records = self.get_remaining_records_count() if remaining_records == 0: self.logger.info(f"\n🎉 所有记录已处理完成!总共处理 {processed_count} 条记录") break self.logger.info(f"剩余待处理记录: {remaining_records} 条") # 添加延迟避免API限制 if delay_seconds > 0: self.logger.info(f"等待 {delay_seconds} 秒后处理下一条记录...") time.sleep(delay_seconds) except KeyboardInterrupt: self.logger.info(f"\n\n⏹️ 用户中断处理") self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条") except Exception as e: self.logger.error(f"\n\n💥 处理过程中发生错误: {e}") self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条") self.logger.info(f"\n📊 连续处理完成!") self.logger.info(f"总处理数量: {processed_count}") self.logger.info(f"成功数量: {success_count}") self.logger.info(f"失败数量: {processed_count - success_count}") if processed_count > 0: success_rate = (success_count / processed_count) * 100 self.logger.info(f"成功率: {success_rate:.1f}%") def get_remaining_records_count(self) -> int: """获取剩余待处理记录数量""" try: sql = "SELECT COUNT(*) FROM knowledge_search_content WHERE recognition_status = 0" result = self.db.get_values(sql) if result and len(result) > 0: return result[0][0] return 0 except Exception as e: self.logger.error(f"获取剩余记录数量失败: {e}") return 0 def main(): """主函数""" parser = argparse.ArgumentParser(description='内容识别脚本 - 分析图片和视频内容') parser.add_argument('--single', action='store_true', help='只处理一条记录') parser.add_argument('--batch', type=int, default=10, help='批量处理记录数量,默认10条') parser.add_argument('--continuous', action='store_true', help='连续处理模式,自动处理所有可用记录') parser.add_argument('--max-records', type=int, help='连续处理模式下的最大处理数量限制') parser.add_argument('--delay', type=int, default=2, help='处理间隔时间(秒),默认2秒') args = parser.parse_args() try: # 创建ContentIdentifier实例 identifier = ContentIdentifier() if args.single: # 处理单条记录 identifier.process_single_record() elif args.continuous: # 连续处理模式 identifier.process_continuous(args.max_records, args.delay) else: # 批量处理记录 identifier.process_all_records(args.batch) except Exception as e: sys.stderr.write(f"程序执行失败: {e}\n") sys.exit(1) if __name__ == '__main__': main()