""" @description: 服务号分组群发检测任务 """ from datetime import datetime from enum import IntEnum from typing import List, Dict from app.core.database import DatabaseManager from app.core.observability import LogService from app.infra.external import feishu_robot class FwhGroupPublishConst: """ @description: 服务号分组群发常量 """ NOT_USED_SERVER_ACCOUNT = {"gh_84e744b16b3a", "gh_5855bed97938", "gh_61a72b720de3"} class PublishStatus(IntEnum): INIT = 0 SUCCESS = 2 class FwhGroupPublishMapper(FwhGroupPublishConst): def __init__(self, pool: DatabaseManager, log_service: LogService): self.pool = pool self.log_service = log_service async def get_group_server_accounts(self): query = "select gzh_id from article_gzh_developer;" fetch_response = await self.pool.async_fetch( query=query, db_name="piaoquan_crawler" ) gh_id_list = [ i["gzh_id"] for i in fetch_response if i["gzh_id"] not in self.NOT_USED_SERVER_ACCOUNT ] return gh_id_list async def get_group_send_account_summary(self, publish_date: str) -> List[Dict]: query = """ SELECT publish_date AS dt, gh_id, account_name, CAST(SUM(total_sent_fans) AS SIGNED) AS sent_fans FROM ( SELECT publish_date, account_name, gh_id, push_id, avg(sent_count) AS total_sent_fans FROM long_articles_group_send_result WHERE publish_date = %s AND status = %s GROUP BY publish_date, account_name, push_id ) AS lags GROUP BY lags.publish_date, lags.gh_id; """ return await self.pool.async_fetch( query=query, params=(publish_date, self.PublishStatus.SUCCESS) ) class FwhGroupPublishUtils(FwhGroupPublishMapper): def __init__(self, pool: DatabaseManager, log_service: LogService): super().__init__(pool, log_service) @staticmethod def generate_bot_columns(): return [ feishu_robot.create_feishu_columns_sheet( sheet_type="plain_text", sheet_name="account_name", display_name="公众号名称", ), feishu_robot.create_feishu_columns_sheet( sheet_type="plain_text", sheet_name="gh_id", display_name="服务号 gh_id", ), feishu_robot.create_feishu_columns_sheet( sheet_type="number", sheet_name="sent_fans", display_name="今日发布次数" ), feishu_robot.create_feishu_columns_sheet( sheet_type="plain_text", sheet_name="dt", display_name="发文日期", ), ] class FwhGroupPublishMonitor(FwhGroupPublishUtils): """ @description: 服务号分组群发检测 """ def __init__(self, pool: DatabaseManager, log_service: LogService): super().__init__(pool, log_service) # bot async def bot_fwh_publish_cnt(self, publish_date: str | None): if not publish_date: publish_date = datetime.now().strftime("%Y-%m-%d") publish_detail = await self.get_group_send_account_summary(publish_date) sorted_detail = sorted( publish_detail, key=lambda x: x["sent_fans"], reverse=True ) await feishu_robot.bot( title=f"{publish_date}服务号分组群发检测统计(手动触发)", detail={ "columns": self.generate_bot_columns(), "rows": sorted_detail, }, mention=False, table=True, env="server_account_publish_monitor", ) # main async def deal(self, task_name: str, publish_date: str | None = None): match task_name: case "bot": await self.bot_fwh_publish_cnt(publish_date) case _: print(f"未处理任务 {task_name}")