import os
import sys
import json
from typing import Any, Dict, List, Optional

# Put this file's directory on sys.path before the project-local imports
# below (presumably so that ``utils`` and ``indentify`` resolve when the
# module is run from another working directory -- confirm against the layout).
sys.path.append(os.path.dirname(os.path.abspath(__file__)))

from utils.logging_config import get_logger
from utils.mysql_db import MysqlHelper
from indentify.indentify import ContentIdentifier

logger = get_logger('AgentTools')

# Video URLs used by the built-in fallback record. The "crawl_data" and "raw"
# halves of the fallback are identical except for this one field.
_DEFAULT_CRAWL_VIDEO_URL = "http://rescdn.yishihui.com/pipeline/video/6c2330e3-0674-4f01-b5b2-fc8c240158f8.mp4"
_DEFAULT_RAW_VIDEO_URL = "http://rescdn.yishihui.com/pipeline/video/9e38400e-21dc-4063-bab5-47c1667bb59d.mp4"


def _build_default_content(video_url: str) -> Dict[str, Any]:
    """Return a fresh copy of the fallback content record using *video_url*.

    A new dict (with new nested lists) is built on every call so callers may
    mutate the result without affecting later calls.
    """
    return {
        "channel": 1,
        "channel_content_id": "684a789b000000002202a61b",
        "content_link": "https://www.xiaohongshu.com/explore/684a789b000000002202a61b",
        "wx_sn": None,
        "title": "一个视频学会,5个剪辑工具,超详细教程",
        "content_type": "video",
        "body_text": "#剪辑教程[话题]# #剪辑[话题]# #手机剪辑[话题]# #视频制作[话题]# #视频剪辑[话题]# #自学剪辑[话题]# #原创视频[话题]# #新手小白学剪辑[话题]#",
        "location": "未知",
        "source_url": None,
        "mini_program": None,
        "topic_list": [],
        "image_url_list": [
            {
                "image_type": 2,
                "image_url": "http://rescdn.yishihui.com/pipeline/image/5be8f08a-4691-41b6-8dda-0b63cc2c1056.jpg",
            }
        ],
        "video_url_list": [
            {
                "video_url": video_url,
                "video_duration": 615,
            }
        ],
        "bgm_data": None,
        "ad_info": None,
        "is_original": False,
        "voice_data": None,
        "channel_account_id": "670a10ac000000001d0216ec",
        "channel_account_name": "小伍剪辑视频",
        "channel_account_avatar": "https://sns-avatar-qc.xhscdn.com/avatar/1040g2jo31e469dkq0e005poa22m7c5ncbtuk1g0?imageView2/2/w/80/format/jpg",
        "item_index": None,
        "view_count": None,
        "play_count": None,
        "like_count": 692,
        "collect_count": 996,
        "comment_count": 37,
        "share_count": None,
        "looking_count": None,
        "publish_timestamp": 1749711589000,
        "modify_timestamp": 1749711589000,
        "update_timestamp": 1755239186502,
    }


def _wrap_crawl_item(item: Dict[str, Any]) -> Dict[str, Any]:
    """Pair *item* with its ``crawl_data`` payload.

    When ``item['crawl_data']`` is a dict or list it is used as the payload;
    otherwise the item itself is treated as the payload.
    """
    crawl_data = item.get('crawl_data')
    if not isinstance(crawl_data, (dict, list)):
        crawl_data = item
    return {"crawl_data": crawl_data, "raw": item}


class QueryDataTool:
    """Query knowledge_crawl_content and extract the crawl_data entries stored in its data column."""

    @staticmethod
    def fetch_crawl_data_list(request_id: str) -> List[Dict[str, Any]]:
        """Return the crawl_data entries recorded for *request_id*.

        Args:
            request_id: Value matched against the ``request_id`` column.

        Returns:
            A list of ``{"crawl_data": ..., "raw": ...}`` dicts, in row order
            (``ORDER BY id ASC``). When the query yields no rows, a single
            hard-coded default record is returned instead. Rows whose ``data``
            cell is empty, cannot be parsed, or is neither a JSON object nor a
            JSON array are skipped; non-dict elements of an array are skipped.
        """
        sql = "SELECT data FROM knowledge_crawl_content WHERE request_id = %s ORDER BY id ASC"
        rows = MysqlHelper.get_values(sql, (request_id,))
        if not rows:
            logger.info(f"request_id={request_id} 未查询到数据,使用默认值")
            # Fall back to the built-in default record.
            return [
                {
                    "crawl_data": _build_default_content(_DEFAULT_CRAWL_VIDEO_URL),
                    "raw": _build_default_content(_DEFAULT_RAW_VIDEO_URL),
                }
            ]

        results: List[Dict[str, Any]] = []
        for row in rows:
            data_cell = row[0]
            if not data_cell:
                continue

            # The cell may arrive as text/bytes needing decoding or as an
            # already-decoded object. bytearray is included because
            # json.loads accepts it just like bytes.
            if isinstance(data_cell, (str, bytes, bytearray)):
                try:
                    parsed = json.loads(data_cell)
                except Exception as e:  # best-effort: one bad row must not abort the rest
                    logger.error(f"解析 data JSON 失败: {e}")
                    continue
            else:
                parsed = data_cell

            if isinstance(parsed, list):
                results.extend(
                    _wrap_crawl_item(item) for item in parsed if isinstance(item, dict)
                )
            elif isinstance(parsed, dict):
                results.append(_wrap_crawl_item(parsed))
            else:
                logger.warning("data 字段非期望的 JSON 结构,已跳过一行")

        logger.info(f"request_id={request_id} 提取 crawl_data 数量: {len(results)}")
        return results


