hot_topic_toolkit.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940
  1. from typing import List, Dict, Any
  2. import requests
  3. from pqai_agent.logging import logger
  4. from pqai_agent.toolkit.base import BaseToolkit
  5. from pqai_agent.toolkit.function_tool import FunctionTool
  6. from pqai_agent.toolkit.tool_registry import register_toolkit
  7. @register_toolkit
  8. class HotTopicToolkit(BaseToolkit):
  9. def get_hot_topics(self) -> List[Dict[str, Any]]:
  10. """
  11. 获取热点内容列表
  12. Returns:
  13. List[Dict[str, Any]]: [{'title': ..., 'url': ...}, ...]
  14. """
  15. url = "http://ai-wechat-hook-internal.piaoquantv.com/hotTopic/getHotContent"
  16. try:
  17. resp = requests.get(url, timeout=5)
  18. resp.raise_for_status()
  19. data = resp.json()
  20. if data.get("code") == 0 and data.get("success"):
  21. result = []
  22. for item in data.get("data", []):
  23. if item.get("title") and item.get("url"):
  24. result.append({
  25. "title": item.get("title"),
  26. "url": item.get("url")
  27. })
  28. return result
  29. else:
  30. logger.error(f"Failed to fetch hot topics: {data.get('msg', 'Unknown error')}")
  31. return []
  32. except Exception as e:
  33. logger.error(f"Error in fetching hot topics: {e}")
  34. return []
  35. def get_tools(self) -> List[FunctionTool]:
  36. return [FunctionTool(self.get_hot_topics)]