import time
import math
from datetime import datetime
from typing import Optional, List, Dict

from pandas import DataFrame
from scipy import stats


class CrawlerAccountManagerConst:
    # article status
    ARTICLE_LOW_SIMILARITY_STATUS = 0
    ARTICLE_INIT_STATUS = 1
    ARTICLE_PUBLISHED_STATUS = 2

    # similarity threshold
    SIMILARITY_THRESHOLD = 0.4

    # seconds in one day
    ONE_DAY_TIMESTAMP = 86400

    # seconds in 30 days
    THIRTY_DAYS_TIMESTAMP = 30 * ONE_DAY_TIMESTAMP


class CrawlerAccountManager(CrawlerAccountManagerConst):
    def __init__(self, pool, aliyun_log, trace_id):
        self.pool = pool
        self.aliyun_log = aliyun_log
        self.trace_id = trace_id

    @staticmethod
    def safe_float(x: float | None, default: float = 0.0) -> float:
        """Replace None / NaN / Inf with the given default value."""
        return default if x is None or not math.isfinite(x) else float(x)

    async def get_crawling_accounts(self, platform: str) -> List[str]:
        """Get the ids of the accounts currently being crawled for the platform."""
        match platform:
            case "weixin":
                query = """
                    select gh_id as 'account_id'
                    from long_articles_accounts
                    where is_using = 1;
                """
            case _:
                raise RuntimeError(f"Unknown platform: {platform}")

        account_list = await self.pool.async_fetch(query)
        return [i["account_id"] for i in account_list]


class WeixinAccountManager(CrawlerAccountManager):
    def __init__(self, pool, aliyun_log, trace_id):
        super().__init__(pool, aliyun_log, trace_id)

    async def get_account_crawler_articles_info(
        self, account_id: str
    ) -> Optional[DataFrame]:
        """Get the account's crawled articles as a dataframe."""
        query = """
            select title, link, score, status, article_index, read_cnt, publish_time
            from crawler_meta_article
            where out_account_id = %s;
        """
        response = await self.pool.async_fetch(query=query, params=(account_id,))
        return DataFrame(
            response,
            columns=[
                "title",
                "link",
                "score",
                "status",
                "article_index",
                "read_cnt",
                "publish_time",
            ],
        )

    async def update_account_stat_detail(
        self, account_id: str, history_info: Dict, recently_info: Dict
    ) -> int:
        """Update the account's statistics detail."""
        query = """
            update long_articles_accounts
            set history_publish_frequency = %s,
                history_score_ci_lower = %s,
                recent_publish_frequency = %s,
                recent_score_ci_lower = %s,
                update_date = %s
            where gh_id = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                history_info["publish_frequency"],
                history_info["ci_lower"],
                recently_info["publish_frequency"],
                recently_info["ci_lower"],
                datetime.today().strftime("%Y-%m-%d"),
                account_id,
            ),
        )

    def analysis_dataframe(self, dataframe: DataFrame) -> Optional[Dict]:
        score_list = dataframe["score"].dropna()
        n = len(score_list)
        mean = score_list.mean() if n else 0.0

        # 95% confidence interval of the mean score (t distribution)
        if n < 2:
            ci_lower, ci_upper = 0.0, 0.0
        else:
            sem = stats.sem(score_list)  # may return NaN
            t_val = stats.t.ppf(0.975, df=n - 1)
            margin = t_val * sem if math.isfinite(sem) else 0.0
            ci_lower, ci_upper = mean - margin, mean + margin

        # publish frequency: articles per day over the observed time span
        publish_times = dataframe["publish_time"].dropna()
        if len(publish_times) >= 2:
            delta = publish_times.max() - publish_times.min()
            publish_frequency = (
                len(publish_times) / delta * self.ONE_DAY_TIMESTAMP if delta else 0.0
            )
        else:
            publish_frequency = 0.0

        return {
            "publish_frequency": self.safe_float(publish_frequency),
            "ci_lower": self.safe_float(ci_lower),
            "ci_upper": self.safe_float(ci_upper),
        }

    async def analysis_single_account(self, account_id: str) -> None:
        dataframe = await self.get_account_crawler_articles_info(account_id)
        history_articles_analysis = self.analysis_dataframe(dataframe)
        thirty_days_before = int(time.time()) - self.THIRTY_DAYS_TIMESTAMP
        recent_30_days_df = dataframe[dataframe["publish_time"] >= thirty_days_before]
        recent_30_days_analysis = self.analysis_dataframe(recent_30_days_df)
        await self.update_account_stat_detail(
            account_id, history_articles_analysis, recent_30_days_analysis
        )

    async def deal(self, platform, account_id_list: Optional[List[str]] = None) -> None:
        """Analyse the given accounts, or every crawling account for the platform."""
        if not account_id_list:
            account_id_list = await self.get_crawling_accounts(platform=platform)
        for account_id in account_id_list:
            await self.analysis_single_account(account_id)
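

# --- Illustrative usage sketch (assumptions, not part of the original module) ---
# A minimal sketch of how WeixinAccountManager might be exercised. The _StubPool
# class, its canned rows, and the asyncio entry point below are hypothetical
# stand-ins for the real connection pool and runner, added only for demonstration.
if __name__ == "__main__":
    import asyncio

    class _StubPool:
        """Hypothetical stand-in for the real async connection pool."""

        async def async_fetch(self, query, params=None):
            if "long_articles_accounts" in query:
                return [{"account_id": "gh_demo"}]
            # fake crawler_meta_article rows:
            # (title, link, score, status, article_index, read_cnt, publish_time)
            now = int(time.time())
            return [
                ("t1", "l1", 0.52, 1, 1, 100, now - 5 * 86400),
                ("t2", "l2", 0.61, 1, 1, 200, now - 2 * 86400),
            ]

        async def async_save(self, query, params):
            print("would run account update with params:", params)
            return 1

    async def _demo():
        manager = WeixinAccountManager(_StubPool(), aliyun_log=None, trace_id="demo")
        await manager.deal(platform="weixin")

    asyncio.run(_demo())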