guantao 23 hours ago
parent
commit
fa433ab807
6 changed files with 0 additions and 351 deletions
  1. 0 20
      docs/abc.py
  2. 0 59
      kill_processes.py
  3. 0 81
      test_docker_deployment.py
  4. 0 63
      test_gpt_image_2_integration.py
  5. 0 58
      test_rembg_integration.py
  6. 0 70
      test_veo_integration.py

+ 0 - 20
docs/abc.py

@@ -1,20 +0,0 @@
-
-
-num[]
-target_value
-
-target_value -- index
-q=0
-def find_index(num, target_value):
-    q = 0
-    length = num.length
-    while(length-q>0):
-        if num[(length-q)/2] == target_value:
-            return (length-q)/2
-        elif num[(length-q)/2] > target_value:
-            length = (length-q)/2
-        else:
-            q = (length-q)/2
-
-
-            

+ 0 - 59
kill_processes.py

@@ -1,59 +0,0 @@
-"""查找并杀掉占用端口或一直运行的 Python 进程"""
-
-import subprocess
-import sys
-
-def find_processes_by_port(port):
-    """查找占用指定端口的进程"""
-    try:
-        result = subprocess.run(
-            ["netstat", "-ano"],
-            capture_output=True,
-            text=True,
-            shell=True
-        )
-
-        pids = []
-        for line in result.stdout.split('\n'):
-            if f':{port}' in line and 'LISTENING' in line:
-                parts = line.split()
-                if parts:
-                    pid = parts[-1]
-                    if pid.isdigit():
-                        pids.append(int(pid))
-
-        return pids
-    except Exception as e:
-        print(f"Error finding processes: {e}")
-        return []
-
-def kill_process(pid):
-    """杀掉指定 PID 的进程"""
-    try:
-        subprocess.run(["taskkill", "/F", "/PID", str(pid)], check=True)
-        print(f"✓ Killed process {pid}")
-        return True
-    except Exception as e:
-        print(f"✗ Failed to kill {pid}: {e}")
-        return False
-
-if __name__ == "__main__":
-    print("查找占用 8000 端口的进程...")
-    pids = find_processes_by_port(8000)
-
-    if pids:
-        print(f"找到 {len(pids)} 个进程: {pids}")
-        for pid in pids:
-            kill_process(pid)
-    else:
-        print("没有找到占用 8000 端口的进程")
-
-    print("\n查找占用 8001 端口的进程...")
-    pids = find_processes_by_port(8001)
-
-    if pids:
-        print(f"找到 {len(pids)} 个进程: {pids}")
-        for pid in pids:
-            kill_process(pid)
-    else:
-        print("没有找到占用 8001 端口的进程")

+ 0 - 81
test_docker_deployment.py

