| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577 |
- import asyncio
- import traceback
- import numpy as np
- from collections import defaultdict
- from typing import Dict, List
- from pandas import DataFrame
- from scipy import stats
- from tqdm.asyncio import tqdm
- from datetime import datetime, timedelta
class AccountPositionInfoConst:
    """Shared constants for the account/position read-rate statistics jobs."""

    # Length of the read-rate statistics window, in seconds (31 days).
    STATISTICS_PERIOD = 31 * 24 * 60 * 60
    # Number of seconds in a single day.
    ONE_DAY_IN_SECONDS = 24 * 60 * 60
    # Threshold on the relative rate of change.
    RELATIVE_VALUE_THRESHOLD = 0.1

    # Publish types.
    UNLIMITED_PUBLISH_TYPE = 10002
    BULK_PUBLISH_TYPE = 9

    # Valid article positions within a group send.
    ARTICLE_INDEX_LIST = [1, 2, 3, 4, 5, 6, 7, 8]

    # Fan-count defaults and floor.
    DEFAULT_FANS = 0          # used when no fan data exists for a date
    MIN_FANS = 1000           # minimum fans required to compute a read rate

    # Business types.
    ARTICLES_DAILY = 1
    TOULIU = 2

    # Statistics period in days (for the open-rate job).
    STAT_PERIOD = 30

    # Default like count written alongside read averages.
    DEFAULT_LIKE = 0

    # Row status flags.
    USING_STATUS = 1
    NOT_USING_STATUS = 0

    # Service (group-send) accounts, identified by gh_id.
    GROUP_ACCOUNT_SET = {
        "gh_9cf3b7ff486b",
        "gh_ecb21c0453af",
        "gh_45beb952dc74",
        # "gh_84e744b16b3a",
        "gh_b3ffc1ca3a04",
        "gh_b8baac4296cb",
        "gh_efaf7da157f5",
        # "gh_5855bed97938",
        "gh_b32125c73861",
        "gh_761976bb98a6",
        "gh_5e543853d8f0",
        # "gh_61a72b720de3",
    }

    # Banned accounts, skipped by every job.
    FORBIDDEN_GH_IDS = {
        "gh_4c058673c07e",
        "gh_de9f9ebc976b",
        "gh_7b4a5f86d68c",
        "gh_f902cea89e48",
        "gh_789a40fe7935",
        "gh_cd041ed721e6",
        "gh_62d7f423f382",
        "gh_043223059726",
        "gh_6cfd1132df94",
        "gh_7f5075624a50",
        "gh_d4dffc34ac39",
        "gh_c69776baf2cd",
        "gh_9877c8541764",
        "gh_ac43e43b253b",
        "gh_93e00e187787",
        "gh_080bb43aa0dc",
        "gh_b1c71a0e7a85",
        "gh_d5f935d0d1f2",
    }

    # Paid-traffic ("touliu") accounts, identified by display name.
    TOULIU_ACCOUNTS = {
        "小阳看天下",
        "趣味生活方式",
        "趣味生活漫时光",
        "史趣探秘",
        "暖心一隅",
        "趣味生活漫谈",
        "历史长河流淌",
        "美好意义时光",
        "银发生活畅谈",
        "美好时光阅读汇",
        "时光趣味生活",
        "生活慢时光",
    }
