""" 内容工具族 —— 统一入口 4 个 @tool 注册给 LLM: - content_platforms: 列出/查询平台及其参数 - content_search: 跨平台搜索 - content_detail: 查看详情 - content_suggest: 搜索建议词 所有平台的具体实现在 platforms/ 子目录,按模块自注册到 registry。 """ import json import os import uuid from typing import Any, Dict, Optional from agent.tools import tool, ToolResult, ToolContext from agent.tools.builtin.content.registry import ( all_platforms, get_platform, match_platforms, ) from agent.tools.builtin.content import cache as _cache # 导入平台模块以触发自注册(副作用导入) import agent.tools.builtin.content.platforms.aigc_channel # noqa: F401 import agent.tools.builtin.content.platforms.youtube # noqa: F401 import agent.tools.builtin.content.platforms.x # noqa: F401 def _get_trace_id(context: Optional[ToolContext]) -> str: """从 context 取 trace_id,回退到环境变量或自动生成""" if context and hasattr(context, "trace_id") and context.trace_id: return context.trace_id return os.getenv("TRACE_ID") or f"anon-{uuid.uuid4().hex[:8]}" # ── content_platforms ── @tool(hidden_params=["context"], groups=["content"]) async def content_platforms( platform: str = "", context: Optional[ToolContext] = None, ) -> ToolResult: """ 列出支持的内容平台及其搜索参数。 不传 platform 时返回所有平台的概要列表(仅名称和 ID)。 传入 platform 时模糊匹配并返回匹配平台的详细参数说明(支持 ID、中文名、别名)。 建议在不熟悉平台参数时先调用此工具查看,再构造 content_search / content_detail 的参数。 Args: platform: 可选,平台名称或关键词。支持模糊匹配(如 "xhs"、"小红书"、"youtube")。 留空返回全部平台概要。 context: 工具上下文(自动注入) """ hits = match_platforms(platform) if not hits: all_ids = [p.id for p in all_platforms()] return ToolResult( title="未找到匹配平台", output=f"没有匹配 '{platform}' 的平台。可用平台: {', '.join(all_ids)}", ) if platform: # 有 query:返回匹配平台的详细参数 result = [p.detail() for p in hits] else: # 无 query:返回概要列表 result = [p.summary() for p in hits] return ToolResult( title=f"内容平台" + (f" ({platform})" if platform else ""), output=json.dumps(result, ensure_ascii=False, indent=2), ) # ── content_search ── @tool(hidden_params=["context"], groups=["content"]) async def content_search( platform: str, keyword: str, max_count: int = 20, cursor: str = "", extras: Optional[Dict[str, Any]] = None, context: Optional[ToolContext] = None, ) -> ToolResult: """ 跨平台内容搜索,返回带索引编号的封面拼图 + 概览列表。 返回的是摘要信息(标题 + 正文截断 + 互动数据),不含完整正文和所有图片。 如需查看某条内容的完整信息,请使用 content_detail。 Args: platform: 平台标识,如 'xhs'、'youtube'、'x'。完整列表见 content_platforms。 keyword: 搜索关键词。 max_count: 返回条数上限,默认 20。 cursor: 分页游标,首次搜索留空,翻页时传入上次返回值。 extras: 平台专用参数(dict)。不同平台支持不同参数, 如 xhs 支持 sort_type / publish_time / content_type / filter_note_range。 不清楚可先调 content_platforms(platform) 查看。 context: 工具上下文(自动注入) """ pdef = get_platform(platform) if not pdef: # 尝试模糊匹配 hits = match_platforms(platform) if hits: suggestions = ", ".join(f"{p.id}({p.name})" for p in hits[:3]) return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'。你是否想要: {suggestions}") all_ids = [p.id for p in all_platforms()] return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'。可用: {', '.join(all_ids)}") if not pdef.search_impl: return ToolResult(title="不支持搜索", output=f"平台 {pdef.name} 暂不支持搜索") result = await pdef.search_impl( platform_id=pdef.id, keyword=keyword, max_count=max_count, cursor=cursor, extras=extras, ) # 持久化搜索结果到磁盘缓存 if not result.error: posts = result.metadata.pop("posts", []) trace_id = _get_trace_id(context) _cache.save_search_results(trace_id, pdef.id, keyword, posts) return result # ── content_detail ── @tool(hidden_params=["context"], groups=["content"]) async def content_detail( platform: str, index: int, extras: Optional[Dict[str, Any]] = None, context: Optional[ToolContext] = None, ) -> ToolResult: """ 查看内容详情。从最近一次 content_search 的结果中按索引取完整记录。 Args: platform: 平台标识(必须和之前 content_search 用的一致)。 index: 内容序号(1-based),来自 content_search 返回的 index 字段。 extras: 平台专用详情参数。YouTube 支持 include_captions / download_video。 其他平台通常不需要。 context: 工具上下文(自动注入) """ pdef = get_platform(platform) if not pdef: return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'") trace_id = _get_trace_id(context) post = _cache.get_cached_post(trace_id, pdef.id, index) if not post: info = _cache.get_cached_search_info(trace_id, pdef.id) if info: return ToolResult( title="索引无效", output=f"平台 {pdef.name} 上次搜索 '{info['keyword']}' 共 {info['total']} 条," f"有效索引 1-{info['total']},你传入了 {index}。", error="Invalid index", ) return ToolResult( title="缓存未命中", output=f"没有 {pdef.name} 的搜索缓存。请先调用 content_search(platform='{pdef.id}', keyword=...) 搜索。", error="No cache", ) if pdef.detail_impl: return await pdef.detail_impl(post, extras) # fallback:直接返回缓存的完整数据 return ToolResult( title=f"详情 #{index}", output=json.dumps(post, ensure_ascii=False, indent=2), ) # ── content_suggest ── @tool(hidden_params=["context"], groups=["content"]) async def content_suggest( platform: str, keyword: str, context: Optional[ToolContext] = None, ) -> ToolResult: """ 获取搜索关键词补全建议。 仅部分平台支持(xhs、toutiao、douyin、bili、zhihu)。 用于辅助用户发现更精准的搜索词。 Args: platform: 平台标识。 keyword: 搜索关键词(输入中的部分词即可)。 context: 工具上下文(自动注入) """ pdef = get_platform(platform) if not pdef: return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'") if not pdef.suggest_impl: supported = [p.id for p in all_platforms() if p.supports_suggest] return ToolResult( title="不支持建议词", output=f"平台 {pdef.name} 不支持建议词。支持的平台: {', '.join(supported)}", ) channel = (pdef.suggest_channels or [pdef.id])[0] return await pdef.suggest_impl(channel, keyword) # ── CLI 入口 ── def _parse_args(argv: list) -> dict: """解析 --key=value 格式的 CLI 参数""" kwargs = {} for arg in argv: if arg.startswith("--") and "=" in arg: key, val = arg[2:].split("=", 1) # 尝试 JSON 解析(dict / int / bool) try: val = json.loads(val) except (json.JSONDecodeError, ValueError): pass kwargs[key] = val return kwargs if __name__ == "__main__": import sys import asyncio COMMANDS = { "platforms": content_platforms, "search": content_search, "detail": content_detail, "suggest": content_suggest, } if len(sys.argv) < 2 or sys.argv[1] not in COMMANDS: print(f"Usage: python {sys.argv[0]} <{'|'.join(COMMANDS)}> [--key=value ...]") sys.exit(1) cmd = sys.argv[1] kwargs = _parse_args(sys.argv[2:]) # trace_id:CLI 参数 > 环境变量 > 自动生成 trace_id = kwargs.pop("trace_id", None) or os.getenv("TRACE_ID") or f"cli-{uuid.uuid4().hex[:8]}" os.environ["TRACE_ID"] = trace_id result = asyncio.run(COMMANDS[cmd](**kwargs)) # 输出 JSON(与 toolhub CLI 格式一致) out = {"trace_id": trace_id, "output": result.output, "error": result.error} if result.metadata: out["metadata"] = result.metadata print(json.dumps(out, ensure_ascii=False, indent=2))