@@ -1,81 +0,0 @@
-"""测试 Docker 工具部署 — 在远程 Docker 上部署 rembg 并注册"""
-import httpx
-import time
-
-BASE_URL = "http://localhost:8001"
-
-def main():
-    task_spec = """在远程 Docker 上部署 rembg 图像背景移除工具(使用 git clone 方式)。
-
-背景信息:
-- GitHub: https://github.com/danielgatis/rembg
-- rembg 是一个 AI 图像背景移除工具,22k+ stars
-- 项目自带 HTTP 服务器,运行 'rembg s' 即可启动
-- 需要部署到远程 Docker 容器(不是本地 uv)
-- rembg 首次启动会下载 AI 模型到 /root/.u2net/ 目录
-
-任务要求:
-1. 使用 create_docker_env 创建 Docker 容器:
-   - 镜像: python:3.12-slim
-   - 端口: [7000]
-   - volumes: {
-       "/opt/tool_agent/docker/rembg/app": "/app",  # 代码目录
-       "/opt/tool_agent/docker/rembg/models": "/root/.u2net"  # 模型目录
-     }
-2. 在容器内 git clone 项目:
-   - cd /app
-   - git clone https://github.com/danielgatis/rembg.git
-   - cd rembg
-3. 安装依赖:
-   - 先查看 README.md 或 setup.py 了解安装方式
-   - 安装项目:pip install -e .[cpu,cli]
-4. 后台启动服务: rembg s --host 0.0.0.0 --port 7000
-5. 等待服务启动完成(首次会下载 AI 模型,需要几分钟)
-6. 使用 test_deployment 工具验证部署(容器状态、路由、连通性)
-7. 验证通过后注册到工具库
-8. 容器需要持久化运行,不要销毁
-
-注册信息:
-- tool_id: rembg_bg_remover
-- name: Rembg Background Remover
-- category: cv
-- runtime_type: docker
-- endpoint_path: /api/remove
-- http_method: POST
-- input_schema: {"type": "object", "properties": {"file": {"type": "string", "format": "binary", "description": "image file"}}, "required": ["file"]}
-"""
-
-    print("Submitting rembg Docker deployment task...")
-    resp = httpx.post(
-        f"{BASE_URL}/create_tool",
-        json={"task_spec": task_spec, "tool_id": "rembg_bg_remover"},
-        timeout=30,
-    )
-    resp.raise_for_status()
-    data = resp.json()
-    task_id = data["task_id"]
-    print(f"Task submitted: {task_id}")
-
-    print("Waiting for task completion (polling every 30s)...")
-    while True:
-        time.sleep(30)
-        try:
-            resp = httpx.get(f"{BASE_URL}/task/{task_id}", timeout=30)
-            resp.raise_for_status()
-            task_data = resp.json()
-            status = task_data["status"]
-            print(f"[{time.strftime('%H:%M:%S')}] status: {status}")
-
-            if status == "completed":
-                print("\nDone!")
-                print(task_data.get("result", ""))
-                break
-            elif status == "failed":
-                print("\nFailed!")
-                print(task_data.get("error", ""))
-                break
-        except Exception as e:
-            print(f"[{time.strftime('%H:%M:%S')}] poll error: {e}")
-
-if __name__ == "__main__":
-    main()

+ 0 - 63
test_gpt_image_2_integration.py

@@ -1,63 +0,0 @@
-"""测试 GPT Image 2 工具集成 — 接入工具库已有的 gpt-image-2"""
-import httpx
-import time
-
-BASE_URL = "http://localhost:8001"
-
-def main():
-    task_spec = """接入工具库中已有的 gpt-image-2 图像生成工具。
-
-背景信息:
-- 工具库中已存在 tool_id 为 "gpt-image-2" 的工具记录,但尚未实现代码
-- 使用 apiyi 平台的 API(OpenAI 兼容格式)
-- API Base URL: https://api.apiyi.com/v1
-- API Key: 从环境变量 APIYI_KEY 读取
-- GPT Image 2 是 OpenAI 的图像生成模型(gpt-image-1),通过 apiyi 平台调用
-
-任务要求:
-1. 去 apiyi 平台官网搜索 GPT Image 的 API 文档,找到正确的模型名称和调用方式
-2. 在 tools/local/gpt_image_2/ 下创建项目,编写 FastAPI 包装服务
-3. 接收 prompt 参数,调用 GPT Image 2 生成图像,返回图像 URL
-4. 自己测试是否正常工作(用一个简单的 prompt 测试,确认图像生成成功)
-5. 测试通过后注册到工具库(更新已有的 gpt-image-2 记录)
-
-注册信息:
-- tool_id: gpt_image_2
-- name: GPT Image 2 图像生成
-- category: image
-"""
-
-    print("提交 GPT Image 2 工具创建任务...")
-    resp = httpx.post(
-        f"{BASE_URL}/create_tool",
-        json={"task_spec": task_spec, "tool_id": "gpt-image-2"},
-        timeout=30,
-    )
-    resp.raise_for_status()
-    data = resp.json()
-    task_id = data["task_id"]
-    print(f"任务已提交: {task_id}")
-
-    print("等待任务完成(每 30 秒查询一次)...")
-    while True:
-        time.sleep(30)
-        try:
-            resp = httpx.get(f"{BASE_URL}/task/{task_id}", timeout=30)
-            resp.raise_for_status()
-            task_data = resp.json()
-            status = task_data["status"]
-            print(f"[{time.strftime('%H:%M:%S')}] status: {status}")
-
-            if status == "completed":
-                print("\nDone!")
-                print(task_data.get("result", ""))
-                break
-            elif status == "failed":
-                print("\nFailed!")
-                print(task_data.get("error", ""))
-                break
-        except Exception as e:
-            print(f"[{time.strftime('%H:%M:%S')}] poll error: {e}")
-
-if __name__ == "__main__":
-    main()

+ 0 - 58
test_rembg_integration.py

