luojunhui 1 месяц назад
Родитель
Сommit
d22d6e4a39

+ 2 - 0
app/api/v1/endpoints/__init__.py

@@ -2,10 +2,12 @@ from .abtest import create_abtest_bp
 from .health import create_health_bp
 from .tasks import create_tasks_bp
 from .tokens import create_tokens_bp
+from .monitor import create_monitor_bp
 
 __all__ = [
     "create_abtest_bp",
     "create_health_bp",
     "create_tasks_bp",
     "create_tokens_bp",
+    "create_monitor_bp",
 ]

+ 18 - 0
app/api/v1/endpoints/monitor.py

@@ -0,0 +1,18 @@
+from __future__ import annotations
+
+from quart import Blueprint, jsonify
+
+from app.api.v1.utils import ApiDependencies
+from app.domains.monitor_tasks import FwhGroupPublishMonitor
+
+
+def create_monitor_bp(deps: ApiDependencies) -> Blueprint:
+    bp = Blueprint("monitor", __name__)
+
+    @bp.route("/fwh_group_sent_monitor", methods=["GET"])
+    async def fwh_group_sent_monitor():
+        service = FwhGroupPublishMonitor(deps.db, deps.log)
+        await service.deal(task_name="bot")
+        return jsonify({"status": "success"})
+
+    return bp

+ 2 - 0
app/api/v1/routes/routes.py

@@ -8,6 +8,7 @@ from app.api.v1.endpoints import (
     create_health_bp,
     create_tasks_bp,
     create_tokens_bp,
+    create_monitor_bp,
 )
 from app.core.config import GlobalConfigSettings
 from app.core.database import DatabaseManager
@@ -30,6 +31,7 @@ def register_v1_blueprints(deps: ApiDependencies) -> Blueprint:
     api.register_blueprint(create_tasks_bp(deps))
     api.register_blueprint(create_tokens_bp(deps))
     api.register_blueprint(create_abtest_bp(deps))
+    api.register_blueprint(create_monitor_bp(deps))
 
     return api
 

+ 2 - 0
app/domains/monitor_tasks/__init__.py

@@ -8,6 +8,7 @@ from .limited_account_analysis import LimitedAccountAnalysisTask
 from .task_processing_monitor import TaskProcessingMonitor
 from .auto_reply_cards_monitor import AutoReplyCardsMonitor
 from .cooperate_accounts_monitor import CooperateAccountsMonitorTask
+from .fwh_group_publish_monitor import FwhGroupPublishMonitor
 
 
 __all__ = [
@@ -21,4 +22,5 @@ __all__ = [
     "LimitedAccountAnalysisTask",
     "AutoReplyCardsMonitor",
     "CooperateAccountsMonitorTask",
+    "FwhGroupPublishMonitor",
 ]

+ 123 - 0
app/domains/monitor_tasks/fwh_group_publish_monitor.py

@@ -0,0 +1,123 @@
+"""
+@description: 服务号分组群发检测任务
+"""
+
+from datetime import datetime
+from enum import IntEnum
+from typing import List, Dict
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+from app.infra.external import feishu_robot
+
+
+class FwhGroupPublishConst:
+    """
+    @description: 服务号分组群发常量
+    """
+
+    NOT_USED_SERVER_ACCOUNT = {"gh_84e744b16b3a", "gh_5855bed97938", "gh_61a72b720de3"}
+
+    class PublishStatus(IntEnum):
+        INIT = 0
+        SUCCESS = 2
+
+
+class FwhGroupPublishMapper(FwhGroupPublishConst):
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        self.pool = pool
+        self.log_service = log_service
+
+    async def get_group_server_accounts(self):
+        query = "select gzh_id from article_gzh_developer;"
+        fetch_response = await self.pool.async_fetch(
+            query=query, db_name="piaoquan_crawler"
+        )
+        gh_id_list = [
+            i["gzh_id"]
+            for i in fetch_response
+            if i["gzh_id"] not in self.NOT_USED_SERVER_ACCOUNT
+        ]
+        return gh_id_list
+
+    async def get_group_send_account_summary(self, publish_date: str) -> List[Dict]:
+        query = """
+            SELECT publish_date AS dt, gh_id, account_name, CAST(SUM(total_sent_fans) AS SIGNED) AS sent_fans
+            FROM (
+                     SELECT publish_date, account_name, gh_id, push_id, avg(sent_count) AS total_sent_fans
+                     FROM long_articles_group_send_result
+                     WHERE publish_date = %s  AND status = %s
+                     GROUP BY publish_date, account_name, push_id
+                 ) AS lags
+            GROUP BY lags.publish_date, lags.gh_id;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(publish_date, self.PublishStatus.SUCCESS)
+        )
+
+
+class FwhGroupPublishUtils(FwhGroupPublishMapper):
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        super().__init__(pool, log_service)
+
+    @staticmethod
+    def generate_bot_columns():
+        return [
+            feishu_robot.create_feishu_columns_sheet(
+                sheet_type="plain_text",
+                sheet_name="account_name",
+                display_name="公众号名称",
+            ),
+            feishu_robot.create_feishu_columns_sheet(
+                sheet_type="plain_text",
+                sheet_name="gh_id",
+                display_name="服务号 gh_id",
+            ),
+            feishu_robot.create_feishu_columns_sheet(
+                sheet_type="number", sheet_name="sent_fans", display_name="今日发布次数"
+            ),
+            feishu_robot.create_feishu_columns_sheet(
+                sheet_type="plain_text",
+                sheet_name="dt",
+                display_name="发文日期",
+            ),
+        ]
+
+
+class FwhGroupPublishMonitor(FwhGroupPublishUtils):
+    """
+    @description: 服务号分组群发检测
+    """
+
+    def __init__(self, pool: DatabaseManager, log_service: LogService):
+        super().__init__(pool, log_service)
+
+    # bot
+    async def bot_fwh_publish_cnt(self, publish_date: str | None):
+        if not publish_date:
+            publish_date = datetime.now().strftime("%Y-%m-%d")
+
+        publish_detail = await self.get_group_send_account_summary(publish_date)
+
+        sorted_detail = sorted(
+            publish_detail, key=lambda x: x["sent_fans"], reverse=True
+        )
+        await feishu_robot.bot(
+            title=f"{publish_date}服务号分组群发检测统计(手动触发)",
+            detail={
+                "columns": self.generate_bot_columns(),
+                "rows": sorted_detail,
+            },
+            mention=False,
+            table=True,
+            env="server_account_publish_monitor",
+        )
+
+    # main
+    async def deal(self, task_name: str, publish_date: str | None = None):
+        match task_name:
+            case "bot":
+                await self.bot_fwh_publish_cnt(publish_date)
+
+            case _:
+                print(f"未处理任务 {task_name}")