
Update and optimize the mini program info task

luojunhui 5 days ago
parent
commit
6ecce5ddc6

+ 2 - 0
applications/tasks/data_recycle_tasks/__init__.py

@@ -2,6 +2,7 @@ from .recycle_daily_publish_articles import RecycleDailyPublishArticlesTask
 from .recycle_daily_publish_articles import CheckDailyPublishArticlesTask
 from .recycle_daily_publish_articles import UpdateRootSourceIdAndUpdateTimeTask
 from .recycle_daily_publish_articles import RecycleFwhDailyPublishArticlesTask
+from .recycle_mini_program_detail import RecycleMiniProgramDetailTask
 
 
 __all__ = [
@@ -9,4 +10,5 @@ __all__ = [
     "CheckDailyPublishArticlesTask",
     "UpdateRootSourceIdAndUpdateTimeTask",
     "RecycleFwhDailyPublishArticlesTask",
+    "RecycleMiniProgramDetailTask",
 ]

+ 293 - 0
applications/tasks/data_recycle_tasks/recycle_mini_program_detail.py

@@ -0,0 +1,293 @@
+import json
+from datetime import datetime, timedelta
+
+from tqdm.asyncio import tqdm
+
+from applications.crawler.wechat import get_article_detail
+from applications.utils import extract_root_source_id
+
+
+class MiniProgramConst:
+    ARTICLE_SUCCESS_CODE = 0
+    # default record status
+    DEFAULT_STATUS = 0
+    # API request failed
+    REQUEST_FAIL_STATUS = -1
+    # article has been deleted
+    DELETE_STATUS = -2
+    # no info returned, unknown reason
+    UNKNOWN_STATUS = -3
+    # article flagged as violating platform rules
+    ILLEGAL_STATUS = -4
+
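+    # task name and the MySQL tables this task reads from / writes to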
+    TASK_NAME = "recycle_mini_program_detail_daily"
+    ARTICLE_TABLE = "official_articles_v2"
+    DETAIL_TABLE = "long_articles_detail_info"
+
+    # number of days of published articles to recycle
+    ARTICLE_RECYCLE_DAYS = 1
+    # number of days of root_source_id records to update
+    ROOT_SOURCE_ID_UPDATE_DAYS = 3
+
+
+class RecycleMiniProgramDetailBase(MiniProgramConst):
+
+    def __init__(self, pool, log_client, trace_id):
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+
+    # build mini program info objects
+    @staticmethod
+    async def create_mini_info(response: list[dict]) -> list[dict]:
+        return [
+            {
+                "app_id": "wx89e7eb06478361d7",
+                "avatar": "https://rescdn.yishihui.com/0temp/logo.png",
+                "image_url": "",
+                "nike_name": "票圈 l 3亿人喜欢的视频平台",
+                "root_source_id": item["root_source_id"],
+                "video_id": item["video_id"],
+                "service_type": "0",
+                "title": "",
+                "type": "card",
+            }
+            for item in response
+        ]
+
+    # fetch first-level and fission details for a single root_source_id
+    async def fetch_root_source_id_result(self, root_source_id, dt) -> list[dict]:
+        query = f"""
+            SELECT first_uv, split0, split0_head, split0_recommend, split1, split1_head, split1_recommend, split2, split2_head, split2_recommend
+            FROM changwen_data_rootsourceid
+            WHERE root_source_id = %s AND dt = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(root_source_id, dt))
+
+    # fetch articles published within the specified number of days
+    async def fetch_published_articles(
+        self, run_date: str, date_delta: int
+    ) -> list[dict]:
+        query = f"""
+            SELECT ContentUrl, wx_sn, publish_timestamp, accountName, title, root_source_id_list
+            FROM official_articles_v2
+            WHERE FROM_UNIXTIME(publish_timestamp)
+            BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(run_date, date_delta, run_date),
+        )
+
+    # fetch root_source_id info from the database
+    async def fetch_root_source_id_from_db(
+        self, root_source_id_list: list[str]
+    ) -> list[dict[str, str]]:
+        query = """
+            SELECT video_id, root_source_id FROM long_articles_root_source_id WHERE root_source_id in %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(tuple(root_source_id_list),)
+        )
+
+    # fetch root_source_ids recorded over the last few days
+    async def fetch_root_source_id_in_last_days(
+        self, run_date: str, date_delta: int
+    ) -> list[dict[str, str]]:
+        query = f"""
+            SELECT recall_dt, root_source_id
+            FROM {self.DETAIL_TABLE}
+            WHERE publish_dt
+            BETWEEN DATE_SUB(%s, INTERVAL %s DAY) AND DATE_SUB(%s, INTERVAL 1 SECOND);
+        """
+        return await self.pool.async_fetch(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(run_date, date_delta, run_date),
+        )
+
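+    # check whether a detail record already exists for the given wx_sn / recall_dt / video_id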
+    async def whether_article_record_exist(self, wx_sn, recall_dt, video_id):
+        query = f"""
+            SELECT * FROM {self.DETAIL_TABLE}
+            WHERE wx_sn = %s AND recall_dt = %s AND video_id = %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, db_name="piaoquan_crawler", params=(wx_sn, recall_dt, video_id)
+        )
+
+    # insert a new root_source_id record
+    async def insert_each_root_source_id(self, params: tuple) -> None:
+        # record root_source_id info into the detail table
+        query = f"""
+            INSERT IGNORE INTO {self.DETAIL_TABLE}
+            (wx_sn, mini_title, mini_name, cover_url, video_index, root_source_id, video_id, publish_dt, recall_dt)
+            values
+            (%s, %s, %s, %s, %s, %s, %s, %s, %s);
+        """
+        return await self.pool.async_save(
+            query=query, db_name="piaoquan_crawler", params=params
+        )
+
+    # update first-level && fission info for a root_source_id
+    async def update_each_root_source_id(self, item: dict) -> None:
+        recall_dt = item["recall_dt"]
+        root_source_id = item["root_source_id"]
+        mini_program_detail = await self.fetch_root_source_id_result(
+            root_source_id, recall_dt
+        )
+        if not mini_program_detail:
+            return
+
+        mini_program_detail = mini_program_detail[0]
+        query = f"""
+            UPDATE {self.DETAIL_TABLE}
+            SET first_level = %s, 
+                fission_0 = %s, fission_0_head = %s, fission_0_recommend = %s, 
+                fission_1 = %s, fission_1_head = %s, fission_1_recommend = %s, 
+                fission_2 = %s, fission_2_head = %s, fission_2_recommend = %s
+            WHERE root_source_id = %s and recall_dt = %s;
+        """
+        await self.pool.async_save(
+            query=query,
+            db_name="piaoquan_crawler",
+            params=(
+                mini_program_detail["first_uv"],
+                mini_program_detail["split0"],
+                mini_program_detail["split0_head"],
+                mini_program_detail["split0_recommend"],
+                mini_program_detail["split1"],
+                mini_program_detail["split1_head"],
+                mini_program_detail["split1_recommend"],
+                mini_program_detail["split2"],
+                mini_program_detail["split2_head"],
+                mini_program_detail["split2_recommend"],
+                root_source_id,
+                recall_dt,
+            ),
+        )
+
+
+class RecycleMiniProgramDetailTask(RecycleMiniProgramDetailBase):
+    # business layer
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__(pool, log_client, trace_id)
+
+    async def handle_single_task(self, article):
+        """
+        Record each article's mini program cards into the long_articles_detail_info table.
+        """
+        url, publish_timestamp, wx_sn = (
+            article["ContentUrl"],
+            article["publish_timestamp"],
+            article["wx_sn"].decode(),
+        )
+        root_source_id_list = (
+            json.loads(article["root_source_id_list"])
+            if article["root_source_id_list"]
+            else []
+        )
+        mini_detail = []
+
+        # fetch mini program info
+        if root_source_id_list:
+            root_source_id_detail = await self.fetch_root_source_id_from_db(
+                root_source_id_list
+            )
+            mini_detail = await self.create_mini_info(root_source_id_detail)
+
+        # also fetch mini program info from the article detail page (covers articles without root_source_id_list)
+        try:
+            article_detail = await get_article_detail(article_link=url)
+            response_code = article_detail["code"]
+            if response_code == self.ARTICLE_SUCCESS_CODE:
+                mini_detail = article_detail["data"]["data"].get("mini_program", [])
+
+        except Exception as e:
+            raise Exception("Weixin Detail Spider Error", e)
+
+        # insert root_source_id info into the MySQL table
+        if not mini_detail:
+            return
+
+        publish_date = datetime.fromtimestamp(publish_timestamp)
+        publish_dt = publish_date.strftime("%Y-%m-%d")
+        # record data for the three days T+0, T+1 and T+2
+        recall_dt_list = [
+            (publish_date + timedelta(days=i)).strftime("%Y-%m-%d") for i in range(3)
+        ]
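+        # insert one row per recall day and per mini program card (video_index starts at 1)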
+        for dt in recall_dt_list:
+            for video_index, mini_item in enumerate(mini_detail, start=1):
+                image_url = mini_item["image_url"]
+                nickname = mini_item["nike_name"]
+                if mini_item.get("root_source_id") and mini_item.get("video_id"):
+                    root_source_id, video_id = (
+                        mini_item["root_source_id"],
+                        mini_item["video_id"],
+                    )
+                else:
+                    id_info = extract_root_source_id(mini_item["path"])
+                    root_source_id, video_id = (
+                        id_info["root_source_id"],
+                        id_info["video_id"],
+                    )
+                if await self.whether_article_record_exist(wx_sn=wx_sn, video_id=video_id, recall_dt=dt):
+                    continue
+
+                kimi_title = mini_item["title"]
+                await self.insert_each_root_source_id(
+                    params=(
+                        wx_sn,
+                        kimi_title,
+                        nickname,
+                        image_url,
+                        video_index,
+                        root_source_id,
+                        video_id,
+                        publish_dt,
+                        dt,
+                    )
+                )
+
+    # record daily published articles && root_source_id info
+    async def record_published_articles_job(self, date_delta: int, run_date: str):
+        # get tasks
+        tasks = await self.fetch_published_articles(run_date, date_delta)
+        for task in tqdm(tasks):
+            await self.handle_single_task(task)
+
+    # update first-level and fission info for root_source_ids
+    async def update_mini_program_detail_job(self, date_delta: int, run_date: str):
+        mini_info_list = await self.fetch_root_source_id_in_last_days(
+            run_date, date_delta
+        )
+        fail_cnt = 0
+        for mini_item in tqdm(
+            mini_info_list, total=len(mini_info_list), desc="update_each_root_source_id"
+        ):
+            try:
+                await self.update_each_root_source_id(mini_item)
+            except Exception as e:
+                fail_cnt += 1
+                print(e)
+        if fail_cnt:
+            # add bot
+            raise Exception(f"Fail Count {fail_cnt}")
+
+    # business entry point
+    async def deal(self, params: dict):
+        # parse request params
+        run_date = params.get("run_date")
+        if not run_date:
+            run_date = datetime.today().strftime("%Y-%m-%d")
+
+        record_date_delta = params.get("record_date_delta", self.ARTICLE_RECYCLE_DAYS)
+        update_date_delta = params.get(
+            "update_date_delta", self.ROOT_SOURCE_ID_UPDATE_DAYS
+        )
+
+        # execute business logic
+        await self.record_published_articles_job(
+            date_delta=record_date_delta, run_date=run_date
+        )
+        await self.update_mini_program_detail_job(
+            date_delta=update_date_delta, run_date=run_date
+        )

+ 14 - 0
applications/tasks/task_handler.py

@@ -1,19 +1,26 @@
 from datetime import datetime
 
 from applications.tasks.analysis_task import CrawlerDetailDeal
+
 from applications.tasks.algorithm_tasks import AccountCategoryAnalysis
+
 from applications.tasks.cold_start_tasks import ArticlePoolColdStart
+
 from applications.tasks.crawler_tasks import CrawlerToutiao
 from applications.tasks.crawler_tasks import WeixinAccountManager
 from applications.tasks.crawler_tasks import CrawlerGzhAccountArticles
 from applications.tasks.crawler_tasks import CrawlerGzhSearchArticles
+
 from applications.tasks.data_recycle_tasks import RecycleDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import CheckDailyPublishArticlesTask
 from applications.tasks.data_recycle_tasks import UpdateRootSourceIdAndUpdateTimeTask
 from applications.tasks.data_recycle_tasks import RecycleFwhDailyPublishArticlesTask
+from applications.tasks.data_recycle_tasks import RecycleMiniProgramDetailTask
+
 from applications.tasks.llm_tasks import TitleRewrite
 from applications.tasks.llm_tasks import ArticlePoolCategoryGeneration
 from applications.tasks.llm_tasks import CandidateAccountQualityScoreRecognizer
+
 from applications.tasks.monitor_tasks import check_kimi_balance
 from applications.tasks.monitor_tasks import GetOffVideos
 from applications.tasks.monitor_tasks import CheckVideoAuditStatus
@@ -21,6 +28,7 @@ from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
 from applications.tasks.monitor_tasks import TaskProcessingMonitor
+
 from applications.tasks.task_mapper import TaskMapper
 
 
@@ -192,5 +200,11 @@ class TaskHandler(TaskMapper):
         await task.deal(params=self.data)
         return self.TASK_SUCCESS_STATUS
 
+    # update mini program fission info
+    async def _mini_program_detail_handler(self) -> int:
+        task = RecycleMiniProgramDetailTask(pool=self.db_client, log_client=self.log_client, trace_id=self.trace_id)
+        await task.deal(params=self.data)
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 2 - 0
applications/tasks/task_scheduler.py

@@ -193,6 +193,8 @@ class TaskScheduler(TaskHandler):
             "account_category_analysis": self._account_category_analysis_handler,
             # crawled article/video count analysis
             "crawler_detail_analysis": self._crawler_article_analysis_handler,
+            # mini program fission info processing
+            "mini_program_detail_process": self._mini_program_detail_handler,
         }
 
         if task_name not in handlers:
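
With the handler registered above, the new task can be triggered through the existing scheduler by task name. Below is a minimal sketch of the request payload, assuming the scheduler routes on task_name and forwards data to the handler as params; the envelope shape, the payload variable name, and the concrete values are assumptions for illustration and are not part of this diff:

    # hypothetical request payload for the new task
    payload = {
        "task_name": "mini_program_detail_process",
        "data": {
            "run_date": "2025-08-01",    # optional; defaults to today's date (%Y-%m-%d)
            "record_date_delta": 1,      # days of published articles to record (ARTICLE_RECYCLE_DAYS)
            "update_date_delta": 3,      # days of root_source_ids to refresh (ROOT_SOURCE_ID_UPDATE_DAYS)
        },
    }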