@@ -1,58 +0,0 @@
-"""创建 rembg 背景移除工具(Docker 工具)"""
-import httpx
-import time
-
-BASE_URL = "http://localhost:8001"
-
-def main():
-    # 明确指定这是一个 Docker 工具
-    task_spec = """创建 rembg 背景移除工具。
-
-工具类型:Docker 工具(需要在远程服务器上部署)
-
-任务要求:
-1. 调研 rembg 的 GitHub 项目,了解如何使用 Docker 部署
-2. 按照"流程 B:Docker 工具"的步骤部署
-3. 测试验证背景移除功能
-4. 注册工具到工具库
-
-注册信息:
-- tool_id: rembg
-- name: Rembg 背景移除
-- category: image_process
-"""
-
-    print("提交 rembg 工具创建任务...")
-    resp = httpx.post(
-        f"{BASE_URL}/create_tool",
-        json={"task_spec": task_spec},
-        timeout=30,
-    )
-    resp.raise_for_status()
-    data = resp.json()
-    task_id = data["task_id"]
-    print(f"任务已提交: {task_id}")
-
-    print("等待任务完成(每 30 秒查询一次)...")
-    while True:
-        time.sleep(30)
-        try:
-            resp = httpx.get(f"{BASE_URL}/task/{task_id}", timeout=30)
-            resp.raise_for_status()
-            task_data = resp.json()
-            status = task_data["status"]
-            print(f"[{time.strftime('%H:%M:%S')}] status: {status}")
-
-            if status == "completed":
-                print("\n任务完成!")
-                print(task_data.get("result", ""))
-                break
-            elif status == "failed":
-                print("\n任务失败!")
-                print(task_data.get("error", ""))
-                break
-        except Exception as e:
-            print(f"[{time.strftime('%H:%M:%S')}] poll error: {e}")
-
-if __name__ == "__main__":
-    main()

+ 0 - 70
test_veo_integration.py

@@ -1,70 +0,0 @@
-"""
-测试 Veo 工具集成
-通过 /create_tool 接口让 CodingAgent 自动接入 Veo 视频生成工具
-使用 apiyi 平台的 API(环境变量已配置)
-"""
-import httpx
-import time
-import json
-
-BASE_URL = "http://localhost:8001"
-
-TASK_SPEC = """
-接入 Veo 视频生成工具。
-
-背景信息:
-- 使用 apiyi 平台的 API(OpenAI 兼容格式)
-- API Base URL: https://api.apiyi.com/v1
-- API Key: 从环境变量 APIYI_KEY 读取
-- Veo 是 Google 的视频生成模型,通过 apiyi 平台可以调用
-
-任务要求:
-1. 自己搜索 apiyi 平台关于 Veo 的 API 文档,找到正确的模型名称和调用方式
-2. 编写一个 FastAPI 包装服务,接收文本 prompt,调用 Veo 生成视频
-3. 自己测试接口是否正常工作(用一个简单的 prompt 测试)
-4. 测试通过后注册到工具库
-
-注册信息:
-- tool_id: veo_video_gen
-- name: Veo 视频生成
-- category: video
-"""
-
-
-def main():
-    # 1. 提交创建任务
-    print("提交 Veo 工具创建任务...")
-    resp = httpx.post(f"{BASE_URL}/create_tool", json={"task_spec": TASK_SPEC}, timeout=30)
-    resp.raise_for_status()
-    data = resp.json()
-    task_id = data["task_id"]
-    print(f"任务已提交: {task_id}")
-
-    # 2. 轮询任务状态
-    print("等待任务完成(每 30 秒查询一次)...")
-    while True:
-        time.sleep(30)
-        resp = httpx.get(f"{BASE_URL}/task/{task_id}", timeout=10)
-        resp.raise_for_status()
-        task = resp.json()
-        status = task["status"]
-        print(f"[{time.strftime('%H:%M:%S')}] 状态: {status}")
-
-        if status == "completed":
-            print("\n任务完成!")
-            print("结果摘要:")
-            print(task.get("result", "(无结果)"))
-            break
-        elif status == "failed":
-            print("\n任务失败!")
-            print("错误:", task.get("error"))
-            break
-        elif status in ("pending", "running", "researching"):
-            continue
-        else:
-            print(f"未知状态: {status}")
-            break
-
-
-if __name__ == "__main__":
-    main()