# activity_calculator.py
import asyncio
from datetime import date, datetime
from typing import Any, Dict, List, Optional

from config import settings
from core.models.activity_config_models import (
    ActivityCalculatorConfig,
    ActivityLevel,
    ActivityLevels,
    ActivityThresholds,
)
from core.utils.log.logger_manager import LoggerManager
from services.async_mysql_service import AsyncMysqlService
  9. class ActivityCalculator:
  10. """优化版活跃度计算器"""
  11. def __init__(self, platform: str = "activity_calculator", mode: str = "activity", config: Optional[ActivityCalculatorConfig] = None, update_mode: str = "incremental"):
  12. self.platform = platform
  13. self.mode = mode
  14. self.update_mode = update_mode # "full" 或 "incremental"
  15. self.logger = LoggerManager.get_logger(platform=platform, mode=mode)
  16. self.aliyun_logger = LoggerManager.get_aliyun_logger(platform=platform, mode=mode)
  17. self.db_service = None
  18. # 使用传入的配置或创建默认配置
  19. if config:
  20. self.config = config
  21. else:
  22. self.config = ActivityCalculatorConfig()
  23. async def initialize(self):
  24. """异步初始化数据库连接"""
  25. self.db_service = AsyncMysqlService(platform=self.platform, mode=self.mode)
  26. await self.db_service.__aenter__() # 初始化连接池
  27. async def calculate_and_update(self):
  28. """计算并更新所有用户的活跃度"""
  29. try:
  30. # 1. 获取爬取统计数据
  31. crawl_stats = await self._get_crawl_statistics()
  32. self.logger.info(f"获取爬取统计数据成功:{len(crawl_stats)}条, 更新模式: {self.update_mode}")
  33. # 2. 并发批量更新用户活跃度
  34. updated_count = await self._concurrent_batch_update_user_activities(crawl_stats)
  35. self.logger.info(f"批量更新用户活跃度成功:{updated_count}条")
  36. success_msg = f"活跃度计算完成:更新{updated_count}用户"
  37. self.logger.info(success_msg)
  38. return updated_count
  39. except Exception as e:
  40. error_msg = f"活跃度计算失败: {e}"
  41. self.logger.error(error_msg)
  42. self.aliyun_logger.logging(code="9022", message=error_msg, data={"error": str(e)})
  43. raise
  44. async def _get_crawl_statistics(self):
  45. """获取爬取统计数据"""
  46. # 构建平台列表的SQL片段,包含合并的平台
  47. platforms = self.config.platforms.copy()
  48. # 如果配置中包含xiaoniangao,同时查询xiaoniangaotuijianliu
  49. if 'xiaoniangao' in platforms:
  50. platforms.extend(['xiaoniangaotuijianliu'])
  51. # 去重
  52. platforms = list(set(platforms))
  53. platforms_str = '"' + '","'.join(platforms) + '"'
  54. if self.update_mode == "full":
  55. # 全量更新:查询所有数据并计算完整统计,将两个平台合并为一个
  56. sql = f"""
  57. SELECT
  58. out_user_id,
  59. CASE
  60. WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
  61. ELSE platform
  62. END as platform,
  63. COUNT(*) as total_videos,
  64. SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 1 ELSE 0 END) as recent_30d_videos,
  65. SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY) THEN 1 ELSE 0 END) as recent_7d_videos
  66. FROM crawler_video
  67. WHERE out_user_id IS NOT NULL
  68. AND out_user_id != ''
  69. AND out_user_id != 0
  70. AND `platform` in ({platforms_str})
  71. GROUP BY out_user_id,
  72. CASE
  73. WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
  74. ELSE platform
  75. END;
  76. """
  77. results = await self.db_service.fetch_all(sql)
  78. else: # incremental
  79. # 增量更新:获取需要更新的所有用户(不仅是今天的,还包括在统计窗口内的)
  80. # 首先找出今天有活动的用户(包含两个平台)
  81. today_users_sql = f"""
  82. SELECT DISTINCT
  83. out_user_id,
  84. CASE
  85. WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
  86. ELSE platform
  87. END as platform
  88. FROM crawler_video
  89. WHERE out_user_id IS NOT NULL
  90. AND out_user_id != ''
  91. AND out_user_id != 0
  92. AND `platform` in ({platforms_str})
  93. AND DATE(create_time) = DATE_SUB(CURDATE(), INTERVAL 1 DAY)
  94. """
  95. today_users = await self.db_service.fetch_all(today_users_sql)
  96. if not today_users:
  97. return [] # 如果今天没有新数据,直接返回空列表
  98. # 构建用户条件用于查询最近30天的完整统计数据
  99. user_conditions = []
  100. params = []
  101. for user in today_users:
  102. user_conditions.append("(out_user_id=%s AND (platform=%s OR platform='xiaoniangaotuijianliu'))")
  103. params.extend([user['out_user_id'], user['platform']])
  104. user_where_clause = " OR ".join(user_conditions)
  105. sql = f"""
  106. SELECT
  107. out_user_id,
  108. CASE
  109. WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
  110. ELSE platform
  111. END as platform,
  112. COUNT(*) as total_videos,
  113. SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 30 DAY) THEN 1 ELSE 0 END) as recent_30d_videos,
  114. SUM(CASE WHEN create_time >= DATE_SUB(NOW(), INTERVAL 7 DAY) THEN 1 ELSE 0 END) as recent_7d_videos
  115. FROM crawler_video
  116. WHERE out_user_id IS NOT NULL
  117. AND out_user_id != ''
  118. AND `platform` in ({platforms_str})
  119. AND ({user_where_clause})
  120. GROUP BY out_user_id,
  121. CASE
  122. WHEN platform = 'xiaoniangaotuijianliu' THEN 'xiaoniangao'
  123. ELSE platform
  124. END;
  125. """
  126. results = await self.db_service.fetch_all(sql, params)
  127. stats_list = []
  128. for row in results:
  129. stats = {
  130. 'out_user_id': row['out_user_id'],
  131. 'platform': row['platform'],
  132. 'total_videos': row['total_videos'] or 0,
  133. 'recent_30d_videos': row['recent_30d_videos'] or 0,
  134. 'recent_7d_videos': row['recent_7d_videos'] or 0
  135. }
  136. stats_list.append(stats)
  137. return stats_list
  138. async def _process_single_batch(self, batch):
  139. """处理单个批次的数据"""
  140. # 准备当前批次的数据
  141. batch_data = []
  142. for stats in batch:
  143. # 确定活跃度级别和优先级
  144. activity_level, priority_level = self._determine_level(stats)
  145. # 判断是否活跃(僵尸用户不活跃)
  146. is_active = 0 if activity_level == self.config.activity_levels.zombie.label else 1
  147. # 新用户标记 - 从stats中获取初始值,如果是新用户则设为1,否则为0
  148. is_new_user = 1 if activity_level == self.config.activity_levels.new_user.label else 0
  149. # 添加到批次数据
  150. batch_data.append([
  151. stats['out_user_id'],
  152. stats['platform'],
  153. stats['total_videos'],
  154. stats['recent_30d_videos'],
  155. stats['recent_7d_videos'],
  156. activity_level,
  157. priority_level,
  158. is_active,
  159. is_new_user
  160. ])
  161. # 批量执行SQL
  162. update_sql = """
  163. INSERT INTO external_user_activity
  164. (out_user_id, platform,
  165. total_videos, recent_30d_videos, recent_7d_videos,
  166. activity_level, priority_level, is_active, is_new_user,
  167. update_time)
  168. VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW()) ON DUPLICATE KEY
  169. UPDATE
  170. total_videos = VALUES(total_videos),
  171. recent_30d_videos = VALUES(recent_30d_videos),
  172. recent_7d_videos = VALUES(recent_7d_videos),
  173. activity_level = VALUES(activity_level),
  174. priority_level = VALUES(priority_level),
  175. is_active = VALUES(is_active),
  176. is_new_user = VALUES(is_new_user),
  177. update_time = NOW()
  178. """
  179. try:
  180. affected_rows = await self.db_service.executemany(update_sql, batch_data)
  181. return affected_rows
  182. except Exception as e:
  183. error_msg = f"批次更新用户活跃度失败: {e}"
  184. self.logger.error(error_msg)
  185. self.aliyun_logger.logging(code="9023", message=error_msg, data={"error": str(e)})
  186. raise
  187. async def _concurrent_batch_update_user_activities(self, stats_list, batch_size=10000, concurrency=4):
  188. """并发批量更新用户活跃度,分批处理以提高性能"""
  189. if not stats_list:
  190. return 0
  191. # 将数据分成分批
  192. batches = [stats_list[i:i + batch_size] for i in range(0, len(stats_list), batch_size)]
  193. total_batches = len(batches)
  194. self.logger.info(f"准备处理 {len(stats_list)} 条记录,分为 {total_batches} 个批次,每个批次最多 {batch_size} 条记录")
  195. # 使用信号量控制并发数量
  196. semaphore = asyncio.Semaphore(concurrency)
  197. async def process_batch_with_semaphore(batch, index):
  198. async with semaphore:
  199. try:
  200. affected_rows = await self._process_single_batch(batch)
  201. # 记录批次进度
  202. processed_count = min((index + 1) * batch_size, len(stats_list))
  203. self.logger.info(f"并发批次处理进度: [{index + 1}/{total_batches}] 已处理: {processed_count}/{len(stats_list)}, 本次更新: {affected_rows}条")
  204. return affected_rows
  205. except Exception as e:
  206. error_msg = f"并发批次处理失败 (批次 {index}): {e}"
  207. self.logger.error(error_msg)
  208. raise
  209. # 并发处理所有批次
  210. tasks = [process_batch_with_semaphore(batch, i) for i, batch in enumerate(batches)]
  211. results = await asyncio.gather(*tasks, return_exceptions=True)
  212. # 检查是否有异常
  213. total_updated = 0
  214. for i, result in enumerate(results):
  215. if isinstance(result, Exception):
  216. error_msg = f"批次 {i} 处理失败: {result}"
  217. self.logger.error(error_msg)
  218. raise result
  219. total_updated += result
  220. return total_updated
  221. def _determine_level(self, stats):
  222. """基于三个指标确定活跃度级别和优先级(统一阈值)"""
  223. recent_7d = stats['recent_7d_videos']
  224. recent_30d = stats['recent_30d_videos']
  225. total = stats['total_videos']
  226. thresholds = self.config.activity_thresholds
  227. levels = self.config.activity_levels
  228. # 1. 新用户判断:没有历史数据
  229. if total <= thresholds.new_user_total:
  230. return levels.new_user.label, levels.new_user.priority
  231. # 2. 基于近7天数据判断
  232. if recent_7d >= 10: # extreme_active_recent_7d (极高活跃阈值)
  233. return levels.extreme_active.label, levels.extreme_active.priority
  234. elif recent_7d >= 7: # high_active_recent_7d (高活跃阈值)
  235. return levels.high_active.label, levels.high_active.priority
  236. elif recent_7d >= 3: # medium_active_recent_7d (中活跃阈值)
  237. return levels.medium_active.label, levels.medium_active.priority
  238. elif recent_7d >= 1: # low_active_recent_7d (低活跃阈值)
  239. return levels.low_active.label, levels.low_active.priority
  240. else:
  241. # 近7天没有数据,看近30天
  242. if recent_30d >= 3: # dormant_recent_30d (休眠阈值)
  243. return levels.dormant.label, levels.dormant.priority
  244. else:
  245. # 近30天也没有数据,但是历史有数据,则为僵尸
  246. return levels.zombie.label, levels.zombie.priority
  247. async def close(self):
  248. """关闭数据库连接"""
  249. if self.db_service:
  250. await self.db_service.__aexit__(None, None, None)
  251. self.logger.info("活跃度计算器已关闭")
  252. async def main_full():
  253. # 通过命令行参数控制更新模式
  254. import sys
  255. update_mode = "full"
  256. calculator = ActivityCalculator(update_mode=update_mode)
  257. await calculator.initialize() # 初始化数据库连接
  258. await calculator.calculate_and_update()
  259. await calculator.close() # 关闭连接
  260. async def main_incremental():
  261. update_mode = "incremental"
  262. calculator = ActivityCalculator(update_mode=update_mode)
  263. await calculator.initialize() # 初始化数据库连接
  264. await calculator.calculate_and_update()
  265. await calculator.close() # 关闭连接