Kaynağa Gözat

1. 新增growth配置项
2. 新增外部账号监测任务

luojunhui 1 ay önce
ebeveyn
işleme
950ffeb439

+ 2 - 0
applications/config/__init__.py

@@ -3,6 +3,7 @@ from .mysql_config import aigc_db_config
 from .mysql_config import long_video_db_config
 from .mysql_config import long_articles_db_config
 from .mysql_config import piaoquan_crawler_db_config
+from .mysql_config import growth_db_config
 
 # aliyun log sdk config
 from .aliyun_log_config import aliyun_log_config
@@ -28,6 +29,7 @@ __all__ = [
     "long_video_db_config",
     "long_articles_db_config",
     "piaoquan_crawler_db_config",
+    "growth_db_config",
     "aliyun_log_config",
     "deep_seek_official_model",
     "deep_seek_official_api_key",

+ 1 - 0
applications/database/mysql_pools.py

@@ -19,6 +19,7 @@ class DatabaseManager:
             "long_video": long_video_db_config,
             "long_articles": long_articles_db_config,
             "piaoquan_crawler": piaoquan_crawler_db_config,
+            "growth": growth_db_config,
         }
 
         for db_name, config in self.databases.items():

+ 3 - 0
applications/tasks/monitor_tasks/__init__.py

@@ -7,6 +7,8 @@ from .gzh_article_monitor import InnerGzhArticlesMonitor
 from .limited_account_analysis import LimitedAccountAnalysisTask
 from .task_processing_monitor import TaskProcessingMonitor
 from .auto_reply_cards_monitor import AutoReplyCardsMonitor
