| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153 |
- """
- 热榜搜索工具 - 根据排序类型获取今日热榜内容(news分类)
- 用于 Agent 执行时自主调取热榜数据。
- """
- import asyncio
- import json
- import os
- from typing import Any, Dict, List, Optional
- import httpx
- from agent.tools import tool, ToolResult
- # 热榜搜索 API 配置
- HOT_RANK_BASE_URL = os.getenv("HOT_RANK_BASE_URL", "http://crawapi.piaoquantv.com")
- DEFAULT_TIMEOUT = 30.0
- async def _call_hot_rank_api(
- sort_type: str = "最热",
- cursor: int = 0,
- ) -> Optional[List[Dict[str, Any]]]:
- """调用热榜搜索 API,返回结果列表。"""
- url = f"{HOT_RANK_BASE_URL.rstrip('/')}/crawler/jin_ri_re_bang/content_rank"
- payload = {
- "sort_type": sort_type,
- "category": "news", # 固定分类
- "cursor": cursor,
- }
- try:
- async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
- resp = await client.post(url, json=payload)
- resp.raise_for_status()
- data = resp.json()
- except httpx.HTTPStatusError as e:
- raise RuntimeError(f"API 请求失败: {e.response.status_code} - {e.response.text[:200]}")
- except Exception as e:
- raise RuntimeError(f"请求异常: {str(e)}")
- # 解析响应格式: {'code': 0, 'data': {'data': [{'jump_url': ..., 'rankList': [...]}]}}
- if isinstance(data, dict):
- # 检查 code 字段
- code = data.get("code", 0)
- if code != 0:
- raise RuntimeError(f"API 返回错误码: {code}")
-
- # 获取 data.data 数组
- outer_data = data.get("data", {})
- if isinstance(outer_data, dict):
- inner_data = outer_data.get("data", [])
- else:
- inner_data = outer_data if isinstance(outer_data, list) else []
-
- # 提取所有 rankList 中的条目
- all_items = []
- if isinstance(inner_data, list):
- for item in inner_data:
- if item['source'] != '百度':
- continue
- if isinstance(item, dict):
- rank_list = item.get("rankList", [])
- if isinstance(rank_list, list):
- all_items.extend(rank_list)
-
- return all_items if all_items else []
-
- # 兼容直接返回列表的情况
- if isinstance(data, list):
- return data
-
- return []
- if __name__ == '__main__':
- try:
- res = asyncio.run(_call_hot_rank_api())
- print(res)
- except Exception as e:
- print(f"执行出错: {e}")
- @tool(
- description="根据排序类型获取今日热榜内容(news分类),用于创作参考。支持按最热或最新排序,可指定分页游标。",
- display={
- "zh": {
- "name": "热榜搜索",
- "params": {
- "sort_type": "排序类型:最热 或 最新",
- "cursor": "分页游标,从0开始",
- },
- },
- },
- )
- async def hot_rank_search(
- sort_type: str = "最热",
- cursor: int = 0,
- ) -> ToolResult:
- """
- 根据排序类型获取今日热榜内容(固定news分类)。
- Args:
- sort_type: 排序类型,"最热" 或 "最新",默认为 "最热"
- cursor: 分页游标,从0开始,默认为 0
- Returns:
- ToolResult: 热榜内容列表
- """
- # 验证排序类型
- if sort_type not in ["最热", "最新"]:
- return ToolResult(
- title="热榜搜索失败",
- output="",
- error=f"排序类型必须是 '最热' 或 '最新',当前为: {sort_type}",
- )
- # 验证游标
- if cursor < 0:
- return ToolResult(
- title="热榜搜索失败",
- output="",
- error=f"分页游标必须大于等于0,当前为: {cursor}",
- )
- try:
- results = await _call_hot_rank_api(sort_type=sort_type, cursor=cursor)
- except RuntimeError as e:
- return ToolResult(
- title="热榜搜索失败",
- output="",
- error=str(e),
- )
- if not results:
- return ToolResult(
- title="热榜搜索",
- output=json.dumps(
- {
- "message": "未找到热榜内容",
- "sort_type": sort_type,
- "category": "news",
- "cursor": cursor,
- },
- ensure_ascii=False,
- indent=2,
- ),
- )
- output = json.dumps(results, ensure_ascii=False, indent=2)
- return ToolResult(
- title=f"热榜搜索 - {sort_type} (news)",
- output=output,
- long_term_memory=f"检索到热榜内容,排序: {sort_type},分类: news,共 {len(results)} 条",
- )
|