#29 2025-09-04 Mini-program update performance optimization

Merged
luojunhui merged 4 commits from Server/feature/luojunhui/2025-09-04-improve-mini-program-task into Server/master 4 days ago

+ 2 - 0
applications/config/task_chinese_name.py

@@ -15,4 +15,6 @@ name_map = {
     "check_publish_video_audit_status": "校验发布视频状态",
     "check_kimi_balance": "检验kimi余额",
     "account_category_analysis": "账号品类分析",
+    "mini_program_detail_process": "更新小程序信息",
+    "crawler_detail_analysis": "抓取详情分析"
 }

+ 6 - 2
applications/database/mysql_pools.py

@@ -1,4 +1,5 @@
 import logging
+
 from aiomysql import create_pool
 from aiomysql.cursors import DictCursor
 from applications.config import *
@@ -64,7 +65,7 @@ class DatabaseManager:
             logging.error(f"Failed to fetch {query}: {str(e)}")
             return None
 
-    async def async_save(self, query, params, db_name="long_articles"):
+    async def async_save(self, query, params, db_name="long_articles", batch: bool = False):
         pool = self.pools[db_name]
         if not pool:
             await self.init_pools()
@@ -72,7 +73,10 @@ class DatabaseManager:
         async with pool.acquire() as connection:
             async with connection.cursor() as cursor:
                 try:
-                    await cursor.execute(query, params)
+                    if batch:
+                        await cursor.executemany(query, params)
+                    else:
+                        await cursor.execute(query, params)
                     affected_rows = cursor.rowcount
                     await connection.commit()
                     return affected_rows
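
For reference, a minimal usage sketch of the new `batch` flag (the table and column names below are illustrative, not taken from this diff): passing a list of parameter tuples with `batch=True` routes the write through `cursor.executemany`, so a multi-row insert becomes one call instead of one `execute` per row.

```python
# Hypothetical helper showing the new batch path; table/columns are made up.
async def save_rows(db_manager):
    insert_query = """
        INSERT INTO demo_detail (wx_sn, root_source_id, video_id)
        VALUES (%s, %s, %s);
    """
    rows = [
        ("snA", "rootA", 101),
        ("snB", "rootB", 102),
    ]
    # batch=True -> cursor.executemany(query, rows): all rows in a single call
    affected = await db_manager.async_save(
        query=insert_query,
        params=rows,
        db_name="long_articles",
        batch=True,
    )
    return affected
```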

+ 2 - 0
applications/tasks/data_recycle_tasks/__init__.py

@@ -2,6 +2,7 @@ from .recycle_daily_publish_articles import RecycleDailyPublishArticlesTask
 from .recycle_daily_publish_articles import CheckDailyPublishArticlesTask
 from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask
 from .recycle_daily_publish_articles import RecycleFwhDailyPublishArticlesTask
+from .recycle_mini_program_detail import RecycleMiniProgramDetailTask
 
 
 __all__ = [
@@ -9,4 +10,5 @@ __all__ = [
     "CheckDailyPublishArticlesTask",
     "UpdateRootSourceIdAndUpdateTimeTask",
     "RecycleFwhDailyPublishArticlesTask",
+    "RecycleMiniProgramDetailTask",
 ]

+ 315 - 0
applications/tasks/data_recycle_tasks/recycle_mini_program_detail.py

@@ -0,0 +1,315 @@
+import json
+import traceback
+from typing import Any
+from datetime import datetime, timedelta
+
+
+from applications.crawler.wechat import get_article_detail
+from applications.utils import extract_root_source_id
+from applications.utils import run_tasks_with_asyncio_task_group
+
+
+class MiniProgramConst:
+    ARTICLE_SUCCESS_CODE = 0
+    # default record status
+    DEFAULT_STATUS = 0
+    # API request failed
+    REQUEST_FAIL_STATUS = -1
+    # article has been deleted
+    DELETE_STATUS = -2
+    # no data returned, reason unknown
+    UNKNOWN_STATUS = -3
+    # article flagged for violating platform rules
+    ILLEGAL_STATUS = -4
+
+    TASK_NAME = "recycle_mini_program_detail_daily"
+    ARTICLE_TABLE = "official_articles_v2"
+    DETAIL_TABLE = "long_articles_detail_info"
+
+    # lookback window (days) for recycling published articles
+    ARTICLE_RECYCLE_DAYS = 1
+    # lookback window (days) for updating root_source_id metrics
+    ROOT_SOURCE_ID_UPDATE_DAYS = 3
+
+
+class RecycleMiniProgramDetailBase(MiniProgramConst):
+
+    def __init__(self, pool, log_client, trace_id):
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+
+    # build mini-program info objects
+    @staticmethod
+    async def create_mini_info(response: list[dict]) -> list[dict]:
+        return [
+            {
+                "app_id": "wx89e7eb06478361d7",
+                "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
+                "image_url": "",
+                "nike_name": "票圈 l 3亿人喜欢的视频平台",
+                "root_source_id": item["root_source_id"],
+                "video_id": item["video_id"],
+                "service_type": "0",
+                "title": "",
+                "type": "card",
+            }
+            for item in response
+        ]
+
+    # fetch first-level && fission detail for a single root_source_id
+    async def fetch_root_source_id_result(self, root_source_id, dt) -> list[dict]:
+        query = f"""
+            SELECT first_uv, split0, split0_head, split0_recommend, split1, split1_head, split1_recommend, split2, split2_head, split2_recommend
+            FROM changwen_data_rootsourceid
+            WHERE root_source_id = %s AND dt = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(root_source_id, dt))
+
+    # fetch articles published within the given day window
+    async def fetch_published_articles(
+        self, run_date: str, date_delta: int
+    ) -> list[dict]:
+        query = f"""
+            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
+            FROM official_articles_v2
+            WHERE FROM_UNIXTIME(publish_timestamp)
+            BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(run_date, date_delta, run_date),
+        )
+
+    # fetch root_source_id info from the database
+    async def fetch_root_source_id_from_db(
+        self, root_source_id_list: list[str]
+    ) -> list[dict[str, str]]:
+        placeholders = ", ".join(["%s"] * len(root_source_id_list))
+        query = f"""
+            SELECT video_id, root_source_id FROM long_articles_root_source_id WHERE root_source_id in ({placeholders});
+        """
+        return await self.pool.async_fetch(
+            query=query, params=tuple(root_source_id_list)
+        )
+
+    # fetch root_source_id records from the last few days
+    async def fetch_root_source_id_in_last_days(
+        self, run_date: str, date_delta: int
+    ) -> list[dict[str, str]]:
+        query = f"""
+            SELECT recall_dt, root_source_id
+            FROM {self.DETAIL_TABLE}
+            WHERE publish_dt
+            BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(run_date, date_delta, run_date),
+        )
+
+    # insert new root_source_id records
+    async def insert_each_root_source_id(self, params_list: list[tuple]) -> None:
+        # persist root_source_id info
+        query = f"""
+            INSERT INTO {self.DETAIL_TABLE}
+            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
+            values
+            (%s, %s, %s, %s, %s, %s, %s, %s, %s)
+            ON DUPLICATE KEY UPDATE video_id = VALUES(video_id);
+        """
+        return await self.pool.async_save(
+            query=query, db_name="piaoquan_crawler", params=params_list, batch=True
+        )
+
+    # update first-level && fission metrics for each root_source_id
+    async def update_each_root_source_id(self, item: dict) -> None:
+        recall_dt = item["recall_dt"]
+        root_source_id = item["root_source_id"]
+        mini_program_detail = await self.fetch_root_source_id_result(
+            root_source_id, recall_dt
+        )
+        if not mini_program_detail:
+            return
+
+        mini_program_detail = mini_program_detail[0]
+        query = f"""
+            UPDATE {self.DETAIL_TABLE}
+            SET first_level = %s, 
+                fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s, 
+                fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s, 
+                fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
+            WHERE root_source_id = %s and recall_dt = %s;
+        """
+        await self.pool.async_save(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(
+                mini_program_detail["first_uv"],
+                mini_program_detail["split0"],
+                mini_program_detail["split0_head"],
+                mini_program_detail["split0_recommend"],
+                mini_program_detail["split1"],
+                mini_program_detail["split1_head"],
+                mini_program_detail["split1_recommend"],
+                mini_program_detail["split2"],
+                mini_program_detail["split2_head"],
+                mini_program_detail["split2_recommend"],
+                root_source_id,
+                recall_dt,
+            ),
+        )
+
+
+class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
+    # business layer
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__(pool, log_client, trace_id)
+
+    async def handle_single_task(self, article):
+        """
+        record each article into long_articles_detail_info table
+        """
+        url, publish_timestamp, wx_sn = (
+            article["ContentUrl"],
+            article["publish_timestamp"],
+            article["wx_sn"].decode(),
+        )
+        root_source_id_list = (
+            json.loads(article["root_source_id_list"])
+            if article["root_source_id_list"]
+            else []
+        )
+        mini_detail = []
+
+        # fetch mini-program info
+        if root_source_id_list:
+            root_source_id_detail = await self.fetch_root_source_id_from_db(
+                root_source_id_list
+            )
+            mini_detail = await self.create_mini_info(root_source_id_detail)
+
+        # also query the article detail API; a successful response overrides mini_detail
+        try:
+            article_detail = await get_article_detail(article_link=url)
+            response_code = article_detail["code"]
+            if response_code == self.ARTICLE_SUCCESS_CODE:
+                mini_detail = article_detail["data"]["data"].get("mini_program", [])
+
+        except Exception as e:
+            await self.log_client.log(
+                contents={
+                    "task": self.TASK_NAME,
+                    "trace_id": self.trace_id,
+                    "message": "get article detail error",
+                    "data": {
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                }
+            )
+
+        # write root_source_id info into the MySQL table
+        if not mini_detail:
+            return
+
+        publish_date = datetime.fromtimestamp(publish_timestamp)
+        publish_dt = publish_date.strftime("%Y-%m-%d")
+        # record data for T+0, T+1 and T+2 (three days)
+        recall_dt_list = [
+            (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3)
+        ]
+        params_list = []
+        for dt in recall_dt_list:
+            for video_index, mini_item in enumerate(mini_detail, start=1):
+                image_url = mini_item["image_url"]
+                nickname = mini_item["nike_name"]
+                if mini_item.get("root_source_id") and mini_item.get("video_id"):
+                    root_source_id, video_id = (
+                        mini_item["root_source_id"],
+                        mini_item["video_id"],
+                    )
+                else:
+                    id_info = extract_root_source_id(mini_item["path"])
+                    root_source_id, video_id = (
+                        id_info["root_source_id"],
+                        id_info["video_id"],
+                    )
+
+                kimi_title = mini_item["title"]
+                params_list.append(
+                    (
+                        wx_sn,
+                        kimi_title,
+                        nickname,
+                        image_url,
+                        video_index,
+                        root_source_id,
+                        video_id,
+                        publish_dt,
+                        dt,
+                    )
+                )
+
+        await self.insert_each_root_source_id(params_list)
+
+    # record daily published articles && root_source_id info
+    async def record_published_articles_job(
+        self,
+        date_delta: int,
+        run_date: str,
+        *,
+        max_concurrency: int = 20,  # concurrency cap
+        fail_fast: bool = False,  # abort on the first error
+    ) -> dict[str, Any]:
+        # get tasks
+        task_list = await self.fetch_published_articles(run_date, date_delta)
+        return await run_tasks_with_asyncio_task_group(
+            task_list=task_list,
+            handler=self.handle_single_task,
+            max_concurrency=max_concurrency,
+            fail_fast=fail_fast,
+            description="持久化文章&&rootSourceId",
+            unit="article",
+        )
+
+    # update first-level && fission metrics for root_source_id
+    async def update_mini_program_detail_job(
+        self,
+        date_delta: int,
+        run_date: str,
+        *,
+        max_concurrency: int = 20,
+        fail_fast: bool = False,
+    ) -> dict[str, Any]:
+        task_list = await self.fetch_root_source_id_in_last_days(run_date, date_delta)
+        return await run_tasks_with_asyncio_task_group(
+            task_list=task_list,
+            handler=self.update_each_root_source_id,
+            max_concurrency=max_concurrency,
+            fail_fast=fail_fast,
+            description="更新rootSourceId裂变表现",
+            unit="id",
+        )
+
+    # business entry point
+    async def deal(self, params: dict):
+        # parse request params
+        run_date = params.get("run_date")
+        if not run_date:
+            run_date = datetime.today().strftime("%Y-%m-%d")
+
+        record_date_delta = params.get("record_date_delta", self.ARTICLE_RECYCLE_DAYS)
+        update_date_delta = params.get(
+            "update_date_delta", self.ROOT_SOURCE_ID_UPDATE_DAYS
+        )
+
+        # run the jobs
+        await self.record_published_articles_job(
+            date_delta=record_date_delta, run_date=run_date
+        )
+        await self.update_mini_program_detail_job(
+            date_delta=update_date_delta, run_date=run_date
+        )
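
A sketch of driving the new task directly, using only the parameter keys that `deal` reads in this diff (`run_date`, `record_date_delta`, `update_date_delta`); `pool`, `log_client` and `trace_id` are assumed to come from the application's existing database manager and log client:

```python
from applications.tasks.data_recycle_tasks import RecycleMiniProgramDetailTask

async def run_mini_program_recycle(pool, log_client, trace_id):
    task = RecycleMiniProgramDetailTask(pool=pool, log_client=log_client, trace_id=trace_id)
    # Every key is optional: run_date defaults to today, the deltas fall back to
    # ARTICLE_RECYCLE_DAYS (1) and ROOT_SOURCE_ID_UPDATE_DAYS (3).
    await task.deal(
        params={
            "run_date": "2025-09-04",
            "record_date_delta": 1,
            "update_date_delta": 3,
        }
    )
```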

+ 14 - 0
applications/tasks/task_handler.py

@@ -1,19 +1,26 @@
 from datetime import datetime
 
 from applications.tasks.analysis_task import CrawlerDetailDeal
+
 from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
+
 from applications.tasks.cold_start_tasks import ArticlePoolColdStart
+
 from applications.tasks.crawler_tasks import CrawlerToutiao
 from applications.tasks.crawler_tasks import WeixinAccountManager
 from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
 from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
+
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
 from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import RecycleMiniProgramDetailTask
+
 from applications.tasks.llm_tasks import TitleRewrite
 from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
 from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
+
 from applications.tasks.monitor_tasks import check_kimi_balance
 from applications.tasks.monitor_tasks import GetOffVideos
 from applications.tasks.monitor_tasks import CheckVideoAuditStatus
@@ -21,6 +28,7 @@ from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
 from applications.tasks.monitor_tasks import TaskProcessingMonitor
+
 from applications.tasks.task_mapper import TaskMapper
 
 
@@ -192,5 +200,11 @@ class TaskHandler(TaskMapper):
         await task.deal(params=self.data)
         return self.TASK_SUCCESS_STATUS
 
+    # update mini-program fission info
+    async def _mini_program_detail_handler(self) -> int:
+        task = RecycleMiniProgramDetailTask(pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id)
+        await task.deal(params=self.data)
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -193,6 +193,8 @@ class TaskScheduler(TaskHandler):
             "account_category_analysis": self._account_category_analysis_handler,
             # 抓取 文章/视频 数量分析
             "crawler_detail_analysis": self._crawler_article_analysis_handler,
+            # mini-program fission info processing
+            "mini_program_detail_process": self._mini_program_detail_handler,
         }
 
         if task_name not in handlers:
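
Assuming the scheduler is driven by the existing task API with a `task_name` routing key plus a `data` payload (the endpoint and envelope shape are not part of this diff), a trigger for the new task might look like:

```python
# Hypothetical trigger payload; only the "mini_program_detail_process" key and
# the params read by deal() come from this diff.
payload = {
    "task_name": "mini_program_detail_process",
    "data": {
        "run_date": "2025-09-04",   # optional, defaults to today
        "record_date_delta": 1,     # optional, defaults to 1 day
        "update_date_delta": 3,     # optional, defaults to 3 days
    },
}
```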

+ 3 - 0
applications/utils/__init__.py

@@ -24,4 +24,7 @@ from .item import CrawlerMetaAccount
 # mysql utils
 from .async_mysql_utils import *
 
+# async tasks
+from .async_tasks import run_tasks_with_asyncio_task_group
+
 task_schedule_response = TaskScheduleResponse()

+ 51 - 0
applications/utils/async_tasks.py

@@ -0,0 +1,51 @@
+import asyncio
+from typing import Callable, Coroutine, List, Any, Dict
+
+from tqdm.asyncio import tqdm
+
+
+# use asyncio.TaskGroup to process I/O-bound tasks efficiently
+async def run_tasks_with_asyncio_task_group(
+    task_list: List[Any],
+    handler: Callable[[Any], Coroutine[Any, Any, None]],
+    *,
+    description: str | None = None,  # task description shown on the progress bar
+    unit: str,
+    max_concurrency: int = 20,  # maximum number of concurrent tasks
+    fail_fast: bool = False,  # abort the whole task group on the first error
+) -> Dict[str, Any]:
+    """using asyncio.TaskGroup to process I/O-intensive tasks"""
+    if not task_list:
+        return {"total_task": 0, "processed_task": 0, "errors": []}
+
+    processed_task = 0
+    total_task = len(task_list)
+    errors: List[tuple[int, Any, Exception]] = []
+    semaphore = asyncio.Semaphore(max_concurrency)
+    processing_bar = tqdm(total=total_task, unit=unit, desc=description)
+
+    async def _run_single_task(task_obj: Any, idx: int):
+        nonlocal processed_task
+        async with semaphore:
+            try:
+                await handler(task_obj)
+                processed_task += 1
+            except Exception as e:
+                if fail_fast:
+                    raise e
+                errors.append((idx, task_obj, e))
+            finally:
+                processing_bar.update()
+
+    async with asyncio.TaskGroup() as task_group:
+        for index, task in enumerate(task_list, start=1):
+            task_group.create_task(
+                _run_single_task(task, index), name=f"processing {description}-{index}"
+            )
+
+    processing_bar.close()
+    return {
+        "total_task": total_task,
+        "processed_task": processed_task,
+        "errors": errors,
+    }
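
A self-contained sketch of how the helper behaves (the handler and items are invented; `asyncio.TaskGroup` requires Python 3.11+). With `fail_fast=False` exceptions are collected into `errors`; with `fail_fast=True` the exception would propagate inside the `TaskGroup` and cancel the remaining tasks.

```python
import asyncio

from applications.utils import run_tasks_with_asyncio_task_group

async def fetch_item(item: int) -> None:
    await asyncio.sleep(0.01)      # stand-in for an I/O call
    if item == 7:
        raise ValueError("boom")   # collected in result["errors"]

async def main() -> None:
    result = await run_tasks_with_asyncio_task_group(
        task_list=list(range(20)),
        handler=fetch_item,
        description="demo fetch",
        unit="item",
        max_concurrency=5,
        fail_fast=False,
    )
    # e.g. {'total_task': 20, 'processed_task': 19, 'errors': [(8, 7, ValueError('boom'))]}
    print(result)

asyncio.run(main())
```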