""" 内容解析结果搜索工具 - 根据关键词搜索视频标题和标题的解析结果 用于 Agent 执行时根据关键词搜索视频内容及其解析结果。 """ import json import os from typing import Any, Dict, List, Optional import httpx from agent.tools import tool, ToolResult # API 配置 CONTENT_DECONSTRUCTION_BASE_URL = os.getenv( "CONTENT_DECONSTRUCTION_BASE_URL", "http://api.piaoquantv.com" ) DEFAULT_TIMEOUT = 30.0 async def _call_content_deconstruction_api( keywords: List[str], ) -> Optional[Dict[str, List[Dict[str, Any]]]]: """调用内容解析结果搜索 API,返回结果字典。""" url = f"{CONTENT_DECONSTRUCTION_BASE_URL.rstrip('/')}/supply-demand-engine-service/content/queryContentDeconstructionResultByKeywords" payload = {"keywords": keywords} try: async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client: resp = await client.post( url, json=payload, headers={"Content-Type": "application/json"}, ) resp.raise_for_status() data = resp.json() except httpx.HTTPStatusError as e: raise RuntimeError( f"API 请求失败: {e.response.status_code} - {e.response.text[:200]}" ) except Exception as e: raise RuntimeError(f"请求异常: {str(e)}") # 解析响应格式: {'code': 0, 'msg': 'success', 'data': {...}, 'success': True} if isinstance(data, dict): # 检查 code 字段 code = data.get("code", 0) if code != 0: msg = data.get("msg", "未知错误") raise RuntimeError(f"API 返回错误码: {code}, 消息: {msg}") # 获取 data 字段 result_data = data.get("data", {}) if isinstance(result_data, dict): return result_data return {} return {} @tool( description="根据关键词搜索视频标题和标题的解析结果。支持传入多个关键词,返回每个关键词对应的视频列表及其解析信息。", display={ "zh": { "name": "内容解析结果搜索", "params": { "keywords": "关键词列表,例如:['食用', '禁忌']", }, }, }, ) async def query_content_deconstruction_by_keywords( keywords: List[str], ) -> ToolResult: """ 根据关键词搜索视频标题和标题的解析结果。 Args: keywords: 关键词列表,例如:['食用', '禁忌'] Returns: ToolResult: 包含每个关键词对应的视频列表及其解析结果 """ # 验证关键词列表 if not keywords: return ToolResult( title="内容解析结果搜索失败", output="", error="关键词列表不能为空", ) if not isinstance(keywords, list): return ToolResult( title="内容解析结果搜索失败", output="", error=f"关键词必须是列表类型,当前类型为: {type(keywords).__name__}", ) # 过滤空字符串 keywords = [kw.strip() for kw in keywords if kw and kw.strip()] if not keywords: return ToolResult( title="内容解析结果搜索失败", output="", error="关键词列表中没有有效的关键词", ) try: results = await _call_content_deconstruction_api(keywords=keywords) except RuntimeError as e: return ToolResult( title="内容解析结果搜索失败", output="", error=str(e), ) if not results: return ToolResult( title="内容解析结果搜索", output=json.dumps( { "message": "未找到相关内容", "keywords": keywords, }, ensure_ascii=False, indent=2, ), ) # 统计每个关键词的结果数量 keyword_counts = { keyword: len(videos) if isinstance(videos, list) else 0 for keyword, videos in results.items() } total_count = sum(keyword_counts.values()) output = json.dumps(results, ensure_ascii=False, indent=2) return ToolResult( title=f"内容解析结果搜索 - {len(keywords)} 个关键词", output=output, long_term_memory=f"检索到内容解析结果,关键词: {', '.join(keywords)},共 {total_count} 条结果", )