import os
import sys
import json
from typing import Any, Dict, List, Optional

sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from utils.logging_config import get_logger
from utils.mysql_db import MysqlHelper
from indentify.indentify import ContentIdentifier
from structure import StructureProcessor

logger = get_logger('AgentTools')


class QueryDataTool:
    """查询 knowledge_crawl_content 获取 data 列表中的 crawl_data 字段

    Fetches rows from knowledge_crawl_content and normalizes each row's
    crawl_data payload into ``{"crawl_data": ..., "content_id": ..., "raw": ...}``.
    """

    @staticmethod
    def _default_crawl_item(request_id: str, seq: int,
                            video_url_list: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Build one hard-coded sample row used when the DB has no data.

        Args:
            request_id: propagated into the sample row.
            seq: sequence number used for content_id / id / task_id.
            video_url_list: video entries for the sample (may be empty for an
                image-only sample).
        """
        return {
            "request_id": request_id,
            "content_id": str(seq),
            "id": seq,
            "task_id": seq,
            "crawl_data": {
                "channel": 1,
                "channel_content_id": "684a789b000000002202a61b",
                "content_link": "https://www.xiaohongshu.com/explore/684a789b000000002202a61b",
                "wx_sn": None,
                "title": "一个视频学会,5个剪辑工具,超详细教程",
                "content_type": "video",
                "body_text": "#剪辑教程[话题]# #剪辑[话题]# #手机剪辑[话题]# #视频制作[话题]# #视频剪辑[话题]# #自学剪辑[话题]# #原创视频[话题]# #新手小白学剪辑[话题]#",
                "location": "未知",
                "source_url": None,
                "mini_program": None,
                "topic_list": [],
                "image_url_list": [
                    {
                        "image_type": 2,
                        "image_url": "http://rescdn.yishihui.com/pipeline/image/5be8f08a-4691-41b6-8dda-0b63cc2c1056.jpg"
                    }
                ],
                "video_url_list": video_url_list,
                "bgm_data": None,
                "ad_info": None,
                "is_original": False,
                "voice_data": None,
                "channel_account_id": "670a10ac000000001d0216ec",
                "channel_account_name": "小伍剪辑视频",
                "channel_account_avatar": "https://sns-avatar-qc.xhscdn.com/avatar/1040g2jo31e469dkq0e005poa22m7c5ncbtuk1g0?imageView2/2/w/80/format/jpg",
                "item_index": None,
                "view_count": None,
                "play_count": None,
                "like_count": 692,
                "collect_count": 996,
                "comment_count": 37,
                "share_count": None,
                "looking_count": None,
                "publish_timestamp": 1749711589000,
                "modify_timestamp": 1749711589000,
                "update_timestamp": 1755239186502
            }
        }

    @staticmethod
    def _append_json_payload(results: List[Dict[str, Any]], payload: Any,
                             content_id: Any, raw: Any) -> None:
        """Append a decoded JSON payload to results in the normalized shape.

        A dict is appended as-is; a list is flattened, keeping only dict items.
        Anything else is silently ignored (matches the original behavior).
        """
        if isinstance(payload, dict):
            results.append({"crawl_data": payload, "content_id": content_id, "raw": raw})
        elif isinstance(payload, list):
            for item in payload:
                if isinstance(item, dict):
                    results.append({"crawl_data": item, "content_id": content_id, "raw": raw})

    @staticmethod
    def fetch_crawl_data_list(request_id: str) -> List[Dict[str, Any]]:
        """Fetch all crawl_data entries for a request_id.

        Args:
            request_id: the request identifier to query by.

        Returns:
            A list of dicts shaped ``{"crawl_data": ..., "content_id": ..., "raw": ...}``.
            When the table has no rows for the id, returns two built-in sample
            items instead (one image-only, one with a video).
        """
        sql = "SELECT content_id, crawl_data FROM knowledge_crawl_content WHERE request_id = %s ORDER BY id ASC"
        rows = MysqlHelper.get_values(sql, (request_id,))
        if not rows:
            logger.info(f"request_id={request_id} 未查询到数据,使用默认值")
            # BUG FIX: the original built this fallback and then executed
            # `return []`, so the logged "use default values" never happened.
            # Return the sample data as the log message promises.
            return [
                QueryDataTool._default_crawl_item(request_id, 1, []),
                QueryDataTool._default_crawl_item(request_id, 2, [
                    {
                        "video_url": "http://rescdn.yishihui.com/pipeline/video/9e38400e-21dc-4063-bab5-47c1667bb59d.mp4",
                        "video_duration": 615
                    }
                ]),
            ]

        results: List[Dict[str, Any]] = []
        for row in rows:
            if not row:
                continue
            try:
                parsed = json.loads(row) if isinstance(row, (str, bytes)) else row
                if isinstance(parsed, tuple) and len(parsed) > 4:
                    # Legacy wide row (from a SELECT *): content_id at index 1,
                    # JSON string at index 4.
                    content_id = parsed[1]
                    json_str = parsed[4]
                    if isinstance(json_str, str):
                        try:
                            QueryDataTool._append_json_payload(
                                results, json.loads(json_str), content_id, parsed)
                        except json.JSONDecodeError:
                            logger.warning(f"元组中第4个元素不是有效的JSON: {json_str}")
                    else:
                        logger.warning(f"元组中第4个元素不是字符串: {type(json_str)}")
                elif isinstance(parsed, tuple) and len(parsed) == 2:
                    # BUG FIX: rows produced by the SELECT above are 2-tuples
                    # (content_id, crawl_data); the original only handled tuples
                    # longer than 4 elements and silently skipped these rows.
                    content_id, json_str = parsed
                    if isinstance(json_str, str):
                        try:
                            QueryDataTool._append_json_payload(
                                results, json.loads(json_str), content_id, parsed)
                        except json.JSONDecodeError:
                            logger.warning(f"crawl_data 列不是有效的JSON: {json_str}")
                    else:
                        # Some drivers may already decode JSON columns to dict/list.
                        QueryDataTool._append_json_payload(results, json_str, content_id, parsed)
                elif isinstance(parsed, list):
                    for item in parsed:
                        if isinstance(item, dict):
                            crawl_data = item.get('crawl_data')
                            content_id = item.get('content_id')
                            # Prefer the nested crawl_data field; otherwise treat
                            # the whole item as the payload.
                            if isinstance(crawl_data, (dict, list)):
                                results.append({"crawl_data": crawl_data, "content_id": content_id, "raw": item})
                            else:
                                results.append({"crawl_data": item, "content_id": content_id, "raw": item})
                elif isinstance(parsed, dict):
                    crawl_data = parsed.get('crawl_data')
                    content_id = parsed.get('content_id')
                    if isinstance(crawl_data, (dict, list)):
                        results.append({"crawl_data": crawl_data, "content_id": content_id, "raw": parsed})
                    else:
                        results.append({"crawl_data": parsed, "content_id": content_id, "raw": parsed})
                else:
                    logger.warning(f"data 字段非期望的数据结构: {type(parsed)}, 已跳过一行")
            except Exception as e:
                logger.error(f"解析 data JSON 失败: {e}")

        logger.info(f"request_id={request_id} 提取 crawl_data 数量: {len(results)}")
        return results


class IdentifyTool:
    """调用 indentify 内部能力,完成图像/视频识别

    Wraps ContentIdentifier: formats raw crawl data and runs image/video
    recognition on it.
    """

    def __init__(self) -> None:
        self.identifier = ContentIdentifier()

    def run(self, crawl_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run recognition over one crawl_data payload.

        Args:
            crawl_data: raw crawled content dict.

        Returns:
            A dict with channel/title/content/images/videos/meta keys.
            On failure, the same shape with empty values plus an 'error' key.
        """
        try:
            formatted_content = self.identifier.parse_formatted_content(crawl_data)
            recognition_result = self.identifier.process_content_recognition(formatted_content)

            # Missing fields degrade to '' (or 0 for counters) rather than None.
            complete_result = {
                'channel': formatted_content.get('channel') or '',
                'title': formatted_content.get('title') or '',
                'content': formatted_content.get('body_text') or '',
                'images': recognition_result.get('image_analysis', {}).get('images_comprehension', []),
                'videos': recognition_result.get('video_analysis', {}),
                'meta': {
                    'author': formatted_content.get('channel_account_name') or '',
                    'like_count': formatted_content.get('like_count') or 0,
                    'collect_count': formatted_content.get('collect_count') or 0,
                    'comment_count': formatted_content.get('comment_count') or 0,
                    'view_count': formatted_content.get('view_count') or 0,
                    'publish_time': formatted_content.get('publish_time') or '',
                    'update_timestamp': formatted_content.get('update_timestamp') or '',
                    'content_link': formatted_content.get('content_link') or '',
                    'content_id': formatted_content.get('channel_content_id') or '',
                },
            }
            return complete_result
        except Exception as e:
            logger.error(f"识别失败: {e}")
            return {
                'channel': '',
                'title': '',
                'content': '',
                'images': [],
                'videos': [],
                'meta': {},
                'error': str(e),
            }


class UpdateDataTool:
    """
    结构化工具:按照既定的结构将识别结果与原始 crawl_data 组合,
    并存入 knowledge_parsing_content 表。
    """

    @staticmethod
    def store_indentify_result(request_id: str, crawl_raw: Dict[str, Any],
                               identify_result: Dict[str, Any]) -> Optional[int]:
        """Store a recognition result in knowledge_parsing_content (upsert).

        Updates the row when (request_id, content_id) already exists, inserts
        otherwise.

        Args:
            request_id: request ID.
            crawl_raw: original crawl entry (content_id / task_id are read).
            identify_result: recognition result to serialize.

        Returns:
            The affected row id on success, None on failure.
        """
        try:
            logger.info(f"存储识别结果: request_id={request_id}, crawl_raw={crawl_raw}, identify_result={identify_result}")
            content_id = crawl_raw.get('content_id') or ''
            task_id = crawl_raw.get('task_id') or ''

            # Check for an existing (request_id, content_id) row first.
            check_sql = "SELECT id, status FROM knowledge_parsing_content WHERE request_id = %s AND content_id = %s LIMIT 1"
            exists = MysqlHelper.get_values(check_sql, (request_id, content_id))

            status = 2  # 2 = recognition finished
            serialized_identify = json.dumps(identify_result, ensure_ascii=False)

            if exists:
                # Row exists: update in place, never create a duplicate.
                existing_id = exists[0][0] if isinstance(exists, list) and len(exists) > 0 else None
                update_sql = (
                    "UPDATE knowledge_parsing_content "
                    "SET indentify_data = %s, task_id = %s, status = %s "
                    "WHERE request_id = %s AND content_id = %s"
                )
                updated = MysqlHelper.update_values(
                    update_sql,
                    (serialized_identify, task_id, status, request_id, content_id),
                )
                if updated is not None:
                    logger.info(f"更新识别结果成功: request_id={request_id}, content_id={content_id}, id={existing_id}")
                    return existing_id
                return None

            # No existing row: insert a fresh one.
            insert_sql = (
                "INSERT INTO knowledge_parsing_content "
                "(content_id, request_id, task_id, indentify_data, create_time, status) "
                "VALUES (%s, %s, %s, %s, NOW(), %s)"
            )
            result = MysqlHelper.insert_and_get_id(
                insert_sql,
                (content_id, request_id, task_id, serialized_identify, status),
            )
            if result:
                logger.info(f"存储解析结果成功: request_id={request_id}, content_id={content_id}, insert_id={result}")
                return result
            return None
        except Exception as e:
            logger.error(f"存储解析结果失败: request_id={request_id}, error={e}")
            return None

    @staticmethod
    def store_parsing_result(request_id: str, crawl_raw: Dict[str, Any],
                             parsing_result: Dict[str, Any]) -> Optional[int]:
        """Store a structuring result in knowledge_parsing_content.

        Args:
            request_id: request ID (used for logging only).
            crawl_raw: original crawl entry (content_id / id are read).
            parsing_result: structured result, typically the return value of
                StructureTool.process_content_structure.

        Returns:
            Number of affected rows on success, None on failure.
        """
        try:
            content_id = crawl_raw.get('content_id') or ''

            # Store only the 'result' field when present; otherwise the whole
            # structured_content object.
            structured_content = parsing_result.get('structured_content', {})
            if isinstance(structured_content, dict) and 'result' in structured_content:
                parsing_payload = structured_content['result']
            else:
                parsing_payload = structured_content

            # Serialize containers; plain strings are stored as-is.
            if isinstance(parsing_payload, (dict, list)):
                parsing_payload = json.dumps(parsing_payload, ensure_ascii=False)

            sql = (
                "UPDATE knowledge_parsing_content "
                "SET parsing_data = %s, status = %s "
                "WHERE content_id = %s AND id = %s"
            )
            status = 5  # 5 = structuring finished
            params = (parsing_payload, status, content_id, crawl_raw.get('id') or '')
            result = MysqlHelper.update_values(sql, params)
            if result:
                logger.info(f"存储解析结果成功: request_id={request_id}, content_id={content_id}")
                return result
            return None
        except Exception as e:
            logger.error(f"存储解析结果失败: request_id={request_id}, error={e}")
            return None


class StructureTool:
    """
    内容结构化工具:调用tools/structure内部的方法进行内容结构化处理
    """

    def __init__(self) -> None:
        """Initialize the structuring processor."""
        self.structure_processor = StructureProcessor()

    def process_content_structure(self, content_data: Dict[str, Any]) -> Dict[str, Any]:
        """Structure recognized content.

        Args:
            content_data: recognition output shaped
                {'channel': str, 'title': str, 'content': str,
                 'images': List, 'videos': Dict, 'meta': Dict}.

        Returns:
            A dict with original_data / structured_content / structure_status /
            process_time; on failure structure_status is 'failed' and an
            'error' key is added.
        """
        try:
            # Normalize the input to what StructureProcessor expects.
            structure_input = {
                "title": content_data.get('title', ''),
                "body_text": content_data.get('content', ''),
                "images_comprehension": content_data.get('images', []),
            }
            # The processor may return a string or a dict; wrap it verbatim
            # without touching any .result attribute.
            structured_content = self.structure_processor.process_content(structure_input)
            result = {
                'original_data': content_data,
                'structured_content': structured_content,
                'structure_status': 'success',
                'process_time': self._get_current_timestamp(),
            }
            logger.info(f"内容结构化处理成功: title={content_data.get('title', '')}")
            return result
        except Exception as e:
            logger.error(f"内容结构化处理失败: {e}")
            return {
                'original_data': content_data,
                'structured_content': '',
                'structure_status': 'failed',
                'error': str(e),
                'process_time': self._get_current_timestamp(),
            }

    def _get_current_timestamp(self) -> str:
        """Return the current local time as 'YYYY-MM-DD HH:MM:SS'."""
        from datetime import datetime
        return datetime.now().strftime('%Y-%m-%d %H:%M:%S')