Pārlūkot izejas kodu

阅读均值-阅读率均值优化

luojunhui 3 mēneši atpakaļ
vecāks
revīzija
bcd648c174

+ 4 - 1
applications/tasks/analysis_task/__init__.py

@@ -1,3 +1,6 @@
 from .crawler_detail import CrawlerDetailDeal
+from .account_position_info import AccountPositionReadRateAvg
+from .account_position_info import AccountPositionReadAvg
 
-__all__ = ["CrawlerDetailDeal"]
+
+__all__ = ["CrawlerDetailDeal", "AccountPositionReadRateAvg", "AccountPositionReadAvg"]

+ 516 - 0
applications/tasks/analysis_task/account_position_info.py

@@ -0,0 +1,516 @@
+import asyncio
+import traceback
+
+import numpy as np
+from collections import defaultdict
+from typing import Dict, List
+from pandas import DataFrame
+from scipy import stats
+from tqdm.asyncio import tqdm
+
+
+from datetime import datetime, timedelta
+
+
class AccountPositionInfoConst:
    """Shared constants for the account/position read-rate and read-average tasks."""

    # Read-rate statistics window, in seconds (31 days).
    STATISTICS_PERIOD = 31 * 24 * 60 * 60
    # Seconds in one day.
    ONE_DAY_IN_SECONDS = 60 * 60 * 24
    # Threshold for the relative change rate.
    RELATIVE_VALUE_THRESHOLD = 0.1
    # Publish types (values used by the upstream publishing system).
    UNLIMITED_PUBLISH_TYPE = 10002
    BULK_PUBLISH_TYPE = 9
    # Article position slots within one bulk publish.
    ARTICLE_INDEX_LIST = [1, 2, 3, 4, 5, 6, 7, 8]

    # Default fan count when no record exists.
    DEFAULT_FANS = 0

    # Minimum fan count for an article to enter the read-rate sample.
    MIN_FANS = 1000

    # Business types.
    ARTICLES_DAILY = 1
    TOULIU = 2

    # Default like count.
    DEFAULT_LIKE = 0

    # Record status flags.
    USING_STATUS = 1
    NOT_USING_STATUS = 0

    # Service ("group") accounts whose fans are estimated from send counts.
    GROUP_ACCOUNT_SET = {
        "gh_9cf3b7ff486b",
        "gh_ecb21c0453af",
        "gh_45beb952dc74",
        # "gh_84e744b16b3a",
        "gh_b3ffc1ca3a04",
        "gh_b8baac4296cb",
        "gh_efaf7da157f5",
        # "gh_5855bed97938",
        "gh_b32125c73861",
        "gh_761976bb98a6",
        "gh_5e543853d8f0",
        # "gh_61a72b720de3",
    }

    # Banned accounts excluded from all calculations.
    FORBIDDEN_GH_IDS = {
        "gh_4c058673c07e",
        "gh_de9f9ebc976b",
        "gh_7b4a5f86d68c",
        "gh_f902cea89e48",
        "gh_789a40fe7935",
        "gh_cd041ed721e6",
        "gh_62d7f423f382",
        "gh_043223059726",
        "gh_6cfd1132df94",
        "gh_7f5075624a50",
        "gh_d4dffc34ac39",
        "gh_c69776baf2cd",
        "gh_9877c8541764",
        "gh_ac43e43b253b",
        "gh_93e00e187787",
        "gh_080bb43aa0dc",
        "gh_b1c71a0e7a85",
        "gh_d5f935d0d1f2",
    }

    # Paid-traffic ("touliu") account names.
    TOULIU_ACCOUNTS = {
        "小阳看天下",
        "趣味生活方式",
        "趣味生活漫时光",
        "史趣探秘",
        "暖心一隅",
        "趣味生活漫谈",
        "历史长河流淌",
        "美好意义时光",
        "银发生活畅谈",
        "美好时光阅读汇",
        "时光趣味生活",
    }
