fwh_group_publish_monitor.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. """
  2. @description: 服务号分组群发检测任务
  3. """
  4. from datetime import datetime
  5. from enum import IntEnum
  6. from typing import List, Dict
  7. from app.core.database import DatabaseManager
  8. from app.core.observability import LogService
  9. from app.infra.external import feishu_robot
  10. class FwhGroupPublishConst:
  11. """
  12. @description: 服务号分组群发常量
  13. """
  14. NOT_USED_SERVER_ACCOUNT = {"gh_84e744b16b3a", "gh_5855bed97938", "gh_61a72b720de3"}
  15. class PublishStatus(IntEnum):
  16. INIT = 0
  17. SUCCESS = 2
  18. class FwhGroupPublishMapper(FwhGroupPublishConst):
  19. def __init__(self, pool: DatabaseManager, log_service: LogService):
  20. self.pool = pool
  21. self.log_service = log_service
  22. async def get_group_server_accounts(self):
  23. query = "select gzh_id from article_gzh_developer;"
  24. fetch_response = await self.pool.async_fetch(
  25. query=query, db_name="piaoquan_crawler"
  26. )
  27. gh_id_list = [
  28. i["gzh_id"]
  29. for i in fetch_response
  30. if i["gzh_id"] not in self.NOT_USED_SERVER_ACCOUNT
  31. ]
  32. return gh_id_list
  33. async def get_group_send_account_summary(self, publish_date: str) -> List[Dict]:
  34. query = """
  35. SELECT publish_date AS dt, gh_id, account_name, CAST(SUM(total_sent_fans) AS SIGNED) AS sent_fans
  36. FROM (
  37. SELECT publish_date, account_name, gh_id, push_id, avg(sent_count) AS total_sent_fans
  38. FROM long_articles_group_send_result
  39. WHERE publish_date = %s AND status = %s
  40. GROUP BY publish_date, account_name, push_id
  41. ) AS lags
  42. GROUP BY lags.publish_date, lags.gh_id;
  43. """
  44. return await self.pool.async_fetch(
  45. query=query, params=(publish_date, self.PublishStatus.SUCCESS)
  46. )
  47. class FwhGroupPublishUtils(FwhGroupPublishMapper):
  48. def __init__(self, pool: DatabaseManager, log_service: LogService):
  49. super().__init__(pool, log_service)
  50. @staticmethod
  51. def generate_bot_columns():
  52. return [
  53. feishu_robot.create_feishu_columns_sheet(
  54. sheet_type="plain_text",
  55. sheet_name="account_name",
  56. display_name="公众号名称",
  57. ),
  58. feishu_robot.create_feishu_columns_sheet(
  59. sheet_type="plain_text",
  60. sheet_name="gh_id",
  61. display_name="服务号 gh_id",
  62. ),
  63. feishu_robot.create_feishu_columns_sheet(
  64. sheet_type="number", sheet_name="sent_fans", display_name="今日发布次数"
  65. ),
  66. feishu_robot.create_feishu_columns_sheet(
  67. sheet_type="plain_text",
  68. sheet_name="dt",
  69. display_name="发文日期",
  70. ),
  71. ]
  72. class FwhGroupPublishMonitor(FwhGroupPublishUtils):
  73. """
  74. @description: 服务号分组群发检测
  75. """
  76. def __init__(self, pool: DatabaseManager, log_service: LogService):
  77. super().__init__(pool, log_service)
  78. # bot
  79. async def bot_fwh_publish_cnt(self, publish_date: str | None):
  80. if not publish_date:
  81. publish_date = datetime.now().strftime("%Y-%m-%d")
  82. publish_detail = await self.get_group_send_account_summary(publish_date)
  83. sorted_detail = sorted(
  84. publish_detail, key=lambda x: x["sent_fans"], reverse=True
  85. )
  86. await feishu_robot.bot(
  87. title=f"{publish_date}服务号分组群发检测统计(手动触发)",
  88. detail={
  89. "columns": self.generate_bot_columns(),
  90. "rows": sorted_detail,
  91. },
  92. mention=False,
  93. table=True,
  94. env="server_account_publish_monitor",
  95. )
  96. # main
  97. async def deal(self, task_name: str, publish_date: str | None = None):
  98. match task_name:
  99. case "bot":
  100. await self.bot_fwh_publish_cnt(publish_date)
  101. case _:
  102. print(f"未处理任务 {task_name}")