Bladeren bron

新增相同图片 oss_key合并

luojunhui 1 week geleden
bovenliggende
commit
1e2ff0f2fb

+ 3 - 1
app/domains/monitor_tasks/auto_reply_cards_monitor/__init__.py

@@ -1 +1,3 @@
-from .entrance import AutoReplyCardsMonitor
+from .entrance import AutoReplyCardsMonitor
+
+__all__ = ["AutoReplyCardsMonitor"]

+ 21 - 0
app/domains/monitor_tasks/auto_reply_cards_monitor/_mapper.py

@@ -1,5 +1,7 @@
 import time
 
+from typing import List, Dict
+
 from app.core.database import DatabaseManager
 from app.core.observability import LogService
 
@@ -187,3 +189,22 @@ class AutoReplyCardsMonitorMapper(AutoReplyCardsMonitorConst):
                 account_detail["gh_id"],
             ),
         )
+
+    # Fetch the (cover_id, oss_path) rows already stored in the cover table for the given root_source_id
+    async def fetch_exist_covers(self, root_source_id: str) -> List[Dict]:
+        query = """
+            select cover_id, oss_path from cooperate_auto_reply_card_cover where root_source_id = %s;
+        """
+        return await self.pool.async_fetch(query=query, params=(root_source_id,))
+
+    # Save a cover record (cover_id, root_source_id, oss_path) into the cover table via INSERT IGNORE
+    async def save_cover(self, cover_id, root_source_id, oss_path):
+        query = """
+            insert ignore into cooperate_auto_reply_card_cover
+                (cover_id, root_source_id, oss_path)
+            values
+                (%s, %s, %s);
+        """
+        return await self.pool.async_save(
+            query=query, params=(cover_id, root_source_id, oss_path)
+        )

+ 50 - 16
app/domains/monitor_tasks/auto_reply_cards_monitor/_utils.py

@@ -11,16 +11,21 @@ from urllib.parse import unquote, parse_qs, urlparse
 import requests
 from requests.exceptions import RequestException
 
-from app.infra.shared.tools import fetch_from_odps
+from app.core.config import GlobalConfigSettings
+
 from app.infra.shared import AsyncHttpClient
 from app.infra.shared import ImageUtils
 from app.infra.shared import OssUtils
+from app.infra.shared.tools import fetch_from_odps
 from app.infra.crawler.wechat import get_article_detail
 
+from app.schemas import ImagePath
 
-class AutoReplyCardsMonitorUtils(OssUtils):
-    def __init__(self, oss_config):
-        super().__init__(oss_config)
+
+class AutoReplyCardsMonitorUtils:
+    def __init__(self, config: GlobalConfigSettings):
+        self.image_tool = ImageUtils(config)
+        self.oss_tool = OssUtils(config.aliyun_oss)
 
     @staticmethod
     def generate_task_id(task_name, gh_id):
@@ -185,14 +190,14 @@ class AutoReplyCardsMonitorUtils(OssUtils):
         result = fetch_from_odps(query)
         return result
 
-    def download_and_upload_cover(self, task_id, index, cover_obj, timeout=10):
+    @staticmethod
+    def download_cover(file_name, cover_obj, timeout=10):
         try:
             cover_url = cover_obj["data"]["fileUrl"]
         except (KeyError, TypeError):
             print(f"[WARN] Invalid cover_obj structure: {cover_obj}")
             return None
 
-        file_name = f"{task_id}_{index}.jpg"
         save_dir = os.path.join(os.getcwd(), "static")
         save_path = os.path.join(save_dir, file_name)
 
@@ -212,20 +217,49 @@ class AutoReplyCardsMonitorUtils(OssUtils):
             print(f"[ERROR] Write file failed ({save_path}): {e}")
             return None
 
-        # 需要在这里校验,判断封面是否相同
+        return save_path
 
-        oss_dir = "auto_rely_cards_cover"
+    # Upload the local cover to OSS and return (oss_key, image_md5); oss_key is None when the upload fails
+    def upload_cover(self, file_name, save_path):
+        if save_path is None:
+            return None, None
+        oss_dir = "auto_reply_cards_cover"
+        image_file = ImagePath(path=save_path, path_type="filepath")
+        image_md5 = self.image_tool.get_image_md5(image_file)
         oss_key = None
         try:
-            oss_key = f"{oss_dir}/{file_name}"
-            self.save_from_file(save_path, f"{oss_dir}/{file_name}")
+            full_key = f"{oss_dir}/{file_name}"
+            self.oss_tool.save_from_file(save_path, full_key)
+            oss_key = full_key
         except Exception as e:
             print(f"[ERROR] Upload to OSS failed: {e}")
+            return None, image_md5
 
-        if oss_key:
-            try:
-                os.remove(save_path)
-            except OSError as e:
-                print(f"[WARN] Failed to remove temp file {save_path}: {e}")
+        try:
+            os.remove(save_path)
+        except OSError as e:
+            print(f"[WARN] Failed to remove temp file {save_path}: {e}")
+
+        return oss_key, image_md5
 
