| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 | #!/usr/bin/env python3# -*- coding: utf-8 -*-"""内容识别脚本主要功能:1. 从数据库中拉取一条 recognition_status = 0 的数据 : 2. 解析 formatted_content 中的图片和视频3. 调用独立的图文识别和视频识别模块4. 将识别结果更新到数据库"""import osimport jsonimport timeimport sysimport argparsefrom typing import Dict, Any, List, Optionalfrom dotenv import load_dotenvfrom datetime import datetime# 导入自定义模块sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))from utils.mysql_db import MysqlHelperfrom indentify.image_identifier import ImageIdentifierfrom indentify.video_identifier import VideoIdentifierfrom indentify.audio_identifier import AudioIdentifierfrom utils.logging_config import get_loggerclass ContentIdentifier:    def __init__(self):        # 加载环境变量        load_dotenv()                # 设置日志        self.logger = get_logger('ContentIdentifier')                # 初始化数据库连接        self.db = MysqlHelper()                # 延迟初始化识别模块,确保在需要时使用正确的环境变量        self.image_identifier = None        self.video_identifier = None        self.audio_identifier = None        def get_unprocessed_record(self) -> Optional[Dict[str, Any]]:        """从数据库获取一条未处理的数据        先从 knowledge_content_query 表中选取 category_id = 0 的所有 query_word,        然后用这些 query_word 去 knowledge_search_content 表中匹配,        找出 recognition_status = 0 的一条开始处理        """        try:            # 第一步:获取 category_id = 0 的所有 query_word            query_sql = """            SELECT query_word             FROM knowledge_content_query             WHERE category_id = 0            """                        query_result = self.db.get_values(query_sql)            if not query_result:                self.logger.warning("未找到 category_id = 0 的 query_word")                return None                        query_words = [row[0] for row in query_result]            self.logger.info(f"找到 {len(query_words)} 个 category_id = 0 的 query_word")                        # 第二步:用这些 query_word 去匹配 knowledge_search_content 表            # 使用 IN 查询来匹配多个 query_word            if len(query_words) > 0:                # 构建带引号的查询条件,因为 query_word 是字符串                quoted_words = [f"'{word}'" for word in query_words]                placeholders = ','.join(quoted_words)                                content_sql = f"""                SELECT id, formatted_content                FROM knowledge_search_content                 WHERE recognition_status = 0                AND query_word IN ({placeholders})                LIMIT 1                """                                # 不需要传递参数,因为SQL已经包含了具体的值                result = self.db.get_values(content_sql)            else:                self.logger.warning("没有可用的 query_word 进行匹配")                return None            if result and len(result) > 0:                record = result[0]                # 检查返回的字段数量                return {                    'id': record[0],                    'formatted_content': record[1]                }            else:                self.logger.info("未找到匹配 query_word 且 recognition_status = 0 的记录")                return None                        except Exception as e:            self.logger.error(f"获取未处理记录失败: {e}")            return None        def parse_formatted_content(self, formatted_content: str) -> Dict[str, Any]:        """解析 formatted_content JSON 字符串"""        try:            if isinstance(formatted_content, str):                return json.loads(formatted_content)            elif isinstance(formatted_content, dict):                return formatted_content            else:                raise ValueError(f"不支持的数据类型: {type(formatted_content)}")        except json.JSONDecodeError as e:            self.logger.error(f"解析 formatted_content JSON 失败: {e}")            raise        def process_content_recognition(self, formatted_content: Dict[str, Any]) -> Dict[str, Any]:        """处理内容识别,调用独立的识别模块"""        self.logger.info(f"开始内容识别处理...{formatted_content.get('channel_content_id')}")                # 延迟初始化识别模块,确保使用正确的环境变量        if self.image_identifier is None:            self.image_identifier = ImageIdentifier()        if self.video_identifier is None:            self.video_identifier = VideoIdentifier()        if self.audio_identifier is None:            self.audio_identifier = AudioIdentifier()                # 图片识别        image_result = self.image_identifier.process_images(formatted_content)                # 视频识别        video_result = self.video_identifier.process_videos(formatted_content)        # 音频识别(仅ASR)        audio_result = self.audio_identifier.process_audios(formatted_content)                # 整合结果        recognition_result = {            'image_analysis': image_result,            'video_analysis': video_result,            'audio_analysis': audio_result        }                return recognition_result        def update_multimodal_recognition(self, record_id: int, recognition_result: Dict[str, Any]) -> bool:        """更新数据库中的 multimodal_recognition 字段"""        try:            # 将结果转换为JSON字符串,并处理换行符问题            result_json = json.dumps(recognition_result, ensure_ascii=False)            # 将换行符替换为 \n 字符串,确保JSON可以被正确解析            result_json = result_json.replace('\n', '\\n').replace('\r', '\\r')                        # 构建更新SQL - 使用参数化查询避免换行符问题            sql = "UPDATE knowledge_search_content SET multimodal_recognition = %s, updated_at = NOW(), recognition_status = 2 WHERE id = %s"            params = (result_json, record_id)                        # 执行更新            result = self.db.update_values(sql, params)            if result is not None:                self.logger.info(f"已更新记录 {record_id} 的 multimodal_recognition 字段")                return True            else:                self.logger.error(f"更新记录 {record_id} 失败")                return False                        except Exception as e:            self.logger.error(f"更新数据库失败: {e}")            return False        def process_single_record(self) -> bool:        """处理单条记录"""        try:            # 获取未处理的记录            record = self.get_unprocessed_record()            if not record:                self.logger.warning("没有找到未处理的记录")                return False                        self.logger.info(f"开始处理记录 ID: {record['id']}")            # self.logger.info(f"  多模态识别: {record['multimodal_recognition'][:300]}...")            # 先设置这条记录的 recognition_status = 1            self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 1 WHERE id = {record['id']}")                         # 解析 formatted_content            formatted_content = self.parse_formatted_content(record['formatted_content'])                        # 提取基本信息,处理 null 值            title = formatted_content.get('title') or ''            content = formatted_content.get('body_text') or ''            channel = formatted_content.get('channel') or ''            images = formatted_content.get('image_url_list') or []            videos = formatted_content.get('video_url_list') or []            author = formatted_content.get('channel_account_name') or ''            like_count = formatted_content.get('like_count') or 0            collect_count = formatted_content.get('collect_count') or 0            comment_count = formatted_content.get('comment_count') or 0            view_count = formatted_content.get('view_count') or 0            publish_time = formatted_content.get('publish_time') or ''            update_timestamp = formatted_content.get('update_timestamp') or ''            content_link = formatted_content.get('content_link') or ''            content_id = formatted_content.get('channel_content_id') or ''                        # 调用内容识别处理            recognition_result = self.process_content_recognition(formatted_content)            # 判断识别是否成功:如果视频任一项的 asr_content 包含“视频上传失败”或者包含“ASR分析失败”,则标记失败            video_analysis = recognition_result.get('video_analysis', {})            video_upload_failed = False            if isinstance(video_analysis, list):                for video_item in video_analysis:                    if isinstance(video_item, dict) and 'asr_content' in video_item:                        if '视频上传失败' in (video_item.get('asr_content') or '') or 'ASR分析失败' in (video_item.get('asr_content') or ''):                            video_upload_failed = True                            break            elif isinstance(video_analysis, dict):                if 'asr_content' in video_analysis and ('视频上传失败' in (video_analysis.get('asr_content') or '') or 'ASR分析失败' in (video_analysis.get('asr_content') or '')):                    video_upload_failed = True            if video_upload_failed:                self.logger.info(f"记录 {record['id']} 识别失败,将 recognition_status 设置为 3")                self.db.update_values(f"UPDATE knowledge_search_content SET recognition_status = 3 WHERE id = {record['id']}")                return False                        # 构建完整的识别结果            complete_result = {                'id': record['id'],                'channel': channel,                'title': title,                'content': content,                'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),                'videos': recognition_result.get('video_analysis', {}),                'audios': recognition_result.get('audio_analysis', []),                'meta': {                    'author': author,                    'like_count': like_count,                    'collect_count': collect_count,                    'comment_count': comment_count,                    'view_count': view_count,                    'publish_time': publish_time,                    'update_timestamp': update_timestamp,                    'content_link': content_link,                    'content_id': content_id,                }            }                        # 更新数据库            success = self.update_multimodal_recognition(record['id'], complete_result)                        if success:                self.logger.info(f"记录 {record['id']} 处理完成")                return True            else:                self.logger.error(f"记录 {record['id']} 处理失败")                return False                        except Exception as e:            self.logger.error(f"处理记录失败: {e}")            return Falsedef main():    """主函数"""    identifier = ContentIdentifier()    identifier.process_single_record()if __name__ == '__main__':    main()
 |