# get_account_category.py
import json
import math
import traceback
from decimal import Decimal

from tqdm.asyncio import tqdm
  5. class GetAccountCategory:
  6. def __init__(self, client):
  7. self.client = client
  8. @staticmethod
  9. def scale_score(x):
  10. """
  11. 将原始分值 x 从 [1, ∞) 映射到 [0.7, 1.5]
  12. - [1, 3] 区间线性放缩到 [0.7, 1.5]
  13. - 超过 3 后增长趋缓(可自行调节 slope_after_3)
  14. """
  15. # 参数可调
  16. min_x, pivot_x = 1, 2.8
  17. min_y, max_y = 0.8, 1.6
  18. slope_after_3 = 0.01 # 超过3后每+1仅增加0.05,可设为0表示完全饱和
  19. if x <= pivot_x:
  20. # 线性放缩
  21. y = min_y + (max_y - min_y) * (x - min_x) / (pivot_x - min_x)
  22. else:
  23. # 超过3后的平缓增长
  24. y = max_y - slope_after_3 * (x - pivot_x)
  25. # 可选:限制不超过上限/下限
  26. y = max(min_y, min(y, max_y))
  27. return y
  28. async def get_account_result(self, account_name: str):
  29. """获取账号分类"""
  30. query = """
  31. select
  32. category,
  33. count(1) as publish_cnt,
  34. sum(fans) as total_fans,
  35. sum(view_count) as total_view_count,
  36. sum(first_level) as total_first_level,
  37. sum(view_count) / sum(fans) as category_avg_read
  38. from publish_account_category_detail
  39. where position = 1 and category != 'empty' and first_level > 0
  40. and date_str > '20250901' and account_name = %s
  41. group by category, account_name
  42. having publish_cnt > 2;
  43. """
  44. result = await self.client.async_fetch(query=query, params=(account_name,))
  45. return result
  46. async def analysis_account_detail(self, account_name: str):
  47. detail = await self.get_account_result(account_name)
  48. total_fans = sum(i['total_fans'] for i in detail)
  49. total_view_count = sum(i['total_view_count'] for i in detail)
  50. # total_first_level = sum(i['total_first_level'] for i in detail)
  51. avg_read_rate = total_view_count / total_fans if total_fans > 0 else 0
  52. positive_category = {}
  53. for i in detail:
  54. if i['category_avg_read'] > avg_read_rate:
  55. raw_score = float(i['category_avg_read'] / Decimal(avg_read_rate))
  56. positive_category[i['category']] = self.scale_score(raw_score)
  57. return positive_category
  58. async def get_accounts(self):
  59. query = """
  60. select gh_id, account_name from category_dev;
  61. """
  62. result = await self.client.async_fetch(query=query)
  63. return result
  64. async def update_account_category(self, gh_id, category_map: dict):
  65. query = """
  66. update category_dev
  67. set positive_category_map = %s
  68. where gh_id = %s;
  69. """
  70. await self.client.async_save(query=query, params=(json.dumps(category_map, ensure_ascii=False), gh_id))
  71. async def deal(self):
  72. account_list = await self.get_accounts()
  73. for account in tqdm(account_list):
  74. account_name = account['account_name']
  75. try:
  76. positive_category = await self.analysis_account_detail(account_name)
  77. print(account['account_name'])
  78. print(json.dumps(positive_category, ensure_ascii=False, indent=4))
  79. # return positive_category
  80. await self.update_account_category(account['gh_id'], positive_category)
  81. except Exception as e:
  82. print(f"分析账号{account_name}分类失败: {e}")
  83. import traceback
  84. print(traceback.print_exc())