"""Lightweight reverse proxy: listen on fixed ports, forward to container ports.

Proxy listeners: 8001-8005 (all interfaces).
Management API:  9999 (localhost only).
"""

import asyncio
import json
import logging
from pathlib import Path
from typing import Dict, FrozenSet, Iterable, List, Tuple

import httpx
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import uvicorn

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(name)s] %(levelname)s: %(message)s"
)
logger = logging.getLogger(__name__)

# Ports this process listens on. Kept in one place so the validation in
# update_route() and the listeners started by main() cannot drift apart.
PROXY_PORTS: Tuple[int, ...] = (8001, 8002, 8003, 8004, 8005)
MANAGE_PORT = 9999

# Hop-by-hop headers describe a single connection and must not be relayed
# by a proxy.
_HOP_BY_HOP_HEADERS: FrozenSet[str] = frozenset({
    "connection",
    "keep-alive",
    "proxy-authenticate",
    "proxy-authorization",
    "te",
    "trailer",
    "trailers",
    "transfer-encoding",
    "upgrade",
})

# httpx hands us the *decoded* body, so the upstream Content-Encoding and
# Content-Length no longer describe the bytes we send back to the client.
_RESPONSE_DROP_HEADERS: FrozenSet[str] = _HOP_BY_HOP_HEADERS | {
    "content-encoding",
    "content-length",
}

# Routing table: {proxy port: actual container port}
routes: Dict[int, int] = {}
routes_file = Path("/opt/tool_agent/proxy/routes.json")

# Management API
manage_app = FastAPI(title="Proxy Manager")

# Proxy API (one FastAPI instance per proxy port)
proxy_apps: Dict[int, FastAPI] = {}


def _forwardable_headers(
    pairs: Iterable[Tuple[str, str]],
    excluded: FrozenSet[str],
) -> List[Tuple[str, str]]:
    """Return the (name, value) pairs whose lower-cased name is not excluded.

    Works on pairs rather than a dict so that repeated headers (for example
    several ``Set-Cookie`` lines) survive instead of being collapsed.
    """
    return [(name, value) for name, value in pairs if name.lower() not in excluded]


def load_routes():
    """Load the routing table from ``routes_file`` into ``routes``.

    The table is emptied first; it stays empty when the file is missing or
    cannot be read/parsed (the failure is logged, not raised). The dict is
    updated in place so that every reference to ``routes`` sees the result.
    """
    routes.clear()
    if not routes_file.exists():
        return

    try:
        raw = json.loads(routes_file.read_text(encoding="utf-8"))
        # JSON object keys are always strings; convert both sides back to int.
        loaded = {int(k): int(v) for k, v in raw.items()}
    except (OSError, ValueError, TypeError, AttributeError) as e:
        logger.error(f"Failed to load routes: {e}")
        return

    routes.update(loaded)
    logger.info(f"Loaded routes: {routes}")


def save_routes():
    """Persist the routing table to ``routes_file`` (best effort).

    The data is written to a sibling temporary file and then renamed over
    the target, so an interrupted write cannot leave a truncated file.
    I/O failures are logged, not raised.
    """
    try:
        routes_file.parent.mkdir(parents=True, exist_ok=True)
        tmp_file = routes_file.with_name(routes_file.name + ".tmp")
        tmp_file.write_text(json.dumps(routes, indent=2), encoding="utf-8")
        tmp_file.replace(routes_file)
    except OSError as e:
        logger.error(f"Failed to save routes: {e}")
        return
    logger.info(f"Saved routes: {routes}")


# ========== Management API ==========

class UpdateRouteRequest(BaseModel):
    """Request body for POST /manage/update_route."""

    proxy_port: int
    target_port: int


class RemoveRouteRequest(BaseModel):
    """Request body for DELETE /manage/remove_route."""

    proxy_port: int


@manage_app.post("/manage/update_route")
async def update_route(req: UpdateRouteRequest):
    """Create or replace the route for one proxy port and persist the table.

    Raises:
        HTTPException: 400 when ``proxy_port`` is not one of the proxy
            listeners, or when ``target_port`` is not a valid TCP port or is
            a port this process itself listens on.
    """
    if req.proxy_port not in PROXY_PORTS:
        raise HTTPException(400, f"Invalid proxy_port {req.proxy_port}. Must be 8001-8005")
    if not 1 <= req.target_port <= 65535:
        raise HTTPException(400, f"Invalid target_port {req.target_port}. Must be 1-65535")
    # Forwarding to one of our own listeners would loop requests back into
    # the proxy, or expose the localhost-only management API on 0.0.0.0.
    if req.target_port in PROXY_PORTS or req.target_port == MANAGE_PORT:
        raise HTTPException(
            400,
            f"Invalid target_port {req.target_port}. Port is used by the proxy itself",
        )

    routes[req.proxy_port] = req.target_port
    save_routes()
    logger.info(f"Updated route: {req.proxy_port} -> {req.target_port}")
    return {"status": "ok", "proxy_port": req.proxy_port, "target_port": req.target_port}


