luojunhui 1 month ago
parent
commit
0087c66a16
36 changed files with 316 additions and 251 deletions
  1. app/api/service/__init__.py (+11 -1)
  2. app/api/service/gzh_cookie_manager.py (+1 -1)
  3. app/api/service/task_manager_service.py (+41 -37)
  4. app/api/service/task_scheduler.py (+0 -0)
  5. app/api/v1/routes.py (+11 -4)
  6. app/core/bootstrap/__init__.py (+1 -0)
  7. app/core/bootstrap/resource_manager.py (+35 -0)
  8. app/core/config/global_settings.py (+3 -25)
  9. app/core/config/settings/task_chinese_name.py (+31 -28)
  10. app/core/database/mysql_pools.py (+1 -1)
  11. app/core/dependency/__init__.py (+1 -2)
  12. app/core/dependency/dependencies.py (+10 -26)
  13. app/core/observability/logging/log_service.py (+68 -26)
  14. app/core/pipeline/crawler_pipeline.py (+8 -8)
  15. app/core/pipeline/schemas.py (+0 -0)
  16. app/domains/cold_start_tasks/article_pool_cold_start.py (+11 -6)
  17. app/domains/crawler_tasks/crawler_gzh.py (+7 -7)
  18. app/domains/crawler_tasks/crawler_toutiao.py (+16 -9)
  19. app/domains/data_recycle_tasks/article_detail_stat.py (+3 -3)
  20. app/domains/data_recycle_tasks/recycle_daily_publish_articles.py (+1 -1)
  21. app/domains/monitor_tasks/cooperate_accounts_monitor.py (+1 -2)
  22. app/infra/crawler/wechat/__init__.py (+1 -1)
  23. app/infra/crawler/wechat/gzh_article_stat.py (+2 -7)
  24. app/infra/crawler/wechat/gzh_fans.py (+1 -5)
  25. app/infra/crawler/wechat/gzh_spider.py (+3 -1)
  26. app/infra/external/__init__.py (+0 -1)
  27. app/infra/external/apollo.py (+3 -5)
  28. app/infra/external/deepseek_official.py (+1 -3)
  29. app/infra/external/elastic_search.py (+3 -1)
  30. app/infra/internal/__init__.py (+5 -0)
  31. app/infra/internal/long_articles.py (+4 -2)
  32. app/infra/shared/tools.py (+2 -1)
  33. app/infra/utils/__init__.py (+0 -15)
  34. app/jobs/__init__.py (+0 -3)
  35. app/jobs/task_handler.py (+14 -6)
  36. task_app.py (+16 -13)

+ 11 - 1
app/api/service/__init__.py

@@ -1,5 +1,15 @@
 # 日志服务
 
 # 前端交互
-from .task_manager_service import TaskManagerService
+from .task_manager_service import TaskManager
 from .gzh_cookie_manager import GzhCookieManager
+
+# 任务调度器
+from .task_scheduler import TaskScheduler
+
+
+__all__ = [
+    "TaskManager",
+    "GzhCookieManager",
+    "TaskScheduler",
+]

+ 1 - 1
app/api/service/gzh_cookie_manager.py

@@ -6,7 +6,7 @@ class GzhCookieManager(CrawlerGzhFansBase):
         super().__init__(pool, log_client)
 
     async def deal(self, data):
-        gh_id = data.get('gzh_id')
+        gh_id = data.get("gzh_id")
         if not gh_id:
             return {"error": "gh_id is required"}
 

+ 41 - 37
app/api/service/task_manager_service.py

@@ -2,38 +2,6 @@ import json
 from typing import Optional
 from app.core.config import GlobalConfigSettings
 
-def _build_where(id_eq=None, date_string=None, trace_id=None, task_status=None):
-    conds, params = [], []
-
-    if id_eq is not None:
-        conds.append("id = %s")
-        params.append(id_eq)
-
-    if date_string:  # 字符串非空
-        conds.append("date_string = %s")
-        params.append(date_string)
-
-    if trace_id:
-        conds.append("trace_id LIKE %s")
-        # 如果调用方已经传了 %,就原样用;否则自动做包含匹配
-        params.append(trace_id if "%" in trace_id else f"%{trace_id}%")
-
-    if task_status is not None:
-        conds.append("task_status = %s")
-        params.append(task_status)
-
-    where_clause = " AND ".join(conds) if conds else "1=1"
-    return where_clause, params
-
-
-def _safe_json(v):
-    try:
-        if isinstance(v, (str, bytes, bytearray)):
-            return json.loads(v)
-        return v or {}
-    except Exception:
-        return {}
-
 
 class TaskConst:
     INIT_STATUS = 0
@@ -64,7 +32,9 @@ class TaskManagerUtils(TaskConst):
                 "account_association", "账号联想"
             ).replace("search", "")
             crawl_mode = data.get("crawl_mode", "")
-            crawl_mode = crawl_mode.replace("search", "搜索").replace("account", "抓账号")
+            crawl_mode = crawl_mode.replace("search", "搜索").replace(
+                "account", "抓账号"
+            )
             strategy = data.get("strategy", "")
             return f"{task_name_chinese}\t{crawl_mode}\t{account_method}\t{strategy}"
         elif task_name == "article_pool_cold_start":
@@ -80,9 +50,43 @@ class TaskManagerUtils(TaskConst):
         else:
             return task_name_chinese
 
+    @staticmethod
+    def _build_where(id_eq=None, date_string=None, trace_id=None, task_status=None):
+        conds, params = [], []
+
+        if id_eq is not None:
+            conds.append("id = %s")
+            params.append(id_eq)
+
+        if date_string:  # 字符串非空
+            conds.append("date_string = %s")
+            params.append(date_string)
+
+        if trace_id:
+            conds.append("trace_id LIKE %s")
+            # 如果调用方已经传了 %,就原样用;否则自动做包含匹配
+            params.append(trace_id if "%" in trace_id else f"%{trace_id}%")
+
+        if task_status is not None:
+            conds.append("task_status = %s")
+            params.append(task_status)
+
+        where_clause = " AND ".join(conds) if conds else "1=1"
+        return where_clause, params
+
+    @staticmethod
+    def _safe_json(v):
+        try:
+            if isinstance(v, (str, bytes, bytearray)):
+                return json.loads(v)
+            return v or {}
+        except Exception:
+            return {}
+
 
-class TaskManagerService(TaskConst):
-    def __init__(self, pool, data):
+class TaskManager(TaskManagerUtils):
+    def __init__(self, pool, data, config: GlobalConfigSettings):
+        super().__init__(config)
         self.pool = pool
         self.data = data
 
@@ -101,7 +105,7 @@ class TaskManagerService(TaskConst):
         )
 
         # 1) WHERE 子句
-        where_clause, params = _build_where(id_eq, date_string, trace_id, task_status)
+        where_clause, params = self._build_where(id_eq, date_string, trace_id, task_status)
         sort_whitelist = {
             "id",
             "date_string",
@@ -142,7 +146,7 @@ class TaskManagerService(TaskConst):
                 "status_text": self.STATUS_TEXT.get(
                     r["task_status"], str(r["task_status"])
                 ),
-                "task_name": get_task_chinese_name(_safe_json(r["data"])),
+                "task_name": self.get_task_chinese_name(self._safe_json(r["data"])),
             }
             for r in rows
         ]
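Moving _build_where and _safe_json onto TaskManagerUtils as staticmethods keeps the SQL-building logic next to the class that uses it. A minimal sketch of what the relocated helpers return, with illustrative values only (calling the underscore-prefixed methods directly is just for demonstration):

from app.api.service.task_manager_service import TaskManagerUtils

where_clause, params = TaskManagerUtils._build_where(
    date_string="20250101", trace_id="abc"
)
# where_clause == "date_string = %s AND trace_id LIKE %s"
# params == ["20250101", "%abc%"]  (a bare trace_id becomes a contains-match)
# The %s placeholders are bound by the MySQL driver, so trace_id never
# reaches the SQL string raw.

assert TaskManagerUtils._safe_json('{"a": 1}') == {"a": 1}
assert TaskManagerUtils._safe_json(None) == {}        # falsy -> empty dict
assert TaskManagerUtils._safe_json("not json") == {}  # parse failure swallowed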

+ 0 - 0
app/jobs/task_scheduler.py → app/api/service/task_scheduler.py


+ 11 - 4
app/api/v1/routes.py

@@ -2,14 +2,21 @@ from quart import Blueprint, jsonify, request
 from app.ab_test import GetCoverService
 from app.infra.shared.tools import generate_task_trace_id
 
-from app.jobs import TaskScheduler
-from app.api.service import TaskManagerService, GzhCookieManager
+from app.core.config import GlobalConfigSettings
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from app.api.service import TaskScheduler
+from app.api.service import TaskManager
+from app.api.service import GzhCookieManager
 
 
 server_blueprint = Blueprint("api", __name__, url_prefix="/api")
 
 
-def server_routes(pools, log_service, config):
+def server_routes(
+    pools: DatabaseManager, log_service: LogService, config: GlobalConfigSettings
+):
     @server_blueprint.route("/get_cover", methods=["POST"])
     async def get_cover():
         params = await request.get_json()
@@ -32,7 +39,7 @@ def server_routes(pools, log_service, config):
     @server_blueprint.route("/tasks", methods=["POST"])
     async def task_list():
         data = await request.get_json()
-        TMS = TaskManagerService(pool=pools, data=data)
+        TMS = TaskManager(pool=pools, data=data, config=config)
         res = await TMS.list_tasks()
         return jsonify(res)
 

+ 1 - 0
app/core/bootstrap/__init__.py

@@ -0,0 +1 @@
+from .resource_manager import AppContext

+ 35 - 0
app/core/bootstrap/resource_manager.py

@@ -0,0 +1,35 @@
+import logging
+from app.core.dependency import ServerContainer
+
+logger = logging.getLogger(__name__)
+
+
+class AppContext:
+    def __init__(self, container: ServerContainer):
+        self.container = container
+
+    async def start_up(self):
+        logger.info("初始化数据库连接池")
+        mysql = self.container.mysql_manager()
+        await mysql.init_pools()
+        logger.info("Mysql pools init successfully")
+
+        logger.info("初始化日志服务")
+        log_service = self.container.log_service()
+        await log_service.start()
+        logger.info("aliyun log service init successfully")
+
+    async def shutdown(self):
+        logger.info("关闭数据库连接池")
+        mysql = self.container.mysql_manager()
+        await mysql.close_pools()
+        logger.info("应用资源已释放")
+        logger.info("关闭日志服务")
+        log_service = self.container.log_service()
+        await log_service.stop()
+        logger.info("aliyun log service stopped")
+
+
+__all__ = [
+    "AppContext",
+]
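AppContext centralizes the startup/shutdown ordering that task_app.py previously inlined. A minimal usage sketch for a standalone runner (the runner script itself is assumed, not part of this commit):

import asyncio

from app.core.bootstrap import AppContext
from app.core.dependency import ServerContainer

async def main():
    ctx = AppContext(ServerContainer())
    await ctx.start_up()      # pools + aliyun log worker come up together
    try:
        ...                   # run work against ctx.container.mysql_manager()
    finally:
        await ctx.shutdown()  # resources released in one place, even on failure

asyncio.run(main())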

+ 3 - 25
app/core/config/global_settings.py

@@ -37,32 +37,10 @@ class GlobalConfigSettings(BaseSettings):
     # ============ 业务配置 ============
     cold_start: ColdStartConfig = Field(default_factory=ColdStartConfig)
     category: CategoryConfig = Field(default_factory=CategoryConfig)
-    task_chinese_name: TaskChineseNameConfig = Field(default_factory=TaskChineseNameConfig)
+    task_chinese_name: TaskChineseNameConfig = Field(
+        default_factory=TaskChineseNameConfig
+    )
 
     model_config = SettingsConfigDict(
         env_file=".env", env_file_encoding="utf-8", case_sensitive=False, extra="ignore"
     )
-
-
-# # ============ 全局配置实例 ============
-# global_settings = GlobalConfigSettings()
-#
-#
-# # ============ 兼容旧代码的导出 ============
-# # 这些变量保持向后兼容,让旧代码可以继续使用
-# aigc_db_config = global_settings.aigc_db.to_dict()
-# long_video_db_config = global_settings.long_video_db.to_dict()
-# long_articles_db_config = global_settings.long_articles_db.to_dict()
-# piaoquan_crawler_db_config = global_settings.piaoquan_crawler_db.to_dict()
-# growth_db_config = global_settings.growth_db.to_dict()
-#
-# deep_seek_official_api_key = global_settings.deepseek.api_key
-# deep_seek_official_model = global_settings.deepseek.get_model_map()
-#
-# aliyun_log_config = global_settings.aliyun_log.to_dict()
-#
-# cold_start_category_map = global_settings.cold_start.category_map
-# input_source_map = global_settings.cold_start.input_source_map
-#
-# CATEGORY_FEATURES = global_settings.category.features
-# CATEGORY_MAP = global_settings.category.category_map

+ 31 - 28
app/core/config/settings/task_chinese_name.py

@@ -5,38 +5,41 @@ from typing import Dict
 
 class TaskChineseNameConfig(BaseSettings):
     """冷启动配置"""
+
     # 分类映射
     category_map: Dict[str, str] = Field(
         default_factory=lambda: {
-    "title_rewrite": "标题重写",
-    "crawler_gzh_articles": "抓取公众号文章",
-    "crawler_account_manager": "抓取账号管理",
-    "article_pool_category_generation": "文章池品类生成",
-    "candidate_account_quality_analysis": "候选账号质量分析",
-    "article_pool_cold_start": "文章路冷启动",
-    "crawler_toutiao": "头条抓取",
-    "task_processing_monitor": "协程监测",
-    "update_root_source_id": "更新今日root_source_id",
-    "daily_publish_articles_recycle": "回收今日发文",
-    "inner_article_monitor": "账号发文违规监测",
-    "outside_article_monitor": "外部服务号发文监测",
-    "get_off_videos": "自动下架视频",
-    "check_publish_video_audit_status": "校验发布视频状态",
-    "check_kimi_balance": "检验kimi余额",
-    "account_category_analysis": "账号品类分析",
-    "mini_program_detail_process": "更新小程序信息",
-    "crawler_detail_analysis": "抓取详情分析",
-    "limited_account_analysis": "限流账号分析处理",
-    "auto_follow_account": "自动关注账号",
-    "update_account_open_rate_avg": "更新账号平均打开率",
-    "update_limited_account_info": "更新限流账号信息",
-    "update_account_read_avg": "更新账号平均阅读率",
-    "get_follow_result": "获取自动关注回复",
-    "extract_reply_result": "解析自动回复结果",
-    }
-
+            "title_rewrite": "标题重写",
+            "crawler_gzh_articles": "抓取公众号文章",
+            "crawler_account_manager": "抓取账号管理",
+            "article_pool_category_generation": "文章池品类生成",
+            "candidate_account_quality_analysis": "候选账号质量分析",
+            "article_pool_cold_start": "文章路冷启动",
+            "crawler_toutiao": "头条抓取",
+            "task_processing_monitor": "协程监测",
+            "update_root_source_id": "更新今日root_source_id",
+            "daily_publish_articles_recycle": "回收今日发文",
+            "inner_article_monitor": "账号发文违规监测",
+            "outside_article_monitor": "外部服务号发文监测",
+            "get_off_videos": "自动下架视频",
+            "check_publish_video_audit_status": "校验发布视频状态",
+            "check_kimi_balance": "检验kimi余额",
+            "account_category_analysis": "账号品类分析",
+            "mini_program_detail_process": "更新小程序信息",
+            "crawler_detail_analysis": "抓取详情分析",
+            "limited_account_analysis": "限流账号分析处理",
+            "auto_follow_account": "自动关注账号",
+            "update_account_open_rate_avg": "更新账号平均打开率",
+            "update_limited_account_info": "更新限流账号信息",
+            "update_account_read_avg": "更新账号平均阅读率",
+            "get_follow_result": "获取自动关注回复",
+            "extract_reply_result": "解析自动回复结果",
+        }
     )
 
     model_config = SettingsConfigDict(
-        env_prefix="TASK_CHINESE_NAME_", env_file=".env", case_sensitive=False, extra="ignore"
+        env_prefix="TASK_CHINESE_NAME_",
+        env_file=".env",
+        case_sensitive=False,
+        extra="ignore",
     )

+ 1 - 1
app/core/database/mysql_pools.py

@@ -33,7 +33,7 @@ class DatabaseManager(LogService):
                     autocommit=True,
                 )
                 self.pools[db_name] = pool
-                print(f"Pool for {db_name} created successfully")
+                print(f"DETAIL\t{db_name} MYSQL连接池 created successfully")
 
             except Exception as e:
                 await self.log(

+ 1 - 2
app/core/dependency/__init__.py

@@ -1,2 +1 @@
-from .dependencies import log_service, mysql_manager
-from .dependencies import apollo_client, es_client
+from .dependencies import ServerContainer

+ 10 - 26
app/core/dependency/dependencies.py

@@ -1,37 +1,21 @@
-import logging
+from dependency_injector import containers, providers
 
 from app.core.config import GlobalConfigSettings
 from app.core.database import DatabaseManager
 from app.core.observability import LogService
-from app.infra.external import AsyncApolloApi
-from app.infra.external import AsyncElasticSearchClient
 
 
-logging.basicConfig(level=logging.INFO)
+class ServerContainer(containers.DeclarativeContainer):
+    # config
+    config = providers.Singleton(GlobalConfigSettings)
 
-logging.info("开始加载全局配置")
-config = GlobalConfigSettings()
-logging.info("全局配置加载完成")
+    # 阿里云日志
+    log_service = providers.Singleton(LogService, log_config=config.provided.aliyun_log)
 
-logging.info("开始加载日志服务")
-log_service = LogService(config.aliyun_log)
-logging.info("日志服务加载完成")
+    # MySQL
+    mysql_manager = providers.Singleton(DatabaseManager, config=config)
 
-logging.info("开始创建数据库连接池")
-mysql_manager = DatabaseManager(config)
-logging.info("数据库连接池创建完成")
 
-logging.info("开始创建Apollo客户端")
-apollo_client = AsyncApolloApi(config.apollo, None, None)
-logging.info("Apollo客户端创建完成")
-
-logging.info("开始创建ElasticSearch客户端")
-es_client = AsyncElasticSearchClient(index_=config.elasticsearch.index)
-logging.info("ElasticSearch客户端创建完成")
-
-__ALL__ = [
-    "log_service",
-    "mysql_manager",
-    "apollo_client",
-    "es_client",
+__all__ = [
+    "ServerContainer",
 ]
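The DeclarativeContainer replaces eager import-time construction with lazy singletons: nothing is built until first access, and repeated access returns the same instance. A quick sketch of the dependency_injector semantics this wiring relies on:

from app.core.dependency import ServerContainer

container = ServerContainer()
assert container.config() is container.config()    # settings built once
assert container.mysql_manager() is container.mysql_manager()
# config.provided.aliyun_log defers the attribute lookup, so the settings
# object is only read when log_service is first instantiated.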

+ 68 - 26
app/core/observability/logging/log_service.py

@@ -1,53 +1,95 @@
 import asyncio
 import traceback
-import time
-import json
+import time, json
 import datetime
-from aliyun.log import LogClient, PutLogsRequest, LogItem
+import contextlib
+from typing import Optional
 
+from aliyun.log import LogClient, PutLogsRequest, LogItem
 from app.core.config.settings import AliyunLogConfig
 
 
 class LogService:
     def __init__(self, log_config: AliyunLogConfig):
-        endpoint = log_config.endpoint
-        access_key_id = log_config.access_key_id
-        access_key_secret = log_config.access_key_secret
-        self.client = LogClient(endpoint, access_key_id, access_key_secret)
-        self.project = log_config.project
-        self.logstore = log_config.logstore
-        self.queue = asyncio.Queue()
-        self.running = False
+        self.config = log_config
+
+        self.client: Optional[LogClient] = None
+        self.queue: Optional[asyncio.Queue] = None
+
+        self._worker_task: Optional[asyncio.Task] = None
+        self._running = False
 
     async def start(self):
-        self.running = True
-        asyncio.create_task(self._worker())
+        if self._running:
+            return
+
+        self.client = LogClient(
+            self.config.endpoint,
+            self.config.access_key_id,
+            self.config.access_key_secret,
+        )
+        self.queue = asyncio.Queue(maxsize=10000)
+
+        self._running = True
+        self._worker_task = asyncio.create_task(self._worker())
 
     async def stop(self):
-        self.running = False
+        if not self._running:
+            return
+
+        self._running = False
+
+        if self._worker_task:
+            self._worker_task.cancel()
+            with contextlib.suppress(asyncio.CancelledError):
+                await self._worker_task
+
+        self._worker_task = None
+        self.queue = None
+        self.client = None
 
     async def log(self, contents: dict):
-        """外部调用日志接口"""
-        await self.queue.put(contents)
+        if not self._running or self.queue is None:
+            return
+
+        try:
+            self.queue.put_nowait(contents)
+        except asyncio.QueueFull:
+            # 可以打 stderr / 统计丢日志数量
+            pass
 
     async def _worker(self):
-        while self.running:
-            contents = await self.queue.get()
-            try:
-                await asyncio.to_thread(self._put_log, contents)
-            except Exception as e:
-                print(f"[Log Error] {e}")
-                print(traceback.format_exc())
+        try:
+            while self._running:
+                contents = await self.queue.get()
+                try:
+                    await asyncio.to_thread(self._put_log, contents)
+                except Exception as e:
+                    print(f"[Log Error] {e}")
+                    print(traceback.format_exc())
+        except asyncio.CancelledError:
+            pass
 
     def _put_log(self, contents: dict):
         timestamp = int(time.time())
-        contents["datetime"] = datetime.datetime.now().__str__()
+        contents["datetime"] = datetime.datetime.now().isoformat()
+
         safe_items = [
-            (str(k), json.dumps(v) if isinstance(v, (dict, list)) else str(v))
+            (
+                str(k),
+                json.dumps(v, ensure_ascii=False)
+                if isinstance(v, (dict, list))
+                else str(v),
+            )
             for k, v in contents.items()
         ]
+
         log_item = LogItem(timestamp=timestamp, contents=safe_items)
         req = PutLogsRequest(
-            self.project, self.logstore, topic="", source="", logitems=[log_item]
+            self.config.project,
+            self.config.logstore,
+            topic="",
+            source="",
+            logitems=[log_item],
         )
         self.client.put_logs(req)

+ 8 - 8
app/core/pipeline/crawler_pipeline.py

@@ -1,10 +1,8 @@
 from typing import Dict, Tuple
-
 from pydantic import BaseModel
 
-from app.infra.utils import CrawlerMetaArticle
-from app.infra.utils import CrawlerMetaAccount
-from app.core.dependency import apollo_client
+from app.infra.external import AsyncApolloApi
+from .schemas import CrawlerMetaArticle, CrawlerMetaAccount
 
 
 class CrawlerPipeline:
@@ -14,13 +12,15 @@ class CrawlerPipeline:
         # 如后续有新类型,直接在这里加即可
     }
 
-    def __init__(self, pool, log_client):
+    def __init__(self, pool, log_client, config):
         self.pool = pool
         self.log_client = log_client
+        self.apollo_client = AsyncApolloApi(apollo_config=config, app_id=None, env=None)
 
-    @staticmethod
-    async def whether_title_sensitive(title: str) -> bool:
-        sensitive_word_list = await apollo_client.get_config_value("sensitive_word_list")
+    async def whether_title_sensitive(self, title: str) -> bool:
+        sensitive_word_list = await self.apollo_client.get_config_value(
+            "sensitive_word_list"
+        )
         for word in sensitive_word_list:
             if word in title:
                 return True
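With the Apollo client owned by the pipeline instance, each pipeline is built against an explicit Apollo config rather than a shared module-level client. A hedged construction sketch mirroring how the crawler subclasses wire it (pool and log_client are whatever the caller already holds):

from app.core.config import GlobalConfigSettings
from app.core.pipeline import CrawlerPipeline

async def title_is_clean(pool, log_client, title: str) -> bool:
    config = GlobalConfigSettings()
    pipeline = CrawlerPipeline(pool, log_client, config.apollo)
    return not await pipeline.whether_title_sensitive(title)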

+ 0 - 0
app/infra/utils/item.py → app/core/pipeline/schemas.py


+ 11 - 6
app/domains/cold_start_tasks/article_pool_cold_start.py

@@ -10,11 +10,11 @@ from pandas import DataFrame
 from tqdm.asyncio import tqdm
 
 from app.infra.external import feishu_robot
+from app.infra.external import AsyncApolloApi
+
 from app.infra.internal import auto_create_crawler_task
 from app.infra.internal import auto_bind_crawler_task_to_generate_task
 from app.infra.internal import get_titles_from_produce_plan
-from app.core.dependency import apollo_client
-from app.core.config.settings import ColdStartConfig
 
 from app.domains.cold_start_tasks.article_pool import (
     ArticlePoolColdStartStrategy,
@@ -23,9 +23,12 @@ from app.domains.cold_start_tasks.article_pool import (
 
 
 class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrategy):
-    def __init__(self, pool, log_client,  trace_id, config: ColdStartConfig):
+    def __init__(self, pool, log_client, trace_id, global_config):
         super().__init__(pool, log_client, trace_id)
-        self.config = config
+        self.config = global_config.cold_start
+        self.apollo_client = AsyncApolloApi(
+            apollo_config=global_config.apollo, app_id=None, env=None
+        )
 
     async def get_article_from_meta_table(
         self, platform: str, crawl_method: str, strategy: str, category: str | None
@@ -197,7 +200,9 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
         match strategy:
             case "strategy_v1":
                 # split article into each category
-                category_list = await apollo_client.get_config_value(key="category_list")
+                category_list = await self.apollo_client.get_config_value(
+                    key="category_list"
+                )
                 for ai_category in category_list:
                     filter_category_df = filter_article_df[
                         filter_article_df["category_by_ai"] == ai_category
@@ -366,7 +371,7 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
                     }
                 )
 
-                crawl_methods_map = await apollo_client.get_config_value(
+                crawl_methods_map = await self.apollo_client.get_config_value(
                     key="category_cold_start_map"
                 )
 

+ 7 - 7
app/domains/crawler_tasks/crawler_gzh.py

@@ -7,13 +7,13 @@ from datetime import datetime, date, timedelta
 from typing import List, Dict
 from tqdm.asyncio import tqdm
 
+from app.infra.internal import get_hot_titles
 from app.infra.external import feishu_robot
 from app.infra.crawler.wechat import weixin_search
 from app.infra.crawler.wechat import get_article_detail
 from app.infra.crawler.wechat import get_article_list_from_account
 from app.core.pipeline import CrawlerPipeline
 from app.infra.shared.tools import timestamp_to_str, show_desc_to_sta, generate_gzh_id
-from app.infra.utils import get_hot_titles
 
 
 class CrawlerGzhConst:
@@ -38,8 +38,8 @@ class CrawlerGzhConst:
 
 
 class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
-    def __init__(self, pool, log_client, trace_id):
-        super().__init__(pool, log_client)
+    def __init__(self, pool, log_client, trace_id, config):
+        super().__init__(pool, log_client, config.apollo)
         self.trace_id = trace_id
 
     async def get_crawler_accounts(self, method: str, strategy: str) -> List[Dict]:
@@ -209,8 +209,8 @@ class CrawlerGzhBaseStrategy(CrawlerPipeline, CrawlerGzhConst):
 
 
 class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
-    def __init__(self, pool, log_client, trace_id):
-        super().__init__(pool, log_client, trace_id)
+    def __init__(self, pool, log_client, trace_id, config):
+        super().__init__(pool, log_client, trace_id, config)
 
     async def insert_article_into_meta(self, gh_id, account_method, msg_list):
         """
@@ -300,8 +300,8 @@ class CrawlerGzhAccountArticles(CrawlerGzhBaseStrategy):
 
 
 class CrawlerGzhSearchArticles(CrawlerGzhBaseStrategy):
-    def __init__(self, pool, log_client, trace_id):
-        super().__init__(pool, log_client, trace_id)
+    def __init__(self, pool, log_client, trace_id, config):
+        super().__init__(pool, log_client, trace_id, config)
 
     async def crawl_search_articles_detail(
         self, article_list: List[Dict], source_title: str

+ 16 - 9
app/domains/crawler_tasks/crawler_toutiao.py

@@ -1,8 +1,7 @@
 from __future__ import annotations
 
 import asyncio
-import json
-import time
+import time, json
 import aiohttp
 import traceback
 from datetime import datetime
@@ -10,15 +9,17 @@ from typing import List
 
 from tqdm import tqdm
 
-from app.infra.external import feishu_robot
 from app.infra.crawler.toutiao import get_toutiao_account_info_list
 from app.infra.crawler.toutiao import search_in_toutiao
 from app.infra.crawler.toutiao import get_toutiao_detail
+from app.infra.external import feishu_robot
+from app.infra.internal import get_top_article_title_list
 from app.infra.shared.tools import async_proxy
-from app.infra.utils import get_top_article_title_list
 
 from app.core.pipeline import CrawlerPipeline
-
+from app.core.config import GlobalConfigSettings
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
 
 
 class CrawlerToutiaoConst:
@@ -50,8 +51,14 @@ class CrawlerToutiaoConst:
 
 
 class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
-    def __init__(self, pool, log_client, trace_id):
-        super().__init__(pool, log_client)
+    def __init__(
+        self,
+        pool: DatabaseManager,
+        log_client: LogService,
+        trace_id: str,
+        config: GlobalConfigSettings,
+    ):
+        super().__init__(pool, log_client, config.apollo)
         self.trace_id = trace_id
 
     async def get_request_params(self, category):
@@ -127,7 +134,7 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
         has_more = True
         current_cursor = max_behot_time
         max_cursor = max_cursor or self.DEFAULT_CURSOR
-        cookie = await self.get_config_value(
+        cookie = await self.apollo_client.get_config_value(
             key="toutiao_blogger_cookie", output_type="string"
         )
         while has_more:
@@ -413,7 +420,7 @@ class CrawlerToutiao(CrawlerPipeline, CrawlerToutiaoConst):
     # 搜索抓账号
     async def search_candidate_accounts(self):
         top_title_list = await get_top_article_title_list(pool=self.pool)
-        cookie = await self.get_config_value(
+        cookie = await self.apollo_client.get_config_value(
             key="toutiao_blogger_cookie", output_type="string"
         )
         for article in top_title_list:

+ 3 - 3
app/domains/data_recycle_tasks/article_detail_stat.py

@@ -312,7 +312,7 @@ class ArticleDetailStat(ArticleDetailStatMapper):
                     contents={
                         "task": "article_detail_stat",
                         "account_name": account["account_name"],
-                        "status": "success"
+                        "status": "success",
                     }
                 )
             except Exception as e:
@@ -322,6 +322,6 @@ class ArticleDetailStat(ArticleDetailStatMapper):
                         "account_name": account["account_name"],
                         "error": str(e),
                         "traceback": traceback.format_exc(),
-                        "status": "fail"
+                        "status": "fail",
                     }
-                )
+                )

+ 1 - 1
app/domains/data_recycle_tasks/recycle_daily_publish_articles.py

@@ -47,7 +47,7 @@ class Const:
         "gh_72bace6b3059",
         "gh_dd4c857bbb36",
         "gh_ff487cb5dab3",
-        "gh_ac43eb24376d"
+        "gh_ac43eb24376d",
     ]
 
     # NOT USED SERVER ACCOUNT

+ 1 - 2
app/domains/monitor_tasks/cooperate_accounts_monitor.py

@@ -286,7 +286,7 @@ class CooperateAccountsMonitorTask(CooperateAccountsMonitorMapper):
                     single_article["send_time"],
                     self.extract_wx_sn(single_article["ContentUrl"]),
                     show_stat.get("show_view_count", 0),
-                    show_stat.get("show_like_count", 0)
+                    show_stat.get("show_like_count", 0),
                 )
                 params.append(single_param)
 
@@ -387,4 +387,3 @@ class CooperateAccountsMonitorTask(CooperateAccountsMonitorMapper):
                 else:
                     print("没有需要处理详情的账号")
                     return
-

+ 1 - 1
app/infra/crawler/wechat/__init__.py

@@ -1,3 +1,3 @@
 from .gzh_spider import *
 from .gzh_fans import *
-from .gzh_article_stat import *
+from .gzh_article_stat import *

+ 2 - 7
app/infra/crawler/wechat/gzh_article_stat.py

@@ -4,18 +4,13 @@ from app.infra.shared import AsyncHttpClient
 # 抓取公众号粉丝
 async def get_gzh_stat_daily(access_token: str, date_string: str):
     url = f"https://api.weixin.qq.com/datacube/getarticletotaldetail?access_token={access_token}"
-    data = {
-        "begin_date": date_string,
-        "end_date": date_string
-    }
+    data = {"begin_date": date_string, "end_date": date_string}
     headers = {
         "user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
-        "Content-Type": "application/json"
+        "Content-Type": "application/json",
     }
     # 发送请求
     async with AsyncHttpClient(timeout=10) as http_client:
         response = await http_client.post(url, headers=headers, json=data)
 
     return response
-
-

+ 1 - 5
app/infra/crawler/wechat/gzh_fans.py

@@ -48,11 +48,7 @@ async def get_gzh_fans(token, cookie, cursor_id, cursor_timestamp):
 # 获取 access_token
 async def get_access_token(app_id, app_secret):
     url = f"https://api.weixin.qq.com/cgi-bin/stable_token"
-    data = {
-        "grant_type": "client_credential",
-        "appid": app_id,
-        "secret": app_secret
-    }
+    data = {"grant_type": "client_credential", "appid": app_id, "secret": app_secret}
     async with AsyncHttpClient(timeout=100) as http_client:
         response = await http_client.post(url, json=data)
 

+ 3 - 1
app/infra/crawler/wechat/gzh_spider.py

@@ -49,7 +49,9 @@ async def get_article_detail(
 
 
 @retry(**retry_desc)
-async def get_article_list_from_account(account_id: str, index=None, is_cache=True) -> dict | None:
+async def get_article_list_from_account(
+    account_id: str, index=None, is_cache=True
+) -> dict | None:
     target_url = f"{base_url}/blogger"
     payload = json.dumps(
         {

+ 0 - 1
app/infra/external/__init__.py

@@ -16,4 +16,3 @@ __all__ = [
     "log",
     "AsyncElasticSearchClient",
 ]
-

+ 3 - 5
app/infra/external/apollo.py

@@ -14,7 +14,7 @@ class AsyncApolloClient:
     def __init__(
         self,
         app_id,
-        config_server_url = "http://localhost:8080",
+        config_server_url="http://localhost:8080",
         cluster="default",
         timeout=35,
         ip=None,
@@ -138,10 +138,8 @@ class AsyncApolloClient:
 
 class AsyncApolloApi(AsyncApolloClient):
     def __init__(
-            self,
-            apollo_config: ApolloConfig,
-            app_id: str | None ,
-            env: str | None):
+        self, apollo_config: ApolloConfig, app_id: str | None, env: str | None
+    ):
         if not app_id:
             app_id = apollo_config.app_id
 

+ 1 - 3
app/infra/external/deepseek_official.py

@@ -36,9 +36,7 @@ def fetch_deepseek_completion(
         kwargs["tools"] = tools
         kwargs["tool_choice"] = "auto"
 
-    client = OpenAI(
-        api_key=api_key, base_url="https://api.deepseek.com"
-    )
+    client = OpenAI(api_key=api_key, base_url="https://api.deepseek.com")
 
     if output_type == "json":
         kwargs["response_format"] = {"type": "json_object"}

+ 3 - 1
app/infra/external/elastic_search.py

@@ -8,7 +8,9 @@ class AsyncElasticSearchClient:
     def __init__(self, index_):
         self.password = "nkvvASQuQ0XUGRq5OLvm"
         self.hosts = ["https://192.168.205.85:9200", "https://192.168.205.85:9300"]
-        self.ctx = ssl.create_default_context(cafile="app/core/config/cert/es_certs.crt")
+        self.ctx = ssl.create_default_context(
+            cafile="app/core/config/cert/es_certs.crt"
+        )
         self.es = AsyncElasticsearch(
             self.hosts, basic_auth=("elastic", self.password), ssl_context=self.ctx
         )

+ 5 - 0
app/infra/internal/__init__.py

@@ -10,6 +10,9 @@ from .aigc_system import auto_bind_crawler_task_to_generate_task
 from .aigc_system import insert_crawler_relation_to_aigc_system
 from .aigc_system import get_titles_from_produce_plan
 
+# long_articles
+from .long_articles import get_top_article_title_list
+from .long_articles import get_hot_titles
 
 __all__ = [
     "change_video_audit_status",
@@ -20,4 +23,6 @@ __all__ = [
     "auto_bind_crawler_task_to_generate_task",
     "insert_crawler_relation_to_aigc_system",
     "get_titles_from_produce_plan",
+    "get_top_article_title_list",
+    "get_hot_titles",
 ]

+ 4 - 2
app/infra/utils/async_mysql_utils.py → app/infra/internal/long_articles.py

@@ -1,7 +1,9 @@
 from typing import List, Dict
 
+from app.core.database import DatabaseManager
 
-async def get_top_article_title_list(pool) -> List[Dict]:
+
+async def get_top_article_title_list(pool: DatabaseManager) -> List[Dict]:
     query = """
         select distinct title, source_id
         from datastat_sort_strategy
@@ -11,7 +13,7 @@ async def get_top_article_title_list(pool) -> List[Dict]:
 
 
 async def get_hot_titles(
-    pool, date_string, position, read_times_threshold
+    pool: DatabaseManager, date_string: str, position: int, read_times_threshold: float
 ) -> List[str]:
     """get titles of hot articles"""
     query = """

+ 2 - 1
app/infra/shared/tools.py

@@ -1,6 +1,7 @@
 """
 @author: luojunhui
 """
+
 import oss2
 import random
 import string
@@ -247,4 +248,4 @@ def upload_to_oss(local_video_path, oss_key):
         oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
     )
     bucket.put_object_from_file(key=oss_key, filename=local_video_path)
-    return oss_key
+    return oss_key

+ 0 - 15
app/infra/utils/__init__.py

@@ -1,8 +1,3 @@
-# import async apollo client
-# from .async_apollo_client import AsyncApolloClient
-
-# import async http client
-
 from .get_cover import fetch_channel_info
 from .get_cover import fetch_aigc_cover
 from .get_cover import fetch_long_video_cover
@@ -10,15 +5,5 @@ from .get_cover import fetch_long_video_cover
 # server response
 from .response import TaskScheduleResponse
 
-# common
-
-# import item
-from .item import CrawlerMetaArticle
-from .item import CrawlerMetaAccount
-
-# mysql utils
-from .async_mysql_utils import *
-
 # async tasks
-
 task_schedule_response = TaskScheduleResponse()

+ 0 - 3
app/jobs/__init__.py

@@ -1,3 +0,0 @@
-from .task_scheduler import TaskScheduler
-
-__all__ = ["TaskScheduler"]

+ 14 - 6
app/jobs/task_handler.py

@@ -65,7 +65,14 @@ class TaskHandler:
     # 任务注册表
     _handlers = _TASK_HANDLER_REGISTRY
 
-    def __init__(self, data: dict, log_service, db_client, trace_id: str, config: GlobalConfigSettings):
+    def __init__(
+        self,
+        data: dict,
+        log_service,
+        db_client,
+        trace_id: str,
+        config: GlobalConfigSettings,
+    ):
         self.data = data
         self.log_client = log_service
         self.db_client = db_client
@@ -158,7 +165,9 @@ class TaskHandler:
     @register("crawler_toutiao")
     async def _crawler_toutiao_handler(self) -> int:
         """头条文章/视频抓取"""
-        sub_task = CrawlerToutiao(self.db_client, self.log_client, self.trace_id)
+        sub_task = CrawlerToutiao(
+            self.db_client, self.log_client, self.trace_id, self.config
+        )
         method = self.data.get("method", "account")
         media_type = self.data.get("media_type", "article")
         category_list = self.data.get("category_list", [])
@@ -185,12 +194,12 @@ class TaskHandler:
         match crawl_mode:
             case "account":
                 task = CrawlerGzhAccountArticles(
-                    self.db_client, self.log_client, self.trace_id
+                    self.db_client, self.log_client, self.trace_id, self.config
                 )
                 await task.deal(account_method, strategy)
             case "search":
                 task = CrawlerGzhSearchArticles(
-                    self.db_client, self.log_client, self.trace_id
+                    self.db_client, self.log_client, self.trace_id, self.config
                 )
                 await task.deal(strategy)
             case _:
@@ -230,7 +239,6 @@ class TaskHandler:
         await task.deal()
         return TaskStatus.SUCCESS
 
-
     @register("daily_publish_articles_recycle")
     async def _recycle_article_data_handler(self) -> int:
         """每日发文数据回收"""
@@ -282,7 +290,7 @@ class TaskHandler:
     async def _article_pool_cold_start_handler(self) -> int:
         """文章池冷启动"""
         cold_start = ArticlePoolColdStart(
-            self.db_client, self.log_client, self.trace_id, self.config.cold_start
+            self.db_client, self.log_client, self.trace_id, self.config
         )
         platform = self.data.get("platform", "weixin")
         crawler_methods = self.data.get("crawler_methods", [])
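The handlers above hang off a name-to-coroutine registry; _TASK_HANDLER_REGISTRY and the register decorator are defined outside this hunk. A minimal sketch of the pattern, which may differ from the real definitions in task_handler.py:

_TASK_HANDLER_REGISTRY: dict = {}

def register(task_name: str):
    def decorator(fn):
        _TASK_HANDLER_REGISTRY[task_name] = fn  # registered at class-body time
        return fn
    return decorator

class HandlerDemo:
    _handlers = _TASK_HANDLER_REGISTRY

    @register("demo_task")
    async def _demo_handler(self) -> int:
        return 0  # stands in for TaskStatus.SUCCESS

    async def dispatch(self, name: str) -> int:
        return await self._handlers[name](self)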

+ 16 - 13
task_app.py

@@ -1,35 +1,38 @@
 import logging
 
+logging.basicConfig(level=logging.INFO)
+
 from quart_cors import cors
 from quart import Quart
 
-from app.core.config import GlobalConfigSettings
-from app.core.dependency import log_service, mysql_manager
+from app.core.bootstrap import AppContext
+from app.core.dependency import ServerContainer
 from app.api.v1 import server_routes
 
-
 app = Quart(__name__)
 app = cors(app, allow_origin="*")
-config = GlobalConfigSettings()
+
+server_container = ServerContainer()
+ctx = AppContext(server_container)
+
+config = server_container.config()
+log_service = server_container.log_service()
+mysql_manager = server_container.mysql_manager()
+
 
 routes = server_routes(mysql_manager, log_service, config)
 app.register_blueprint(routes)
 
-logging.basicConfig(level=logging.INFO)
 
 @app.before_serving
 async def startup():
     logging.info("Starting application...")
-    await mysql_manager.init_pools()
-    logging.info("Mysql pools init successfully")
-    await log_service.start()
-    logging.info("aliyun log service init successfully")
+    await ctx.start_up()
+    logging.info("Application started successfully")
 
 
 @app.after_serving
 async def shutdown():
     logging.info("Shutting down application...")
-    await mysql_manager.close_pools()
-    logging.info("Mysql pools close successfully")
-    await log_service.stop()
-    logging.info("aliyun log service stop successfully")
+    await ctx.shutdown()
+    logging.info("Application shutdown successfully")
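One payoff of routing construction through ServerContainer, not exercised in this commit but worth noting: providers can be overridden in tests without monkeypatching module globals.

from unittest import mock

from app.core.dependency import ServerContainer

container = ServerContainer()
with container.log_service.override(mock.AsyncMock()):
    assert isinstance(container.log_service(), mock.AsyncMock)
# the real Singleton is restored once the block exits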