#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 内容识别脚本 主要功能: 1. 从数据库中拉取一条 recognition_status = 0 的数据 : 2. 解析 formatted_content 中的图片和视频 3. 调用独立的图文识别和视频识别模块 4. 将识别结果更新到数据库 """ import os import json import time import sys import argparse from typing import Dict, Any, List, Optional from dotenv import load_dotenv from datetime import datetime # 导入自定义模块 sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.mysql_db import MysqlHelper from indentify.image_identifier import ImageIdentifier from indentify.video_identifier import VideoIdentifier from utils.logging_config import get_logger class ContentIdentifier: def __init__(self): # 加载环境变量 load_dotenv() # 设置日志 self.logger = get_logger('ContentIdentifier') # 初始化数据库连接 self.db = MysqlHelper() # 初始化识别模块 self.image_identifier = ImageIdentifier() self.video_identifier = VideoIdentifier() def get_unprocessed_record(self) -> Optional[Dict[str, Any]]: """从数据库获取一条未处理的数据 先从 knowledge_content_query 表中选取 category_id = 0 的所有 query_word, 然后用这些 query_word 去 knowledge_search_content 表中匹配, 找出 recognition_status = 0 的一条开始处理 """ try: # 第一步:获取 category_id = 0 的所有 query_word query_sql = """ SELECT query_word FROM knowledge_content_query WHERE category_id = 0 """ query_result = self.db.get_values(query_sql) if not query_result: self.logger.warning("未找到 category_id = 0 的 query_word") return None query_words = [row[0] for row in query_result] self.logger.info(f"找到 {len(query_words)} 个 category_id = 0 的 query_word") # 第二步:用这些 query_word 去匹配 knowledge_search_content 表 # 使用 IN 查询来匹配多个 query_word if len(query_words) > 0: # 构建带引号的查询条件,因为 query_word 是字符串 quoted_words = [f"'{word}'" for word in query_words] placeholders = ','.join(quoted_words) content_sql = f""" SELECT id, formatted_content FROM knowledge_search_content WHERE recognition_status = 0 AND query_word IN ({placeholders}) LIMIT 1 """ # 不需要传递参数,因为SQL已经包含了具体的值 result = self.db.get_values(content_sql) else: self.logger.warning("没有可用的 query_word 进行匹配") return None if result and len(result) > 0: record = result[0] # 检查返回的字段数量 return { 'id': record[0], 'formatted_content': record[1] } else: self.logger.info("未找到匹配 query_word 且 recognition_status = 0 的记录") return None except Exception as e: self.logger.error(f"获取未处理记录失败: {e}") return None def parse_formatted_content(self, formatted_content: str) -> Dict[str, Any]: """解析 formatted_content JSON 字符串""" try: if isinstance(formatted_content, str): return json.loads(formatted_content) elif isinstance(formatted_content, dict): return formatted_content else: raise ValueError(f"不支持的数据类型: {type(formatted_content)}") except json.JSONDecodeError as e: self.logger.error(f"解析 formatted_content JSON 失败: {e}") raise def process_content_recognition(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]: """处理内容识别,调用独立的识别模块""" self.logger.info("开始内容识别处理...") # 图片识别 image_result = self.image_identifier.process_images(formatted_content) # 视频识别 video_result = self.video_identifier.process_videos(formatted_content) # 整合结果 recognition_result = { 'image_analysis': image_result, 'video_analysis': video_result } self.logger.info(f"识别结果: {recognition_result}") return recognition_result def update_multimodal_recognition(self, record_id: int, recognition_result: Dict[str, Any]) -> bool: """更新数据库中的 multimodal_recognition 字段""" try: # 将结果转换为JSON字符串,并处理换行符问题 result_json = json.dumps(recognition_result, ensure_ascii=False) # 将换行符替换为 \n 字符串,确保JSON可以被正确解析 result_json = result_json.replace('\n', '\\n').replace('\r', '\\r') # 构建更新SQL - 使用参数化查询避免换行符问题 sql = "UPDATE knowledge_search_content SET multimodal_recognition = %s, updated_at = NOW(), recognition_status = 2 WHERE id = %s" params = (result_json, record_id) # 执行更新 result = self.db.update_values(sql, params) if result is not None: self.logger.info(f"已更新记录 {record_id} 的 multimodal_recognition 字段") return True else: self.logger.error(f"更新记录 {record_id} 失败") return False except Exception as e: self.logger.error(f"更新数据库失败: {e}") return False def process_single_record(self) -> bool: """处理单条记录""" try: # 获取未处理的记录 record = self.get_unprocessed_record() if not record: self.logger.warning("没有找到未处理的记录") return False self.logger.info(f"开始处理记录 ID: {record['id']}") # self.logger.info(f" 多模态识别: {record['multimodal_recognition'][:300]}...") # 先设置这条记录的 recognition_status = 1 self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 1 WHERE id = {record['id']}") # 解析 formatted_content formatted_content = self.parse_formatted_content(record['formatted_content']) # 提取基本信息,处理 null 值 title = formatted_content.get('title') or '' content = formatted_content.get('body_text') or '' channel = formatted_content.get('channel') or '' images = formatted_content.get('image_url_list') or [] videos = formatted_content.get('video_url_list') or [] author = formatted_content.get('channel_account_name') or '' like_count = formatted_content.get('like_count') or 0 collect_count = formatted_content.get('collect_count') or 0 comment_count = formatted_content.get('comment_count') or 0 view_count = formatted_content.get('view_count') or 0 publish_time = formatted_content.get('publish_time') or '' update_timestamp = formatted_content.get('update_timestamp') or '' content_link = formatted_content.get('content_link') or '' content_id = formatted_content.get('channel_content_id') or '' # 调用内容识别处理 recognition_result = self.process_content_recognition(formatted_content) # 判断识别是否成功:如果视频任一项的 asr_content 包含“视频上传失败”或者包含“ASR分析失败”,则标记失败 video_analysis = recognition_result.get('video_analysis', {}) video_upload_failed = False if isinstance(video_analysis, list): for video_item in video_analysis: if isinstance(video_item, dict) and 'asr_content' in video_item: if '视频上传失败' in (video_item.get('asr_content') or '') or 'ASR分析失败' in (video_item.get('asr_content') or ''): video_upload_failed = True break elif isinstance(video_analysis, dict): if 'asr_content' in video_analysis and ('视频上传失败' in (video_analysis.get('asr_content') or '') or 'ASR分析失败' in (video_analysis.get('asr_content') or '')): video_upload_failed = True if video_upload_failed: self.logger.info(f"记录 {record['id']} 识别失败,将 recognition_status 设置为 3") self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 3 WHERE id = {record['id']}") return False # 构建完整的识别结果 complete_result = { 'id': record['id'], 'channel': channel, 'title': title, 'content': content, 'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []), 'videos': recognition_result.get('video_analysis', {}), 'meta': { 'author': author, 'like_count': like_count, 'collect_count': collect_count, 'comment_count': comment_count, 'view_count': view_count, 'publish_time': publish_time, 'update_timestamp': update_timestamp, 'content_link': content_link, 'content_id': content_id, } } # 更新数据库 success = self.update_multimodal_recognition(record['id'], complete_result) if success: self.logger.info(f"记录 {record['id']} 处理完成") return True else: self.logger.error(f"记录 {record['id']} 处理失败") return False except Exception as e: self.logger.error(f"处理记录失败: {e}") return False def main(): """主函数""" identifier = ContentIdentifier() identifier.process_single_record() if __name__ == '__main__': main()