| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123 |
- """
- @description: 服务号分组群发检测任务
- """
- from datetime import datetime
- from enum import IntEnum
- from typing import List, Dict
- from app.core.database import DatabaseManager
- from app.core.observability import LogService
- from app.infra.external import feishu_robot
class FwhGroupPublishConst:
    """
    @description: Constants for the service-account (fwh) group-send monitor.
    """

    # gh_ids that are excluded whenever this set is used as a filter
    # (see the membership test in get_group_server_accounts).
    NOT_USED_SERVER_ACCOUNT = {
        "gh_84e744b16b3a",
        "gh_5855bed97938",
        "gh_61a72b720de3",
    }

    class PublishStatus(IntEnum):
        """Integer status codes; SUCCESS is the value used to filter the
        `status` column of long_articles_group_send_result."""

        INIT = 0
        SUCCESS = 2
class FwhGroupPublishMapper(FwhGroupPublishConst):
    """
    @description: Database queries used by the service-account group-send monitor.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        """Store the database pool and log service for use by the queries."""
        self.pool = pool
        self.log_service = log_service

    async def get_group_server_accounts(self):
        """Return the gzh_id of every row in article_gzh_developer
        (database "piaoquan_crawler") that is not listed in
        NOT_USED_SERVER_ACCOUNT."""
        query = "select gzh_id from article_gzh_developer;"
        fetch_response = await self.pool.async_fetch(
            query=query, db_name="piaoquan_crawler"
        )
        return [
            row["gzh_id"]
            for row in fetch_response
            if row["gzh_id"] not in self.NOT_USED_SERVER_ACCOUNT
        ]

    async def get_group_send_account_summary(self, publish_date: str) -> List[Dict]:
        """Summarise successful group sends for one publish date.

        The inner query averages sent_count per (publish_date, account_name,
        push_id); the outer query sums those averages per (publish_date,
        gh_id) and exposes the result as `sent_fans`.

        :param publish_date: value matched against the publish_date column.
        :return: whatever `pool.async_fetch` returns for the query; rows carry
            the columns dt, gh_id, account_name and sent_fans.
        """
        # NOTE(review): gh_id is selected by the inner query and account_name
        # by the outer query without appearing in the respective GROUP BY.
        # MySQL with ONLY_FULL_GROUP_BY enabled would reject this -- confirm
        # the server's sql_mode before changing the statement.
        query = """
            SELECT publish_date AS dt, gh_id, account_name, CAST(SUM(total_sent_fans) AS SIGNED) AS sent_fans
            FROM (
                SELECT publish_date, account_name, gh_id, push_id, avg(sent_count) AS total_sent_fans
                FROM long_articles_group_send_result
                WHERE publish_date = %s AND status = %s
                GROUP BY publish_date, account_name, push_id
            ) AS lags
            GROUP BY lags.publish_date, lags.gh_id;
        """
        # Hand the driver a plain int rather than the IntEnum member: drivers
        # that fall back to str() for unknown parameter types would otherwise
        # send 'PublishStatus.SUCCESS' on Python < 3.11, where IntEnum.__str__
        # returns the member name instead of the number.
        success_status = self.PublishStatus.SUCCESS.value
        return await self.pool.async_fetch(
            query=query, params=(publish_date, success_status)
        )
class FwhGroupPublishUtils(FwhGroupPublishMapper):
    """
    @description: Helpers for building the Feishu bot table of the group-send monitor.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        super().__init__(pool, log_service)

    @staticmethod
    def generate_bot_columns():
        """Return the four Feishu column definitions (account_name, gh_id,
        sent_fans, dt), each built with create_feishu_columns_sheet."""
        # (sheet_type, sheet_name, display_name) for each column, in table order.
        column_specs = (
            ("plain_text", "account_name", "公众号名称"),
            ("plain_text", "gh_id", "服务号 gh_id"),
            ("number", "sent_fans", "今日发布次数"),
            ("plain_text", "dt", "发文日期"),
        )
        return [
            feishu_robot.create_feishu_columns_sheet(
                sheet_type=kind,
                sheet_name=field,
                display_name=label,
            )
            for kind, field, label in column_specs
        ]
- class FwhGroupPublishMonitor(FwhGroupPublishUtils):
- """
- @description: 服务号分组群发检测
- """
- def __init__(self, pool: DatabaseManager, log_service: LogService):
- super().__init__(pool, log_service)
- # bot
- async def bot_fwh_publish_cnt(self, publish_date: str | None):
- if not publish_date:
- publish_date = datetime.now().strftime("%Y-%m-%d")
- publish_detail = await self.get_group_send_account_summary(publish_date)
- sorted_detail = sorted(
- publish_detail, key=lambda x: x["sent_fans"], reverse=True
- )
- await feishu_robot.bot(
- title=f"{publish_date}服务号分组群发检测统计(手动触发)",
- detail={
- "columns": self.generate_bot_columns(),
- "rows": sorted_detail,
- },
- mention=False,
- table=True,
- env="server_account_publish_monitor",
- )
- # main
- async def deal(self, task_name: str, publish_date: str | None = None):
- match task_name:
- case "bot":
- await self.bot_fwh_publish_cnt(publish_date)
- case _:
- print(f"未处理任务 {task_name}")
|