+
+
class AccountPositionReadRateAvg(AccountPositionInfoConst):
    """Compute the average read rate for each account/position pair.

    Per-article read rates (read_count / fans) inside the 31-day statistics
    window are computed, outliers beyond two standard deviations are trimmed,
    and the per-position averages are written to `long_articles_read_rate`.
    """

    def __init__(self, pool, log_client, trace_id):
        # pool: async DB connection pool; log_client: logging client;
        # trace_id: correlation id for this task run.
        self.pool = pool
        self.log_client = log_client
        self.trace_id = trace_id

    def generate_stat_duration(self, end_date: str) -> str:
        """Return the statistics-window start date for the given end date."""
        end_date_dt = datetime.strptime(end_date, "%Y-%m-%d")
        start_date_dt = end_date_dt - timedelta(seconds=self.STATISTICS_PERIOD)
        return start_date_dt.strftime("%Y-%m-%d")

    async def get_publishing_accounts(self):
        """Fetch active publishing accounts, excluding auto-reply accounts."""
        query = """
            select distinct
                t3.name as account_name,
                t3.gh_id as gh_id,
                group_concat(distinct t4.remark) as account_remark,
                t6.account_source_name as account_source,
                t6.mode_type as mode_type,
                t6.account_type as account_type,
                t6.`status` as status
            from
                publish_plan t1
                join publish_plan_account t2 on t1.id = t2.plan_id
                join publish_account t3 on t2.account_id = t3.id
                left join publish_account_remark t4 on t3.id = t4.publish_account_id
                left join wx_statistics_group_source_account t5 on t3.id = t5.account_id
                left join wx_statistics_group_source t6 on t5.group_source_name = t6.account_source_name
            where t1.plan_status = 1 and t1.content_modal = 3 and t3.channel = 5
            group by t3.id;
        """
        account_list = await self.pool.async_fetch(query, db_name="aigc")
        # "自动回复" in the remark marks auto-reply accounts, which are excluded.
        return [i for i in account_list if "自动回复" not in str(i["account_remark"])]

    async def get_fans_for_each_date(self, start_date: str):
        """Build a {gh_id: {date: fans}} mapping for all accounts since start_date.

        Subscription-account fans come from `datastat_wx`; service ("group")
        accounts estimate fans as sent_count / 8 and override the subscription
        numbers for the same gh_id + date.
        """
        # Subscription-account fan counts.
        query = """
            SELECT t1.date_str as dt, 
                   CASE 
                        WHEN t1.fans_count IS NULL OR t1.fans_count = 0 THEN t2.follower_count
                        ELSE t1.fans_count
                    END	AS fans, 
                   t2.gh_id as gh_id
            FROM datastat_wx t1 JOIN publish_account t2 ON t1.account_id = t2.id
            WHERE t2.channel = 5 AND t2.status = 1 AND t1.date_str >= %s;
        """
        task1 = self.pool.async_fetch(query=query, db_name="aigc", params=(start_date,))

        if self.GROUP_ACCOUNT_SET:
            gh_ids = tuple(self.GROUP_ACCOUNT_SET)
            placeholders = ",".join(["%s"] * len(gh_ids))
            query_group = f"""
                    SELECT gh_id, publish_date AS dt, CAST(SUM(sent_count) / 8 AS SIGNED) AS fans
                    FROM long_articles_group_send_result
                    WHERE publish_date >= %s AND gh_id IN ({placeholders})
                    GROUP BY publish_date, gh_id;
            """
            params_group = (start_date, *gh_ids)
            task2 = self.pool.async_fetch(query=query_group, params=params_group)
        else:
            # No group accounts: resolve immediately to an empty result set.
            task2 = asyncio.sleep(0, result=[])

        account_with_fans, group_account_with_fans = await asyncio.gather(task1, task2)

        # Merge both sources into one nested mapping.
        account_dt_fans_mapper: Dict[str, Dict[str, int]] = defaultdict(dict)

        # Subscription accounts first.
        for item in account_with_fans or []:
            gh_id = item["gh_id"]
            dt = item["dt"]
            fans = int(item.get("fans") or 0)
            account_dt_fans_mapper[gh_id][dt] = fans

        # Service accounts override on the same gh_id + dt.
        for item in group_account_with_fans or []:
            gh_id = item["gh_id"]
            dt = item["dt"]
            fans = int(item.get("fans") or 0)
            account_dt_fans_mapper[gh_id][dt] = fans

        return account_dt_fans_mapper

    async def get_single_account_published_articles(
        self, gh_id: str, start_timestamp: int
    ):
        """Fetch daily averaged bulk-publish read counts per position for one account."""
        query = """
            SELECT 
                ghId as gh_id, accountName as account_name,
                ItemIndex as position, 
                CAST(AVG(show_view_count) AS SIGNED) as read_count,
                FROM_UNIXTIME(publish_timestamp, '%%Y-%%m-%%d') AS pub_dt
            FROM 
                official_articles_v2
            WHERE 
                ghId = %s and Type = %s and publish_timestamp >= %s
            GROUP BY ghId, accountName, ItemIndex, pub_dt;
        """
        return await self.pool.async_fetch(
            query=query,
            db_name="piaoquan_crawler",
            params=(gh_id, self.BULK_PUBLISH_TYPE, start_timestamp),
        )

    async def cal_read_rate_for_single_account(
        self,
        publish_details: List[Dict],
        gh_id: str,
        fans_mapper: Dict[str, Dict[str, int]],
    ) -> DataFrame | None:
        """Attach fans and read_rate to each article and return them as a DataFrame.

        Articles without fan data, or from dates where the account had at most
        MIN_FANS fans, are dropped. Returns None when there are no publish
        details at all.
        """
        if not publish_details:
            return None

        article_list_with_fans = []
        for article in publish_details:
            fans = fans_mapper.get(gh_id, {}).get(article["pub_dt"], self.DEFAULT_FANS)

            if not fans:
                print(
                    f"账号 {article['account_name']} 在 {article['pub_dt']} 没有粉丝数据"
                )
                continue

            article["fans"] = fans
            if fans > self.MIN_FANS:
                # fans > MIN_FANS (>= 1000) guarantees a non-zero denominator.
                article["read_rate"] = article["read_count"] / fans
                article_list_with_fans.append(article)

        # DataFrame simplifies downstream filtering and aggregation.
        return DataFrame(
            article_list_with_fans,
            columns=[
                "gh_id",
                "account_name",
                "position",
                "read_count",
                "pub_dt",
                "fans",
                "read_rate",
            ],
        )

    async def update_read_rate_avg_for_each_account(
        self,
        account: dict,
        start_date: str,
        end_dt: str,
        df: DataFrame,
        fans_dict: Dict[str, int],
    ):
        """Trim 2-sigma outliers per position and insert read-rate averages."""
        # The dt_version of a run is keyed to the day before end_dt.
        avg_date = (datetime.strptime(end_dt, "%Y-%m-%d") - timedelta(days=1)).strftime(
            "%Y-%m-%d"
        )

        insert_error_list = []
        for index in self.ARTICLE_INDEX_LIST:
            # Restrict to this position inside the [start_date, end_dt) window.
            filter_df = df[
                (df["position"] == index)
                & (df["pub_dt"] < end_dt)
                & (df["pub_dt"] >= start_date)
            ]
            read_average = filter_df["read_count"].mean()
            read_std = filter_df["read_count"].std()

            # Keep rows within two standard deviations of the mean. With fewer
            # than two rows std is NaN, so output_df is empty and the position
            # is skipped.
            output_df = filter_df[
                (filter_df["read_count"] > read_average - 2 * read_std)
                & (filter_df["read_count"] < read_average + 2 * read_std)
            ]

            records = len(output_df)
            if records:
                # TODO: check volatility for head positions (index <= 2)
                # before trusting the average.
                try:
                    insert_query = """
                        INSERT INTO long_articles_read_rate
                            (account_name, gh_id, position, read_rate_avg, remark, articles_count, earliest_publish_time, latest_publish_time, dt_version, is_delete, fans)
                        VALUES 
                        (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
                    """
                    await self.pool.async_save(
                        query=insert_query,
                        params=(
                            account["account_name"],
                            account["gh_id"],
                            index,
                            output_df["read_rate"].mean(),
                            "从 {} 开始往前计算 31  天".format(start_date),
                            records,
                            output_df["pub_dt"].min(),
                            output_df["pub_dt"].max(),
                            avg_date.replace("-", ""),
                            0,
                            fans_dict.get(avg_date, 0),
                        ),
                    )

                except Exception as e:
                    insert_error_list.append(str(e))

        # Surface collected insert failures instead of silently discarding them.
        if insert_error_list:
            print(
                f"insert read_rate_avg failed for {account['account_name']}: {insert_error_list}"
            )

    async def deal(self, end_date: str | None):
        """Entry point: compute and store read-rate averages for all accounts."""
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")

        start_dt = self.generate_stat_duration(end_date)

        fans_mapper = await self.get_fans_for_each_date(start_date=start_dt)

        accounts = await self.get_publishing_accounts()
        for account in tqdm(accounts, desc="计算单个账号阅读率均值"):
            if account["gh_id"] in self.FORBIDDEN_GH_IDS:
                continue

            published_articles = await self.get_single_account_published_articles(
                gh_id=account["gh_id"],
                start_timestamp=int(
                    datetime.strptime(start_dt, "%Y-%m-%d").timestamp()
                ),
            )
            article_dataframe = await self.cal_read_rate_for_single_account(
                publish_details=published_articles,
                gh_id=account["gh_id"],
                fans_mapper=fans_mapper,
            )
            # Skip accounts with no usable articles.
            if article_dataframe is None or article_dataframe.empty:
                continue

            await self.update_read_rate_avg_for_each_account(
                account=account,
                start_date=start_dt,
                end_dt=end_date,
                df=article_dataframe,
                fans_dict=fans_mapper.get(account["gh_id"], {}),
            )
