Просмотр исходного кода

微信广告合作平台 --- 文章发布

luojunhui 12 часов назад
Родитель
Commit
6bf356cd8a

+ 79 - 0
app/core/config/settings/cold_start.py

@@ -32,6 +32,85 @@ class ColdStartConfig(BaseSettings):
         default_factory=lambda: {"weixin": 5, "toutiao": 6}
     )
 
+    # 冷启动分品类阈值
+    category_threshold_map: Dict[str, Dict[str, float]] = {
+        "知识科普": {
+            "num": 100,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "军事历史": {
+            "num": 50,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "家长里短": {
+            "num": 100,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "社会法治": {
+            "num": 100,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "奇闻趣事": {
+            "num": 150,
+            "read_threshold": 500,
+            "read_times_threshold": 1.4,
+        },
+        "名人八卦": {
+            "num": 200,
+            "read_threshold": 3000,
+            "read_times_threshold": 1.4,
+        },
+        "健康养生": {
+            "num": 100,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "情感故事": {
+            "num": 200,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "国家大事": {
+            "num": 200,
+            "read_threshold": 3000,
+            "read_times_threshold": 1.3,
+        },
+        "现代人物": {
+            "num": 100,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "怀旧时光": {
+            "num": 100,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "政治新闻": {
+            "num": 100,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "历史人物": {
+            "num": 100,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "社会现象": {
+            "num": 200,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+        "财经科技": {
+            "num": 100,
+            "read_threshold": 500,
+            "read_times_threshold": 1.3,
+        },
+    }
+
     model_config = SettingsConfigDict(
         env_prefix="COLD_START_", env_file=".env", case_sensitive=False, extra="ignore"
     )

+ 1 - 0
app/domains/cold_start_tasks/ad_platform_articles/__init__.py

@@ -0,0 +1 @@
+from .entrance import AdPlatformArticleColdStartTask

+ 36 - 0
app/domains/cold_start_tasks/ad_platform_articles/_const.py

@@ -0,0 +1,36 @@
class AdPlatformArticleColdStartConst:
    """Constants for the ad-platform (互选平台) article cold-start task."""

    # Crawler plans handled by the task, in processing order.
    PLAN_LIST = ["互选平台-军事", "互选平台-时事", "互选平台-祝福"]

    # Plan name -> generate-task id on the AIGC platform.
    PLAN_ID_MAP = {
        "互选平台-军事": "20260306142247841252958",
        "互选平台-时事": "20260306142337446308009",
        "互选平台-祝福": "20260309070148099417814",
    }

    # Plan name -> account category used when querying candidate articles.
    CATEGORY_MAP = {
        "互选平台-军事": "军事",
        "互选平台-时事": "时事",
        "互选平台-祝福": "壁纸头像",
    }

    READ_MEDIAN_MULTIPLIER: float = 2.0
    # Maximum number of candidate articles fetched per category per run.
    CANDIDATE_ARTICLE_LIMIT: int = 100

    class PublishType:
        GROUP = "9"
        UNLIMITED = "10002"

    class BaseStatus:
        """Shared status vocabulary for the pipeline stages."""

        INIT = 0
        PROCESSING = 1
        DONE = 2
        FAILED = 99

    # The three stages use identical status codes; aliases keep call
    # sites self-documenting without duplicating the values.
    ColdStartStatus = BaseStatus
    FetchStatus = BaseStatus
    DecodeStatus = BaseStatus

+ 46 - 0
app/domains/cold_start_tasks/ad_platform_articles/_mapper.py

@@ -0,0 +1,46 @@
+from app.core.database import DatabaseManager
+from app.infra.internal import insert_crawler_plan
+
+from ._const import AdPlatformArticleColdStartConst
+
+
class AdPlatformArticleColdStartMapper(AdPlatformArticleColdStartConst):
    """Database access layer for the ad-platform article cold-start task."""

    def __init__(self, pool: DatabaseManager):
        self.pool = pool

    async def fetch_candidate_articles(self, category: str):
        """Return up to CANDIDATE_ARTICLE_LIMIT not-yet-started articles.

        Articles are filtered by account *category* and ordered by
        read_median_multiplier descending (best performers first).
        """
        # cold_start_status is qualified with t1: the UPDATE below targets
        # ad_platform_accounts_daily_detail, so the column belongs to t1;
        # qualifying avoids an ambiguous-column error if the accounts table
        # ever gains a column of the same name.
        query = """
            SELECT t1.id,
                   article_link,
                   read_cnt,
                   read_median_multiplier
            FROM ad_platform_accounts_daily_detail t1
            JOIN ad_platform_accounts t2 ON t1.gh_id = t2.gh_id
            WHERE t1.cold_start_status = %s
            AND t2.category = %s
            ORDER BY read_median_multiplier DESC LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(self.ColdStartStatus.INIT, category, self.CANDIDATE_ARTICLE_LIMIT),
        )

    async def update_cold_start_status(
        self, id_list: list[int], ori_status: int, new_status: int
    ):
        """CAS-style status transition for the given rows.

        Only rows still in *ori_status* are moved to *new_status*; returns
        the number of affected rows.
        """
        # Guard: an empty id_list would render "IN ()", which is a MySQL
        # syntax error — nothing to update, report zero affected rows.
        if not id_list:
            return 0
        query = """
            UPDATE ad_platform_accounts_daily_detail
            SET cold_start_status = %s
            WHERE id IN %s AND cold_start_status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(new_status, tuple(id_list), ori_status),
        )

    async def record_crawler_plan(self, data_tuple):
        """Persist the created crawler plan (id, name, create_timestamp)."""
        return await insert_crawler_plan(pool=self.pool, data_tuple=data_tuple)


__all__ = ["AdPlatformArticleColdStartMapper"]

+ 54 - 0
app/domains/cold_start_tasks/ad_platform_articles/_utils.py

@@ -0,0 +1,54 @@
+import time
+import datetime
+
+from typing import List, Dict
+
+from app.infra.internal import auto_create_crawler_task
+from app.infra.internal import auto_bind_crawler_task_to_generate_task
+
+
+from ._const import AdPlatformArticleColdStartConst
+
+
class AdPlatformArticleColdStartUtils(AdPlatformArticleColdStartConst):
    """Helpers wrapping the AIGC crawler/generate-task APIs."""

    @staticmethod
    async def create_crawler_plan(category: str, article_list: List[Dict]):
        """Create a crawler task for the candidate articles' links.

        The plan name encodes category, date and article count so the run is
        identifiable in the AIGC console.
        """
        article_cnt = len(article_list)
        date_string = str(datetime.date.today())
        plan_name = f"冷启动-{category}-{date_string}-{article_cnt}"
        url_list = [article["article_link"] for article in article_list]

        return await auto_create_crawler_task(
            plan_id=None,
            plan_name=plan_name,
            plan_tag="互选平台优质账号",
            platform="weixin",
            url_list=url_list,
        )

    async def bind_to_generate_task(self, plan: str, plan_response: Dict):
        """Bind the newly created crawler task to the generate task mapped to *plan*."""
        crawler_task_list = [
            {
                "contentType": 1,
                "inputSourceType": 2,
                "inputSourceSubType": None,
                "fieldName": None,
                "inputSourceValue": plan_response["data"]["id"],
                "inputSourceLabel": plan_response["data"]["name"],
                "inputSourceModal": 3,
                "inputSourceChannel": 5,
            }
        ]
        generate_task_id = self.PLAN_ID_MAP[plan]
        return await auto_bind_crawler_task_to_generate_task(
            crawler_task_list=crawler_task_list, generate_task_id=generate_task_id
        )

    @staticmethod
    def process_plan_info(plan_response: Dict) -> tuple:
        """Extract (plan_id, plan_name, create_timestamp_ms) for persistence.

        NOTE: int(time.time()) * 1000 yields whole-second millisecond
        timestamps — sub-second precision is intentionally dropped.
        """
        create_timestamp = int(time.time()) * 1000
        return (
            plan_response["data"]["id"],
            plan_response["data"]["name"],
            create_timestamp,
        )

+ 0 - 70
app/domains/cold_start_tasks/ad_platform_articles/dev.py

@@ -1,70 +0,0 @@
-import time
-import datetime
-from app.core.database import DatabaseManager
-from app.core.config import GlobalConfigSettings
-
-from app.infra.internal import auto_create_crawler_task
-
-import asyncio
-
-
-
# NOTE(review): removed dev/scratch script (dev.py), superseded by the
# AdPlatformArticleColdStartTask domain module; annotated verbatim for the record.
async def main():
    """One-off dev driver: cold-start candidate articles for a hard-coded category."""
    config = GlobalConfigSettings()

    mysql_manager = DatabaseManager(config)
    await mysql_manager.init_pools()

    # Hard-coded single category for the dev run.
    category_list = ['军事']
    for c in category_list:
        # Candidate articles: not yet cold-started (status 0), joined to the
        # account table to filter by category, best read-multiplier first.
        query = """
            select t1.id, article_link, read_cnt, read_median_multiplier
            from ad_platform_accounts_daily_detail t1
            join ad_platform_accounts t2 on t1.gh_id = t2.gh_id
            where cold_start_status = 0
            and t2.category = %s
            order by read_median_multiplier desc limit 100;
        """
        articles = await mysql_manager.async_fetch(query, params=(c,))

        # Redundant with LIMIT 100 in the query; harmless defensive cap.
        articles = articles[:100]
        url_list = [i["article_link"] for i in articles]
        crawler_plan_response = await auto_create_crawler_task(
            plan_id=None,
            plan_name=f"冷启动--{c}-{datetime.date.today().__str__()}-{len(articles)}",
            plan_tag="互选平台优质账号",
            platform="weixin",
            url_list=url_list,
        )
        print(crawler_plan_response)
        # Millisecond timestamp with whole-second precision.
        create_timestamp = int(time.time()) * 1000
        crawler_plan_id = crawler_plan_response["data"]["id"]
        crawler_plan_name = crawler_plan_response["data"]["name"]
        query = """
            insert into article_crawler_plan (crawler_plan_id, name, create_timestamp)
            values (%s, %s, %s)
        """
        await mysql_manager.async_save(
            query=query,
            params=(crawler_plan_id, crawler_plan_name, create_timestamp),
        )

        id_list = [i["id"] for i in articles]
        # Mark the processed rows: status 0 (init) -> 2 (done).
        query = """
            update ad_platform_accounts_daily_detail
            set cold_start_status = %s
            where id in %s and cold_start_status = %s;
        """
        affect_rows = await mysql_manager.async_save(
            query=query,
            params=(2, tuple(id_list), 0),
        )
        print(affect_rows)
        # NOTE(review): returns after the first category, making the
        # close_pools() call below unreachable — a dev shortcut.
        return


    await mysql_manager.close_pools()


if __name__ == "__main__":
    asyncio.run(main())

+ 131 - 0
app/domains/cold_start_tasks/ad_platform_articles/entrance.py

@@ -0,0 +1,131 @@
+import traceback
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+from app.infra.external import feishu_robot
+
+from ._const import AdPlatformArticleColdStartConst
+from ._utils import AdPlatformArticleColdStartUtils
+from ._mapper import AdPlatformArticleColdStartMapper
+
+
class AdPlatformArticleColdStartTask(AdPlatformArticleColdStartConst):
    """Cold-start pipeline for ad-platform (互选平台) articles.

    For each configured plan: fetch candidate articles, create a crawler
    plan, record it, bind it to the generate task, and advance each row's
    cold_start_status (INIT -> PROCESSING -> DONE; FAILED on error).
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.mapper = AdPlatformArticleColdStartMapper(pool)
        self.tool = AdPlatformArticleColdStartUtils()
        self.log_service = log_service

    async def _process_single_plan(self, plan: str):
        """Run the full cold-start flow for one plan; errors are logged and alerted."""
        category = self.CATEGORY_MAP[plan]
        id_list: list = []

        try:
            # Fetch candidate articles for this plan's category.
            candidate_articles = await self.mapper.fetch_candidate_articles(
                category=category
            )
            if not candidate_articles:
                return

            id_list = [article["id"] for article in candidate_articles]

            # Claim the rows first so a concurrent run cannot pick them up.
            await self.mapper.update_cold_start_status(
                id_list=id_list,
                ori_status=self.ColdStartStatus.INIT,
                new_status=self.ColdStartStatus.PROCESSING,
            )

            # Create the crawler plan on the AIGC platform.
            plan_response = await self.tool.create_crawler_plan(
                category=category, article_list=candidate_articles
            )
            # Structured log instead of a debug print() so the raw response
            # lands in the log stream.
            await self.log_service.log(
                contents={
                    "task": "ad_platform_article_cold_start",
                    "status": "info",
                    "message": "爬虫计划创建成功",
                    "data": plan_response,
                }
            )

            # Persist the created plan for traceability.
            plan_info = self.tool.process_plan_info(plan_response)
            await self.mapper.record_crawler_plan(plan_info)

            # Bind the crawler plan to the generate task for this plan.
            response = await self.tool.bind_to_generate_task(plan, plan_response)
            await self.log_service.log(
                contents={
                    "task": "ad_platform_article_cold_start",
                    "status": "success",
                    "message": "绑定至生成计划成功",
                    "data": response,
                }
            )

            # Mark the claimed rows as done.
            await self.mapper.update_cold_start_status(
                id_list=id_list,
                ori_status=self.ColdStartStatus.PROCESSING,
                new_status=self.ColdStartStatus.DONE,
            )
        except Exception as e:
            # Roll back: rows already claimed as PROCESSING become FAILED so
            # they can be retried or investigated.
            if id_list:
                try:
                    await self.mapper.update_cold_start_status(
                        id_list=id_list,
                        ori_status=self.ColdStartStatus.PROCESSING,
                        new_status=self.ColdStartStatus.FAILED,
                    )
                except Exception as rollback_err:
                    await self.log_service.log(
                        contents={
                            "task": "ad_platform_article_cold_start",
                            "status": "error",
                            "message": "回滚冷启状态失败",
                            "rollback_error": str(rollback_err),
                        }
                    )
            await self.log_service.log(
                contents={
                    "task": "ad_platform_article_cold_start",
                    "status": "error",
                    "plan": plan,
                    "category": category,
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                }
            )
            await feishu_robot.bot(
                title="互选平台文章冷启动任务异常",
                detail={
                    "plan": plan,
                    "category": category,
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                },
                mention=False,
            )

    async def deal(self):
        """Task entry point: process every configured plan sequentially."""
        try:
            for plan in self.PLAN_LIST:
                await self._process_single_plan(plan)
        except Exception as e:
            await self.log_service.log(
                contents={
                    "task": "ad_platform_article_cold_start",
                    "status": "error",
                    "message": "deal 入口异常",
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                }
            )
            await feishu_robot.bot(
                title="互选平台文章冷启动任务-入口异常",
                detail={
                    "error": str(e),
                    "traceback": traceback.format_exc(),
                },
                mention=False,
            )
            raise


__all__ = ["AdPlatformArticleColdStartTask"]

+ 2 - 0
app/infra/internal/__init__.py

@@ -14,6 +14,7 @@ from .aigc_system import get_titles_from_produce_plan
 # long_articles
 from .long_articles import get_top_article_title_list
 from .long_articles import get_hot_titles
+from .long_articles import insert_crawler_plan
 
 __all__ = [
     "change_video_audit_status",
@@ -27,4 +28,5 @@ __all__ = [
     "get_top_article_title_list",
     "get_hot_titles",
     "DecodeServer",
+    "insert_crawler_plan",
 ]

+ 12 - 0
app/infra/internal/long_articles.py

@@ -25,3 +25,15 @@ async def get_hot_titles(
         query=query, params=(position, read_times_threshold, date_string)
     )
     return [i["title"] for i in response]
+
+
async def insert_crawler_plan(pool: DatabaseManager, data_tuple: tuple):
    """Persist one crawler-plan record.

    *data_tuple* supplies (crawler_plan_id, name, create_timestamp) in
    positional order matching the INSERT column list.
    """
    query = """
        INSERT INTO article_crawler_plan (crawler_plan_id, name, create_timestamp)
        VALUES (%s, %s, %s);
    """
    return await pool.async_save(query=query, params=data_tuple)
+
+