Преглед изворни кода

Merge branch 'feature/luojunhui/ad-platform-account-crawler' of Server/LongArticleTaskServer into master

luojunhui пре 1 дан
родитељ
комит
083cdb3366

+ 2 - 0
app/domains/monitor_tasks/__init__.py

@@ -9,6 +9,7 @@ from .task_processing_monitor import TaskProcessingMonitor
 from .auto_reply_cards_monitor import AutoReplyCardsMonitor
 from .cooperate_accounts_monitor import CooperateAccountsMonitorTask
 from .fwh_group_publish_monitor import FwhGroupPublishMonitor
+from .ad_platform_accounts_monitor import AdPlatformAccountsMonitorTask
 
 
 __all__ = [
@@ -23,4 +24,5 @@ __all__ = [
     "AutoReplyCardsMonitor",
     "CooperateAccountsMonitorTask",
     "FwhGroupPublishMonitor",
+    "AdPlatformAccountsMonitorTask"
 ]

+ 4 - 0
app/domains/monitor_tasks/ad_platform_accounts_monitor/__init__.py

@@ -0,0 +1,4 @@
+from .entrance import AdPlatformAccountsMonitorTask
+
+
+__all__ = ["AdPlatformAccountsMonitorTask"]

+ 38 - 0
app/domains/monitor_tasks/ad_platform_accounts_monitor/_const.py

@@ -0,0 +1,38 @@
class AdPlatformAccountsMonitorTaskConst:
    """Shared constants for the ad-platform accounts monitor task family."""

    # ad_platform_accounts.status values
    INVALID_STATUS = 0
    VALID_STATUS = 1

    # fetch_status lifecycle for daily-detail rows
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAIL_STATUS = 99

    # max articles pulled per detail-fetch batch
    ARTICLE_NUM = 100

    # crawl backstop for brand-new accounts: 180 days, in seconds
    ACCOUNT_CRAWL_DURATION = 180 * 24 * 3600

    # crawler response codes
    CRAWL_SUCCESS_STATUS = 0
    ACCOUNT_FORBIDDEN_STATUS = 25013
    CRAWL_CRASH_CODE = 20000

    # article position (1 == head/top article of a publish group)
    HEAD_POSITION = 1

    # named zero values used instead of bare literals
    ZERO_FLOAT = 0.0
    INT_ZERO = 0

    # publish types (stored as strings)
    GROUP_PUBLISH = "9"
    UNLIMITED_PUBLISH = "10002"

    # need_extract flag values on the account row
    NEED_EXTRACT = 1
    DONT_NEED_EXTRACT = 0
    ALREADY_EXTRACTED = 2

    # threshold on read_cnt / read_median for picking head articles
    READ_MEDIAN_MULTIPLIER_THRESHOLD: float = 2.0

    # threshold on read_median / fans_level for flagging extraction
    READ_MEDIAN_RATE_THRESHOLD: float = 0.005

+ 146 - 0
app/domains/monitor_tasks/ad_platform_accounts_monitor/_mapper.py

@@ -0,0 +1,146 @@
+from typing import List, Tuple
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._const import AdPlatformAccountsMonitorTaskConst
+
+
class AdPlatformAccountsMonitorMapper(AdPlatformAccountsMonitorTaskConst):
    """SQL access layer for the ad-platform accounts monitor.

    Touches two tables:
      * ``ad_platform_accounts``              -- the monitored-account registry
      * ``ad_platform_accounts_daily_detail`` -- per-article daily metrics

    Fixes vs. the first draft: ``remark`` parameters that default to ``None``
    are annotated ``str | None``; ``fetch_max_publish_timestamp`` now passes
    ``query=`` by keyword like every other method.
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        # pool: async DB access; log_service: structured task logging
        self.pool = pool
        self.log_service = log_service

    # ============================ account operations ===================================
    async def fetch_monitor_accounts(self):
        """Return every valid account (with a non-NULL gh_id) to monitor."""
        query = """
            SELECT gh_id, account_name, category, fans_level, remark
            FROM ad_platform_accounts
            WHERE status = %s AND gh_id IS NOT NULL;
        """
        return await self.pool.async_fetch(query=query, params=(self.VALID_STATUS,))

    async def update_account_status(
        self, gh_id: str, ori_status: int, new_status: int, remark: str | None
    ):
        """Compare-and-set status transition: only a row still in `ori_status` moves."""
        query = """
            UPDATE ad_platform_accounts SET status = %s, remark = %s WHERE gh_id = %s AND status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, remark, gh_id, ori_status)
        )

    async def get_account_status(self, gh_id: str):
        """Fetch the current status row for one account."""
        query = """
            SELECT status FROM ad_platform_accounts WHERE gh_id = %s;
        """
        return await self.pool.async_fetch(query=query, params=(gh_id,))

    async def update_account_read_median(
        self, gh_id: str, read_median: float, head_article_num: int, remark: str | None = None
    ):
        """Store the account's head-article read median and article count."""
        query = """
            UPDATE ad_platform_accounts SET read_median = %s, head_article_num = %s, remark = %s
            WHERE gh_id = %s;
        """
        return await self.pool.async_save(
            query=query, params=(read_median, head_article_num, remark, gh_id)
        )

    async def update_account_extract_status(self, gh_id: str, target_status: int):
        """Set the account's need_extract flag (see NEED_EXTRACT / ALREADY_EXTRACTED)."""
        query = """
            UPDATE ad_platform_accounts SET need_extract = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(
            query=query, params=(target_status, gh_id)
        )

    # ============================ article operations ===================================
    async def update_fetch_status(self, wx_sn, ori_status: int, new_status: int):
        """CAS transition of an article's fetch_status; doubles as an optimistic lock."""
        query = """
               UPDATE ad_platform_accounts_daily_detail SET fetch_status = %s WHERE wx_sn = %s AND fetch_status = %s;
           """
        return await self.pool.async_save(
            query=query, params=(new_status, wx_sn, ori_status)
        )

    async def save_articles_batch(self, article_tuple_list: List[Tuple]):
        """Bulk upsert article rows; on duplicate key only the counters refresh."""
        query = """
           INSERT INTO ad_platform_accounts_daily_detail
               (gh_id, account_name, app_msg_id, publish_type, position, article_title, article_link, article_cover, article_desc, publish_timestamp, wx_sn, read_cnt, like_cnt)
           VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
           ON DUPLICATE KEY UPDATE
           read_cnt = VALUES(read_cnt), like_cnt = VALUES(like_cnt);
        """
        await self.pool.async_save(query=query, params=article_tuple_list, batch=True)

    async def update_article(self, params: Tuple):
        """Persist fetched detail; `params` order must match the placeholders:
        (article_text, article_images, publish_timestamp, fetch_status, wx_sn, expected fetch_status).
        """
        query = """
            UPDATE ad_platform_accounts_daily_detail SET
                article_text = %s,
                article_images = %s,
                publish_timestamp = %s,
                fetch_status = %s
            WHERE wx_sn = %s AND fetch_status = %s;
        """
        return await self.pool.async_save(query=query, params=params)

    async def get_head_article_list(self):
        """Head articles awaiting detail fetch whose read multiplier clears the
        threshold, limited to accounts flagged need_extract; at most ARTICLE_NUM rows."""
        query = """
            SELECT wx_sn, article_link
            FROM ad_platform_accounts_daily_detail t1
            JOIN ad_platform_accounts t2
            ON t1.gh_id = t2.gh_id
            WHERE t1.fetch_status = %s AND t1.position = %s AND t1.read_median_multiplier >= %s
                  AND t2.need_extract = %s
            ORDER BY position LIMIT %s;
        """
        return await self.pool.async_fetch(
            query=query,
            params=(
                self.INIT_STATUS,
                self.HEAD_POSITION,
                self.READ_MEDIAN_MULTIPLIER_THRESHOLD,
                self.NEED_EXTRACT,
                self.ARTICLE_NUM,
            ),
        )

    async def fetch_max_publish_timestamp(self, gh_id: str):
        """Newest stored publish timestamp for the account (NULL if no rows)."""
        query = """
            select max(publish_timestamp) as publish_timestamp from ad_platform_accounts_daily_detail where gh_id = %s;
        """
        # consistency fix: pass query by keyword like every other method here
        return await self.pool.async_fetch(query=query, params=(gh_id,))

    async def fetch_head_articles(self, gh_id: str):
        """All group-published head articles for the account (for median calc)."""
        query = """
            SELECT gh_id, read_cnt, wx_sn FROM ad_platform_accounts_daily_detail
            WHERE gh_id = %s AND position = %s AND publish_type = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(gh_id, self.HEAD_POSITION, self.GROUP_PUBLISH)
        )

    async def set_read_median_multiplier(
        self, wx_sn: str, multiplier: float, remark: str | None = None
    ):
        """Stamp one article with its read_cnt / account-median ratio."""
        query = """
            UPDATE ad_platform_accounts_daily_detail SET read_median_multiplier = %s, remark = %s WHERE wx_sn = %s;
        """
        return await self.pool.async_save(
            query=query, params=(multiplier, remark, wx_sn)
        )


__all__ = ["AdPlatformAccountsMonitorMapper"]

+ 35 - 0
app/domains/monitor_tasks/ad_platform_accounts_monitor/_utils.py

@@ -0,0 +1,35 @@
+import time, json, random
+import asyncio
+import statistics
+from datetime import datetime
+
+from urllib.parse import parse_qs, urlparse
+
+
+class AdPlatformAccountsMonitorTaskUtils:
+    @staticmethod
+    def extract_wx_sn(content_url):
+        if not content_url:
+            return None
+        query = urlparse(content_url).query
+        return parse_qs(query).get("sn", [None])[0]
+
+    @staticmethod
+    def get_now_dt():
+        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+    @staticmethod
+    def get_now_timestamp():
+        return int(time.time())
+
+    @staticmethod
+    def get_median(values: list) -> float:
+        return statistics.median(values)
+
+    @staticmethod
+    def json_dumps(obj: dict | list) -> str:
+        return json.dumps(obj, ensure_ascii=False)
+
+    @staticmethod
+    async def sleep_between_each_request():
+        await asyncio.sleep(random.randint(1, 3))

+ 259 - 0
app/domains/monitor_tasks/ad_platform_accounts_monitor/entrance.py

@@ -0,0 +1,259 @@
+import traceback
+
+from tqdm import tqdm
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from app.infra.shared.tools import show_desc_to_sta
+from app.infra.crawler.wechat import get_article_list_from_account
+from app.infra.crawler.wechat import get_article_detail
+
+from ._const import AdPlatformAccountsMonitorTaskConst
+from ._mapper import AdPlatformAccountsMonitorMapper
+from ._utils import AdPlatformAccountsMonitorTaskUtils
+
+
class AdPlatformAccountsMonitorTask(AdPlatformAccountsMonitorTaskConst):
    """Monitor task for ad-platform WeChat accounts.

    Three sub-tasks are dispatched through :meth:`deal`:
      * ``"crawl_articles"``  -- crawl each valid account's recent article list
      * ``"get_detail"``      -- fetch body text/images for flagged head articles
      * ``"cal_read_median"`` -- recompute each account's head-article read median
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.log_service = log_service
        self.tool = AdPlatformAccountsMonitorTaskUtils()
        self.mapper = AdPlatformAccountsMonitorMapper(pool, self.log_service)

    # Fetch one article's detail page and persist its text / images / publish time.
    async def set_article_detail(self, article):
        """Process one row from ``get_head_article_list()``.

        Uses the fetch_status column as an optimistic lock
        (init -> processing -> success/fail) so concurrent workers never
        handle the same article twice.
        """
        wx_sn = article["wx_sn"]
        article_link = article["article_link"]
        # acquire lock
        acquire_lock = await self.mapper.update_fetch_status(
            wx_sn, self.INIT_STATUS, self.PROCESSING_STATUS
        )
        if not acquire_lock:
            print("锁抢占失败")
            return acquire_lock

        article_detail = await get_article_detail(article_link)
        if not article_detail:
            # transient crawler failure: roll back to init so the row is retried
            return await self.mapper.update_fetch_status(
                wx_sn, self.PROCESSING_STATUS, self.INIT_STATUS
            )

        # update the article row according to the crawler response code
        code = article_detail.get("code", None)

        match code:
            case self.CRAWL_SUCCESS_STATUS:
                try:
                    # NOTE: rebinds the `article` parameter to the payload dict
                    article = article_detail.get("data", {}).get("data", {})
                    images = article.get("image_url_list", [])
                    params = (
                        article.get("body_text", None),
                        self.tool.json_dumps(images),
                        # article.get("view_count", None),
                        # article.get("like_count", None),
                        # article.get("share_count", None),
                        # article.get("looking_count", None),
                        # payload timestamp is in milliseconds; DB stores seconds
                        int(article.get("publish_timestamp", 0) / 1000),
                        self.SUCCESS_STATUS,
                        wx_sn,
                        self.PROCESSING_STATUS,
                    )
                    return await self.mapper.update_article(params)

                except Exception as e:
                    print(f"更新文章详情失败-{article_link}-{e}")
                    return await self.mapper.update_fetch_status(
                        wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
                    )

            case _:
                return await self.mapper.update_fetch_status(
                    wx_sn, self.PROCESSING_STATUS, self.FAIL_STATUS
                )

    # Flatten one crawl page of grouped articles and batch-upsert them.
    async def store_articles(self, gh_id, account_name, article_list):
        """Build row tuples from AppMsg payloads and upsert them in one batch.

        Rows that fail to parse are skipped individually so one bad article
        does not drop the whole batch.
        """
        params = []
        for group_article in article_list:
            base_info = group_article["AppMsg"]["BaseInfo"]
            detail_info = group_article["AppMsg"]["DetailInfo"]
            for single_article in detail_info:
                try:
                    # show_desc_to_sta presumably decodes "ShowDesc" into the
                    # show_view_count / show_like_count keys read below — TODO confirm
                    show_stat = show_desc_to_sta(single_article.get("ShowDesc", None))
                    single_param = (
                        gh_id,
                        account_name,
                        base_info["AppMsgId"],
                        base_info["Type"],
                        single_article["ItemIndex"],
                        single_article["Title"],
                        single_article["ContentUrl"],
                        single_article["CoverImgUrl"],
                        single_article["Digest"],
                        single_article["send_time"],
                        self.tool.extract_wx_sn(single_article["ContentUrl"]),
                        show_stat.get("show_view_count", 0),
                        show_stat.get("show_like_count", 0),
                    )
                    params.append(single_param)
                except Exception as e:
                    print(f"存储文章失败-{single_article['ContentUrl']}-{e}")

        return await self.mapper.save_articles_batch(article_tuple_list=params)

    # Page through one account's published articles until caught up.
    async def crawl_single_account(self, account):
        """Crawl an account's article list page by page and store each page.

        Stops when a page reaches articles no newer than the latest timestamp
        already stored (with a 180-day backstop for accounts that have no
        stored articles yet), when the crawler reports an error, or when
        pagination runs out.
        """
        account_name = account["account_name"]
        gh_id = account["gh_id"]

        # Crawl horizon: newest stored publish time, falling back to
        # now - ACCOUNT_CRAWL_DURATION for brand-new accounts.
        publish_time_detail = await self.mapper.fetch_max_publish_timestamp(gh_id)
        if publish_time_detail:
            max_publish_timestamp = publish_time_detail[0]["publish_timestamp"]
            if max_publish_timestamp is None:
                max_publish_timestamp = self.tool.get_now_timestamp() - self.ACCOUNT_CRAWL_DURATION
        else:
            max_publish_timestamp = self.tool.get_now_timestamp() - self.ACCOUNT_CRAWL_DURATION

        cursor = None
        while True:
            crawl_response = await get_article_list_from_account(
                account_id=gh_id, index=cursor
            )
            await self.tool.sleep_between_each_request()
            if not crawl_response:
                return

            code = crawl_response.get("code")
            match code:
                case self.CRAWL_SUCCESS_STATUS:
                    article_list = crawl_response.get("data", {}).get("data", [])
                    if not article_list:
                        return

                    # save articles
                    await self.store_articles(gh_id, account_name, article_list)

                    # update cursor; stop once the page is older than the horizon
                    last_article = article_list[-1]
                    last_publish_timestamp = last_article["AppMsg"]["BaseInfo"][
                        "UpdateTime"
                    ]
                    if last_publish_timestamp <= max_publish_timestamp:
                        return

                    cursor = crawl_response["data"].get("next_cursor")
                    if not cursor:
                        return

                case self.ACCOUNT_FORBIDDEN_STATUS:
                    # account forbidden: deactivate it, keeping the crawler's message
                    msg = crawl_response.get("msg")
                    await self.mapper.update_account_status(
                        gh_id, self.VALID_STATUS, self.INVALID_STATUS, msg
                    )
                    return

                case self.CRAWL_CRASH_CODE:
                    # crawler backend crashed: log and give up on this account
                    await self.log_service.log(
                        contents={
                            "task": "ad_platform_accounts_monitor",
                            "data": {"gh_id": gh_id},
                            "message": "爬虫挂掉",
                            "status": "fail",
                        }
                    )
                    return

                case _:
                    print(crawl_response.get("msg", crawl_response))
                    return

    # Compute the account read median and per-article median multipliers.
    async def calculate_read_median(self, account):
        """Update read_median / head_article_num on the account row, flag the
        account for extraction when its median read rate clears
        READ_MEDIAN_RATE_THRESHOLD, and stamp each head article with
        read_cnt / median.
        """
        gh_id = account["gh_id"]
        fans_level = account["fans_level"]

        head_articles = await self.mapper.fetch_head_articles(gh_id)
        head_article_num = len(head_articles)
        execute_time = self.tool.get_now_dt()

        if not head_article_num:
            print(
                f"No article for account to calculate read median--{account['account_name']}"
            )
            remark = f"{execute_time}--计算阅读头条阅读中位数无文章"
            await self.mapper.update_account_read_median(
                gh_id=gh_id,
                read_median=self.ZERO_FLOAT,
                head_article_num=head_article_num,
                remark=remark,
            )

        else:
            # missing read counts are treated as zero
            read_list = [
                (i.get("read_cnt") or self.INT_ZERO) for i in head_articles
            ]
            read_median = self.tool.get_median(read_list)
            remark = f"{execute_time}--计算阅读头条阅读中位数"
            await self.mapper.update_account_read_median(
                gh_id=gh_id,
                read_median=read_median,
                head_article_num=head_article_num,
                remark=remark,
            )

            # flag the account for extraction when the median read rate qualifies
            read_median_rate = read_median / fans_level if fans_level else self.ZERO_FLOAT
            if read_median_rate >= self.READ_MEDIAN_RATE_THRESHOLD:
                await self.mapper.update_account_extract_status(gh_id, self.NEED_EXTRACT)

            # write each head article's read_cnt / median multiplier
            for article in head_articles:
                wx_sn: str = article["wx_sn"]
                read_cnt = article.get("read_cnt") or self.INT_ZERO

                if read_cnt > self.INT_ZERO:
                    multiplier = read_cnt / read_median if read_median else self.ZERO_FLOAT
                else:
                    multiplier = self.ZERO_FLOAT
                article_remark = f"{execute_time}--更新文章阅读中位数倍数"

                await self.mapper.set_read_median_multiplier(
                    wx_sn=wx_sn, multiplier=multiplier, remark=article_remark
                )

    # Entry point: dispatch one of the three sub-tasks by name.
    async def deal(self, task_name):
        """Run the sub-task named by `task_name`; unknown names are a no-op.

        Per-item exceptions are caught and printed so one bad account or
        article does not abort the whole run.
        """
        match task_name:
            case "crawl_articles":
                account_list = await self.mapper.fetch_monitor_accounts()
                for account in tqdm(account_list):
                    print(f"开始处理账号:{account['account_name']}")
                    try:
                        await self.crawl_single_account(account)

                    except Exception as e:
                        print(f"获取账号文章失败--{account['account_name']}--{e}")
                        print(traceback.format_exc())

            case "get_detail":
                article_list = await self.mapper.get_head_article_list()
                for article in tqdm(article_list, desc="处理文章详情"):
                    try:
                        await self.set_article_detail(article)
                        # brief pause between requests
                        await self.tool.sleep_between_each_request()

                    except Exception as e:
                        print(f"获取文章详情失败-{article['article_link']}-{e}")

            case "cal_read_median":
                account_list = await self.mapper.fetch_monitor_accounts()
                for account in tqdm(account_list):
                    print(f"开始处理账号:{account['account_name']}")
                    try:
                        await self.calculate_read_median(account)

                    except Exception as e:
                        print(f"计算阅读中位数失败--{account['account_name']}--{e}")
                        print(traceback.format_exc())

