test_gzh_api.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. import asyncio
  2. import json
  3. import os
  4. import sys
# Add the parent of this file's directory to sys.path so the `agent` package import below resolves.
  6. sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
  7. from agent.tools.builtin.content.platforms.aigc_channel import search
  8. async def main():
  9. keyword = "人工智能"
  10. print(f"=====================================")
  11. print(f"Testing GZH (公众号) search API")
  12. print(f"Keyword: {keyword}")
  13. print(f"=====================================\n")
  14. result = await search(platform_id="gzh", keyword=keyword, max_count=5)
  15. if result.error:
  16. print(f"❌ [Error] 搜索失败: {result.error}")
  17. else:
  18. print(f"✅ [Success] 搜索成功!返回结果如下:")
  19. try:
  20. parsed = json.loads(result.output)
  21. print(json.dumps(parsed, ensure_ascii=False, indent=2))
  22. except Exception:
  23. print(result.output)
  24. if result.metadata and "posts" in result.metadata:
  25. posts = result.metadata["posts"]
  26. print(f"\n[Metadata] 成功获取到底层 posts 完整数据条数: {len(posts)}")
  27. if len(posts) > 0:
  28. print(f"[Metadata Sample] 第一条数据的标题: {posts[0].get('title', '无标题')}")
  29. if "_quality_score" in posts[0]:
  30. print(f"[Quality] 第一条数据的质量评分: {posts[0]['_quality_score']} ({posts[0]['_quality_grade']})")
  31. if __name__ == "__main__":
  32. asyncio.run(main())