import json from datetime import datetime, timedelta from applications.api import feishu_sheet, feishu_robot class LimitedAccountAnalysisConst: FIRST_POSITION = 1 LIMIT_THRESHOLD = 0.2 LIMIT_READ_AVG = 100 TABLE_ID = "MqkxwleLFiNOwHkU3crcXpWlnfe" DETAIL_SHEET_ID = "6fa679" SUMMARY_SHEET_ID = "uvli3P" class LimitedAccountAnalysisTask(LimitedAccountAnalysisConst): def __init__(self, pool, log_client): self.pool = pool self.log_client = log_client async def get_limited_account_detail(self, date_str): query = """ SELECT date_str, COALESCE(account_mode, '公众号投流') AS account_mode, account_source, account_name, fans, title, view_count, avg_view_count FROM datastat_sort_strategy WHERE date_str = %s AND position = %s AND read_rate < %s AND avg_view_count >= %s; """ account_detail = await self.pool.async_fetch( query=query, params=( date_str, self.FIRST_POSITION, self.LIMIT_THRESHOLD, self.LIMIT_READ_AVG, ), ) return account_detail async def get_limited_account_summary(self, date_str): query = """ SELECT date_str, COALESCE(account_mode, '公众号投流') AS account_mode_label, CAST( SUM( CASE WHEN IFNULL(read_rate, 0) < %s THEN fans ELSE 0 END ) AS SIGNED ) AS limit_fans, CAST(SUM(fans) AS SIGNED) AS total_fans FROM datastat_sort_strategy WHERE position = %s AND date_str = %s GROUP BY date_str, COALESCE(account_mode, '公众号投流'); """ account_summary = await self.pool.async_fetch( query=query, params=(self.LIMIT_THRESHOLD, self.FIRST_POSITION, date_str) ) return account_summary async def insert_into_detail_table(self, detail_data): insert_array = [] for row in detail_data: insert_array.append( [ row["date_str"], row["account_mode"], row["account_source"], row["account_name"], row["fans"], row["title"], row["view_count"], row["avg_view_count"], ] ) await feishu_sheet.fetch_token() await feishu_sheet.prepend_value( sheet_token=self.TABLE_ID, sheet_id=self.DETAIL_SHEET_ID, ranges=f"A2:H{2 + len(detail_data)}", values=insert_array, ) async def insert_into_summary_table(self, summary_data): insert_array = [] for row in summary_data: insert_array.append( [ row["date_str"], row["account_mode_label"], row["limit_fans"], row["total_fans"], ] ) await feishu_sheet.fetch_token() await feishu_sheet.prepend_value( sheet_token=self.TABLE_ID, sheet_id=self.SUMMARY_SHEET_ID, ranges=f"A2:E{2 + len(summary_data)}", values=insert_array, ) async def deal(self, date_string: str = None) -> None: """处理受限账号分析任务 Args: date_string: 日期字符串,格式为YYYYMMDD、YYYY-MM-DD或YYYY/MM/DD """ # 如果没有提供日期,默认使用昨天 if not date_string: date_string = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d") # 统一日期格式为YYYYMMDD date_string = date_string.replace("-", "").replace("/", "") # 验证日期格式 if len(date_string) != 8 or not date_string.isdigit(): raise ValueError(f"无效的日期格式: {date_string},请使用YYYYMMDD格式") try: detail_data = await self.get_limited_account_detail(date_str=date_string) summary_data = await self.get_limited_account_summary(date_str=date_string) # 如果有数据才打印,避免空数据输出 if detail_data: await self.insert_into_detail_table(detail_data) else: print(f"在 {date_string} 没有找到受限账号数据") if summary_data: await self.insert_into_summary_table(summary_data) else: print(f"在 {date_string} 没有找到受限账号摘要数据") except Exception as e: print(f"处理受限账号分析时出错: {e}") raise