-        return oss_key
+    @staticmethod
+    def remove_local_cover(save_path):
+        """Delete the local temporary cover file so it is not left behind; does nothing when save_path is empty."""
+        if not save_path:
+            return
+        try:
+            os.remove(save_path)
+        except OSError as e:
+            print(f"[WARN] Failed to remove temp file {save_path}: {e}")
+
+    # Compare the local cover with each existing cover; return the matching cover_id, or False when none match
+    def check_cover(self, save_path, exist_covers):
+        img1 = ImagePath(path=save_path, path_type="filepath")
+        for exist_cover in exist_covers:
+            exist_oss = exist_cover["oss_path"]
+            img2 = ImagePath(path=exist_oss)
+            cover_id = exist_cover["cover_id"]
+            if self.image_tool.is_same_image(img1, img2):
+                return cover_id
+
+        return False

+ 56 - 17
app/domains/monitor_tasks/auto_reply_cards_monitor/entrance.py

@@ -5,8 +5,8 @@ import traceback
 from tqdm import tqdm
 from urllib.parse import parse_qs, urlparse
 
-from app.infra.crawler.wechat import get_article_list_from_account
 from app.infra.crawler.wechat import get_article_detail
+from app.infra.crawler.wechat import get_article_list_from_account
 
 from app.core.config import GlobalConfigSettings
 from app.core.database import DatabaseManager
@@ -28,7 +28,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             pool=pool, log_service=log_service
         )
         self.tool: AutoReplyCardsMonitorUtils = AutoReplyCardsMonitorUtils(
-            oss_config=config.aliyun_oss
+            config=config
         )
 
     # 存储卡片信息
@@ -37,11 +37,43 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
         cover_obj = await self.tool.get_cover_url(
             xml_obj["aes_key"], xml_obj["file_size"], xml_obj["file_id"]
         )