+
+
class AccountPositionReadAvg(AccountPositionReadRateAvg):
    """Derive per-position read averages (fans * read_rate_avg), with a
    confidence-interval upper bound, and persist them to `account_avg_info_v3`.
    """

    async def cal_read_avg_ci_upper(self, gh_id: str, index: int):
        """Upper bound of the 95% CI over the last 30 stored read averages.

        Returns None when no history exists; with a single sample that sample
        is returned, since the t-based formula is undefined for n < 2.
        """
        fetch_query = """
            select read_avg, update_time
            from account_avg_info_v3
            where gh_id = %s and position = %s 
            order by update_time desc limit 30;
        """
        fetch_response_list = await self.pool.async_fetch(
            query=fetch_query, db_name="piaoquan_crawler", params=(gh_id, index)
        )
        read_avg_list = [i["read_avg"] for i in fetch_response_list]
        n = len(read_avg_list)
        # Guard degenerate samples: the unguarded formula yields NaN for n < 2
        # (np.std with ddof=1 and t.ppf with df <= 0), which would otherwise
        # be written to the database.
        if n == 0:
            return None
        if n == 1:
            return float(read_avg_list[0])
        mean = np.mean(read_avg_list)
        std = np.std(read_avg_list, ddof=1)
        se = std / np.sqrt(n)
        t = stats.t.ppf(0.975, df=n - 1)
        upper_t = mean + t * se
        return upper_t

    async def get_accounts_read_avg(self, dt):
        """Load {"<gh_id>_<position>": {read_rate_avg, read_avg, fans}} for a dt_version."""
        query = """
            select gh_id, position, fans, read_rate_avg, fans * read_rate_avg as read_avg
            from long_articles_read_rate
            where dt_version = %s
        """
        fetch_result = await self.pool.async_fetch(query=query, params=(dt.replace("-", ""),))
        response = {}
        for item in fetch_result:
            key = f"{item['gh_id']}_{item['position']}"
            response[key] = {
                "read_rate_avg": item["read_rate_avg"],
                "read_avg": item["read_avg"],
                "fans": item["fans"],
            }
        return response

    async def cal_read_avg_detail(
        self, account: Dict, dt: str, account_with_read_rate_avg: Dict
    ):
        """Compute and persist a read-average record for every position of one account."""
        for index in self.ARTICLE_INDEX_LIST:
            key = f"{account['gh_id']}_{index}"
            # Positions without a read-rate record for this dt are skipped.
            if account_with_read_rate_avg.get(key) is None:
                continue

            read_avg = account_with_read_rate_avg[key]["read_avg"]

            # Upper bound of the read-average confidence interval.
            read_avg_ci_upper = await self.cal_read_avg_ci_upper(
                gh_id=account["gh_id"], index=index
            )

            await self.process_each_record(
                account=account,
                index=index,
                fans=account_with_read_rate_avg[key]["fans"],
                read_rate_avg=account_with_read_rate_avg[key]["read_rate_avg"],
                read_avg=read_avg,
                read_avg_ci_upper=read_avg_ci_upper,
                dt=dt,
            )

    async def process_each_record(
        self, account, index, fans, read_rate_avg, read_avg, read_avg_ci_upper, dt
    ):
        """Insert one read-average record, falling back to UPDATE when the
        insert fails (e.g. the row already exists), then deactivate all older
        records for this gh_id/position."""
        gh_id = account["gh_id"]
        account_name = account["account_name"]
        business_type = (
            self.TOULIU if account_name in self.TOULIU_ACCOUNTS else self.ARTICLES_DAILY
        )
        insert_sql = """
            insert into account_avg_info_v3
            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, 
            account_mode, account_source, account_status, business_type, read_rate_avg, read_avg_ci_upper)
            values
            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
        """
        try:
            await self.pool.async_save(
                query=insert_sql,
                db_name="piaoquan_crawler",
                params=(
                    gh_id,
                    index,
                    dt,
                    account["account_name"],
                    fans,
                    read_avg,
                    self.DEFAULT_LIKE,
                    self.USING_STATUS,
                    account["account_type"],
                    account["mode_type"],
                    account["account_source"],
                    account["status"],
                    business_type,
                    read_rate_avg,
                    read_avg_ci_upper,
                ),
            )
        except Exception as e:
            # Insert failed — most likely a duplicate key; update in place.
            print(e)
            update_sql = """
                update account_avg_info_v3
                set fans = %s, read_avg = %s, read_rate_avg = %s, read_avg_ci_upper = %s
                where gh_id = %s and position = %s and update_time = %s
            """
            try:
                await self.pool.async_save(
                    query=update_sql,
                    db_name="piaoquan_crawler",
                    params=(
                        fans,
                        read_avg,
                        read_rate_avg,
                        read_avg_ci_upper,
                        account["gh_id"],
                        index,
                        dt,
                    ),
                )
            except Exception as e:
                print(e)

        # Mark every other update_time for this gh_id/position as not in use.
        update_status_sql = """
            UPDATE account_avg_info_v3
            SET status = %s
            WHERE update_time != %s AND gh_id = %s AND position = %s;
        """
        await self.pool.async_save(
            query=update_status_sql,
            db_name="piaoquan_crawler",
            params=(self.NOT_USING_STATUS, dt, gh_id, index),
        )

    async def deal(self, end_date: str | None):
        """Entry point: compute read averages for the day before end_date."""
        if not end_date:
            end_date = datetime.now().strftime("%Y-%m-%d")

        dt = (datetime.strptime(end_date, "%Y-%m-%d") - timedelta(days=1)).strftime(
            "%Y-%m-%d"
        )
        account_with_read_rate_avg = await self.get_accounts_read_avg(dt)
        accounts = await self.get_publishing_accounts()

        for account in tqdm(accounts, desc="计算单个账号的阅读均值"):
            if account["gh_id"] in self.FORBIDDEN_GH_IDS:
                continue

            try:
                await self.cal_read_avg_detail(
                    account=account,
                    dt=dt,
                    account_with_read_rate_avg=account_with_read_rate_avg,
                )
            except Exception as e:
                print(f"计算账号 {account['account_name']} 阅读均值失败 : {e}")
                print(traceback.format_exc())
