瀏覽代碼

add async aliyun log service

luojunhui 9 小時之前
父節點
當前提交
e9e2d1772b

+ 2 - 1
app_config.toml

@@ -3,4 +3,5 @@ bind = "0.0.0.0:6060"
 workers = 4
 keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
 graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
-loglevel = "debug"  # 日志级别
+loglevel = "debug"  # 日志级别
+reload = true

+ 0 - 0
applications/__init__.py


+ 1 - 2
applications/api/__init__.py

@@ -1,7 +1,6 @@
-
 # feishu_sheet
 from .async_feishu_api import FeishuBotApi
 from .async_feishu_api import FeishuSheetApi
 
 feishu_robot = FeishuBotApi()
-feishu_sheet = FeishuSheetApi()
+feishu_sheet = FeishuSheetApi()

+ 4 - 0
applications/config/__init__.py

@@ -1,2 +1,6 @@
 from .mysql_config import aigc_db_config
 from .mysql_config import long_video_db_config
+
+# aliyun log sdk config
+
+from .aliyun_log_config import aliyun_log_config

+ 9 - 0
applications/config/aliyun_log_config.py

@@ -0,0 +1,9 @@
+
+# aliyun日志配置文件
+aliyun_log_config = {
+    "endpoint": "cn-hangzhou.log.aliyuncs.com",
+    "logstore":"long_articles_job",
+    "project":"changwen-alg",
+    "access_key_id":"LTAIP6x1l3DXfSxm",
+    "access_key_secret":"KbTaM9ars4OX3PMS6Xm7rtxGr1FLon",
+}

+ 3 - 4
applications/service/__init__.py

@@ -1,6 +1,5 @@
+# 日志服务
+from .log_service import LogService
+# 实验
 from .get_cover import GetCoverService
-from .response import Response
-from .response import TaskScheduleResponse
 
-
-task_schedule_response = TaskScheduleResponse()

+ 1 - 1
applications/service/get_cover.py

@@ -1,4 +1,4 @@
-from applications.service.response import Response
+from applications.utils.response import Response
 from applications.utils import fetch_channel_info
 from applications.utils import fetch_aigc_cover
 from applications.utils import fetch_long_video_cover

+ 47 - 0
applications/service/log_service.py

@@ -0,0 +1,47 @@
+import asyncio
+import traceback
+import time
+import json
+import datetime
+from aliyun.log import LogClient, PutLogsRequest, LogItem
+
+class LogService:
+    def __init__(self, endpoint, access_key_id, access_key_secret, project, logstore):
+        self.client = LogClient(endpoint, access_key_id, access_key_secret)
+        self.project = project
+        self.logstore = logstore
+        self.queue = asyncio.Queue()
+        self.running = False
+
+    async def start(self):
+        self.running = True
+        asyncio.create_task(self._worker())
+
+    async def stop(self):
+        self.running = False
+
+    async def log(self, contents: dict):
+        """外部调用日志接口"""
+        await self.queue.put(contents)
+
+    async def _worker(self):
+        while self.running:
+            contents = await self.queue.get()
+            try:
+                await asyncio.to_thread(self._put_log, contents)
+            except Exception as e:
+                print(f"[Log Error] {e}")
+                print(traceback.format_exc())
+
+    def _put_log(self, contents: dict):
+        timestamp = int(time.time())
+        contents['datetime'] = datetime.datetime.now().__str__()
+        safe_items = [
+            (str(k), json.dumps(v) if isinstance(v, (dict, list)) else str(v))
+            for k, v in contents.items()
+        ]
+        log_item = LogItem(timestamp=timestamp, contents=safe_items)
+        print(type(log_item))
+        print(log_item)
+        req = PutLogsRequest(self.project, self.logstore, topic="", source="", logitems=[log_item])
+        self.client.put_logs(req)

+ 0 - 0
applications/tasks/__init__.py


+ 0 - 0
tasks/monitor_tasks/__init__.py → applications/tasks/monitor_tasks/__init__.py


+ 0 - 0
tasks/monitor_tasks/kimi_balance.py → applications/tasks/monitor_tasks/kimi_balance.py


+ 52 - 0
applications/tasks/task_scheduler.py

@@ -0,0 +1,52 @@
+from applications.utils import task_schedule_response
+from applications.tasks.monitor_tasks import check_kimi_balance
+
+class TaskScheduler:
+    def __init__(self, data, log_service):
+        self.data = data
+        self.log_client = log_service
+
+    async def deal(self):
+        task_name = self.data.get("task_name")
+        if not task_name:
+            await self.log_client.log(
+                contents={
+                    "task": task_name,
+                    "function": "task_scheduler_deal",
+                    "message": "not task name in params",
+                    "status": "fail",
+                    "data": self.data,
+                }
+            )
+            return await task_schedule_response.fail_response(
+                error_code="4002", error_message="task_name must be input"
+            )
+
+        match task_name:
+            case "check_kimi_balance":
+                response = await check_kimi_balance()
+                await self.log_client.log(
+                    contents={
+                        "task": task_name,
+                        "function": "task_scheduler_deal",
+                        "message": "check_kimi_balance task execute successfully",
+                        "status": "success",
+                        "data": response,
+                    }
+                )
+                return await task_schedule_response.success_response(
+                    task_name=task_name, data=response
+                )
+            case _:
+                await self.log_client.log(
+                    contents={
+                        "task": task_name,
+                        "function": "task_scheduler_deal",
+                        "message": "wrong task input",
+                        "status": "success",
+                        "data": self.data,
+                    }
+                )
+                return await task_schedule_response.fail_response(
+                    error_code="4001", error_message="wrong task name input"
+                )

