| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- import json
- import math
- from decimal import Decimal
- from tqdm.asyncio import tqdm
class GetAccountCategory:
    """Derive each account's better-than-average categories and persist them.

    For every account listed in ``category_dev`` the class compares the read
    rate (views / fans) of each category with the read rate of the account as
    a whole, scores the categories that beat the account average, and stores
    the resulting ``{category: score}`` map as JSON.

    ``client`` must expose the coroutines ``async_fetch(query=..., params=...)``
    and ``async_save(query=..., params=...)``. Rows returned by ``async_fetch``
    are indexed by column name (presumably dicts -- confirm against the client
    implementation).
    """

    def __init__(self, client):
        self.client = client

    @staticmethod
    def scale_score(x):
        """Map a raw ratio ``x`` onto a bounded score in ``[0.8, 1.6]``.

        * ``x <= 2.8``: linear interpolation with ``1 -> 0.8`` and
          ``2.8 -> 1.6``.
        * ``x > 2.8``: the score starts from 1.6 and *decreases* by 0.01 for
          every unit of ``x`` beyond 2.8.
        * The result is finally clamped to ``[0.8, 1.6]``, so any ``x < 1``
          yields 0.8.

        NOTE(review): earlier comments on this function described a target
        range of [0.7, 1.5], a pivot at 3 and slow *growth* past the pivot.
        The code uses the constants below with a negative slope; the numeric
        behaviour is kept unchanged here -- confirm which one is intended.
        """
        # Tunable parameters.
        min_x, pivot_x = 1, 2.8
        min_y, max_y = 0.8, 1.6
        slope_after_pivot = 0.01  # 0 would mean "fully saturated" past the pivot

        if x <= pivot_x:
            # Linear rescale of [min_x, pivot_x] onto [min_y, max_y].
            y = min_y + (max_y - min_y) * (x - min_x) / (pivot_x - min_x)
        else:
            # Past the pivot the score moves away from max_y by a small slope.
            y = max_y - slope_after_pivot * (x - pivot_x)

        # Keep the score inside [min_y, max_y].
        return max(min_y, min(y, max_y))

    async def get_account_result(self, account_name: str):
        """Fetch per-category publishing statistics for one account.

        Only rows with ``position = 1``, a non-``'empty'`` category,
        ``first_level > 0`` and ``date_str > '20250901'`` are aggregated, and
        only categories with more than two such rows are returned. Each
        result row carries ``category``, ``publish_cnt``, ``total_fans``,
        ``total_view_count``, ``total_first_level`` and ``category_avg_read``
        (``sum(view_count) / sum(fans)``; the database may return NULL for it
        when ``sum(fans)`` is 0).
        """
        query = """
            select
                category,
                count(1) as publish_cnt,
                sum(fans) as total_fans,
                sum(view_count) as total_view_count,
                sum(first_level) as total_first_level,
                sum(view_count) / sum(fans) as category_avg_read
            from publish_account_category_detail
            where position = 1 and category != 'empty' and first_level > 0
            and date_str > '20250901' and account_name = %s
            group by category, account_name
            having publish_cnt > 2;
        """
        return await self.client.async_fetch(query=query, params=(account_name,))

    async def analysis_account_detail(self, account_name: str):
        """Return ``{category: score}`` for categories beating the account average.

        The account-wide read rate is ``total views / total fans`` summed over
        all returned categories. A category whose own read rate is strictly
        greater gets ``scale_score(category_rate / account_rate)``.

        Returns an empty dict when the account has no rows or when the
        account-wide read rate is 0 (nothing can be "above average" and the
        ratio would be undefined). Rows whose ``category_avg_read`` is
        ``None`` are skipped instead of aborting the whole account.
        """
        detail = await self.get_account_result(account_name)
        if not detail:
            return {}

        # ``or 0`` tolerates NULL sums coming back from the database.
        total_fans = sum(row['total_fans'] or 0 for row in detail)
        total_view_count = sum(row['total_view_count'] or 0 for row in detail)
        avg_read_rate = total_view_count / total_fans if total_fans > 0 else 0
        if not avg_read_rate:
            # Avoids a division by zero below.
            return {}

        # Convert once, outside the loop.
        baseline = Decimal(avg_read_rate)
        positive_category = {}
        for row in detail:
            category_rate = row['category_avg_read']
            if category_rate is None:
                # No usable read rate for this category (e.g. zero fans).
                continue
            if category_rate > avg_read_rate:
                raw_score = float(Decimal(category_rate) / baseline)
                positive_category[row['category']] = self.scale_score(raw_score)
        return positive_category

    async def get_accounts(self):
        """Return every ``gh_id`` / ``account_name`` pair from ``category_dev``."""
        query = """
            select gh_id, account_name from category_dev;
        """
        return await self.client.async_fetch(query=query)

    async def update_account_category(self, gh_id, category_map: dict):
        """Store ``category_map`` as JSON in ``positive_category_map`` for ``gh_id``."""
        query = """
            update category_dev
            set positive_category_map = %s
            where gh_id = %s;
        """
        # ensure_ascii=False keeps non-ASCII category names readable in the DB.
        payload = json.dumps(category_map, ensure_ascii=False)
        await self.client.async_save(query=query, params=(payload, gh_id))

    async def deal(self):
        """Recompute and persist the category map for every account.

        Accounts are processed one by one; a failure on one account is
        reported (message plus traceback) and does not stop the batch.
        """
        account_list = await self.get_accounts()
        for account in tqdm(account_list):
            account_name = account['account_name']
            try:
                positive_category = await self.analysis_account_detail(account_name)
                print(account_name)
                print(json.dumps(positive_category, ensure_ascii=False, indent=4))
                await self.update_account_category(account['gh_id'], positive_category)
            except Exception as e:
                # Batch boundary: report and move on to the next account.
                print(f"分析账号{account_name}分类失败: {e}")
                import traceback
                # print_exc() writes the traceback itself and returns None,
                # so it must not be wrapped in print().
                traceback.print_exc()
|