limited_account_analysis.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. import json
  2. from datetime import datetime, timedelta
  3. from applications.api import feishu_sheet, feishu_robot
  4. class LimitedAccountAnalysisConst:
  5. FIRST_POSITION = 1
  6. LIMIT_THRESHOLD = 0.2
  7. LIMIT_READ_AVG = 100
  8. TABLE_ID = 'MqkxwleLFiNOwHkU3crcXpWlnfe'
  9. DETAIL_SHEET_ID = '6fa679'
  10. SUMMARY_SHEET_ID = 'uvli3P'
  11. class LimitedAccountAnalysisTask(LimitedAccountAnalysisConst):
  12. def __init__(self, pool, log_client):
  13. self.pool = pool
  14. self.log_client = log_client
  15. async def get_limited_account_detail(self, date_str):
  16. query = """
  17. SELECT date_str,
  18. COALESCE(account_mode, '公众号投流') AS account_mode,
  19. account_source, account_name,
  20. fans, title, view_count, avg_view_count
  21. FROM datastat_sort_strategy
  22. WHERE date_str = %s AND position = %s AND read_rate < %s AND avg_view_count >= %s;
  23. """
  24. account_detail = await self.pool.async_fetch(query=query, params=(date_str, self.FIRST_POSITION, self.LIMIT_THRESHOLD, self.LIMIT_READ_AVG))
  25. return account_detail
  26. async def get_limited_account_summary(self, date_str):
  27. query = """
  28. SELECT
  29. date_str,
  30. COALESCE(account_mode, '公众号投流') AS account_mode_label,
  31. CAST(
  32. SUM(
  33. CASE
  34. WHEN IFNULL(read_rate, 0) < %s THEN fans
  35. ELSE 0
  36. END
  37. ) AS SIGNED
  38. ) AS limit_fans,
  39. CAST(SUM(fans) AS SIGNED) AS total_fans
  40. FROM datastat_sort_strategy
  41. WHERE position = %s AND date_str = %s
  42. GROUP BY
  43. date_str,
  44. COALESCE(account_mode, '公众号投流');
  45. """
  46. account_summary = await self.pool.async_fetch(query=query, params=(self.LIMIT_THRESHOLD, self.FIRST_POSITION, date_str))
  47. return account_summary
  48. async def insert_into_detail_table(self, detail_data):
  49. insert_array = []
  50. for row in detail_data:
  51. insert_array.append([
  52. row["date_str"],
  53. row["account_mode"],
  54. row["account_source"],
  55. row["account_name"],
  56. row["fans"],
  57. row["title"],
  58. row["view_count"],
  59. row["avg_view_count"],
  60. ]
  61. )
  62. await feishu_sheet.fetch_token()
  63. await feishu_sheet.prepend_value(
  64. sheet_token=self.TABLE_ID,
  65. sheet_id=self.DETAIL_SHEET_ID,
  66. ranges=f"A2:H{2 + len(detail_data)}",
  67. values=insert_array,
  68. )
  69. async def insert_into_summary_table(self, summary_data):
  70. insert_array = []
  71. for row in summary_data:
  72. insert_array.append([
  73. row["date_str"],
  74. row["account_mode_label"],
  75. row["limit_fans"],
  76. row["total_fans"],
  77. ]
  78. )
  79. await feishu_sheet.fetch_token()
  80. await feishu_sheet.prepend_value(
  81. sheet_token=self.TABLE_ID,
  82. sheet_id=self.SUMMARY_SHEET_ID,
  83. ranges=f"A2:E{2+len(summary_data)}",
  84. values=insert_array,
  85. )
  86. async def deal(self, date_string: str = None) -> None:
  87. """处理受限账号分析任务
  88. Args:
  89. date_string: 日期字符串,格式为YYYYMMDD、YYYY-MM-DD或YYYY/MM/DD
  90. """
  91. # 如果没有提供日期,默认使用昨天
  92. if not date_string:
  93. date_string = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
  94. # 统一日期格式为YYYYMMDD
  95. date_string = date_string.replace("-", "").replace("/", "")
  96. # 验证日期格式
  97. if len(date_string) != 8 or not date_string.isdigit():
  98. raise ValueError(f"无效的日期格式: {date_string},请使用YYYYMMDD格式")
  99. try:
  100. detail_data = await self.get_limited_account_detail(date_str=date_string)
  101. summary_data = await self.get_limited_account_summary(date_str=date_string)
  102. # 如果有数据才打印,避免空数据输出
  103. if detail_data:
  104. await self.insert_into_detail_table(detail_data)
  105. else:
  106. print(f"在 {date_string} 没有找到受限账号数据")
  107. if summary_data:
  108. await self.insert_into_summary_table(summary_data)
  109. else:
  110. print(f"在 {date_string} 没有找到受限账号摘要数据")
  111. except Exception as e:
  112. print(f"处理受限账号分析时出错: {e}")
  113. raise