# crawler_account_manager.py

import time
import math
from datetime import datetime
from typing import Optional, List, Dict

from pandas import DataFrame
from scipy import stats
from tqdm.asyncio import tqdm


class CrawlerAccountManagerConst:
    # article status
    ARTICLE_LOW_SIMILARITY_STATUS = 0
    ARTICLE_INIT_STATUS = 1
    ARTICLE_PUBLISHED_STATUS = 2

    # similarity threshold
    SIMILARITY_THRESHOLD = 0.4

    # one day, as a unix-timestamp span (seconds)
    ONE_DAY_TIMESTAMP = 86400

    # the last 30 days, as a unix-timestamp span (seconds)
    THIRTY_DAYS_TIMESTAMP = 30 * ONE_DAY_TIMESTAMP


class CrawlerAccountManager(CrawlerAccountManagerConst):
    def __init__(self, pool, aliyun_log, trace_id):
        self.pool = pool
        self.aliyun_log = aliyun_log
        self.trace_id = trace_id

    @staticmethod
    def safe_float(x: float | None, default: float = 0.0) -> float:
        """Normalize None / NaN / Inf to the given default value."""
        return default if x is None or not math.isfinite(x) else float(x)

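    # Examples of safe_float on edge inputs:
    #   safe_float(float("nan"))        -> 0.0
    #   safe_float(float("inf"), -1.0)  -> -1.0
    #   safe_float(None)                -> 0.0
    #   safe_float(3)                   -> 3.0
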
    async def get_crawling_accounts(self, platform) -> List[str]:
        """Fetch the ids of the accounts enabled for crawling."""
        match platform:
            case "weixin":
                query = """
                    select gh_id as 'account_id' from long_articles_accounts where is_using = 1;
                """
            case _:
                raise RuntimeError(f"Unknown platform: {platform}")
        account_list = await self.pool.async_fetch(query)
        return [i["account_id"] for i in account_list]


class WeixinAccountManager(CrawlerAccountManager):
    def __init__(self, pool, aliyun_log, trace_id):
        # super().__init__ already stores pool, aliyun_log and trace_id
        super().__init__(pool, aliyun_log, trace_id)

    async def get_account_crawler_articles_info(
        self, account_id: str
    ) -> Optional[DataFrame]:
        """Fetch an account's crawled articles as a DataFrame."""
        query = """
            select title, link, score, status, article_index, read_cnt, publish_time
            from crawler_meta_article where out_account_id = %s;
        """
        response = await self.pool.async_fetch(query=query, params=(account_id,))
        return DataFrame(
            response,
            columns=[
                "title",
                "link",
                "score",
                "status",
                "article_index",
                "read_cnt",
                "publish_time",
            ],
        )

    async def update_account_stat_detail(
        self, account_id: str, history_info: Dict, recently_info: Dict
    ) -> int:
        """Persist an account's history and recent statistics."""
        query = """
            update long_articles_accounts
            set history_publish_frequency = %s, history_score_ci_lower = %s,
                recent_publish_frequency = %s, recent_score_ci_lower = %s, update_date = %s
            where gh_id = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                history_info["publish_frequency"],
                history_info["ci_lower"],
                recently_info["publish_frequency"],
                recently_info["ci_lower"],
                datetime.today().strftime("%Y-%m-%d"),
                account_id,
            ),
        )

    def analysis_dataframe(self, dataframe: DataFrame) -> Optional[Dict]:
        """Compute the mean-score confidence interval and the publish frequency."""
        score_list = dataframe["score"].dropna()
        n = len(score_list)
        mean = score_list.mean() if n else 0.0
        # 95% confidence interval for the mean score
        if n < 2:
            ci_lower, ci_upper = 0.0, 0.0
        else:
            sem = stats.sem(score_list)  # may return NaN
            t_val = stats.t.ppf(0.975, df=n - 1)
            margin = t_val * sem if math.isfinite(sem) else 0.0
            ci_lower, ci_upper = mean - margin, mean + margin
        # publish frequency: articles per day over the observed time span
        publish_times = dataframe["publish_time"].dropna()
        if len(publish_times) >= 2:
            delta = publish_times.max() - publish_times.min()
            publish_frequency = (
                (len(publish_times) / delta * self.ONE_DAY_TIMESTAMP) if delta else 0.0
            )
        else:
            publish_frequency = 0.0
        return {
            "publish_frequency": self.safe_float(publish_frequency),
            "ci_lower": self.safe_float(ci_lower),
            "ci_upper": self.safe_float(ci_upper),
        }

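    # For n >= 2 with a finite SEM, the bounds above form the standard
    # two-sided 95% t-interval, mean +/- t_{0.975, n-1} * SEM, equivalent to
    #     ci_lower, ci_upper = stats.t.interval(0.95, df=n - 1, loc=mean, scale=sem)
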
    async def analysis_single_account(self, account_id: str) -> int:
        """Analyse one account over its full history and over the last 30 days."""
        dataframe = await self.get_account_crawler_articles_info(account_id)
        history_articles_analysis = self.analysis_dataframe(dataframe)
        thirty_days_before = int(time.time()) - self.THIRTY_DAYS_TIMESTAMP
        recent_30_days_df = dataframe[dataframe["publish_time"] >= thirty_days_before]
        recent_30_days_analysis = self.analysis_dataframe(recent_30_days_df)
        return await self.update_account_stat_detail(
            account_id, history_articles_analysis, recent_30_days_analysis
        )

    async def deal(self, platform, account_id_list: Optional[List[str]] = None) -> None:
        """Entry point: analyse every crawling account on the platform."""
        if not account_id_list:
            account_id_list = await self.get_crawling_accounts(platform=platform)
        for account_id in tqdm(account_id_list):
            await self.analysis_single_account(account_id)


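# Minimal usage sketch. `make_pool` and `make_aliyun_log` are hypothetical
# factories standing in for however the project actually builds its async
# MySQL pool and Aliyun log client:
#
#     import asyncio
#
#     async def main():
#         pool = make_pool()              # hypothetical async MySQL pool
#         aliyun_log = make_aliyun_log()  # hypothetical Aliyun log client
#         manager = WeixinAccountManager(pool, aliyun_log, trace_id="demo-trace")
#         await manager.deal(platform="weixin")
#
#     asyncio.run(main())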