| 123456789101112131415161718192021222324252627282930313233343536373839 |
- import asyncio
- import json
- import os
- import sys
- # Add the parent of this script's directory to sys.path so that the package import below can be resolved
- sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
- from agent.tools.builtin.content.platforms.aigc_channel import search
async def main():
    """Run a manual smoke test of the GZH (公众号) search API.

    Awaits ``search`` for the ``gzh`` platform with a fixed keyword and
    prints the outcome to stdout:

    * on failure, the error carried by ``result.error``;
    * on success, ``result.output`` (pretty-printed when it parses as JSON,
      raw otherwise), followed by a short summary of
      ``result.metadata["posts"]`` when that key is present.

    Returns:
        None. All results are reported via ``print``.
    """
    keyword = "人工智能"
    banner = "====================================="
    print(banner)
    print("Testing GZH (公众号) search API")
    print(f"Keyword: {keyword}")
    print(f"{banner}\n")

    result = await search(platform_id="gzh", keyword=keyword, max_count=5)

    # Guard clause: nothing else to report when the search itself failed.
    if result.error:
        print(f"❌ [Error] 搜索失败: {result.error}")
        return

    print("✅ [Success] 搜索成功!返回结果如下:")
    # Only the parse is guarded, so unrelated failures are not silently hidden.
    try:
        parsed = json.loads(result.output)
    except (TypeError, ValueError):
        # Output is not JSON text (or not a string at all): show it verbatim.
        print(result.output)
    else:
        print(json.dumps(parsed, ensure_ascii=False, indent=2))

    if not (result.metadata and "posts" in result.metadata):
        return

    # Treat a null "posts" value like an empty list instead of crashing in len().
    posts = result.metadata["posts"] or []
    print(f"\n[Metadata] 成功获取到底层 posts 完整数据条数: {len(posts)}")
    if not posts:
        return

    first_post = posts[0]
    print(f"[Metadata Sample] 第一条数据的标题: {first_post.get('title', '无标题')}")
    if "_quality_score" in first_post:
        # The grade key is looked up defensively: only the score key was
        # checked, so indexing the grade directly could raise KeyError.
        grade = first_post.get("_quality_grade", "N/A")
        print(f"[Quality] 第一条数据的质量评分: {first_post['_quality_score']} ({grade})")
if __name__ == "__main__":
    # Executed as a script (not imported): build the coroutine and let
    # asyncio.run drive it to completion on a fresh event loop.
    entry_coroutine = main()
    asyncio.run(entry_coroutine)
|