# crawler_account_manager.py
  1. import time
  2. import math
  3. from datetime import datetime
  4. from typing import Optional, List, Dict
  5. import pandas as pd
  6. from pandas import DataFrame
  7. from scipy import stats
  8. from tqdm.asyncio import tqdm
  9. class CrawlerAccountManagerConst:
  10. # 文章状态
  11. ARTICLE_LOW_SIMILARITY_STATUS = 0
  12. ARTICLE_INIT_STATUS = 1
  13. ARTICLE_PUBLISHED_STATUS = 2
  14. # 相似度阈值
  15. SIMILARITY_THRESHOLD = 0.4
  16. # 1天时间戳
  17. ONE_DAY_TIMESTAMP = 86400
  18. # 近30天时间戳
  19. THIRTY_DAYS_TIMESTAMP = 30 * ONE_DAY_TIMESTAMP
  20. class CrawlerAccountManager(CrawlerAccountManagerConst):
  21. def __init__(self, pool, aliyun_log, trace_id):
  22. self.pool = pool
  23. self.aliyun_log = aliyun_log
  24. self.trace_id = trace_id
  25. @staticmethod
  26. def safe_float(x: float | None, default: float = 0.0) -> float:
  27. """把 None / NaN / Inf 统一替换成指定默认值"""
  28. return default if x is None or not math.isfinite(x) else float(x)
  29. async def get_crawling_accounts(self, platform) -> List[str]:
  30. """获取抓取账号信息"""
  31. match platform:
  32. case "weixin":
  33. query = f"""
  34. select gh_id as 'account_id' from long_articles_accounts where is_using = 1;
  35. """
  36. case _:
  37. raise RuntimeError(f"Unknown platform: {platform}")
  38. account_list = await self.pool.async_fetch(query)
  39. return [i["account_id"] for i in account_list]
  40. class WeixinAccountManager(CrawlerAccountManager):
  41. def __init__(self, pool, aliyun_log, trace_id):
  42. super().__init__(pool, aliyun_log, trace_id)
  43. self.pool = pool
  44. self.aliyun_log = aliyun_log
  45. self.trace_id = trace_id
  46. async def get_account_crawler_articles_info(
  47. self, account_id: str
  48. ) -> Optional[DataFrame]:
  49. """get articles and set as dataframe"""
  50. query = f"""
  51. select title, link, score, status, article_index, read_cnt, publish_time
  52. from crawler_meta_article where out_account_id = %s;
  53. """
  54. response = await self.pool.async_fetch(query=query, params=(account_id,))
  55. return DataFrame(
  56. response,
  57. columns=[
  58. "title",
  59. "link",
  60. "score",
  61. "status",
  62. "article_index",
  63. "read_cnt",
  64. "publish_time",
  65. ],
  66. )
  67. async def update_account_stat_detail(
  68. self, account_id: str, history_info: Dict, recently_info: Dict
  69. ) -> int:
  70. """更新账号统计详情"""
  71. query = f"""
  72. update long_articles_accounts
  73. set history_publish_frequency = %s, history_score_ci_lower = %s,
  74. recent_publish_frequency= %s, recent_score_ci_lower = %s, update_date = %s
  75. where gh_id = %s;
  76. """
  77. return await self.pool.async_save(
  78. query=query,
  79. params=(
  80. history_info["publish_frequency"],
  81. history_info["ci_lower"],
  82. recently_info["publish_frequency"],
  83. recently_info["ci_lower"],
  84. datetime.today().strftime("%Y-%m-%d"),
  85. account_id,
  86. ),
  87. )
  88. def analysis_dataframe(self, dataframe: DataFrame) -> Optional[Dict]:
  89. score_list = dataframe["score"].dropna()
  90. n = len(score_list)
  91. mean = score_list.mean() if n else 0.0
  92. # 置信区间
  93. if n < 2:
  94. ci_lower, ci_upper = 0.0, 0.0
  95. else:
  96. sem = stats.sem(score_list) # 可能返回 NaN
  97. t_val = stats.t.ppf(0.975, df=n - 1)
  98. margin = t_val * sem if math.isfinite(sem) else 0.0
  99. ci_lower, ci_upper = mean - margin, mean + margin
  100. # 计算发文频率
  101. publish_times = pd.to_numeric(dataframe["publish_time"], errors="coerce")
  102. publish_times = publish_times.replace(
  103. [float("inf"), float("-inf")], pd.NA
  104. ).dropna()
  105. if len(publish_times) >= 2:
  106. dates = pd.to_datetime(publish_times, unit="s").dt.normalize()
  107. days_delta = max(int((dates.max() - dates.min()).days) + 1, 1)
  108. publish_frequency = len(publish_times) / days_delta
  109. else:
  110. publish_frequency = 0.0
  111. return {
  112. "publish_frequency": self.safe_float(publish_frequency),
  113. "ci_lower": self.safe_float(ci_lower),
  114. "ci_upper": self.safe_float(ci_upper),
  115. }
  116. async def analysis_single_account(self, account_id: str) -> int:
  117. dataframe = await self.get_account_crawler_articles_info(account_id)
  118. history_articles_analysis = self.analysis_dataframe(dataframe)
  119. thirty_days_before = int(time.time()) - self.THIRTY_DAYS_TIMESTAMP
  120. recent_30_days_df = dataframe[dataframe["publish_time"] >= thirty_days_before]
  121. recent_30_days_analysis = self.analysis_dataframe(recent_30_days_df)
  122. return await self.update_account_stat_detail(
  123. account_id, history_articles_analysis, recent_30_days_analysis
  124. )
  125. async def deal(self, platform, account_id_list: Optional[List[str]] = None) -> None:
  126. """deal"""
  127. if not account_id_list:
  128. account_id_list = await self.get_crawling_accounts(platform=platform)
  129. for account_id in tqdm(account_id_list):
  130. await self.analysis_single_account(account_id)