from typing import List, Dict, Any import requests from pqai_agent.logging import logger from pqai_agent.toolkit.base import BaseToolkit from pqai_agent.toolkit.function_tool import FunctionTool from pqai_agent.toolkit.tool_registry import register_toolkit @register_toolkit class HotTopicToolkit(BaseToolkit): def get_hot_topics(self) -> List[Dict[str, Any]]: """ 获取热点内容列表 Returns: List[Dict[str, Any]]: [{'title': ..., 'url': ...}, ...] """ url = "http://ai-wechat-hook-internal.piaoquantv.com/hotTopic/getHotContent" try: resp = requests.get(url, timeout=5) resp.raise_for_status() data = resp.json() if data.get("code") == 0 and data.get("success"): result = [] for item in data.get("data", []): if item.get("title") and item.get("url"): result.append({ "title": item.get("title"), "url": item.get("url") }) return result else: logger.error(f"Failed to fetch hot topics: {data.get('msg', 'Unknown error')}") return [] except Exception as e: logger.error(f"Error in fetching hot topics: {e}") return [] def get_tools(self) -> List[FunctionTool]: return [FunctionTool(self.get_hot_topics)]