浏览代码

新增-品类自动更新脚本

luojunhui 3 周之前
父节点
当前提交
16205302b0
共有 1 个文件被更改,包括 47 次插入49 次删除
  1. 47 49
      applications/tasks/algorithm_tasks/account_category_analysis.py

+ 47 - 49
applications/tasks/algorithm_tasks/account_category_analysis.py

@@ -1,6 +1,7 @@
 import time, json
 from datetime import datetime, timedelta
 from pandas import DataFrame
+from tqdm.asyncio import tqdm
 
 from .models import CategoryRegression
 
@@ -47,16 +48,16 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
 
     async def prepare_raw_data(self, end_dt, begin_dt: str = "20250401"):
         query = """
-            SELECT dt, gh_id, account_name, title, similarity, view_count_rate, category,
+            select dt, gh_id, account_name, title, similarity, view_count_rate, category,
                     read_avg, read_avg_rate, first_pub_interval, `index`
-            FROM datastat_score
-            WHERE dt BETWEEN %s AND %s 
-                AND similarity > %s
-                AND category IS NOT NULL
-                AND read_avg > %s
-                AND read_avg_rate BETWEEN %s AND %s
-                AND view_count_rate > %s
-                AND `index` < %s;
+            from datastat_score
+            where dt between %s and %s 
+                and similarity > %s
+                and category IS NOT NULL
+                and read_avg > %s
+                and read_avg_rate between %s and %s
+                and view_count_rate > %s
+                and `index` < %s;
             ;
         """
         fetch_response = await self.pool.async_fetch(
@@ -94,7 +95,7 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
     async def update_each_account(self, record):
         now_timestamp = int(time.time())
         query = """
-            insert into account_category (dt, gh_id, category_map, status, version, create_timestamp, update_timestamp)
+            insert ignore into account_category (dt, gh_id, category_map, status, version, create_timestamp, update_timestamp)
                 values (%s, %s, %s, %s, %s, %s, %s)
         """
         insert_rows = await self.pool.async_save(
@@ -134,44 +135,41 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
         sample_count = sub_df.shape[0]
         if sample_count < self.SAMPLE_MIN_SIZE:
             return
-        print(sub_df.columns.values.tolist())
-        for line in sub_df.values.tolist():
-            print(line)
-
-        # params, t_stats, p_values = self.run_ols_linear_regression(
-        #     sub_df, self.view_only, self.P_VALUE_THRESHOLD
-        # )
-        # current_record = {"dt": end_dt, "gh_id": account_id, "category_map": {}, "name": account_name}
-        # params_names = self.get_param_names()
-        # for name, param, p_value in zip(params_names, params, p_values):
-        #     category_name = param_to_category_map.get(name, None)
-        #     if (
-        #         abs(param) > 0.1
-        #         and p_value < self.P_VALUE_THRESHOLD
-        #         and category_name is not None
-        #     ):
-        #         scale_factor = min(0.1 / p_value, 1)
-        #         print(
-        #             f"{account_id} {account_name} {category_name} {param:.3f} {p_value:.3f}"
-        #         )
-        #         truncate_param = round(max(min(param, 0.25), -0.25) * scale_factor, 6)
-        #         current_record["category_map"][category_name] = truncate_param
-        #
-        #     if (
-        #         param < -0.1
-        #         and p_value < self.P_VALUE_THRESHOLD
-        #         and category_name is not None
-        #     ):
-        #         self.account_negative_categories[account_id].append(category_name)
-        #
-        # if not current_record["category_map"]:
-        #     return
-        #
-        # current_record["category_map"] = json.dumps(
-        #     current_record["category_map"], ensure_ascii=False
-        # )
-
-        # await self.update_each_account(current_record)
+
+        params, t_stats, p_values = self.run_ols_linear_regression(
+            sub_df, self.view_only, self.P_VALUE_THRESHOLD
+        )
+        current_record = {"dt": end_dt, "gh_id": account_id, "category_map": {}, "name": account_name}
+        params_names = self.get_param_names()
+        for name, param, p_value in zip(params_names, params, p_values):
+            category_name = param_to_category_map.get(name, None)
+            if (
+                abs(param) > 0.1
+                and p_value < self.P_VALUE_THRESHOLD
+                and category_name is not None
+            ):
+                scale_factor = min(0.1 / p_value, 1)
+                print(
+                    f"{account_id} {account_name} {category_name} {param:.3f} {p_value:.3f}"
+                )
+                truncate_param = round(max(min(param, 0.25), -0.25) * scale_factor, 6)
+                current_record["category_map"][category_name] = truncate_param
+
+            if (
+                param < -0.1
+                and p_value < self.P_VALUE_THRESHOLD
+                and category_name is not None
+            ):
+                self.account_negative_categories[account_id].append(category_name)
+
+        if not current_record["category_map"]:
+            return
+
+        current_record["category_map"] = json.dumps(
+            current_record["category_map"], ensure_ascii=False
+        )
+
+        await self.update_each_account(current_record)
 
     async def deal(self):
         end_dt = self.init_execute_date()
@@ -197,7 +195,7 @@ class AccountCategoryAnalysis(CategoryRegression, AccountCategoryConst):
         account_negative_categories = {key: [] for key in account_ids}
         self.account_negative_categories = account_negative_categories
 
-        for account_id in account_ids[:1]:
+        for account_id in tqdm(account_ids, desc="analysis each account"):
             await self.predict_each_account(
                 pre_processed_dataframe, account_id, account_id_map, end_dt, param_to_category_map
             )