-        cover_oss = self.tool.download_and_upload_cover(task_id, index, cover_obj)
+        file_name = f"{task_id}_{index}.jpg"
+        save_path = self.tool.download_cover(file_name, cover_obj)
+        if save_path is None:
+            return
+        exist_covers = await self.mapper.fetch_exist_covers(root_source_id)
+        if exist_covers:
+            exist_cover_id = self.tool.check_cover(save_path, exist_covers)
+            if exist_cover_id is not False:
+                cover_id = exist_cover_id
+                self.tool.remove_local_cover(save_path)
+            else:
+                # upload to oss
+                oss_key, cover_id = self.tool.upload_cover(file_name, save_path)
+                if oss_key is not None:
+                    await self.mapper.save_cover(
+                        cover_id=cover_id,
+                        root_source_id=root_source_id,
+                        oss_path=oss_key,
+                    )
+                else:
+                    self.tool.remove_local_cover(save_path)
+                    return
+        else:
+            # upload to oss
+            oss_key, cover_id = self.tool.upload_cover(file_name, save_path)
+            if oss_key is not None:
+                await self.mapper.save_cover(
+                    cover_id=cover_id, root_source_id=root_source_id, oss_path=oss_key
+                )
+            else:
+                self.tool.remove_local_cover(save_path)
+                return
+
         query = """
             INSERT INTO cooperate_auto_reply_detail
             (
-                task_id, position, msg_type, card_title, card_cover, 
+                task_id, position, msg_type, card_title, card_cover_id, 
                 video_id, root_source_id, mini_program_name, task_result
             ) VALUES 
             (
@@ -54,7 +86,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             index,
             msg_type,
             xml_obj["title"],
-            cover_oss,
+            cover_id,
             video_id,
             root_source_id,
             xml_obj["mini_program"],
@@ -103,16 +135,15 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
 
         else:
             article_detail = fetch_response["data"]["data"]
-            article_text = article_detail["body_text"]
-            article_images = article_detail["image_url_list"]
-            read_cnt = article_detail["view_count"]
-            like_cnt = article_detail["like_count"]
-            publish_timestamp = int(article_detail["publish_timestamp"] / 1000)
+            article_text = article_detail.get("body_text", "")
+            article_images = article_detail.get("image_url_list", [])
+            read_cnt = article_detail.get("view_count") or 0
+            like_cnt = article_detail.get("like_count") or 0
+            pt = article_detail.get("publish_timestamp")
+            publish_timestamp = int(pt / 1000) if pt is not None else 0
             parsed = urlparse(article_detail["content_link"])
             params = parse_qs(parsed.query)
             wx_sn = params.get("sn", [None])[0]
-            print(params)
-            print(wx_sn)
             mini_info = article_detail.get("mini_program")
             if not mini_info:
                 # video_id, root_source_id = None, None
@@ -150,7 +181,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
             else:
                 for card_index, i in enumerate(mini_info, 1):
                     try:
-                        video_id, root_source_id = self.tool.extract_page_path(i["path"])
+                        video_id, root_source_id = self.tool.extract_page_path(
+                            i["path"]
+                        )
                         card_title = i["title"]
                         card_cover = i["image_url"]
                         mini_name = i["nike_name"]
@@ -208,7 +241,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
                 if article_url:
                     await self.mapper.set_sample_url(gh_id, article_url)
 
-                    task_id = self.tool.generate_task_id(task_name="follow", gh_id=gh_id)
+                    task_id = self.tool.generate_task_id(
+                        task_name="follow", gh_id=gh_id
+                    )
                     affected_rows = await self.mapper.insert_aigc_follow_account_task(
                         task_id, article_url
                     )
@@ -231,7 +266,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
         # 先插入 task, 再创建自动回复任务
         create_row = await self.mapper.create_auto_reply_task(task_id, gh_id)
         if create_row:
-            affected_rows = await self.mapper.insert_aigc_auto_reply_task(task_id, gh_id)
+            affected_rows = await self.mapper.insert_aigc_auto_reply_task(
+                task_id, gh_id
+            )
             if not affected_rows:
                 print("发布任务至 AIGC 失败")
             else:
@@ -245,7 +282,9 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
         account_list = self.tool.get_monitor_account_list()
         for account in account_list:
             try:
-                fetch_response = await self.mapper.fetch_account_status(account.公众号名)
+                fetch_response = await self.mapper.fetch_account_status(
+                    account.公众号名
+                )
                 if not fetch_response:
                     affected_rows = await self.mapper.fetch_cooperate_accounts(
                         account.公众号名
@@ -323,7 +362,7 @@ class AutoReplyCardsMonitor(AutoReplyCardsMonitorConst):
 
         try:
             # parse xml
-            xml_list = json.loads(result) if type(result) == str else result
+            xml_list = json.loads(result) if isinstance(result, str) else result
             index = 0
             for item in xml_list:
                 xml_obj_list = self.tool.extract_callback_xml(item)

+ 19 - 1
app/infra/shared/image.py

@@ -1,3 +1,4 @@
+import hashlib
 import requests
 import imagehash
 from PIL import Image
@@ -27,13 +28,30 @@ class ImageUtils(OssUtils):
                 img = Image.open(BytesIO(requests.get(path, timeout=5).content))
 
             case "oss_file":
-                img = Image.open(BytesIO(self.bucket.get_object(path).read()))
+                img = Image.open(BytesIO(self.fetch_oss_file(path).read()))
 
             case _:
                 return "file_type error"
 
         return img.convert("RGB")
 
+    def _read_file_bytes(self, path: str, path_type: Optional[str] = None) -> bytes:
+        """Read a file's raw bytes by path_type ("filepath", "url" or "oss_file"; defaults to "oss_file"); raises ValueError otherwise."""
+        path_type = path_type or "oss_file"
+        if path_type == "filepath":
+            with open(path, "rb") as f:
+                return f.read()
+        if path_type == "url":
+            return requests.get(path, timeout=5).content
+        if path_type == "oss_file":
+            return self.fetch_oss_file(path).read()
+        raise ValueError(f"unsupported path_type: {path_type}")
+
+    def get_image_md5(self, file: ImagePath) -> str:
+        """Compute the MD5 of the image file's raw bytes and return it as a 32-character hexadecimal string."""
+        content = self._read_file_bytes(file.path, file.path_type)
+        return hashlib.md5(content).hexdigest()
+
     @staticmethod
     def phash_distance(img1: Image.Image, img2: Image.Image):
         h1 = imagehash.phash(img1)

+ 1 - 1
app/infra/shared/oss.py

@@ -11,7 +11,7 @@ class OssUtils:
             config.bucket_name,
         )
 
-    def fetch(self, oss_key):
+    def fetch_oss_file(self, oss_key):
         return self.bucket.get_object(oss_key)
 
     def save_from_file(self, file_path, oss_key):

+ 9 - 3
app/jobs/task_handler.py

@@ -396,21 +396,27 @@ class TaskHandler:
     @register("auto_follow_account")
     async def _auto_follow_account_handler(self) -> int:
         """自动关注公众号"""
-        task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
+        task = AutoReplyCardsMonitor(
+            pool=self.db_client, log_service=self.log_client, config=self.config
+        )
         await task.deal(task_name="follow_gzh_task")
         return TaskStatus.SUCCESS
 
     @register("get_follow_result")
     async def _get_follow_result_handler(self) -> int:
         """获取自动关注回复"""
-        task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
+        task = AutoReplyCardsMonitor(
+            pool=self.db_client, log_service=self.log_client, config=self.config
+        )
         await task.deal(task_name="get_auto_reply_task")
         return TaskStatus.SUCCESS
 
     @register("extract_reply_result")
     async def _extract_reply_result_handler(self) -> int:
         """解析自动回复结果"""
-        task = AutoReplyCardsMonitor(pool=self.db_client, log_client=self.log_client)
+        task = AutoReplyCardsMonitor(
+            pool=self.db_client, log_service=self.log_client, config=self.config
+        )
         await task.deal(task_name="extract_task")
         return TaskStatus.SUCCESS