Prechádzať zdrojové kódy

Merge branch '2025-05-27-add-read-rate-avg-task' of luojunhui/LongArticlesJob into master

luojunhui 5 mesiacov pred
rodič
commit
6077cf1121

+ 85 - 4
tasks/data_tasks/account_position_read_avg_task.py

@@ -3,12 +3,14 @@
 """
 """
 
 
 import json
 import json
+import traceback
 
 
 import numpy as np
 import numpy as np
 from tqdm import tqdm
 from tqdm import tqdm
 from scipy import stats
 from scipy import stats
 from pymysql.cursors import DictCursor
 from pymysql.cursors import DictCursor
 
 
+from applications import log
 from applications.const import UpdateAccountReadAvgTaskConst
 from applications.const import UpdateAccountReadAvgTaskConst
 from applications.db import DatabaseConnector
 from applications.db import DatabaseConnector
 from applications.utils import fetch_account_fans
 from applications.utils import fetch_account_fans
@@ -23,7 +25,7 @@ touliu_accounts = set(json.loads(config.getConfigValue("touliu_gh_id_list")))
 backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
 backup_account_fans = json.loads(config.getConfigValue("backup_account_fans"))
 
 
 
 
-class AccountPositionReadAvgTask(object):
+class AccountDataTask:
 
 
     def __init__(self):
     def __init__(self):
         # init piaoquan crawler db client
         # init piaoquan crawler db client
@@ -38,6 +40,9 @@ class AccountPositionReadAvgTask(object):
         self.denet_db_client = DatabaseConnector(denet_config)
         self.denet_db_client = DatabaseConnector(denet_config)
         self.denet_db_client.connect()
         self.denet_db_client.connect()
 
 
+
+class AccountPositionReadAvgTask(AccountDataTask):
+
     def fetch_read_rate_avg_for_each_account(self, dt):
     def fetch_read_rate_avg_for_each_account(self, dt):
         dt = int(dt.replace("-", ""))
         dt = int(dt.replace("-", ""))
         sql = f"""
         sql = f"""
@@ -195,6 +200,82 @@ class AccountPositionReadAvgTask(object):
         read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
         read_rate_avg_dict = self.fetch_read_rate_avg_for_each_account(dt)
 
 
         for account in tqdm(account_list, desc=dt):
         for account in tqdm(account_list, desc=dt):
-            self.cal_read_avg_for_each_account(
-                account, fans_dict, read_rate_avg_dict, dt
-            )
+            try:
+                self.cal_read_avg_for_each_account(
+                    account, fans_dict, read_rate_avg_dict, dt
+                )
+            except Exception as e:
+                log(
+                    task="account_read_avg_producer",
+                    function="do_task_list",
+                    status="fail",
+                    message=str(e),
+                    data={
+                        "gh_id": account["gh_id"],
+                        "date": dt.replace("-", ""),
+                        "traceback": traceback.format_exc(),
+                    },
+                )
+
+
+class AccountOpenRateAvgTask(AccountDataTask):
+    """
+    cal open rate avg for each account
+    """
+
+    def set_avg_open_rate_for_each_account(
+        self, gh_id: str, date_string: str, avg_read_rate: float
+    ) -> int:
+        update_sql = f"""
+            update account_avg_info_v3
+            set open_rate_avg = %s
+            where gh_id = %s and update_time = %s;
+        """
+        return self.piaoquan_crawler_db_client.save(
+            update_sql, params=(avg_read_rate, gh_id, date_string)
+        )
+
+    def get_account_open_rate(self, gh_id: str, date_string: str) -> float:
+        """
+        get open rate for each account
+        """
+        fetch_query = f"""
+            select 
+                sum(view_count) as 'total_read', 
+                sum(first_level) as 'total_first_level',
+                sum(first_level) / sum(view_count) as 'avg_open_rate'
+            from datastat_sort_strategy
+            where gh_id = '{gh_id}' and date_str between date_sub(str_to_date('{date_string}', '%Y%m%d'), interval {const.STAT_PERIOD} day)
+            and str_to_date('{date_string}', '%Y%m%d');
+        """
+        res = self.long_articles_db_client.fetch(
+            query=fetch_query, cursor_type=DictCursor
+        )[0]
+        return float(res["avg_open_rate"])
+
+    def do_task_list(self, date_string: str):
+        """
+        INPUT date_string: '%Y-%m-%d'
+        """
+        account_list = fetch_publishing_account_list(self.denet_db_client)
+        for account in tqdm(account_list):
+            gh_id = account["gh_id"]
+            try:
+                avg_read_rate = self.get_account_open_rate(
+                    gh_id=gh_id, date_string=date_string.replace("-", "")
+                )
+                self.set_avg_open_rate_for_each_account(
+                    gh_id, date_string, avg_read_rate
+                )
+            except Exception as e:
+                log(
+                    task="account_open_rate_producer",
+                    function="deal",
+                    status="fail",
+                    message=str(e),
+                    data={
+                        "gh_id": gh_id,
+                        "date": date_string.replace("-", ""),
+                        "traceback": traceback.format_exc(),
+                    },
+                )

+ 6 - 1
updateAccountV3.py

@@ -3,7 +3,10 @@ import time
 from datetime import datetime, timedelta
 from datetime import datetime, timedelta
 from argparse import ArgumentParser
 from argparse import ArgumentParser
 
 
-from tasks.data_tasks.account_position_read_avg_task import AccountPositionReadAvgTask
+from tasks.data_tasks.account_position_read_avg_task import (
+    AccountPositionReadAvgTask,
+    AccountOpenRateAvgTask,
+)
 
 
 
 
 def main():
 def main():
@@ -19,6 +22,7 @@ def main():
     )
     )
     args = parser.parse_args()
     args = parser.parse_args()
     update_account_read_avg_task = AccountPositionReadAvgTask()
     update_account_read_avg_task = AccountPositionReadAvgTask()
+    update_account_open_rate_avg_task = AccountOpenRateAvgTask()
     if args.run_date:
     if args.run_date:
         update_account_read_avg_task.do_task_list(dt=args.run_date)
         update_account_read_avg_task.do_task_list(dt=args.run_date)
     else:
     else:
@@ -27,6 +31,7 @@ def main():
         yesterday = dt_object - one_day
         yesterday = dt_object - one_day
         yesterday_str = yesterday.strftime("%Y-%m-%d")
         yesterday_str = yesterday.strftime("%Y-%m-%d")
         update_account_read_avg_task.do_task_list(dt=yesterday_str)
         update_account_read_avg_task.do_task_list(dt=yesterday_str)
+        update_account_open_rate_avg_task.do_task_list(date_string=yesterday_str)
 
 
 
 
 if __name__ == "__main__":
 if __name__ == "__main__":