guantao 1 день назад
Родитель
Сommit
1991164813

+ 1 - 0
.gitignore

@@ -48,6 +48,7 @@ CLAUDE.md
 htmlcov/
 .tox/
 .nox/
+scratch/
 
 # Misc
 .DS_Store

+ 42 - 14
agent/tools/builtin/content/platforms/aigc_channel.py

@@ -288,28 +288,56 @@ async def detail(
     # 视频字幕:任何 aigc-channel 平台只要 post.videos 字段非空就触发 Deepgram 转写。
     # 下载策略在 transcription._download_video 里按 platform 分支,未指定的平台走
     # "yt-dlp on page URL → httpx direct" 两步兜底。
+    #
+    # 三态语义(跟 extract_sources._needs_transcribe 对齐):
+    #   字段缺失     → 没尝试过,跑 Deepgram
+    #   字段 = ""    → 尝试过但失败,跳过(保护 Deepgram 额度)
+    #   字段 = text  → 已成功,复用
     extras_d = extras or {}
-    transcript_text: Optional[str] = post.get("video_transcript")  # cache hit reuse
     has_video = bool(post.get("videos"))
+    field_present = "video_transcript" in post
+    transcript_text: Optional[str] = post.get("video_transcript") or None
+
     if (
-        not transcript_text
+        not field_present
         and has_video
         and extras_d.get("include_transcript", True)
     ):
         from agent.tools.builtin.content.transcription import transcribe_video_from_post
-        transcript_text = await transcribe_video_from_post(platform_id, post)
-        if transcript_text:
-            post["video_transcript"] = transcript_text
-            import os as _os
-            from agent.tools.builtin.content import cache as _cache
-            trace_id = extras_d.get("__trace_id__") or _os.getenv("TRACE_ID")
-            content_id = (
-                post.get("channel_content_id")
-                or post.get("content_id")
-                or post.get("video_id")
+        transcribe_error: Optional[str] = None
+        try:
+            transcript_text = await transcribe_video_from_post(platform_id, post)
+        except Exception as e:
+            transcript_text = None
+            transcribe_error = f"{type(e).__name__}: {e}"
+            import logging as _logging
+            _logging.getLogger(__name__).warning(
+                "transcribe_video_from_post raised for %s: %s", platform_id, e
+            )
+
+        # 三态写回:成功 = text;失败/None = "" 作为"已尝试"标记,下次 cache hit 直接短路。
+        final_value = transcript_text or ""
+        post["video_transcript"] = final_value
+        if not final_value:
+            # 失败原因暴露到 output JSON,方便 agent/用户判断要不要重试或换平台
+            post["_transcribe_error"] = (
+                transcribe_error
+                or "transcribe returned None (下载/抽音/Deepgram 任一步失败,见 logger.warning)"
+            )
+
+        # cache writeback 不再以"成功"为前提:失败的 "" 也写回,让下次 cache hit 短路掉
+        import os as _os
+        from agent.tools.builtin.content import cache as _cache
+        trace_id = extras_d.get("__trace_id__") or _os.getenv("TRACE_ID")
+        content_id = (
+            post.get("channel_content_id")
+            or post.get("content_id")
+            or post.get("video_id")
+        )
+        if trace_id and content_id:
+            _cache.update_post_field(
+                trace_id, platform_id, content_id, "video_transcript", final_value
             )
-            if trace_id and content_id:
-                _cache.update_post_field(trace_id, platform_id, content_id, "video_transcript", transcript_text)
 
     # transcript already embedded as post["video_transcript"] inside the JSON dump;
     # no need to repeat as a separate section.

+ 129 - 80
agent/tools/builtin/content/platforms/youtube.py

@@ -238,13 +238,21 @@ async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None)
     """
     YouTube 详情:需要额外 HTTP 调用获取字幕/下载等。
     post 来自搜索缓存,extras 支持 include_captions / download_video。
+
+    Graceful degrade: 三条数据通路(/youtube/detail 增强元数据、/youtube/captions 官方字幕、
+    Deepgram 自研转写)独立进行,任何一条失败都不影响其他。特别是 Deepgram 走的是
+    yt-dlp 下载 watch URL → ffmpeg → Deepgram API,跟 crawler.aiddit.com 后端无关,
+    后端宕机时仍应自动跑 transcript。
     """
     extras = extras or {}
-    content_id = post.get("video_id", "")
+    content_id = post.get("video_id") or post.get("channel_content_id", "")
     include_captions = extras.get("include_captions", True)
     download_video = extras.get("download_video", False)
     include_transcript = extras.get("include_transcript", True)
 
+    # ── 1) /youtube/detail:拿增强元数据(标题/描述/点赞等)。失败时用 search post 兜底 ──
+    video_info: Dict[str, Any] = {}
+    detail_error: Optional[str] = None
     try:
         async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
             resp = await client.post(
@@ -253,92 +261,133 @@ async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None)
             )
             resp.raise_for_status()
             detail_data = resp.json()
