import time import math from datetime import datetime from typing import Optional, List, Dict import pandas as pd from pandas import DataFrame from scipy import stats from tqdm.asyncio import tqdm class CrawlerAccountManagerConst: # 文章状态 ARTICLE_LOW_SIMILARITY_STATUS = 0 ARTICLE_INIT_STATUS = 1 ARTICLE_PUBLISHED_STATUS = 2 # 相似度阈值 SIMILARITY_THRESHOLD = 0.4 # 1天时间戳 ONE_DAY_TIMESTAMP = 86400 # 近30天时间戳 THIRTY_DAYS_TIMESTAMP = 30 * ONE_DAY_TIMESTAMP class CrawlerAccountManager(CrawlerAccountManagerConst): def __init__(self, pool, aliyun_log, trace_id): self.pool = pool self.aliyun_log = aliyun_log self.trace_id = trace_id @staticmethod def safe_float(x: float | None, default: float = 0.0) -> float: """把 None / NaN / Inf 统一替换成指定默认值""" return default if x is None or not math.isfinite(x) else float(x) async def get_crawling_accounts(self, platform) -> List[str]: """获取抓取账号信息""" match platform: case "weixin": query = f""" select gh_id as 'account_id' from long_articles_accounts where is_using = 1; """ case _: raise RuntimeError(f"Unknown platform: {platform}") account_list = await self.pool.async_fetch(query) return [i["account_id"] for i in account_list] class WeixinAccountManager(CrawlerAccountManager): def __init__(self, pool, aliyun_log, trace_id): super().__init__(pool, aliyun_log, trace_id) self.pool = pool self.aliyun_log = aliyun_log self.trace_id = trace_id async def get_account_crawler_articles_info( self, account_id: str ) -> Optional[DataFrame]: """get articles and set as dataframe""" query = f""" select title, link, score, status, article_index, read_cnt, publish_time from crawler_meta_article where out_account_id = %s; """ response = await self.pool.async_fetch(query=query, params=(account_id,)) return DataFrame( response, columns=[ "title", "link", "score", "status", "article_index", "read_cnt", "publish_time", ], ) async def update_account_stat_detail( self, account_id: str, history_info: Dict, recently_info: Dict ) -> int: """更新账号统计详情""" query = f""" update long_articles_accounts set history_publish_frequency = %s, history_score_ci_lower = %s, recent_publish_frequency= %s, recent_score_ci_lower = %s, update_date = %s where gh_id = %s; """ return await self.pool.async_save( query=query, params=( history_info["publish_frequency"], history_info["ci_lower"], recently_info["publish_frequency"], recently_info["ci_lower"], datetime.today().strftime("%Y-%m-%d"), account_id, ), ) def analysis_dataframe(self, dataframe: DataFrame) -> Optional[Dict]: score_list = dataframe["score"].dropna() n = len(score_list) mean = score_list.mean() if n else 0.0 # 置信区间 if n < 2: ci_lower, ci_upper = 0.0, 0.0 else: sem = stats.sem(score_list) # 可能返回 NaN t_val = stats.t.ppf(0.975, df=n - 1) margin = t_val * sem if math.isfinite(sem) else 0.0 ci_lower, ci_upper = mean - margin, mean + margin # 计算发文频率 publish_times = pd.to_numeric(dataframe["publish_time"], errors="coerce") publish_times = publish_times.replace( [float("inf"), float("-inf")], pd.NA ).dropna() if len(publish_times) >= 2: dates = pd.to_datetime(publish_times, unit="s").dt.normalize() days_delta = max(int((dates.max() - dates.min()).days) + 1, 1) publish_frequency = len(publish_times) / days_delta else: publish_frequency = 0.0 return { "publish_frequency": self.safe_float(publish_frequency), "ci_lower": self.safe_float(ci_lower), "ci_upper": self.safe_float(ci_upper), } async def analysis_single_account(self, account_id: str) -> int: dataframe = await self.get_account_crawler_articles_info(account_id) history_articles_analysis = self.analysis_dataframe(dataframe) thirty_days_before = int(time.time()) - self.THIRTY_DAYS_TIMESTAMP recent_30_days_df = dataframe[dataframe["publish_time"] >= thirty_days_before] recent_30_days_analysis = self.analysis_dataframe(recent_30_days_df) return await self.update_account_stat_detail( account_id, history_articles_analysis, recent_30_days_analysis ) async def deal(self, platform, account_id_list: Optional[List[str]] = None) -> None: """deal""" if not account_id_list: account_id_list = await self.get_crawling_accounts(platform=platform) for account_id in tqdm(account_id_list): await self.analysis_single_account(account_id)