
Update and optimize the mini program info task

luojunhui 5 days ago
parent
commit
4d63db6736

+ 6 - 2
applications/database/mysql_pools.py

@@ -1,4 +1,5 @@
 import logging
+
 from aiomysql import create_pool
 from aiomysql.cursors import DictCursor
 from applications.config import *
@@ -64,7 +65,7 @@ class DatabaseManager:
             logging.error(f"Failed to fetch {query}: {str(e)}")
             return None
 
-    async def async_save(self, query, params, db_name="long_articles"):
+    async def async_save(self, query, params, db_name="long_articles", batch: bool=False):
         pool = self.pools[db_name]
         if not pool:
             await self.init_pools()
@@ -72,7 +73,10 @@ class DatabaseManager:
         async with pool.acquire() as connection:
             async with connection.cursor() as cursor:
                 try:
-                    await cursor.execute(query, params)
+                    if batch:
+                        await cursor.executemany(query, params)
+                    else:
+                        await cursor.execute(query, params)
                     affected_rows = cursor.rowcount
                     await connection.commit()
                     return affected_rows
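
A hedged usage sketch of the new `batch` flag (the table, columns, and row values below are hypothetical): with `batch=True`, `params` is expected to be a list of tuples and the save routes through `cursor.executemany`, while the default keeps the single-row `cursor.execute` path.

```python
# Hypothetical call site; `db` is assumed to be an initialised DatabaseManager.
async def save_demo_rows(db) -> int:
    rows = [
        ("sn-001", "video-1"),
        ("sn-002", "video-2"),
    ]
    # demo_table and its columns are placeholders, not a table from this repo.
    query = "INSERT INTO demo_table (wx_sn, video_id) VALUES (%s, %s);"

    # Single-row path (unchanged): cursor.execute(query, params)
    await db.async_save(query=query, params=rows[0])

    # Batched path (new): cursor.executemany(query, params) with a list of tuples
    return await db.async_save(query=query, params=rows, batch=True)
```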

+ 62 - 40
applications/tasks/data_recycle_tasks/recycle_mini_program_detail.py

@@ -1,10 +1,12 @@
 import json
+import traceback
+from typing import Any
 from datetime import datetime, timedelta
 
-from tqdm.asyncio import tqdm
 
 from applications.crawler.wechat import get_article_detail
 from applications.utils import extract_root_source_id
+from applications.utils import run_tasks_with_asyncio_task_group
 
 
 class MiniProgramConst:
@@ -22,7 +24,7 @@ class MiniProgramConst:
 
     TASK_NAME = "recycle_mini_program_detail_daily"
     ARTICLE_TABLE = "official_articles_v2"
-    DETAIL_TABLE = "long_articles_detail_info"
+    DETAIL_TABLE = "long_articles_detail_info_dev"
 
     # article recycle window, in days
     ARTICLE_RECYCLE_DAYS = 1
@@ -84,11 +86,12 @@ class RecycleMiniProgramDetailBase(MiniProgramConst):
     async def fetch_root_source_id_from_db(
         self, root_source_id_list: list[str]
     ) -> list[dict[str, str]]:
-        query = """
-            SELECT video_id, root_source_id FROM long_articles_root_source_id WHERE root_source_id in %s;
+        placeholders = ", ".join(["%s"] * len(root_source_id_list))
+        query = f"""
+            SELECT video_id, root_source_id FROM long_articles_root_source_id WHERE root_source_id in ({placeholders});
         """
         return await self.pool.async_fetch(
-            query=query, params=(tuple(root_source_id_list),)
+            query=query, params=tuple(root_source_id_list)
         )
 
     # fetch root_source_id from the specified number of days ago
@@ -107,24 +110,18 @@ class RecycleMiniProgramDetailBase(MiniProgramConst):
             params=(run_date, date_delta, run_date),
         )
 
-    async def whether_article_record_exist(self, wx_sn, recall_dt, video_id):
-        query = f"""
-            SELECT * FROM {self.DETAIL_TABLE}
-            WHERE wx_sn = %s AND publish_dt = %s AND video_id = %s;
-        """
-        return await self.pool.async_fetch(query=query, db_name="piaoquan_crawler", params=(wx_sn, recall_dt, video_id))
-
     # insert new root_source_id rows
-    async def insert_each_root_source_id(self, params: tuple) -> None:
+    async def insert_each_root_source_id(self, params_list: list[tuple]) -> None:
         # record root_source_id info
         query = f"""
-            INSERT IGNORE INTO {self.DETAIL_TABLE}
+            INSERT INTO {self.DETAIL_TABLE}
             (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
             values
-            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+            (%s, %s, %s, %s, %s, %s, %s, %s, %s)
+            ON DUPLICATE KEY UPDATE video_id = VALUES(video_id);
         """
         return await self.pool.async_save(
-            query=query, db_name="piaoquan_crawler", params=params
+            query=query, db_name="piaoquan_crawler", params=params_list, batch=True
         )
 
     # update first-layer && fission info for root_source_id
@@ -202,7 +199,17 @@ class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
                 mini_detail = article_detail["data"]["data"].get("mini_program", [])
 
         except Exception as e:
-            raise Exception("Weixin Detail Spider Error", e)
+            await self.log_client.log(
+                contents={
+                    "task": self.TASK_NAME,
+                    "trace_id": article["trace_id"],
+                    "message": "get article detail error",
+                    "data": {
+                        "error": str(e),
+                        "traceback": traceback.format_exc(),
+                    },
+                }
+            )
 
         # insert root_source_id info into the MySQL table
         if not mini_detail:
@@ -214,6 +221,7 @@ class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
         recall_dt_list = [
             (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3)
         ]
+        params_list = []
         for dt in recall_dt_list:
             for video_index, mini_item in enumerate(mini_detail, start=1):
                 image_url = mini_item["image_url"]
@@ -229,12 +237,10 @@ class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
                         id_info["root_source_id"],
                         id_info["video_id"],
                     )
-                if self.whether_article_record_exist(wx_sn=wx_sn, video_id=video_id, recall_dt=dt):
-                    continue
 
                 kimi_title = mini_item["title"]
-                await self.insert_each_root_source_id(
-                    params=(
+                params_list.append(
+                    (
                         wx_sn,
                         kimi_title,
                         nickname,
@@ -247,30 +253,46 @@ class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
                     )
                 )
 
+        await self.insert_each_root_source_id(params_list)
+
     # record daily published articles && root_source_id info
-    async def record_published_articles_job(self, date_delta: int, run_date: str):
+    async def record_published_articles_job(
+        self,
+        date_delta: int,
+        run_date: str,
+        *,
+        max_concurrency: int = 20,  # concurrency cap
+        fail_fast: bool = False,  # abort on the first error
+    ) -> dict[str, Any]:
         # get tasks
-        tasks = await self.fetch_published_articles(run_date, date_delta)
-        for task in tqdm(tasks):
-            await self.handle_single_task(task)
+        task_list = await self.fetch_published_articles(run_date, date_delta)
+        return await run_tasks_with_asyncio_task_group(
+            task_list=task_list,
+            handler=self.handle_single_task,
+            max_concurrency=max_concurrency,
+            fail_fast=fail_fast,
+            description="持久化文章&&rootSourceId",
+            unit="article",
+        )
 
     # update first-layer and fission info for root_source_id
-    async def update_mini_program_detail_job(self, date_delta: int, run_date: str):
-        mini_info_list = await self.fetch_root_source_id_in_last_days(
-            run_date, date_delta
+    async def update_mini_program_detail_job(
+        self,
+        date_delta: int,
+        run_date: str,
+        *,
+        max_concurrency: int = 20,
+        fail_fast: bool = False,
+    ) -> dict[str, Any]:
+        task_list = await self.fetch_root_source_id_in_last_days(run_date, date_delta)
+        return await run_tasks_with_asyncio_task_group(
+            task_list=task_list,
+            handler=self.update_each_root_source_id,
+            max_concurrency=max_concurrency,
+            fail_fast=fail_fast,
+            description="更新rootSourceId裂变表现",
+            unit="id",
         )
-        fail_cnt = 0
-        for mini_item in tqdm(
-            mini_info_list, total=len(mini_info_list), desc="update_each_root_source_id"
-        ):
-            try:
-                await self.update_each_root_source_id(mini_item)
-            except Exception as e:
-                fail_cnt += 1
-                print(e)
-        if fail_cnt:
-            # add bot
-            raise Exception(f"Fail Count {fail_cnt}")
 
     # business entry point
     async def deal(self, params: dict):
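
For reference, a standalone sketch of the `IN (...)` placeholder change in `fetch_root_source_id_from_db`: instead of relying on the driver to expand a single `%s` from a nested tuple, the query now spells out one `%s` per id and the ids are passed as a flat tuple. The id values here are made up.

```python
# Standalone illustration of the placeholder construction; the ids are dummies.
root_source_id_list = ["rsi-aaa", "rsi-bbb", "rsi-ccc"]

placeholders = ", ".join(["%s"] * len(root_source_id_list))
query = (
    "SELECT video_id, root_source_id FROM long_articles_root_source_id "
    f"WHERE root_source_id IN ({placeholders});"
)
params = tuple(root_source_id_list)  # flat tuple, one value per placeholder

print(query)   # ... WHERE root_source_id IN (%s, %s, %s);
print(params)  # ('rsi-aaa', 'rsi-bbb', 'rsi-ccc')
```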

+ 3 - 0
applications/utils/__init__.py

@@ -24,4 +24,7 @@ from .item import CrawlerMetaAccount
 # mysql utils
 from .async_mysql_utils import *
 
+# async tasks
+from .async_tasks import run_tasks_with_asyncio_task_group
+
 task_schedule_response = TaskScheduleResponse()

+ 51 - 0
applications/utils/async_tasks.py

@@ -0,0 +1,51 @@
+import asyncio
+from typing import Callable, Coroutine, List, Any, Dict
+
+from tqdm.asyncio import tqdm
+
+
+# Use asyncio.TaskGroup to process I/O-bound tasks efficiently
+async def run_tasks_with_asyncio_task_group(
+    task_list: List[Any],
+    handler: Callable[[Any], Coroutine[Any, Any, None]],
+    *,
+    description: str = None,  # task description
+    unit: str,
+    max_concurrency: int = 20,  # maximum concurrency
+    fail_fast: bool = False,  # whether to abort the whole task group on the first error
+) -> Dict[str, Any]:
+    """using asyncio.TaskGroup to process I/O-intensive tasks"""
+    if not task_list:
+        return {"total_task": 0, "processed_task": 0, "errors": []}
+
+    processed_task = 0
+    total_task = len(task_list)
+    errors: List[tuple[int, Any, Exception]] = []
+    semaphore = asyncio.Semaphore(max_concurrency)
+    processing_bar = tqdm(total=total_task, unit=unit, desc=description)
+
+    async def _run_single_task(task_obj: Any, idx: int):
+        nonlocal processed_task
+        async with semaphore:
+            try:
+                await handler(task_obj)
+                processed_task += 1
+            except Exception as e:
+                if fail_fast:
+                    raise e
+                errors.append((idx, task_obj, e))
+            finally:
+                processing_bar.update()
+
+    async with asyncio.TaskGroup() as task_group:
+        for index, task in enumerate(task_list, start=1):
+            task_group.create_task(
+                _run_single_task(task, index), name=f"processing {description}-{index}"
+            )
+
+    processing_bar.close()
+    return {
+        "total_task": total_task,
+        "processed_task": processed_task,
+        "errors": errors,
+    }
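
A minimal, self-contained usage sketch of the new helper (Python 3.11+ for `asyncio.TaskGroup`; the handler and task list are dummies): each task runs under the semaphore, and failures are collected in `errors` unless `fail_fast=True`, in which case the exception propagates and cancels the group.

```python
import asyncio
import random

from applications.utils import run_tasks_with_asyncio_task_group  # re-exported above


async def handle_item(item: int) -> None:
    # Dummy I/O-bound handler: sleep briefly, then fail once to show error capture.
    await asyncio.sleep(random.uniform(0.01, 0.05))
    if item == 3:
        raise ValueError(f"demo failure on item {item}")


async def main() -> None:
    result = await run_tasks_with_asyncio_task_group(
        task_list=list(range(10)),
        handler=handle_item,
        description="demo",
        unit="item",
        max_concurrency=4,
        fail_fast=False,  # collect errors instead of cancelling the whole group
    )
    # e.g. {'total_task': 10, 'processed_task': 9, 'errors': [(4, 3, ValueError(...))]}
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```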