+        if detail_data.get("code") == 0:
+            result_data = detail_data.get("data", {})
+            video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}
+        else:
+            detail_error = detail_data.get("msg") or "未知错误"
+    except Exception as e:
+        detail_error = str(e)
 
-        if detail_data.get("code") != 0:
-            return ToolResult(title="详情获取失败", output="", error=detail_data.get("msg", "未知错误"))
-
-        result_data = detail_data.get("data", {})
-        video_info = result_data.get("data", {}) if isinstance(result_data, dict) else {}
-
-        # 字幕
-        captions_text = None
-        if include_captions or download_video:
-            try:
-                async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
-                    cap_resp = await client.post(
-                        f"{CRAWLER_BASE_URL}/youtube/captions",
-                        json={"content_id": content_id},
-                    )
-                    cap_resp.raise_for_status()
-                    cap_data = cap_resp.json()
-                    if cap_data.get("code") == 0:
-                        inner = cap_data.get("data", {})
-                        if isinstance(inner, dict):
-                            inner2 = inner.get("data", {})
-                            if isinstance(inner2, dict):
-                                captions_text = inner2.get("content")
-            except Exception:
-                pass
-
-        # 下载
-        video_path = None
-        video_outline = None
-        if download_video:
-            import asyncio
+    # ── 2) /youtube/captions:官方字幕(也走 crawler 后端,同样可能挂) ──
+    captions_text: Optional[str] = None
+    if include_captions or download_video:
+        try:
+            async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
+                cap_resp = await client.post(
+                    f"{CRAWLER_BASE_URL}/youtube/captions",
+                    json={"content_id": content_id},
+                )
+                cap_resp.raise_for_status()
+                cap_data = cap_resp.json()
+                if cap_data.get("code") == 0:
+                    inner = cap_data.get("data", {})
+                    if isinstance(inner, dict):
+                        inner2 = inner.get("data", {})
+                        if isinstance(inner2, dict):
+                            captions_text = inner2.get("content")
+        except Exception:
+            pass
+
+    # ── 3) 视频文件下载(用户显式 extras.download_video=True 时才跑) ──
+    video_path = None
+    video_outline = None
+    if download_video:
+        import asyncio
+        try:
             from agent.tools.builtin.content.media import download_youtube_video, parse_srt_to_outline
             video_path = await asyncio.to_thread(download_youtube_video, content_id)
             if captions_text:
                 video_outline = parse_srt_to_outline(captions_text)
-
-        # Deepgram 转写:独立于 captions,无论 captions 是否拿到都会跑(除非显式关掉),
-        # 这样面对官方字幕空缺/质量不佳的视频也有兜底。Cache 命中时复用。
-        transcript_text: Optional[str] = post.get("video_transcript")
-        if not transcript_text and include_transcript:
-            from agent.tools.builtin.content.transcription import transcribe_video_from_post
-            # transcribe_video_from_post 用 post.get("video_id") 构造 watch URL
-            if not post.get("video_id"):
-                post["video_id"] = content_id
+        except Exception as e:
+            import logging
+            logging.getLogger(__name__).warning("youtube download_video failed: %s", e)
+
+    # ── 4) Deepgram 转写:独立于 1)/2),走 yt-dlp+Deepgram,不依赖 crawler 后端 ──
+    #
+    # 三态语义(跟 extract_sources / aigc_channel.detail 对齐):
+    #   字段缺失     → 没尝试过,跑 Deepgram
+    #   字段 = ""    → 尝试过但失败,跳过(保护 Deepgram 额度)
+    #   字段 = text  → 已成功,复用
+    transcript_text: Optional[str] = post.get("video_transcript") or None
+    field_present = "video_transcript" in post
+    transcribe_error: Optional[str] = None
+    if not field_present and include_transcript:
+        from agent.tools.builtin.content.transcription import transcribe_video_from_post
+        if not post.get("video_id"):
+            post["video_id"] = content_id
+        try:
             transcript_text = await transcribe_video_from_post("youtube", post)
