| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142 |
- import json
- from datetime import datetime, timedelta
- from applications.api import feishu_sheet, feishu_robot
- class LimitedAccountAnalysisConst:
- FIRST_POSITION = 1
- LIMIT_THRESHOLD = 0.2
- LIMIT_READ_AVG = 100
- TABLE_ID = "MqkxwleLFiNOwHkU3crcXpWlnfe"
- DETAIL_SHEET_ID = "6fa679"
- SUMMARY_SHEET_ID = "uvli3P"
- class LimitedAccountAnalysisTask(LimitedAccountAnalysisConst):
- def __init__(self, pool, log_client):
- self.pool = pool
- self.log_client = log_client
- async def get_limited_account_detail(self, date_str):
- query = """
- SELECT date_str,
- COALESCE(account_mode, '公众号投流') AS account_mode,
- account_source, account_name,
- fans, title, view_count, avg_view_count
- FROM datastat_sort_strategy
- WHERE date_str = %s AND position = %s AND read_rate < %s AND avg_view_count >= %s;
- """
- account_detail = await self.pool.async_fetch(
- query=query,
- params=(
- date_str,
- self.FIRST_POSITION,
- self.LIMIT_THRESHOLD,
- self.LIMIT_READ_AVG,
- ),
- )
- return account_detail
- async def get_limited_account_summary(self, date_str):
- query = """
- SELECT
- date_str,
- COALESCE(account_mode, '公众号投流') AS account_mode_label,
- CAST(
- SUM(
- CASE
- WHEN IFNULL(read_rate, 0) < %s THEN fans
- ELSE 0
- END
- ) AS SIGNED
- ) AS limit_fans,
- CAST(SUM(fans) AS SIGNED) AS total_fans
- FROM datastat_sort_strategy
- WHERE position = %s AND date_str = %s
- GROUP BY
- date_str,
- COALESCE(account_mode, '公众号投流');
- """
- account_summary = await self.pool.async_fetch(
- query=query, params=(self.LIMIT_THRESHOLD, self.FIRST_POSITION, date_str)
- )
- return account_summary
- async def insert_into_detail_table(self, detail_data):
- insert_array = []
- for row in detail_data:
- insert_array.append(
- [
- row["date_str"],
- row["account_mode"],
- row["account_source"],
- row["account_name"],
- row["fans"],
- row["title"],
- row["view_count"],
- row["avg_view_count"],
- ]
- )
- await feishu_sheet.fetch_token()
- await feishu_sheet.prepend_value(
- sheet_token=self.TABLE_ID,
- sheet_id=self.DETAIL_SHEET_ID,
- ranges=f"A2:H{2 + len(detail_data)}",
- values=insert_array,
- )
- async def insert_into_summary_table(self, summary_data):
- insert_array = []
- for row in summary_data:
- insert_array.append(
- [
- row["date_str"],
- row["account_mode_label"],
- row["limit_fans"],
- row["total_fans"],
- ]
- )
- await feishu_sheet.fetch_token()
- await feishu_sheet.prepend_value(
- sheet_token=self.TABLE_ID,
- sheet_id=self.SUMMARY_SHEET_ID,
- ranges=f"A2:E{2 + len(summary_data)}",
- values=insert_array,
- )
- async def deal(self, date_string: str = None) -> None:
- """处理受限账号分析任务
- Args:
- date_string: 日期字符串,格式为YYYYMMDD、YYYY-MM-DD或YYYY/MM/DD
- """
- # 如果没有提供日期,默认使用昨天
- if not date_string:
- date_string = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
- # 统一日期格式为YYYYMMDD
- date_string = date_string.replace("-", "").replace("/", "")
- # 验证日期格式
- if len(date_string) != 8 or not date_string.isdigit():
- raise ValueError(f"无效的日期格式: {date_string},请使用YYYYMMDD格式")
- try:
- detail_data = await self.get_limited_account_detail(date_str=date_string)
- summary_data = await self.get_limited_account_summary(date_str=date_string)
- # 如果有数据才打印,避免空数据输出
- if detail_data:
- await self.insert_into_detail_table(detail_data)
- else:
- print(f"在 {date_string} 没有找到受限账号数据")
- if summary_data:
- await self.insert_into_summary_table(summary_data)
- else:
- print(f"在 {date_string} 没有找到受限账号摘要数据")
- except Exception as e:
- print(f"处理受限账号分析时出错: {e}")
- raise
|