|
|
@@ -1,39 +0,0 @@
|
|
|
-import asyncio
|
|
|
-import json
|
|
|
-import os
|
|
|
-import sys
|
|
|
-
|
|
|
-# 将当前目录加入系统路径以便导包
|
|
|
-sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
-
|
|
|
-from agent.tools.builtin.content.platforms.aigc_channel import search
|
|
|
-
|
|
|
-async def main():
|
|
|
- keyword = "人工智能"
|
|
|
- print(f"=====================================")
|
|
|
- print(f"Testing GZH (公众号) search API")
|
|
|
- print(f"Keyword: {keyword}")
|
|
|
- print(f"=====================================\n")
|
|
|
-
|
|
|
- result = await search(platform_id="gzh", keyword=keyword, max_count=5)
|
|
|
-
|
|
|
- if result.error:
|
|
|
- print(f"❌ [Error] 搜索失败: {result.error}")
|
|
|
- else:
|
|
|
- print(f"✅ [Success] 搜索成功!返回结果如下:")
|
|
|
- try:
|
|
|
- parsed = json.loads(result.output)
|
|
|
- print(json.dumps(parsed, ensure_ascii=False, indent=2))
|
|
|
- except Exception:
|
|
|
- print(result.output)
|
|
|
-
|
|
|
- if result.metadata and "posts" in result.metadata:
|
|
|
- posts = result.metadata["posts"]
|
|
|
- print(f"\n[Metadata] 成功获取到底层 posts 完整数据条数: {len(posts)}")
|
|
|
- if len(posts) > 0:
|
|
|
- print(f"[Metadata Sample] 第一条数据的标题: {posts[0].get('title', '无标题')}")
|
|
|
- if "_quality_score" in posts[0]:
|
|
|
- print(f"[Quality] 第一条数据的质量评分: {posts[0]['_quality_score']} ({posts[0]['_quality_grade']})")
|
|
|
-
|
|
|
-if __name__ == "__main__":
|
|
|
- asyncio.run(main())
|