"""Compute per-account "positive category" weights and persist them."""
import json
import math
import traceback
from decimal import Decimal

try:
    from tqdm.asyncio import tqdm
except ImportError:  # the progress bar is cosmetic; degrade to plain iteration
    def tqdm(iterable, *args, **kwargs):
        """Fallback used when tqdm is not installed: return *iterable* as is."""
        return iterable


class GetAccountCategory:
    """Score the categories in which an account outperforms its own average.

    ``client`` must provide the coroutines ``async_fetch(query=..., params=...)``
    and ``async_save(query=..., params=...)``; this class only calls those two.
    """

    def __init__(self, client):
        self.client = client

    @staticmethod
    def scale_score(x, *, min_x=1.0, pivot_x=2.8, min_y=0.8, max_y=1.6):
        """Map a raw ratio ``x`` onto the bounded range ``[min_y, max_y]``.

        - ``x <= min_x`` yields ``min_y``.
        - ``min_x < x < pivot_x`` is interpolated linearly.
        - ``x >= pivot_x`` saturates at ``max_y``.

        The mapping is non-decreasing in ``x``: a larger raw ratio never
        produces a smaller score.

        Args:
            x: Raw score (any real number convertible with ``float``).
            min_x: Raw value mapped to ``min_y``.
            pivot_x: Raw value at and above which the result is ``max_y``.
            min_y: Lower bound of the result.
            max_y: Upper bound of the result.

        Returns:
            The scaled score as a ``float``.

        Raises:
            ValueError: If ``pivot_x`` is not greater than ``min_x``.
        """
        if pivot_x <= min_x:
            raise ValueError("pivot_x must be greater than min_x")
        # Clamping the input (rather than the output) keeps both end points
        # exact: the interpolation fraction is exactly 0.0 or 1.0 there.
        clamped = min(max(float(x), min_x), pivot_x)
        fraction = (clamped - min_x) / (pivot_x - min_x)
        return min_y + (max_y - min_y) * fraction

    async def get_account_result(self, account_name: str):
        """Fetch per-category aggregates for one account.

        Only categories with more than two matching rows are returned (see the
        ``having`` clause). Each row carries ``category``, ``publish_cnt``,
        ``total_fans``, ``total_view_count``, ``total_first_level`` and
        ``category_avg_read``.

        Returns:
            Whatever ``client.async_fetch`` returns — callers in this class
            treat it as a sequence of dict-like rows.
        """
        query = """
            select category,
                   count(1) as publish_cnt,
                   sum(fans) as total_fans,
                   sum(view_count) as total_view_count,
                   sum(first_level) as total_first_level,
                   sum(view_count) / sum(fans) as category_avg_read
            from publish_account_category_detail
            where position = 1
              and category != 'empty'
              and first_level > 0
              and date_str > '20250901'
              and account_name = %s
            group by category, account_name
            having publish_cnt > 2;
        """
        return await self.client.async_fetch(query=query, params=(account_name,))

    async def analysis_account_detail(self, account_name: str):
        """Return ``{category: scaled_score}`` for above-average categories.

        The account baseline is total views divided by total fans over all
        returned categories. A category is "positive" when its own read rate
        is strictly above that baseline; its score is
        ``scale_score(category_rate / baseline)``.

        Returns:
            A dict mapping category name to score. Empty when there are no
            rows or when the baseline is not positive (nothing to compare to).
        """
        detail = await self.get_account_result(account_name) or []

        # Convert once at the edge: the rows may hold Decimal, float or None,
        # and mixing Decimal with float in arithmetic raises TypeError.
        total_fans = sum(float(row['total_fans'] or 0) for row in detail)
        total_view_count = sum(
            float(row['total_view_count'] or 0) for row in detail
        )
        if total_fans <= 0:
            return {}
        avg_read_rate = total_view_count / total_fans
        if avg_read_rate <= 0:
            return {}

        positive_category = {}
        for row in detail:
            category_rate = row['category_avg_read']
            if category_rate is None:
                # No usable read rate for this category; skip it.
                continue
            category_rate = float(category_rate)
            if category_rate > avg_read_rate:
                raw_score = category_rate / avg_read_rate
                positive_category[row['category']] = self.scale_score(raw_score)
        return positive_category

    async def get_accounts(self):
        """Fetch ``gh_id`` and ``account_name`` for every row of ``category_dev``."""
        query = """
            select gh_id, account_name
            from category_dev;
        """
        return await self.client.async_fetch(query=query)

    async def update_account_category(self, gh_id, category_map: dict):
        """Store ``category_map`` as JSON in ``positive_category_map`` for ``gh_id``."""
        query = """
            update category_dev
            set positive_category_map = %s
            where gh_id = %s;
        """
        payload = json.dumps(category_map, ensure_ascii=False)
        await self.client.async_save(query=query, params=(payload, gh_id))

    async def deal(self):
        """Recompute and persist the positive-category map of every account.

        Accounts are processed one by one. A failure on one account is
        reported on stdout/stderr and does not stop the remaining accounts.
        """
        account_list = await self.get_accounts() or []
        for account in tqdm(account_list):
            account_name = account['account_name']
            try:
                positive_category = await self.analysis_account_detail(account_name)
                print(account_name)
                print(json.dumps(positive_category, ensure_ascii=False, indent=4))
                await self.update_account_category(account['gh_id'], positive_category)
            except Exception as e:
                # Batch boundary: keep going with the next account, but leave
                # a full traceback so the failure can be diagnosed.
                print(f"分析账号{account_name}分类失败: {e}")
                traceback.print_exc()