| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- """
- image_cdn.py - 将内容中的外站图片 URL 转换为自有 CDN 链接
- 使用场景:在 write_json / write_file 落盘前调用,自动把
- 小红书、B站、知乎、微博等平台的图片链接替换为 res.cybertogether.net CDN 链接,
- 防止外站图片过期/防盗链导致后续流程无法访问。
- """
- import hashlib
- import json
- import logging
- import re
- from typing import Any, Union
- import httpx
- logger = logging.getLogger(__name__)
- # ── 匹配需要转存的外站图片 URL(只抓图片后缀或明显的图床域名)──────────────────
- _IMG_URL_RE = re.compile(
- r'https?://(?!' # 排除自有 CDN,不重复上传
- r'res\.cybertogether\.net'
- r')'
- r'[^\s"\'<>]+' # URL 主体
- r'(?:'
- r'\.(?:jpg|jpeg|png|gif|webp|avif|bmp|svg)' # 明确图片后缀
- r'|'
- r'(?:xhscdn\.com|bdimg\.com|hdslb\.com|zhihu\.com/p/[^/]+\.(?:jpg|png)'
- r'|sinaimg\.cn|wx\d+\.sinaimg\.cn|mmbiz\.qpic\.cn' # 微博/微信图床
- r'|imagev[12]\.meitudata\.com' # 美图
- r'|p[0-9]\.douyinpic\.com' # 抖音封面
- r'|i0\.hdslb\.com|article\.biliimg\.com' # B站
- r')'
- r')',
- re.IGNORECASE,
- )
- BUCKET_NAME = "aigc-admin"
- BUCKET_PATH = "crawler/image"
- CDN_BASE = "https://res.cybertogether.net"
async def _upload_bytes_to_oss(data: bytes, filename: str) -> str:
    """Upload raw bytes to OSS and return the CDN URL of the stored object.

    Args:
        data: File content to upload.
        filename: Object file name passed to the uploader.

    Returns:
        ``CDN_BASE`` joined with the ``oss_object_key`` reported by the uploader.

    Raises:
        ValueError: If the uploader's response has no (or an empty)
            ``oss_object_key``. Exceptions raised by the uploader itself
            propagate unchanged.
    """
    # The SDK is imported at call time rather than at module import time.
    from cyber_sdk.ali_oss import _upload_v2

    upload_kwargs = {
        "file_name": filename,
        "file_content": data,
        "bucket_path": BUCKET_PATH,
        "bucket_name": BUCKET_NAME,
    }
    response = await _upload_v2(**upload_kwargs)

    object_key = response.get("oss_object_key")
    if object_key:
        return f"{CDN_BASE}/{object_key}"
    raise ValueError(f"OSS response missing oss_object_key: {response}")
async def _download_image(url: str) -> bytes:
    """Fetch ``url`` and return the response body as bytes.

    The request follows redirects, uses a 30 second timeout, and sends the
    URL itself as ``Referer`` together with a browser-like ``User-Agent``.

    Raises:
        Whatever ``httpx`` raises for transport errors, plus the error raised
        by ``raise_for_status()`` when the response status is not successful.
    """
    request_headers = {"Referer": url, "User-Agent": "Mozilla/5.0"}
    async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as http:
        response = await http.get(url, headers=request_headers)
        response.raise_for_status()
        return response.content
- def _ext_from_url(url: str) -> str:
- """从 URL 猜测文件扩展名,默认 jpg"""
- path = url.split("?")[0].lower()
- for ext in ("png", "gif", "webp", "avif", "bmp", "svg", "jpg", "jpeg"):
- if path.endswith(f".{ext}"):
- return ext
- return "jpg"
async def replace_image_urls(text: str) -> str:
    """Mirror every third-party image URL in ``text`` and rewrite it in place.

    All URLs matched by ``_IMG_URL_RE`` are downloaded and uploaded to OSS
    concurrently; each successfully mirrored URL is then replaced by its CDN
    link.

    - The pattern itself skips URLs that already point at the own CDN.
    - A URL whose download or upload fails is left unchanged; only a WARNING
      is logged.
    - Identical URLs are processed once per call. The uploaded file name is
      derived from the first 12 hex digits of the URL's MD5 plus the guessed
      extension.

    Args:
        text: Arbitrary text that may contain image URLs.

    Returns:
        ``text`` with mirrored URLs replaced; the input unchanged when nothing
        matched or nothing could be mirrored.
    """
    import asyncio

    # dict.fromkeys de-duplicates while preserving first-seen order.
    urls = list(dict.fromkeys(_IMG_URL_RE.findall(text)))
    if not urls:
        return text

    async def _process_single_url(url: str) -> tuple[str, str]:
        """Return ``(url, cdn_url)`` on success, ``(url, url)`` on failure."""
        url_hash = hashlib.md5(url.encode()).hexdigest()[:12]
        filename = f"{url_hash}.{_ext_from_url(url)}"
        try:
            data = await _download_image(url)
            cdn_url = await _upload_bytes_to_oss(data, filename)
        except Exception as e:
            # Best effort by design: one broken image must not fail the batch.
            logger.warning("[ImageCDN] Failed to mirror %s: %s (%s)", url[:60], str(e) or repr(e), type(e).__name__)
            return url, url
        logger.info("[ImageCDN] %s → %s", url[:60], cdn_url)
        return url, cdn_url

    results = await asyncio.gather(*(_process_single_url(u) for u in urls))
    url_map: dict[str, str] = {orig: cdn for orig, cdn in results if orig != cdn}
    if not url_map:
        return text

    # Substitute in a single pass over the very same matches findall produced.
    # Chained str.replace calls would also rewrite a mirrored URL where it is
    # merely the prefix of a longer URL, corrupting the longer one.
    text = _IMG_URL_RE.sub(lambda m: url_map.get(m.group(0), m.group(0)), text)

    logger.info("[ImageCDN] Replaced %d/%d image URLs with CDN links", len(url_map), len(urls))
    return text
async def replace_image_urls_in_obj(obj: Any) -> Any:
    """Replace third-party image URLs in every string of a JSON-like object.

    The object is serialized to JSON once, the URLs are rewritten in that
    text by ``replace_image_urls``, and the result is parsed back. This
    reaches arbitrarily nested fields without an explicit recursive walk.

    Args:
        obj: A value ``json.dumps`` can serialize.

    Returns:
        ``obj`` itself when no URL was rewritten; otherwise a new object
        parsed from the rewritten JSON text (so it has gone through a JSON
        round trip).
    """
    serialized = json.dumps(obj, ensure_ascii=False)
    rewritten = await replace_image_urls(serialized)
    if rewritten != serialized:
        return json.loads(rewritten)
    # Nothing changed: hand back the caller's object untouched.
    return obj
|