| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266 |
- """
- 内容工具族 —— 统一入口
- 4 个 @tool 注册给 LLM:
- - content_platforms: 列出/查询平台及其参数
- - content_search: 跨平台搜索
- - content_detail: 查看详情
- - content_suggest: 搜索建议词
- 所有平台的具体实现在 platforms/ 子目录,按模块自注册到 registry。
- """
- import json
- import os
- import uuid
- from typing import Any, Dict, Optional
- from agent.tools import tool, ToolResult, ToolContext
- from agent.tools.builtin.content.registry import (
- all_platforms, get_platform, match_platforms,
- )
- from agent.tools.builtin.content import cache as _cache
- # 导入平台模块以触发自注册(副作用导入)
- import agent.tools.builtin.content.platforms.aigc_channel # noqa: F401
- import agent.tools.builtin.content.platforms.youtube # noqa: F401
- import agent.tools.builtin.content.platforms.x # noqa: F401
- def _get_trace_id(context: Optional[ToolContext]) -> str:
- """从 context 取 trace_id,回退到环境变量或自动生成"""
- if context and hasattr(context, "trace_id") and context.trace_id:
- return context.trace_id
- return os.getenv("TRACE_ID") or f"anon-{uuid.uuid4().hex[:8]}"
- # ── content_platforms ──
- @tool(hidden_params=["context"], groups=["content"])
- async def content_platforms(
- platform: str = "",
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 列出支持的内容平台及其搜索参数。
- 不传 platform 时返回所有平台的概要列表(仅名称和 ID)。
- 传入 platform 时模糊匹配并返回匹配平台的详细参数说明(支持 ID、中文名、别名)。
- 建议在不熟悉平台参数时先调用此工具查看,再构造 content_search / content_detail 的参数。
- Args:
- platform: 可选,平台名称或关键词。支持模糊匹配(如 "xhs"、"小红书"、"youtube")。
- 留空返回全部平台概要。
- context: 工具上下文(自动注入)
- """
- hits = match_platforms(platform)
- if not hits:
- all_ids = [p.id for p in all_platforms()]
- return ToolResult(
- title="未找到匹配平台",
- output=f"没有匹配 '{platform}' 的平台。可用平台: {', '.join(all_ids)}",
- )
- if platform:
- # 有 query:返回匹配平台的详细参数
- result = [p.detail() for p in hits]
- else:
- # 无 query:返回概要列表
- result = [p.summary() for p in hits]
- return ToolResult(
- title=f"内容平台" + (f" ({platform})" if platform else ""),
- output=json.dumps(result, ensure_ascii=False, indent=2),
- )
- # ── content_search ──
- @tool(hidden_params=["context"], groups=["content"])
- async def content_search(
- platform: str,
- keyword: str,
- max_count: int = 20,
- cursor: str = "",
- extras: Optional[Dict[str, Any]] = None,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 跨平台内容搜索,返回带索引编号的封面拼图 + 概览列表。
- 返回的是摘要信息(标题 + 正文截断 + 互动数据),不含完整正文和所有图片。
- 如需查看某条内容的完整信息,请使用 content_detail。
- Args:
- platform: 平台标识,如 'xhs'、'youtube'、'x'。完整列表见 content_platforms。
- keyword: 搜索关键词。
- max_count: 返回条数上限,默认 20。
- cursor: 分页游标,首次搜索留空,翻页时传入上次返回值。
- extras: 平台专用参数(dict)。不同平台支持不同参数,
- 如 xhs 支持 sort_type / publish_time / content_type / filter_note_range。
- 不清楚可先调 content_platforms(platform) 查看。
- context: 工具上下文(自动注入)
- """
- pdef = get_platform(platform)
- if not pdef:
- # 尝试模糊匹配
- hits = match_platforms(platform)
- if hits:
- suggestions = ", ".join(f"{p.id}({p.name})" for p in hits[:3])
- return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'。你是否想要: {suggestions}")
- all_ids = [p.id for p in all_platforms()]
- return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'。可用: {', '.join(all_ids)}")
- if not pdef.search_impl:
- return ToolResult(title="不支持搜索", output=f"平台 {pdef.name} 暂不支持搜索")
- result = await pdef.search_impl(
- platform_id=pdef.id,
- keyword=keyword,
- max_count=max_count,
- cursor=cursor,
- extras=extras,
- )
- # 持久化搜索结果到磁盘缓存
- if not result.error:
- posts = result.metadata.pop("posts", [])
- trace_id = _get_trace_id(context)
- _cache.save_search_results(trace_id, pdef.id, keyword, posts)
- return result
- # ── content_detail ──
- @tool(hidden_params=["context"], groups=["content"])
- async def content_detail(
- platform: str,
- index: int,
- extras: Optional[Dict[str, Any]] = None,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 查看内容详情。从最近一次 content_search 的结果中按索引取完整记录。
- Args:
- platform: 平台标识(必须和之前 content_search 用的一致)。
- index: 内容序号(1-based),来自 content_search 返回的 index 字段。
- extras: 平台专用详情参数。YouTube 支持 include_captions / download_video。
- 其他平台通常不需要。
- context: 工具上下文(自动注入)
- """
- pdef = get_platform(platform)
- if not pdef:
- return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'")
- trace_id = _get_trace_id(context)
- post = _cache.get_cached_post(trace_id, pdef.id, index)
- if not post:
- info = _cache.get_cached_search_info(trace_id, pdef.id)
- if info:
- return ToolResult(
- title="索引无效",
- output=f"平台 {pdef.name} 上次搜索 '{info['keyword']}' 共 {info['total']} 条,"
- f"有效索引 1-{info['total']},你传入了 {index}。",
- error="Invalid index",
- )
- return ToolResult(
- title="缓存未命中",
- output=f"没有 {pdef.name} 的搜索缓存。请先调用 content_search(platform='{pdef.id}', keyword=...) 搜索。",
- error="No cache",
- )
- if pdef.detail_impl:
- return await pdef.detail_impl(post, extras)
- # fallback:直接返回缓存的完整数据
- return ToolResult(
- title=f"详情 #{index}",
- output=json.dumps(post, ensure_ascii=False, indent=2),
- )
- # ── content_suggest ──
- @tool(hidden_params=["context"], groups=["content"])
- async def content_suggest(
- platform: str,
- keyword: str,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 获取搜索关键词补全建议。
- 仅部分平台支持(xhs、toutiao、douyin、bili、zhihu)。
- 用于辅助用户发现更精准的搜索词。
- Args:
- platform: 平台标识。
- keyword: 搜索关键词(输入中的部分词即可)。
- context: 工具上下文(自动注入)
- """
- pdef = get_platform(platform)
- if not pdef:
- return ToolResult(title="平台不存在", output=f"未找到平台 '{platform}'")
- if not pdef.suggest_impl:
- supported = [p.id for p in all_platforms() if p.supports_suggest]
- return ToolResult(
- title="不支持建议词",
- output=f"平台 {pdef.name} 不支持建议词。支持的平台: {', '.join(supported)}",
- )
- channel = (pdef.suggest_channels or [pdef.id])[0]
- return await pdef.suggest_impl(channel, keyword)
- # ── CLI 入口 ──
- def _parse_args(argv: list) -> dict:
- """解析 --key=value 格式的 CLI 参数"""
- kwargs = {}
- for arg in argv:
- if arg.startswith("--") and "=" in arg:
- key, val = arg[2:].split("=", 1)
- # 尝试 JSON 解析(dict / int / bool)
- try:
- val = json.loads(val)
- except (json.JSONDecodeError, ValueError):
- pass
- kwargs[key] = val
- return kwargs
- if __name__ == "__main__":
- import sys
- import asyncio
- COMMANDS = {
- "platforms": content_platforms,
- "search": content_search,
- "detail": content_detail,
- "suggest": content_suggest,
- }
- if len(sys.argv) < 2 or sys.argv[1] not in COMMANDS:
- print(f"Usage: python {sys.argv[0]} <{'|'.join(COMMANDS)}> [--key=value ...]")
- sys.exit(1)
- cmd = sys.argv[1]
- kwargs = _parse_args(sys.argv[2:])
- # trace_id:CLI 参数 > 环境变量 > 自动生成
- trace_id = kwargs.pop("trace_id", None) or os.getenv("TRACE_ID") or f"cli-{uuid.uuid4().hex[:8]}"
- os.environ["TRACE_ID"] = trace_id
- result = asyncio.run(COMMANDS[cmd](**kwargs))
- # 输出 JSON(与 toolhub CLI 格式一致)
- out = {"trace_id": trace_id, "output": result.output, "error": result.error}
- if result.metadata:
- out["metadata"] = result.metadata
- print(json.dumps(out, ensure_ascii=False, indent=2))
|