12345678910111213141516171819202122232425262728293031323334353637383940 |
- from typing import List, Dict, Any
- import requests
- from pqai_agent.logging import logger
- from pqai_agent.toolkit.base import BaseToolkit
- from pqai_agent.toolkit.function_tool import FunctionTool
- from pqai_agent.toolkit.tool_registry import register_toolkit
- @register_toolkit
- class HotTopicToolkit(BaseToolkit):
- @staticmethod
- def get_hot_topics(self) -> List[Dict[str, Any]]:
- """
- 获取热点内容列表
- Returns:
- List[Dict[str, Any]]: [{'title': ..., 'url': ...}, ...]
- """
- url = "http://ai-wechat-hook-internal.piaoquantv.com/hotTopic/getHotContent"
- try:
- resp = requests.get(url, timeout=5)
- resp.raise_for_status()
- data = resp.json()
- if data.get("code") == 0 and data.get("success"):
- result = []
- for item in data.get("data", []):
- result.append({
- "title": item.get("标题"),
- "url": item.get("链接")
- })
- return result
- else:
- logger.error(f"Failed to fetch hot topics: {data.get('msg', 'Unknown error')}")
- return []
- except Exception as e:
- logger.error(f"Error in fetching hot topics: {e}")
- return []
- def get_tools(self) -> List[FunctionTool]:
- return [FunctionTool(self.get_hot_topics)]
|