proxy_server.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. """轻量反向代理服务 - 监听固定端口,动态转发到容器端口
  2. 监听端口:8001-8005
  3. 管理接口:9999(仅 localhost)
  4. """
  5. import asyncio
  6. import json
  7. import logging
  8. from pathlib import Path
  9. from typing import Dict
  10. import httpx
  11. from fastapi import FastAPI, Request, Response, HTTPException
  12. from fastapi.responses import JSONResponse
  13. import uvicorn
  14. logging.basicConfig(
  15. level=logging.INFO,
  16. format="%(asctime)s [%(name)s] %(levelname)s: %(message)s"
  17. )
  18. logger = logging.getLogger(__name__)
  19. # 路由表:{代理端口: 容器实际端口}
  20. routes: Dict[int, int] = {}
  21. routes_file = Path("/opt/tool_agent/proxy/routes.json")
  22. # 管理 API
  23. manage_app = FastAPI(title="Proxy Manager")
  24. # 代理 API(每个端口一个实例)
  25. proxy_apps: Dict[int, FastAPI] = {}
  26. def load_routes():
  27. """从文件加载路由表"""
  28. global routes
  29. if routes_file.exists():
  30. try:
  31. routes = json.loads(routes_file.read_text())
  32. # 转换 key 为 int
  33. routes = {int(k): int(v) for k, v in routes.items()}
  34. logger.info(f"Loaded routes: {routes}")
  35. except Exception as e:
  36. logger.error(f"Failed to load routes: {e}")
  37. routes = {}
  38. else:
  39. routes = {}
  40. def save_routes():
  41. """保存路由表到文件"""
  42. try:
  43. routes_file.parent.mkdir(parents=True, exist_ok=True)
  44. routes_file.write_text(json.dumps(routes, indent=2))
  45. logger.info(f"Saved routes: {routes}")
  46. except Exception as e:
  47. logger.error(f"Failed to save routes: {e}")
  48. # ========== 管理接口 ==========
  49. from pydantic import BaseModel
  50. class UpdateRouteRequest(BaseModel):
  51. proxy_port: int
  52. target_port: int
  53. class RemoveRouteRequest(BaseModel):
  54. proxy_port: int
  55. @manage_app.post("/manage/update_route")
  56. async def update_route(req: UpdateRouteRequest):
  57. """更新路由表"""
  58. if req.proxy_port not in [8001, 8002, 8003, 8004, 8005]:
  59. raise HTTPException(400, f"Invalid proxy_port {req.proxy_port}. Must be 8001-8005")
  60. routes[req.proxy_port] = req.target_port
  61. save_routes()
  62. logger.info(f"Updated route: {req.proxy_port} -> {req.target_port}")
  63. return {"status": "ok", "proxy_port": req.proxy_port, "target_port": req.target_port}
  64. @manage_app.delete("/manage/remove_route")
  65. async def remove_route(req: RemoveRouteRequest):
  66. """删除路由"""
  67. if req.proxy_port in routes:
  68. del routes[req.proxy_port]
  69. save_routes()
  70. logger.info(f"Removed route: {req.proxy_port}")
  71. return {"status": "ok", "proxy_port": req.proxy_port}
  72. else:
  73. raise HTTPException(404, f"Route {req.proxy_port} not found")
  74. @manage_app.get("/manage/routes")
  75. async def get_routes():
  76. """获取当前路由表"""
  77. return {"routes": routes}
  78. # ========== 代理转发 ==========
  79. def create_proxy_app(proxy_port: int) -> FastAPI:
  80. """为每个代理端口创建一个 FastAPI 实例"""
  81. app = FastAPI(title=f"Proxy {proxy_port}")
  82. @app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH", "OPTIONS", "HEAD"])
  83. async def proxy_handler(request: Request, path: str):
  84. """转发请求到容器实际端口"""
  85. target_port = routes.get(proxy_port)
  86. if not target_port:
  87. return JSONResponse(
  88. status_code=503,
  89. content={"error": f"No route configured for port {proxy_port}"}
  90. )
  91. target_url = f"http://localhost:{target_port}/{path}"
  92. if request.url.query:
  93. target_url += f"?{request.url.query}"
  94. try:
  95. async with httpx.AsyncClient(timeout=60.0) as client:
  96. # 转发请求
  97. response = await client.request(
  98. method=request.method,
  99. url=target_url,
  100. headers=dict(request.headers),
  101. content=await request.body(),
  102. )
  103. # 返回响应
  104. return Response(
  105. content=response.content,
  106. status_code=response.status_code,
  107. headers=dict(response.headers),
  108. )
  109. except httpx.ConnectError:
  110. logger.error(f"Failed to connect to {target_url}")
  111. return JSONResponse(
  112. status_code=502,
  113. content={"error": f"Container port {target_port} not reachable"}
  114. )
  115. except Exception as e:
  116. logger.error(f"Proxy error: {e}")
  117. return JSONResponse(
  118. status_code=500,
  119. content={"error": str(e)}
  120. )
  121. return app
  122. async def main():
  123. """启动所有服务"""
  124. load_routes()
  125. # 创建代理 app
  126. for port in [8001, 8002, 8003, 8004, 8005]:
  127. proxy_apps[port] = create_proxy_app(port)
  128. # 启动所有服务
  129. servers = []
  130. # 管理接口(仅 localhost:9999)
  131. config_manage = uvicorn.Config(
  132. manage_app,
  133. host="127.0.0.1",
  134. port=9999,
  135. log_level="info"
  136. )
  137. server_manage = uvicorn.Server(config_manage)
  138. servers.append(asyncio.create_task(server_manage.serve()))
  139. logger.info("Management API started on http://127.0.0.1:9999")
  140. # 代理接口(0.0.0.0:8001-8005)
  141. for port, app in proxy_apps.items():
  142. config = uvicorn.Config(
  143. app,
  144. host="0.0.0.0",
  145. port=port,
  146. log_level="warning" # 减少日志噪音
  147. )
  148. server = uvicorn.Server(config)
  149. servers.append(asyncio.create_task(server.serve()))
  150. logger.info(f"Proxy started on http://0.0.0.0:{port}")
  151. # 等待所有服务
  152. await asyncio.gather(*servers)
  153. if __name__ == "__main__":
  154. asyncio.run(main())