@manage_app.delete("/manage/remove_route")
async def remove_route(req: RemoveRouteRequest):
    """Delete the route for one proxy port and persist the table.

    Raises:
        HTTPException: 404 when no route exists for ``proxy_port``.
    """
    if req.proxy_port not in routes:
        raise HTTPException(404, f"Route {req.proxy_port} not found")

    del routes[req.proxy_port]
    save_routes()
    logger.info(f"Removed route: {req.proxy_port}")
    return {"status": "ok", "proxy_port": req.proxy_port}


@manage_app.get("/manage/routes")
async def get_routes():
    """Return the current routing table."""
    return {"routes": routes}


# ========== Proxy forwarding ==========

def create_proxy_app(proxy_port: int) -> FastAPI:
    """Build the FastAPI app that serves one proxy port.

    The app has a single catch-all route which forwards every request to
    ``http://localhost:<target>``, where ``<target>`` is looked up in
    ``routes`` on each request so route changes apply immediately.
    """
    app = FastAPI(title=f"Proxy {proxy_port}")

    @app.api_route(
        "/{path:path}",
        methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"],
    )
    async def proxy_handler(request: Request, path: str):
        """Forward the request to the container port mapped to this proxy port.

        Returns the upstream response, or a JSON error: 503 when no route is
        configured, 502 when the target refuses the connection, 504 when the
        target times out, 500 for any other failure.
        """
        target_port = routes.get(proxy_port)
        if not target_port:
            return JSONResponse(
                status_code=503,
                content={"error": f"No route configured for port {proxy_port}"}
            )

        target_url = f"http://localhost:{target_port}/{path}"
        if request.url.query:
            target_url += f"?{request.url.query}"

        try:
            async with httpx.AsyncClient(timeout=60.0) as client:
                # Forward the request; pass headers as pairs so repeated
                # request headers are preserved.
                upstream = await client.request(
                    method=request.method,
                    url=target_url,
                    headers=_forwardable_headers(
                        request.headers.items(), _HOP_BY_HOP_HEADERS
                    ),
                    content=await request.body(),
                )

            # Build the reply. Content-Length is recomputed from the decoded
            # body by Response itself; the remaining upstream headers are
            # appended one by one so duplicates such as Set-Cookie are kept.
            proxied = Response(
                content=upstream.content,
                status_code=upstream.status_code,
            )
            for name, value in _forwardable_headers(
                upstream.headers.multi_items(), _RESPONSE_DROP_HEADERS
            ):
                proxied.headers.append(name, value)
            # A HEAD reply has no body, so the upstream Content-Length is the
            # only source of the resource size; keep it.
            if request.method == "HEAD" and "content-length" in upstream.headers:
                proxied.headers["content-length"] = upstream.headers["content-length"]
            return proxied

        except httpx.ConnectError:
            logger.error(f"Failed to connect to {target_url}")
            return JSONResponse(
                status_code=502,
                content={"error": f"Container port {target_port} not reachable"}
            )
        except httpx.TimeoutException:
            logger.error(f"Timed out waiting for {target_url}")
            return JSONResponse(
                status_code=504,
                content={"error": f"Container port {target_port} timed out"}
            )
        except Exception as e:
            logger.exception(f"Proxy error: {e}")
            return JSONResponse(
                status_code=500,
                content={"error": str(e)}
            )

    return app


async def main():
    """Load the routes, then run the management API and all proxy listeners."""
    load_routes()

    # Create one proxy app per listening port.
    for port in PROXY_PORTS:
        proxy_apps[port] = create_proxy_app(port)

    servers = []

    # Management API (localhost only).
    config_manage = uvicorn.Config(
        manage_app,
        host="127.0.0.1",
        port=MANAGE_PORT,
        log_level="info"
    )
    server_manage = uvicorn.Server(config_manage)
    servers.append(asyncio.create_task(server_manage.serve()))
    logger.info("Management API started on http://127.0.0.1:9999")

    # Proxy listeners (all interfaces).
    for port, app in proxy_apps.items():
        config = uvicorn.Config(
            app,
            host="0.0.0.0",
            port=port,
            log_level="warning"  # reduce log noise
        )
        server = uvicorn.Server(config)
        servers.append(asyncio.create_task(server.serve()))
        logger.info(f"Proxy started on http://0.0.0.0:{port}")

    # Run until every server task finishes.
    await asyncio.gather(*servers)


if __name__ == "__main__":
    asyncio.run(main())