-            if transcript_text:
-                post["video_transcript"] = transcript_text
-                import os as _os
-                from agent.tools.builtin.content import cache as _cache
-                trace_id = extras.get("__trace_id__") or _os.getenv("TRACE_ID")
-                if trace_id and content_id:
-                    _cache.update_post_field(trace_id, "youtube", content_id, "video_transcript", transcript_text)
-
-        output_data = {
-            "video_id": content_id,
-            "title": video_info.get("title", ""),
-            "channel": video_info.get("channel_account_name", ""),
-            "description": video_info.get("body_text", ""),
-            "like_count": video_info.get("like_count"),
-            "comment_count": video_info.get("comment_count"),
-            "content_link": video_info.get("content_link", ""),
-            "captions": captions_text,           # YouTube 官方字幕(可能为空)
-            "video_transcript": transcript_text, # Deepgram 转写兜底
-        }
-        if download_video:
-            output_data["video_path"] = video_path
-            output_data["video_outline"] = video_outline
-
-        output_text = json.dumps(output_data, ensure_ascii=False, indent=2)
-
-        memory_parts = []
-        if captions_text:
-            memory_parts.append("captions")
-        if transcript_text and transcript_text != captions_text:
-            memory_parts.append("transcript")
-        memory_extra = f" with {'+'.join(memory_parts)}" if memory_parts else ""
-
-        return ToolResult(
-            title=f"YouTube 详情: {video_info.get('title', content_id)}",
-            output=output_text,
-            long_term_memory=f"YouTube detail for {content_id}{memory_extra}",
-        )
+        except Exception as e:
+            import logging
+            logging.getLogger(__name__).warning("youtube transcribe failed: %s", e)
+            transcript_text = None
+            transcribe_error = f"{type(e).__name__}: {e}"
+
+        # 三态写回:成功 = text;失败/None = "" 作为"已尝试"标记
+        final_value = transcript_text or ""
+        post["video_transcript"] = final_value
+        if not final_value:
+            post["_transcribe_error"] = (
+                transcribe_error
+                or "transcribe returned None (yt-dlp/Deepgram 任一步失败,见 logger.warning)"
+            )
 
-    except Exception as e:
-        return ToolResult(title="YouTube 详情异常", output="", error=str(e))
+        # cache writeback 失败的 "" 也写,下次 cache hit 短路
+        import os as _os
+        from agent.tools.builtin.content import cache as _cache
+        trace_id = extras.get("__trace_id__") or _os.getenv("TRACE_ID")
+        if trace_id and content_id:
+            _cache.update_post_field(trace_id, "youtube", content_id, "video_transcript", final_value)
+
+    # ── 5) 组装输出:detail 接口的字段优先,缺失时用 search post 兜底 ──
+    output_data = {
+        "video_id": content_id,
+        "title": video_info.get("title") or post.get("title", ""),
+        "channel": video_info.get("channel_account_name") or post.get("author", ""),
+        "description": (
+            video_info.get("body_text")
+            or post.get("body_text")
+            or post.get("description_snippet", "")
+        ),
+        "like_count": (
+            video_info.get("like_count")
+            if video_info.get("like_count") is not None
+            else post.get("like_count")
+        ),
+        "comment_count": video_info.get("comment_count"),
+        "content_link": video_info.get("content_link") or post.get("link", ""),
+        "captions": captions_text,           # YouTube 官方字幕(可能为空)
+        # Deepgram 转写:读 post 字段,三态语义自然透出("" = 已尝试失败)
+        "video_transcript": post.get("video_transcript", ""),
+    }
+    if detail_error:
+        # 显式标记 graceful degrade 状态,让上层知道这次走的是 fallback
+        output_data["_detail_backend_error"] = detail_error
+    if post.get("_transcribe_error"):
+        # Deepgram 这一路失败原因透到 output,方便 agent/用户判断要不要重试
+        output_data["_transcribe_error"] = post["_transcribe_error"]
+    if download_video:
+        output_data["video_path"] = video_path
+        output_data["video_outline"] = video_outline
+
+    output_text = json.dumps(output_data, ensure_ascii=False, indent=2)
+
+    memory_parts = []
+    if captions_text:
+        memory_parts.append("captions")
+    if transcript_text and transcript_text != captions_text:
+        memory_parts.append("transcript")
+    if detail_error:
+        memory_parts.append(f"degraded(detail backend down)")
+    memory_extra = f" with {'+'.join(memory_parts)}" if memory_parts else ""
+
+    title = video_info.get("title") or post.get("title") or content_id
+    return ToolResult(
+        title=f"YouTube 详情: {title}",
+        output=output_text,
+        long_term_memory=f"YouTube detail for {content_id}{memory_extra}",
+    )
 
 
 # ── 拼图 ──

