| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- """
- 内容解析结果搜索工具 - 根据关键词搜索视频标题和标题的解析结果
- 用于 Agent 执行时根据关键词搜索视频内容及其解析结果。
- """
- import json
- import os
- from typing import Any, Dict, List, Optional
- import httpx
- from agent.tools import tool, ToolResult
- # API 配置
- CONTENT_DECONSTRUCTION_BASE_URL = os.getenv(
- "CONTENT_DECONSTRUCTION_BASE_URL", "http://api.piaoquantv.com"
- )
- DEFAULT_TIMEOUT = 30.0
- async def _call_content_deconstruction_api(
- keywords: List[str],
- ) -> Optional[Dict[str, List[Dict[str, Any]]]]:
- """调用内容解析结果搜索 API,返回结果字典。"""
- url = f"{CONTENT_DECONSTRUCTION_BASE_URL.rstrip('/')}/supply-demand-engine-service/content/queryContentDeconstructionResultByKeywords"
- payload = {"keywords": keywords}
- try:
- async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
- resp = await client.post(
- url,
- json=payload,
- headers={"Content-Type": "application/json"},
- )
- resp.raise_for_status()
- data = resp.json()
- except httpx.HTTPStatusError as e:
- raise RuntimeError(
- f"API 请求失败: {e.response.status_code} - {e.response.text[:200]}"
- )
- except Exception as e:
- raise RuntimeError(f"请求异常: {str(e)}")
- # 解析响应格式: {'code': 0, 'msg': 'success', 'data': {...}, 'success': True}
- if isinstance(data, dict):
- # 检查 code 字段
- code = data.get("code", 0)
- if code != 0:
- msg = data.get("msg", "未知错误")
- raise RuntimeError(f"API 返回错误码: {code}, 消息: {msg}")
- # 获取 data 字段
- result_data = data.get("data", {})
- if isinstance(result_data, dict):
- return result_data
- return {}
- return {}
- @tool(
- description="根据关键词搜索视频标题和标题的解析结果。支持传入多个关键词,返回每个关键词对应的视频列表及其解析信息。",
- display={
- "zh": {
- "name": "内容解析结果搜索",
- "params": {
- "keywords": "关键词列表,例如:['食用', '禁忌']",
- },
- },
- },
- )
- async def query_content_deconstruction_by_keywords(
- keywords: List[str],
- ) -> ToolResult:
- """
- 根据关键词搜索视频标题和标题的解析结果。
- Args:
- keywords: 关键词列表,例如:['食用', '禁忌']
- Returns:
- ToolResult: 包含每个关键词对应的视频列表及其解析结果
- """
- # 验证关键词列表
- if not keywords:
- return ToolResult(
- title="内容解析结果搜索失败",
- output="",
- error="关键词列表不能为空",
- )
- if not isinstance(keywords, list):
- return ToolResult(
- title="内容解析结果搜索失败",
- output="",
- error=f"关键词必须是列表类型,当前类型为: {type(keywords).__name__}",
- )
- # 过滤空字符串
- keywords = [kw.strip() for kw in keywords if kw and kw.strip()]
- if not keywords:
- return ToolResult(
- title="内容解析结果搜索失败",
- output="",
- error="关键词列表中没有有效的关键词",
- )
- try:
- results = await _call_content_deconstruction_api(keywords=keywords)
- except RuntimeError as e:
- return ToolResult(
- title="内容解析结果搜索失败",
- output="",
- error=str(e),
- )
- if not results:
- return ToolResult(
- title="内容解析结果搜索",
- output=json.dumps(
- {
- "message": "未找到相关内容",
- "keywords": keywords,
- },
- ensure_ascii=False,
- indent=2,
- ),
- )
- # 统计每个关键词的结果数量
- keyword_counts = {
- keyword: len(videos) if isinstance(videos, list) else 0
- for keyword, videos in results.items()
- }
- total_count = sum(keyword_counts.values())
- output = json.dumps(results, ensure_ascii=False, indent=2)
- return ToolResult(
- title=f"内容解析结果搜索 - {len(keywords)} 个关键词",
- output=output,
- long_term_memory=f"检索到内容解析结果,关键词: {', '.join(keywords)},共 {total_count} 条结果",
- )
|