luojunhui 1 bulan lalu
induk
melakukan
5343433712

+ 4 - 0
Dockerfile

@@ -0,0 +1,4 @@
+FROM ubuntu:latest
+LABEL authors="huahaiblcu"
+
+ENTRYPOINT ["top", "-b"]

+ 0 - 0
README-CN.md


+ 0 - 3
README.md

@@ -1,3 +0,0 @@
-# AISupplyAgent
-
-AI 内容供给

+ 27 - 0
app.py

@@ -0,0 +1,27 @@
+from quart import Quart
+
+from applications.resource import init_resource_manager
+
+app = Quart(__name__)
+
+env = "PROD"
+resource_manager = init_resource_manager(env=env)
+
+
+@app.before_serving
+async def startup():
+    await resource_manager.startup()
+    print("Resource manager is ready.")
+    print("Jieba dictionary loaded successfully")
+
+
+@app.after_serving
+async def shutdown():
+    await resource_manager.shutdown()
+    print("Resource manager is Down.")
+
+
+# 注册路由
+from routes import server_bp
+
+app.register_blueprint(server_bp)

+ 0 - 0
applications/__init__.py


+ 0 - 0
applications/api/__init__.py


+ 55 - 0
applications/api/deepseek.py

@@ -0,0 +1,55 @@
+"""
+@author: luojunhui
+@description: deepseek 官方api (async版)
+"""
+
+from __future__ import annotations
+
+import json
+from typing import Dict, List, Optional
+from openai import AsyncOpenAI
+
+from applications.config import DEEPSEEK_MODEL
+from applications.config import DEEPSEEK_API_KEY
+
+
+async def fetch_deepseek_completion(
+    model: str,
+    prompt: str,
+    output_type: str = "text",
+    tool_calls: bool = False,
+    tools: List[Dict] = None,
+) -> Optional[Dict | List]:
+    messages = [{"role": "user", "content": prompt}]
+    kwargs = {
+        "model": DEEPSEEK_MODEL.get(model, "deepseek-chat"),
+        "messages": messages,
+    }
+
+    # add tool calls
+    if tool_calls and tools:
+        kwargs["tools"] = tools
+        kwargs["tool_choice"] = "auto"
+
+    client = AsyncOpenAI(api_key=DEEPSEEK_API_KEY, base_url="https://api.deepseek.com")
+
+    if output_type == "json":
+        kwargs["response_format"] = {"type": "json_object"}
+
+    try:
+        response = await client.chat.completions.create(**kwargs)
+        choice = response.choices[0]
+
+        if output_type == "text":
+            return choice.message.content  # 只返回文本
+        elif output_type == "json":
+            return json.loads(choice.message.content)
+        else:
+            raise ValueError(f"Invalid output_type: {output_type}")
+
+    except Exception as e:
+        print(f"[ERROR] fetch_deepseek_completion failed: {e}")
+        return None
+
+
+__all__ = ["fetch_deepseek_completion"]

+ 1 - 0
applications/config/__init__.py

@@ -0,0 +1 @@
+from .mysql_database_config import get_mysql_database_config

+ 25 - 0
applications/config/mysql_database_config.py

@@ -0,0 +1,25 @@
+
+prod_config = {
+        "host": "",
+        "user": "",
+        "password": "",
+        "port": 3306,
+        "db": "",
+        "charset": "utf8mb4",
+        "minsize": 5,
+        "maxsize": 20,
+}
+
+dev_config = {
+        "host": "",
+        "user": "",
+        "password": "",
+        "port": 3306,
+        "db": "",
+        "charset": "utf8mb4",
+        "minsize": 5,
+        "maxsize": 20,
+}
+
+def get_mysql_database_config(env="PROD"):
+    return prod_config if env == "PROD" else dev_config

+ 0 - 0
applications/prompts/__init__.py


+ 4 - 0
applications/resource/__init__.py

