| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202 |
- """
- 内容解析结果搜索工具 - 根据关键词搜索视频标题和标题的解析结果
- 用于 Agent 执行时根据关键词搜索视频内容及其解析结果。
- """
- import asyncio
- import json
- import os
- from typing import Any, Dict, List, Optional
- import httpx
- from agent.tools import tool, ToolResult
# API configuration.
# Base URL of the content-deconstruction service; the environment variable
# CONTENT_DECONSTRUCTION_BASE_URL overrides the built-in default.
CONTENT_DECONSTRUCTION_BASE_URL = os.getenv(
    "CONTENT_DECONSTRUCTION_BASE_URL", "http://api.piaoquantv.com"
)
# Timeout passed to httpx.AsyncClient for every request made by this module.
DEFAULT_TIMEOUT = 30.0
async def _call_content_deconstruction_api(
    keywords: List[str],
) -> Optional[Dict[str, List[Dict[str, Any]]]]:
    """POST the keywords to the content-deconstruction search endpoint.

    Args:
        keywords: Keywords sent as the ``keywords`` field of the JSON body.

    Returns:
        The ``data`` member of the response envelope when it is a dict.
        An empty dict is returned when the response body is not a dict or
        its ``data`` member is not a dict. Every return path yields a dict,
        although the annotation is kept as ``Optional`` for compatibility.

    Raises:
        RuntimeError: If the server answers with an HTTP error status, if
            the request or JSON decoding fails for any other reason, or if
            the envelope carries a non-zero ``code``. The original
            exception is attached as ``__cause__`` where one exists.
    """
    url = (
        f"{CONTENT_DECONSTRUCTION_BASE_URL.rstrip('/')}"
        "/supply-demand-engine-service/content/queryContentDeconstructionResultByKeywords"
    )
    payload = {"keywords": keywords}

    try:
        async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
            resp = await client.post(
                url,
                json=payload,
                headers={"Content-Type": "application/json"},
            )
            resp.raise_for_status()
            data = resp.json()
    except httpx.HTTPStatusError as e:
        # Only the first 200 characters of the body go into the message.
        raise RuntimeError(
            f"API 请求失败: {e.response.status_code} - {e.response.text[:200]}"
        ) from e
    except Exception as e:
        # Callers only catch RuntimeError, so every other failure (transport
        # error, invalid JSON, ...) is converted, keeping the cause chained.
        raise RuntimeError(f"请求异常: {e}") from e

    # Expected envelope:
    # {'code': 0, 'msg': 'success', 'data': {...}, 'success': True}
    if not isinstance(data, dict):
        return {}

    # A missing "code" is treated the same as 0 (success).
    code = data.get("code", 0)
    if code != 0:
        msg = data.get("msg", "未知错误")
        raise RuntimeError(f"API 返回错误码: {code}, 消息: {msg}")

    result_data = data.get("data", {})
    return result_data if isinstance(result_data, dict) else {}
# Description entries of this type are ignored when deciding whether a
# keyword matched a video.
_EXCLUDED_DESCRIPTION_TYPE = "选题"


def _normalize_keywords(keywords: List[Any]) -> List[str]:
    """Strip keywords, drop blank or non-string entries, de-duplicate in order."""
    stripped = (kw.strip() for kw in keywords if isinstance(kw, str))
    # dict.fromkeys keeps first-seen order while removing duplicates.
    return list(dict.fromkeys(kw for kw in stripped if kw))


def _video_matches(video: Dict[str, Any]) -> bool:
    """Return True if the video still has a description after filtering."""
    description = video.get("description")
    if isinstance(description, list):
        # Only dict entries whose type is not the excluded one survive.
        return any(
            isinstance(desc, dict)
            and desc.get("type") != _EXCLUDED_DESCRIPTION_TYPE
            for desc in description
        )
    # A non-list description is not filtered; it counts when truthy.
    return bool(description)