+ 3 - 0
applications/utils/__init__.py

@@ -2,3 +2,6 @@ from .get_cover import fetch_channel_info
 from .get_cover import fetch_aigc_cover
 from .get_cover import fetch_long_video_cover
 from .async_http_client import AsyncHttPClient
+from applications.utils.response import TaskScheduleResponse
+
+task_schedule_response = TaskScheduleResponse()

+ 6 - 6
applications/utils/async_http_client.py

@@ -85,15 +85,15 @@ class AsyncHttPClient:
         return await self.request("POST", url, data=data, json=json, headers=headers)
 
     async def put(
-            self,
-            url: str,
-            data: Optional[Union[Dict[str, Any], str, bytes]] = None,
-            json: Optional[Dict[str, Any]] = None,
-            headers: Optional[Dict[str, str]] = None
+        self,
+        url: str,
+        data: Optional[Union[Dict[str, Any], str, bytes]] = None,
+        json: Optional[Dict[str, Any]] = None,
+        headers: Optional[Dict[str, str]] = None,
     ) -> Union[Dict[str, Any], str]:
         """
         PUT 请求
 
         通常用于更新资源,可以发送表单数据或 JSON 数据
         """
-        return await self.request("PUT", url, data=data, json=json, headers=headers)
+        return await self.request("PUT", url, data=data, json=json, headers=headers)

+ 7 - 2
applications/service/response.py → applications/utils/response.py

@@ -15,5 +15,10 @@ class TaskScheduleResponse:
         return {"code": error_code, "status": "error", "message": error_message}
 
     @classmethod
-    async def success_response(cls, task_name,  data):
-        return {"code": 0, "status": "task execute successfully", "data": data, "task_name": task_name}
+    async def success_response(cls, task_name, data):
+        return {
+            "code": 0,
+            "status": "task execute successfully",
+            "data": data,
+            "task_name": task_name,
+        }

+ 3 - 3
routes/blueprint.py

@@ -1,12 +1,12 @@
 from quart import Blueprint, jsonify, request
 from applications.service import GetCoverService
 
-from tasks.task_scheduler import TaskScheduler
+from applications.tasks.task_scheduler import TaskScheduler
 
 server_blueprint = Blueprint("api", __name__, url_prefix="/api")
 
 
-def server_routes(pools):
+def server_routes(pools, log_service):
 
     @server_blueprint.route("/get_cover", methods=["POST"])
     async def get_cover():
@@ -17,7 +17,7 @@ def server_routes(pools):
     @server_blueprint.route("/run_task", methods=["POST"])
     async def run_task():
         data = await request.get_json()
-        task_scheduler = TaskScheduler(data)
+        task_scheduler = TaskScheduler(data, log_service)
         response = await task_scheduler.deal()
         return jsonify(response)
 

+ 13 - 3
task_app.py

@@ -1,19 +1,29 @@
 from quart import Quart
+from applications.config import aliyun_log_config
 from applications.database import mysql_manager
+from applications.service import LogService
 from routes import server_routes
 
+log_service = LogService(**aliyun_log_config)
+
 app = Quart(__name__)
-routes = server_routes(mysql_manager)
+routes = server_routes(mysql_manager, log_service)
 app.register_blueprint(routes)
 
 
 @app.before_serving
 async def startup():
-    print("🚀 Starting application...")
+    print("Starting application...")
     await mysql_manager.init_pools()
+    print("Mysql pools init successfully")
+    await log_service.start()
+    print("aliyun log service init successfully")
 
 
 @app.after_serving
 async def shutdown():
-    print("🛑 Shutting down application...")
+    print("Shutting down application...")
     await mysql_manager.close_pools()
+    print("Mysql pools close successfully")
+    await log_service.stop()
+    print("aliyun log service stop successfully")

+ 0 - 27
tasks/task_scheduler.py

@@ -1,27 +0,0 @@
-from applications.service import task_schedule_response
-from tasks.monitor_tasks import check_kimi_balance
-
-
-class TaskScheduler:
-    def __init__(self, data):
-        self.data = data
-
-    async def deal(self):
-        task_name = self.data.get("task_name")
-        if not task_name:
-            return await task_schedule_response.fail_response(
-                error_code="4002",
-                error_message="task_name must be input"
-            )
-
-        match task_name:
-            case "check_kimi_balance":
-                response = await check_kimi_balance()
-                return await task_schedule_response.success_response(task_name=task_name, data=response)
-            case _:
-                return await task_schedule_response.fail_response(
-                    error_code="4001",
-                    error_message="wrong task name input"
-                )
-
-