@@ -0,0 +1,4 @@
+from .resource_manager import get_resource_manager
+from .resource_manager import init_resource_manager
+
+__all__ = ["get_resource_manager", "init_resource_manager"]

+ 32 - 0
applications/resource/resource_manager.py

@@ -0,0 +1,32 @@
+
+from utils.mysql import DatabaseManager
+
+
+class ResourceManager:
+    def __init__(self, env):
+        self.env = env
+        self.mysql_client: DatabaseManager | None = None
+
+    async def startup(self):
+        # 初始化 MySQL
+        self.mysql_client = DatabaseManager(self.env)
+        await self.mysql_client.init_pools()
+        print("MySQL connected")
+
+    async def shutdown(self):
+        # 关闭 MySQL
+        if self.mysql_client:
+            await self.mysql_client.close_pools()
+            print("Mysql closed")
+
+_resource_manager: ResourceManager | None = None
+
+
+def init_resource_manager(env):
+    global _resource_manager
+    if _resource_manager is None:
+        _resource_manager = ResourceManager(env)
+    return _resource_manager
+
+def get_resource_manager() -> ResourceManager:
+    return _resource_manager

+ 0 - 0
applications/tasks/__init__.py


+ 6 - 0
config.toml

@@ -0,0 +1,6 @@
+reload = true
+bind = "0.0.0.0:9001"
+workers = 1
+keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
+graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
+loglevel = "debug"  # 日志级别

+ 0 - 0
docker-compose.yml


+ 4 - 0
requirements.txt

@@ -0,0 +1,4 @@
+aiohttp==3.13.0
+aiomysql==0.2.0
+quart
+hypercorn

+ 1 - 0
routes/__init__.py

@@ -0,0 +1 @@
+from .blueprint import server_bp

+ 15 - 0
routes/blueprint.py

@@ -0,0 +1,15 @@
+import asyncio
+import json
+import traceback
+import uuid
+from typing import Dict, Any
+
+from quart import Blueprint, jsonify, request
+
+
+server_bp = Blueprint("api", __name__, url_prefix="/api")
+
+
+@server_bp.route("/hello", methods=["GET"])
+async def _hello():
+    return jsonify({"msg": "Hello, World! Hello Future!"})

+ 0 - 0
utils/common/__init__.py


+ 0 - 0
utils/http/__init__.py


+ 99 - 0
utils/http/async_http_client.py

@@ -0,0 +1,99 @@
+import aiohttp
+from typing import Optional, Union, Dict, Any
+
+
+class AsyncHttpClient:
+    def __init__(
+        self,
+        timeout: int = 10,
+        max_connections: int = 100,
+        default_headers: Optional[Dict[str, str]] = None,
+    ):
+        """
+        简化版异步 HTTP 客户端
+
+        :param timeout: 请求超时时间(秒)
+        :param max_connections: 连接池最大连接数
+        :param default_headers: 默认请求头
+        """
+        self.timeout = aiohttp.ClientTimeout(total=timeout)
+        self.connector = aiohttp.TCPConnector(limit=max_connections)
+        self.default_headers = default_headers or {}
+        self.session = None
+
+    async def __aenter__(self):
+        self.session = aiohttp.ClientSession(
+            connector=self.connector, timeout=self.timeout, headers=self.default_headers
+        )
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        await self.session.close()
+
+    async def request(
+        self,
+        method: str,
+        url: str,
+        params: Optional[Dict[str, Any]] = None,
+        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
+        json: Optional[Dict[str, Any]] = None,
+        headers: Optional[Dict[str, str]] = None,
+    ) -> Union[Dict[str, Any], str]:
+        """核心请求方法"""
+        request_headers = {**self.default_headers, **(headers or {})}
+
+        try:
+            async with self.session.request(
+                method,
+                url,
+                params=params,
+                data=data,
+                json=json,
+                headers=request_headers,
+            ) as response:
+                response.raise_for_status()
+                content_type = response.headers.get("Content-Type", "")
+
+                if "application/json" in content_type:
+                    return await response.json()
+                return await response.text()
+
+        except aiohttp.ClientResponseError as e:
+            print(f"HTTP error: {e.status} {e.message}")
+            raise
+        except aiohttp.ClientError as e:
+            print(f"Network error: {str(e)}")
+            raise
+
+    async def get(
+        self,
+        url: str,
+        params: Optional[Dict[str, Any]] = None,
+        headers: Optional[Dict[str, str]] = None,
+    ) -> Union[Dict[str, Any], str]:
+        """GET 请求"""
+        return await self.request("GET", url, params=params, headers=headers)
+
+    async def post(
+        self,
+        url: str,
+        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
+        json: Optional[Dict[str, Any]] = None,
+        headers: Optional[Dict[str, str]] = None,
+    ) -> Union[Dict[str, Any], str]:
+        """POST 请求"""
+        return await self.request("POST", url, data=data, json=json, headers=headers)
+
+    async def put(
+        self,
+        url: str,
+        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
+        json: Optional[Dict[str, Any]] = None,
+        headers: Optional[Dict[str, str]] = None,
+    ) -> Union[Dict[str, Any], str]:
+        """
+        PUT 请求
+
+        通常用于更新资源,可以发送表单数据或 JSON 数据
+        """
+        return await self.request("PUT", url, data=data, json=json, headers=headers)