class AccountPositionReadRateAvg(AccountPositionInfoConst):
    """Compute, for every publishing account and every article position, the
    average read rate (read count / fans) over the 31-day statistics window,
    and persist the result into ``long_articles_read_rate``.
    """

    def __init__(self, pool, log_client, trace_id):
        # pool: async DB pool exposing async_fetch / async_save.
        # log_client / trace_id: logging context kept for traceability.
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    # Return the start of the statistics window: the date STATISTICS_PERIOD
    # seconds (31 days) before end_date, formatted "YYYY-MM-DD".
    def generate_stat_duration(self, end_date: str) -> str:
        end_date_dt = datetime.strptime(end_date, "%Y-%m-%d")
        start_date_dt = end_date_dt - timedelta(seconds=self.STATISTICS_PERIOD)
        return start_date_dt.strftime("%Y-%m-%d")

    # Fetch every account attached to an active publish plan
    # (plan_status = 1, content_modal = 3, WeChat channel = 5), dropping
    # accounts whose remark contains the auto-reply marker.
    async def get_publishing_accounts(self):
        query = """
            select distinct
                t3.name as account_name,
                t3.gh_id as gh_id,
                group_concat(distinct t4.remark) as account_remark,
                t6.account_source_name as account_source,
                t6.mode_type as mode_type,
                t6.account_type as account_type,
                t6.`status` as status
            from
                publish_plan t1
                join publish_plan_account t2 on t1.id = t2.plan_id
                join publish_account t3 on t2.account_id = t3.id
                left join publish_account_remark t4 on t3.id = t4.publish_account_id
                left join wx_statistics_group_source_account t5 on t3.id = t5.account_id
                left join wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
            where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
            group by t3.id;
        """
        account_list = await self.pool.async_fetch(query, db_name="aigc")
        return [i for i in account_list if "自动回复" not in str(i["account_remark"])]

    # Fetch every account's fan count for every date since start_date.
    # Returns {gh_id: {date_str: fans}}.
    async def get_fans_for_each_date(self, start_date: str):
        # Subscription accounts: daily fan counts, falling back to the
        # account's follower_count when the daily figure is missing or zero.
        query = """
            SELECT t1.date_str as dt,
                CASE
                    WHEN t1.fans_count IS NULL OR t1.fans_count = 0 THEN t2.follower_count
                    ELSE t1.fans_count
                END AS fans,
                t2.gh_id as gh_id
            FROM datastat_wx t1 JOIN publish_account t2 ON t1.account_id = t2.id
            WHERE t2.channel = 5 AND t2.status = 1 AND t1.date_str >= %s;
        """
        task1 = self.pool.async_fetch(query=query, db_name="aigc", params=(start_date,))
        if self.GROUP_ACCOUNT_SET:
            gh_ids = tuple(self.GROUP_ACCOUNT_SET)
            placeholders = ",".join(["%s"] * len(gh_ids))
            # Service accounts: approximate fans as total sent count / 8
            # (eight article positions per group send).
            query_group = f"""
                SELECT gh_id, publish_date AS dt, CAST(SUM(sent_count) / 8 AS SIGNED) AS fans
                FROM long_articles_group_send_result
                WHERE publish_date >= %s AND gh_id IN ({placeholders})
                GROUP BY publish_date, gh_id;
            """
            params_group = (start_date, *gh_ids)
            task2 = self.pool.async_fetch(query=query_group, params=params_group)
        else:
            # No group accounts: resolve to an empty result list.
            task2 = asyncio.sleep(0, result=[])
        account_with_fans, group_account_with_fans = await asyncio.gather(task1, task2)
        # Merge fan data from both sources.
        account_dt_fans_mapper: Dict[str, Dict[str, int]] = defaultdict(dict)
        # Subscription accounts first.
        for item in account_with_fans or []:
            gh_id = item["gh_id"]
            dt = item["dt"]
            fans = int(item.get("fans") or 0)
            account_dt_fans_mapper[gh_id][dt] = fans
        # Service accounts second (overwrite on identical gh_id + dt).
        for item in group_account_with_fans or []:
            gh_id = item["gh_id"]
            dt = item["dt"]
            fans = int(item.get("fans") or 0)
            account_dt_fans_mapper[gh_id][dt] = fans
        return account_dt_fans_mapper

    # Fetch one account's bulk-published articles (Type = BULK_PUBLISH_TYPE)
    # since start_timestamp, averaging show_view_count per position per day.
    async def get_single_account_published_articles(
        self, gh_id: str, start_timestamp: int
    ):
        query = """
            SELECT
                ghId as gh_id, accountName as account_name,
                ItemIndex as position,
                CAST(AVG(show_view_count) AS SIGNED) as read_count,
                FROM_UNIXTIME(publish_timestamp, '%%Y-%%m-%%d') AS pub_dt
            FROM
                official_articles_v2
            WHERE
                ghId = %s and Type = %s and publish_timestamp >= %s
            GROUP BY ghId, accountName, ItemIndex, pub_dt;
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(gh_id, self.BULK_PUBLISH_TYPE, start_timestamp),
        )

    # Compute the read rate of every published article for one account.
    # Returns a DataFrame, or None when publish_details is empty.
    async def cal_read_rate_for_single_account(
        self,
        publish_details: List[Dict],
        gh_id: str,
        fans_mapper: Dict[str, Dict[str, int]],
    ) -> DataFrame | None:
        if not publish_details:
            return None
        article_list_with_fans = []
        for article in publish_details:
            fans = fans_mapper.get(gh_id, {}).get(article["pub_dt"], self.DEFAULT_FANS)
            if not fans:
                # No fan data for that date: skip the article.
                print(
                    f"账号 {article['account_name']} 在 {article['pub_dt']} 没有粉丝数据"
                )
                continue
            article["fans"] = fans
            # Only accounts above the MIN_FANS floor contribute read rates.
            if fans > self.MIN_FANS:
                article["read_rate"] = article["read_count"] / fans if fans else 0
                article_list_with_fans.append(article)
        # Convert to a DataFrame for downstream filtering/aggregation.
        return DataFrame(
            article_list_with_fans,
            columns=[
                "gh_id",
                "account_name",
                "position",
                "read_count",
                "pub_dt",
                "fans",
                "read_rate",
            ],
        )

    # Compute the per-position average read rate for one account (outliers
    # beyond mean ± 2 std removed) and insert it into long_articles_read_rate.
    async def update_read_rate_avg_for_each_account(
        self,
        account: dict,
        start_date: str,
        end_dt: str,
        df: DataFrame,
        fans_dict: Dict[str, int],
    ):
        # dt_version labels the day before end_dt (stats lag one day).
        avg_date = (datetime.strptime(end_dt, "%Y-%m-%d") - timedelta(days=1)).strftime(
            "%Y-%m-%d"
        )
        insert_error_list = []
        for index in self.ARTICLE_INDEX_LIST:
            # Restrict to this position within [start_date, end_dt).
            filter_df = df[
                (df["position"] == index)
                & (df["pub_dt"] < end_dt)
                & (df["pub_dt"] >= start_date)
            ]
            # Drop outliers outside mean ± 2 standard deviations.
            # NOTE(review): with a single record, std is NaN so output_df is
            # empty and that position is silently skipped — confirm intended.
            read_average = filter_df["read_count"].mean()
            read_std = filter_df["read_count"].std()
            output_df = filter_df[
                (filter_df["read_count"] > read_average - 2 * read_std)
                & (filter_df["read_count"] < read_average + 2 * read_std)
            ]
            records = len(output_df)
            if records:
                # TODO: volatility of the top positions still needs checking.
                # if index <= 2:
                #     print("position need to be checked")
                # insert
                try:
                    insert_query = """
                        INSERT INTO long_articles_read_rate
                        (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete, fans)
                        VALUES
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    await self.pool.async_save(
                        query=insert_query,
                        params=(
                            account["account_name"],
                            account["gh_id"],
                            index,
                            output_df["read_rate"].mean(),
                            "从 {} 开始往前计算 31 天".format(start_date),
                            records,
                            output_df["pub_dt"].min(),
                            output_df["pub_dt"].max(),
                            avg_date.replace("-", ""),
                            0,
                            fans_dict.get(avg_date, 0),
                        ),
                    )
                except Exception as e:
                    # NOTE(review): errors are collected but never logged or
                    # re-raised — failures are silent; consider reporting.
                    insert_error_list.append(str(e))

    # Entry point: compute and persist read-rate averages for all accounts.
    async def deal(self, end_date: str | None):
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")
        start_dt = self.generate_stat_duration(end_date)
        fans_mapper = await self.get_fans_for_each_date(start_date=start_dt)
        accounts = await self.get_publishing_accounts()
        for account in tqdm(accounts, desc="计算单个账号阅读率均值"):
            # Skip banned accounts.
            if account["gh_id"] in self.FORBIDDEN_GH_IDS:
                continue
            published_articles = await self.get_single_account_published_articles(
                gh_id=account["gh_id"],
                start_timestamp=int(
                    datetime.strptime(start_dt, "%Y-%m-%d").timestamp()
                ),
            )
            article_dataframe = await self.cal_read_rate_for_single_account(
                publish_details=published_articles,
                gh_id=account["gh_id"],
                fans_mapper=fans_mapper,
            )
            if article_dataframe is None:
                continue
            if article_dataframe.empty:
                continue
            await self.update_read_rate_avg_for_each_account(
                account=account,
                start_date=start_dt,
                end_dt=end_date,
                df=article_dataframe,
                fans_dict=fans_mapper.get(account["gh_id"], {}),
            )