+ 12 - 10
agent/tools/builtin/content/transcription.py

@@ -24,7 +24,6 @@ import logging
 import os
 import re
 import subprocess
-import tempfile
 from pathlib import Path
 from typing import Any, Optional
 
@@ -40,7 +39,10 @@ FFMPEG_TIMEOUT = 600
 UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
       "(KHTML, like Gecko) Chrome/124.0 Safari/537.36")
 
-_TMP_ROOT = Path(tempfile.gettempdir()) / "content_transcribe"
+# 项目根目录 / .cache / content_videos —— 不再用系统 %TEMP%,避免被 Windows 偶发清理
+# 也避免 8GB+ 视频堆在 AppData\Local\Temp 看不见。
+# parents[4]: transcription.py → content/ → builtin/ → tools/ → agent/ → project root
+_CACHE_ROOT = Path(__file__).resolve().parents[4] / ".cache" / "content_videos"
 _SAFE_RE = re.compile(r"[^A-Za-z0-9._-]+")
 # Zero-width lookbehind/lookahead: remove whitespace strictly between CJK chars,
 # preserve CJK<->ASCII boundaries (e.g. "Remotion 是工具" stays intact).
@@ -58,11 +60,7 @@ _DURATION_PROBE_TIMEOUT = 15
 
 
 def extract_video_url(platform: str, post: dict[str, Any]) -> Optional[str]:
-    """Pluck a video URL (page or direct) out of a platform's raw post dict.
-
-    Mirrors scratch/crawl_videos.py so the two paths stay in sync; the
-    crawler is the source of truth for what shape each platform's post takes.
-    """
+    """Pluck a video URL (page or direct) out of a platform's raw post dict."""
     if platform == "x":
         vlist = post.get("video_url_list") or []
         if vlist:
@@ -93,7 +91,11 @@ def _safe_stem(platform: str, post: dict[str, Any]) -> str:
 def _yt_dlp_download(url: str, target: Path) -> Optional[Path]:
     if target.exists() and target.stat().st_size > 0:
         return target
