123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- 内容识别脚本
- 主要功能:
- 1. 从数据库中拉取一条 recognition_status = 0 的数据
- 2. 解析 formatted_content 中的图片和视频
- 3. 调用独立的图文识别和视频识别模块
- 4. 将识别结果更新到数据库
- """
- import os
- import json
- import time
- import sys
- import argparse
- from typing import Dict, Any, List, Optional
- from dotenv import load_dotenv
- from datetime import datetime
- # 导入自定义模块
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from utils.mysql_db import MysqlHelper
- from content_indentify.image_identifier import ImageIdentifier
- from content_indentify.video_identifier import VideoIdentifier
- from utils.logging_config import get_logger
class ContentIdentifier:
    """Run multimodal recognition over rows of ``knowledge_search_content``.

    A run fetches a row whose ``recognition_status`` is 0, passes its parsed
    ``formatted_content`` to the image and video identifier modules, and
    stores the combined result in the ``multimodal_recognition`` column.

    Status values written by this class: 3 when a row is claimed for
    processing, 2 once the recognition result has been stored. A row whose
    processing fails after being claimed is left at 3.
    """

    def __init__(self):
        """Load environment variables and create the logger, DB helper and identifiers."""
        load_dotenv()
        self.logger = get_logger('ContentIdentifier')
        self.db = MysqlHelper()
        self.image_identifier = ImageIdentifier()
        self.video_identifier = VideoIdentifier()

    def get_unprocessed_record(self) -> Optional[Dict[str, Any]]:
        """Fetch one row with ``recognition_status = 0``.

        Returns:
            A dict with ``id``, ``formatted_content`` and
            ``channel_content_id``, or ``None`` when no row is available,
            the row has fewer than two columns, or the query raises.
        """
        sql = """
            SELECT id, formatted_content
            FROM knowledge_search_content
            WHERE recognition_status = 0
            LIMIT 1
        """

        try:
            rows = self.db.get_values(sql)
            if not rows:
                return None

            record = rows[0]
            if len(record) < 2:
                self.logger.error(f"数据库返回字段数量异常: {len(record)}, 期望至少2个字段")
                return None

            # The query selects only two columns, so the row id normally
            # doubles as channel_content_id; a third column is honoured if
            # the helper ever returns one.
            return {
                'id': record[0],
                'formatted_content': record[1],
                'channel_content_id': record[2] if len(record) >= 3 else record[0],
            }
        except Exception as e:
            self.logger.error(f"获取未处理记录失败: {e}")
            return None

    def parse_formatted_content(self, formatted_content: str) -> Dict[str, Any]:
        """Return ``formatted_content`` as a parsed object.

        A string is decoded with ``json.loads``; a dict is returned as is.

        Raises:
            json.JSONDecodeError: the string is not valid JSON (logged first).
            ValueError: the value is neither a string nor a dict.
        """
        try:
            if isinstance(formatted_content, str):
                return json.loads(formatted_content)
            if isinstance(formatted_content, dict):
                return formatted_content
            raise ValueError(f"不支持的数据类型: {type(formatted_content)}")
        except json.JSONDecodeError as e:
            self.logger.error(f"解析 formatted_content JSON 失败: {e}")
            raise

    def process_content_recognition(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:
        """Run the image and video identifiers on ``formatted_content``.

        Returns:
            ``{'image_analysis': ..., 'video_analysis': ...}`` holding
            whatever the two identifier modules returned.
        """
        self.logger.info("开始内容识别处理...")

        image_result = self.image_identifier.process_images(formatted_content)
        video_result = self.video_identifier.process_videos(formatted_content)

        recognition_result = {
            'image_analysis': image_result,
            'video_analysis': video_result
        }
        self.logger.info(f"识别结果: {recognition_result}")

        return recognition_result

    def update_multimodal_recognition(self, record_id: int, recognition_result: Dict[str, Any]) -> bool:
        """Store ``recognition_result`` as JSON and set ``recognition_status = 2``.

        Returns:
            ``True`` when the DB helper returns a non-``None`` result,
            ``False`` when it returns ``None`` or anything raises.
        """
        try:
            # json.dumps without ``indent`` escapes every control character
            # inside strings and emits no raw line breaks, so the payload
            # needs no further newline handling before it is bound as a
            # query parameter.
            result_json = json.dumps(recognition_result, ensure_ascii=False)

            sql = "UPDATE knowledge_search_content SET multimodal_recognition = %s, updated_at = NOW(), recognition_status = 2 WHERE id = %s"
            params = (result_json, record_id)

            result = self.db.update_values(sql, params)
            if result is not None:
                self.logger.info(f"已更新记录 {record_id} 的 multimodal_recognition 字段")
                return True

            self.logger.error(f"更新记录 {record_id} 失败")
            return False

        except Exception as e:
            self.logger.error(f"更新数据库失败: {e}")
            return False

    def process_single_record(self) -> bool:
        """Fetch, recognise and store one unprocessed row.

        Returns:
            ``True`` when a row was processed and stored. ``False`` when no
            row was available *or* any step failed; callers cannot tell the
            two cases apart from the return value.
        """
        try:
            record = self.get_unprocessed_record()
            if not record:
                self.logger.warning("没有找到未处理的记录")
                return False

            self.logger.info(f"开始处理记录 ID: {record['id']}, 内容ID: {record['channel_content_id']}")

            # Claim the row by moving it to status 3 so the next fetch
            # (which selects status 0) skips it. The id is bound as a
            # parameter, the same way update_multimodal_recognition does.
            self.db.update_values(
                "UPDATE knowledge_search_content SET recognition_status = 3 WHERE id = %s",
                (record['id'],),
            )

            formatted_content = self.parse_formatted_content(record['formatted_content'])

            recognition_result = self.process_content_recognition(formatted_content)

            # ``or`` rather than a .get() default so that explicit JSON
            # nulls are normalised as well as missing keys.
            complete_result = {
                'id': record['id'],
                'channel': formatted_content.get('channel') or '',
                'title': formatted_content.get('title') or '',
                'content': formatted_content.get('body_text') or '',
                'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
                'videos': recognition_result.get('video_analysis', {}),
                'meta': {
                    'author': formatted_content.get('channel_account_name') or '',
                    'like_count': formatted_content.get('like_count') or 0,
                    'collect_count': formatted_content.get('collect_count') or 0,
                    'comment_count': formatted_content.get('comment_count') or 0,
                    'view_count': formatted_content.get('view_count') or 0,
                    'publish_time': formatted_content.get('publish_time') or '',
                    'update_timestamp': formatted_content.get('update_timestamp') or '',
                    'content_link': formatted_content.get('content_link') or '',
                    'content_id': formatted_content.get('channel_content_id') or '',
                }
            }

            if self.update_multimodal_recognition(record['id'], complete_result):
                self.logger.info(f"记录 {record['id']} 处理完成")
                return True

            self.logger.error(f"记录 {record['id']} 处理失败")
            return False

        except Exception as e:
            self.logger.error(f"处理记录失败: {e}")
            return False

    def process_all_records(self, max_records: int = 10):
        """Process up to ``max_records`` rows, stopping at the first ``False``.

        NOTE(review): ``process_single_record`` returns ``False`` both when
        the queue is empty and when a row fails, so a single failing row
        also ends the batch.
        """
        self.logger.info(f"开始批量处理,最多处理 {max_records} 条记录")

        processed_count = 0
        success_count = 0

        for i in range(max_records):
            self.logger.info(f"\n--- 处理第 {i+1}/{max_records} 条记录 ---")

            if not self.process_single_record():
                self.logger.warning("没有更多记录需要处理,结束批量处理")
                break

            success_count += 1
            processed_count += 1

            # Pause between rows to stay under API rate limits.
            time.sleep(2)

        self.logger.info(f"\n批量处理完成!总共处理 {processed_count} 条记录,成功 {success_count} 条")

    def process_continuous(self, max_records: Optional[int] = None, delay_seconds: int = 2):
        """Process rows one after another until a stop condition is met.

        The loop stops when ``max_records`` attempts have been made (a falsy
        ``max_records`` means no limit), after three consecutive failed
        attempts, when no rows with ``recognition_status = 0`` remain, or on
        Ctrl+C. A summary is logged at the end in every case.

        Args:
            max_records: Upper bound on attempts, or ``None`` for no limit.
            delay_seconds: Pause between attempts; no pause when ``<= 0``.
        """
        self.logger.info("启动连续处理模式...")
        self.logger.info("系统将自动处理数据库中的记录,一条完成后自动处理下一条")
        self.logger.info(f"处理间隔: {delay_seconds} 秒")
        if max_records:
            self.logger.info(f"最大处理数量: {max_records} 条")
        else:
            self.logger.info("无数量限制,将处理所有可用记录")
        self.logger.info("按 Ctrl+C 可以随时停止处理")
        self.logger.info("-" * 60)

        processed_count = 0
        success_count = 0
        consecutive_failures = 0
        max_consecutive_failures = 3  # give up after this many failures in a row

        try:
            while True:
                if max_records and processed_count >= max_records:
                    self.logger.info(f"\n已达到最大处理数量限制 ({max_records} 条),停止处理")
                    break

                self.logger.info(f"\n--- 处理第 {processed_count + 1} 条记录 ---")
                self.logger.info(f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

                succeeded = self.process_single_record()
                # Count the attempt before any early exit so the attempt
                # that triggers the failure limit is in the final totals.
                processed_count += 1

                if succeeded:
                    success_count += 1
                    consecutive_failures = 0
                    self.logger.info(f"✅ 记录处理成功 (成功: {success_count}, 失败: {processed_count - success_count})")
                else:
                    consecutive_failures += 1
                    self.logger.warning(f"❌ 记录处理失败 (成功: {success_count}, 失败: {processed_count - success_count})")

                    if consecutive_failures >= max_consecutive_failures:
                        self.logger.warning(f"\n⚠️ 连续失败 {max_consecutive_failures} 次,可能没有更多记录需要处理")
                        self.logger.info("停止连续处理")
                        break

                remaining_records = self.get_remaining_records_count()
                if remaining_records == 0:
                    self.logger.info(f"\n🎉 所有记录已处理完成!总共处理 {processed_count} 条记录")
                    break

                self.logger.info(f"剩余待处理记录: {remaining_records} 条")

                # Pause between rows to stay under API rate limits.
                if delay_seconds > 0:
                    self.logger.info(f"等待 {delay_seconds} 秒后处理下一条记录...")
                    time.sleep(delay_seconds)

        except KeyboardInterrupt:
            self.logger.info(f"\n\n⏹️ 用户中断处理")
            self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条")
        except Exception as e:
            self.logger.error(f"\n\n💥 处理过程中发生错误: {e}")
            self.logger.info(f"已处理 {processed_count} 条记录,成功 {success_count} 条")

        self.logger.info(f"\n📊 连续处理完成!")
        self.logger.info(f"总处理数量: {processed_count}")
        self.logger.info(f"成功数量: {success_count}")
        self.logger.info(f"失败数量: {processed_count - success_count}")
        if processed_count > 0:
            success_rate = (success_count / processed_count) * 100
            self.logger.info(f"成功率: {success_rate:.1f}%")

    def get_remaining_records_count(self) -> int:
        """Return the number of rows with ``recognition_status = 0``.

        Returns 0 when the query yields nothing or raises (the error is
        logged), so a failed count is indistinguishable from an empty queue.
        """
        try:
            sql = "SELECT COUNT(*) FROM knowledge_search_content WHERE recognition_status = 0"
            result = self.db.get_values(sql)
            if result:
                return result[0][0]
            return 0
        except Exception as e:
            self.logger.error(f"获取剩余记录数量失败: {e}")
            return 0
def main():
    """Command-line entry point.

    Parses the CLI flags, builds a ``ContentIdentifier`` and runs one of
    three modes: ``--single`` (one record), ``--continuous`` (keep going,
    optionally bounded by ``--max-records`` and paced by ``--delay``), or
    the default batch mode sized by ``--batch``. ``--single`` wins over
    ``--continuous`` when both are given. Any exception raised while
    building or running the identifier is reported on stderr and the
    process exits with status 1.
    """
    cli = argparse.ArgumentParser(description='内容识别脚本 - 分析图片和视频内容')
    option_specs = (
        ('--single', {'action': 'store_true', 'help': '只处理一条记录'}),
        ('--batch', {'type': int, 'default': 10, 'help': '批量处理记录数量,默认10条'}),
        ('--continuous', {'action': 'store_true', 'help': '连续处理模式,自动处理所有可用记录'}),
        ('--max-records', {'type': int, 'help': '连续处理模式下的最大处理数量限制'}),
        ('--delay', {'type': int, 'default': 2, 'help': '处理间隔时间(秒),默认2秒'}),
    )
    for flag, spec in option_specs:
        cli.add_argument(flag, **spec)

    options = cli.parse_args()

    try:
        worker = ContentIdentifier()

        if options.single:
            worker.process_single_record()
        elif options.continuous:
            worker.process_continuous(options.max_records, options.delay)
        else:
            worker.process_all_records(options.batch)

    except Exception as exc:
        print(f"程序执行失败: {exc}", file=sys.stderr)
        sys.exit(1)


if __name__ == '__main__':
    main()
|