Bläddra i källkod

Merge branch 'feature/luojunhui-20251202-add-open-rate' of Server/LongArticleTaskServer into master

luojunhui 3 månader sedan
förälder
incheckning
01840b9d93

+ 7 - 1
applications/tasks/analysis_task/__init__.py

@@ -1,6 +1,12 @@
 from .crawler_detail import CrawlerDetailDeal
 from .account_position_info import AccountPositionReadRateAvg
 from .account_position_info import AccountPositionReadAvg
+from .account_position_info import AccountPositionOpenRateAvg
 
 
-__all__ = ["CrawlerDetailDeal", "AccountPositionReadRateAvg", "AccountPositionReadAvg"]
+__all__ = [
+    "CrawlerDetailDeal",
+    "AccountPositionReadRateAvg",
+    "AccountPositionReadAvg",
+    "AccountPositionOpenRateAvg",
+]

+ 62 - 4
applications/tasks/analysis_task/account_position_info.py

@@ -34,6 +34,9 @@ class AccountPositionInfoConst:
     ARTICLES_DAILY = 1
     TOULIU = 2
 
+    # 统计周期(天)
+    STAT_PERIOD = 30
+
     # 默认点赞
     DEFAULT_LIKE = 0
 
@@ -359,7 +362,9 @@ class AccountPositionReadAvg(AccountPositionReadRateAvg):
         fetch_response_list = await self.pool.async_fetch(
             query=fetch_query, db_name="piaoquan_crawler", params=(gh_id, index)
         )
-        read_avg_list = [i["read_avg"] for i in fetch_response_list if i['read_avg'] is not None]
+        read_avg_list = [
+            i["read_avg"] for i in fetch_response_list if i["read_avg"] is not None
+        ]
         n = len(read_avg_list)
         mean = np.mean(read_avg_list)
         std = np.std(read_avg_list, ddof=1)