-    cmd = ["yt-dlp", "-f", "best[ext=mp4]/best", "-o", str(target),
+    # Format chain: 优先 muxed mp4(YouTube/X/douyin 通常命中,最快),
+    # fallback 到 bestvideo+bestaudio + ffmpeg merge(bili 等 DASH-only 平台),
+    # 最后兜底 best。
+    cmd = ["yt-dlp", "-f", "best[ext=mp4]/bestvideo+bestaudio/best",
+           "-o", str(target),
            "--no-playlist", "--quiet", "--no-warnings", url]
     try:
         r = subprocess.run(cmd, capture_output=True, text=True, timeout=DOWNLOAD_TIMEOUT)
@@ -140,7 +142,7 @@ async def _download_video(
 ) -> Optional[Path]:
     """Dispatch to the right downloader per platform.
 
-    Known-good strategies (from scratch/crawl_videos.py):
+    Per-platform strategies:
       x      : yt-dlp on the tweet page URL (video URLs are signed/rotating)
       douyin : httpx direct with douyin.com Referer (video URL is a play API)
       sph    : httpx direct with channels.weixin.qq.com Referer (stodownload link)
@@ -333,7 +335,7 @@ async def transcribe_video_from_post(
         return None
 
     stem = _safe_stem(platform, post)
-    work_dir = _TMP_ROOT / platform
+    work_dir = _CACHE_ROOT / platform
     work_dir.mkdir(parents=True, exist_ok=True)
     video_path = work_dir / f"{stem}.mp4"
     audio_path = work_dir / f"{stem}.m4a"

+ 3 - 1
examples/process_pipeline/db_requirements.json

@@ -112,5 +112,7 @@
   "图文排版(优先选择使用终端/API/agent友好工具的帖子)",
   "给定一段文案或需求,用 AI 工具生成符合要求的排版图片的方法/教程/工作流",
   "给定一段文案或需求,用 AI 工具生成符合要求的排版图片的方法/教程/工作流",
-  "帮我找深度的小红书/抖音的内容是如何从选题开始构建的知识,比如如何确定一个选题的方向,逐步填充选题的细节,选题如何扩展成创作脚本,创作脚本创作又有什么技巧的,这种内容构建的知识"
+  "给定一段文案或需求,用 AI 工具生成符合要求的排版图片的方法/教程/工作流",
+  "用ai生成真实摄影的美女写真组图,要求具有真实感,氛围感,人物一致性保持",
+  "520_test"
 ]

Разница между файлами не показана из-за своего большого размера
+ 66 - 0
examples/process_pipeline/run_metrics.json


+ 206 - 0
examples/process_pipeline/script/extract_sources.py

@@ -11,6 +11,7 @@
 """
 
 import json
+import logging
 import re
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple
@@ -18,6 +19,8 @@ import asyncio
 import aiohttp
 from urllib.parse import urlparse, parse_qs, urlencode
 
+logger = logging.getLogger(__name__)
+
 
 # ── URL → (platform, content_id) 解析 ────────────────────────────────
 
@@ -141,6 +144,141 @@ def _merge_transcript_into_body(post: Dict[str, Any]) -> Dict[str, Any]:
     return merged
 
 
+def _needs_transcribe(platform: str, post: Dict[str, Any]) -> bool:
+    """Return True when this post should be sent through Deepgram transcription.
+
+    Semantics (an explicit user requirement, per the original author):
+      - `video_transcript` key **absent** -> treated as "never attempted"; eligible.
+      - `video_transcript` key present (even as the empty string "") -> treated as
+        "already attempted"; skipped. The original note states that Deepgram
+        returns empty text for music-only / speechless videos, so "" is a
+        legitimate "failed" marker that must not trigger a re-run.
+      - The post must have a video source, decided for every platform by
+        transcription.extract_video_url.
+
+    Args:
+        platform: Platform identifier, passed through to extract_video_url.
+        post: Raw post dict; any non-dict value yields False.
+
+    Returns:
+        True only if the key is absent and extract_video_url returns a truthy
+        value; False otherwise, including when the import or the call raises.
+    """
+    if not isinstance(post, dict):
+        return False
+    # Key presence, not truthiness, is the "already attempted" signal.
+    if "video_transcript" in post:
+        return False
+    try:
+        # Imported lazily inside the try, so an import failure also answers False.
+        from agent.tools.builtin.content.transcription import extract_video_url
+        return bool(extract_video_url(platform, post))
+    except Exception:
+        # Best effort: any failure here means "do not transcribe".
+        return False
+
+
+async def _transcribe_one_post(
+    platform: str,
+    post: Dict[str, Any],
+    sem: asyncio.Semaphore,
+) -> Optional[str]:
+    """Run transcribe_video_from_post for one post and record the outcome on it.
+
+    Success -> post["video_transcript"] = transcript
+    Failure -> post["video_transcript"] = ""  (explicit "already attempted"
+               marker, so a later backfill does not run the post again)
+
+    Args:
+        platform: Platform identifier passed to transcribe_video_from_post.
+        post: Post dict; mutated in place with the `video_transcript` key.
+        sem: Semaphore held only for the duration of the transcription call.
+
+    Returns:
+        The transcript text, or None when the call raised or returned a falsy value.
+    """
+    from agent.tools.builtin.content.transcription import transcribe_video_from_post
+    async with sem:
+        try:
+            text = await transcribe_video_from_post(platform, post)
+        except Exception as e:
+            # An exception is downgraded to the same "no transcript" result as None.
+            logger.warning("transcribe failed (%s): %s", platform, e)
+            text = None
+    if text:
+        post["video_transcript"] = text
+        return text
+    # Failure also writes "", meaning "Deepgram was tried but produced no result".
+    post["video_transcript"] = ""
+    return None
+
+
+async def _transcribe_pending_async(
+    matched: List[Dict[str, Any]],
+    concurrency: int = 3,
+) -> Dict[Tuple[str, str], str]:
+    """Concurrently transcribe every post in *matched* that still needs it.
+
+    A post is selected when `_needs_transcribe` accepts it, i.e. it has no
+    `video_transcript` key and a video source is available.
+
+    Args:
+        matched: Source records; each is read for its "platform" and "post" keys.
+        concurrency: Size of the semaphore shared by all transcription tasks.
+
+    Returns:
+        Mapping of (platform, content id) -> transcript text for the successful
+        posts only, where the id is `channel_content_id` or, failing that,
+        `video_id`. Meant for the caller to write back into the cache files so
+        other traces that hit the same post can reuse the transcript.
+    """
+    # Collect the (platform, post) pairs that qualify; malformed records are skipped.
+    targets: List[Tuple[str, Dict[str, Any]]] = []
+    for src in matched:
+        platform = src.get("platform")
+        post = src.get("post")
+        if not isinstance(post, dict) or not platform:
+            continue
+        if _needs_transcribe(platform, post):
+            targets.append((platform, post))
+
+    if not targets:
+        return {}
+
+    logger.info("Auto-transcribe: %d post(s) pending", len(targets))
+    sem = asyncio.Semaphore(concurrency)
+    # gather returns results in the order the awaitables were passed, so they
+    # line up with `targets` in the zip below.
+    results = await asyncio.gather(
+        *[_transcribe_one_post(p, post, sem) for p, post in targets]
+    )
+
+    updates: Dict[Tuple[str, str], str] = {}
+    success_n = 0
+    for (platform, post), text in zip(targets, results):
+        if text:
+            # success_n counts every transcript; `updates` only keeps those with
+            # a usable id, so the two numbers can differ.
+            success_n += 1
+            cid = post.get("channel_content_id") or post.get("video_id")
+            if cid:
+                updates[(platform, str(cid))] = text
+    logger.info("Auto-transcribe: %d/%d success", success_n, len(targets))
+    return updates
+
+
+def _writeback_transcript_to_cache(
+    cache_dir: Path,
+    updates: Dict[Tuple[str, str], str],
+) -> int:
+    """Write newly obtained transcripts into every matching post in the cache files.
+
+    Propagates across traces: the same video may have been returned by searches
+    in several traces, so every cached copy is updated in one pass and a later
+    extract_sources run for another trace does not trigger Deepgram again.
+
+    Only `*.json` files directly under *cache_dir* are scanned, and only their
+    top-level `search:<platform>` entries (both `history[*].posts` and `posts`).
+    A post is updated only when it has no `video_transcript` key at all, which
+    matches `_needs_transcribe`: an existing value, even "", is an
+    "already attempted" marker and is never overwritten.
+
+    Files that cannot be read or parsed, or whose content does not have the
+    expected shape, are skipped. A file that cannot be written is logged and
+    skipped, so one bad file does not abort the write-back of the others.
+
+    Args:
+        cache_dir: Directory holding the cache JSON files.
+        updates: Mapping of (platform, content id) -> transcript text.
+
+    Returns:
+        Number of cached posts updated in files that were written successfully.
+    """
+    if not updates or not cache_dir.exists():
+        return 0
+    written = 0
+    for cf in cache_dir.glob("*.json"):
+        try:
+            data = json.loads(cf.read_text(encoding="utf-8"))
+        except Exception:
+            continue
+        # Valid JSON is not necessarily an object; anything else has no entries to scan.
+        if not isinstance(data, dict):
+            continue
+        file_written = 0
+        for key, entry in data.items():
+            if not key.startswith("search:") or not isinstance(entry, dict):
+                continue
+            platform = key.split(":", 1)[1]
+            history = entry.get("history")
+            # Tolerate a missing/odd `history` and non-dict history items.
+            post_lists = [
+                h.get("posts")
+                for h in (history if isinstance(history, list) else [])
+                if isinstance(h, dict)
+            ]
+            post_lists.append(entry.get("posts"))
+            for posts in post_lists:
+                if not isinstance(posts, list):
+                    continue
+                for post in posts:
+                    if not isinstance(post, dict):
+                        continue
+                    cid = post.get("channel_content_id") or post.get("video_id")
+                    if not cid:
+                        continue
+                    text = updates.get((platform, str(cid)))
+                    # Key presence (not truthiness) decides: an existing field,
+                    # even "", records an earlier attempt and is left alone.
+                    if text and "video_transcript" not in post:
+                        post["video_transcript"] = text
+                        file_written += 1
+        if not file_written:
+            continue
+        try:
+            cf.write_text(
+                json.dumps(data, ensure_ascii=False, indent=2),
+                encoding="utf-8",
+            )
+        except OSError as e:
+            logger.warning("transcript cache writeback failed for %s: %s", cf, e)
+            continue
+        # Count only updates that actually reached disk.
+        written += file_written
+    return written
+
+
 def _is_before_cutoff(source: Dict[str, Any], cutoff_ts: int) -> bool:
     """判断帖子是否早于截止时间戳(秒级)
 
@@ -236,6 +374,27 @@ def _normalize_url(url: str) -> Optional[str]:
         return None
 
 
+def _normalize_post_in_place(platform: str, post: Dict[str, Any]) -> None:
+    """Fill in platform-specific fields on a post read from cache (in place).
+
+    Older cache entries may have been written before the platform-level
+    normalize functions existed; this is the fallback repair for them.
+    Field mappings as described by the original note (the helpers themselves
+    are defined in the platform modules, not here):
+      YouTube: description_snippet -> body_text / thumbnails -> images /
+               url -> videos / ...
+      sph:     title (caption) -> body_text (the sph "title" field is said to
+               hold the whole caption)
+
+    Args:
+        platform: Platform identifier; only "youtube" and "sph" are handled,
+            every other value is a no-op.
+        post: Post dict handed to the platform normalizer for mutation.
+    """
+    if platform == "youtube":
+        try:
+            from agent.tools.builtin.content.platforms.youtube import _normalize_youtube_post
+            _normalize_youtube_post(post)
+        except Exception:
+            # Best effort: an import or normalization error is ignored.
+            pass
+    elif platform == "sph":
+        try:
+            from agent.tools.builtin.content.platforms.aigc_channel import _normalize_sph_post
+            _normalize_sph_post(post)
+        except Exception:
+            # Best effort, same as the YouTube branch.
+            pass
+
+
 def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]:
     """
     构建 (platform, channel_content_id) -> post 映射。
@@ -285,9 +444,14 @@ def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) ->
                 for post in posts or []:
                     if not isinstance(post, dict):
                         continue
+
+                    # 平台字段 normalize:兜底救援早期 cache(normalize 函数加之前写入的)
+                    _normalize_post_in_place(platform, post)
+
                     cid = post.get("channel_content_id")
 
                     # YouTube 平台用 video_id 而非 channel_content_id
+                    # (normalize 已经处理过,这里是双保险,对早期未 normalize 的 post 也兜底)
                     if not cid and post.get("video_id"):
                         cid = post.get("video_id")
                         post["channel_content_id"] = cid  # 补全字段
@@ -318,6 +482,8 @@ def extract_sources_to_json(
     min_body_len: int = DEFAULT_MIN_BODY_LEN,
     min_score: float = DEFAULT_MIN_SCORE,
     cutoff_date: Tuple[int, int, int] = DEFAULT_CUTOFF_DATE,
+    auto_transcribe: bool = True,
+    transcribe_concurrency: int = 3,
 ) -> Dict[str, Any]:
     """
     扫描 raw_cases_dir 下的 case_*.json,
@@ -483,6 +649,45 @@ def extract_sources_to_json(
     # 4. 合并已有数据和新匹配的数据
     all_sources = existing_sources + matched
 
+    # 4.5. 自动 backfill 视频转写(保底兜底)
+    # 触发条件:post 有视频源(extract_video_url 非空)且**完全没有 `video_transcript` 字段**
+    # 空字符串视为"已尝试过"不重跑,跨平台统一。失败也会写 "",下次跳过避免反复浪费 Deepgram 额度。
+    # 跑完写回所有 cache 文件,让其他 trace 引用同一 post 时直接复用。
+    auto_transcribe_stats: Dict[str, Any] = {"attempted": 0, "succeeded": 0, "cache_writeback": 0}
+    if auto_transcribe and all_sources:
+        try:
+            transcribe_targets = sum(
+                1 for s in all_sources
+                if isinstance(s.get("post"), dict)
+                and _needs_transcribe(s.get("platform"), s["post"])
+            )
+            if transcribe_targets > 0:
+                logger.info("extract_sources: auto-transcribe %d post(s)", transcribe_targets)
+                updates = asyncio.run(
+                    _transcribe_pending_async(all_sources, concurrency=transcribe_concurrency)
+                )
+                auto_transcribe_stats["attempted"] = transcribe_targets
+                auto_transcribe_stats["succeeded"] = len(updates)
+                # 写回 cache(跨 trace 扩散)
+                if updates:
+                    n = _writeback_transcript_to_cache(cache_dir, updates)
+                    auto_transcribe_stats["cache_writeback"] = n
+                # 顺手把 transcript merge 进 body_text,保持跟 _merge_transcript_into_body 一致
+                for s in all_sources:
+                    post = s.get("post")
+                    if not isinstance(post, dict):
+                        continue
+                    if not post.get("video_transcript"):
+                        continue
+                    merged = _merge_transcript_into_body(post)
+                    if merged is not post:
+                        post["body_text"] = merged.get("body_text", post.get("body_text", ""))
+        except RuntimeError as e:
+            # 比如已在 event loop 内 — 跳过 auto-transcribe 不阻塞主流程
+            logger.warning("auto-transcribe skipped: %s", e)
+        except Exception as e:
+            logger.warning("auto-transcribe failed: %s", e)
+
     # 5. 统一过滤:body_text 完整性 / agent 评分 / 时效
     from datetime import datetime as _dt
     cutoff_ts = int(_dt(*cutoff_date).timestamp())
@@ -574,6 +779,7 @@ def extract_sources_to_json(
         "filtered_reasons": reason_counts,
         "filtered_details": filtered_details,
         "images_downloaded": images_downloaded,
+        "auto_transcribe": auto_transcribe_stats,
         "output_file": str(output_file),
     }
 

+ 49 - 0
examples/process_pipeline/ui/app.js

@@ -626,6 +626,11 @@ async function fetchRequirementData(index) {
     } catch (e) {
         console.error("Failed to fetch data", e);
     }
+
+    // Automatically re-apply search filter on newly loaded data
+    if (typeof applySearchFilter === 'function') {
+        applySearchFilter();
+    }
 }
 
 async function pollStatus() {
@@ -1237,8 +1242,52 @@ function setupEventListeners() {
             }
         });
     }
+
+    // Search input character matching for Case tab
+    const searchInput = document.querySelector('.search-input');
+    if (searchInput) {
+        searchInput.addEventListener('input', () => {
+            applySearchFilter();
+        });
+    }
 }
 
+/**
+ * Show or hide the raw case cards according to the search box text.
+ *
+ * Reads the first `.search-input` element (no-op when there is none),
+ * lower-cases and trims its value, then hides every `#json-raw .masonry-card`
+ * whose lower-cased text content does not contain that query. Afterwards each
+ * `#json-raw .masonry-grid` with no visible card is hidden together with the
+ * H3 element immediately before it, and shown again otherwise.
+ *
+ * Assigned on `window` so it is reachable as a global.
+ */
+window.applySearchFilter = function() {
+    const searchInput = document.querySelector('.search-input');
+    if (!searchInput) return;
+    const query = searchInput.value.toLowerCase().trim();
+
+    // Filter raw case cards (on the "案例" page).
+    // An empty query matches every card, so clearing the box shows all of them.
+    const cards = document.querySelectorAll('#json-raw .masonry-card');
+    cards.forEach(card => {
+        const text = card.textContent.toLowerCase();
+        if (text.includes(query)) {
+            card.style.display = '';
+        } else {
+            card.style.display = 'none';
+        }
+    });
+
+    // Handle empty group headers and grids.
+    // Visibility is judged by the inline `display` value set in the loop above.
+    const grids = document.querySelectorAll('#json-raw .masonry-grid');
+    grids.forEach(grid => {
+        const visibleCards = Array.from(grid.querySelectorAll('.masonry-card')).filter(card => card.style.display !== 'none');
+        const prevSibling = grid.previousElementSibling;
+        
+        if (visibleCards.length > 0) {
+            grid.style.display = '';
+            // Only an H3 directly before the grid is treated as its header.
+            if (prevSibling && prevSibling.tagName === 'H3') {
+                prevSibling.style.display = '';
+            }
+        } else {
+            grid.style.display = 'none';
+            if (prevSibling && prevSibling.tagName === 'H3') {
+                prevSibling.style.display = 'none';
+            }
+        }
+    });
+};
+
 // Boot
 // ----------------------------------------------------
 // Pipeline Chain Visualization Logic

Некоторые файлы не были показаны из-за большого количества измененных файлов