import json
import os
import sys
from typing import Any, Dict, List, Optional

# Make sibling packages importable when this file is run as a script.
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from utils.logging_config import get_logger
from utils.mysql_db import MysqlHelper
from indentify.indentify import ContentIdentifier

logger = get_logger('AgentTools')


class QueryDataTool:
    """查询 knowledge_crawl_content 获取 data 列表中的 crawl_data 字段"""

    @staticmethod
    def _wrap(item: Any) -> Dict[str, Any]:
        """Wrap one parsed entry as {'crawl_data': ..., 'raw': ...}.

        Prefers the nested 'crawl_data' payload when it is a structured
        dict/list; otherwise falls back to the item itself so that rows
        without the expected key are not silently dropped.
        """
        crawl_data = item.get('crawl_data') if isinstance(item, dict) else None
        if isinstance(crawl_data, (dict, list)):
            return {"crawl_data": crawl_data, "raw": item}
        return {"crawl_data": item, "raw": item}

    @staticmethod
    def fetch_crawl_data_list(request_id: str) -> List[Dict[str, Any]]:
        """Fetch every crawl_data entry stored under *request_id*.

        Returns a list of {'crawl_data': ..., 'raw': ...} dicts, or an
        empty list when no rows match. Rows whose JSON cannot be parsed
        are logged and skipped.
        """
        sql = "SELECT data FROM knowledge_crawl_content WHERE request_id = %s ORDER BY id ASC"
        rows = MysqlHelper.get_values(sql, (request_id,))
        if not rows:
            logger.info(f"request_id={request_id} 未查询到数据")
            return []

        results: List[Dict[str, Any]] = []
        for row in rows:
            data_cell = row[0]
            if not data_cell:
                continue
            try:
                # The cell may already be deserialized depending on the DB driver.
                parsed = json.loads(data_cell) if isinstance(data_cell, (str, bytes)) else data_cell
            except Exception as e:
                logger.error(f"解析 data JSON 失败: {e}")
                continue
            if isinstance(parsed, list):
                # Non-dict list items are skipped, matching the original contract.
                results.extend(
                    QueryDataTool._wrap(item) for item in parsed if isinstance(item, dict)
                )
            elif isinstance(parsed, dict):
                results.append(QueryDataTool._wrap(parsed))
            else:
                logger.warning("data 字段非期望的 JSON 结构,已跳过一行")
        logger.info(f"request_id={request_id} 提取 crawl_data 数量: {len(results)}")
        return results


class IdentifyTool:
    """调用 indentify 内部能力,完成图像/视频识别"""

    def __init__(self) -> None:
        self.identifier = ContentIdentifier()

    def run(self, crawl_data: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize *crawl_data*, run recognition, and assemble a flat result.

        On any failure an empty-shaped result carrying an 'error' key is
        returned instead of raising, so batch callers can keep going.
        """
        try:
            formatted = self.identifier.parse_formatted_content(crawl_data)
            recognition = self.identifier.process_content_recognition(formatted)

            # Counters default to 0, text fields to '' when missing/falsy.
            meta = {
                'author': formatted.get('channel_account_name') or '',
                'like_count': formatted.get('like_count') or 0,
                'collect_count': formatted.get('collect_count') or 0,
                'comment_count': formatted.get('comment_count') or 0,
                'view_count': formatted.get('view_count') or 0,
                'publish_time': formatted.get('publish_time') or '',
                'update_timestamp': formatted.get('update_timestamp') or '',
                'content_link': formatted.get('content_link') or '',
                'content_id': formatted.get('channel_content_id') or '',
            }
            return {
                'channel': formatted.get('channel') or '',
                'title': formatted.get('title') or '',
                'content': formatted.get('body_text') or '',
                'images': recognition.get('image_analysis', {}).get('images_comprehension', []),
                'videos': recognition.get('video_analysis', {}),
                'meta': meta,
            }
        except Exception as e:
            logger.error(f"识别失败: {e}")
            # Same shape as the success result so downstream consumers
            # can index the keys unconditionally.
            return {
                'channel': '',
                'title': '',
                'content': '',
                'images': [],
                'videos': [],
                'meta': {},
                'error': str(e),
            }


class StructureTool:
    """
    结构化工具:按照既定的结构将识别结果与原始 crawl_data 组合,
    并存入 knowledge_parsing_content 表。
    """

    @staticmethod
    def store_parsing_result(request_id: str,
                             crawl_raw: Dict[str, Any],
                             identify_result: Dict[str, Any]) -> Optional[int]:
        """Persist one parsing record for *request_id*.

        Combines the raw crawl data and the identification result into a
        single JSON payload and inserts it into knowledge_parsing_content.
        Returns whatever MysqlHelper.update_values yields (row id/count),
        or None on failure — the helper's contract, not re-checked here.
        """
        payload = {
            'request_id': request_id,
            'crawl_raw': crawl_raw,
            'identify_result': identify_result,
        }
        sql = (
            "INSERT INTO knowledge_parsing_content (request_id, parsing_result, created_at) "
            "VALUES (%s, %s, NOW())"
        )
        # ensure_ascii=False keeps CJK text readable in the stored JSON.
        params = (request_id, json.dumps(payload, ensure_ascii=False))
        return MysqlHelper.update_values(sql, params)