crawler_account_manager.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import time
  2. import math
  3. from typing import Optional, List, Dict
  4. from pandas import DataFrame
  5. from scipy import stats
  class CrawlerAccountManagerConst:
      """Constants shared by the crawler account manager classes."""

      # Article status codes
      ARTICLE_LOW_SIMILARITY_STATUS = 0
      ARTICLE_INIT_STATUS = 1
      ARTICLE_PUBLISHED_STATUS = 2

      # Similarity threshold
      SIMILARITY_THRESHOLD = 0.4

      # Length of one day, in seconds
      ONE_DAY_TIMESTAMP = 24 * 60 * 60

      # Length of the most recent 30-day window, in seconds
      THIRTY_DAYS_TIMESTAMP = ONE_DAY_TIMESTAMP * 30
  class CrawlerAccountManager(CrawlerAccountManagerConst):
      """Base account manager: stores the shared handles and a float sanitizer.

      The constructor only keeps references to ``pool``, ``aliyun_log`` and
      ``trace_id``; it performs no I/O.
      """

      def __init__(self, pool, aliyun_log, trace_id):
          self.pool = pool
          self.aliyun_log = aliyun_log
          self.trace_id = trace_id

      @staticmethod
      def safe_float(x: Optional[float], default: float = 0.0) -> float:
          """Return ``x`` as a float, replacing None / NaN / Inf with ``default``.

          Args:
              x: Value to sanitize; may be None.
              default: Value returned when ``x`` is None or not finite.

          Returns:
              ``float(x)`` when ``x`` is a finite number, otherwise ``default``.
          """
          # ``Optional[float]`` (not ``float | None``) keeps the annotation
          # importable on interpreters older than 3.10 and matches the typing
          # style used by the rest of this file.
          if x is None or not math.isfinite(x):
              return default
          return float(x)

      async def get_crawling_accounts(self, platform):
          """Get the accounts to crawl.

          Placeholder: the body is empty, so awaiting it yields None.
          """
  class WeixinAccountManager(CrawlerAccountManager):
      """Compute per-account article statistics and write them back.

      For each account the crawled articles are loaded, summarized over the
      full history and over the last 30 days, and the two summaries are saved
      to ``long_articles_accounts``.
      """

      def __init__(self, pool, aliyun_log, trace_id):
          # The base initializer already stores pool, aliyun_log and trace_id,
          # so nothing needs to be assigned again here.
          super().__init__(pool, aliyun_log, trace_id)

      async def get_account_crawler_articles_info(
          self, account_id: str
      ) -> Optional[DataFrame]:
          """Load the crawled articles of one account into a DataFrame.

          Args:
              account_id: Value matched against ``out_account_id``.

          Returns:
              A DataFrame with the columns title, link, score, status,
              article_index, read_cnt and publish_time, built from whatever
              ``pool.async_fetch`` returns.
          """
          # Plain string on purpose: the only parameter is bound through
          # ``params``, never interpolated into the SQL text.
          query = """
              select title, link, score, status, article_index, read_cnt, publish_time
              from crawler_meta_article where out_account_id = %s;
          """
          response = await self.pool.async_fetch(query=query, params=(account_id,))
          columns = [
              "title",
              "link",
              "score",
              "status",
              "article_index",
              "read_cnt",
              "publish_time",
          ]
          return DataFrame(response, columns=columns)

      async def update_account_stat_detail(
          self, account_id: str, history_info: Dict, recently_info: Dict
      ) -> int:
          """Update the statistics detail of one account.

          Args:
              account_id: Value matched against ``gh_id``.
              history_info: Must contain ``publish_frequency`` and ``ci_lower``;
                  written to the ``history_*`` columns.
              recently_info: Must contain ``publish_frequency`` and
                  ``ci_lower``; written to the ``recent_*`` columns.

          Returns:
              The result of ``pool.async_save``.
          """
          query = """
              update long_articles_accounts
              set history_publish_frequency = %s, history_score_ci_lower = %s,
                  recent_publish_frequency= %s, recent_score_ci_lower = %s
              where gh_id = %s;
          """
          params = (
              history_info["publish_frequency"],
              history_info["ci_lower"],
              recently_info["publish_frequency"],
              recently_info["ci_lower"],
              account_id,
          )
          return await self.pool.async_save(query=query, params=params)

      def analysis_dataframe(self, dataframe: DataFrame) -> Optional[Dict]:
          """Summarize score confidence bounds and publishing frequency.

          Args:
              dataframe: Must provide ``score`` and ``publish_time`` columns;
                  missing values in either column are ignored.

          Returns:
              A dict with ``publish_frequency``, ``ci_lower`` and ``ci_upper``.
              Every value is passed through ``safe_float``, so None / NaN / Inf
              become 0.0.
          """
          score_list = dataframe["score"].dropna()
          n = len(score_list)
          mean = score_list.mean() if n else 0.0

          # Confidence interval: two-sided 95% Student-t interval around the
          # mean. It needs at least two samples; otherwise both bounds are 0.0.
          if n < 2:
              ci_lower, ci_upper = 0.0, 0.0
          else:
              sem = stats.sem(score_list)  # may return NaN
              t_val = stats.t.ppf(0.975, df=n - 1)
              margin = t_val * sem if math.isfinite(sem) else 0.0
              ci_lower, ci_upper = mean - margin, mean + margin

          # Publishing frequency: article count divided by the span between
          # the earliest and latest publish_time, scaled to one day.
          # NOTE(review): assumes publish_time is a Unix timestamp in seconds,
          # as implied by the comparison against time.time() elsewhere.
          publish_times = dataframe["publish_time"].dropna()
          publish_frequency = 0.0
          if len(publish_times) >= 2:
              delta = publish_times.max() - publish_times.min()
              if delta:  # a zero span would divide by zero
                  publish_frequency = (
                      len(publish_times) / delta * self.ONE_DAY_TIMESTAMP
                  )

          return {
              "publish_frequency": self.safe_float(publish_frequency),
              "ci_lower": self.safe_float(ci_lower),
              "ci_upper": self.safe_float(ci_upper),
          }

      async def analysis_single_account(self, account_id: str) -> None:
          """Analyze one account over all history and the last 30 days, then save."""
          dataframe = await self.get_account_crawler_articles_info(account_id)
          history_articles_analysis = self.analysis_dataframe(dataframe)

          # Keep only the rows published within the last 30 days.
          thirty_days_before = int(time.time()) - self.THIRTY_DAYS_TIMESTAMP
          recent_30_days_df = dataframe[
              dataframe["publish_time"] >= thirty_days_before
          ]
          recent_30_days_analysis = self.analysis_dataframe(recent_30_days_df)

          await self.update_account_stat_detail(
              account_id, history_articles_analysis, recent_30_days_analysis
          )

      async def deal(self, account_id_list: List[str]):
          """Analyze the given accounts one after another.

          Accounts are processed sequentially; an exception raised for one
          account propagates and stops the remaining ones.
          """
          for account_id in account_id_list:
              await self.analysis_single_account(account_id)