class AccountPositionReadAvg(AccountPositionReadRateAvg):
    """Turn per-position read rates into absolute read averages
    (fans * read_rate_avg), attach the upper bound of a 95% t-distribution
    confidence interval, and persist into ``account_avg_info_v3``.
    """

    # Upper bound of the 95% confidence interval over the 30 most recent
    # read_avg records of one account position.
    async def cal_read_avg_ci_upper(self, gh_id: str, index: int):
        fetch_query = f"""
            select read_avg, update_time
            from account_avg_info_v3
            where gh_id = %s and position = %s
            order by update_time desc limit 30;
        """
        fetch_response_list = await self.pool.async_fetch(
            query=fetch_query, db_name="piaoquan_crawler", params=(gh_id, index)
        )
        read_avg_list = [
            i["read_avg"] for i in fetch_response_list if i["read_avg"] is not None
        ]
        # NOTE(review): with fewer than 2 samples, std (ddof=1) is NaN and
        # n == 0 divides by zero — errors surface via deal()'s try/except.
        n = len(read_avg_list)
        mean = np.mean(read_avg_list)
        std = np.std(read_avg_list, ddof=1)
        se = std / np.sqrt(n)
        # Two-sided 95% CI using the t distribution with n - 1 dof.
        t = stats.t.ppf(0.975, df=n - 1)
        upper_t = mean + t * se
        return upper_t

    # Load the read-rate averages for one dt_version.
    # Returns {"{gh_id}_{position}": {read_rate_avg, read_avg, fans}}.
    async def get_accounts_read_avg(self, dt):
        query = """
            select gh_id, position, fans, read_rate_avg, fans * read_rate_avg as read_avg
            from long_articles_read_rate
            where dt_version = %s
        """
        fetch_result = await self.pool.async_fetch(
            query=query, params=(dt.replace("-", ""),)
        )
        response = {}
        for item in fetch_result:
            key = f"{item['gh_id']}_{item['position']}"
            response[key] = {
                "read_rate_avg": item["read_rate_avg"],
                "read_avg": item["read_avg"],
                "fans": item["fans"],
            }
        return response

    # For one account, compute and persist the read average + CI upper bound
    # for every article position that has read-rate data.
    async def cal_read_avg_detail(
        self, account: Dict, dt: str, account_with_read_rate_avg: Dict
    ):
        for index in self.ARTICLE_INDEX_LIST:
            key = f"{account['gh_id']}_{index}"
            # NOTE(review): leftover debug output — consider removing or
            # routing through log_client.
            print(key)
            if account_with_read_rate_avg.get(key) is None:
                continue
            read_avg = account_with_read_rate_avg[key]["read_avg"]
            # Upper bound of the read-average confidence interval.
            read_avg_ci_upper = await self.cal_read_avg_ci_upper(
                gh_id=account["gh_id"], index=index
            )
            await self.process_each_record(
                account=account,
                index=index,
                fans=account_with_read_rate_avg[key]["fans"],
                read_rate_avg=account_with_read_rate_avg[key]["read_rate_avg"],
                read_avg=read_avg,
                read_avg_ci_upper=read_avg_ci_upper,
                dt=dt,
            )

    # Persist one (account, position, dt) record: insert, fall back to an
    # update on failure (e.g. duplicate key), then demote older rows.
    async def process_each_record(
        self, account, index, fans, read_rate_avg, read_avg, read_avg_ci_upper, dt
    ):
        gh_id = account["gh_id"]
        account_name = account["account_name"]
        # Paid-traffic accounts get the TOULIU business type.
        business_type = (
            self.TOULIU if account_name in self.TOULIU_ACCOUNTS else self.ARTICLES_DAILY
        )
        # insert into database
        insert_sql = f"""
            insert into account_avg_info_v3
            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type,
            account_mode, account_source, account_status, business_type, read_rate_avg, read_avg_ci_upper)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        try:
            await self.pool.async_save(
                query=insert_sql,
                db_name="piaoquan_crawler",
                params=(
                    gh_id,
                    index,
                    dt,
                    account["account_name"],
                    fans,
                    read_avg,
                    self.DEFAULT_LIKE,
                    self.USING_STATUS,
                    account["account_type"],
                    account["mode_type"],
                    account["account_source"],
                    account["status"],
                    business_type,
                    read_rate_avg,
                    read_avg_ci_upper,
                ),
            )
        except Exception as e:
            print(e)
            # Insert failed (presumably the row already exists): update the
            # existing record for the same (gh_id, position, update_time).
            update_sql = f"""
                update account_avg_info_v3
                set fans = %s, read_avg = %s, read_rate_avg = %s, read_avg_ci_upper = %s
                where gh_id = %s and position = %s and update_time = %s
            """
            try:
                await self.pool.async_save(
                    query=update_sql,
                    db_name="piaoquan_crawler",
                    params=(
                        fans,
                        read_avg,
                        read_rate_avg,
                        read_avg_ci_upper,
                        account["gh_id"],
                        index,
                        dt,
                    ),
                )
            except Exception as e:
                print(e)
        # Mark every other day's record for this position as no longer in use.
        update_status_sql = f"""
            UPDATE account_avg_info_v3
            SET status = %s
            WHERE update_time != %s AND gh_id = %s AND position = %s;
        """
        await self.pool.async_save(
            query=update_status_sql,
            db_name="piaoquan_crawler",
            params=(self.NOT_USING_STATUS, dt, gh_id, index),
        )

    # Entry point: compute and persist read averages for all accounts for the
    # day before end_date.
    async def deal(self, end_date: str | None):
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")
        dt = (datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days=1)).strftime(
            "%Y-%m-%d"
        )
        account_with_read_rate_avg = await self.get_accounts_read_avg(dt)
        accounts = await self.get_publishing_accounts()
        for account in tqdm(accounts, desc="计算单个账号的阅读均值"):
            # Skip banned accounts.
            if account["gh_id"] in self.FORBIDDEN_GH_IDS:
                continue
            try:
                await self.cal_read_avg_detail(
                    account=account,
                    dt=dt,
                    account_with_read_rate_avg=account_with_read_rate_avg,
                )
            except Exception as e:
                print(f"计算账号 {account['account_name']} 阅读均值失败 : {e}")
                print(traceback.format_exc())
class AccountPositionOpenRateAvg(AccountPositionReadRateAvg):
    """Compute each account's average open rate (first-level reads / total
    reads) over the last STAT_PERIOD days and write it into
    ``account_avg_info_v3.open_rate_avg``.
    """

    async def get_account_open_rate(self, gh_id: str, date_string: str) -> float:
        """Return the account's average open rate over the STAT_PERIOD days
        ending at *date_string* (format ``YYYYMMDD``); 0.0 when no data.
        """
        # Fix: the original interpolated gh_id / date_string directly into the
        # SQL text via f-string — inconsistent with every other query in this
        # module and SQL-injection-prone. Use %s placeholders instead
        # (literal % in the DATE_FORMAT pattern doubled accordingly).
        # STAT_PERIOD is a trusted class constant; coerce to int for safety.
        fetch_query = f"""
            select
                sum(view_count) as 'total_read',
                sum(first_level) as 'total_first_level',
                sum(first_level) / sum(view_count) as 'avg_open_rate'
            from datastat_sort_strategy
            where gh_id = %s
                and date_str between date_sub(str_to_date(%s, '%%Y%%m%%d'), interval {int(self.STAT_PERIOD)} day)
                and str_to_date(%s, '%%Y%%m%%d');
        """
        res = await self.pool.async_fetch(
            query=fetch_query, params=(gh_id, date_string, date_string)
        )
        # Fix: SUM() over zero matching rows yields a single all-NULL row, so
        # float(None) raised TypeError; treat missing data as 0.0 instead.
        if not res or res[0]["avg_open_rate"] is None:
            return 0.0
        return float(res[0]["avg_open_rate"])

    async def set_avg_open_rate_for_each_account(
        self, gh_id: str, date_string: str, avg_read_rate: float
    ) -> int:
        """Write *avg_read_rate* into the account's open_rate_avg column for
        the given update_time (``YYYY-MM-DD``); returns the save result.
        """
        update_query = """
            update account_avg_info_v3
            set open_rate_avg = %s
            where gh_id = %s and update_time = %s;
        """
        return await self.pool.async_save(
            query=update_query,
            db_name="piaoquan_crawler",
            params=(avg_read_rate, gh_id, date_string),
        )

    async def deal(self, date_string: str | None):
        """Entry point: compute and persist the average open rate of every
        publishing account for the day before *date_string*.
        """
        if not date_string:
            date_string = datetime.now().strftime("%Y-%m-%d")
        # Stats lag one day behind the run date.
        dt = (datetime.strptime(date_string, "%Y-%m-%d") - timedelta(days=1)).strftime(
            "%Y-%m-%d"
        )
        account_list = await self.get_publishing_accounts()
        for account in tqdm(account_list, desc="计算单个账号的打开率均值"):
            # Skip banned accounts.
            if account["gh_id"] in self.FORBIDDEN_GH_IDS:
                continue
            try:
                avg_open_rate = await self.get_account_open_rate(
                    gh_id=account["gh_id"], date_string=dt.replace("-", "")
                )
                await self.set_avg_open_rate_for_each_account(
                    gh_id=account["gh_id"],
                    date_string=dt,
                    avg_read_rate=avg_open_rate,
                )
            except Exception as e:
                # Best-effort per account: log and continue with the rest.
                print(f"计算账号 {account['account_name']} 打开率均值失败 : {e}")
                print(traceback.format_exc())
                continue
|