# crawler_account_manager.py
  1. import time
  2. import math
  3. from datetime import datetime
  4. from typing import Optional, List, Dict
  5. from pandas import DataFrame
  6. from scipy import stats
  7. class CrawlerAccountManagerConst:
  8. # 文章状态
  9. ARTICLE_LOW_SIMILARITY_STATUS = 0
  10. ARTICLE_INIT_STATUS = 1
  11. ARTICLE_PUBLISHED_STATUS = 2
  12. # 相似度阈值
  13. SIMILARITY_THRESHOLD = 0.4
  14. # 1天时间戳
  15. ONE_DAY_TIMESTAMP = 86400
  16. # 近30天时间戳
  17. THIRTY_DAYS_TIMESTAMP = 30 * ONE_DAY_TIMESTAMP
  18. class CrawlerAccountManager(CrawlerAccountManagerConst):
  19. def __init__(self, pool, aliyun_log, trace_id):
  20. self.pool = pool
  21. self.aliyun_log = aliyun_log
  22. self.trace_id = trace_id
  23. @staticmethod
  24. def safe_float(x: float | None, default: float = 0.0) -> float:
  25. """把 None / NaN / Inf 统一替换成指定默认值"""
  26. return default if x is None or not math.isfinite(x) else float(x)
  27. async def get_crawling_accounts(self, platform) -> List[str]:
  28. """获取抓取账号信息"""
  29. match platform:
  30. case "weixin":
  31. query = f"""
  32. select gh_id as 'account_id' from long_articles_accounts where is_using = 1;
  33. """
  34. case _:
  35. raise RuntimeError(f"Unknown platform: {platform}")
  36. account_list = await self.pool.async_fetch(query)
  37. return [i['account_id'] for i in account_list]
  38. class WeixinAccountManager(CrawlerAccountManager):
  39. def __init__(self, pool, aliyun_log, trace_id):
  40. super().__init__(pool, aliyun_log, trace_id)
  41. self.pool = pool
  42. self.aliyun_log = aliyun_log
  43. self.trace_id = trace_id
  44. async def get_account_crawler_articles_info(
  45. self, account_id: str
  46. ) -> Optional[DataFrame]:
  47. """get articles and set as dataframe"""
  48. query = f"""
  49. select title, link, score, status, article_index, read_cnt, publish_time
  50. from crawler_meta_article where out_account_id = %s;
  51. """
  52. response = await self.pool.async_fetch(query=query, params=(account_id,))
  53. return DataFrame(
  54. response,
  55. columns=[
  56. "title",
  57. "link",
  58. "score",
  59. "status",
  60. "article_index",
  61. "read_cnt",
  62. "publish_time",
  63. ],
  64. )
  65. async def update_account_stat_detail(self, account_id: str, history_info: Dict, recently_info: Dict) -> int:
  66. """更新账号统计详情"""
  67. query = f"""
  68. update long_articles_accounts
  69. set history_publish_frequency = %s, history_score_ci_lower = %s,
  70. recent_publish_frequency= %s, recent_score_ci_lower = %s, update_date = %s
  71. where gh_id = %s;
  72. """
  73. return await self.pool.async_save(
  74. query=query, params=(
  75. history_info["publish_frequency"],
  76. history_info["ci_lower"],
  77. recently_info["publish_frequency"],
  78. recently_info["ci_lower"],
  79. account_id,
  80. datetime.today().strftime("%Y-%m-%d"),
  81. )
  82. )
  83. def analysis_dataframe(self, dataframe: DataFrame) -> Optional[Dict]:
  84. score_list = dataframe["score"].dropna()
  85. n = len(score_list)
  86. mean = score_list.mean() if n else 0.0
  87. # 置信区间
  88. if n < 2:
  89. ci_lower, ci_upper = 0.0, 0.0
  90. else:
  91. sem = stats.sem(score_list) # 可能返回 NaN
  92. t_val = stats.t.ppf(0.975, df=n - 1)
  93. margin = t_val * sem if math.isfinite(sem) else 0.0
  94. ci_lower, ci_upper = mean - margin, mean + margin
  95. # 计算发文频率
  96. publish_times = dataframe["publish_time"].dropna()
  97. if len(publish_times) >= 2:
  98. delta = publish_times.max() - publish_times.min()
  99. publish_frequency = (len(publish_times) / delta
  100. * self.ONE_DAY_TIMESTAMP) if delta else 0.0
  101. else:
  102. publish_frequency = 0.0
  103. return {
  104. "publish_frequency": self.safe_float(publish_frequency),
  105. "ci_lower": self.safe_float(ci_lower),
  106. "ci_upper": self.safe_float(ci_upper),
  107. }
  108. async def analysis_single_account(self, account_id: str) -> None:
  109. dataframe = await self.get_account_crawler_articles_info(account_id)
  110. history_articles_analysis = self.analysis_dataframe(dataframe)
  111. thirty_days_before = int(time.time()) - self.THIRTY_DAYS_TIMESTAMP
  112. recent_30_days_df = dataframe[
  113. dataframe["publish_time"] >= thirty_days_before
  114. ]
  115. recent_30_days_analysis = self.analysis_dataframe(recent_30_days_df)
  116. await self.update_account_stat_detail(account_id, history_articles_analysis, recent_30_days_analysis)
  117. async def deal(self, platform, account_id_list: Optional[List[str]] = None) -> None:
  118. """deal"""
  119. if not account_id_list:
  120. account_id_list = await self.get_crawling_accounts(platform=platform)
  121. for account_id in account_id_list:
  122. await self.analysis_single_account(account_id)