|
|
@@ -0,0 +1,310 @@
|
|
|
+# activity_calculator.py
|
|
|
+import asyncio
|
|
|
+from typing import Optional, List, Dict, Any
|
|
|
+from datetime import datetime, date
|
|
|
+
|
|
|
+from config import settings
|
|
|
+from core.utils.log.logger_manager import LoggerManager
|
|
|
+from services.async_mysql_service import AsyncMysqlService
|
|
|
+from core.models.activity_config_models import ActivityCalculatorConfig, ActivityThresholds, ActivityLevels, ActivityLevel
|
|
|
+
|
|
|
+
|
|
|
class ActivityCalculator:
    """Optimized activity-level calculator for external crawled users."""

    def __init__(self, platform: str = "activity_calculator", mode: str = "activity", config: Optional[ActivityCalculatorConfig] = None, update_mode: str = "incremental"):
        """Set up loggers and configuration.

        Args:
            platform: logical platform name used for logger routing.
            mode: logical mode name used for logger routing.
            config: optional calculator configuration; a default
                ActivityCalculatorConfig is created when omitted.
            update_mode: "full" (recompute everyone) or "incremental"
                (only users with recent activity).
        """
        self.platform = platform
        self.mode = mode
        self.update_mode = update_mode  # "full" or "incremental"
        self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
        self.aliyun_logger = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
        # Created lazily in initialize(); stays None until then.
        self.db_service = None

        # Fall back to a default configuration when none was supplied.
        self.config = config if config else ActivityCalculatorConfig()
