Sfoglia il codice sorgente

Merge branch 'feature/luojunhui/20251125-add-limited-account' of Server/LongArticleTaskServer into master

luojunhui 3 mesi fa
parent
commit
ee5087c57a

+ 2 - 1
applications/api/async_feishu_api.py

@@ -37,7 +37,7 @@ class Feishu:
             "app_secret": "cNoTAqMpsAm7mPBcpCAXFfvOzCNL27fe",
         }
         async with AsyncHttpClient(default_headers=self.headers) as client:
-            response = await client.post(url=url, data=post_data)
+            response = await client.post(url=url, json=post_data)
 
         tenant_access_token = response["tenant_access_token"]
         self.token = tenant_access_token
@@ -68,6 +68,7 @@ class FeishuSheetApi(Feishu):
                 sheet_token
             )
         )
+        # self.token = 't-g104bpfHNZN45BVJWFSQEM6WD45AAI4FNRWXCZVK'
         headers = {
             "Authorization": "Bearer " + self.token,
             "contentType": "application/json; charset=utf-8",

+ 1 - 0
applications/config/task_chinese_name.py

@@ -17,4 +17,5 @@ name_map = {
     "account_category_analysis": "账号品类分析",
     "mini_program_detail_process": "更新小程序信息",
     "crawler_detail_analysis": "抓取详情分析",
+    "limited_account_analysis": "限流账号分析处理",
 }

+ 6 - 2
applications/tasks/cold_start_tasks/article_pool/article_pool_cold_start_strategy.py

@@ -69,9 +69,13 @@ class ArticlePoolColdStartStrategy(ArticlePoolColdStartConst):
                         self.TITLE_NOT_SENSITIVE,
                         self.INIT_STATUS,
                         # self.READ_TIMES_THRESHOLD,
-                        self.CATEGORY_CONFIG_MAP.get(category, self.READ_TIMES_THRESHOLD).get("read_times_threshold", self.READ_TIMES_THRESHOLD),
+                        self.CATEGORY_CONFIG_MAP.get(
+                            category, self.READ_TIMES_THRESHOLD
+                        ).get("read_times_threshold", self.READ_TIMES_THRESHOLD),
                         # self.READ_THRESHOLD,
-                        self.CATEGORY_CONFIG_MAP.get(category, self.READ_THRESHOLD).get("read_threshold", self.READ_THRESHOLD),
+                        self.CATEGORY_CONFIG_MAP.get(category, self.READ_THRESHOLD).get(
+                            "read_threshold", self.READ_THRESHOLD
+                        ),
                         self.INIT_STATUS,
                     ),
                 )

+ 1 - 1
applications/tasks/cold_start_tasks/article_pool/article_pool_filter_strategy.py

@@ -25,7 +25,7 @@ class ArticlePoolFilterStrategy(ArticlePoolColdStartConst):
         filter_df = dedup_df[
             (dedup_df["title"].str.len() <= self.TITLE_LENGTH_MAX)
             & (dedup_df["title"].str.len() >= self.TITLE_LENGTH_LIMIT)
-            ]
+        ]
         length_level1 = filter_df.shape[0]
 
         # 通过敏感词过滤

+ 2 - 0
applications/tasks/monitor_tasks/__init__.py

@@ -4,6 +4,7 @@ from .get_off_videos import CheckVideoAuditStatus
 from .gzh_article_monitor import OutsideGzhArticlesMonitor
 from .gzh_article_monitor import OutsideGzhArticlesCollector
 from .gzh_article_monitor import InnerGzhArticlesMonitor