+ 0 - 0
utils/milvus/__init__.py


+ 1 - 0
utils/mysql/__init__.py

@@ -0,0 +1 @@
+from .pool import DatabaseManager

+ 81 - 0
utils/mysql/pool.py

@@ -0,0 +1,81 @@
+from aiomysql import create_pool
+from aiomysql.cursors import DictCursor
+from applications.config import get_mysql_database_config
+
+
+class DatabaseManager:
+    def __init__(self, env="PROD"):
+        self.databases = None
+        self.pools = {}
+        self.env = env
+
+    async def init_pools(self):
+        # 从配置获取数据库配置,也可以直接在这里配置
+        self.databases = {"supply": get_mysql_database_config(self.env)}
+
+        for db_name, config in self.databases.items():
+            try:
+                pool = await create_pool(
+                    host=config["host"],
+                    port=config["port"],
+                    user=config["user"],
+                    password=config["password"],
+                    db=config["db"],
+                    minsize=config["minsize"],
+                    maxsize=config["maxsize"],
+                    cursorclass=DictCursor,
+                    autocommit=True,
+                )
+                self.pools[db_name] = pool
+                print(f"Created connection pool for {db_name}")
+            except Exception as e:
+                print(f"Failed to create pool for {db_name}: {str(e)}")
+                self.pools[db_name] = None
+
+    async def close_pools(self):
+        for name, pool in self.pools.items():
+            if pool:
+                pool.close()
+                await pool.wait_closed()
+
+    async def async_fetch(
+        self, query, db_name="supply", params=None, cursor_type=DictCursor
+    ):
+        pool = self.pools[db_name]
+        if not pool:
+            await self.init_pools()
+        # fetch from db
+        try:
+            async with pool.acquire() as conn:
+                async with conn.cursor(cursor_type) as cursor:
+                    await cursor.execute(query, params)
+                    fetch_response = await cursor.fetchall()
+
+            return fetch_response
+        except Exception as e:
+            return None
+
+    async def async_save(self, query, params, db_name="supply", batch: bool = False):
+        pool = self.pools[db_name]
+        if not pool:
+            await self.init_pools()
+
+        async with pool.acquire() as connection:
+            async with connection.cursor() as cursor:
+                try:
+                    if batch:
+                        await cursor.executemany(query, params)
+                    else:
+                        await cursor.execute(query, params)
+                    affected_rows = cursor.rowcount
+                    await connection.commit()
+                    return affected_rows
+                except Exception as e:
+                    await connection.rollback()
+                    raise e
+
+    def get_pool(self, db_name):
+        return self.pools.get(db_name)
+
+    def list_databases(self):
+        return list(self.databases.keys())