+from .cooperate_accounts_monitor import CooperateAccountsMonitorTask
+
 
 __all__ = [
     "check_kimi_balance",
@@ -18,4 +20,5 @@ __all__ = [
     "TaskProcessingMonitor",
     "LimitedAccountAnalysisTask",
     "AutoReplyCardsMonitor",
+    "CooperateAccountsMonitorTask",
 ]

+ 299 - 0
applications/tasks/monitor_tasks/cooperate_accounts_monitor.py

@@ -0,0 +1,299 @@
+import json
+
+from tqdm import tqdm
+from datetime import datetime, timedelta
+from urllib.parse import unquote, parse_qs, urlparse
+
+from applications.utils import fetch_from_odps
+from applications.crawler.wechat import get_article_list_from_account
+from applications.crawler.wechat import get_article_detail
+
+
class CooperateAccountsMonitorTaskConst:
    """Shared constants for the cooperate-accounts monitoring task."""

    # generic row-validity flags (used e.g. to filter active accounts)
    INVALID_STATUS = 0
    VALID_STATUS = 1

    # fetch_status lifecycle of an article-detail row
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAIL_STATUS = 99

    # whether an article embeds mini-program cards
    HAS_MINI_PROGRAM = 1
    DONT_HAS_MINI_PROGRAM = 0

    # max number of articles handled per detail-processing batch
    ARTICLE_NUM = 100
+
+
class CooperateAccountsMonitorTaskUtils(CooperateAccountsMonitorTaskConst):
    """Stateless helpers: ODPS account ranking query and URL parsing."""

    @staticmethod
    def get_monitor_account_list():
        """Query ODPS for cooperating accounts ranked by 7-day distinct-user reach.

        Uses yesterday's partition and a rolling 7-day click window.
        """
        dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
        week_ago = (datetime.today() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
        sql = f"""
            SELECT  公众号名, ghid, count(DISTINCT mid) AS uv
            FROM    loghubods.opengid_base_data
            WHERE   dt = {dt}
            AND     hotsencetype = 1058
            AND     usersharedepth = 0
            AND     channel = '公众号合作-即转-稳定'
            AND     点击时间 >= '{week_ago}'
            GROUP BY 公众号名, ghid
            ORDER BY uv DESC
            ;
        """
        return fetch_from_odps(sql)

    @staticmethod
    def extract_page_path(page_path):
        """Extract (video_id, root_source_id) from a mini-program card path.

        The card path carries a URL-encoded `jumpPage` parameter whose own
        query string holds `id` and `rootSourceId`. Returns (None, None)
        when `jumpPage` is absent or empty.
        """
        outer_params = parse_qs(urlparse(page_path).query)
        encoded_jump = outer_params.get("jumpPage", [""])[0]
        if not encoded_jump:
            return None, None

        inner_params = parse_qs(urlparse(unquote(encoded_jump)).query)
        return (
            inner_params.get("id", [None])[0],
            inner_params.get("rootSourceId", [None])[0],
        )

    @staticmethod
    def extract_wx_sn(content_url):
        """Return the `sn` query parameter of an article URL, or None."""
        if not content_url:
            return None
        return parse_qs(urlparse(content_url).query).get("sn", [None])[0]
+
+
+class CooperateAccountsMonitorTask(CooperateAccountsMonitorTaskUtils):
+    def __init__(self, pool, log_client):
+        self.pool = pool
+        self.log_client = log_client
+
+    # 获取 gh_id 的兜底逻辑
+    async def fetch_gh_id(self, account_name):
+        query = """
+         SELECT gh_id FROM content_platform_gzh_account WHERE name = %s AND status = %s;
+        """
+        fetch_response = await self.pool.async_fetch(
+            query=query, db_name="growth", params=(account_name, self.VALID_STATUS)
+        )
+        return fetch_response[0].get("gh_id", None) if fetch_response else None
+
+    # 修改 fetch 状态
+    async def update_fetch_status(self, wx_sn, ori_status, new_status):
+        query = """
+            UPDATE cooperate_accounts_daily_detail SET fetch_status = %s WHERE wx_sn = %s AND fetch_status = %s;
+        """
+        return await self.pool.async_save(
+            query=query, params=(new_status, wx_sn, ori_status)
+        )
+
+    # 更新文章详情
+    async def set_article_detail(self, article):
+        wx_sn = article["wx_sn"]
+        article_link = article["article_link"]
+        # acquire lock
+        acquire_lock = await self.update_fetch_status(
+            wx_sn, self.INIT_STATUS, self.PROCESSING_STATUS
+        )
+        if not acquire_lock:
+            print("锁抢占失败")
+            return acquire_lock
+
+        article_detail = await get_article_detail(
+            article_link, is_count=True, is_cache=False
+        )
+        if not article_detail:
+            return await self.update_fetch_status(
+                wx_sn, self.PROCESSING_STATUS, self.INIT_STATUS
+            )
+
+        # 更新文章信息
+        code = article_detail.get("code", None)
+
+        match code:
+            case 0:
+                try:
+                    article = article_detail.get("data", {}).get("data", {})
+                    body_text = article.get("body_text", None)
+                    images = article.get("image_url_list", [])
+                    mini_program = article.get("mini_program", [])
+                    has_mini_program = (
+                        self.HAS_MINI_PROGRAM
+                        if mini_program
+                        else self.DONT_HAS_MINI_PROGRAM
+                    )
+                    read_cnt = article.get("view_count", None)
+                    like_cnt = article.get("like_count", None)
+                    share_cnt = article.get("share_count", None)
+                    looking_cnt = article.get("looking_count", None)
+                    publish_timestamp = article.get("publish_timestamp", None)
+                    await self.store_mini_program(mini_program, wx_sn)
+
+                    query = """
+                        UPDATE cooperate_accounts_daily_detail SET
+                            article_text = %s,
+                            article_images = %s,
+                            read_cnt = %s,
+                            like_cnt = %s,
+                            share_cnt = %s,
+                            looking_cnt = %s,
+                            publish_timestamp = %s,
+                            fetch_status = %s,
+                            has_mini_program = %s
+                        WHERE wx_sn = %s AND fetch_status = %s;
+                    """
+                    return await self.pool.async_save(
+                        query=query,
+                        params=(
+                            body_text,
+                            json.dumps(images, ensure_ascii=False),
+                            read_cnt,
+                            like_cnt,
+                            share_cnt,
+                            looking_cnt,
+                            int(publish_timestamp / 1000),
+                            self.SUCCESS_STATUS,
+                            has_mini_program,
+                            wx_sn,
+                            self.PROCESSING_STATUS,
+                        ),
+                    )
+                except Exception as e:
+                    print(f"更新文章详情失败-{article_link}-{e}")
+                    return await self.update_fetch_status(
+                        wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
+                    )
+
+            case _:
+                return await self.update_fetch_status(
+                    wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
+                )
+
+    # 存储小程序信息
+    async def store_mini_program(self, mini_program, wx_sn):
+        for card_index, i in enumerate(mini_program, 1):
+            try:
+                video_id, root_source_id = self.extract_page_path(i["path"])
+                card_title = i["title"]
+                card_cover = i["image_url"]
+                mini_name = i["nike_name"]
+
+                query = """
+                    INSERT INTO cooperate_accounts_daily_mini_info
+                        (wx_sn, card_title, card_cover, video_id, root_source_id, mini_program_name, card_index)
+                    VALUES
+                        (%s, %s, %s, %s, %s, %s, %s);
+                """
+
+                await self.pool.async_save(
+                    query=query,
+                    params=(
+                        wx_sn,
+                        card_title,
+                        card_cover,
+                        video_id,
+                        root_source_id,
+                        mini_name,
+                        card_index,
+                    ),
+                )
+
+            except Exception as e:
+                print(e)
+                continue
+
+    # 存储文章
+    async def store_articles(self, gh_id, account_name, article_list):
+        params = []
+        for group_article in article_list:
+            base_info = group_article["AppMsg"]["BaseInfo"]
+            detail_info = group_article["AppMsg"]["DetailInfo"]
+            for single_article in detail_info:
+                single_param = (
+                    gh_id,
+                    account_name,
+                    base_info["AppMsgId"],
+                    base_info["Type"],
+                    single_article["ItemIndex"],
+                    single_article["Title"],
+                    single_article["ContentUrl"],
+                    single_article["CoverImgUrl"],
+                    single_article["Digest"],
+                    single_article["send_time"],
+                    self.extract_wx_sn(single_article["ContentUrl"]),
+                )
+                params.append(single_param)
+
+        query = """
+            INSERT IGNORE INTO cooperate_accounts_daily_detail
+                (gh_id, account_name, app_msg_id, publish_type, position, article_title, article_link, article_cover, article_desc, publish_timestamp, wx_sn)
+            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s);
+        """
+        await self.pool.async_save(query=query, params=params, batch=True)
+
+    # 存储单个账号
+    async def store_single_accounts(self, account):
+        account_name = account.公众号名
+        gh_id = account.ghid
+        if not gh_id:
+            gh_id = await self.fetch_gh_id(account_name)
+
+        if not gh_id:
+            return
+
+        # 只抓最新的文章
+        crawl_response = await get_article_list_from_account(gh_id)
+        if not crawl_response:
+            return
+
+        code = crawl_response.get("code")
+        match code:
+            case 0:
+                article_list = crawl_response.get("data", {} or {}).get("data", [])
+                # 将文章存储到库中
+                await self.store_articles(gh_id, account_name, article_list)
+
+            case _:
+                print(crawl_response["msg"])
+                pass
+
+    # 获取待处理的文章
+    async def get_article_list(self):
+        query = """
+            SELECT wx_sn, article_link FROM cooperate_accounts_daily_detail WHERE fetch_status = %s LIMIT %s;
+        """
+        return await self.pool.async_fetch(
+            query=query, params=(self.INIT_STATUS, self.ARTICLE_NUM)
+        )
+
+    # 入口函数
+    async def deal(self, task_name):
+        match task_name:
+            case "save_articles":
+                account_list = self.get_monitor_account_list()
+                for account in tqdm(account_list):
+                    try:
+                        await self.store_single_accounts(account)
+
+                    except Exception as e:
+                        print(f"获取账号文章失败--{account.公众号名}--{e}")
+
+            case "get_detail":
+                article_list = await self.get_article_list()
+                for article in tqdm(article_list, desc="处理文章详情"):
+                    try:
+                        await self.set_article_detail(article)
+
+                    except Exception as e:
+                        print(f"获取文章详情失败-{article['article_link']}-{e}")

+ 17 - 0
applications/tasks/task_handler.py

@@ -33,6 +33,7 @@ from applications.tasks.monitor_tasks import AutoReplyCardsMonitor
 from applications.tasks.monitor_tasks import check_kimi_balance
 from applications.tasks.monitor_tasks import GetOffVideos
 from applications.tasks.monitor_tasks import CheckVideoAuditStatus
+from applications.tasks.monitor_tasks import CooperateAccountsMonitorTask
 from applications.tasks.monitor_tasks import InnerGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesMonitor
 from applications.tasks.monitor_tasks import OutsideGzhArticlesCollector
@@ -293,5 +294,21 @@ class TaskHandler(TaskMapper):
         await task.deal(task_name="extract_task")
         return self.TASK_SUCCESS_STATUS
 
+    # 定时获取外部文章
+    async def _cooperate_accounts_monitor_handler(self) -> int:
+        task = CooperateAccountsMonitorTask(
+            pool=self.db_client, log_client=self.log_client
+        )
+        await task.deal(task_name="save_articles")
+        return self.TASK_SUCCESS_STATUS
+
+    # 定时更新详情
+    async def _cooperate_accounts_detail_handler(self) -> int:
+        task = CooperateAccountsMonitorTask(
+            pool=self.db_client, log_client=self.log_client
+        )
+        await task.deal(task_name="get_detail")
+        return self.TASK_SUCCESS_STATUS
+
 
 __all__ = ["TaskHandler"]

+ 4 - 0
applications/tasks/task_scheduler.py

@@ -215,6 +215,10 @@ class TaskScheduler(TaskHandler):
             "get_follow_result": self._get_follow_result_handler,
             # 解析自动回复结果
             "extract_reply_result": self._extract_reply_result_handler,
+            # 合作账号文章监测
+            "cooperate_accounts_monitor": self._cooperate_accounts_monitor_handler,
+            # 合作账号文章详情更新
+            "cooperate_accounts_detail": self._cooperate_accounts_detail_handler,
         }
 
         if task_name not in handlers:

+ 3 - 1
ui/src/views/Welcome.vue

@@ -48,8 +48,10 @@ const EXEC_PASSWORD = 'curry'
 
 // ✅ 所有 curl 任务
 const tasks = [
+  { name: '合作方账号 daily发文抓取', desc: '抓取合作方的 daily 发文', url: 'http://192.168.100.31:6060/api/run_task', data: { task_name: 'cooperate_accounts_monitor' } },
+  { name: '合作方发文详情获取', desc: '获取合作方发文的详情', url: 'http://192.168.100.31:6060/api/run_task', data: { task_name: 'cooperate_accounts_detail' } },
   { name: '公众号抓取-手动挑号', desc: '公众文章抓取-手动挑号', url: 'http://192.168.100.31:6060/api/run_task', data: { task_name: 'crawler_gzh_articles', account_method: '1030-手动挑号', crawl_mode: 'account', strategy: 'V1' } },
-  { name: '公众号抓取-合作账号', desc: '公众文章抓取-手动挑号', url: 'http://192.168.100.31:6060/api/run_task', data: { task_name: 'crawler_gzh_articles', account_method: 'cooperate_account', crawl_mode: 'account', strategy: 'V1' } },
+  { name: '公众号抓取-合作账号', desc: '公众文章抓取-合作挑号', url: 'http://192.168.100.31:6060/api/run_task', data: { task_name: 'crawler_gzh_articles', account_method: 'cooperate_account', crawl_mode: 'account', strategy: 'V1' } },
   { name: '头条文章抓取', desc: '头条推荐流文章专区', url: 'http://192.168.100.31:6060/api/run_task', data: { task_name: 'crawler_toutiao' } },
   { name: '公众号文章冷启动(v1)', desc: '文章路冷启动-策略 1', url: 'http://192.168.100.31:6060/api/run_task', data: { task_name: 'article_pool_cold_start', strategy: 'strategy_v1' } },
   { name: '公众号文章冷启动(v3)', desc: '文章路冷启动-策略 3', url: 'http://192.168.100.31:6060/api/run_task', data: { task_name: 'article_pool_cold_start', strategy: 'strategy_v3' } },