# limited_account_analysis.py (4.9 KB)
import json
from datetime import datetime, timedelta

from applications.api import feishu_sheet, feishu_robot
  4. class LimitedAccountAnalysisConst:
  5. FIRST_POSITION = 1
  6. LIMIT_THRESHOLD = 0.2
  7. LIMIT_READ_AVG = 100
  8. TABLE_ID = "MqkxwleLFiNOwHkU3crcXpWlnfe"
  9. DETAIL_SHEET_ID = "6fa679"
  10. SUMMARY_SHEET_ID = "uvli3P"
  11. class LimitedAccountAnalysisTask(LimitedAccountAnalysisConst):
  12. def __init__(self, pool, log_client):
  13. self.pool = pool
  14. self.log_client = log_client
  15. async def get_limited_account_detail(self, date_str):
  16. query = """
  17. SELECT date_str,
  18. COALESCE(account_mode, '公众号投流') AS account_mode,
  19. account_source, account_name,
  20. fans, title, view_count, avg_view_count
  21. FROM datastat_sort_strategy
  22. WHERE date_str = %s AND position = %s AND read_rate < %s AND avg_view_count >= %s;
  23. """
  24. account_detail = await self.pool.async_fetch(
  25. query=query,
  26. params=(
  27. date_str,
  28. self.FIRST_POSITION,
  29. self.LIMIT_THRESHOLD,
  30. self.LIMIT_READ_AVG,
  31. ),
  32. )
  33. return account_detail
  34. async def get_limited_account_summary(self, date_str):
  35. query = """
  36. SELECT
  37. date_str,
  38. COALESCE(account_mode, '公众号投流') AS account_mode_label,
  39. CAST(
  40. SUM(
  41. CASE
  42. WHEN IFNULL(read_rate, 0) < %s THEN fans
  43. ELSE 0
  44. END
  45. ) AS SIGNED
  46. ) AS limit_fans,
  47. CAST(SUM(fans) AS SIGNED) AS total_fans
  48. FROM datastat_sort_strategy
  49. WHERE position = %s AND date_str = %s
  50. GROUP BY
  51. date_str,
  52. COALESCE(account_mode, '公众号投流');
  53. """
  54. account_summary = await self.pool.async_fetch(
  55. query=query, params=(self.LIMIT_THRESHOLD, self.FIRST_POSITION, date_str)
  56. )
  57. return account_summary
  58. async def insert_into_detail_table(self, detail_data):
  59. insert_array = []
  60. for row in detail_data:
  61. insert_array.append(
  62. [
  63. row["date_str"],
  64. row["account_mode"],
  65. row["account_source"],
  66. row["account_name"],
  67. row["fans"],
  68. row["title"],
  69. row["view_count"],
  70. row["avg_view_count"],
  71. ]
  72. )
  73. await feishu_sheet.fetch_token()
  74. await feishu_sheet.prepend_value(
  75. sheet_token=self.TABLE_ID,
  76. sheet_id=self.DETAIL_SHEET_ID,
  77. ranges=f"A2:H{2 + len(detail_data)}",
  78. values=insert_array,
  79. )
  80. async def insert_into_summary_table(self, summary_data):
  81. insert_array = []
  82. for row in summary_data:
  83. insert_array.append(
  84. [
  85. row["date_str"],
  86. row["account_mode_label"],
  87. row["limit_fans"],
  88. row["total_fans"],
  89. ]
  90. )
  91. await feishu_sheet.fetch_token()
  92. await feishu_sheet.prepend_value(
  93. sheet_token=self.TABLE_ID,
  94. sheet_id=self.SUMMARY_SHEET_ID,
  95. ranges=f"A2:E{2 + len(summary_data)}",
  96. values=insert_array,
  97. )
  98. async def deal(self, date_string: str = None) -> None:
  99. """处理受限账号分析任务
  100. Args:
  101. date_string: 日期字符串,格式为YYYYMMDD、YYYY-MM-DD或YYYY/MM/DD
  102. """
  103. # 如果没有提供日期,默认使用昨天
  104. if not date_string:
  105. date_string = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
  106. # 统一日期格式为YYYYMMDD
  107. date_string = date_string.replace("-", "").replace("/", "")
  108. # 验证日期格式
  109. if len(date_string) != 8 or not date_string.isdigit():
  110. raise ValueError(f"无效的日期格式: {date_string},请使用YYYYMMDD格式")
  111. try:
  112. detail_data = await self.get_limited_account_detail(date_str=date_string)
  113. summary_data = await self.get_limited_account_summary(date_str=date_string)
  114. # 如果有数据才打印,避免空数据输出
  115. if detail_data:
  116. await self.insert_into_detail_table(detail_data)
  117. else:
  118. print(f"在 {date_string} 没有找到受限账号数据")
  119. if summary_data:
  120. await self.insert_into_summary_table(summary_data)
  121. else:
  122. print(f"在 {date_string} 没有找到受限账号摘要数据")
  123. except Exception as e:
  124. print(f"处理受限账号分析时出错: {e}")
  125. raise