|
|
|
+
|
|
|
    async def initialize(self):
        """Asynchronously initialize the database connection.

        Must be awaited once before calculate_and_update(); the matching
        cleanup is performed by close() via __aexit__.
        """
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        await self.db_service.__aenter__()  # initialize the connection pool
|
|
|
+
|
|
|
+ async def calculate_and_update(self):
|
|
|
+ """计算并更新所有用户的活跃度"""
|
|
|
+ try:
|
|
|
+ # 1. 获取爬取统计数据
|
|
|
+ crawl_stats = await self._get_crawl_statistics()
|
|
|
+ self.logger.info(f"获取爬取统计数据成功:{len(crawl_stats)}条, 更新模式: {self.update_mode}")
|
|
|
+
|
|
|
+ # 2. 并发批量更新用户活跃度
|
|
|
+ updated_count = await self._concurrent_batch_update_user_activities(crawl_stats)
|
|
|
+ self.logger.info(f"批量更新用户活跃度成功:{updated_count}条")
|
|
|
+
|
|
|
+ success_msg = f"活跃度计算完成:更新{updated_count}用户"
|
|
|
+ self.logger.info(success_msg)
|
|
|
+ return updated_count
|
|
|
+
|
|
|
+ except Exception as e:
|
|
|
+ error_msg = f"活跃度计算失败: {e}"
|
|
|
+ self.logger.error(error_msg)
|
|
|
+ self.aliyun_logger.logging(code="9022", message=error_msg, data={"error": str(e)})
|
|
|
+ raise
|
|
|
+
|
|
|
+ async def _get_crawl_statistics(self):
|
|
|
+ """获取爬取统计数据"""
|
|
|
+ # 构建平台列表的SQL片段,包含合并的平台
|
|
|
+ platforms = self.config.platforms.copy()
|
|
|
+ # 如果配置中包含xiaoniangao,同时查询xiaoniangaotuijianliu
|
|
|
+ if 'xiaoniangao' in platforms:
|
|
|
+ platforms.extend(['xiaoniangaotuijianliu'])
|
|
|
+ # 去重
|
|
|
+ platforms = list(set(platforms))
|
|
|
+
|
|
|
+ platforms_str = '"' + '","'.join(platforms) + '"'
|
|
|
+
|
|
|
+ if self.update_mode == "full":
|
|
|
+ # 全量更新:查询所有数据并计算完整统计,将两个平台合并为一个
|
|
|
+ sql = f"""
|
|
|
+ SELECT
|
|
|
+ out_user_id,
|
|
|
+ CASE
|
|
|
+ WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
|
|
|
+ ELSE platform
|
|
|
+ END as platform,
|
|
|
+ COUNT(*) as total_videos,
|
|
|
+ SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 1 ELSE 0 END) as recent_30d_videos,
|
|
|
+ SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY) THEN 1 ELSE 0 END) as recent_7d_videos
|
|
|
+ FROM crawler_video
|
|
|
+ WHERE out_user_id IS NOT NULL
|
|
|
+ AND out_user_id != ''
|
|
|
+ AND out_user_id != 0
|
|
|
+ AND `platform` in ({platforms_str})
|
|
|
+ GROUP BY out_user_id,
|
|
|
+ CASE
|
|
|
+ WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
|
|
|
+ ELSE platform
|
|
|
+ END;
|
|
|
+ """
|
|
|
+ results = await self.db_service.fetch_all(sql)
|
|
|
+ else: # incremental
|
|
|
+ # 增量更新:获取需要更新的所有用户(不仅是今天的,还包括在统计窗口内的)
|
|
|
+ # 首先找出今天有活动的用户(包含两个平台)
|
|
|
+ today_users_sql = f"""
|
|
|
+ SELECT DISTINCT
|
|
|
+ out_user_id,
|
|
|
+ CASE
|
|
|
+ WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
|
|
|
+ ELSE platform
|
|
|
+ END as platform
|
|
|
+ FROM crawler_video
|
|
|
+ WHERE out_user_id IS NOT NULL
|
|
|
+ AND out_user_id != ''
|
|
|
+ AND out_user_id != 0
|
|
|
+ AND `platform` in ({platforms_str})
|
|
|
+ AND DATE(create_time) = DATE_SUB(CURDATE(), INTERVAL 1 DAY)
|
|
|
+ """
|
|
|
+ today_users = await self.db_service.fetch_all(today_users_sql)
|
|
|
+
|
|
|
+ if not today_users:
|
|
|
+ return [] # 如果今天没有新数据,直接返回空列表
|
|
|
+
|
|
|
+ # 构建用户条件用于查询最近30天的完整统计数据
|
|
|
+ user_conditions = []
|
|
|
+ params = []
|
|
|
+ for user in today_users:
|
|
|
+ user_conditions.append("(out_user_id=%s AND (platform=%s OR platform='xiaoniangaotuijianliu'))")
|
|
|
+ params.extend([user['out_user_id'], user['platform']])
|
|
|
+
|
|
|
+ user_where_clause = " OR ".join(user_conditions)
|
|
|
+ sql = f"""
|
|
|
+ SELECT
|
|
|
+ out_user_id,
|
|
|
+ CASE
|
|
|
+ WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
|
|
|
+ ELSE platform
|
|
|
+ END as platform,
|
|
|
+ COUNT(*) as total_videos,
|
|
|
+ SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 1 ELSE 0 END) as recent_30d_videos,
|
|
|
+ SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY) THEN 1 ELSE 0 END) as recent_7d_videos
|
|
|
+ FROM crawler_video
|
|
|
+ WHERE out_user_id IS NOT NULL
|
|
|
+ AND out_user_id != ''
|
|
|
+ AND `platform` in ({platforms_str})
|
|
|
+ AND ({user_where_clause})
|
|
|
+ GROUP BY out_user_id,
|
|
|
+ CASE
|
|
|
+ WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
|
|
|
+ ELSE platform
|
|
|
+ END;
|
|
|
+ """
|
|
|
+
|
|
|
+ results = await self.db_service.fetch_all(sql, params)
|
|
|
+
|
|
|
+ stats_list = []
|
|
|
+ for row in results:
|
|
|
+ stats = {
|
|
|
+ 'out_user_id': row['out_user_id'],
|
|
|
+ 'platform': row['platform'],
|
|
|
+ 'total_videos': row['total_videos'] or 0,
|
|
|
+ 'recent_30d_videos': row['recent_30d_videos'] or 0,
|
|
|
+ 'recent_7d_videos': row['recent_7d_videos'] or 0
|
|
|
+ }
|
|
|
+ stats_list.append(stats)
|
|
|
+
|
|
|
+ return stats_list
|
|
|
+
|
|
|
+ async def _process_single_batch(self, batch):
|
|
|
+ """处理单个批次的数据"""
|
|
|
+ # 准备当前批次的数据
|
|
|
+ batch_data = []
|
|
|
+ for stats in batch:
|
|
|
+ # 确定活跃度级别和优先级
|
|
|
+ activity_level, priority_level = self._determine_level(stats)
|
|
|
+
|
|
|
+ # 判断是否活跃(僵尸用户不活跃)
|
|
|
+ is_active = 0 if activity_level == self.config.activity_levels.zombie.label else 1
|
|
|
+
|
|
|
+ # 新用户标记 - 从stats中获取初始值,如果是新用户则设为1,否则为0
|
|
|
+ is_new_user = 1 if activity_level == self.config.activity_levels.new_user.label else 0
|
|
|
+
|
|
|
+ # 添加到批次数据
|
|
|
+ batch_data.append([
|
|
|
+ stats['out_user_id'],
|
|
|
+ stats['platform'],
|
|
|
+ stats['total_videos'],
|
|
|
+ stats['recent_30d_videos'],
|
|
|
+ stats['recent_7d_videos'],
|
|
|
+ activity_level,
|
|
|
+ priority_level,
|
|
|
+ is_active,
|
|
|
+ is_new_user
|
|
|
+ ])
|
|
|
+
|
|
|
+ # 批量执行SQL
|
|
|
+ update_sql = """
|
|
|
+ INSERT INTO external_user_activity
|
|
|
+ (out_user_id, platform,
|
|
|
+ total_videos, recent_30d_videos, recent_7d_videos,
|
|
|
+ activity_level, priority_level, is_active, is_new_user,
|
|
|
+ update_time)
|
|
|
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()) ON DUPLICATE KEY
|
|
|
+ UPDATE
|
|
|
+ total_videos = VALUES(total_videos),
|
|
|
+ recent_30d_videos = VALUES(recent_30d_videos),
|
|
|
+ recent_7d_videos = VALUES(recent_7d_videos),
|
|
|
+ activity_level = VALUES(activity_level),
|
|
|
+ priority_level = VALUES(priority_level),
|
|
|
+ is_active = VALUES(is_active),
|
|
|
+ is_new_user = VALUES(is_new_user),
|
|
|
+ update_time = NOW()
|
|
|
+ """
|
|
|
+
|
|
|
+ try:
|
|
|
+ affected_rows = await self.db_service.executemany(update_sql, batch_data)
|
|
|
+ return affected_rows
|
|
|
+ except Exception as e:
|
|
|
+ error_msg = f"批次更新用户活跃度失败: {e}"
|
|
|
+ self.logger.error(error_msg)
|
|
|
+ self.aliyun_logger.logging(code="9023", message=error_msg, data={"error": str(e)})
|
|
|
+ raise
|
|
|
+
|
|
|
+ async def _concurrent_batch_update_user_activities(self, stats_list, batch_size=10000, concurrency=4):
|
|
|
+ """并发批量更新用户活跃度,分批处理以提高性能"""
|
|
|
+ if not stats_list:
|
|
|
+ return 0
|
|
|
+
|
|
|
+ # 将数据分成分批
|
|
|
+ batches = [stats_list[i:i + batch_size] for i in range(0, len(stats_list), batch_size)]
|
|
|
+ total_batches = len(batches)
|
|
|
+
|
|
|
+ self.logger.info(f"准备处理 {len(stats_list)} 条记录,分为 {total_batches} 个批次,每个批次最多 {batch_size} 条记录")
|
|
|
+
|
|
|
+ # 使用信号量控制并发数量
|
|
|
+ semaphore = asyncio.Semaphore(concurrency)
|
|
|
+
|
|
|
+ async def process_batch_with_semaphore(batch, index):
|
|
|
+ async with semaphore:
|
|
|
+ try:
|
|
|
+ affected_rows = await self._process_single_batch(batch)
|
|
|
+
|
|
|
+ # 记录批次进度
|
|
|
+ processed_count = min((index + 1) * batch_size, len(stats_list))
|
|
|
+ self.logger.info(f"并发批次处理进度: [{index + 1}/{total_batches}] 已处理: {processed_count}/{len(stats_list)}, 本次更新: {affected_rows}条")
|
|
|
+
|
|
|
+ return affected_rows
|
|
|
+ except Exception as e:
|
|
|
+ error_msg = f"并发批次处理失败 (批次 {index}): {e}"
|
|
|
+ self.logger.error(error_msg)
|
|
|
+ raise
|
|
|
+
|
|
|
+ # 并发处理所有批次
|
|
|
+ tasks = [process_batch_with_semaphore(batch, i) for i, batch in enumerate(batches)]
|
|
|
+ results = await asyncio.gather(*tasks, return_exceptions=True)
|
|
|
+
|
|
|
+ # 检查是否有异常
|
|
|
+ total_updated = 0
|
|
|
+ for i, result in enumerate(results):
|
|
|
+ if isinstance(result, Exception):
|
|
|
+ error_msg = f"批次 {i} 处理失败: {result}"
|
|
|
+ self.logger.error(error_msg)
|
|
|
+ raise result
|
|
|
+ total_updated += result
|
|
|
+
|
|
|
+ return total_updated
|
|
|
+
|
|
|
+ def _determine_level(self, stats):
|
|
|
+ """基于三个指标确定活跃度级别和优先级(统一阈值)"""
|
|
|
+ recent_7d = stats['recent_7d_videos']
|
|
|
+ recent_30d = stats['recent_30d_videos']
|
|
|
+ total = stats['total_videos']
|
|
|
+
|
|
|
+ thresholds = self.config.activity_thresholds
|
|
|
+ levels = self.config.activity_levels
|
|
|
+
|
|
|
+ # 1. 新用户判断:没有历史数据
|
|
|
+ if total <= thresholds.new_user_total:
|
|
|
+ return levels.new_user.label, levels.new_user.priority
|
|
|
+
|
|
|
+ # 2. 基于近7天数据判断
|
|
|
+ if recent_7d >= 10: # extreme_active_recent_7d (极高活跃阈值)
|
|
|
+ return levels.extreme_active.label, levels.extreme_active.priority
|
|
|
+ elif recent_7d >= 7: # high_active_recent_7d (高活跃阈值)
|
|
|
+ return levels.high_active.label, levels.high_active.priority
|
|
|
+ elif recent_7d >= 3: # medium_active_recent_7d (中活跃阈值)
|
|
|
+ return levels.medium_active.label, levels.medium_active.priority
|
|
|
+ elif recent_7d >= 1: # low_active_recent_7d (低活跃阈值)
|
|
|
+ return levels.low_active.label, levels.low_active.priority
|
|
|
+ else:
|
|
|
+ # 近7天没有数据,看近30天
|
|
|
+ if recent_30d >= 3: # dormant_recent_30d (休眠阈值)
|
|
|
+ return levels.dormant.label, levels.dormant.priority
|
|
|
+ else:
|
|
|
+ # 近30天也没有数据,但是历史有数据,则为僵尸
|
|
|
+ return levels.zombie.label, levels.zombie.priority
|
|
|
+
|
|
|
+
|
|
|
+ async def close(self):
|
|
|
+ """关闭数据库连接"""
|
|
|
+ if self.db_service:
|
|
|
+ await self.db_service.__aexit__(None, None, None)
|
|
|
+ self.logger.info("活跃度计算器已关闭")
|
|
|
+
|
|
|
async def main_full():
    """Run one full recalculation of all user activity levels.

    FIX: removed the unused `import sys` and wrapped the calculation in
    try/finally so the DB connection pool is released even when
    calculate_and_update raises.
    """
    calculator = ActivityCalculator(update_mode="full")
    await calculator.initialize()  # open the DB connection pool
    try:
        await calculator.calculate_and_update()
    finally:
        await calculator.close()  # always release the pool
|
|
|
+
|
|
|
+
|
|
|
async def main_incremental():
    """Run an incremental recalculation for recently active users.

    FIX: wrapped the calculation in try/finally so the DB connection
    pool is released even when calculate_and_update raises.
    """
    calculator = ActivityCalculator(update_mode="incremental")
    await calculator.initialize()  # open the DB connection pool
    try:
        await calculator.calculate_and_update()
    finally:
        await calculator.close()  # always release the pool
|