+ 24 - 0
app/jobs/task_handler.py

@@ -44,6 +44,7 @@ from app.domains.monitor_tasks import OutsideGzhArticlesMonitor
 from app.domains.monitor_tasks import OutsideGzhArticlesCollector
 from app.domains.monitor_tasks import TaskProcessingMonitor
 from app.domains.monitor_tasks import LimitedAccountAnalysisTask
+from app.domains.monitor_tasks import AdPlatformAccountsMonitorTask
 
 from app.jobs.task_config import TaskStatus
 from app.jobs.task_utils import TaskValidationError
@@ -161,6 +162,29 @@ class TaskHandler:
         await task.deal(task_name="get_detail")
         return TaskStatus.SUCCESS
 
+    @register("ad_platform_accounts_crawler")
+    async def _ad_platform_accounts_crawler_handler(self) -> int:
+        """广告平台账号监测"""
+        task = AdPlatformAccountsMonitorTask(
+            pool=self.db_client, log_service=self.log_client
+        )
+        # 抓账号内所有的文章
+        await task.deal(task_name="crawl_articles")
+
+        # 抓完之后, 计算账号的阅读量中位数
+        await task.deal(task_name="cal_read_median")
+
+        return TaskStatus.SUCCESS
+
+    @register("ad_platform_article_detail")
+    async def _ad_platform_article_detail_handler(self) -> int:
+        """广告平台文章详情更新"""
+        task = AdPlatformAccountsMonitorTask(
+            pool=self.db_client, log_service=self.log_client
+        )
+        await task.deal(task_name="get_detail")
+        return TaskStatus.SUCCESS
+
     # ==================== 爬虫类任务 ====================
 
     @register("crawler_toutiao")