class IdentifyTool:
    """Run image/video recognition through the internal ``indentify`` ContentIdentifier."""

    def __init__(self) -> None:
        self.identifier = ContentIdentifier()

    def run(self, crawl_data: Dict[str, Any]) -> Dict[str, Any]:
        """Recognise the media in *crawl_data* and return a combined result.

        Args:
            crawl_data: One crawl_data record, passed to
                ``ContentIdentifier.parse_formatted_content``.

        Returns:
            A dict with ``channel``, ``title``, ``content``, ``images``,
            ``videos`` and ``meta`` keys. If anything raises, the error is
            logged and a dict with empty values plus an ``error`` message is
            returned instead; this method does not propagate exceptions.
        """
        try:
            formatted_content = self.identifier.parse_formatted_content(crawl_data)
            recognition_result = self.identifier.process_content_recognition(formatted_content)

            def text_field(key: str) -> Any:
                # Missing or falsy values collapse to an empty string.
                return formatted_content.get(key) or ''

            def count_field(key: str) -> Any:
                # Missing or falsy values collapse to zero.
                return formatted_content.get(key) or 0

            # ``or {}`` guards against the key being present with a None value,
            # which would otherwise raise and discard the whole result.
            image_analysis = recognition_result.get('image_analysis') or {}

            return {
                'channel': text_field('channel'),
                'title': text_field('title'),
                'content': text_field('body_text'),
                'images': image_analysis.get('images_comprehension', []),
                'videos': recognition_result.get('video_analysis', {}),
                'meta': {
                    'author': text_field('channel_account_name'),
                    'like_count': count_field('like_count'),
                    'collect_count': count_field('collect_count'),
                    'comment_count': count_field('comment_count'),
                    'view_count': count_field('view_count'),
                    'publish_time': text_field('publish_time'),
                    'update_timestamp': text_field('update_timestamp'),
                    'content_link': text_field('content_link'),
                    'content_id': text_field('channel_content_id'),
                },
            }
        except Exception as e:
            logger.error(f"识别失败: {e}")
            # NOTE(review): 'videos' is a list here but defaults to a dict on
            # the success path; left unchanged in case callers rely on it.
            return {
                'channel': '',
                'title': '',
                'content': '',
                'images': [],
                'videos': [],
                'meta': {},
                'error': str(e),
            }


class StructureTool:
    """
    Structuring tool: combine the recognition result with the original
    crawl_data in a fixed layout and store it in the
    knowledge_parsing_content table.
    """

    @staticmethod
    def store_parsing_result(request_id: str, crawl_raw: Dict[str, Any], identify_result: Dict[str, Any]) -> Optional[int]:
        """Insert one parsing result row for *request_id*.

        Args:
            request_id: Stored in the ``request_id`` column and in the payload.
            crawl_raw: Original crawl record, embedded under ``crawl_raw``.
            identify_result: Recognition output, embedded under ``identify_result``.

        Returns:
            Whatever ``MysqlHelper.update_values`` returns for the INSERT.

        Raises:
            TypeError: If the payload contains values ``json.dumps`` cannot serialise.
        """
        payload = {
            'request_id': request_id,
            'crawl_raw': crawl_raw,
            'identify_result': identify_result,
        }
        sql = (
            "INSERT INTO knowledge_parsing_content (request_id, parsing_result, created_at) "
            "VALUES (%s, %s, NOW())"
        )
        # ensure_ascii=False keeps non-ASCII text readable in the stored JSON.
        params = (request_id, json.dumps(payload, ensure_ascii=False))
        return MysqlHelper.update_values(sql, params)