Переглянути джерело

Merge branch '2025-04-21-luojunhui-read-avg-improve' of luojunhui/LongArticlesJob into master

luojunhui 6 місяців тому
батько
коміт
0d894664a6

+ 3 - 0
applications/const/__init__.py

@@ -153,6 +153,9 @@ class UpdateAccountReadAvgTaskConst:
     # default upper_quantile, confidence = 0.95
     DEFAULT_UPPER_QUANTILE = 0.975
 
+    ACCOUNT_READ_RATE_TABLE = "long_articles_read_rate"
+    ACCOUNT_READ_AVG_TABLE = "account_avg_info_v3"
+
 
 class WeixinVideoCrawlerConst:
     """

+ 2 - 1
requirements.txt

@@ -26,4 +26,5 @@ tenacity~=9.0.0
 scikit-learn~=1.6.1
 google~=3.0.0
 cffi~=1.17.1
-lxml~=5.3.2
+lxml~=5.3.2
+scipy~=1.15.2

+ 200 - 0
tasks/data_tasks/account_position_read_avg_task.py

@@ -0,0 +1,200 @@
+"""
+计算各个账号的阅读均值,以及阅读均值区间估计的上限
+"""
+
+import json
+
+import numpy as np
+from tqdm import tqdm
+from scipy import stats
+from pymysql.cursors import DictCursor
+
+from applications.const import UpdateAccountReadAvgTaskConst
+from applications.db import DatabaseConnector
+from applications.utils import fetch_account_fans
+from applications.utils import fetch_publishing_account_list
+from config import apolloConfig
+from config import long_articles_config, denet_config, piaoquan_crawler_config
+
+config = apolloConfig()
+const = UpdateAccountReadAvgTaskConst()
+unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
+touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
+backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
+
+
+class AccountPositionReadAvgTask(object):
+
+    def __init__(self):
+        # init piaoquan crawler db client
+        self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
+        self.piaoquan_crawler_db_client.connect()
+
+        # init long articles db client
+        self.long_articles_db_client = DatabaseConnector(long_articles_config)
+        self.long_articles_db_client.connect()
+
+        #  init aigc db client
+        self.denet_db_client = DatabaseConnector(denet_config)
+        self.denet_db_client.connect()
+
+    def fetch_read_rate_avg_for_each_account(self, dt):
+        dt = int(dt.replace("-", ""))
+        sql = f"""
+            select gh_id, position, read_rate_avg
+            from {const.ACCOUNT_READ_RATE_TABLE}
+            where dt_version = {dt};
+        """
+        fetch_response_list = self.long_articles_db_client.fetch(
+            query=sql, cursor_type=DictCursor
+        )
+        account_read_rate_dict = {}
+        for item in fetch_response_list:
+            key = "{}_{}".format(item["gh_id"], item["position"])
+            account_read_rate_dict[key] = item["read_rate_avg"]
+        return account_read_rate_dict
+
+    def cal_read_avg_ci(self, gh_id, position):
+        """
+        计算阅读均值的置信区间
+        """
+        fetch_query = f"""
+            select read_avg, update_time
+            from {const.ACCOUNT_READ_AVG_TABLE}
+            where gh_id = %s and position = %s 
+            order by update_time desc limit 30;
+        """
+        fetch_response_list = self.piaoquan_crawler_db_client.fetch(
+            query=fetch_query, params=(gh_id, position), cursor_type=DictCursor
+        )
+        read_avg_list = [i["read_avg"] for i in fetch_response_list]
+        n = len(read_avg_list)
+        mean = np.mean(read_avg_list)
+        std = np.std(read_avg_list, ddof=1)
+        se = std / np.sqrt(n)
+        t = stats.t.ppf(0.975, df=n - 1)
+        upper_t = mean + t * se
+        return upper_t
+
+    def process_each_record(
+        self, account, index, fans, read_rate_avg, read_avg, read_avg_ci_upper, dt
+    ):
+        gh_id = account["gh_id"]
+        business_type = (
+            const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
+        )
+        # insert into database
+        insert_sql = f"""
+            insert into {const.ACCOUNT_READ_AVG_TABLE}
+            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type, 
+            account_mode, account_source, account_status, business_type, read_rate_avg, read_avg_ci_upper)
+            values
+            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+        """
+        try:
+            self.piaoquan_crawler_db_client.save(
+                query=insert_sql,
+                params=(
+                    gh_id,
+                    index,
+                    dt,
+                    account["account_name"],
+                    fans,
+                    read_avg,
+                    const.DEFAULT_LIKE,
+                    const.USING_STATUS,
+                    account["account_type"],
+                    account["mode_type"],
+                    account["account_source"],
+                    account["status"],
+                    business_type,
+                    read_rate_avg,
+                    read_avg_ci_upper,
+                ),
+            )
+        except Exception as e:
+            update_sql = f"""
+                update {const.ACCOUNT_READ_AVG_TABLE}
+                set fans = %s, read_avg = %s, read_rate_avg = %s, read_avg_ci_upper = %s
+                where gh_id = %s and position = %s and update_time = %s
+            """
+            try:
+                self.piaoquan_crawler_db_client.save(
+                    query=update_sql,
+                    params=(
+                        fans,
+                        read_avg,
+                        read_rate_avg,
+                        read_avg_ci_upper,
+                        account["gh_id"],
+                        index,
+                        dt,
+                    ),
+                )
+            except Exception as e:
+                print(e)
+
+        # 修改前一天的状态为 0
+        update_status_sql = f"""
+                        update {const.ACCOUNT_READ_AVG_TABLE}
+                        set status = %s
+                        where update_time != %s and gh_id = %s and position = %s;
+                    """
+        self.piaoquan_crawler_db_client.save(
+            query=update_status_sql,
+            params=(const.NOT_USING_STATUS, dt, account["gh_id"], index),
+        )
+
+    def cal_read_avg_for_each_account(self, account, fans_dict, read_rate_avg_dict, dt):
+        gh_id = account["gh_id"]
+
+        fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
+
+        # use unauthorized account's fans if not found in aigc
+        if not fans:
+            fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
+
+        # use backup account's fans if not found in aigc
+        if not fans:
+            fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
+
+        if fans:
+            for index in const.ARTICLE_INDEX_LIST:
+                gh_id_position = "{}_{}".format(gh_id, index)
+                if read_rate_avg_dict.get(gh_id_position):
+                    # fetch read rate avg
+                    read_rate_avg = read_rate_avg_dict[gh_id_position]
+                    # cal read avg
+                    read_avg = fans * read_rate_avg
+
+                    # cal read avg ci upper
+                    read_avg_ci_upper = self.cal_read_avg_ci(gh_id, index)
+
+                    # insert into database
+                    self.process_each_record(
+                        account,
+                        index,
+                        fans,
+                        read_rate_avg,
+                        read_avg,
+                        read_avg_ci_upper,
+                        dt,
+                    )
+
+    def do_task_list(self, dt):
+        """
+        do it
+        """
+        # get fans dict from aigc
+        fans_dict = fetch_account_fans(self.denet_db_client, dt)
+
+        # get publishing account list from aigc
+        account_list = fetch_publishing_account_list(self.denet_db_client)
+
+        # fetch each account's read avg for each position
+        read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
+
+        for account in tqdm(account_list, desc=dt):
+            self.cal_read_avg_for_each_account(
+                account, fans_dict, read_rate_avg_dict, dt
+            )

+ 9 - 188
updateAccountV3.py

@@ -1,190 +1,9 @@
-"""
-@author: luojunhui
-"""
-import json
 import time
 
-import numpy as np
-from scipy import stats
-from tqdm import tqdm
 from datetime import datetime, timedelta
 from argparse import ArgumentParser
-from pymysql.cursors import DictCursor
 
-from applications.const import UpdateAccountReadAvgTaskConst
-from applications.db import DatabaseConnector
-from applications.utils import fetch_account_fans
-from applications.utils import fetch_publishing_account_list
-from config import apolloConfig
-from config import long_articles_config, denet_config, piaoquan_crawler_config
-
-read_rate_table = "long_articles_read_rate"
-read_avg_table = "account_avg_info_v3"
-config = apolloConfig()
-const = UpdateAccountReadAvgTaskConst()
-unauthorized_account = json.loads(config.getConfigValue("unauthorized_gh_id_fans"))
-touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
-backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
-
-class UpdateAccountInfoVersion3(object):
-    """
-    更新账号的平均阅读率
-    """
-
-    def __init__(self):
-        # init piaoquan crawler db client
-        self.piaoquan_crawler_db_client = DatabaseConnector(piaoquan_crawler_config)
-        self.piaoquan_crawler_db_client.connect()
-
-        # init long articles db client
-        self.long_articles_db_client = DatabaseConnector(long_articles_config)
-        self.long_articles_db_client.connect()
-
-        #  init aigc db client
-        self.denet_db_client = DatabaseConnector(denet_config)
-        self.denet_db_client.connect()
-
-    def fetch_read_rate_avg_for_each_account(self, dt):
-        """
-        从长文数据库获取账号阅读均值
-        :return:
-        """
-        dt = int(dt.replace("-", ""))
-        sql = f"""
-            select gh_id, position, read_rate_avg
-            from {read_rate_table}
-            where dt_version = {dt};
-        """
-        fetch_response_list = self.long_articles_db_client.fetch(query=sql, cursor_type=DictCursor)
-        account_read_rate_dict = {}
-        for item in fetch_response_list:
-            key = "{}_{}".format(item['gh_id'], item['position'])
-            account_read_rate_dict[key] = item['read_rate_avg']
-        return account_read_rate_dict
-
-    def cal_read_avg_ci(self, gh_id, position):
-        """
-        计算阅读均值的置信区间
-        """
-        fetch_query = f"""
-            select read_avg
-            from {read_avg_table}
-            where gh_id = %s and position = %s 
-            order by update_time desc limit {const.STAT_PERIOD};
-        """
-        fetch_response_list = self.piaoquan_crawler_db_client.fetch(
-            query=fetch_query, params=(gh_id, position), cursor_type=DictCursor
-        )
-        read_avg_list = [i["read_avg"] for i in fetch_response_list]
-        n = len(read_avg_list)
-        mean = np.mean(read_avg_list)
-        std = np.std(read_avg_list, ddof=1)
-        se = std / np.sqrt(n)
-        t = stats.t.ppf(const.DEFAULT_UPPER_QUANTILE, df=n - 1)
-        upper_t = mean + t * se
-        return upper_t
-
-    def do_task_list(self, dt):
-        """
-        do it
-        """
-        # get fans dict from aigc
-        fans_dict = fetch_account_fans(self.denet_db_client, dt)
-
-        # get publishing account list from aigc
-        account_list = fetch_publishing_account_list(self.denet_db_client)
-
-        # fetch each account's read avg for each position
-        read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
-
-        for account in tqdm(account_list, desc=dt):
-            gh_id = account["gh_id"]
-            business_type = const.TOULIU if gh_id in touliu_accounts else const.ARTICLES_DAILY
-            fans = fans_dict.get(gh_id, {}).get(dt, const.DEFAULT_FANS)
-
-            # use unauthorized account's fans if not found in aigc
-            if not fans:
-                fans = int(unauthorized_account.get(gh_id, const.DEFAULT_FANS))
-
-            # use backup account's fans if not found in aigc
-            if not fans:
-                fans = int(backup_account_fans.get(gh_id, const.DEFAULT_FANS))
-
-            if fans:
-                for index in const.ARTICLE_INDEX_LIST:
-                    gh_id_position = "{}_{}".format(gh_id, index)
-                    if read_rate_avg_dict.get(gh_id_position):
-                        # fetch read rate avg
-                        read_rate_avg = read_rate_avg_dict[gh_id_position]
-                        # cal read avg
-                        read_avg = fans * read_rate_avg
-
-                        # cal read avg ci upper
-                        read_avg_ci_upper = self.cal_read_avg_ci(gh_id, index)
-
-                        # insert into database
-                        insert_sql = f"""
-                            insert into {read_avg_table}
-                            (gh_id, position, update_time, account_name, fans, read_avg, like_avg, status, account_type,
-                             account_mode, account_source, account_status, business_type, read_rate_avg, read_avg_ci_upper)
-                            values
-                            (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
-                        """
-                        try:
-                            self.piaoquan_crawler_db_client.save(
-                                query=insert_sql,
-                                params=(
-                                    gh_id,
-                                    index,
-                                    dt,
-                                    account['account_name'],
-                                    fans,
-                                    read_avg,
-                                    const.DEFAULT_LIKE,
-                                    const.USING_STATUS,
-                                    account['account_type'],
-                                    account['mode_type'],
-                                    account['account_source'],
-                                    account['status'],
-                                    business_type,
-                                    read_rate_avg,
-                                    read_avg_ci_upper
-                                )
-                            )
-                        except Exception as e:
-                            update_sql = f"""
-                                update {read_avg_table}
-                                set fans = %s, read_avg = %s, read_rate_avg = %s, read_avg_ci_upper = %s
-                                where gh_id = %s and position = %s and update_time = %s
-                            """
-                            try:
-                                self.piaoquan_crawler_db_client.save(
-                                    query=update_sql,
-                                    params=(
-                                        fans,
-                                        read_avg,
-                                        read_rate_avg,
-                                        read_avg_ci_upper,
-                                        account['gh_id'],
-                                        index,
-                                        dt
-                                    )
-                                )
-                            except Exception as e:
-                                print(e)
-
-                        # 修改前一天的状态为 0
-                        update_status_sql = f"""
-                            update {read_avg_table}
-                            set status = %s
-                            where update_time != %s and gh_id = %s and position = %s;
-                        """
-                        self.piaoquan_crawler_db_client.save(
-                            query=update_status_sql,
-                            params=(
-                                const.NOT_USING_STATUS, dt, account['gh_id'], index
-                            )
-                        )
+from tasks.data_tasks.account_position_read_avg_task import AccountPositionReadAvgTask
 
 
 def main():
@@ -193,20 +12,22 @@ def main():
     :return:
     """
     parser = ArgumentParser()
-    parser.add_argument("--run-date",
-                        help="Run only once for date in format of %Y-%m-%d. \
-                                If no specified, run as daily jobs.")
+    parser.add_argument(
+        "--run-date",
+        help="Run only once for date in format of %Y-%m-%d. \
+                                If no specified, run as daily jobs.",
+    )
     args = parser.parse_args()
-    update_account_read_avg_task = UpdateAccountInfoVersion3()
+    update_account_read_avg_task = AccountPositionReadAvgTask()
     if args.run_date:
         update_account_read_avg_task.do_task_list(dt=args.run_date)
     else:
         dt_object = datetime.fromtimestamp(int(time.time()))
         one_day = timedelta(days=1)
         yesterday = dt_object - one_day
-        yesterday_str = yesterday.strftime('%Y-%m-%d')
+        yesterday_str = yesterday.strftime("%Y-%m-%d")
         update_account_read_avg_task.do_task_list(dt=yesterday_str)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     main()