Add recall of articles published via service accounts

luojunhui 3 weeks ago
Parent
Commit
20227a687d

+ 5 - 0
applications/config/__init__.py

@@ -14,6 +14,9 @@ from .deepseek_config import deep_seek_official_api_key
 # es config
 from .elastic_search_mappings import es_index, es_mappings, es_settings
 
+# cold start config
+from .cold_start_config import category_config, input_source_map
+
 __all__ = [
     "aigc_db_config",
     "long_video_db_config",
@@ -25,4 +28,6 @@ __all__ = [
     "es_index",
     "es_mappings",
     "es_settings",
+    "category_config",
+    "input_source_map"
 ]

+ 22 - 0
applications/config/cold_start_config.py

@@ -0,0 +1,22 @@
+category_config = {
+    "知识科普": "20250813032110801233225",
+    "国家大事": "20250813032845706844854",
+    "历史人物": "20250813033415138644740",
+    "奇闻趣事": "20250813033056703155233",
+    "名人八卦": "20250813033257335290696",
+    "怀旧时光": "20250813033536622149424",
+    "情感故事": "20250813033605574986309",
+    "社会法治": "20250813033829272898432",
+    "现代人物": "20250813034015084388155",
+    "社会现象": "20250813034056506771309",
+    "健康养生": "20250813034120792523588",
+    "家长里短": "20250813034159621236902",
+    "军事历史": "20250813034227997109122",
+    "财经科技": "20250813034253336624837",
+    "政治新闻": "20250813034320561348119"
+}
+
+input_source_map = {
+    "weixin": 5,
+    "toutiao": 6
+}
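
A quick orientation (illustrative, not part of the diff): `category_config` maps an AI-assigned category name to the produce-plan id it is bound to, and `input_source_map` supplies the `inputSourceChannel` value used when binding a crawler plan to a generate plan. A hypothetical lookup helper, assuming only the two dicts above:

from applications.config import category_config, input_source_map

def resolve_binding(category: str, platform: str) -> tuple[str, int]:
    """Hypothetical helper: map a category to its produce plan id and a platform to its channel."""
    plan_id = category_config[category]   # e.g. "知识科普" -> "20250813032110801233225"
    channel = input_source_map[platform]  # "weixin" -> 5, "toutiao" -> 6
    return plan_id, channel

# resolve_binding("知识科普", "weixin") -> ("20250813032110801233225", 5)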

+ 9 - 0
applications/tasks/cold_start_tasks/article_pool/__init__.py

@@ -0,0 +1,9 @@
+from .article_pool_cold_start_const import ArticlePoolColdStartConst
+from .article_pool_cold_start_strategy import ArticlePoolColdStartStrategy
+from .article_pool_filter_strategy import ArticlePoolFilterStrategy
+
+__all__ = [
+    "ArticlePoolColdStartConst",
+    "ArticlePoolColdStartStrategy",
+    "ArticlePoolFilterStrategy",
+]

+ 19 - 0
applications/tasks/cold_start_tasks/article_pool/article_pool_cold_start_const.py

@@ -0,0 +1,19 @@
+class ArticlePoolColdStartConst:
+    # article
+    DAILY_ARTICLE_NUM = 1000
+    SIMILARITY_SCORE_THRESHOLD = 0.5
+
+    TITLE_NOT_SENSITIVE = 0
+    TITLE_SENSITIVE = 1
+
+    PUBLISHED_STATUS = 2
+    INIT_STATUS = 1
+    BAD_STATUS = 0
+    READ_TIMES_THRESHOLD = 1.3
+    # READ_THRESHOLD = 5000
+    READ_THRESHOLD = 1000
+
+    TITLE_LENGTH_LIMIT = 15
+    TITLE_LENGTH_MAX = 50
+
+    DEFAULT_CRAWLER_METHODS = ["1030-手动挑号", "account_association"]
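
These constants encode the cold-start gates applied later in the SQL query (read performance) and the pandas funnel (title length): an article must reach at least 1.3x its account's average reads and at least 1000 reads, and its title must be between 15 and 50 characters. A minimal standalone restatement of the same checks, for illustration only (the real checks run in SQL and pandas, not in one function):

from applications.tasks.cold_start_tasks.article_pool import ArticlePoolColdStartConst

def passes_cold_start_gates(read_cnt: int, read_avg: float, title: str) -> bool:
    # Sketch of the thresholds defined above.
    c = ArticlePoolColdStartConst
    read_ok = (
        read_avg > 0
        and read_cnt / read_avg >= c.READ_TIMES_THRESHOLD
        and read_cnt >= c.READ_THRESHOLD
    )
    title_ok = c.TITLE_LENGTH_LIMIT <= len(title) <= c.TITLE_LENGTH_MAX
    return read_ok and title_ok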

+ 98 - 0
applications/tasks/cold_start_tasks/article_pool/article_pool_cold_start_strategy.py

@@ -0,0 +1,98 @@
+from __future__ import annotations
+
+from typing import List, Dict
+from applications.tasks.cold_start_tasks.article_pool.article_pool_cold_start_const import (
+    ArticlePoolColdStartConst,
+)
+
+
+class ArticlePoolColdStartStrategy(ArticlePoolColdStartConst):
+    def __init__(self, pool, log_client, trace_id):
+        super().__init__()
+        self.pool = pool
+        self.log_client = log_client
+        self.trace_id = trace_id
+
+    async def get_weixin_cold_start_articles(
+        self, crawl_method: str, strategy: str, category: str | None
+    ) -> List[Dict]:
+        match strategy:
+            case "strategy_v1":
+                query = """
+                    select 
+                        article_id, title, link,  llm_sensitivity, score, category_by_ai
+                    from crawler_meta_article t1 
+                    join crawler_meta_article_accounts_read_avg t2 on t1.out_account_id = t2.gh_id and t1.article_index = t2.position
+                    where category = %s 
+                        and platform = %s
+                        and title_sensitivity = %s 
+                        and t1.status = %s
+                        and t1.read_cnt / t2.read_avg >= %s
+                        and t1.read_cnt >= %s
+                        and t2.status = %s
+                    order by score desc;
+                """
+                article_list = await self.pool.async_fetch(
+                    query=query,
+                    params=(
+                        crawl_method,
+                        "weixin",
+                        self.TITLE_NOT_SENSITIVE,
+                        self.INIT_STATUS,
+                        self.READ_TIMES_THRESHOLD,
+                        self.READ_THRESHOLD,
+                        self.INIT_STATUS,
+                    ),
+                )
+                return article_list
+
+            case "strategy_v2":
+                query = """
+                    select 
+                        article_id, title, link, llm_sensitivity, score, category_by_ai
+                    from crawler_meta_article t1 
+                    join crawler_meta_article_accounts_read_avg t2 on t1.out_account_id = t2.gh_id and t1.article_index = t2.position
+                    where category_by_ai = %s 
+                        and platform = %s
+                        and title_sensitivity = %s 
+                        and t1.status = %s
+                        and t1.read_cnt / t2.read_avg >= %s
+                        and t1.read_cnt >= %s
+                        and t2.status = %s
+                    order by score desc;
+                """
+                article_list = await self.pool.async_fetch(
+                    query=query,
+                    params=(
+                        category,
+                        "weixin",
+                        self.TITLE_NOT_SENSITIVE,
+                        self.INIT_STATUS,
+                        self.READ_TIMES_THRESHOLD,
+                        self.READ_THRESHOLD,
+                        self.INIT_STATUS,
+                    ),
+                )
+                return article_list
+
+            case _:
+                raise ValueError("Invalid strategy")
+
+    async def get_toutiao_cold_start_articles(
+        self, crawl_method: str, strategy: str, category: str | None
+    ) -> List[Dict]:
+        match strategy:
+            case "strategy_v1":
+                query = f"""
+                    select article_id, title, link,  llm_sensitivity, score, category_by_ai
+                    from crawler_meta_article
+                    where category = %s 
+                        and platform = %s
+                        and status = %s;
+                """
+                article_list = await self.pool.async_fetch(
+                    query=query, params=(crawl_method, "toutiao", self.INIT_STATUS)
+                )
+                return article_list
+            case _:
+                raise ValueError("Invalid strategy")
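
How the new `category` argument is used: under strategy_v2 the weixin query filters on `category_by_ai` instead of the crawl-method `category`, so the caller passes a category name and no crawl method. A hedged usage sketch, assuming a real async `pool` exposing `async_fetch` and a `log_client` as in the class:

import asyncio

from applications.tasks.cold_start_tasks.article_pool import ArticlePoolColdStartStrategy

async def demo(pool, log_client):
    strategy = ArticlePoolColdStartStrategy(pool, log_client, trace_id="demo-trace")
    # strategy_v2: select by AI category; crawl_method is unused on this path.
    articles = await strategy.get_weixin_cold_start_articles(
        crawl_method=None, strategy="strategy_v2", category="知识科普"
    )
    # each row carries: article_id, title, link, llm_sensitivity, score, category_by_ai
    return articles

# asyncio.run(demo(pool, log_client))  # with real connections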

+ 141 - 0
applications/tasks/cold_start_tasks/article_pool/article_pool_filter_strategy.py

@@ -0,0 +1,141 @@
+from __future__ import annotations
+
+from typing import Optional
+from pandas import DataFrame
+
+from applications.api import feishu_robot
+from applications.tasks.cold_start_tasks.article_pool.article_pool_cold_start_const import (
+    ArticlePoolColdStartConst,
+)
+
+
+class ArticlePoolFilterStrategy(ArticlePoolColdStartConst):
+    def __init__(self):
+        super().__init__()
+        self.cold_start_records = []
+
+    async def filter_weixin_articles(self, strategy, dataframe, crawl_method, category):
+        """微信过滤漏斗"""
+        total_length: int = dataframe.shape[0]
+
+        # 通过标题长度过滤
+        filter_df = dataframe[
+            (dataframe["title"].str.len() <= self.TITLE_LENGTH_MAX)
+            & (dataframe["title"].str.len() >= self.TITLE_LENGTH_LIMIT)
+        ]
+        length_level1 = filter_df.shape[0]
+
+        # 通过敏感词过滤
+        sensitive_keywords = [
+            "农历",
+            "太极",
+            "节",
+            "早上好",
+            "赖清德",
+            "普京",
+            "俄",
+            "南海",
+            "台海",
+            "解放军",
+            "蔡英文",
+            "中国",
+        ]
+        # 构建正则表达式,使用 | 连接表示“或”的关系
+        pattern = "|".join(sensitive_keywords)
+        filter_df = filter_df[~filter_df["title"].str.contains(pattern, na=False)]
+        # 获取过滤后的行数
+        length_level2 = filter_df.shape[0]
+        filter_df = filter_df[~(filter_df["llm_sensitivity"] > 0)]
+        length_level3 = filter_df.shape[0]
+        # 第4层通过相关性分数过滤
+        filter_df = filter_df[filter_df["score"] > self.SIMILARITY_SCORE_THRESHOLD]
+        length_level4 = filter_df.shape[0]
+        match strategy:
+            case "strategy_v1":
+                await feishu_robot.bot(
+                    title="冷启任务发布通知",
+                    detail={
+                        "总文章数量": total_length,
+                        "通过标题长度过滤": "过滤数量: {}    剩余数量: {}".format(
+                            total_length - length_level1, length_level1
+                        ),
+                        "通过敏感词过滤": "过滤数量: {}    剩余数量: {}".format(
+                            length_level1 - length_level2, length_level2
+                        ),
+                        "通过LLM敏感度过滤": "过滤数量: {}    剩余数量: {}".format(
+                            length_level2 - length_level3, length_level3
+                        ),
+                        "通过相关性分数过滤": "过滤数量: {}    剩余数量: {}".format(
+                            length_level3 - length_level4, length_level4
+                        ),
+                        "渠道": crawl_method,
+                        "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
+                        "阅读量阈值": self.READ_THRESHOLD,
+                        "标题长度阈值": self.TITLE_LENGTH_LIMIT,
+                    },
+                    mention=False,
+                )
+                daily_article_num = self.DAILY_ARTICLE_NUM
+            case "strategy_v2":
+                daily_article_num = 120
+                self.cold_start_records.append(
+                    {
+                        "category": category,
+                        "cold_start_num": len(filter_df) if len(filter_df) > daily_article_num else daily_article_num,
+                        "total_length": total_length,
+                        "filter_by_title_length": length_level1,
+                        "filter_by_sensitivity": length_level2,
+                        "filter_by_llm_sensitity": length_level3,
+                        "filter_by_score": length_level4,
+                        "read_avg_threshold": self.READ_TIMES_THRESHOLD,
+                        "read_threshold": self.READ_THRESHOLD,
+                        "title_length_threshold": self.TITLE_LENGTH_LIMIT,
+                    }
+                )
+
+
+            case _:
+                raise ValueError("Invalid strategy")
+
+        return filter_df.head(daily_article_num)
+
+    async def filter_toutiao_articles(
+        self, strategy, dataframe, crawl_method, category
+    ):
+        total_length = dataframe.shape[0]
+        filter_df = dataframe[dataframe["score"] > self.SIMILARITY_SCORE_THRESHOLD]
+
+        await feishu_robot.bot(
+            title="冷启动创建抓取计划",
+            detail={
+                "渠道": crawl_method,
+                "总文章数量": total_length,
+                "相关性分数过滤剩余": filter_df.shape[0],
+            },
+            mention=False,
+        )
+        return filter_df[: self.DAILY_ARTICLE_NUM]
+
+    async def article_pool_filter(
+        self, strategy, platform, dataframe, crawl_method, category
+    ) -> Optional[DataFrame]:
+        """
+        @param: platform 文章平台,来自微信、头条、...
+        @param: dataframe 过滤前的dataframe
+        @param: crawl_method 文章抓取方式
+        @param: category 文章品类(AI生成)
+        """
+        # filter articles
+        match platform:
+            case "weixin":
+                filter_article_df = await self.filter_weixin_articles(
+                    strategy, dataframe, crawl_method, category
+                )
+            case "toutiao":
+                filter_article_df = await self.filter_toutiao_articles(
+                    strategy, dataframe, crawl_method, category
+                )
+            case _:
+                raise ValueError("Invalid platform")
+
+        return filter_article_df
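
A hedged sketch of the weixin funnel on a toy DataFrame. Under strategy_v2 no Feishu message is sent from here; instead the per-category statistics are appended to `cold_start_records`, which the cold-start task later renders as a summary table. Row contents are invented for illustration:

import asyncio
import pandas as pd

from applications.tasks.cold_start_tasks.article_pool import ArticlePoolFilterStrategy

async def demo_filter():
    df = pd.DataFrame([
        # survives: title length within [15, 50], no sensitive keyword, llm_sensitivity 0, score > 0.5
        {"article_id": 1, "title": "一个长度合适且不含敏感词的标题示例啊",
         "link": "https://example.com/1", "llm_sensitivity": 0, "score": 0.9},
        # dropped at the title-length gate
        {"article_id": 2, "title": "短标题",
         "link": "https://example.com/2", "llm_sensitivity": 0, "score": 0.9},
    ])
    f = ArticlePoolFilterStrategy()
    kept = await f.filter_weixin_articles("strategy_v2", df, crawl_method=None, category="知识科普")
    print(kept["article_id"].tolist())                          # [1]
    print(f.cold_start_records[0]["filter_by_title_length"])    # 1

# asyncio.run(demo_filter())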

+ 253 - 273
applications/tasks/cold_start_tasks/article_pool_cold_start.py

@@ -4,174 +4,19 @@ import time
 import datetime
 import traceback
 
-from typing import List, Dict
+from typing import List
 from pandas import DataFrame
+from tqdm.asyncio import tqdm
 
 from applications.api import task_apollo, feishu_robot
 from applications.api import auto_create_crawler_task
 from applications.api import auto_bind_crawler_task_to_generate_task
+from applications.config import category_config, input_source_map
 from applications.utils import get_titles_from_produce_plan
-
-
-class ArticlePoolColdStartConst:
-    # article
-    DAILY_ARTICLE_NUM = 1000
-    SIMILARITY_SCORE_THRESHOLD = 0.5
-
-    TITLE_NOT_SENSITIVE = 0
-    TITLE_SENSITIVE = 1
-
-    PUBLISHED_STATUS = 2
-    INIT_STATUS = 1
-    BAD_STATUS = 0
-    READ_TIMES_THRESHOLD = 1.3
-    # READ_THRESHOLD = 5000
-    READ_THRESHOLD = 1000
-
-    TITLE_LENGTH_LIMIT = 15
-    TITLE_LENGTH_MAX = 50
-
-    DEFAULT_CRAWLER_METHODS = ["1030-手动挑号", "account_association"]
-
-
-class ArticlePoolColdStartStrategy(ArticlePoolColdStartConst):
-    def __init__(self, pool, log_client, trace_id):
-        self.pool = pool
-        self.log_client = log_client
-        self.trace_id = trace_id
-
-    async def get_weixin_cold_start_articles(
-        self, crawl_method: str, strategy: str
-    ) -> List[Dict]:
-        match strategy:
-            case "strategy_v1":
-                query = f"""
-                    select 
-                        article_id, title, link,  llm_sensitivity, score, category_by_ai
-                    from crawler_meta_article t1 
-                    join crawler_meta_article_accounts_read_avg t2 on t1.out_account_id = t2.gh_id and t1.article_index = t2.position
-                    where category = %s 
-                        and platform = %s
-                        and title_sensitivity = %s 
-                        and t1.status = %s
-                        and t1.read_cnt / t2.read_avg >= %s
-                        and t1.read_cnt >= %s
-                        and t2.status = %s
-                    order by score desc;
-                """
-                article_list = await self.pool.async_fetch(
-                    query=query,
-                    params=(
-                        crawl_method,
-                        "weixin",
-                        self.TITLE_NOT_SENSITIVE,
-                        self.INIT_STATUS,
-                        self.READ_TIMES_THRESHOLD,
-                        self.READ_THRESHOLD,
-                        self.INIT_STATUS,
-                    ),
-                )
-                return article_list
-            case _:
-                raise ValueError("Invalid strategy")
-
-    async def get_toutiao_cold_start_articles(
-        self, crawl_method: str, strategy: str
-    ) -> List[Dict]:
-        match strategy:
-            case "strategy_v1":
-                query = f"""
-                    select article_id, title, link,  llm_sensitivity, score, category_by_ai
-                    from crawler_meta_article
-                    where category = %s 
-                        and platform = %s
-                        and status = %s;
-                """
-                article_list = await self.pool.async_fetch(
-                    query=query, params=(crawl_method, "toutiao", self.INIT_STATUS)
-                )
-                return article_list
-            case _:
-                raise ValueError("Invalid strategy")
-
-
-class ArticlePoolFilterStrategy(ArticlePoolColdStartConst):
-    async def filter_weixin_articles(self, dataframe, crawl_method):
-        """微信过滤漏斗"""
-        total_length: int = dataframe.shape[0]
-
-        # 通过标题长度过滤
-        filter_df = dataframe[
-            (dataframe["title"].str.len() <= self.TITLE_LENGTH_MAX)
-            & (dataframe["title"].str.len() >= self.TITLE_LENGTH_LIMIT)
-        ]
-        length_level1 = filter_df.shape[0]
-
-        # 通过敏感词过滤
-        sensitive_keywords = [
-            "农历",
-            "太极",
-            "节",
-            "早上好",
-            "赖清德",
-            "普京",
-            "俄",
-            "南海",
-            "台海",
-            "解放军",
-            "蔡英文",
-            "中国",
-        ]
-        # 构建正则表达式,使用 | 连接表示“或”的关系
-        pattern = "|".join(sensitive_keywords)
-        filter_df = filter_df[~filter_df["title"].str.contains(pattern, na=False)]
-        # 获取过滤后的行数
-        length_level2 = filter_df.shape[0]
-        filter_df = filter_df[~(filter_df["llm_sensitivity"] > 0)]
-        length_level3 = filter_df.shape[0]
-        # 第4层通过相关性分数过滤
-        filter_df = filter_df[filter_df["score"] > self.SIMILARITY_SCORE_THRESHOLD]
-        length_level4 = filter_df.shape[0]
-
-        await feishu_robot.bot(
-            title="冷启任务发布通知",
-            detail={
-                "总文章数量": total_length,
-                "通过标题长度过滤": "过滤数量: {}    剩余数量: {}".format(
-                    total_length - length_level1, length_level1
-                ),
-                "通过敏感词过滤": "过滤数量: {}    剩余数量: {}".format(
-                    length_level1 - length_level2, length_level2
-                ),
-                "通过LLM敏感度过滤": "过滤数量: {}    剩余数量: {}".format(
-                    length_level2 - length_level3, length_level3
-                ),
-                "通过相关性分数过滤": "过滤数量: {}    剩余数量: {}".format(
-                    length_level3 - length_level4, length_level4
-                ),
-                "渠道": crawl_method,
-                "阅读均值倍数阈值": self.READ_TIMES_THRESHOLD,
-                "阅读量阈值": self.READ_THRESHOLD,
-                "标题长度阈值": self.TITLE_LENGTH_LIMIT,
-            },
-            mention=False,
-        )
-        return filter_df[: self.DAILY_ARTICLE_NUM]
-
-    async def filter_toutiao_articles(self, dataframe, crawl_method):
-        total_length = dataframe.shape[0]
-        filter_df = dataframe[dataframe["score"] > self.SIMILARITY_SCORE_THRESHOLD]
-
-        await feishu_robot.bot(
-            title="冷启动创建抓取计划",
-            detail={
-                "渠道": crawl_method,
-                "总文章数量": total_length,
-                "相关性分数过滤剩余": filter_df.shape[0],
-            },
-            mention=False,
-        )
-        return filter_df[: self.DAILY_ARTICLE_NUM]
+from applications.tasks.cold_start_tasks.article_pool import (
+    ArticlePoolColdStartStrategy,
+    ArticlePoolFilterStrategy
+)
 
 
 class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrategy):
@@ -179,7 +24,7 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
         super().__init__(pool, log_client, trace_id)
 
     async def get_article_from_meta_table(
-        self, platform: str, crawl_method: str, strategy: str
+        self, platform: str, crawl_method: str, strategy: str, category: str | None
     ) -> DataFrame:
         """
         @param platform: 文章抓取平台
@@ -189,11 +34,11 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
         match platform:
             case "weixin":
                 article_list = await self.get_weixin_cold_start_articles(
-                    crawl_method, strategy
+                    crawl_method, strategy, category
                 )
             case "toutiao":
                 article_list = await self.get_toutiao_cold_start_articles(
-                    crawl_method, strategy
+                    crawl_method, strategy, category
                 )
             case _:
                 raise ValueError("Invalid platform")
@@ -214,7 +59,7 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
         过滤已添加至aigc中的标题
         """
         published_title_tuple = await get_titles_from_produce_plan(self.pool, plan_id)
-        update_query = f"""
+        update_query = """
             update crawler_meta_article set status = %s where title in %s and status = %s;
         """
         changed_rows = await self.pool.async_save(
@@ -226,7 +71,7 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
     async def insert_crawler_plan_into_database(
         self, crawler_plan_id, crawler_plan_name, create_timestamp
     ):
-        query = f"""
+        query = """
             insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
             values (%s, %s, %s)
         """
@@ -251,7 +96,7 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
         :param: article_id_list: 文章的唯一 id
         :return:
         """
-        query = f"""
+        query = """
             update crawler_meta_article
             set status = %s
             where article_id in %s and status = %s;
@@ -262,11 +107,46 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
         )
         return affect_rows
 
-    async def create_cold_start_plan(
-        self, platform, crawl_method, plan_id, strategy="strategy_v1"
+    async def create_crawler_plan_and_bind_to_produce_plan(
+            self,
+            strategy: str,
+            crawl_method: str,
+            category: str,
+            platform: str,
+            url_list: List[str],
+            plan_id: str,
     ):
-        article_dataframe = await self.get_article_from_meta_table(
-            platform, crawl_method, strategy
+        # create_crawler_plan
+        crawler_plan_response = await auto_create_crawler_task(
+            plan_id=None,
+            plan_name=f"冷启动-{strategy}-{category}-{datetime.date.today().__str__()}-{len(url_list)}",
+            plan_tag="品类冷启动",
+            platform=platform,
+            url_list=url_list,
+        )
+        # save to db
+        create_timestamp = int(time.time()) * 1000
+        crawler_plan_id = crawler_plan_response["data"]["id"]
+        crawler_plan_name = crawler_plan_response["data"]["name"]
+        await self.insert_crawler_plan_into_database(
+            crawler_plan_id, crawler_plan_name, create_timestamp
+        )
+
+        # auto bind to generate plan
+        new_crawler_task_list = [
+            {
+                "contentType": 1,
+                "inputSourceType": 2,
+                "inputSourceSubType": None,
+                "fieldName": None,
+                "inputSourceValue": crawler_plan_id,
+                "inputSourceLabel": crawler_plan_name,
+                "inputSourceModal": 3,
+                "inputSourceChannel": input_source_map[platform],
+            }
+        ]
+        generate_plan_response = await auto_bind_crawler_task_to_generate_task(
+            crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
         )
         await self.log_client.log(
             contents={
@@ -275,129 +155,229 @@ class ArticlePoolColdStart(ArticlePoolColdStartStrategy, ArticlePoolFilterStrate
                 "crawl_method": crawl_method,
                 "status": "success",
                 "trace_id": self.trace_id,
-                "message": "获取文章成功",
-                "data": {"article_length": article_dataframe.shape[0]},
+                "message": "绑定至生成计划成功",
+                "data": generate_plan_response,
             }
         )
 
-        match platform:
-            case "weixin":
-                input_source_channel = 5
-                filter_article_df = await self.filter_weixin_articles(
-                    article_dataframe, crawl_method
-                )
-            case "toutiao":
-                input_source_channel = 6
-                filter_article_df = await self.filter_toutiao_articles(
-                    article_dataframe, crawl_method
-                )
-            case _:
-                raise ValueError("Invalid platform")
-
-        # split article into each category
-        category_list = await task_apollo.get_config_value(key="category_list")
-        for ai_category in category_list:
-            filter_category_df = filter_article_df[
-                filter_article_df["category_by_ai"] == ai_category
-            ]
-            url_list = filter_category_df["link"].values.tolist()
-            if url_list:
-                # create_crawler_plan
-                crawler_plan_response = await auto_create_crawler_task(
-                    plan_id=None,
-                    plan_name="自动绑定strategyV2-{}-{}-{}--{}".format(
-                        crawl_method,
-                        ai_category,
-                        datetime.date.today().__str__(),
-                        len(url_list),
-                    ),
-                    plan_tag="品类冷启动",
-                    platform=platform,
-                    url_list=url_list,
-                )
-                # save to db
-                create_timestamp = int(time.time()) * 1000
-                crawler_plan_id = crawler_plan_response["data"]["id"]
-                crawler_plan_name = crawler_plan_response["data"]["name"]
-                await self.insert_crawler_plan_into_database(
-                    crawler_plan_id, crawler_plan_name, create_timestamp
-                )
-
-                # auto bind to generate plan
-                new_crawler_task_list = [
-                    {
-                        "contentType": 1,
-                        "inputSourceType": 2,
-                        "inputSourceSubType": None,
-                        "fieldName": None,
-                        "inputSourceValue": crawler_plan_id,
-                        "inputSourceLabel": crawler_plan_name,
-                        "inputSourceModal": 3,
-                        "inputSourceChannel": input_source_channel,
-                    }
-                ]
-                generate_plan_response = await auto_bind_crawler_task_to_generate_task(
-                    crawler_task_list=new_crawler_task_list, generate_task_id=plan_id
-                )
-                await self.log_client.log(
-                    contents={
-                        "task": "article_pool_cold_start",
-                        "platform": platform,
-                        "crawl_method": crawl_method,
-                        "status": "success",
-                        "trace_id": self.trace_id,
-                        "message": "绑定至生成计划成功",
-                        "data": generate_plan_response,
-                    }
-                )
-                # change article status
-                article_id_list = filter_category_df["article_id"].values.tolist()
-                await self.change_article_status_while_publishing(
-                    article_id_list=article_id_list
-                )
-
-    async def deal(self, platform: str, crawl_methods: List[str]) -> None:
-        if not crawl_methods:
-            crawl_methods = self.DEFAULT_CRAWLER_METHODS
+    async def create_cold_start_plan(
+        self,
+        platform,
+        plan_id,
+        strategy="strategy_v1",
+        category=None,
+        crawl_method=None,
+    ):
+        # get article data_frame from meta article
+        article_dataframe = await self.get_article_from_meta_table(
+            platform, crawl_method, strategy, category
+        )
 
         await self.log_client.log(
             contents={
                 "task": "article_pool_cold_start",
                 "platform": platform,
-                "crawl_methods": crawl_methods,
+                "crawl_method": crawl_method,
                 "status": "success",
                 "trace_id": self.trace_id,
+                "message": "获取文章成功",
+                "data": {"article_length": article_dataframe.shape[0]},
             }
         )
 
-        crawl_methods_map = await task_apollo.get_config_value(
-            key="category_cold_start_map"
+        filter_article_df = await self.article_pool_filter(
+            strategy, platform, article_dataframe, crawl_method, category
         )
+        match strategy:
+            case "strategy_v1":
+                # split article into each category
+                category_list = await task_apollo.get_config_value(key="category_list")
+                for ai_category in category_list:
+                    filter_category_df = filter_article_df[
+                        filter_article_df["category_by_ai"] == ai_category
+                    ]
+                    url_list = filter_category_df["link"].values.tolist()
+                    if url_list:
+                        await self.create_crawler_plan_and_bind_to_produce_plan(
+                            strategy, crawl_method, ai_category, platform, url_list, plan_id
+                        )
+                        # change article status
+                        article_id_list = filter_category_df["article_id"].values.tolist()
+                        await self.change_article_status_while_publishing(
+                            article_id_list=article_id_list
+                        )
+
+            case "strategy_v2":
+                url_list =  filter_article_df["link"].values.tolist()
+                await self.create_crawler_plan_and_bind_to_produce_plan(
+                    strategy, crawl_method, category, platform, url_list, plan_id
+                )
+                # change article status
+                article_id_list = filter_article_df["article_id"].values.tolist()
+                await self.change_article_status_while_publishing(
+                    article_id_list=article_id_list
+                )
+
+
+    async def deal(
+        self,
+        platform: str,
+        strategy="strategy_v1",
+        crawl_methods=None,
+        category_list=None,
+    ) -> None:
+        """execute cold start task in different strategy"""
+        match strategy:
+            case "strategy_v1":
+                if not crawl_methods:
+                    crawl_methods = self.DEFAULT_CRAWLER_METHODS
 
-        for crawl_method in crawl_methods:
-            try:
-                plan_id = crawl_methods_map[crawl_method]
-                affected_rows = await self.filter_published_titles(plan_id)
                 await self.log_client.log(
                     contents={
                         "task": "article_pool_cold_start",
                         "platform": platform,
-                        "crawl_method": crawl_method,
+                        "crawl_methods": crawl_methods,
                         "status": "success",
                         "trace_id": self.trace_id,
-                        "message": "通过已抓取标题修改文章状态",
-                        "data": {"affected_rows": affected_rows},
                     }
                 )
-                await self.create_cold_start_plan(platform, crawl_method, plan_id)
 
-            except Exception as e:
-                await feishu_robot.bot(
-                    title="文章冷启动异常",
-                    detail={
-                        "crawl_method": crawl_method,
-                        "error": str(e),
-                        "function": "deal",
-                        "traceback": traceback.format_exc(),
-                    },
+                crawl_methods_map = await task_apollo.get_config_value(
+                    key="category_cold_start_map"
                 )
+
+                for crawl_method in crawl_methods:
+                    try:
+                        plan_id = crawl_methods_map[crawl_method]
+                        affected_rows = await self.filter_published_titles(plan_id)
+                        await self.log_client.log(
+                            contents={
+                                "task": "article_pool_cold_start",
+                                "platform": platform,
+                                "crawl_method": crawl_method,
+                                "status": "success",
+                                "trace_id": self.trace_id,
+                                "message": "通过已抓取标题修改文章状态",
+                                "data": {"affected_rows": affected_rows},
+                            }
+                        )
+                        await self.create_cold_start_plan(
+                            platform=platform,
+                            plan_id=plan_id,
+                            crawl_method=crawl_method,
+                        )
+
+                    except Exception as e:
+                        await feishu_robot.bot(
+                            title="文章冷启动异常",
+                            detail={
+                                "crawl_method": crawl_method,
+                                "error": str(e),
+                                "function": "deal",
+                                "traceback": traceback.format_exc(),
+                            },
+                        )
+
+            case "strategy_v2":
+                if not category_list:
+                    category_list = list(category_config.keys())
+
+                for category in tqdm(category_list):
+                    try:
+                        plan_id = category_config[category]
+                        affected_rows = await self.filter_published_titles(plan_id)
+                        await self.log_client.log(
+                            contents={
+                                "task": "article_pool_cold_start",
+                                "platform": platform,
+                                "category": category,
+                                "status": "success",
+                                "trace_id": self.trace_id,
+                                "message": "通过已抓取标题修改文章状态",
+                                "data": {"affected_rows": affected_rows},
+                            }
+                        )
+                        await self.create_cold_start_plan(
+                            platform=platform,
+                            strategy=strategy,
+                            plan_id=plan_id,
+                            category=category,
+                        )
+                        # todo add bot notify
+                    except Exception as e:
+                        await feishu_robot.bot(
+                            title="文章冷启动异常",
+                            detail={
+                                "category": category,
+                                "strategy": strategy,
+                                "error": str(e),
+                                "function": "deal",
+                                "traceback": traceback.format_exc(),
+                            },
+                        )
+
+                if self.cold_start_records:
+                    columns = [
+                        feishu_robot.create_feishu_columns_sheet(
+                            sheet_type="plain_text",
+                            sheet_name="category",
+                            display_name="文章品类",
+                        ),
+                        feishu_robot.create_feishu_columns_sheet(
+                            sheet_type="number",
+                            sheet_name="cold_start_num",
+                            display_name="本次冷启数量",
+                        ),
+                        feishu_robot.create_feishu_columns_sheet(
+                            sheet_type="number",
+                            sheet_name="total_length",
+                            display_name="总文章剩余数量",
+                        ),
+                        feishu_robot.create_feishu_columns_sheet(
+                            sheet_type="number",
+                            sheet_name="filter_by_title_length",
+                            display_name="标题长度过滤",
+                        ),
+                        feishu_robot.create_feishu_columns_sheet(
+                            sheet_type="number",
+                            sheet_name="filter_by_sensitivity",
+                            display_name="敏感词过滤",
+                        ),
+                        feishu_robot.create_feishu_columns_sheet(
+                            sheet_type="number",
+                            sheet_name="filter_by_llm_sensitity",
+                            display_name="经过大模型判断敏感过滤",
+                        ),
+                        feishu_robot.create_feishu_columns_sheet(
+                            sheet_type="number",
+                            sheet_name="filter_by_score",
+                            display_name="经过相关性分过滤",
+                        ),
+                        feishu_robot.create_feishu_columns_sheet(
+                            sheet_type="number",
+                            sheet_name="read_avg_threshold",
+                            display_name="阅读均值倍数阈值",
+                        ),
+                        feishu_robot.create_feishu_columns_sheet(
+                            sheet_type="number",
+                            sheet_name="read_threshold",
+                            display_name="阅读量阈值",
+                        ),
+                        feishu_robot.create_feishu_columns_sheet(
+                            sheet_type="number",
+                            sheet_name="title_length_threshold",
+                            display_name="标题长度阈值",
+                        ),
+                    ]
+                    await feishu_robot.bot(
+                        title="长文文章路冷启动发布",
+                        detail={
+                            "columns": columns,
+                            "rows": self.cold_start_records,
+                        },
+                        table=True,
+                        mention=False,
+                    )
+
+            case _:
+                raise Exception(f"error strategy {strategy}")

+ 1 - 1
applications/tasks/monitor_tasks/gzh_article_monitor.py

@@ -200,7 +200,7 @@ class OutsideGzhArticlesMonitor(OutsideGzhArticlesManager):
         if response_code == self.ARTICLE_ILLEGAL_CODE:
             illegal_reason = article_detail.get("msg")
             # illegal_reason = '测试报警功能'
-            feishu_robot.bot(
+            await feishu_robot.bot(
                 title="文章违规告警",
                 detail={
                     "账号名称": article["account_name"],

+ 3 - 1
applications/tasks/task_handler.py

@@ -106,7 +106,9 @@ class TaskHandler(TaskMapper):
         )
         platform = self.data.get("platform", "weixin")
         crawler_methods = self.data.get("crawler_methods", [])
-        await cold_start.deal(platform=platform, crawl_methods=crawler_methods)
+        category_list = self.data.get("category_list", [])
+        strategy = self.data.get("strategy", "strategy_v1")
+        await cold_start.deal(platform=platform, crawl_methods=crawler_methods, category_list=category_list, strategy=strategy)
         return self.TASK_SUCCESS_STATUS
 
     async def _candidate_account_quality_score_handler(self) -> int:
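
The handler now also reads `category_list` and `strategy` from the task payload, so a strategy_v2 run can be triggered with a payload like the following. Field values are illustrative; only the keys are defined by the handler above, and strategy_v2 is currently implemented for the weixin platform only:

task_data = {
    "platform": "weixin",
    "strategy": "strategy_v2",
    # optional: restrict to specific AI categories; defaults to all keys of category_config when empty
    "category_list": ["知识科普", "历史人物"],
}
# strategy_v1 requests keep the old shape, e.g.
# {"platform": "weixin", "crawler_methods": ["1030-手动挑号"]}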