123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import time
- import math
- from typing import Optional, List, Dict
- from pandas import DataFrame
- from scipy import stats
class CrawlerAccountManagerConst:
    """Shared constants for crawler account management."""

    # Article status codes (stored in crawler_meta_article.status):
    # 0 = similarity too low, 1 = newly crawled, 2 = already published.
    ARTICLE_LOW_SIMILARITY_STATUS = 0
    ARTICLE_INIT_STATUS = 1
    ARTICLE_PUBLISHED_STATUS = 2

    # Minimum similarity score for an article to be considered usable.
    SIMILARITY_THRESHOLD = 0.4

    # Number of seconds in one day.
    ONE_DAY_TIMESTAMP = 86400
    # Number of seconds in a trailing 30-day window.
    THIRTY_DAYS_TIMESTAMP = 30 * ONE_DAY_TIMESTAMP
class CrawlerAccountManager(CrawlerAccountManagerConst):
    """Base class for per-platform crawler account managers."""

    def __init__(self, pool, aliyun_log, trace_id):
        # pool: async DB connection pool; aliyun_log: logging client;
        # trace_id: correlation id for tracing a single run.
        self.pool = pool
        self.aliyun_log = aliyun_log
        self.trace_id = trace_id

    @staticmethod
    def safe_float(x: float | None, default: float = 0.0) -> float:
        """Normalize None / NaN / Inf to *default*; otherwise return float(x)."""
        if x is None:
            return default
        if not math.isfinite(x):
            return default
        return float(x)

    async def get_crawling_accounts(self, platform):
        """Fetch the accounts to crawl for *platform* (subclass hook; no-op here)."""
        pass
class WeixinAccountManager(CrawlerAccountManager):
    """Crawler account statistics manager for the WeChat (weixin) platform.

    For each account it loads all crawled articles, computes publish
    frequency and a 95% confidence interval over the article scores —
    both over the full history and over the trailing 30 days — and
    persists the results to long_articles_accounts.
    """

    def __init__(self, pool, aliyun_log, trace_id):
        # The base-class __init__ already stores pool / aliyun_log /
        # trace_id; the previous duplicate assignments were redundant.
        super().__init__(pool, aliyun_log, trace_id)

    async def get_account_crawler_articles_info(
        self, account_id: str
    ) -> DataFrame:
        """Fetch all crawled articles of *account_id* as a DataFrame.

        Always returns a DataFrame (possibly empty) with the listed
        columns; the Optional annotation was inaccurate and was removed.
        """
        # Plain string (no interpolation): the needless f-prefix is dropped.
        # account_id is bound as a query parameter, not interpolated.
        query = """
            select title, link, score, status, article_index, read_cnt, publish_time
            from crawler_meta_article where out_account_id = %s;
        """
        response = await self.pool.async_fetch(query=query, params=(account_id,))
        return DataFrame(
            response,
            columns=[
                "title",
                "link",
                "score",
                "status",
                "article_index",
                "read_cnt",
                "publish_time",
            ],
        )

    async def update_account_stat_detail(
        self, account_id: str, history_info: Dict, recently_info: Dict
    ) -> int:
        """Persist historic and recent stats for *account_id*.

        Both info dicts must carry the keys produced by
        analysis_dataframe ("publish_frequency", "ci_lower").
        Returns whatever async_save reports (affected row count).
        """
        query = """
            update long_articles_accounts
            set history_publish_frequency = %s, history_score_ci_lower = %s,
            recent_publish_frequency= %s, recent_score_ci_lower = %s
            where gh_id = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                history_info["publish_frequency"],
                history_info["ci_lower"],
                recently_info["publish_frequency"],
                recently_info["ci_lower"],
                account_id,
            ),
        )

    def analysis_dataframe(self, dataframe: DataFrame) -> Dict:
        """Compute score confidence interval and publish frequency.

        Returns a dict with publish_frequency (articles per day over the
        observed time span), ci_lower and ci_upper (95% t-based CI
        bounds of the mean score). All values are sanitized to finite
        floats via safe_float. The Optional return annotation was wrong
        (None is never returned) and was removed.
        """
        score_list = dataframe["score"].dropna()
        n = len(score_list)
        mean = score_list.mean() if n else 0.0
        # 95% confidence interval of the mean score; undefined below 2 samples.
        if n < 2:
            ci_lower, ci_upper = 0.0, 0.0
        else:
            sem = stats.sem(score_list)  # may come back NaN
            t_val = stats.t.ppf(0.975, df=n - 1)
            margin = t_val * sem if math.isfinite(sem) else 0.0
            ci_lower, ci_upper = mean - margin, mean + margin
        # Publish frequency: articles per day across the observed span.
        # publish_time is assumed to be an epoch-seconds timestamp —
        # consistent with the ONE_DAY_TIMESTAMP scaling below.
        publish_times = dataframe["publish_time"].dropna()
        if len(publish_times) >= 2:
            delta = publish_times.max() - publish_times.min()
            # Guard against a zero span (all articles share one timestamp).
            publish_frequency = (
                len(publish_times) / delta * self.ONE_DAY_TIMESTAMP
                if delta
                else 0.0
            )
        else:
            publish_frequency = 0.0
        return {
            "publish_frequency": self.safe_float(publish_frequency),
            "ci_lower": self.safe_float(ci_lower),
            "ci_upper": self.safe_float(ci_upper),
        }

    async def analysis_single_account(self, account_id: str) -> None:
        """Analyze one account (full history + last 30 days) and persist it."""
        dataframe = await self.get_account_crawler_articles_info(account_id)
        history_articles_analysis = self.analysis_dataframe(dataframe)
        # Rows with NaN publish_time are excluded by the >= comparison.
        thirty_days_before = int(time.time()) - self.THIRTY_DAYS_TIMESTAMP
        recent_30_days_df = dataframe[
            dataframe["publish_time"] >= thirty_days_before
        ]
        recent_30_days_analysis = self.analysis_dataframe(recent_30_days_df)
        await self.update_account_stat_detail(
            account_id, history_articles_analysis, recent_30_days_analysis
        )

    async def deal(self, account_id_list: List[str]):
        """Sequentially analyze and persist stats for every listed account."""
        for account_id in account_id_list:
            await self.analysis_single_account(account_id)
|