瀏覽代碼

add task manager

luojunhui 12 小時之前
父節點
當前提交
03bea18794

+ 1 - 2
app_config.toml

@@ -3,5 +3,4 @@ bind = "0.0.0.0:6060"
 workers = 4
 keep_alive_timeout = 120  # 保持连接的最大秒数,根据需要调整
 graceful_timeout = 30    # 重启或停止之前等待当前工作完成的时间
-loglevel = "debug"  # 日志级别
-reload = true
+loglevel = "debug"  # 日志级别

+ 8 - 8
applications/config/mysql_config.py

@@ -6,8 +6,8 @@ aigc_db_config = {
     "password": "cyber#crawler_2023",
     "db": "aigc-admin-prod",
     "charset": "utf8mb4",
-    "min_size": 5,
-    "max_size": 20,
+    "minsize": 5,
+    "maxsize": 20,
 }
 
 # long_video_db_config
@@ -18,8 +18,8 @@ long_video_db_config = {
     "password": "wx2016_longvideoP@assword1234",
     "db": "longvideo",
     "charset": "utf8mb4",
-    "min_size": 5,
-    "max_size": 20,
+    "minsize": 5,
+    "maxsize": 20,
 }
 
 # 长文数据库连接配置
@@ -30,8 +30,8 @@ long_articles_db_config = {
     "password": "changwen@123456",
     "db": "long_articles",
     "charset": "utf8mb4",
-    "min_size": 5,
-    "max_size": 20,
+    "minsize": 5,
+    "maxsize": 20,
 }
 
 # 票圈爬虫库数据库配置
@@ -42,6 +42,6 @@ piaoquan_crawler_db_config = {
     "password": "crawler123456@",
     "db": "piaoquan-crawler",
     "charset": "utf8mb4",
-    "min_size": 5,
-    "max_size": 20,
+    "minsize": 5,
+    "maxsize": 20,
 }

+ 2 - 2
applications/database/mysql_pools.py

@@ -1,5 +1,5 @@
-from aiomysql import create_pool, DictCursor
-
+from aiomysql import create_pool
+from aiomysql.cursors import DictCursor
 from applications.config import *
 
 

+ 0 - 2
applications/service/log_service.py

@@ -42,8 +42,6 @@ class LogService:
             for k, v in contents.items()
         ]
         log_item = LogItem(timestamp=timestamp, contents=safe_items)
-        print(type(log_item))
-        print(log_item)
         req = PutLogsRequest(
             self.project, self.logstore, topic="", source="", logitems=[log_item]
         )

+ 0 - 1
applications/tasks/monitor_tasks/kimi_balance.py

@@ -32,4 +32,3 @@ async def check_kimi_balance() -> Dict:
             detail={"error": str(e), "error_msg": error_stack},
         )
         return {"code": 99, "data": error_stack}
-

+ 11 - 3
applications/tasks/task_scheduler.py

@@ -18,7 +18,9 @@ class TaskScheduler:
         query = f"""
             select start_timestamp from {self.table} where task_name = %s and task_status = %s;
         """
-        response = await self.db_client.async_fetch(query=query, params=(task_name, 1))
+        response, error = await self.db_client.async_fetch(
+            query=query, params=(task_name, 1)
+        )
         if not response:
             # no task is processing
             return False
@@ -54,7 +56,8 @@ class TaskScheduler:
             where task_name = %s and date_string = %s and task_status = %s;
         """
         return await self.db_client.async_save(
-            query=query, params=(final_status, int(time.time()), task_name, date_string, 1)
+            query=query,
+            params=(final_status, int(time.time()), task_name, date_string, 1),
         )
 
     async def deal(self):
@@ -83,6 +86,7 @@ class TaskScheduler:
                     return await task_schedule_response.fail_response(
                         error_code="5001", error_message="task is processing"
                     )
+
                 await self.record_task(task_name=task_name, date_string=date_string)
 
                 await self.lock_task(task_name, date_string)
@@ -96,7 +100,11 @@ class TaskScheduler:
                         "data": response,
                     }
                 )
-                await self.release_task(task_name=task_name, date_string=date_string, final_status=response['code'])
+                await self.release_task(
+                    task_name=task_name,
+                    date_string=date_string,
+                    final_status=response["code"],
+                )
                 return await task_schedule_response.success_response(
                     task_name=task_name, data=response
                 )