| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103 |
- """
- 使用真实数据执行完整审批流程测试
- """
- import asyncio
- import sys
- from pathlib import Path
- # 添加项目根目录到路径
- PROJECT_ROOT = Path(__file__).parent.parent.parent
- sys.path.insert(0, str(PROJECT_ROOT))
- from dotenv import load_dotenv
- load_dotenv()
- from examples.auto_put_ad_mini.config import FEISHU_OPERATOR_OPEN_ID, FEISHU_AD_PROJECT_CHAT_ID
- from examples.auto_put_ad_mini.tools.im_approval import send_approval_request
- # 模拟ToolContext
- class SimpleContext:
- """简单的上下文模拟"""
- def __init__(self):
- self.config = {}
- async def main():
- print("=" * 70)
- print(" 完整端到端测试:使用真实数据")
- print("=" * 70)
- print()
- # 使用最新的验证决策数据
- validated_csv = "outputs/reports/validated_decisions_20260415.csv"
- print(f"使用数据文件: {validated_csv}")
- print()
- print("配置验证:")
- print(f" 个人 Open ID: {FEISHU_OPERATOR_OPEN_ID}")
- print(f" 项目群聊 ID: {FEISHU_AD_PROJECT_CHAT_ID}")
- print()
- # 执行审批流程
- print("执行审批流程...")
- print("-" * 70)
- ctx = SimpleContext()
- try:
- result = await send_approval_request(
- ctx=ctx,
- validated_csv=validated_csv,
- wait_for_reply=False, # 非阻塞模式
- timeout_minutes=30,
- )
- print()
- print("✅ 审批请求发送成功!")
- print("=" * 70)
- print()
- print(f"📋 {result.title}")
- print()
- print(result.output)
- print()
- if result.metadata:
- print("详细信息:")
- print(f" 请求ID: {result.metadata.get('request_id')}")
- print(f" 飞书发送状态: {result.metadata.get('feishu_sent', False)}")
- if 'ad_ids' in result.metadata:
- ad_ids = result.metadata['ad_ids']
- print(f" 待审批广告数: {len(ad_ids)}")
- print()
- print("=" * 70)
- print("请验证以下内容:")
- print("=" * 70)
- print()
- print("1️⃣ 个人私聊")
- print(" - 检查飞书个人消息")
- print(" - 确认收到审批消息 + 在线表格链接")
- print()
- print("2️⃣ 项目群聊")
- print(" - 检查项目群消息")
- print(" - 确认收到相同的审批消息 + 在线表格链接")
- print()
- print("3️⃣ 表格权限")
- print(" - 打开表格链接")
- print(' - 确认有"编辑"按钮(anyone_editable)')
- print(" - 尝试修改单元格,确认可编辑")
- print()
- print("4️⃣ 回复测试")
- print(" - 在个人私聊或项目群中回复'通过'")
- print(" - 系统应该能识别回复")
- print()
- except Exception as e:
- print(f"❌ 发送失败: {e}")
- import traceback
- traceback.print_exc()
- if __name__ == "__main__":
- asyncio.run(main())
|