+
+
class AccountPositonOpenRateAvg(AccountPositionReadRateAvg):
    # Placeholder for a per-position open-rate average task (not implemented yet).
    # NOTE(review): the class name misspells "Position"; renaming would break
    # any existing importers — confirm before fixing.
    pass

+ 18 - 0
applications/tasks/task_handler.py

@@ -1,6 +1,8 @@
 from datetime import datetime
 
 from applications.tasks.analysis_task import CrawlerDetailDeal
+from applications.tasks.analysis_task import AccountPositionReadRateAvg
+from applications.tasks.analysis_task import AccountPositionReadAvg
 
 from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
 
@@ -248,5 +250,21 @@ class TaskHandler(TaskMapper):
         await task.deal(date_string=self.data.get("date_string"))
         return self.TASK_SUCCESS_STATUS
 
+    # 更新账号阅读率均值
+    async def _update_account_read_rate_avg_handler(self) -> int:
+        task = AccountPositionReadRateAvg(
+            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
+        )
+        await task.deal(end_date=self.data.get("end_date"))
+        return self.TASK_SUCCESS_STATUS
+
+    # 更新账号阅读均值
+    async def _update_account_read_avg_handler(self) -> int:
+        task = AccountPositionReadAvg(
+            pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id
+        )
+        await task.deal(end_date=self.data.get("end_date"))
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 4 - 0
applications/tasks/task_scheduler.py

@@ -203,6 +203,10 @@ class TaskScheduler(TaskHandler):
             "update_outside_account_article_root_source_id": self._update_outside_account_article_root_source_id_and_update_time_handler,
             # 更新限流账号信息
             "update_limited_account_info": self._update_limited_account_info_handler,
+            # 更新账号阅读率均值
+            "update_account_read_rate_avg": self._update_account_read_rate_avg_handler,
+            # 更新账号阅读均值
+            "update_account_read_avg": self._update_account_read_avg_handler,
         }
 
         if task_name not in handlers: