hot_topic_toolkit.py 1.3 KB

123456789101112131415161718192021222324252627282930313233343536
  1. from typing import List, Dict, Any
  2. import requests
  3. from pqai_agent.logging import logger
  4. from pqai_agent.toolkit.base import BaseToolkit
  5. from pqai_agent.toolkit.function_tool import FunctionTool
  6. class HotTopicToolkit(BaseToolkit):
  7. def get_hot_topics(self) -> List[Dict[str, Any]]:
  8. """
  9. 获取热点内容列表
  10. Returns:
  11. List[Dict[str, Any]]: [{'title': ..., 'url': ...}, ...]
  12. """
  13. url = "http://ai-wechat-hook-internal.piaoquantv.com/hotTopic/getHotContent"
  14. try:
  15. resp = requests.get(url, timeout=5)
  16. resp.raise_for_status()
  17. data = resp.json()
  18. if data.get("code") == 0 and data.get("success"):
  19. result = []
  20. for item in data.get("data", []):
  21. result.append({
  22. "title": item.get("标题"),
  23. "url": item.get("链接")
  24. })
  25. return result
  26. else:
  27. logger.error(f"Failed to fetch hot topics: {data.get('msg', 'Unknown error')}")
  28. return []
  29. except Exception as e:
  30. logger.error(f"Error in fetching hot topics: {e}")
  31. return []
  32. def get_tools(self) -> List[FunctionTool]:
  33. return [FunctionTool(self.get_hot_topics)]