hot_topic_toolkit.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. from typing import List, Dict, Any
  2. import requests
  3. from pqai_agent.logging import logger
  4. from pqai_agent.toolkit.base import BaseToolkit
  5. from pqai_agent.toolkit.function_tool import FunctionTool
  6. from pqai_agent.toolkit.tool_registry import register_toolkit
  7. @register_toolkit
  8. class HotTopicToolkit(BaseToolkit):
  9. @staticmethod
  10. def get_hot_topics(self) -> List[Dict[str, Any]]:
  11. """
  12. 获取热点内容列表
  13. Returns:
  14. List[Dict[str, Any]]: [{'title': ..., 'url': ...}, ...]
  15. """
  16. url = "http://ai-wechat-hook-internal.piaoquantv.com/hotTopic/getHotContent"
  17. try:
  18. resp = requests.get(url, timeout=5)
  19. resp.raise_for_status()
  20. data = resp.json()
  21. if data.get("code") == 0 and data.get("success"):
  22. result = []
  23. for item in data.get("data", []):
  24. result.append({
  25. "title": item.get("标题"),
  26. "url": item.get("链接")
  27. })
  28. return result
  29. else:
  30. logger.error(f"Failed to fetch hot topics: {data.get('msg', 'Unknown error')}")
  31. return []
  32. except Exception as e:
  33. logger.error(f"Error in fetching hot topics: {e}")
  34. return []
  35. def get_tools(self) -> List[FunctionTool]:
  36. return [FunctionTool(self.get_hot_topics)]