+from .limited_account_analysis import LimitedAccountAnalysisTask
 from .task_processing_monitor import TaskProcessingMonitor
 
 __all__ = [
@@ -14,4 +15,5 @@ __all__ = [
     "OutsideGzhArticlesCollector",
     "InnerGzhArticlesMonitor",
     "TaskProcessingMonitor",
+    "LimitedAccountAnalysisTask",
 ]

+ 142 - 0
applications/tasks/monitor_tasks/limited_account_analysis.py

@@ -0,0 +1,142 @@
+import json
+
+from datetime import datetime, timedelta
+from applications.api import feishu_sheet, feishu_robot
+
+
class LimitedAccountAnalysisConst:
    """Tuning constants and Feishu sheet coordinates for the limited-account analysis."""

    # Only articles published in the first position are analysed.
    FIRST_POSITION = 1
    # An account is treated as "limited" when its read rate falls below this ratio.
    LIMIT_THRESHOLD = 0.2
    # ...while its historical average view count is at least this high
    # (accounts that used to perform, so the drop looks like throttling).
    LIMIT_READ_AVG = 100

    # Target Feishu spreadsheet token and the two sheets written to.
    TABLE_ID = "MqkxwleLFiNOwHkU3crcXpWlnfe"
    DETAIL_SHEET_ID = "6fa679"
    SUMMARY_SHEET_ID = "uvli3P"
+
+
class LimitedAccountAnalysisTask(LimitedAccountAnalysisConst):
    """Daily analysis of rate-limited accounts.

    Queries ``datastat_sort_strategy`` for first-position articles whose read
    rate dropped below ``LIMIT_THRESHOLD`` and pushes two data sets to a
    Feishu spreadsheet: a per-article detail sheet and a per-account-mode
    fan-count summary sheet.
    """

    def __init__(self, pool, log_client):
        # pool: async DB client exposing ``async_fetch(query, params)``.
        # log_client: shared logging client; stored for parity with the other
        # monitor tasks even though this task currently reports via print().
        self.pool = pool
        self.log_client = log_client

    async def get_limited_account_detail(self, date_str):
        """Return per-article rows for limited accounts on ``date_str``.

        A row qualifies when it was published in the first position, its read
        rate is below ``LIMIT_THRESHOLD`` and its historical average view
        count is at least ``LIMIT_READ_AVG``.
        """
        query = """
            SELECT  date_str, 
                    COALESCE(account_mode, '公众号投流') AS account_mode,
                    account_source, account_name,
                    fans, title, view_count, avg_view_count
            FROM datastat_sort_strategy
            WHERE date_str =  %s AND position = %s AND read_rate < %s AND avg_view_count >= %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(
                date_str,
                self.FIRST_POSITION,
                self.LIMIT_THRESHOLD,
                self.LIMIT_READ_AVG,
            ),
        )

    async def get_limited_account_summary(self, date_str):
        """Return, per account mode, the fan total of limited accounts vs. all accounts."""
        query = """
            SELECT
                date_str,
                COALESCE(account_mode, '公众号投流') AS account_mode_label,
                CAST(
                    SUM(
                        CASE
                            WHEN IFNULL(read_rate, 0) < %s THEN fans
                            ELSE 0
                        END
                    ) AS SIGNED
                ) AS limit_fans,
                CAST(SUM(fans) AS SIGNED) AS total_fans
            FROM datastat_sort_strategy
            WHERE position = %s AND date_str = %s
            GROUP BY
                date_str,
                COALESCE(account_mode, '公众号投流');
        """
        return await self.pool.async_fetch(
            query=query, params=(self.LIMIT_THRESHOLD, self.FIRST_POSITION, date_str)
        )

    async def insert_into_detail_table(self, detail_data):
        """Prepend the detail rows to the Feishu detail sheet (newest on top)."""
        insert_array = [
            [
                row["date_str"],
                row["account_mode"],
                row["account_source"],
                row["account_name"],
                row["fans"],
                row["title"],
                row["view_count"],
                row["avg_view_count"],
            ]
            for row in detail_data
        ]
        await feishu_sheet.fetch_token()
        # NOTE(review): rows 2..(2+N) span N+1 rows for N values — confirm
        # prepend_value tolerates the extra row, or use 1 + len(detail_data).
        await feishu_sheet.prepend_value(
            sheet_token=self.TABLE_ID,
            sheet_id=self.DETAIL_SHEET_ID,
            ranges=f"A2:H{2 + len(detail_data)}",
            values=insert_array,
        )

    async def insert_into_summary_table(self, summary_data):
        """Prepend the per-mode summary rows to the Feishu summary sheet."""
        insert_array = [
            [
                row["date_str"],
                row["account_mode_label"],
                row["limit_fans"],
                row["total_fans"],
            ]
            for row in summary_data
        ]
        await feishu_sheet.fetch_token()
        # NOTE(review): only 4 columns are written but the range ends at E
        # (5 columns) — confirm whether the sheet has a computed 5th column
        # or the range should be A2:D....
        await feishu_sheet.prepend_value(
            sheet_token=self.TABLE_ID,
            sheet_id=self.SUMMARY_SHEET_ID,
            ranges=f"A2:E{2 + len(summary_data)}",
            values=insert_array,
        )

    async def deal(self, date_string: str = None) -> None:
        """Run the limited-account analysis for a single day.

        Args:
            date_string: date in YYYYMMDD, YYYY-MM-DD or YYYY/MM/DD form;
                defaults to yesterday when omitted.

        Raises:
            ValueError: if ``date_string`` is not a valid calendar date.
        """
        # Default to yesterday when no date is provided.
        if not date_string:
            date_string = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")

        # Normalize to the bare YYYYMMDD form used by the table.
        date_string = date_string.replace("-", "").replace("/", "")

        # Validate as a real calendar date; the previous len()/isdigit()
        # check accepted impossible dates such as "20251399".
        try:
            datetime.strptime(date_string, "%Y%m%d")
        except ValueError:
            raise ValueError(
                f"无效的日期格式: {date_string},请使用YYYYMMDD格式"
            ) from None

        try:
            detail_data = await self.get_limited_account_detail(date_str=date_string)
            summary_data = await self.get_limited_account_summary(date_str=date_string)

            # Only push non-empty result sets to Feishu.
            if detail_data:
                await self.insert_into_detail_table(detail_data)
            else:
                print(f"在 {date_string} 没有找到受限账号数据")

            if summary_data:
                await self.insert_into_summary_table(summary_data)
            else:
                print(f"在 {date_string} 没有找到受限账号摘要数据")
        except Exception as e:
            # Best-effort stdout trace before re-raising so the scheduler
            # still marks the task as failed.
            print(f"处理受限账号分析时出错: {e}")
            raise

+ 9 - 0
applications/tasks/task_handler.py

@@ -34,6 +34,7 @@ from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
 from applications.tasks.monitor_tasks import TaskProcessingMonitor
+from applications.tasks.monitor_tasks import LimitedAccountAnalysisTask
 
 from applications.tasks.task_mapper import TaskMapper
 
@@ -239,5 +240,13 @@ class TaskHandler(TaskMapper):
         await task.deal()
         return self.TASK_SUCCESS_STATUS
 
+    # 更新限流账号信息
+    async def _update_limited_account_info_handler(self) -> int:
+        task = LimitedAccountAnalysisTask(
+            pool=self.db_client, log_client=self.log_client
+        )
+        await task.deal(date_string=self.data.get("date_string"))
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -201,6 +201,8 @@ class TaskScheduler(TaskHandler):
             "recycle_outside_account_articles": self._recycle_outside_account_article_handler,
             # 更新外部账号文章的root_source_id和update_time
             "update_outside_account_article_root_source_id": self._update_outside_account_article_root_source_id_and_update_time_handler,
+            # 更新限流账号信息
+            "update_limited_account_info": self._update_limited_account_info_handler,
         }
 
         if task_name not in handlers: