Kaynağa Gözat

限流账号监测

luojunhui 3 ay önce
ebeveyn
işleme
1d4d092c49

+ 2 - 1
applications/api/async_feishu_api.py

@@ -37,7 +37,7 @@ class Feishu:
             "app_secret": "cNoTAqMpsAm7mPBcpCAXFfvOzCNL27fe",
         }
         async with AsyncHttpClient(default_headers=self.headers) as client:
-            response = await client.post(url=url, data=post_data)
+            response = await client.post(url=url, json=post_data)
 
         tenant_access_token = response["tenant_access_token"]
         self.token = tenant_access_token
@@ -68,6 +68,7 @@ class FeishuSheetApi(Feishu):
                 sheet_token
             )
         )
+        # self.token = 't-g104bpfHNZN45BVJWFSQEM6WD45AAI4FNRWXCZVK'
         headers = {
             "Authorization": "Bearer " + self.token,
             "contentType": "application/json; charset=utf-8",

+ 1 - 0
applications/config/task_chinese_name.py

@@ -17,4 +17,5 @@ name_map = {
     "account_category_analysis": "账号品类分析",
     "mini_program_detail_process": "更新小程序信息",
     "crawler_detail_analysis": "抓取详情分析",
+    "limited_account_analysis": "限流账号分析处理",
 }

+ 2 - 0
applications/tasks/monitor_tasks/__init__.py

@@ -4,6 +4,7 @@ from .get_off_videos import CheckVideoAuditStatus
 from .gzh_article_monitor import OutsideGzhArticlesMonitor
 from .gzh_article_monitor import OutsideGzhArticlesCollector
 from .gzh_article_monitor import InnerGzhArticlesMonitor
+from .limited_account_analysis import LimitedAccountAnalysisTask
 from .task_processing_monitor import TaskProcessingMonitor
 
 __all__ = [
@@ -14,4 +15,5 @@ __all__ = [
     "OutsideGzhArticlesCollector",
     "InnerGzhArticlesMonitor",
     "TaskProcessingMonitor",
+    "LimitedAccountAnalysisTask",
 ]

+ 129 - 0
applications/tasks/monitor_tasks/limited_account_analysis.py

@@ -0,0 +1,129 @@
+import json
+
+from datetime import datetime, timedelta
+from applications.api import feishu_sheet, feishu_robot
+
+class LimitedAccountAnalysisConst:
+    FIRST_POSITION = 1
+    LIMIT_THRESHOLD = 0.2
+    LIMIT_READ_AVG = 100
+
+    TABLE_ID = 'MqkxwleLFiNOwHkU3crcXpWlnfe'
+    DETAIL_SHEET_ID = '6fa679'
+    SUMMARY_SHEET_ID = 'uvli3P'
+
+
+class LimitedAccountAnalysisTask(LimitedAccountAnalysisConst):
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    async def get_limited_account_detail(self, date_str):
+        query = """
+            SELECT  date_str, 
+                    COALESCE(account_mode, '公众号投流') AS account_mode,
+                    account_source, account_name,
+                    fans, title, view_count, avg_view_count
+            FROM datastat_sort_strategy
+            WHERE date_str =  %s AND position = %s AND read_rate < %s AND avg_view_count >= %s;
+        """
+        account_detail = await self.pool.async_fetch(query=query, params=(date_str, self.FIRST_POSITION, self.LIMIT_THRESHOLD, self.LIMIT_READ_AVG))
+        return account_detail
+
+    async def get_limited_account_summary(self, date_str):
+        query = """
+            SELECT
+                date_str,
+                COALESCE(account_mode, '公众号投流') AS account_mode_label,
+                CAST(
+                    SUM(
+                        CASE
+                            WHEN IFNULL(read_rate, 0) < %s THEN fans
+                            ELSE 0
+                        END
+                    ) AS SIGNED
+                ) AS limit_fans,
+                CAST(SUM(fans) AS SIGNED) AS total_fans
+            FROM datastat_sort_strategy
+            WHERE position = %s AND date_str = %s
+            GROUP BY
+                date_str,
+                COALESCE(account_mode, '公众号投流');
+        """
+        account_summary = await self.pool.async_fetch(query=query, params=(self.LIMIT_THRESHOLD, self.FIRST_POSITION, date_str))
+        return account_summary
+
+    async def insert_into_detail_table(self, detail_data):
+        insert_array = []
+        for row in detail_data:
+            insert_array.append([
+                    row["date_str"],
+                    row["account_mode"],
+                    row["account_source"],
+                    row["account_name"],
+                    row["fans"],
+                    row["title"],
+                    row["view_count"],
+                    row["avg_view_count"],
+                ]
+            )
+        await feishu_sheet.fetch_token()
+        await feishu_sheet.prepend_value(
+            sheet_token=self.TABLE_ID,
+            sheet_id=self.DETAIL_SHEET_ID,
+            ranges=f"A2:H{2 + len(detail_data)}",
+            values=insert_array,
+        )
+
+    async def insert_into_summary_table(self, summary_data):
+        insert_array = []
+        for row in summary_data:
+            insert_array.append([
+                    row["date_str"],
+                    row["account_mode_label"],
+                    row["limit_fans"],
+                    row["total_fans"],
+                ]
+            )
+        await feishu_sheet.fetch_token()
+        await feishu_sheet.prepend_value(
+            sheet_token=self.TABLE_ID,
+            sheet_id=self.SUMMARY_SHEET_ID,
+            ranges=f"A2:E{2+len(summary_data)}",
+            values=insert_array,
+        )
+
+    async def deal(self, date_string: str = None) -> None:
+        """处理受限账号分析任务
+        
+        Args:
+            date_string: 日期字符串,格式为YYYYMMDD、YYYY-MM-DD或YYYY/MM/DD
+        """
+        # 如果没有提供日期,默认使用昨天
+        if not date_string:
+            date_string = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
+        
+        # 统一日期格式为YYYYMMDD
+        date_string = date_string.replace("-", "").replace("/", "")
+        
+        # 验证日期格式
+        if len(date_string) != 8 or not date_string.isdigit():
+            raise ValueError(f"无效的日期格式: {date_string},请使用YYYYMMDD格式")
+        
+        try:
+            detail_data = await self.get_limited_account_detail(date_str=date_string)
+            summary_data = await self.get_limited_account_summary(date_str=date_string)
+            
+            # 如果有数据才打印,避免空数据输出
+            if detail_data:
+                await self.insert_into_detail_table(detail_data)
+            else:
+                print(f"在 {date_string} 没有找到受限账号数据")
+
+            if summary_data:
+                await self.insert_into_summary_table(summary_data)
+            else:
+                print(f"在 {date_string} 没有找到受限账号摘要数据")
+        except Exception as e:
+            print(f"处理受限账号分析时出错: {e}")
+            raise

+ 9 - 0
applications/tasks/task_handler.py

@@ -34,6 +34,7 @@ from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
 from applications.tasks.monitor_tasks import TaskProcessingMonitor
+from applications.tasks.monitor_tasks import LimitedAccountAnalysisTask
 
 from applications.tasks.task_mapper import TaskMapper
 
@@ -239,5 +240,13 @@ class TaskHandler(TaskMapper):
         await task.deal()
         return self.TASK_SUCCESS_STATUS
 
+    # 更新限流账号信息
+    async def _update_limited_account_info_handler(self) -> int:
+        task = LimitedAccountAnalysisTask(
+            pool=self.db_client, log_client=self.log_client
+        )
+        await task.deal(date_string=self.data.get("date_string"))
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -201,6 +201,8 @@ class TaskScheduler(TaskHandler):
             "recycle_outside_account_articles": self._recycle_outside_account_article_handler,
             # 更新外部账号文章的root_source_id和update_time
             "update_outside_account_article_root_source_id": self._update_outside_account_article_root_source_id_and_update_time_handler,
+            # 更新限流账号信息
+            "update_limited_account_info": self._update_limited_account_info_handler,
         }
 
         if task_name not in handlers: