Просмотр исходного кода

新增功能-图片识别(概念)

luojunhui 1 неделя назад
Родитель
Commit
26e6135242

+ 1 - 0
app/core/config/global_settings.py

@@ -31,6 +31,7 @@ class GlobalConfigSettings(BaseSettings):
     deepseek: DeepSeekConfig = Field(default_factory=DeepSeekConfig)
 
     aliyun_log: AliyunLogConfig = Field(default_factory=AliyunLogConfig)
+    aliyun_oss: AliyunOssConfig = Field(default_factory=AliyunOssConfig)
     elasticsearch: ElasticsearchConfig = Field(default_factory=ElasticsearchConfig)
     apollo: ApolloConfig = Field(default_factory=ApolloConfig)
 

+ 3 - 1
app/core/config/settings/__init__.py

@@ -1,5 +1,6 @@
 from .apollo import ApolloConfig
 from .aliyun import AliyunLogConfig
+from .aliyun import AliyunOssConfig
 from .category import CategoryConfig
 from .cold_start import ColdStartConfig
 from .deepseek import DeepSeekConfig
@@ -16,6 +17,7 @@ from .task_chinese_name import TaskChineseNameConfig
 __ALL__ = [
     "ApolloConfig",
     "AliyunLogConfig",
+    "AliyunOssConfig",
     "CategoryConfig",
     "ColdStartConfig",
     "DeepSeekConfig",
@@ -26,5 +28,5 @@ __ALL__ = [
     "LongVideoDatabaseConfig",
     "PiaoquanCrawlerDatabaseConfig",
     "TaskChineseNameConfig",
-    "ReadRateLimited"
+    "ReadRateLimited",
 ]

+ 20 - 0
app/core/config/settings/aliyun.py

@@ -23,3 +23,23 @@ class AliyunLogConfig(BaseSettings):
             "project": self.project,
             "logstore": self.logstore,
         }
+
+
+class AliyunOssConfig(AliyunLogConfig):
+    """阿里云日志配置"""
+
+    endpoint: str = "oss-cn-hangzhou.aliyuncs.com"
+    bucket_name: str = "art-pubbucket"
+
+    model_config = SettingsConfigDict(
+        env_prefix="ALIYUN_OSS_", env_file=".env", case_sensitive=False, extra="ignore"
+    )
+
+    def to_dict(self) -> dict:
+        """转换为字典格式,用于兼容旧代码"""
+        return {
+            "endpoint": self.endpoint,
+            "access_key_id": self.access_key_id,
+            "access_key_secret": self.access_key_secret,
+            "bucket_name": self.bucket_name,
+        }

+ 1 - 3
app/core/config/settings/read_rate_limited.py

@@ -8,9 +8,7 @@ class ReadRateLimited(BaseSettings):
 
     # 统计周期
     stat_durations: List[int] = Field(
-        default_factory=lambda: [
-           2, 10, 30, 60, 90, 120, 150, 365
-        ]
+        default_factory=lambda: [2, 10, 30, 60, 90, 120, 150, 365]
     )
 
     # 认为满足限流阅读均值倍数阈值

+ 1 - 1
app/domains/analysis_task/__init__.py

@@ -9,5 +9,5 @@ __all__ = [
     "AccountPositionReadRateAvg",
     "AccountPositionReadAvg",
     "AccountPositionOpenRateAvg",
-    "RateLimitedArticleFilter"
+    "RateLimitedArticleFilter",
 ]

+ 8 - 5
app/domains/analysis_task/rate_limited_article_filter.py

@@ -63,7 +63,9 @@ class RateLimitedArticleFilter(RateLimitedArticleMapper):
         super().__init__(pool=pool)
         self.config = config.read_rate_limit
 
-    async def _process_single_article(self, data: Dict, days: int, semaphore: asyncio.Semaphore):
+    async def _process_single_article(
+        self, data: Dict, days: int, semaphore: asyncio.Semaphore
+    ):
         """处理单个文章的异步任务"""
         async with semaphore:
             gh_id = data["gh_id"]
@@ -78,7 +80,9 @@ class RateLimitedArticleFilter(RateLimitedArticleMapper):
                 },
                 ensure_ascii=False,
             )
-            insert_rows = await self.save_record(article_tuple=(title_md5, title, remark))
+            insert_rows = await self.save_record(
+                article_tuple=(title_md5, title, remark)
+            )
             if insert_rows:
                 await delete_illegal_gzh_articles(gh_id=gh_id, title=title)
             else:
@@ -90,7 +94,7 @@ class RateLimitedArticleFilter(RateLimitedArticleMapper):
             days_duration=days,
             read_on_avg_threshold=self.config.read_on_avg_threshold,
             base_discover_time=self.config.base_discover_time,
-            low_read_rate_threshold=self.config.low_read_rate_threshold
+            low_read_rate_threshold=self.config.low_read_rate_threshold,
         )
 
         # 创建信号量限制并发数
@@ -98,8 +102,7 @@ class RateLimitedArticleFilter(RateLimitedArticleMapper):
 
         # 创建所有任务
         tasks = [
-            self._process_single_article(data, days, semaphore)
-            for data in data_list
+            self._process_single_article(data, days, semaphore) for data in data_list
         ]
 
         # 使用 tqdm 显示进度并发执行所有任务

+ 3 - 1
app/domains/crawler_tasks/crawler_gzh_fans.py

@@ -496,7 +496,9 @@ class CrawlerGzhFans(CrawlerGzhFansBase):
                     if i["consist_crawl_status"] == self.AVAILABLE_STATUS
                 ]
                 for account in consist_crawl_accounts:
-                    print(f"处理: {account['account_name']}: gh_id: {account['gh_id']}")
+                    print(
+                        f"处理: {account['account_name']}: gh_id: {account['gh_id']}"
+                    )
                     await self.crawl_new_fans_for_each_account(account)
                 # return await run_tasks_with_asyncio_task_group()
                 return {}

+ 0 - 785
app/domains/monitor_tasks/auto_reply_cards_monitor.py

@@ -1,785 +0,0 @@
-import asyncio
-import os
-import json
-import time
-import traceback
-import uuid
-from typing import List, Dict
-import xml.etree.ElementTree as ET
-
-from tqdm import tqdm
-from datetime import datetime, timedelta
-from urllib.parse import unquote, parse_qs, urlparse
-
-import requests
-from requests.exceptions import RequestException
-
-from app.infra.shared.tools import upload_to_oss
-from app.infra.shared.tools import fetch_from_odps
-from app.infra.shared import AsyncHttpClient
-from app.infra.crawler.wechat import get_article_list_from_account
-from app.infra.crawler.wechat import get_article_detail
-
-
-class AutoReplyCardsMonitorConst:
-    # fetch_status
-    FETCH_INIT_STATUS = 0
-    FETCH_PROCESSING_STATUS = 1
-    FETCH_SUCCESS_STATUS = 2
-    FETCH_FAIL_STATUS = 3
-
-    # task_status
-    INIT_STATUS = 0
-    PROCESSING_STATUS = 1
-    SUCCESS_STATUS = 2
-    FAIL_STATUS = 99
-
-    # account_status
-    VALID_STATUS = 1
-    INVALID_STATUS = 0
-
-
-class AutoReplyCardsMonitorUtils(AutoReplyCardsMonitorConst):
-    @staticmethod
-    def generate_task_id(task_name, gh_id):
-        match task_name:
-            case "follow":
-                return f"{task_name}_{gh_id}"
-            case _:
-                return f"{task_name}_{uuid.uuid4()}"
-
-    @staticmethod
-    def parse_fields(root, fields, default=""):
-        result = {}
-        for key, path in fields.items():
-            elem = root.find(path)
-            result[key] = elem.text if elem is not None and elem.text else default
-        return result
-
-    def extract_reply_cards(self, msg_type: str, root) -> List[Dict]:
-        fields = {
-            "title": ".//title",
-            "page_path": ".//pagepath",
-            "mini_program": ".//sourcedisplayname",
-            "file_id": "appmsg/appattach/cdnthumburl",
-            "file_size": "appmsg/appattach/cdnthumblength",
-            "aes_key": "appmsg/appattach/aeskey",
-        }
-
-        data = self.parse_fields(root, fields)
-        data["msg_type"] = msg_type
-        results = [data]
-        return results
-
-    def extract_reply_articles(self, msg_type, root) -> Dict:
-        fields = {
-            "title": "appmsg/title",
-            "url": "appmsg/url",
-            "cover_url": "appmsg/thumburl",
-            "account_name": "appmsg/sourcedisplayname",
-            "gh_id": "appmsg/sourceusername",
-            "desc": "appmsg/des",
-        }
-        data = self.parse_fields(root, fields)
-        data["msg_type"] = msg_type
-        return data
-
-    @staticmethod
-    def extract_group_reply_articles(msg_type, root) -> List[Dict]:
-        items = []
-        for item in root.findall(".//item"):
-            data = {
-                "title": item.findtext("title"),
-                "url": item.findtext("url"),
-                "cover_url": item.findtext("cover"),
-                "account_name": item.findtext("sources/source/name"),
-                "gh_id": "",
-                "desc": "",
-                "msg_type": msg_type,
-            }
-            items.append(data)
-
-        return items
-
-    # 解析 xml
-    def extract_callback_xml(self, xml_text):
-        try:
-            root = ET.fromstring(xml_text)
-            msg_type = root.find("appmsg/type").text
-            match msg_type:
-                case "5":
-                    # return self.extract_reply_articles(msg_type, root)
-                    return self.extract_group_reply_articles(msg_type, root)
-
-                case "33":
-                    return self.extract_reply_cards(msg_type, root)
-
-                case "36":
-                    return self.extract_reply_cards(msg_type, root)
-
-                case _:
-                    return []
-
-        except Exception as e:
-            print(xml_text)
-            print(e)
-            print(traceback.format_exc())
-            return []
-
-    # 解析 page_path
-    @staticmethod
-    def extract_page_path(page_path):
-        # 解析外层 URL
-        parsed_url = urlparse(page_path)
-        outer_params = parse_qs(parsed_url.query)
-
-        # 取出并解码 jumpPage
-        jump_page = outer_params.get("jumpPage", [""])[0]
-        if not jump_page:
-            return None, None
-
-        decoded_jump_page = unquote(jump_page)
-
-        # 解析 jumpPage 内层参数
-        inner_query = urlparse(decoded_jump_page).query
-        inner_params = parse_qs(inner_query)
-
-        video_id = inner_params.get("id", [None])[0]
-        root_source_id = inner_params.get("rootSourceId", [None])[0]
-
-        return video_id, root_source_id
-
-    @staticmethod
-    async def get_cover_url(aes_key, total_size, file_id):
-        url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
-        data = {
-            "appId": "wx_anFlUnezoUynU3SKcqTWk",
-            "aesKey": aes_key,
-            "totalSize": total_size,
-            "fileId": file_id,
-            "type": "3",
-            "suffix": "jpg",
-        }
-        headers = {
-            "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
-            "Content-Type": "application/json",
-        }
-        async with AsyncHttpClient() as client:
-            response = await client.post(url, headers=headers, data=json.dumps(data))
-
-        return response
-
-    @staticmethod
-    async def get_sample_url(recent_articles):
-        for article in recent_articles:
-            link = article["ContentUrl"]
-            response = await get_article_detail(article_link=link)
-            if not response:
-                continue
-            code = response["code"]
-            if code == 0 or code == 25006:
-                return link
-
-        return None
-
-    # 获取检测的账号 list
-    @staticmethod
-    def get_monitor_account_list():
-        # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
-        week_ago = (datetime.today() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
-        query = f"""
-            SELECT  公众号名, ghid, count(DISTINCT mid) AS uv
-            FROM    loghubods.opengid_base_data
-            WHERE   dt = MAX_PT('loghubods.opengid_base_data')
-            AND     hotsencetype = 1074
-            AND     usersharedepth = 0
-            AND     channel = '公众号合作-即转-稳定'
-            AND     点击时间 >= '{week_ago}'
-            GROUP BY 公众号名, ghid
-            HAVING uv >= 100
-            ORDER BY uv DESC
-            ;
-        """
-        result = fetch_from_odps(query)
-        return result
-
-    @staticmethod
-    def download_and_upload_cover(task_id, index, cover_obj, timeout=10):
-        try:
-            cover_url = cover_obj["data"]["fileUrl"]
-        except (KeyError, TypeError):
-            print(f"[WARN] Invalid cover_obj structure: {cover_obj}")
-            return None
-
-        file_name = f"{task_id}_{index}.jpg"
-        save_dir = os.path.join(os.getcwd(), "static")
-        save_path = os.path.join(save_dir, file_name)
-
-        os.makedirs(save_dir, exist_ok=True)
-
-        try:
-            response = requests.get(cover_url, timeout=timeout)
-            response.raise_for_status()
-        except RequestException as e:
-            print(f"[ERROR] Download failed ({cover_url}): {e}")
-            return None
-
-        try:
-            with open(save_path, "wb") as f:
-                f.write(response.content)
-        except OSError as e:
-            print(f"[ERROR] Write file failed ({save_path}): {e}")
-            return None
-
-        oss_dir = "auto_rely_cards_cover"
-        oss_key = None
-        try:
-            oss_key = upload_to_oss(save_path, f"{oss_dir}/{file_name}")
-        except Exception as e:
-            print(f"[ERROR] Upload to OSS failed: {e}")
-
-        if oss_key:
-            try:
-                os.remove(save_path)
-            except OSError as e:
-                print(f"[WARN] Failed to remove temp file {save_path}: {e}")
-
-        return oss_key
-
-
-class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorUtils):
-    def __init__(self, pool, log_client):
-        self.pool = pool
-        self.log_client = log_client
-
-    # 获取自动回复任务结果
-    async def get_auto_reply_task_result(self, task_id):
-        query = """
-            SELECT task_result, task_status, err_msg, update_timestamp
-            FROM gzh_msg_record
-            WHERE task_id = %s;
-        """
-        return await self.pool.async_fetch(
-            query=query, params=(task_id,), db_name="aigc"
-        )
-
-    # 查询账号
-    async def fetch_account_status(self, account_name):
-        query = """
-            SELECT partner_name, partner_id, gh_id, status, follow_status
-            FROM cooperate_accounts
-            WHERE account_name = %s;
-        """
-        return await self.pool.async_fetch(query=query, params=(account_name,))
-
-    # 更新账号状态为无效
-    async def set_account_as_invalid(self, gh_id):
-        query = """
-            UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s;
-        """
-        await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id))
-
-    # 插入AIGC关注公众号任务
-    async def insert_aigc_follow_account_task(self, task_id, link):
-        timestamp = int(time.time() * 1000)
-        query = """
-            INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s); 
-        """
-        return await self.pool.async_save(
-            query=query,
-            params=(task_id, "follow", link, timestamp, timestamp),
-            db_name="aigc",
-        )
-
-    # 插入AIGC自动回复任务
-    async def insert_aigc_auto_reply_task(self, task_id, account_name):
-        timestamp = int(time.time() * 1000)
-        query = """
-            INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s); 
-        """
-        return await self.pool.async_save(
-            query=query,
-            params=(task_id, account_name, timestamp, timestamp),
-            db_name="aigc",
-        )
-
-    # 为账号设置 sample_url
-    async def set_sample_url(self, gh_id, sample_url):
-        query = """
-            UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s;
-        """
-        return await self.pool.async_save(query=query, params=(sample_url, gh_id))
-
-    # 修改账号的关注状态
-    async def update_follow_status(self, gh_id, ori_status, new_status):
-        query = """
-            UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s;
-        """
-        return await self.pool.async_save(
-            query=query, params=(new_status, gh_id, ori_status)
-        )
-
-    # 从 aigc 获取关注结果
-    async def fetch_follow_account_status(self, gh_id):
-        query = """
-            SELECT task_status, err_msg 
-            FROM gzh_msg_record
-            WHERE task_id = %s;
-        """
-        return await self.pool.async_fetch(
-            query=query, params=(f"follow_{gh_id}",), db_name="aigc"
-        )
-
-    # 创建自动回复任务
-    async def create_auto_reply_task(self, task_id, gh_id):
-        query = """
-            INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s);
-        """
-        return await self.pool.async_save(query=query, params=(task_id, gh_id))
-
-    async def update_auto_reply_task_status(
-        self, task_id, status_type, ori_status, new_status
-    ):
-        task_query = """
-            UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s;
-        """
-        extract_query = """
-            UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s;
-        """
-        match status_type:
-            case "task":
-                return await self.pool.async_save(
-                    query=task_query, params=(new_status, task_id, ori_status)
-                )
-            case "extract":
-                return await self.pool.async_save(
-                    query=extract_query, params=(new_status, task_id, ori_status)
-                )
-            case _:
-                print("status_type_error")
-                return None
-
-    # 获取正在自动回复卡片的任务 id
-    async def fetch_auto_replying_tasks(self):
-        query = """
-            SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s;
-        """
-        return await self.pool.async_fetch(
-            query=query, params=(self.PROCESSING_STATUS,)
-        )
-
-    # 设置自动回复结果
-    async def set_auto_reply_result(self, task_id, finish_timestamp, result):
-        query = """
-            UPDATE cooperate_accounts_task 
-            SET finish_timestamp = %s, result = %s, task_status = %s
-            WHERE task_id = %s and task_status = %s;
-        """
-        return await self.pool.async_save(
-            query=query,
-            params=(
-                finish_timestamp,
-                result,
-                self.SUCCESS_STATUS,
-                task_id,
-                self.PROCESSING_STATUS,
-            ),
-        )
-
-    # 获取带解析的任务
-    async def get_extract_tasks(self):
-        query = """
-            SELECT task_id, result FROM cooperate_accounts_task WHERE extract_status = %s AND task_status = %s;
-        """
-        return await self.pool.async_fetch(
-            query=query, params=(self.INIT_STATUS, self.SUCCESS_STATUS)
-        )
-
-    # 存储解析结果
-    async def store_extract_result(self, query, row_table):
-        return await self.pool.async_save(query=query, params=row_table)
-
-    # 从 growth 数据库获取账号信息,并且存储在 cooperate_accounts 表中
-    async def fetch_cooperate_accounts(self, account_name):
-        fetch_query = """
-            SELECT t2.name AS partner_name, t2.channel AS partner_id,
-                   t1.name AS account_name, t1.gh_id
-            FROM content_platform_gzh_account t1 JOIN content_platform_account t2
-            ON t1.create_account_id = t2.id
-            WHERE t1.name =  %s;
-        """
-        fetch_response = await self.pool.async_fetch(
-            query=fetch_query, db_name="growth", params=(account_name,)
-        )
-        if not fetch_response:
-            return 0
-
-        account_detail = fetch_response[0]
-        save_query = """
-            INSERT INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id)
-            VALUES (%s, %s, %s, %s);
-        """
-        return await self.pool.async_save(
-            query=save_query,
-            params=(
-                account_detail["partner_name"],
-                account_detail["partner_id"],
-                account_detail["account_name"],
-                account_detail["gh_id"],
-            ),
-        )
-
-
-class AutoReplyCardsMonitor(AutoReplyCardsMonitorMapper):
-    def __init__(self, pool, log_client):
-        super().__init__(pool, log_client)
-
-    # 存储卡片信息
-    async def store_card(self, task_id, index, msg_type, xml_obj):
-        video_id, root_source_id = self.extract_page_path(xml_obj["page_path"])
-        cover_obj = await self.get_cover_url(
-            xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"]
-        )
-        cover_oss = self.download_and_upload_cover(task_id, index, cover_obj)
-        query = """
-            INSERT INTO cooperate_auto_reply_detail
-            (
-                task_id, position, msg_type, card_title, card_cover, 
-                video_id, root_source_id, mini_program_name, task_result
-            ) VALUES 
-            (
-                %s, %s, %s, %s, %s, 
-                %s, %s, %s, %s
-            );
-        """
-        insert_row = (
-            task_id,
-            index,
-            msg_type,
-            xml_obj["title"],
-            cover_oss,
-            video_id,
-            root_source_id,
-            xml_obj["mini_program"],
-            json.dumps(xml_obj, ensure_ascii=False),
-        )
-        await self.store_extract_result(query, insert_row)
-
-    # 存储文章信息
-    async def store_article(self, task_id, index, msg_type, xml_obj):
-        article_title = xml_obj.get("title")
-        article_link = xml_obj.get("url")
-        article_cover = xml_obj.get("cover_url")
-        article_desc = xml_obj.get("desc")
-
-        fetch_fail_status = False
-
-        fetch_response = await get_article_detail(
-            article_link=article_link, is_cache=False, is_count=True
-        )
-        if not fetch_response:
-            fetch_fail_status = True
-
-        if not fetch_fail_status:
-            if fetch_response.get("code") != 0:
-                fetch_fail_status = True
-
-        if fetch_fail_status:
-            query = """
-                INSERT INTO cooperate_auto_reply_detail
-                    (task_id, position, msg_type, article_title, article_link, article_cover, article_desc, remark)
-                VALUES
-                (%s, %s, %s, %s, %s, %s, %s, %s);
-            """
-            remark = "获取文章详情失败"
-            insert_row = (
-                task_id,
-                index,
-                msg_type,
-                article_title,
-                article_link,
-                article_cover,
-                article_desc,
-                remark,
-            )
-            await self.store_extract_result(query, insert_row)
-
-        else:
-            article_detail = fetch_response["data"]["data"]
-            article_text = article_detail["body_text"]
-            article_images = article_detail["image_url_list"]
-            read_cnt = article_detail["view_count"]
-            like_cnt = article_detail["like_count"]
-            publish_timestamp = int(article_detail["publish_timestamp"] / 1000)
-            parsed = urlparse(article_detail["content_link"])
-            params = parse_qs(parsed.query)
-            wx_sn = params.get("sn", [None])[0]
-            print(params)
-            print(wx_sn)
-            mini_info = article_detail.get("mini_program")
-            if not mini_info:
-                # video_id, root_source_id = None, None
-                query = """
-                    INSERT INTO cooperate_auto_reply_detail 
-                    (
-                        task_id, position, msg_type, article_title, article_link, 
-                        article_cover, article_text, article_images, article_desc, read_cnt, 
-                        like_cnt, publish_timestamp, task_result, wx_sn
-                    ) VALUES 
-                    (
-                        %s, %s, %s, %s, %s, 
-                        %s, %s, %s, %s, %s, 
-                        %s, %s, %s, %s
-                    );
-                """
-                values = (
-                    task_id,
-                    index,
-                    msg_type,
-                    article_title,
-                    article_link,
-                    article_cover,
-                    article_text,
-                    json.dumps(article_images, ensure_ascii=False),
-                    article_desc,
-                    read_cnt,
-                    like_cnt,
-                    publish_timestamp,
-                    json.dumps(fetch_response, ensure_ascii=False),
-                    wx_sn,
-                )
-                await self.store_extract_result(query, values)
-
-            else:
-                for card_index, i in enumerate(mini_info, 1):
-                    try:
-                        video_id, root_source_id = self.extract_page_path(i["path"])
-                        card_title = i["title"]
-                        card_cover = i["image_url"]
-                        mini_name = i["nike_name"]
-                        query = """
-                            INSERT INTO cooperate_auto_reply_detail 
-                                (
-                                    task_id, position, msg_type, card_title, card_cover, 
-                                    video_id, root_source_id, mini_program_name, article_title, article_link, 
-                                    article_cover, article_text, article_images, article_desc, read_cnt, 
-                                    like_cnt, publish_timestamp, task_result, wx_sn, card_index
-                                ) VALUES
-                                (
-                                    %s, %s, %s, %s, %s, 
-                                    %s, %s, %s, %s, %s, 
-                                    %s, %s, %s, %s, %s,
-                                    %s, %s, %s, %s, %s
-                                );
-                        """
-                        values = (
-                            task_id,
-                            index,
-                            msg_type,
-                            card_title,
-                            card_cover,
-                            video_id,
-                            root_source_id,
-                            mini_name,
-                            article_title,
-                            article_link,
-                            article_cover,
-                            article_text,
-                            json.dumps(article_images, ensure_ascii=False),
-                            article_desc,
-                            read_cnt,
-                            like_cnt,
-                            publish_timestamp,
-                            json.dumps(fetch_response, ensure_ascii=False),
-                            wx_sn,
-                            card_index,
-                        )
-                        await self.store_extract_result(query, values)
-                    except Exception as e:
-                        print(traceback.format_exc())
-                        print(e)
-
-    # 创建单个关注公众号任务
-    async def create_follow_single_account_task(self, gh_id):
-        response = await get_article_list_from_account(account_id=gh_id)
-        code = response.get("code")
-        match code:
-            case 0:
-                recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"]
-                article_url = await self.get_sample_url(recent_articles)
-                print(article_url)
-                if article_url:
-                    await self.set_sample_url(gh_id, article_url)
-
-                    task_id = self.generate_task_id(task_name="follow", gh_id=gh_id)
-                    affected_rows = await self.insert_aigc_follow_account_task(
-                        task_id, article_url
-                    )
-
-                    if affected_rows:
-                        await self.update_follow_status(
-                            gh_id, self.INIT_STATUS, self.PROCESSING_STATUS
-                        )
-
-            case 25013:
-                await self.set_account_as_invalid(gh_id)
-
-            case _:
-                pass
-
-    # 创建单个账号自动回复任务
-    async def create_auto_reply_single_account_task(self, gh_id, account_name):
-        print(account_name)
-        task_id = self.generate_task_id(task_name="auto_reply", gh_id=gh_id)
-        # 先插入 task, 再创建自动回复任务
-        create_row = await self.create_auto_reply_task(task_id, gh_id)
-        if create_row:
-            affected_rows = await self.insert_aigc_auto_reply_task(task_id, gh_id)
-            if not affected_rows:
-                print("发布任务至 AIGC 失败")
-            else:
-                await self.update_auto_reply_task_status(
-                    task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
-                )
-        else:
-            print("创建任务至 DB 失败")
-
-    async def follow_gzh_task(self):
-        account_list = self.get_monitor_account_list()
-        for account in account_list:
-            try:
-                fetch_response = await self.fetch_account_status(account.公众号名)
-                if not fetch_response:
-                    affected_rows = await self.fetch_cooperate_accounts(
-                        account.公众号名
-                    )
-                    if affected_rows:
-                        fetch_response = await self.fetch_account_status(
-                            account.公众号名
-                        )
-
-                    else:
-                        print(f"系统中无账号,跳过: {account.公众号名}")
-                        continue
-
-                account_detail = fetch_response[0]
-                status = account_detail["status"]
-
-                if not status:
-                    print("账号已经迁移或者封禁")
-                    continue
-
-                # 新逻辑,无需考虑账号是否关注
-                await self.create_auto_reply_single_account_task(
-                    account_detail["gh_id"], account.公众号名
-                )
-
-            except Exception as e:
-                print(f"处理账号{account.公众号名}异常", e)
-
-    # 异步获取关注结果
-    async def get_auto_reply_response(self):
-        task_list = await self.fetch_auto_replying_tasks()
-        if not task_list:
-            print("No processing task yet")
-            return
-
-        for task in tqdm(task_list):
-            try:
-                task_id = task["task_id"]
-                response = await self.get_auto_reply_task_result(task_id)
-                if not response:
-                    continue
-
-                task_status = response[0]["task_status"]
-                task_result = response[0]["task_result"]
-                update_timestamp = response[0]["update_timestamp"]
-
-                match task_status:
-                    case self.FETCH_FAIL_STATUS:
-                        await self.update_auto_reply_task_status(
-                            task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS
-                        )
-
-                    case self.FETCH_SUCCESS_STATUS:
-                        await self.set_auto_reply_result(
-                            task_id, update_timestamp, task_result
-                        )
-
-                    case _:
-                        continue
-
-            except Exception as e:
-                print(e)
-
-    # 解析单个xml
-    async def extract_single_xml(self, task):
-        task_id = task["task_id"]
-        result = task["result"]
-
-        # acquire lock
-        acquire_lock = await self.update_auto_reply_task_status(
-            task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS
-        )
-        if not acquire_lock:
-            return
-
-        try:
-            # parse xml
-            xml_list = json.loads(result) if type(result) == str else result
-            index = 0
-            for item in xml_list:
-                xml_obj_list = self.extract_callback_xml(item)
-                if xml_obj_list:
-                    for xml_obj in xml_obj_list:
-                        index += 1
-                        msg_type = xml_obj.get("msg_type", None)
-                        match msg_type:
-                            case "33":
-                                await self.store_card(task_id, index, msg_type, xml_obj)
-
-                            case "5":
-                                await self.store_article(
-                                    task_id, index, msg_type, xml_obj
-                                )
-
-                            case _:
-                                continue
-
-                await asyncio.sleep(5)
-
-            await self.update_auto_reply_task_status(
-                task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS
-            )
-
-        except Exception as e:
-            print(e)
-            print(traceback.format_exc())
-            await self.update_auto_reply_task_status(
-                task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
-            )
-
-    # main function
-    async def deal(self, task_name):
-        match task_name:
-            case "follow_gzh_task":
-                await self.follow_gzh_task()
-
-            case "get_auto_reply_task":
-                await self.get_auto_reply_response()
-
-            case "extract_task":
-                task_list = await self.get_extract_tasks()
-                if not task_list:
-                    print("No tasks to extract now")
-                    return
-
-                for task in tqdm(task_list, desc="解析任务"):
-                    await self.extract_single_xml(task)
-                    await asyncio.sleep(10)
-
-            case "re_extract_task":
-                pass
-
-            case _:
-                print("task_error")

+ 1 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/__init__.py

@@ -0,0 +1 @@
+from .entrance import AutoReplyCardsMonitor

+ 16 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_const.py

@@ -0,0 +1,16 @@
class AutoReplyCardsMonitorConst:
    """Status constants shared by the auto-reply cards monitor task."""

    # fetch_status: state of the AIGC-side gzh_msg_record task row
    FETCH_INIT_STATUS = 0
    FETCH_PROCESSING_STATUS = 1
    FETCH_SUCCESS_STATUS = 2
    FETCH_FAIL_STATUS = 3

    # task_status / extract_status: state of the local cooperate_accounts_task row
    INIT_STATUS = 0
    PROCESSING_STATUS = 1
    SUCCESS_STATUS = 2
    FAIL_STATUS = 99

    # account_status: whether the cooperate account is still usable
    VALID_STATUS = 1
    INVALID_STATUS = 0

+ 189 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_mapper.py

@@ -0,0 +1,189 @@
+import time
+
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._const import AutoReplyCardsMonitorConst
+
+
class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
    """Data-access layer for the auto-reply cards monitor.

    Touches three databases through the shared pool:
      * default   -- cooperate_accounts / cooperate_accounts_task tables
      * "aigc"    -- gzh_msg_record task queue
      * "growth"  -- source of cooperate account metadata
    """

    def __init__(self, pool: DatabaseManager, log_service: LogService):
        self.pool = pool
        self.log_service = log_service

    # fetch the AIGC-side result row of an auto-reply task
    async def get_auto_reply_task_result(self, task_id):
        query = """
            SELECT task_result, task_status, err_msg, update_timestamp
            FROM gzh_msg_record
            WHERE task_id = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(task_id,), db_name="aigc"
        )

    # look up a cooperate account by its display name
    async def fetch_account_status(self, account_name):
        query = """
            SELECT partner_name, partner_id, gh_id, status, follow_status
            FROM cooperate_accounts
            WHERE account_name = %s;
        """
        return await self.pool.async_fetch(query=query, params=(account_name,))

    # mark an account as invalid (migrated or banned)
    async def set_account_as_invalid(self, gh_id):
        query = """
            UPDATE cooperate_accounts SET status = %s WHERE gh_id = %s;
        """
        await self.pool.async_save(query=query, params=(self.INVALID_STATUS, gh_id))

    # queue a "follow this account" task on the AIGC side
    async def insert_aigc_follow_account_task(self, task_id, link):
        timestamp = int(time.time() * 1000)
        query = """
            INSERT INTO gzh_msg_record (task_id, biz_type, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s, %s); 
        """
        return await self.pool.async_save(
            query=query,
            params=(task_id, "follow", link, timestamp, timestamp),
            db_name="aigc",
        )

    # queue an auto-reply task on the AIGC side
    # NOTE(review): callers currently pass a gh_id as *account_name* — confirm
    # which identifier gzh_msg_record.task_params is supposed to hold
    async def insert_aigc_auto_reply_task(self, task_id, account_name):
        timestamp = int(time.time() * 1000)
        query = """
            INSERT INTO gzh_msg_record (task_id, task_params, create_timestamp, update_timestamp) VALUES (%s, %s, %s, %s); 
        """
        return await self.pool.async_save(
            query=query,
            params=(task_id, account_name, timestamp, timestamp),
            db_name="aigc",
        )

    # store the sample article link for an account
    async def set_sample_url(self, gh_id, sample_url):
        query = """
            UPDATE cooperate_accounts SET sample_link = %s WHERE gh_id = %s;
        """
        return await self.pool.async_save(query=query, params=(sample_url, gh_id))

    # conditionally advance an account's follow_status from ori_status to new_status
    async def update_follow_status(self, gh_id, ori_status, new_status):
        query = """
            UPDATE cooperate_accounts SET follow_status = %s WHERE gh_id = %s and follow_status = %s;
        """
        return await self.pool.async_save(
            query=query, params=(new_status, gh_id, ori_status)
        )

    # read the AIGC-side follow result for an account (task id is "follow_<gh_id>")
    async def fetch_follow_account_status(self, gh_id):
        query = """
            SELECT task_status, err_msg 
            FROM gzh_msg_record
            WHERE task_id = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(f"follow_{gh_id}",), db_name="aigc"
        )

    # register a local auto-reply task row
    async def create_auto_reply_task(self, task_id, gh_id):
        query = """
            INSERT INTO cooperate_accounts_task (task_id, gh_id) VALUES (%s, %s);
        """
        return await self.pool.async_save(query=query, params=(task_id, gh_id))

    async def update_auto_reply_task_status(
        self, task_id, status_type, ori_status, new_status
    ):
        """Conditionally advance task_status ("task") or extract_status
        ("extract"); the WHERE-on-old-status makes this usable as a lock."""
        task_query = """
            UPDATE cooperate_accounts_task SET task_status = %s WHERE task_id = %s AND task_status = %s;
        """
        extract_query = """
            UPDATE cooperate_accounts_task SET extract_status = %s WHERE task_id = %s AND extract_status = %s;
        """
        match status_type:
            case "task":
                return await self.pool.async_save(
                    query=task_query, params=(new_status, task_id, ori_status)
                )
            case "extract":
                return await self.pool.async_save(
                    query=extract_query, params=(new_status, task_id, ori_status)
                )
            case _:
                # unknown status_type: report and return None (no row touched)
                print("status_type_error")
                return None

    # ids of tasks whose auto-reply is still in progress
    async def fetch_auto_replying_tasks(self):
        query = """
            SELECT task_id FROM cooperate_accounts_task WHERE task_status = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(self.PROCESSING_STATUS,)
        )

    # record the auto-reply result and mark the task as successful
    async def set_auto_reply_result(self, task_id, finish_timestamp, result):
        query = """
            UPDATE cooperate_accounts_task 
            SET finish_timestamp = %s, result = %s, task_status = %s
            WHERE task_id = %s and task_status = %s;
        """
        return await self.pool.async_save(
            query=query,
            params=(
                finish_timestamp,
                result,
                self.SUCCESS_STATUS,
                task_id,
                self.PROCESSING_STATUS,
            ),
        )

    # tasks that finished successfully but whose result has not been parsed yet
    async def get_extract_tasks(self):
        query = """
            SELECT task_id, result FROM cooperate_accounts_task WHERE extract_status = %s AND task_status = %s;
        """
        return await self.pool.async_fetch(
            query=query, params=(self.INIT_STATUS, self.SUCCESS_STATUS)
        )

    # persist one parsed detail row (the INSERT query is supplied by the caller)
    async def store_extract_result(self, query, row_table):
        return await self.pool.async_save(query=query, params=row_table)

    # import account metadata from the growth db into cooperate_accounts;
    # returns 0 when the account is unknown, else the insert's affected rows
    async def fetch_cooperate_accounts(self, account_name):
        fetch_query = """
            SELECT t2.name AS partner_name, t2.channel AS partner_id,
                   t1.name AS account_name, t1.gh_id
            FROM content_platform_gzh_account t1 JOIN content_platform_account t2
            ON t1.create_account_id = t2.id
            WHERE t1.name =  %s;
        """
        fetch_response = await self.pool.async_fetch(
            query=fetch_query, db_name="growth", params=(account_name,)
        )
        if not fetch_response:
            return 0

        account_detail = fetch_response[0]
        save_query = """
            INSERT INTO cooperate_accounts (partner_name, partner_id, account_name, gh_id)
            VALUES (%s, %s, %s, %s);
        """
        return await self.pool.async_save(
            query=save_query,
            params=(
                account_detail["partner_name"],
                account_detail["partner_id"],
                account_detail["account_name"],
                account_detail["gh_id"],
            ),
        )

+ 231 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_utils.py

@@ -0,0 +1,231 @@
+import os
+import json
+import traceback
+import uuid
+from typing import List, Dict
+import xml.etree.ElementTree as ET
+
+from datetime import datetime, timedelta
+from urllib.parse import unquote, parse_qs, urlparse
+
+import requests
+from requests.exceptions import RequestException
+
+from app.infra.shared.tools import fetch_from_odps
+from app.infra.shared import AsyncHttpClient
+from app.infra.shared import ImageUtils
+from app.infra.shared import OssUtils
+from app.infra.crawler.wechat import get_article_detail
+
+
+class AutoReplyCardsMonitorUtils(OssUtils):
+    def __init__(self, oss_config):
+        super().__init__(oss_config)
+
+    @staticmethod
+    def generate_task_id(task_name, gh_id):
+        match task_name:
+            case "follow":
+                return f"{task_name}_{gh_id}"
+            case _:
+                return f"{task_name}_{uuid.uuid4()}"
+
+    @staticmethod
+    def parse_fields(root, fields, default=""):
+        result = {}
+        for key, path in fields.items():
+            elem = root.find(path)
+            result[key] = elem.text if elem is not None and elem.text else default
+        return result
+
+    def extract_reply_cards(self, msg_type: str, root) -> List[Dict]:
+        fields = {
+            "title": ".//title",
+            "page_path": ".//pagepath",
+            "mini_program": ".//sourcedisplayname",
+            "file_id": "appmsg/appattach/cdnthumburl",
+            "file_size": "appmsg/appattach/cdnthumblength",
+            "aes_key": "appmsg/appattach/aeskey",
+        }
+
+        data = self.parse_fields(root, fields)
+        data["msg_type"] = msg_type
+        results = [data]
+        return results
+
+    def extract_reply_articles(self, msg_type, root) -> Dict:
+        fields = {
+            "title": "appmsg/title",
+            "url": "appmsg/url",
+            "cover_url": "appmsg/thumburl",
+            "account_name": "appmsg/sourcedisplayname",
+            "gh_id": "appmsg/sourceusername",
+            "desc": "appmsg/des",
+        }
+        data = self.parse_fields(root, fields)
+        data["msg_type"] = msg_type
+        return data
+
+    @staticmethod
+    def extract_group_reply_articles(msg_type, root) -> List[Dict]:
+        items = []
+        for item in root.findall(".//item"):
+            data = {
+                "title": item.findtext("title"),
+                "url": item.findtext("url"),
+                "cover_url": item.findtext("cover"),
+                "account_name": item.findtext("sources/source/name"),
+                "gh_id": "",
+                "desc": "",
+                "msg_type": msg_type,
+            }
+            items.append(data)
+
+        return items
+
+    # 解析 xml
+    def extract_callback_xml(self, xml_text):
+        try:
+            root = ET.fromstring(xml_text)
+            msg_type = root.find("appmsg/type").text
+            match msg_type:
+                case "5":
+                    # return self.extract_reply_articles(msg_type, root)
+                    return self.extract_group_reply_articles(msg_type, root)
+
+                case "33":
+                    return self.extract_reply_cards(msg_type, root)
+
+                case "36":
+                    return self.extract_reply_cards(msg_type, root)
+
+                case _:
+                    return []
+
+        except Exception as e:
+            print(xml_text)
+            print(e)
+            print(traceback.format_exc())
+            return []
+
+    # 解析 page_path
+    @staticmethod
+    def extract_page_path(page_path):
+        # 解析外层 URL
+        parsed_url = urlparse(page_path)
+        outer_params = parse_qs(parsed_url.query)
+
+        # 取出并解码 jumpPage
+        jump_page = outer_params.get("jumpPage", [""])[0]
+        if not jump_page:
+            return None, None
+
+        decoded_jump_page = unquote(jump_page)
+
+        # 解析 jumpPage 内层参数
+        inner_query = urlparse(decoded_jump_page).query
+        inner_params = parse_qs(inner_query)
+
+        video_id = inner_params.get("id", [None])[0]
+        root_source_id = inner_params.get("rootSourceId", [None])[0]
+
+        return video_id, root_source_id
+
+    @staticmethod
+    async def get_cover_url(aes_key, total_size, file_id):
+        url = "http://api.geweapi.com/gewe/v2/api/message/downloadCdn"
+        data = {
+            "appId": "wx_anFlUnezoUynU3SKcqTWk",
+            "aesKey": aes_key,
+            "totalSize": total_size,
+            "fileId": file_id,
+            "type": "3",
+            "suffix": "jpg",
+        }
+        headers = {
+            "X-GEWE-TOKEN": "d3fb918f-0f36-4769-b095-410181614231",
+            "Content-Type": "application/json",
+        }
+        async with AsyncHttpClient() as client:
+            response = await client.post(url, headers=headers, data=json.dumps(data))
+
+        return response
+
+    @staticmethod
+    async def get_sample_url(recent_articles):
+        for article in recent_articles:
+            link = article["ContentUrl"]
+            response = await get_article_detail(article_link=link)
+            if not response:
+                continue
+            code = response["code"]
+            if code == 0 or code == 25006:
+                return link
+
+        return None
+
+    # 获取检测的账号 list
+    @staticmethod
+    def get_monitor_account_list():
+        # dt = (datetime.today() - timedelta(days=1)).strftime("%Y%m%d")
+        week_ago = (datetime.today() - timedelta(days=7)).strftime("%Y-%m-%d %H:%M:%S")
+        query = f"""
+            SELECT  公众号名, ghid, count(DISTINCT mid) AS uv
+            FROM    loghubods.opengid_base_data
+            WHERE   dt = MAX_PT('loghubods.opengid_base_data')
+            AND     hotsencetype = 1074
+            AND     usersharedepth = 0
+            AND     channel = '公众号合作-即转-稳定'
+            AND     点击时间 >= '{week_ago}'
+            GROUP BY 公众号名, ghid
+            HAVING uv >= 100
+            ORDER BY uv DESC
+            ;
+        """
+        result = fetch_from_odps(query)
+        return result
+
+    def download_and_upload_cover(self, task_id, index, cover_obj, timeout=10):
+        try:
+            cover_url = cover_obj["data"]["fileUrl"]
+        except (KeyError, TypeError):
+            print(f"[WARN] Invalid cover_obj structure: {cover_obj}")
+            return None
+
+        file_name = f"{task_id}_{index}.jpg"
+        save_dir = os.path.join(os.getcwd(), "static")
+        save_path = os.path.join(save_dir, file_name)
+
+        os.makedirs(save_dir, exist_ok=True)
+
+        try:
+            response = requests.get(cover_url, timeout=timeout)
+            response.raise_for_status()
+        except RequestException as e:
+            print(f"[ERROR] Download failed ({cover_url}): {e}")
+            return None
+
+        try:
+            with open(save_path, "wb") as f:
+                f.write(response.content)
+        except OSError as e:
+            print(f"[ERROR] Write file failed ({save_path}): {e}")
+            return None
+
+        # 需要在这里校验,判断封面是否相同
+
+        oss_dir = "auto_rely_cards_cover"
+        oss_key = None
+        try:
+            oss_key = f"{oss_dir}/{file_name}"
+            self.save_from_file(save_path, f"{oss_dir}/{file_name}")
+        except Exception as e:
+            print(f"[ERROR] Upload to OSS failed: {e}")
+
+        if oss_key:
+            try:
+                os.remove(save_path)
+            except OSError as e:
+                print(f"[WARN] Failed to remove temp file {save_path}: {e}")
+
+        return oss_key

+ 382 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/entrance.py

@@ -0,0 +1,382 @@
+import asyncio
+import json
+import traceback
+
+from tqdm import tqdm
+from urllib.parse import parse_qs, urlparse
+
+from app.infra.crawler.wechat import get_article_list_from_account
+from app.infra.crawler.wechat import get_article_detail
+
+from app.core.config import GlobalConfigSettings
+from app.core.database import DatabaseManager
+from app.core.observability import LogService
+
+from ._const import AutoReplyCardsMonitorConst
+from ._mapper import AutoReplyCardsMonitorMapper
+from ._utils import AutoReplyCardsMonitorUtils
+
+
+class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
+    def __init__(
+        self,
+        pool: DatabaseManager,
+        log_service: LogService,
+        config: GlobalConfigSettings,
+    ):
+        self.mapper: AutoReplyCardsMonitorMapper = AutoReplyCardsMonitorMapper(
+            pool=pool, log_service=log_service
+        )
+        self.tool: AutoReplyCardsMonitorUtils = AutoReplyCardsMonitorUtils(
+            oss_config=config.aliyun_oss
+        )
+
+    # 存储卡片信息
+    async def store_card(self, task_id, index, msg_type, xml_obj):
+        video_id, root_source_id = self.tool.extract_page_path(xml_obj["page_path"])
+        cover_obj = await self.tool.get_cover_url(
+            xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"]
+        )
+        cover_oss = self.tool.download_and_upload_cover(task_id, index, cover_obj)
+        query = """
+            INSERT INTO cooperate_auto_reply_detail
+            (
+                task_id, position, msg_type, card_title, card_cover, 
+                video_id, root_source_id, mini_program_name, task_result
+            ) VALUES 
+            (
+                %s, %s, %s, %s, %s, 
+                %s, %s, %s, %s
+            );
+        """
+        insert_row = (
+            task_id,
+            index,
+            msg_type,
+            xml_obj["title"],
+            cover_oss,
+            video_id,
+            root_source_id,
+            xml_obj["mini_program"],
+            json.dumps(xml_obj, ensure_ascii=False),
+        )
+        await self.mapper.store_extract_result(query, insert_row)
+
+    # 存储文章信息
+    async def store_article(self, task_id, index, msg_type, xml_obj):
+        article_title = xml_obj.get("title")
+        article_link = xml_obj.get("url")
+        article_cover = xml_obj.get("cover_url")
+        article_desc = xml_obj.get("desc")
+
+        fetch_fail_status = False
+
+        fetch_response = await get_article_detail(
+            article_link=article_link, is_cache=False, is_count=True
+        )
+        if not fetch_response:
+            fetch_fail_status = True
+
+        if not fetch_fail_status:
+            if fetch_response.get("code") != 0:
+                fetch_fail_status = True
+
+        if fetch_fail_status:
+            query = """
+                INSERT INTO cooperate_auto_reply_detail
+                    (task_id, position, msg_type, article_title, article_link, article_cover, article_desc, remark)
+                VALUES
+                (%s, %s, %s, %s, %s, %s, %s, %s);
+            """
+            remark = "获取文章详情失败"
+            insert_row = (
+                task_id,
+                index,
+                msg_type,
+                article_title,
+                article_link,
+                article_cover,
+                article_desc,
+                remark,
+            )
+            await self.mapper.store_extract_result(query, insert_row)
+
+        else:
+            article_detail = fetch_response["data"]["data"]
+            article_text = article_detail["body_text"]
+            article_images = article_detail["image_url_list"]
+            read_cnt = article_detail["view_count"]
+            like_cnt = article_detail["like_count"]
+            publish_timestamp = int(article_detail["publish_timestamp"] / 1000)
+            parsed = urlparse(article_detail["content_link"])
+            params = parse_qs(parsed.query)
+            wx_sn = params.get("sn", [None])[0]
+            print(params)
+            print(wx_sn)
+            mini_info = article_detail.get("mini_program")
+            if not mini_info:
+                # video_id, root_source_id = None, None
+                query = """
+                    INSERT INTO cooperate_auto_reply_detail 
+                    (
+                        task_id, position, msg_type, article_title, article_link, 
+                        article_cover, article_text, article_images, article_desc, read_cnt, 
+                        like_cnt, publish_timestamp, task_result, wx_sn
+                    ) VALUES 
+                    (
+                        %s, %s, %s, %s, %s, 
+                        %s, %s, %s, %s, %s, 
+                        %s, %s, %s, %s
+                    );
+                """
+                values = (
+                    task_id,
+                    index,
+                    msg_type,
+                    article_title,
+                    article_link,
+                    article_cover,
+                    article_text,
+                    json.dumps(article_images, ensure_ascii=False),
+                    article_desc,
+                    read_cnt,
+                    like_cnt,
+                    publish_timestamp,
+                    json.dumps(fetch_response, ensure_ascii=False),
+                    wx_sn,
+                )
+                await self.mapper.store_extract_result(query, values)
+
+            else:
+                for card_index, i in enumerate(mini_info, 1):
+                    try:
+                        video_id, root_source_id = self.tool.extract_page_path(i["path"])
+                        card_title = i["title"]
+                        card_cover = i["image_url"]
+                        mini_name = i["nike_name"]
+                        query = """
+                            INSERT INTO cooperate_auto_reply_detail 
+                                (
+                                    task_id, position, msg_type, card_title, card_cover, 
+                                    video_id, root_source_id, mini_program_name, article_title, article_link, 
+                                    article_cover, article_text, article_images, article_desc, read_cnt, 
+                                    like_cnt, publish_timestamp, task_result, wx_sn, card_index
+                                ) VALUES
+                                (
+                                    %s, %s, %s, %s, %s, 
+                                    %s, %s, %s, %s, %s, 
+                                    %s, %s, %s, %s, %s,
+                                    %s, %s, %s, %s, %s
+                                );
+                        """
+                        values = (
+                            task_id,
+                            index,
+                            msg_type,
+                            card_title,
+                            card_cover,
+                            video_id,
+                            root_source_id,
+                            mini_name,
+                            article_title,
+                            article_link,
+                            article_cover,
+                            article_text,
+                            json.dumps(article_images, ensure_ascii=False),
+                            article_desc,
+                            read_cnt,
+                            like_cnt,
+                            publish_timestamp,
+                            json.dumps(fetch_response, ensure_ascii=False),
+                            wx_sn,
+                            card_index,
+                        )
+                        await self.mapper.store_extract_result(query, values)
+                    except Exception as e:
+                        print(traceback.format_exc())
+                        print(e)
+
+    # 创建单个关注公众号任务
+    async def create_follow_single_account_task(self, gh_id):
+        response = await get_article_list_from_account(account_id=gh_id)
+        code = response.get("code")
+        match code:
+            case 0:
+                recent_articles = response["data"]["data"][0]["AppMsg"]["DetailInfo"]
+                article_url = await self.tool.get_sample_url(recent_articles)
+                print(article_url)
+                if article_url:
+                    await self.mapper.set_sample_url(gh_id, article_url)
+
+                    task_id = self.tool.generate_task_id(task_name="follow", gh_id=gh_id)
+                    affected_rows = await self.mapper.insert_aigc_follow_account_task(
+                        task_id, article_url
+                    )
+
+                    if affected_rows:
+                        await self.mapper.update_follow_status(
+                            gh_id, self.INIT_STATUS, self.PROCESSING_STATUS
+                        )
+
+            case 25013:
+                await self.mapper.set_account_as_invalid(gh_id)
+
+            case _:
+                pass
+
+    # 创建单个账号自动回复任务
+    async def create_auto_reply_single_account_task(self, gh_id, account_name):
+        print(account_name)
+        task_id = self.tool.generate_task_id(task_name="auto_reply", gh_id=gh_id)
+        # 先插入 task, 再创建自动回复任务
+        create_row = await self.mapper.create_auto_reply_task(task_id, gh_id)
+        if create_row:
+            affected_rows = await self.mapper.insert_aigc_auto_reply_task(task_id, gh_id)
+            if not affected_rows:
+                print("发布任务至 AIGC 失败")
+            else:
+                await self.mapper.update_auto_reply_task_status(
+                    task_id, "task", self.INIT_STATUS, self.PROCESSING_STATUS
+                )
+        else:
+            print("创建任务至 DB 失败")
+
+    async def follow_gzh_task(self):
+        account_list = self.tool.get_monitor_account_list()
+        for account in account_list:
+            try:
+                fetch_response = await self.mapper.fetch_account_status(account.公众号名)
+                if not fetch_response:
+                    affected_rows = await self.mapper.fetch_cooperate_accounts(
+                        account.公众号名
+                    )
+                    if affected_rows:
+                        fetch_response = await self.mapper.fetch_account_status(
+                            account.公众号名
+                        )
+
+                    else:
+                        print(f"系统中无账号,跳过: {account.公众号名}")
+                        continue
+
+                account_detail = fetch_response[0]
+                status = account_detail["status"]
+
+                if not status:
+                    print("账号已经迁移或者封禁")
+                    continue
+
+                # 新逻辑,无需考虑账号是否关注
+                await self.create_auto_reply_single_account_task(
+                    account_detail["gh_id"], account.公众号名
+                )
+
+            except Exception as e:
+                print(f"处理账号{account.公众号名}异常", e)
+
+    # 异步获取关注结果
+    async def get_auto_reply_response(self):
+        task_list = await self.mapper.fetch_auto_replying_tasks()
+        if not task_list:
+            print("No processing task yet")
+            return
+
+        for task in tqdm(task_list):
+            try:
+                task_id = task["task_id"]
+                response = await self.mapper.get_auto_reply_task_result(task_id)
+                if not response:
+                    continue
+
+                task_status = response[0]["task_status"]
+                task_result = response[0]["task_result"]
+                update_timestamp = response[0]["update_timestamp"]
+
+                match task_status:
+                    case self.FETCH_FAIL_STATUS:
+                        await self.mapper.update_auto_reply_task_status(
+                            task_id, "task", self.PROCESSING_STATUS, self.FAIL_STATUS
+                        )
+
+                    case self.FETCH_SUCCESS_STATUS:
+                        await self.mapper.set_auto_reply_result(
+                            task_id, update_timestamp, task_result
+                        )
+
+                    case _:
+                        continue
+
+            except Exception as e:
+                print(e)
+
+    # 解析单个xml
+    async def extract_single_xml(self, task):
+        task_id = task["task_id"]
+        result = task["result"]
+
+        # acquire lock
+        acquire_lock = await self.mapper.update_auto_reply_task_status(
+            task_id, "extract", self.INIT_STATUS, self.PROCESSING_STATUS
+        )
+        if not acquire_lock:
+            return
+
+        try:
+            # parse xml
+            xml_list = json.loads(result) if type(result) == str else result
+            index = 0
+            for item in xml_list:
+                xml_obj_list = self.tool.extract_callback_xml(item)
+                if xml_obj_list:
+                    for xml_obj in xml_obj_list:
+                        index += 1
+                        msg_type = xml_obj.get("msg_type", None)
+                        match msg_type:
+                            case "33":
+                                await self.store_card(task_id, index, msg_type, xml_obj)
+
+                            case "5":
+                                await self.store_article(
+                                    task_id, index, msg_type, xml_obj
+                                )
+
+                            case _:
+                                continue
+
+                await asyncio.sleep(5)
+
+            await self.mapper.update_auto_reply_task_status(
+                task_id, "extract", self.PROCESSING_STATUS, self.SUCCESS_STATUS
+            )
+
+        except Exception as e:
+            print(e)
+            print(traceback.format_exc())
+            await self.mapper.update_auto_reply_task_status(
+                task_id, "extract", self.PROCESSING_STATUS, self.FAIL_STATUS
+            )
+
+    # main function
+    async def deal(self, task_name):
+        match task_name:
+            case "follow_gzh_task":
+                await self.follow_gzh_task()
+
+            case "get_auto_reply_task":
+                await self.get_auto_reply_response()
+
+            case "extract_task":
+                task_list = await self.mapper.get_extract_tasks()
+                if not task_list:
+                    print("No tasks to extract now")
+                    return
+
+                for task in tqdm(task_list, desc="解析任务"):
+                    await self.extract_single_xml(task)
+                    await asyncio.sleep(10)
+
+            case "re_extract_task":
+                pass
+
+            case _:
+                print("task_error")

+ 2 - 0
app/infra/shared/__init__.py

@@ -1,6 +1,8 @@
 from .async_tasks import run_tasks_with_asyncio_task_group
 from .async_tasks import run_tasks_with_async_worker_group
 from .http_client import AsyncHttpClient
+from .oss import OssUtils
+from .image import ImageUtils
 
 # server response
 from app.infra.shared.response import TaskScheduleResponse

+ 57 - 0
app/infra/shared/image.py

@@ -0,0 +1,57 @@
+import requests
+import imagehash
+from PIL import Image
+from io import BytesIO
+from typing import Optional
+
+from app.core.config import GlobalConfigSettings
+from app.infra.shared.oss import OssUtils
+from app.schemas import ImagePath
+
+
+class ImageUtils(OssUtils):
+    """phash 汉明距离 0~64,越小越相似。低于此阈值视为同一张图。"""
+
+    SAME_IMAGE_PHASH_THRESHOLD = 5
+
+    def __init__(self, config: GlobalConfigSettings):
+        super().__init__(config.aliyun_oss)
+
+    def load_image(self, path: str, path_type: Optional[str] = None):
+        path_type = path_type or "oss_file"
+        match path_type:
+            case "filepath":
+                img = Image.open(path)
+
+            case "url":
+                img = Image.open(BytesIO(requests.get(path, timeout=5).content))
+
+            case "oss_file":
+                img = Image.open(BytesIO(self.bucket.get_object(path).read()))
+
+            case _:
+                return "file_type error"
+
+        return img.convert("RGB")
+
+    @staticmethod
+    def phash_distance(img1: Image.Image, img2: Image.Image):
+        h1 = imagehash.phash(img1)
+        h2 = imagehash.phash(img2)
+        return h1 - h2
+
+    def image_similar(self, file1: ImagePath, file2: ImagePath):
+        img1 = self.load_image(
+            path=file1.path,
+            path_type=file1.path_type,
+        )
+        img2 = self.load_image(
+            path=file2.path,
+            path_type=file2.path_type,
+        )
+
+        return self.phash_distance(img1, img2)
+
+    def is_same_image(self, file1: ImagePath, file2: ImagePath) -> bool:
+        """判断两张图是否视为同一张(phash 距离 <= SAME_IMAGE_PHASH_THRESHOLD)。"""
+        return self.image_similar(file1, file2) <= self.SAME_IMAGE_PHASH_THRESHOLD

+ 18 - 0
app/infra/shared/oss.py

@@ -0,0 +1,18 @@
+import oss2
+
+from app.core.config.settings import AliyunOssConfig
+
+
class OssUtils:
    """Thin wrapper around an aliyun OSS bucket: download and upload objects."""

    def __init__(self, config: AliyunOssConfig):
        # Build the authenticated bucket client once; all operations go through it.
        auth = oss2.Auth(config.access_key_id, config.access_key_secret)
        self.bucket = oss2.Bucket(auth, config.endpoint, config.bucket_name)

    def fetch(self, oss_key):
        """Download the object stored under ``oss_key`` from the bucket."""
        return self.bucket.get_object(oss_key)

    def save_from_file(self, file_path, oss_key):
        """Upload the local file at ``file_path`` to the bucket as ``oss_key``."""
        return self.bucket.put_object_from_file(key=oss_key, filename=file_path)

+ 1 - 3
app/jobs/task_handler.py

@@ -428,9 +428,7 @@ class TaskHandler:
    @register("rate_limited_article_filter")
    async def _rate_limited_article_filter(self) -> int:
        """Delete rate-limited articles by running the RateLimitedArticleFilter task."""
        task = RateLimitedArticleFilter(pool=self.db_client, config=self.config)
        await task.deal()
        return TaskStatus.SUCCESS
 

+ 14 - 9
app/recommend/offline_recommend/core.py

@@ -72,7 +72,9 @@ class BaseOffRecommendUtils:
         return recommend_articles
 
     # 获取一批标题的推荐标题
-    async def get_recommend_articles_for_batch_titles(self, title_list: List[str], strategy: str) -> List[Dict[str, str]]:
+    async def get_recommend_articles_for_batch_titles(
+        self, title_list: List[str], strategy: str
+    ) -> List[Dict[str, str]]:
         match strategy:
             case "v1":
                 query = I2I.strategy_v1(title_list)
@@ -100,7 +102,9 @@ class BaseOfflineRecommend(BaseOffRecommendUtils):
         self.filter_keys: List[str] = []
 
     # 解析策略base 的数据结构
-    def extract_base(self, account_info, recommend_articles, published_titles: Set[str]):
+    def extract_base(
+        self, account_info, recommend_articles, published_titles: Set[str]
+    ):
         account_name = account_info["account_name"]
         gh_id = account_info["gh_id"]
         candidate_articles: List[Dict] = [
@@ -115,8 +119,8 @@ class BaseOfflineRecommend(BaseOffRecommendUtils):
             }
             for item in recommend_articles
             if item.rec_title
-               and item.rec_title not in self.filter_title
-               and item.rec_title not in published_titles
+            and item.rec_title not in self.filter_title
+            and item.rec_title not in published_titles
         ]
         return candidate_articles
 
@@ -135,8 +139,8 @@ class BaseOfflineRecommend(BaseOffRecommendUtils):
             }
             for item in recommend_articles
             if item.recommend_title
-               and item.recommend_title not in self.filter_title
-               and item.recommend_title not in published_titles
+            and item.recommend_title not in self.filter_title
+            and item.recommend_title not in published_titles
         ]
         return candidate_articles
 
@@ -153,9 +157,11 @@ class BaseOfflineRecommend(BaseOffRecommendUtils):
     ):
         gh_id: str = account_info["gh_id"]
         top_articles = await self.get_account_top_articles(gh_id, strategy)
-        top_titles = [i['title'] for i in top_articles]
+        top_titles = [i["title"] for i in top_articles]
 
-        recommend_articles = await self.get_recommend_articles_for_batch_titles(top_titles, strategy)
+        recommend_articles = await self.get_recommend_articles_for_batch_titles(
+            top_titles, strategy
+        )
         match strategy:
             case "v1":
                 return self.extract_v1(
@@ -167,7 +173,6 @@ class BaseOfflineRecommend(BaseOffRecommendUtils):
                     account_info, recommend_articles, published_titles
                 )
 
-
     async def deal(self, account_info: Dict[str, str], strategy: str):
         gh_id: str = account_info["gh_id"]
 

+ 0 - 2
app/recommend/offline_recommend/strategy/i2i.py

@@ -2,7 +2,6 @@ from .base import BaseStrategy
 
 
 class I2I(BaseStrategy):
-
     @staticmethod
     def base(title: str, limit: int = 50) -> str:
         query = f"""
@@ -66,7 +65,6 @@ class I2I(BaseStrategy):
         """
         return query
 
-
     @staticmethod
     def strategy_v1(title_list, limit: int = 500):
         title_tuple = tuple(title_list)

+ 5 - 0
app/schemas/__init__.py

@@ -0,0 +1,5 @@
"""Project-wide shared Pydantic models (DTOs / article annotation types, etc.)."""

from app.schemas.image import ImagePath

__all__ = ["ImagePath"]

+ 15 - 0
app/schemas/image.py

@@ -0,0 +1,15 @@
+"""图片相关共用模型。"""
+
+from typing import Optional
+
+from pydantic import BaseModel, Field
+
+
class ImagePath(BaseModel):
    """Article annotation type: describes where an image lives; consumed by image_similar / load_image."""

    # Non-empty image location; how it is interpreted depends on path_type.
    path: str = Field(..., min_length=1, description="图片路径,不可空")
    # Optional location kind: filepath / url / oss_file; consumers default it when None.
    path_type: Optional[str] = Field(
        default=None,
        description="路径类型,可空;如 filepath / url / oss_file",
    )