ingestion.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. """
  2. 内容导入工具
  3. 将文章链接批量导入 AIGC CMS 系统。
  4. """
  5. import json
  6. from typing import Any, Dict, List
  7. import httpx
  8. from agent.tools import tool, ToolResult
  9. AIGC_BASE_URL = "http://aigc-channel.aiddit.com/aigc/channel"
  10. DEFAULT_TIMEOUT = 60.0
  11. @tool(groups=["content"])
  12. async def import_content(plan_name: str, content_data: List[Dict[str, Any]]) -> ToolResult:
  13. """
  14. 导入长文内容到 CMS(微信公众号、小红书、抖音等通用链接)。
  15. Args:
  16. plan_name: 计划名称
  17. content_data: 内容数据列表,每项包含 channel、content_link、title 等字段
  18. """
  19. try:
  20. async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
  21. response = await client.post(
  22. f"{AIGC_BASE_URL}/weixin/auto_insert",
  23. json={"plan_name": plan_name, "data": content_data},
  24. )
  25. response.raise_for_status()
  26. data = response.json()
  27. if data.get("code") == 0:
  28. result_data = data.get("data", {})
  29. return ToolResult(
  30. title=f"内容导入: {plan_name}",
  31. output=json.dumps(result_data, ensure_ascii=False, indent=2),
  32. long_term_memory=f"Imported {len(content_data)} items to plan '{plan_name}'",
  33. )
  34. return ToolResult(title="导入失败", output="", error=f"导入失败: {data.get('msg', '未知错误')}")
  35. except Exception as e:
  36. return ToolResult(title="内容导入异常", output="", error=str(e))