| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- """轻量反向代理服务 - 监听固定端口,动态转发到容器端口
- 监听端口:8001-8005
- 管理接口:9999(仅 localhost)
- """
- import asyncio
- import json
- import logging
- from pathlib import Path
- from typing import Dict
- import httpx
- from fastapi import FastAPI, Request, Response, HTTPException
- from fastapi.responses import JSONResponse
- import uvicorn
# Process-wide logging: timestamp, logger name, level, message.
logging.basicConfig(
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# File in which the routing table is persisted.
routes_file = Path("/opt/tool_agent/proxy/routes.json")
# Routing table: {proxy port: actual container port}.
routes: Dict[int, int] = {}

# Proxy apps, one FastAPI instance per proxy port.
proxy_apps: Dict[int, FastAPI] = {}
# Management API app.
manage_app = FastAPI(title="Proxy Manager")
def load_routes():
    """Populate the global routing table from ``routes_file``.

    A missing file, or any error while reading or parsing it, results in an
    empty table; errors are logged rather than raised.
    """
    global routes

    if not routes_file.exists():
        routes = {}
        return

    try:
        stored = json.loads(routes_file.read_text())
        # JSON object keys are always strings; the table is keyed by int.
        routes = {int(port): int(target) for port, target in stored.items()}
        logger.info(f"Loaded routes: {routes}")
    except Exception as e:
        logger.error(f"Failed to load routes: {e}")
        routes = {}
def save_routes():
    """Persist the global routing table to ``routes_file`` as JSON.

    The JSON is written to a sibling temporary file first and then renamed
    over ``routes_file``, so an interrupted write cannot leave a truncated
    routes file behind. Saving is best-effort: failures are logged, not
    raised.
    """
    tmp_file = routes_file.with_name(routes_file.name + ".tmp")
    try:
        routes_file.parent.mkdir(parents=True, exist_ok=True)
        tmp_file.write_text(json.dumps(routes, indent=2))
        # Rename within the same directory replaces the old file in one step.
        tmp_file.replace(routes_file)
        logger.info(f"Saved routes: {routes}")
    except Exception as e:
        logger.error(f"Failed to save routes: {e}")
- # ========== 管理接口 ==========
- from pydantic import BaseModel
class UpdateRouteRequest(BaseModel):
    # Request body for POST /manage/update_route.
    # Proxy listening port; update_route accepts only 8001-8005.
    proxy_port: int
    # Port on localhost that requests to proxy_port are forwarded to.
    target_port: int
class RemoveRouteRequest(BaseModel):
    # Request body for DELETE /manage/remove_route.
    # Proxy port whose entry is removed from the routing table.
    proxy_port: int
@manage_app.post("/manage/update_route")
async def update_route(req: UpdateRouteRequest):
    """Create or replace the route for one proxy port and persist the table.

    Args:
        req: The proxy port (8001-8005) and the target port to forward to.

    Returns:
        A dict with ``status``, ``proxy_port`` and ``target_port``.

    Raises:
        HTTPException: 400 if ``proxy_port`` is not one of the proxy ports,
            if ``target_port`` is not a valid TCP port, or if the route
            would point the proxy port at itself.
    """
    if req.proxy_port not in range(8001, 8006):
        raise HTTPException(400, f"Invalid proxy_port {req.proxy_port}. Must be 8001-8005")
    # Port 0 would be stored but then treated as "no route" by the proxy
    # handler; out-of-range values can never be connected to.
    if not 1 <= req.target_port <= 65535:
        raise HTTPException(400, f"Invalid target_port {req.target_port}. Must be 1-65535")
    # The proxy forwards to localhost, so this would forward to itself forever.
    if req.target_port == req.proxy_port:
        raise HTTPException(400, f"target_port {req.target_port} must differ from proxy_port")

    routes[req.proxy_port] = req.target_port
    save_routes()
    logger.info(f"Updated route: {req.proxy_port} -> {req.target_port}")
    return {"status": "ok", "proxy_port": req.proxy_port, "target_port": req.target_port}
@manage_app.delete("/manage/remove_route")
async def remove_route(req: RemoveRouteRequest):
    """Delete the route for a proxy port and persist the table.

    Raises:
        HTTPException: 404 if no route exists for ``req.proxy_port``.
    """
    port = req.proxy_port
    if port not in routes:
        raise HTTPException(404, f"Route {port} not found")

    routes.pop(port)
    save_routes()
    logger.info(f"Removed route: {port}")
    return {"status": "ok", "proxy_port": port}
@manage_app.get("/manage/routes")
async def get_routes():
    """Return the current routing table under the ``routes`` key."""
    return dict(routes=routes)
- # ========== 代理转发 ==========
def create_proxy_app(proxy_port: int) -> FastAPI:
    """Build the FastAPI app that serves one proxy port.

    The app has a single catch-all route that forwards every request to
    ``http://localhost:<target_port>/<path>``, where ``target_port`` is looked
    up in the global ``routes`` table on each request, so route changes take
    effect without restarting the server.

    Args:
        proxy_port: The proxy port this app answers for; used as the key into
            ``routes``.

    Returns:
        The configured FastAPI application.

    Responses generated by the proxy itself:
        503 - no route is configured for ``proxy_port``.
        502 - the target port refused the connection.
        504 - the target did not answer within the 60 second timeout.
        500 - any other error while forwarding.
    """
    app = FastAPI(title=f"Proxy {proxy_port}")

    # Hop-by-hop headers describe a single connection and must not be relayed.
    hop_by_hop = frozenset({
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailers",
        "transfer-encoding",
        "upgrade",
    })
    # The body is re-sent from a buffer, so the length is recomputed by httpx.
    # Accept-Encoding is left to httpx so that the upstream only uses
    # encodings httpx is able to decode.
    request_skip = hop_by_hop | {"content-length", "accept-encoding"}
    # httpx hands back an already-decoded body, so the upstream
    # Content-Encoding and Content-Length no longer describe it.
    response_skip = hop_by_hop | {"content-length", "content-encoding"}

    @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
    async def proxy_handler(request: Request, path: str):
        """Forward the request to the container's actual port."""
        target_port = routes.get(proxy_port)
        if not target_port:
            return JSONResponse(
                status_code=503,
                content={"error": f"No route configured for port {proxy_port}"}
            )

        target_url = f"http://localhost:{target_port}/{path}"
        if request.url.query:
            target_url += f"?{request.url.query}"

        # A list of pairs (not a dict) keeps repeated request headers intact.
        forward_headers = [
            (name, value)
            for name, value in request.headers.items()
            if name.lower() not in request_skip
        ]

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                upstream = await client.request(
                    method=request.method,
                    url=target_url,
                    headers=forward_headers,
                    content=await request.body(),
                )

            proxied = Response(
                content=upstream.content,
                status_code=upstream.status_code,
            )
            out_headers = proxied.headers
            # multi_items() keeps repeated headers such as Set-Cookie separate.
            for name, value in upstream.headers.multi_items():
                if name.lower() not in response_skip:
                    out_headers.append(name, value)
            # A HEAD reply has no body; keep the length the upstream reported.
            if request.method == "HEAD" and "content-length" in upstream.headers:
                out_headers["content-length"] = upstream.headers["content-length"]
            return proxied
        except httpx.ConnectError:
            logger.error(f"Failed to connect to {target_url}")
            return JSONResponse(
                status_code=502,
                content={"error": f"Container port {target_port} not reachable"}
            )
        except httpx.TimeoutException:
            logger.error(f"Timed out waiting for {target_url}")
            return JSONResponse(
                status_code=504,
                content={"error": f"Container port {target_port} timed out"}
            )
        except Exception as e:
            logger.error(f"Proxy error: {e}")
            return JSONResponse(
                status_code=500,
                content={"error": str(e)}
            )

    return app
async def main():
    """Load the saved routes, then run the management API and every proxy.

    The management API listens on 127.0.0.1:9999; one proxy server per port
    in 8001-8005 listens on 0.0.0.0. The coroutine finishes only when all
    server tasks have finished.
    """
    load_routes()

    # One proxy app per fixed proxy port.
    proxy_apps.update({port: create_proxy_app(port) for port in range(8001, 8006)})

    def launch(app, host, port, log_level):
        """Schedule a uvicorn server for ``app`` as a task on the running loop."""
        server = uvicorn.Server(
            uvicorn.Config(app, host=host, port=port, log_level=log_level)
        )
        return asyncio.create_task(server.serve())

    # Management API is bound to loopback only.
    tasks = [launch(manage_app, "127.0.0.1", 9999, "info")]
    logger.info("Management API started on http://127.0.0.1:9999")

    # Proxy servers listen on all interfaces; "warning" keeps log noise down.
    for port, app in proxy_apps.items():
        tasks.append(launch(app, "0.0.0.0", port, "warning"))
        logger.info(f"Proxy started on http://0.0.0.0:{port}")

    # Wait for all servers.
    await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())
|