@@ -375,7 +380,9 @@ class AccountPositionReadAvg(AccountPositionReadRateAvg):
             from long_articles_read_rate
             where dt_version = %s
         """
-        fetch_result = await self.pool.async_fetch(query=query, params=(dt.replace("-", ""),))
+        fetch_result = await self.pool.async_fetch(
+            query=query, params=(dt.replace("-", ""),)
+        )
         response = {}
         for item in fetch_result:
             key = f"{item['gh_id']}_{item['position']}"
@@ -512,5 +519,56 @@ class AccountPositionReadAvg(AccountPositionReadRateAvg):
                 print(traceback.format_exc())
 
 
-class AccountPositonOpenRateAvg(AccountPositionReadRateAvg):
-    pass
+class AccountPositionOpenRateAvg(AccountPositionReadRateAvg):
+    """Compute the average open rate of each publishing account and store it.
+
+    The open rate is ``sum(first_level) / sum(view_count)`` over the rows of
+    ``datastat_sort_strategy`` whose ``date_str`` lies in the
+    ``self.STAT_PERIOD``-day window ending on the statistics date.
+    """
+
+    async def get_account_open_rate(self, gh_id: str, date_string: str) -> float:
+        """Return the open rate of one account for the window ending on a date.
+
+        Args:
+            gh_id: Account id matched against ``datastat_sort_strategy.gh_id``.
+            date_string: End date of the window, formatted ``YYYYMMDD``.
+
+        Returns:
+            The ratio as a float, or ``0.0`` when the query yields no usable
+            value (no rows in the window, or a zero ``view_count`` total).
+        """
+        # NOTE(review): gh_id and date_string are interpolated into the SQL text
+        # instead of being passed as params like the sibling queries do. They
+        # come from internal data here; switch to placeholders if that changes
+        # (the literal '%Y%m%d' would then need '%%' escaping with most drivers).
+        fetch_query = f"""
+            select 
+                sum(view_count) as 'total_read', 
+                sum(first_level) as 'total_first_level',
+                sum(first_level) / sum(view_count) as 'avg_open_rate'
+            from datastat_sort_strategy
+            where gh_id = '{gh_id}' and date_str between date_sub(str_to_date('{date_string}', '%Y%m%d'), interval {self.STAT_PERIOD} day)
+            and str_to_date('{date_string}', '%Y%m%d');
+        """
+        res = await self.pool.async_fetch(query=fetch_query)
+        if not res:
+            return 0.0
+
+        # An aggregate query without GROUP BY returns one row even when nothing
+        # matches; the SUMs (and a division by zero) are then NULL, which
+        # arrives as None and would make a bare float(...) raise TypeError.
+        avg_open_rate = res[0]["avg_open_rate"]
+        return float(avg_open_rate) if avg_open_rate is not None else 0.0
+
+    async def set_avg_open_rate_for_each_account(
+        self, gh_id: str, date_string: str, avg_read_rate: float
+    ) -> int:
+        """Write the open-rate average into ``account_avg_info_v3``.
+
+        Args:
+            gh_id: Account id of the row(s) to update.
+            date_string: Value matched against the ``update_time`` column.
+            avg_read_rate: The open-rate average to store in ``open_rate_avg``
+                (the parameter name is kept for caller compatibility).
+
+        Returns:
+            Whatever ``self.pool.async_save`` returns (presumably the number
+            of affected rows -- confirm against the pool implementation).
+        """
+        update_query = """
+            update account_avg_info_v3
+            set open_rate_avg = %s
+            where gh_id = %s and update_time = %s;
+        """
+        return await self.pool.async_save(
+            query=update_query, db_name="piaoquan_crawler", params=(avg_read_rate, gh_id, date_string)
+        )
+
+    async def deal(self, date_string: str | None):
+        """Refresh the open-rate average of every publishing account.
+
+        Args:
+            date_string: Run date formatted ``YYYY-MM-DD``; today is used when
+                it is empty or ``None``. Statistics are computed for the day
+                before this date.
+        """
+        if not date_string:
+            date_string = datetime.now().strftime("%Y-%m-%d")
+
+        dt = (datetime.strptime(date_string, "%Y-%m-%d") - timedelta(days=1)).strftime(
+            "%Y-%m-%d"
+        )
+        # The query expects YYYYMMDD; compute it once rather than per account.
+        compact_dt = dt.replace("-", "")
+
+        account_list = await self.get_publishing_accounts()
+        for account in tqdm(account_list, desc="计算单个账号的打开率均值"):
+            if account["gh_id"] in self.FORBIDDEN_GH_IDS:
+                continue
+
+            # Best effort per account: a failure is reported and the loop
+            # moves on to the next account.
+            try:
+                avg_open_rate = await self.get_account_open_rate(
+                    gh_id=account["gh_id"], date_string=compact_dt
+                )
+                await self.set_avg_open_rate_for_each_account(
+                    gh_id=account["gh_id"],
+                    date_string=dt,
+                    avg_read_rate=avg_open_rate,
+                )
+            except Exception as e:
+                print(f"计算账号 {account['account_name']} 打开率均值失败 : {e}")
+                print(traceback.format_exc())

+ 9 - 0
applications/tasks/task_handler.py

@@ -3,6 +3,7 @@ from datetime import datetime
 from applications.tasks.analysis_task import CrawlerDetailDeal
 from applications.tasks.analysis_task import AccountPositionReadRateAvg
 from applications.tasks.analysis_task import AccountPositionReadAvg
+from applications.tasks.analysis_task import AccountPositionOpenRateAvg
 
 from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
 
@@ -266,5 +267,13 @@ class TaskHandler(TaskMapper):
         await task.deal(end_date=self.data.get("end_date"))
         return self.TASK_SUCCESS_STATUS
 
+    # 更新账号打开率均值
+    async def _update_account_open_rate_avg_handler(self) -> int:
+        """Run the account open-rate averaging task and report success.
+
+        Builds an ``AccountPositionOpenRateAvg`` from this handler's database
+        client, log client and trace id, then awaits its ``deal`` with the
+        ``date_string`` entry of ``self.data`` (``None`` when the key is absent).
+
+        Returns:
+            ``self.TASK_SUCCESS_STATUS`` once ``deal`` has completed.
+        """
+        open_rate_task = AccountPositionOpenRateAvg(
+            pool=self.db_client,
+            log_client=self.log_client,
+            trace_id=self.trace_id,
+        )
+        requested_date = self.data.get("date_string")
+        await open_rate_task.deal(date_string=requested_date)
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -207,6 +207,8 @@ class TaskScheduler(TaskHandler):
             "update_account_read_rate_avg": self._update_account_read_rate_avg_handler,
             # 更新账号阅读均值
             "update_account_read_avg": self._update_account_read_avg_handler,
+            # 更新账号打开率均值
+            "update_account_open_rate_avg": self._update_account_open_rate_avg_handler,
         }
 
         if task_name not in handlers: