limited_account_analysis.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. from datetime import datetime, timedelta
  2. from app.infra.external import feishu_sheet
  3. class LimitedAccountAnalysisConst:
  4. FIRST_POSITION = 1
  5. LIMIT_THRESHOLD = 0.2
  6. LIMIT_READ_AVG = 100
  7. TABLE_ID = "MqkxwleLFiNOwHkU3crcXpWlnfe"
  8. DETAIL_SHEET_ID = "6fa679"
  9. SUMMARY_SHEET_ID = "uvli3P"
  10. class LimitedAccountAnalysisTask(LimitedAccountAnalysisConst):
  11. def __init__(self, pool, log_client):
  12. self.pool = pool
  13. self.log_client = log_client
  14. async def get_limited_account_detail(self, date_str):
  15. query = """
  16. SELECT date_str,
  17. COALESCE(account_mode, '公众号投流') AS account_mode,
  18. account_source, account_name,
  19. fans, title, view_count, avg_view_count
  20. FROM datastat_sort_strategy
  21. WHERE date_str = %s AND position = %s AND read_rate < %s AND avg_view_count >= %s;
  22. """
  23. account_detail = await self.pool.async_fetch(
  24. query=query,
  25. params=(
  26. date_str,
  27. self.FIRST_POSITION,
  28. self.LIMIT_THRESHOLD,
  29. self.LIMIT_READ_AVG,
  30. ),
  31. )
  32. return account_detail
  33. async def get_limited_account_summary(self, date_str):
  34. query = """
  35. SELECT
  36. date_str,
  37. COALESCE(account_mode, '公众号投流') AS account_mode_label,
  38. CAST(
  39. SUM(
  40. CASE
  41. WHEN IFNULL(read_rate, 0) < %s THEN fans
  42. ELSE 0
  43. END
  44. ) AS SIGNED
  45. ) AS limit_fans,
  46. CAST(SUM(fans) AS SIGNED) AS total_fans
  47. FROM datastat_sort_strategy
  48. WHERE position = %s AND date_str = %s
  49. GROUP BY
  50. date_str,
  51. COALESCE(account_mode, '公众号投流');
  52. """
  53. account_summary = await self.pool.async_fetch(
  54. query=query, params=(self.LIMIT_THRESHOLD, self.FIRST_POSITION, date_str)
  55. )
  56. return account_summary
  57. async def insert_into_detail_table(self, detail_data):
  58. insert_array = []
  59. for row in detail_data:
  60. insert_array.append(
  61. [
  62. row["date_str"],
  63. row["account_mode"],
  64. row["account_source"],
  65. row["account_name"],
  66. row["fans"],
  67. row["title"],
  68. row["view_count"],
  69. row["avg_view_count"],
  70. ]
  71. )
  72. await feishu_sheet.fetch_token()
  73. await feishu_sheet.prepend_value(
  74. sheet_token=self.TABLE_ID,
  75. sheet_id=self.DETAIL_SHEET_ID,
  76. ranges=f"A2:H{2 + len(detail_data)}",
  77. values=insert_array,
  78. )
  79. async def insert_into_summary_table(self, summary_data):
  80. insert_array = []
  81. for row in summary_data:
  82. insert_array.append(
  83. [
  84. row["date_str"],
  85. row["account_mode_label"],
  86. row["limit_fans"],
  87. row["total_fans"],
  88. ]
  89. )
  90. await feishu_sheet.fetch_token()
  91. await feishu_sheet.prepend_value(
  92. sheet_token=self.TABLE_ID,
  93. sheet_id=self.SUMMARY_SHEET_ID,
  94. ranges=f"A2:E{2 + len(summary_data)}",
  95. values=insert_array,
  96. )
  97. async def deal(self, date_string: str = None) -> None:
  98. """处理受限账号分析任务
  99. Args:
  100. date_string: 日期字符串,格式为YYYYMMDD、YYYY-MM-DD或YYYY/MM/DD
  101. """
  102. # 如果没有提供日期,默认使用昨天
  103. if not date_string:
  104. date_string = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
  105. # 统一日期格式为YYYYMMDD
  106. date_string = date_string.replace("-", "").replace("/", "")
  107. # 验证日期格式
  108. if len(date_string) != 8 or not date_string.isdigit():
  109. raise ValueError(f"无效的日期格式: {date_string},请使用YYYYMMDD格式")
  110. try:
  111. detail_data = await self.get_limited_account_detail(date_str=date_string)
  112. summary_data = await self.get_limited_account_summary(date_str=date_string)
  113. # 如果有数据才打印,避免空数据输出
  114. if detail_data:
  115. await self.insert_into_detail_table(detail_data)
  116. else:
  117. print(f"在 {date_string} 没有找到受限账号数据")
  118. if summary_data:
  119. await self.insert_into_summary_table(summary_data)
  120. else:
  121. print(f"在 {date_string} 没有找到受限账号摘要数据")
  122. except Exception as e:
  123. print(f"处理受限账号分析时出错: {e}")
  124. raise