# activity_calculator.py
import asyncio
from typing import Optional, List, Dict, Any
from datetime import datetime, date

from config import settings
from core.utils.log.logger_manager import LoggerManager
from services.async_mysql_service import AsyncMysqlService
from core.models.activity_config_models import (
    ActivityCalculatorConfig,
    ActivityThresholds,
    ActivityLevels,
    ActivityLevel,
)
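
# NOTE: The config models above are defined in core.models.activity_config_models.
# For readers without access to that module, this is the *assumed* shape, inferred
# purely from how this file uses the config (a sketch, not the authoritative definition):
#
#   ActivityCalculatorConfig
#     .platforms: list[str]                      # platforms to aggregate, e.g. includes "xiaoniangao"
#     .activity_thresholds: ActivityThresholds
#         .new_user_total: int                   # "new user" cutoff on total_videos
#     .activity_levels: ActivityLevels           # one ActivityLevel per tier:
#         .new_user / .extreme_active / .high_active / .medium_active
#         .low_active / .dormant / .zombie
#             each ActivityLevel carrying .label: str and .priority: int
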
class ActivityCalculator:
    """Optimized activity calculator."""

    def __init__(self, platform: str = "activity_calculator", mode: str = "activity",
                 config: Optional[ActivityCalculatorConfig] = None,
                 update_mode: str = "incremental"):
        self.platform = platform
        self.mode = mode
        self.update_mode = update_mode  # "full" or "incremental"
        self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
        self.aliyun_logger = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
        self.db_service = None
        # Use the supplied config, or fall back to the default configuration.
        if config:
            self.config = config
        else:
            self.config = ActivityCalculatorConfig()

    async def initialize(self):
        """Asynchronously initialize the database connection."""
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        await self.db_service.__aenter__()  # initialize the connection pool

    async def calculate_and_update(self):
        """Calculate and update activity levels for all users."""
        try:
            # 1. Fetch crawl statistics.
            crawl_stats = await self._get_crawl_statistics()
            self.logger.info(f"Fetched crawl statistics: {len(crawl_stats)} rows, update mode: {self.update_mode}")

            # 2. Concurrently batch-update user activity levels.
            updated_count = await self._concurrent_batch_update_user_activities(crawl_stats)
            self.logger.info(f"Batch update of user activity succeeded: {updated_count} rows")

            success_msg = f"Activity calculation finished: {updated_count} users updated"
            self.logger.info(success_msg)
            return updated_count
        except Exception as e:
            error_msg = f"Activity calculation failed: {e}"
            self.logger.error(error_msg)
            self.aliyun_logger.logging(code="9022", message=error_msg, data={"error": str(e)})
            raise

    async def _get_crawl_statistics(self):
        """Fetch crawl statistics from crawler_video."""
        # Build the platform list for the SQL IN clause, including merged platforms.
        platforms = self.config.platforms.copy()
        # If the config contains xiaoniangao, also query xiaoniangaotuijianliu.
        if 'xiaoniangao' in platforms:
            platforms.extend(['xiaoniangaotuijianliu'])
        # De-duplicate.
        platforms = list(set(platforms))
        platforms_str = '"' + '","'.join(platforms) + '"'

        if self.update_mode == "full":
            # Full update: scan all rows and compute complete statistics,
            # merging xiaoniangaotuijianliu into xiaoniangao.
            sql = f"""
                SELECT out_user_id,
                       CASE WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao' ELSE platform END AS platform,
                       COUNT(*) AS total_videos,
                       SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 1 ELSE 0 END) AS recent_30d_videos,
                       SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY) THEN 1 ELSE 0 END) AS recent_7d_videos
                FROM crawler_video
                WHERE out_user_id IS NOT NULL AND out_user_id != '' AND out_user_id != 0
                  AND `platform` IN ({platforms_str})
                GROUP BY out_user_id,
                         CASE WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao' ELSE platform END;
            """
            results = await self.db_service.fetch_all(sql)
        else:  # incremental
            # Incremental update: only recompute users that need it, i.e. users with new
            # videos in the statistics window; their full 30-day stats are recomputed below.
            # First find users with videos created on the previous day (both platforms merged).
            today_users_sql = f"""
                SELECT DISTINCT out_user_id,
                       CASE WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao' ELSE platform END AS platform
                FROM crawler_video
                WHERE out_user_id IS NOT NULL AND out_user_id != '' AND out_user_id != 0
                  AND `platform` IN ({platforms_str})
                  AND DATE(create_time) = DATE_SUB(CURDATE(), INTERVAL 1 DAY)
            """
            today_users = await self.db_service.fetch_all(today_users_sql)
            if not today_users:
                return []  # No new data for the previous day: nothing to update.

            # Build per-user conditions to recompute the full 30-day statistics.
            user_conditions = []
            params = []
            for user in today_users:
                user_conditions.append(
                    "(out_user_id=%s AND (platform=%s OR platform='xiaoniangaotuijianliu'))"
                )
                params.extend([user['out_user_id'], user['platform']])

            user_where_clause = " OR ".join(user_conditions)
            sql = f"""
                SELECT out_user_id,
                       CASE WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao' ELSE platform END AS platform,
                       COUNT(*) AS total_videos,
                       SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 1 ELSE 0 END) AS recent_30d_videos,
                       SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY) THEN 1 ELSE 0 END) AS recent_7d_videos
                FROM crawler_video
                WHERE out_user_id IS NOT NULL AND out_user_id != ''
                  AND `platform` IN ({platforms_str})
                  AND ({user_where_clause})
                GROUP BY out_user_id,
                         CASE WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao' ELSE platform END;
            """
            results = await self.db_service.fetch_all(sql, params)

        stats_list = []
        for row in results:
            stats = {
                'out_user_id': row['out_user_id'],
                'platform': row['platform'],
                'total_videos': row['total_videos'] or 0,
                'recent_30d_videos': row['recent_30d_videos'] or 0,
                'recent_7d_videos': row['recent_7d_videos'] or 0
            }
            stats_list.append(stats)
        return stats_list

    async def _process_single_batch(self, batch):
        """Process a single batch of statistics rows."""
        # Prepare the data for the current batch.
        batch_data = []
        for stats in batch:
            # Determine activity level and priority.
            activity_level, priority_level = self._determine_level(stats)
            # Active flag (zombie users are considered inactive).
            is_active = 0 if activity_level == self.config.activity_levels.zombie.label else 1
            # New-user flag: 1 if the level is "new user", otherwise 0.
            is_new_user = 1 if activity_level == self.config.activity_levels.new_user.label else 0
            # Append the row for this batch.
            batch_data.append([
                stats['out_user_id'], stats['platform'], stats['total_videos'],
                stats['recent_30d_videos'], stats['recent_7d_videos'],
                activity_level, priority_level, is_active, is_new_user
            ])

        # Execute the batch upsert.
        update_sql = """
            INSERT INTO external_user_activity
                (out_user_id, platform, total_videos, recent_30d_videos, recent_7d_videos,
                 activity_level, priority_level, is_active, is_new_user, update_time)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
            ON DUPLICATE KEY UPDATE
                total_videos = VALUES(total_videos),
                recent_30d_videos = VALUES(recent_30d_videos),
                recent_7d_videos = VALUES(recent_7d_videos),
                activity_level = VALUES(activity_level),
                priority_level = VALUES(priority_level),
                is_active = VALUES(is_active),
                is_new_user = VALUES(is_new_user),
                update_time = NOW()
        """
        try:
            affected_rows = await self.db_service.executemany(update_sql, batch_data)
            return affected_rows
        except Exception as e:
            error_msg = f"Batch update of user activity failed: {e}"
            self.logger.error(error_msg)
            self.aliyun_logger.logging(code="9023", message=error_msg, data={"error": str(e)})
            raise

    async def _concurrent_batch_update_user_activities(self, stats_list, batch_size=10000, concurrency=4):
        """Concurrently batch-update user activity; data is split into batches for throughput."""
        if not stats_list:
            return 0

        # Split the data into batches.
        batches = [stats_list[i:i + batch_size] for i in range(0, len(stats_list), batch_size)]
        total_batches = len(batches)
        self.logger.info(
            f"Processing {len(stats_list)} rows in {total_batches} batches of up to {batch_size} rows each"
        )

        # Limit concurrency with a semaphore.
        semaphore = asyncio.Semaphore(concurrency)

        async def process_batch_with_semaphore(batch, index):
            async with semaphore:
                try:
                    affected_rows = await self._process_single_batch(batch)
                    # Log batch progress.
                    processed_count = min((index + 1) * batch_size, len(stats_list))
                    self.logger.info(
                        f"Batch progress: [{index + 1}/{total_batches}] "
                        f"processed {processed_count}/{len(stats_list)}, updated this batch: {affected_rows}"
                    )
                    return affected_rows
                except Exception as e:
                    error_msg = f"Concurrent batch processing failed (batch {index}): {e}"
                    self.logger.error(error_msg)
                    raise

        # Process all batches concurrently.
        tasks = [process_batch_with_semaphore(batch, i) for i, batch in enumerate(batches)]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Check for exceptions and sum up the affected rows.
        total_updated = 0
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                error_msg = f"Batch {i} failed: {result}"
                self.logger.error(error_msg)
                raise result
            total_updated += result
        return total_updated

    def _determine_level(self, stats):
        """Determine activity level and priority from the three metrics (unified thresholds)."""
        recent_7d = stats['recent_7d_videos']
        recent_30d = stats['recent_30d_videos']
        total = stats['total_videos']
        thresholds = self.config.activity_thresholds
        levels = self.config.activity_levels

        # 1. New user: no meaningful history yet.
        if total <= thresholds.new_user_total:
            return levels.new_user.label, levels.new_user.priority

        # 2. Otherwise classify by the last 7 days.
        if recent_7d >= 10:  # extreme_active_recent_7d (extremely active threshold)
            return levels.extreme_active.label, levels.extreme_active.priority
        elif recent_7d >= 7:  # high_active_recent_7d (highly active threshold)
            return levels.high_active.label, levels.high_active.priority
        elif recent_7d >= 3:  # medium_active_recent_7d (moderately active threshold)
            return levels.medium_active.label, levels.medium_active.priority
        elif recent_7d >= 1:  # low_active_recent_7d (low activity threshold)
            return levels.low_active.label, levels.low_active.priority
        else:
            # Nothing in the last 7 days; fall back to the last 30 days.
            if recent_30d >= 3:  # dormant_recent_30d (dormant threshold)
                return levels.dormant.label, levels.dormant.priority
            else:
                # Nothing in the last 30 days either, but there is history: zombie.
                return levels.zombie.label, levels.zombie.priority

    async def close(self):
        """Close the database connection."""
        if self.db_service:
            await self.db_service.__aexit__(None, None, None)
            self.logger.info("Activity calculator closed")
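
# The upsert in _process_single_batch assumes an external_user_activity table with a
# unique key on (out_user_id, platform), since ON DUPLICATE KEY UPDATE needs one and the
# statistics are grouped per user and platform. An illustrative DDL sketch follows
# (column types are assumptions; the real table definition may differ):
#
#   CREATE TABLE external_user_activity (
#       out_user_id        VARCHAR(128) NOT NULL,
#       platform           VARCHAR(64)  NOT NULL,
#       total_videos       INT          NOT NULL DEFAULT 0,
#       recent_30d_videos  INT          NOT NULL DEFAULT 0,
#       recent_7d_videos   INT          NOT NULL DEFAULT 0,
#       activity_level     VARCHAR(32),
#       priority_level     INT,
#       is_active          TINYINT(1)   NOT NULL DEFAULT 1,
#       is_new_user        TINYINT(1)   NOT NULL DEFAULT 0,
#       update_time        DATETIME,
#       UNIQUE KEY uk_user_platform (out_user_id, platform)
#   );
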
async def main_full():
    """Entry point for a full recalculation of all users."""
    update_mode = "full"
    calculator = ActivityCalculator(update_mode=update_mode)
    await calculator.initialize()  # set up the database connection
    await calculator.calculate_and_update()
    await calculator.close()  # release the connection


async def main_incremental():
    """Entry point for an incremental update of recently active users."""
    update_mode = "incremental"
    calculator = ActivityCalculator(update_mode=update_mode)
    await calculator.initialize()  # set up the database connection
    await calculator.calculate_and_update()
    await calculator.close()  # release the connection
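

# Minimal CLI entry point: a sketch added for convenience rather than part of the
# original module. It selects the update mode from the first command-line argument
# ("full" or anything else for incremental) and runs the corresponding coroutine.
if __name__ == "__main__":
    import sys

    mode_arg = sys.argv[1] if len(sys.argv) > 1 else "incremental"
    if mode_arg == "full":
        asyncio.run(main_full())
    else:
        asyncio.run(main_incremental())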