# activity_calculator.py
import asyncio
from typing import Optional, List, Dict, Any
from datetime import datetime, date
from config import settings
from core.utils.log.logger_manager import LoggerManager
from services.async_mysql_service import AsyncMysqlService
from core.models.activity_config_models import ActivityCalculatorConfig, ActivityThresholds, ActivityLevels, ActivityLevel
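
# NOTE (assumed interfaces, inferred only from how they are used in this file,
# not from the model/service definitions themselves):
#   - ActivityCalculatorConfig exposes `platforms` (list of platform names),
#     `activity_thresholds` (with at least `new_user_total`) and
#     `activity_levels` (new_user, extreme_active, high_active, medium_active,
#     low_active, dormant, zombie), each level carrying `label` and `priority`.
#   - AsyncMysqlService is used as an async context manager and provides
#     `fetch_all(sql, params=None)` and `executemany(sql, rows)`.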
class ActivityCalculator:
    """Optimized user activity calculator."""

    def __init__(self, platform: str = "activity_calculator", mode: str = "activity",
                 config: Optional[ActivityCalculatorConfig] = None, update_mode: str = "incremental"):
        self.platform = platform
        self.mode = mode
        self.update_mode = update_mode  # "full" or "incremental"
        self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
        self.aliyun_logger = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
        self.db_service = None

        # Use the provided config, or fall back to a default configuration
        self.config = config if config else ActivityCalculatorConfig()
    async def initialize(self):
        """Asynchronously initialize the database connection."""
        self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
        await self.db_service.__aenter__()  # initialize the connection pool
    async def calculate_and_update(self):
        """Calculate and update activity levels for all users."""
        try:
            # 1. Fetch crawl statistics
            crawl_stats = await self._get_crawl_statistics()
            self.logger.info(f"Fetched crawl statistics: {len(crawl_stats)} rows, update mode: {self.update_mode}")
            # 2. Concurrently update user activity levels in batches
            updated_count = await self._concurrent_batch_update_user_activities(crawl_stats)
            self.logger.info(f"Batch update of user activity levels succeeded: {updated_count} rows")
            success_msg = f"Activity calculation finished: {updated_count} users updated"
            self.logger.info(success_msg)
            return updated_count
        except Exception as e:
            error_msg = f"Activity calculation failed: {e}"
            self.logger.error(error_msg)
            self.aliyun_logger.logging(code="9022", message=error_msg, data={"error": str(e)})
            raise
    async def _get_crawl_statistics(self):
        """Fetch crawl statistics from the crawler_video table."""
        # Build the platform list for the SQL IN clause, including merged platforms
        platforms = self.config.platforms.copy()
        # If the config contains xiaoniangao, also query xiaoniangaotuijianliu
        if 'xiaoniangao' in platforms:
            platforms.append('xiaoniangaotuijianliu')
        # Deduplicate
        platforms = list(set(platforms))

        platforms_str = '"' + '","'.join(platforms) + '"'

        if self.update_mode == "full":
            # Full update: query all rows and compute complete statistics,
            # merging the two xiaoniangao platforms into one
            sql = f"""
                SELECT
                    out_user_id,
                    CASE
                        WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
                        ELSE platform
                    END AS platform,
                    COUNT(*) AS total_videos,
                    SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 1 ELSE 0 END) AS recent_30d_videos,
                    SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY) THEN 1 ELSE 0 END) AS recent_7d_videos
                FROM crawler_video
                WHERE out_user_id IS NOT NULL
                  AND out_user_id != ''
                  AND out_user_id != 0
                  AND `platform` IN ({platforms_str})
                GROUP BY out_user_id,
                    CASE
                        WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
                        ELSE platform
                    END;
            """
            results = await self.db_service.fetch_all(sql)
        else:  # incremental
            # Incremental update: refresh every user that needs it, recomputing
            # their full statistics window rather than just the latest day.
            # First find the users that had activity on the previous day
            # (covering both xiaoniangao platforms).
            today_users_sql = f"""
                SELECT DISTINCT
                    out_user_id,
                    CASE
                        WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
                        ELSE platform
                    END AS platform
                FROM crawler_video
                WHERE out_user_id IS NOT NULL
                  AND out_user_id != ''
                  AND out_user_id != 0
                  AND `platform` IN ({platforms_str})
                  AND DATE(create_time) = DATE_SUB(CURDATE(), INTERVAL 1 DAY)
            """
            today_users = await self.db_service.fetch_all(today_users_sql)

            if not today_users:
                return []  # no new data for the previous day, return an empty list

            # Build per-user conditions to query full statistics for these users
            user_conditions = []
            params = []
            for user in today_users:
                user_conditions.append("(out_user_id=%s AND (platform=%s OR platform='xiaoniangaotuijianliu'))")
                params.extend([user['out_user_id'], user['platform']])

            user_where_clause = " OR ".join(user_conditions)
            sql = f"""
                SELECT
                    out_user_id,
                    CASE
                        WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
                        ELSE platform
                    END AS platform,
                    COUNT(*) AS total_videos,
                    SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 1 ELSE 0 END) AS recent_30d_videos,
                    SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY) THEN 1 ELSE 0 END) AS recent_7d_videos
                FROM crawler_video
                WHERE out_user_id IS NOT NULL
                  AND out_user_id != ''
                  AND `platform` IN ({platforms_str})
                  AND ({user_where_clause})
                GROUP BY out_user_id,
                    CASE
                        WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
                        ELSE platform
                    END;
            """

            results = await self.db_service.fetch_all(sql, params)

        stats_list = []
        for row in results:
            stats = {
                'out_user_id': row['out_user_id'],
                'platform': row['platform'],
                'total_videos': row['total_videos'] or 0,
                'recent_30d_videos': row['recent_30d_videos'] or 0,
                'recent_7d_videos': row['recent_7d_videos'] or 0
            }
            stats_list.append(stats)
        return stats_list
    async def _process_single_batch(self, batch):
        """Process a single batch of statistics."""
        # Prepare the rows for the current batch
        batch_data = []
        for stats in batch:
            # Determine the activity level and priority
            activity_level, priority_level = self._determine_level(stats)
            # A user counts as active unless classified as a zombie user
            is_active = 0 if activity_level == self.config.activity_levels.zombie.label else 1
            # New-user flag: 1 when the level is "new user", otherwise 0
            is_new_user = 1 if activity_level == self.config.activity_levels.new_user.label else 0
            # Append the row to the batch
            batch_data.append([
                stats['out_user_id'],
                stats['platform'],
                stats['total_videos'],
                stats['recent_30d_videos'],
                stats['recent_7d_videos'],
                activity_level,
                priority_level,
                is_active,
                is_new_user
            ])

        # Execute the SQL in bulk
        update_sql = """
            INSERT INTO external_user_activity
                (out_user_id, platform,
                 total_videos, recent_30d_videos, recent_7d_videos,
                 activity_level, priority_level, is_active, is_new_user,
                 update_time)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())
            ON DUPLICATE KEY UPDATE
                total_videos = VALUES(total_videos),
                recent_30d_videos = VALUES(recent_30d_videos),
                recent_7d_videos = VALUES(recent_7d_videos),
                activity_level = VALUES(activity_level),
                priority_level = VALUES(priority_level),
                is_active = VALUES(is_active),
                is_new_user = VALUES(is_new_user),
                update_time = NOW()
        """
        try:
            affected_rows = await self.db_service.executemany(update_sql, batch_data)
            return affected_rows
        except Exception as e:
            error_msg = f"Batch update of user activity failed: {e}"
            self.logger.error(error_msg)
            self.aliyun_logger.logging(code="9023", message=error_msg, data={"error": str(e)})
            raise
    async def _concurrent_batch_update_user_activities(self, stats_list, batch_size=10000, concurrency=4):
        """Concurrently update user activity in batches to improve throughput."""
        if not stats_list:
            return 0

        # Split the data into batches
        batches = [stats_list[i:i + batch_size] for i in range(0, len(stats_list), batch_size)]
        total_batches = len(batches)

        self.logger.info(
            f"Processing {len(stats_list)} records in {total_batches} batches of at most {batch_size} records each"
        )

        # Use a semaphore to limit the number of concurrently running batches
        semaphore = asyncio.Semaphore(concurrency)

        async def process_batch_with_semaphore(batch, index):
            async with semaphore:
                try:
                    affected_rows = await self._process_single_batch(batch)

                    # Log batch progress
                    processed_count = min((index + 1) * batch_size, len(stats_list))
                    self.logger.info(
                        f"Concurrent batch progress: [{index + 1}/{total_batches}] "
                        f"processed: {processed_count}/{len(stats_list)}, updated this batch: {affected_rows}"
                    )

                    return affected_rows
                except Exception as e:
                    error_msg = f"Concurrent batch processing failed (batch {index}): {e}"
                    self.logger.error(error_msg)
                    raise

        # Process all batches concurrently
        tasks = [process_batch_with_semaphore(batch, i) for i, batch in enumerate(batches)]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Check for exceptions and accumulate the total
        total_updated = 0
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                error_msg = f"Batch {i} failed: {result}"
                self.logger.error(error_msg)
                raise result
            total_updated += result

        return total_updated
    def _determine_level(self, stats):
        """Determine activity level and priority from the three metrics (unified thresholds)."""
        recent_7d = stats['recent_7d_videos']
        recent_30d = stats['recent_30d_videos']
        total = stats['total_videos']
        thresholds = self.config.activity_thresholds
        levels = self.config.activity_levels
        # 1. New user: no meaningful history yet
        if total <= thresholds.new_user_total:
            return levels.new_user.label, levels.new_user.priority
        # 2. Classify by activity in the last 7 days
        if recent_7d >= 10:  # extreme_active_recent_7d (extremely active threshold)
            return levels.extreme_active.label, levels.extreme_active.priority
        elif recent_7d >= 7:  # high_active_recent_7d (highly active threshold)
            return levels.high_active.label, levels.high_active.priority
        elif recent_7d >= 3:  # medium_active_recent_7d (moderately active threshold)
            return levels.medium_active.label, levels.medium_active.priority
        elif recent_7d >= 1:  # low_active_recent_7d (low activity threshold)
            return levels.low_active.label, levels.low_active.priority
        else:
            # No activity in the last 7 days, fall back to the last 30 days
            if recent_30d >= 3:  # dormant_recent_30d (dormant threshold)
                return levels.dormant.label, levels.dormant.priority
            else:
                # No activity in the last 30 days either, but history exists: zombie
                return levels.zombie.label, levels.zombie.priority
    async def close(self):
        """Close the database connection."""
        if self.db_service:
            await self.db_service.__aexit__(None, None, None)
            self.logger.info("Activity calculator closed")
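

# A minimal convenience sketch (not part of the original class): run one
# calculation cycle with guaranteed cleanup even if the calculation raises,
# using only the initialize()/calculate_and_update()/close() methods above.
async def run_activity_calculation(update_mode: str = "incremental") -> int:
    calculator = ActivityCalculator(update_mode=update_mode)
    await calculator.initialize()
    try:
        return await calculator.calculate_and_update()
    finally:
        await calculator.close()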


async def main_full():
    # Run in full-update mode
    update_mode = "full"
    calculator = ActivityCalculator(update_mode=update_mode)
    await calculator.initialize()  # initialize the database connection
    await calculator.calculate_and_update()
    await calculator.close()  # close the connection


async def main_incremental():
    # Run in incremental-update mode
    update_mode = "incremental"
    calculator = ActivityCalculator(update_mode=update_mode)
    await calculator.initialize()  # initialize the database connection
    await calculator.calculate_and_update()
    await calculator.close()  # close the connection
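

# Hypothetical entry point (not in the original file): a sketch of how the two
# update modes might be selected when this module is executed directly. The
# original only defines main_full()/main_incremental(); how they are scheduled
# or invoked is not shown here.
if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Calculate and update user activity levels")
    parser.add_argument("--update-mode", choices=["full", "incremental"], default="incremental")
    args = parser.parse_args()

    if args.update_mode == "full":
        asyncio.run(main_full())
    else:
        asyncio.run(main_incremental())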