""" image_cdn.py - 将内容中的外站图片 URL 转换为自有 CDN 链接 使用场景:在 write_json / write_file 落盘前调用,自动把 小红书、B站、知乎、微博等平台的图片链接替换为 res.cybertogether.net CDN 链接, 防止外站图片过期/防盗链导致后续流程无法访问。 """ import hashlib import json import logging import re from typing import Any, Union import httpx logger = logging.getLogger(__name__) # ── 匹配需要转存的外站图片 URL(只抓图片后缀或明显的图床域名)────────────────── _IMG_URL_RE = re.compile( r'https?://(?!' # 排除自有 CDN,不重复上传 r'res\.cybertogether\.net' r')' r'[^\s"\'<>]+' # URL 主体 r'(?:' r'\.(?:jpg|jpeg|png|gif|webp|avif|bmp|svg)' # 明确图片后缀 r'|' r'(?:xhscdn\.com|bdimg\.com|hdslb\.com|zhihu\.com/p/[^/]+\.(?:jpg|png)' r'|sinaimg\.cn|wx\d+\.sinaimg\.cn|mmbiz\.qpic\.cn' # 微博/微信图床 r'|imagev[12]\.meitudata\.com' # 美图 r'|p[0-9]\.douyinpic\.com' # 抖音封面 r'|i0\.hdslb\.com|article\.biliimg\.com' # B站 r')' r')', re.IGNORECASE, ) BUCKET_NAME = "aigc-admin" BUCKET_PATH = "crawler/image" CDN_BASE = "https://res.cybertogether.net" async def _upload_bytes_to_oss(data: bytes, filename: str) -> str: """上传 bytes 到 OSS 并返回 CDN URL,失败时抛出异常""" from cyber_sdk.ali_oss import _upload_v2 result = await _upload_v2( file_name=filename, file_content=data, bucket_path=BUCKET_PATH, bucket_name=BUCKET_NAME, ) oss_key = result.get("oss_object_key") if not oss_key: raise ValueError(f"OSS response missing oss_object_key: {result}") return f"{CDN_BASE}/{oss_key}" async def _download_image(url: str) -> bytes: """下载图片 bytes,失败抛出异常""" async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: resp = await client.get(url, headers={"Referer": url, "User-Agent": "Mozilla/5.0"}) resp.raise_for_status() return resp.content def _ext_from_url(url: str) -> str: """从 URL 猜测文件扩展名,默认 jpg""" path = url.split("?")[0].lower() for ext in ("png", "gif", "webp", "avif", "bmp", "svg", "jpg", "jpeg"): if path.endswith(f".{ext}"): return ext return "jpg" async def replace_image_urls(text: str) -> str: """ 扫描 text 中所有外站图片 URL,下载并上传到自有 OSS,原地替换为 CDN 链接。 - 已是 res.cybertogether.net 的 URL 直接跳过 - 下载/上传失败的 URL 保留原值,只打 WARNING 日志 - 同一 URL 使用 MD5 去重(同一次调用内) """ urls = list(dict.fromkeys(_IMG_URL_RE.findall(text))) # 去重但保留顺序 if not urls: return text import asyncio async def _process_single_url(url: str) -> tuple[str, str]: url_hash = hashlib.md5(url.encode()).hexdigest()[:12] ext = _ext_from_url(url) filename = f"{url_hash}.{ext}" try: data = await _download_image(url) cdn_url = await _upload_bytes_to_oss(data, filename) logger.info("[ImageCDN] %s → %s", url[:60], cdn_url) return url, cdn_url except Exception as e: logger.warning("[ImageCDN] Failed to mirror %s: %s (%s)", url[:60], str(e) or repr(e), type(e).__name__) return url, url tasks = [_process_single_url(u) for u in urls] results = await asyncio.gather(*tasks) url_map: dict[str, str] = dict(results) for orig, cdn in url_map.items(): if orig != cdn: text = text.replace(orig, cdn) replaced = sum(1 for o, c in url_map.items() if o != c) if replaced: logger.info("[ImageCDN] Replaced %d/%d image URLs with CDN links", replaced, len(urls)) return text async def replace_image_urls_in_obj(obj: Any) -> Any: """ 递归扫描 dict/list 中所有字符串值,替换其中的外站图片 URL。 先整体 JSON 序列化再做替换,再反序列化,效率高且避免遗漏嵌套字段。 """ raw = json.dumps(obj, ensure_ascii=False) replaced = await replace_image_urls(raw) if replaced == raw: return obj return json.loads(replaced)