def _max_keywords_per_content(results: Dict[str, Any]) -> int:
    """Return the largest number of distinct keywords matched by one contentId."""
    content_id_to_keywords: Dict[Any, set] = {}
    for keyword, videos in results.items():
        if not isinstance(videos, list):
            continue
        for video in videos:
            if not isinstance(video, dict) or not _video_matches(video):
                continue
            content_id = video.get("contentId")
            if content_id:  # videos with a missing/falsy contentId are skipped
                content_id_to_keywords.setdefault(content_id, set()).add(keyword)
    return max(
        (len(matched) for matched in content_id_to_keywords.values()),
        default=0,
    )


@tool(
    description="根据关键词搜索视频标题和标题的解析结果。支持传入多个关键词,自动过滤掉 type 为'选题'的 description,统计每个 contentId 对应的关键词数量,并返回最高匹配比例(匹配最多关键词的 contentId 占所有关键词的比例)。",
    display={
        "zh": {
            "name": "内容解析结果搜索",
            "params": {
                "keywords": "关键词列表,例如:['食用', '禁忌']",
            },
        },
    },
)
async def query_content_deconstruction_by_keywords(
    keywords: List[str],
) -> ToolResult:
    """Search content-deconstruction results by keywords and report the best match ratio.

    Processing steps:
        1. Ignore description entries whose ``type`` is "选题".
        2. For every ``contentId``, collect the distinct keywords whose result
           list contains a video with at least one remaining description.
        3. Divide the largest such keyword count by the number of distinct,
           non-blank keywords and report it as a percentage.

    Args:
        keywords: List of keywords, e.g. ``['食用', '禁忌']``. Entries are
            stripped; blank, non-string and duplicate entries are dropped
            before the API call.

    Returns:
        ToolResult: On success ``output`` is a JSON object with the single
        key ``max_content_id_ratio`` (percentage rounded to 2 decimals).
        On invalid input or API failure ``error`` is set and ``output`` is
        empty.
    """
    # Validate the keyword list.
    if not keywords:
        return ToolResult(
            title="内容解析结果搜索失败",
            output="",
            error="关键词列表不能为空",
        )
    if not isinstance(keywords, list):
        return ToolResult(
            title="内容解析结果搜索失败",
            output="",
            error=f"关键词必须是列表类型,当前类型为: {type(keywords).__name__}",
        )

    # Duplicates are removed so they cannot inflate the ratio's denominator;
    # non-string entries are skipped instead of crashing on .strip().
    keywords = _normalize_keywords(keywords)
    if not keywords:
        return ToolResult(
            title="内容解析结果搜索失败",
            output="",
            error="关键词列表中没有有效的关键词",
        )

    try:
        results = await _call_content_deconstruction_api(keywords=keywords)
    except RuntimeError as e:
        return ToolResult(
            title="内容解析结果搜索失败",
            output="",
            error=str(e),
        )

    if not results:
        return ToolResult(
            title="内容解析结果搜索",
            output=json.dumps(
                {
                    "max_content_id_ratio": 0.0,
                },
                ensure_ascii=False,
                indent=2,
            ),
        )

    max_keyword_count = _max_keywords_per_content(results)

    # Ratio = keywords matched by the best contentId / total keywords.
    # NOTE(review): assumes the API keys its result by the requested keywords;
    # if it can return extra keys the ratio could exceed 100% - confirm.
    max_ratio = max_keyword_count / len(keywords)

    # Only the highest match ratio is returned, as a percentage.
    output_data = {
        "max_content_id_ratio": round(max_ratio * 100, 2),
    }
    output = json.dumps(output_data, ensure_ascii=False, indent=2)
    return ToolResult(
        title=f"内容解析结果搜索 - {len(keywords)} 个关键词",
        output=output,
        long_term_memory=f"检索到内容解析结果,关键词: {', '.join(keywords)},最高匹配比例: {max_ratio * 100:.2f}%",
    )
if __name__ == "__main__":
    # Manual smoke run: call the tool once with sample keywords and print
    # whatever it returns.
    res = asyncio.run(query_content_deconstruction_by_keywords(['食用', '禁忌']))
    print(res)
|