View source code

add video_transcript to content search tools (for agent)

guantao 11 hours ago
Parent
Commit
1b62df52c5

+ 6 - 1
agent/tools/builtin/content/cache.py

@@ -137,11 +137,16 @@ def update_post_field(
 
     cid_str = str(content_id)
     updated = False
+    # Match by channel_content_id (primary, used by X / aigc-channel platforms)
+    # or video_id (fallback, used by YouTube whose post id field is named differently).
     for hist in histories:
         for post in hist.get("posts", []) or []:
             if not isinstance(post, dict):
                 continue
-            if str(post.get("channel_content_id", "")) == cid_str:
+            if (
+                str(post.get("channel_content_id", "")) == cid_str
+                or str(post.get("video_id", "")) == cid_str
+            ):
                 post[field] = value
                 updated = True
 
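A minimal sketch of why the second key matters (hypothetical entries; field names as in the diff above). YouTube posts are cached under video_id only, so matching channel_content_id alone would silently skip them:

histories = [{"posts": [
    {"channel_content_id": "1789", "title": "x post"},     # X / aigc-channel shape
    {"video_id": "dQw4w9WgXcQ", "title": "youtube post"},  # YouTube shape
]}]
# Before this change, update_post_field(..., content_id="dQw4w9WgXcQ", ...)
# matched nothing; the video_id fallback makes the YouTube entry reachable.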

+ 98 - 7
agent/tools/builtin/content/platforms/aigc_channel.py

@@ -6,6 +6,7 @@ AIGC-Channel 平台实现(9 个中文平台)
 """
 
 import json
+import re
 from typing import Any, Dict, List, Optional
 
 import httpx
@@ -19,6 +20,43 @@ from agent.tools.builtin.content.registry import (
 BASE_URL = "http://aigc-channel.aiddit.com/aigc/channel"
 DEFAULT_TIMEOUT = 60.0
 
+# aigc-channel returns search-highlighted titles like
+# '<em class="highlight">关键词</em>'. Strip before any rendering / scoring use.
+_HTML_TAG_RE = re.compile(r"<[^>]+>")
+
+
+def _strip_html(text: Optional[str]) -> str:
+    if not text:
+        return ""
+    return _HTML_TAG_RE.sub("", text)
+
+
+_SPH_TITLE_MAX = 20  # char count at which normalized sph titles are truncated
+
+
+def _normalize_sph_post(post: Dict[str, Any]) -> None:
+    """In-place: 视频号没有独立 title,后端把 caption 塞进 title 字段而 body_text 留空。
+
+    把整段 title 搬到 body_text,title 取剥 HTML 后前 20 字 + '...' 作为短摘要。
+    幂等:如果 body_text 已经有内容则不动,避免重复迁移或覆盖;title 已经 <=20 字
+    也不强加省略号。
+    """
+    if not isinstance(post, dict):
+        return
+    raw_title = post.get("title") or ""
+    body = post.get("body_text") or ""
+    body = body.strip() if isinstance(body, str) else ""
+    if not raw_title or body:
+        return
+    clean = _strip_html(raw_title).strip()
+    if not clean:
+        return
+    post["body_text"] = clean
+    if len(clean) > _SPH_TITLE_MAX:
+        post["title"] = clean[:_SPH_TITLE_MAX] + "..."
+    else:
+        post["title"] = clean
+
 
 # ── Platform registration ──
 
@@ -114,16 +152,32 @@ async def search(
 
     posts = data.get("data", [])
 
+    # sph field normalization: title is too long (the backend stuffs the caption
+    # into title); move it to body_text and keep the first 20 chars as title.
+    # Runs before scoring / summary / cache.
+    if platform_id == "sph":
+        for p in posts:
+            _normalize_sph_post(p)
+
     # Build the overview summary
     summary_list = []
-    
+
     # Lazily import the quality evaluator
     try:
         from examples.process_pipeline.script.evaluate_source_quality import SourceQualityEvaluator
         evaluator = SourceQualityEvaluator()
     except ImportError:
         evaluator = None
-        
+
+    # Before scoring, concurrently probe video posts' mp4 durations (HTTP Range only,
+    # no video-stream download) so the evaluator can use real duration instead of
+    # body length as the content signal.
+    if evaluator and posts:
+        try:
+            from agent.tools.builtin.content.transcription import probe_durations_for_posts
+            await probe_durations_for_posts(platform_id, posts, concurrency=8)
+        except Exception as e:
+            import logging
+            logging.getLogger(__name__).info("duration probe failed: %s", e)
+
     for idx, post in enumerate(posts, 1):
         body = post.get("body_text", "") or ""
         title = post.get("title") or body[:20] or ""
@@ -210,8 +264,12 @@ async def _build_images_collage(urls: List[str]) -> Optional[Dict[str, Any]]:
         return {"type": "base64", "media_type": "image/png", "data": b64}
 
 
-async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None) -> ToolResult:
-    """返回单条帖子的完整内容"""
+async def detail(
+    post: Dict[str, Any],
+    extras: Optional[Dict[str, Any]] = None,
+    platform_id: str = "",
+) -> ToolResult:
+    """返回单条帖子的完整内容;sph/douyin 视频会通过 Deepgram 自动转写。"""
     title = post.get("title") or post.get("body_text", "")[:30] or "无标题"
 
     img_urls = [u for u in post.get("images", []) if u]
@@ -227,12 +285,41 @@ async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None)
         for u in img_urls:
             images.append({"type": "url", "url": u})
 
+    # Video transcript: on any aigc-channel platform, a non-empty post.videos field
+    # triggers Deepgram transcription. The download strategy branches per platform in
+    # transcription._download_video; platforms without a branch use the two-step
+    # "yt-dlp on page URL → httpx direct" fallback.
+    extras_d = extras or {}
+    transcript_text: Optional[str] = post.get("video_transcript")  # cache hit reuse
+    has_video = bool(post.get("videos"))
+    if (
+        not transcript_text
+        and has_video
+        and extras_d.get("include_transcript", True)
+    ):
+        from agent.tools.builtin.content.transcription import transcribe_video_from_post
+        transcript_text = await transcribe_video_from_post(platform_id, post)
+        if transcript_text:
+            post["video_transcript"] = transcript_text
+            import os as _os
+            from agent.tools.builtin.content import cache as _cache
+            trace_id = extras_d.get("__trace_id__") or _os.getenv("TRACE_ID")
+            content_id = (
+                post.get("channel_content_id")
+                or post.get("content_id")
+                or post.get("video_id")
+            )
+            if trace_id and content_id:
+                _cache.update_post_field(trace_id, platform_id, content_id, "video_transcript", transcript_text)
+
+    # transcript already embedded as post["video_transcript"] inside the JSON dump;
+    # no need to repeat as a separate section.
     output_text = json.dumps(post, ensure_ascii=False, indent=2)
 
+    memory_suffix = " +transcript" if transcript_text else ""
     return ToolResult(
         title=f"详情: {title}",
         output=output_text,
-        long_term_memory=f"Viewed detail: {title}",
+        long_term_memory=f"Viewed detail: {title}{memory_suffix}",
         images=images,
     )
 
@@ -270,7 +357,7 @@ async def _build_collage(posts: List[Dict[str, Any]]) -> Optional[str]:
         imgs = post.get("images", [])
         if imgs and imgs[0]:
             urls.append(imgs[0])
-            base_title = post.get("title", "") or ""
+            base_title = _strip_html(post.get("title", ""))
             score = post.get("_quality_score")
             if score is not None:
                 title_with_score = f"[{score}分] {base_title}"
@@ -319,7 +406,11 @@ async def _build_collage(posts: List[Dict[str, Any]]) -> Optional[str]:
 def _register_all():
     for p in _AIGC_PLATFORMS:
         p.search_impl = search
-        p.detail_impl = detail
+        # Bind each platform's id into detail_impl so the shared detail() can pass
+        # it to the transcription layer, which picks the download strategy per platform.
+        p.detail_impl = (
+            lambda post, extras, _pid=p.id: detail(post, extras, _pid)  # noqa: B023 (default-arg captures pid)
+        )
         if p.supports_suggest:
             p.suggest_impl = suggest
             p.suggest_channels = [p.id]
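A before/after sketch of the sph normalization above on an assumed raw payload (markup and caption text are illustrative):

post = {"title": '<em class="highlight">如何用AI做真实感写真,从打光到一致性保持全流程</em>', "body_text": ""}
_normalize_sph_post(post)
# post["body_text"] == "如何用AI做真实感写真,从打光到一致性保持全流程"  (full caption, HTML stripped)
# post["title"]     == "如何用AI做真实感写真,从打光到一致性保..."      (first 20 chars + ellipsis)
_normalize_sph_post(post)  # second call is a no-op: body_text is now non-empty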

+ 35 - 4
agent/tools/builtin/content/platforms/x.py

@@ -45,6 +45,16 @@ async def search(
         except ImportError:
             evaluator = None
 
+        # Before scoring, concurrently probe video posts' mp4 durations (HTTP Range
+        # only, no video-stream download) so the evaluator can use real duration
+        # instead of body length as the content signal.
+        if evaluator and tweets:
+            try:
+                from agent.tools.builtin.content.transcription import probe_durations_for_posts
+                await probe_durations_for_posts("x", tweets[:max_count], concurrency=8)
+            except Exception as e:
+                import logging
+                logging.getLogger(__name__).info("duration probe failed for x: %s", e)
+
         summary_list = []
         for idx, tweet in enumerate(tweets[:max_count], 1):
             text = tweet.get("body_text", "")
@@ -189,15 +199,29 @@ async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None)
 
     author_comments = await _fetch_author_comments(content_id, author_id)
 
+    extras_d = extras or {}
+    trace_id = extras_d.get("__trace_id__")
+    if not trace_id:
+        import os as _os
+        trace_id = _os.getenv("TRACE_ID")
+
     # Write author comments back to the cache so downstream offline flows (e.g. extract_sources) can read them too
     if author_comments:
-        import os
         from agent.tools.builtin.content import cache as _cache
-        # trace_id comes from extras first (injected by the dispatcher on the agent path), falling back to env (set on the CLI path)
-        trace_id = (extras or {}).get("__trace_id__") or os.getenv("TRACE_ID")
         if trace_id and content_id:
             _cache.update_post_field(trace_id, "x", content_id, "author_comments", author_comments)
 
+    # Video transcript: transcribe via Deepgram when video_url_list is present (default on, opt-out via extras)
+    transcript_text: Optional[str] = post.get("video_transcript")  # cache hit reuse
+    if not transcript_text and extras_d.get("include_transcript", True):
+        from agent.tools.builtin.content.transcription import transcribe_video_from_post
+        transcript_text = await transcribe_video_from_post("x", post)
+        if transcript_text:
+            post["video_transcript"] = transcript_text
+            from agent.tools.builtin.content import cache as _cache
+            if trace_id and content_id:
+                _cache.update_post_field(trace_id, "x", content_id, "video_transcript", transcript_text)
+
     output_json = json.dumps(post, ensure_ascii=False, indent=2)
 
     sections = [output_json]
@@ -206,9 +230,16 @@ async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None)
         for i, c in enumerate(author_comments, 1):
             lines.append(f"{i}. [赞{c['likes']} · 回复{c['replies']}] {c['text']}")
         sections.append("\n".join(lines))
+    # transcript already embedded as post["video_transcript"] inside output_json above;
+    # no need to repeat as a separate section.
     output_text = "\n\n".join(sections)
 
-    memory_suffix = f" + {len(author_comments)} author replies" if author_comments else ""
+    memory_extras = []
+    if author_comments:
+        memory_extras.append(f"{len(author_comments)} author replies")
+    if transcript_text:
+        memory_extras.append("+transcript")
+    memory_suffix = " + " + ", ".join(memory_extras) if memory_extras else ""
     return ToolResult(
         title=f"X 详情: @{author}",
         output=output_text,
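Transcription here is opt-out per call; a sketch, assuming the caller can pass extras through the dispatcher:

result = await detail(post, extras={"include_transcript": False})
# A transcript already cached on the post is still reused; the flag only skips
# the fresh download + Deepgram request.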

+ 167 - 2
agent/tools/builtin/content/platforms/youtube.py

@@ -5,6 +5,8 @@ YouTube 平台实现
 """
 
 import json
+import re
+import time
 from typing import Any, Dict, List, Optional
 
 import httpx
@@ -19,6 +21,138 @@ CRAWLER_BASE_URL = "http://crawler.aiddit.com/crawler"
 DEFAULT_TIMEOUT = 60.0
 
 
+# ── Field normalization: YouTube backend field names don't match the evaluator / other platforms ──
+#
+# evaluator expects       | YouTube backend returns
+# ------------------------+---------------------------
+# channel_content_id      | video_id
+# body_text               | description_snippet
+# like_count (int)        | view_count ("130,461 views")
+# publish_timestamp (ms)  | published_time ("6 months ago")
+# link                    | url
+# duration_sec (float)    | duration ("6:15" or "1:23:45")
+# images (list[str])      | thumbnails (list[dict])
+# content_type=="video"   | (缺失)
+# videos                  | (缺失)
+#
+# Without normalization the evaluator takes the article path and finds none of the
+# 8 fields, so every video scores 15 points and grades F.
+
+
+def _parse_duration(s: Any) -> Optional[float]:
+    """Parse 'MM:SS' or 'HH:MM:SS' to seconds (float)."""
+    if not isinstance(s, str):
+        return None
+    parts = s.strip().split(":")
+    try:
+        nums = [int(p) for p in parts]
+    except ValueError:
+        return None
+    if len(nums) == 2:
+        return float(nums[0] * 60 + nums[1])
+    if len(nums) == 3:
+        return float(nums[0] * 3600 + nums[1] * 60 + nums[2])
+    return None
+
+
+def _parse_view_count(s: Any) -> Optional[int]:
+    """Parse '130,461 views' (or '1.2M views') to int."""
+    if not isinstance(s, str):
+        return None
+    s = s.strip()
+    # "1.2M views" / "3.5K views"
+    m = re.match(r"([\d.]+)\s*([KMBkmb])\b", s)
+    if m:
+        try:
+            num = float(m.group(1))
+        except ValueError:
+            return None
+        mult = {"K": 1_000, "M": 1_000_000, "B": 1_000_000_000}[m.group(2).upper()]
+        return int(num * mult)
+    # "130,461 views"
+    m = re.search(r"([\d,]+)", s)
+    if m:
+        try:
+            return int(m.group(1).replace(",", ""))
+        except ValueError:
+            return None
+    return None
+
+
+_RELATIVE_TIME_RE = re.compile(
+    r"(\d+)\s+(minute|hour|day|week|month|year)s?\s+ago", re.IGNORECASE
+)
+_SECONDS_PER = {
+    "minute": 60, "hour": 3600, "day": 86400,
+    "week": 86400 * 7, "month": 86400 * 30, "year": 86400 * 365,
+}
+
+
+def _parse_relative_time(s: Any) -> Optional[int]:
+    """Parse '6 months ago' -> UTC milliseconds timestamp."""
+    if not isinstance(s, str):
+        return None
+    m = _RELATIVE_TIME_RE.search(s.lower())
+    if not m:
+        return None
+    n = int(m.group(1))
+    delta = n * _SECONDS_PER.get(m.group(2).lower(), 0)
+    if not delta:
+        return None
+    return int((time.time() - delta) * 1000)
+
+
+def _normalize_youtube_post(post: Dict[str, Any]) -> None:
+    """In-place: rewrite YouTube post fields onto the schema evaluator/transcription expect.
+
+    Idempotent — only fills missing fields, never overwrites existing values.
+    """
+    if not isinstance(post, dict):
+        return
+
+    if post.get("video_id") and not post.get("channel_content_id"):
+        post["channel_content_id"] = post["video_id"]
+
+    if post.get("description_snippet") and not post.get("body_text"):
+        post["body_text"] = post["description_snippet"]
+
+    if post.get("view_count") and not isinstance(post.get("like_count"), (int, float)):
+        n = _parse_view_count(post["view_count"])
+        if n is not None:
+            post["like_count"] = n
+
+    if post.get("published_time") and not post.get("publish_timestamp"):
+        ts = _parse_relative_time(post["published_time"])
+        if ts:
+            post["publish_timestamp"] = ts
+
+    if post.get("url") and not post.get("link"):
+        post["link"] = post["url"]
+
+    if post.get("duration") and not isinstance(post.get("duration_sec"), (int, float)):
+        sec = _parse_duration(post["duration"])
+        if sec:
+            post["duration_sec"] = sec
+
+    if post.get("thumbnails") and not post.get("images"):
+        imgs = []
+        for t in post["thumbnails"]:
+            if isinstance(t, dict) and t.get("url"):
+                imgs.append(t["url"])
+        if imgs:
+            post["images"] = imgs
+
+    if not post.get("content_type"):
+        post["content_type"] = "video"
+
+    if not post.get("videos"):
+        # transcription.extract_video_url for "youtube" uses video_id directly,
+        # so this `videos` field is just for evaluator.is_video detection.
+        url = post.get("url")
+        if url:
+            post["videos"] = [url]
+
+
 # ── Search ──
 
 async def search(
@@ -43,6 +177,11 @@ async def search(
         result_data = data.get("data", {})
         videos = result_data.get("data", []) if isinstance(result_data, dict) else []
 
+        # YouTube field names differ from other platforms; normalize first so the
+        # evaluator can score correctly (and so duration_sec / publish_timestamp etc.
+        # get parsed out, reusing the video-mode scoring)
+        for v in videos:
+            _normalize_youtube_post(v)
+
         # Lazily import the quality evaluator
         try:
             from examples.process_pipeline.script.evaluate_source_quality import SourceQualityEvaluator
@@ -104,6 +243,7 @@ async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None)
     content_id = post.get("video_id", "")
     include_captions = extras.get("include_captions", True)
     download_video = extras.get("download_video", False)
+    include_transcript = extras.get("include_transcript", True)
 
     try:
         async with httpx.AsyncClient(timeout=DEFAULT_TIMEOUT) as client:
@@ -150,6 +290,23 @@ async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None)
             if captions_text:
                 video_outline = parse_srt_to_outline(captions_text)
 
+        # Deepgram transcription: independent of captions; it runs whether or not
+        # captions were fetched (unless explicitly disabled), so videos with missing
+        # or low-quality official captions still get a fallback. Reused on cache hit.
+        transcript_text: Optional[str] = post.get("video_transcript")
+        if not transcript_text and include_transcript:
+            from agent.tools.builtin.content.transcription import transcribe_video_from_post
+            # transcribe_video_from_post builds the watch URL from post.get("video_id")
+            if not post.get("video_id"):
+                post["video_id"] = content_id
+            transcript_text = await transcribe_video_from_post("youtube", post)
+            if transcript_text:
+                post["video_transcript"] = transcript_text
+                import os as _os
+                from agent.tools.builtin.content import cache as _cache
+                trace_id = extras.get("__trace_id__") or _os.getenv("TRACE_ID")
+                if trace_id and content_id:
+                    _cache.update_post_field(trace_id, "youtube", content_id, "video_transcript", transcript_text)
+
         output_data = {
             "video_id": content_id,
             "title": video_info.get("title", ""),
@@ -158,7 +315,8 @@ async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None)
             "like_count": video_info.get("like_count"),
             "comment_count": video_info.get("comment_count"),
             "content_link": video_info.get("content_link", ""),
-            "captions": captions_text,
+            "captions": captions_text,           # YouTube 官方字幕(可能为空)
+            "video_transcript": transcript_text, # Deepgram 转写兜底
         }
         if download_video:
             output_data["video_path"] = video_path
@@ -166,10 +324,17 @@ async def detail(post: Dict[str, Any], extras: Optional[Dict[str, Any]] = None)
 
         output_text = json.dumps(output_data, ensure_ascii=False, indent=2)
 
+        memory_parts = []
+        if captions_text:
+            memory_parts.append("captions")
+        if transcript_text and transcript_text != captions_text:
+            memory_parts.append("transcript")
+        memory_extra = f" with {'+'.join(memory_parts)}" if memory_parts else ""
+
         return ToolResult(
             title=f"YouTube 详情: {video_info.get('title', content_id)}",
             output=output_text,
-            long_term_memory=f"YouTube detail for {content_id}" + (" with captions" if captions_text else ""),
+            long_term_memory=f"YouTube detail for {content_id}{memory_extra}",
         )
 
     except Exception as e:
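Expected outputs of the three parsers on typical backend strings (illustrative values):

assert _parse_duration("6:15") == 375.0
assert _parse_duration("1:23:45") == 5025.0
assert _parse_view_count("130,461 views") == 130461
assert _parse_view_count("1.2M views") == 1_200_000
# _parse_relative_time("6 months ago") ~= (now - 6 * 30 * 86400 s) in ms; coarse by design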

+ 351 - 0
agent/tools/builtin/content/transcription.py

@@ -0,0 +1,351 @@
+"""Download a post's source video, extract audio, transcribe via Deepgram.
+
+Used by platform detail() implementations whose posts ship raw video URLs
+(X, sph, douyin) and don't already supply captions. YouTube has its own
+captions endpoint and bypasses this module.
+
+Pipeline per video:
+  1. extract_video_url(platform, post)  -> source url (page or direct)
+  2. download to <tmpdir>/content_transcribe/<platform>/<stem>.mp4
+     - X     : yt-dlp on the page URL (most robust against rotating video URLs)
+     - douyin: httpx + Referer https://www.douyin.com/
+     - sph   : httpx + Referer https://channels.weixin.qq.com/
+  3. ffmpeg -> 16kHz mono AAC 64kbps m4a (~3% the size of the source mp4)
+  4. POST to Deepgram /v1/listen, model=whisper-large by default
+  5. Strip spaces inserted by Deepgram between consecutive CJK characters
+
+Returns transcript text on success, None on any failure (silent fallback).
+"""
+
+from __future__ import annotations
+
+import asyncio
+import logging
+import os
+import re
+import subprocess
+import tempfile
+from pathlib import Path
+from typing import Any, Optional
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+DEEPGRAM_URL = "https://api.deepgram.com/v1/listen"
+DEEPGRAM_MODEL_DEFAULT = "whisper-large"
+DEEPGRAM_REQUEST_TIMEOUT = 600.0
+DOWNLOAD_TIMEOUT = 300
+FFMPEG_TIMEOUT = 600
+UA = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
+      "(KHTML, like Gecko) Chrome/124.0 Safari/537.36")
+
+_TMP_ROOT = Path(tempfile.gettempdir()) / "content_transcribe"
+_SAFE_RE = re.compile(r"[^A-Za-z0-9._-]+")
+# Zero-width lookbehind/lookahead: remove whitespace strictly between CJK chars,
+# preserve CJK<->ASCII boundaries (e.g. "Remotion 是工具" stays intact).
+_CJK_SPACE_RE = re.compile(r"(?<=[一-鿿])\s+(?=[一-鿿])")
+
+# Referer headers required by some CDNs for ffprobe / yt-dlp / httpx to access video URLs.
+_PLATFORM_REFERERS = {
+    "douyin": "https://www.douyin.com/",
+    "sph": "https://channels.weixin.qq.com/",
+    "xhs": "https://www.xiaohongshu.com/",
+    "bili": "https://www.bilibili.com/",
+    "weibo": "https://weibo.com/",
+}
+_DURATION_PROBE_TIMEOUT = 15
+
+
+def extract_video_url(platform: str, post: dict[str, Any]) -> Optional[str]:
+    """Pluck a video URL (page or direct) out of a platform's raw post dict.
+
+    Mirrors scratch/crawl_videos.py so the two paths stay in sync; the
+    crawler is the source of truth for what shape each platform's post takes.
+    """
+    if platform == "x":
+        vlist = post.get("video_url_list") or []
+        if vlist:
+            head = vlist[0]
+            return head.get("video_url") if isinstance(head, dict) else head
+        return None
+    if platform == "youtube":
+        vid = post.get("video_id") or post.get("content_id")
+        return f"https://www.youtube.com/watch?v={vid}" if vid else None
+    # Generic: aigc-channel platforms (xhs / gzh / sph / douyin / bili / zhihu /
+    # weibo / toutiao / github) all expose video URLs under `videos[0]`.
+    videos = post.get("videos") or []
+    if videos:
+        return videos[0]
+    return None
+
+
+def _safe_stem(platform: str, post: dict[str, Any]) -> str:
+    raw_id = (
+        post.get("channel_content_id")
+        or post.get("video_id")
+        or post.get("content_id")
+        or "item"
+    )
+    return f"{platform}_{_SAFE_RE.sub('_', str(raw_id))[:60]}"
+
+
+def _yt_dlp_download(url: str, target: Path) -> Optional[Path]:
+    if target.exists() and target.stat().st_size > 0:
+        return target
+    cmd = ["yt-dlp", "-f", "best[ext=mp4]/best", "-o", str(target),
+           "--no-playlist", "--quiet", "--no-warnings", url]
+    try:
+        r = subprocess.run(cmd, capture_output=True, text=True, timeout=DOWNLOAD_TIMEOUT)
+    except (subprocess.TimeoutExpired, FileNotFoundError) as e:
+        logger.warning("yt-dlp failed for %s: %s", url, e)
+        return None
+    if r.returncode != 0:
+        logger.warning("yt-dlp non-zero for %s: %s", url, (r.stderr or r.stdout)[:200])
+        return None
+    if target.exists() and target.stat().st_size > 0:
+        return target
+    # yt-dlp may have written with a different extension
+    for f in target.parent.glob(target.stem + ".*"):
+        if f.is_file() and f.stat().st_size > 0:
+            return f
+    return None
+
+
+async def _httpx_download(url: str, target: Path, referer: Optional[str] = None) -> Optional[Path]:
+    if target.exists() and target.stat().st_size > 0:
+        return target
+    headers = {"User-Agent": UA}
+    if referer:
+        headers["Referer"] = referer
+    try:
+        async with httpx.AsyncClient(
+            timeout=DOWNLOAD_TIMEOUT, follow_redirects=True, headers=headers
+        ) as client:
+            async with client.stream("GET", url) as resp:
+                if resp.status_code != 200:
+                    logger.warning("download HTTP %s for %s", resp.status_code, url)
+                    return None
+                with target.open("wb") as f:
+                    async for chunk in resp.aiter_bytes(chunk_size=64 * 1024):
+                        f.write(chunk)
+    except Exception as e:
+        logger.warning("httpx download failed for %s: %s", url, e)
+        return None
+    return target if target.exists() and target.stat().st_size > 0 else None
+
+
+async def _download_video(
+    platform: str, post: dict[str, Any], video_url: str, target: Path
+) -> Optional[Path]:
+    """Dispatch to the right downloader per platform.
+
+    Known-good strategies (from scratch/crawl_videos.py):
+      x      : yt-dlp on the tweet page URL (video URLs are signed/rotating)
+      douyin : httpx direct with douyin.com Referer (video URL is a play API)
+      sph    : httpx direct with channels.weixin.qq.com Referer (stodownload link)
+      youtube: yt-dlp on the watch URL
+
+    For everything else (xhs / bili / weibo / zhihu / gzh / toutiao / github / ...):
+    try yt-dlp on the post's page URL first (yt-dlp supports 1000+ sites including
+    most aigc-channel platforms via cookies-free extractors), and fall back to
+    plain httpx on `videos[0]` if yt-dlp can't handle it.
+    """
+    if platform == "x":
+        page_url = post.get("link") or video_url
+        return await asyncio.to_thread(_yt_dlp_download, page_url, target)
+    if platform == "douyin":
+        return await _httpx_download(video_url, target, referer="https://www.douyin.com/")
+    if platform == "sph":
+        return await _httpx_download(video_url, target, referer="https://channels.weixin.qq.com/")
+    if platform == "youtube":
+        return await asyncio.to_thread(_yt_dlp_download, video_url, target)
+
+    # Generic two-step fallback for any other platform with a `videos` field.
+    page_url = post.get("link")
+    if page_url:
+        result = await asyncio.to_thread(_yt_dlp_download, page_url, target)
+        if result:
+            return result
+        logger.info("yt-dlp didn't handle %s page URL; falling back to httpx", platform)
+    return await _httpx_download(video_url, target)
+
+
+def _extract_m4a(video_path: Path, audio_path: Path) -> bool:
+    """ffmpeg: video -> 16kHz mono AAC 64kbps m4a. Returns True if file written."""
+    audio_path.parent.mkdir(parents=True, exist_ok=True)
+    if audio_path.exists() and audio_path.stat().st_size > 0:
+        return True
+    cmd = ["ffmpeg", "-y", "-hide_banner", "-loglevel", "error",
+           "-i", str(video_path),
+           "-vn", "-ac", "1", "-ar", "16000",
+           "-c:a", "aac", "-b:a", "64k",
+           str(audio_path)]
+    try:
+        subprocess.run(cmd, check=True, timeout=FFMPEG_TIMEOUT,
+                       capture_output=True)
+    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, FileNotFoundError) as e:
+        logger.warning("ffmpeg failed for %s: %s", video_path, e)
+        return False
+    return audio_path.exists() and audio_path.stat().st_size > 0
+
+
+async def _transcribe_deepgram(
+    audio_path: Path,
+    api_key: str,
+    model: str = DEEPGRAM_MODEL_DEFAULT,
+    language: Optional[str] = None,
+) -> Optional[str]:
+    params: dict[str, str] = {
+        "model": model,
+        "smart_format": "true",
+        "punctuate": "true",
+    }
+    if language:
+        params["language"] = language
+    else:
+        params["detect_language"] = "true"
+    headers = {
+        "Authorization": f"Token {api_key}",
+        "Content-Type": "audio/mp4",
+    }
+    try:
+        audio_bytes = audio_path.read_bytes()
+        async with httpx.AsyncClient(timeout=DEEPGRAM_REQUEST_TIMEOUT) as client:
+            r = await client.post(DEEPGRAM_URL, params=params, headers=headers,
+                                  content=audio_bytes)
+    except Exception as e:
+        logger.warning("Deepgram request failed for %s: %s", audio_path.name, e)
+        return None
+    if r.status_code != 200:
+        logger.warning("Deepgram HTTP %s: %s", r.status_code, r.text[:200])
+        return None
+    try:
+        data = r.json()
+        alt = data["results"]["channels"][0]["alternatives"][0]
+        return alt.get("transcript") or None
+    except (KeyError, IndexError, ValueError) as e:
+        logger.warning("Deepgram response malformed: %s", e)
+        return None
+
+
+def _clean_chinese_spaces(text: str) -> str:
+    """Drop whitespace strictly between two CJK characters."""
+    return _CJK_SPACE_RE.sub("", text)
+
+
+def _ffprobe_duration_sync(video_url: str, referer: Optional[str] = None) -> Optional[float]:
+    """Read mp4 moov box over HTTP Range; returns duration (seconds) or None.
+
+    Does NOT download the video stream — typically pulls only a few KB even for
+    multi-GB files. Designed to be called from search() to enrich posts with
+    duration before scoring, without paying the cost of a full download.
+    """
+    cmd = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
+           "-of", "default=nw=1:nk=1"]
+    if referer:
+        cmd += ["-headers", f"Referer: {referer}\r\n"]
+    cmd += [video_url]
+    try:
+        r = subprocess.run(cmd, capture_output=True, text=True, timeout=_DURATION_PROBE_TIMEOUT)
+    except (subprocess.TimeoutExpired, FileNotFoundError) as e:
+        logger.info("ffprobe duration probe failed for %s: %s", video_url[:80], e)
+        return None
+    out = (r.stdout or "").strip()
+    if not out:
+        return None
+    try:
+        d = float(out)
+    except ValueError:
+        return None
+    return d if d > 0 else None
+
+
+async def probe_video_duration(
+    video_url: str, platform: Optional[str] = None
+) -> Optional[float]:
+    """Async wrapper. Probes mp4 duration via HTTP Range; returns seconds or None.
+
+    Pass `platform` to auto-inject the right Referer header (douyin / sph / xhs / bili
+    require it). Safe to call concurrently — uses asyncio.to_thread so subprocesses
+    don't block the event loop. Each call is one ffprobe subprocess; cap parallelism
+    at the call site if probing many URLs.
+    """
+    if not video_url:
+        return None
+    referer = _PLATFORM_REFERERS.get(platform) if platform else None
+    return await asyncio.to_thread(_ffprobe_duration_sync, video_url, referer)
+
+
+async def probe_durations_for_posts(
+    platform: str, posts: list, concurrency: int = 8
+) -> None:
+    """In-place: probe each post's video URL and set post["duration_sec"] if found.
+
+    Skips posts with no video URL (image-only posts). Probes happen concurrently
+    bounded by `concurrency` to avoid spawning a flood of ffprobe subprocesses.
+    Failures are silent (post just won't have duration_sec — evaluator handles).
+    """
+    sem = asyncio.Semaphore(concurrency)
+
+    async def _one(post: dict) -> None:
+        url = extract_video_url(platform, post)
+        if not url:
+            return
+        async with sem:
+            d = await probe_video_duration(url, platform=platform)
+        if d is not None:
+            post["duration_sec"] = d
+
+    await asyncio.gather(*[_one(p) for p in posts if isinstance(p, dict)])
+
+
+def _get_api_key() -> Optional[str]:
+    key = os.environ.get("DEEPGRAM_KEY") or os.environ.get("DEEPGRAM_API_KEY")
+    if key:
+        return key
+    try:
+        from dotenv import load_dotenv
+        load_dotenv()
+    except ImportError:
+        return None
+    return os.environ.get("DEEPGRAM_KEY") or os.environ.get("DEEPGRAM_API_KEY")
+
+
+async def transcribe_video_from_post(
+    platform: str,
+    post: dict[str, Any],
+    *,
+    model: str = DEEPGRAM_MODEL_DEFAULT,
+    language: Optional[str] = None,
+) -> Optional[str]:
+    """End-to-end: locate video, download, extract m4a, STT, clean spaces.
+
+    Returns transcript text or None if any step fails (logged at WARNING level).
+    Caller can safely ignore None and fall back to whatever body text it has.
+    """
+    url = extract_video_url(platform, post)
+    if not url:
+        return None
+    api_key = _get_api_key()
+    if not api_key:
+        logger.warning("DEEPGRAM_KEY not set; skipping transcription for %s", platform)
+        return None
+
+    stem = _safe_stem(platform, post)
+    work_dir = _TMP_ROOT / platform
+    work_dir.mkdir(parents=True, exist_ok=True)
+    video_path = work_dir / f"{stem}.mp4"
+    audio_path = work_dir / f"{stem}.m4a"
+
+    video = await _download_video(platform, post, url, video_path)
+    if not video:
+        return None
+
+    if not await asyncio.to_thread(_extract_m4a, video, audio_path):
+        return None
+
+    transcript = await _transcribe_deepgram(audio_path, api_key, model=model, language=language)
+    if not transcript:
+        return None
+    return _clean_chinese_spaces(transcript).strip()
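Typical call site, mirroring how the platform detail() implementations use this module (a sketch: needs ffmpeg and yt-dlp on PATH plus DEEPGRAM_KEY set; the id and URL are hypothetical):

import asyncio
from agent.tools.builtin.content.transcription import transcribe_video_from_post

async def main() -> None:
    post = {"channel_content_id": "123", "videos": ["https://example.com/v.mp4"]}
    text = await transcribe_video_from_post("douyin", post)
    if text:  # None on any failure; the caller falls back to body_text
        print(text[:200])

asyncio.run(main())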

+ 7 - 3
examples/process_pipeline/prompts/researcher.prompt

@@ -4,7 +4,7 @@ temperature: 0.3
 
 $system$
 
-You are a focused channel-research specialist. You run a complete breadth survey on one assigned channel (e.g. Xiaohongshu, X, youtube), including multi-keyword search, moderate content inspection, and structured survey output.
+You are a focused channel-research specialist. You run a complete breadth survey on one assigned channel (image/text-first such as Xiaohongshu / Zhihu / Official Accounts; video-first such as youtube / X / Douyin / WeChat Channels), including multi-keyword search, moderate content inspection, and structured survey output.
 
 ---
 
@@ -16,8 +16,12 @@ $system$
 
 ### Search tools
 - `content_platforms(platform="")` — list / query detailed per-platform search parameters
-- `content_search(platform, keyword, max_count=20)` — cross-platform case search (returns results with index numbers)
-  - common platform values: `xhs` (Xiaohongshu), `youtube`, `x` (Twitter), `bili`, `gzh`, `zhihu`
+- `content_search(platform, keyword, max_count=20, extras={...})` — cross-platform case search (returns results with index numbers)
+  - common platform values (grouped by content form):
+    - **image/text-first**: `xhs` (Xiaohongshu), `gzh` (Official Accounts), `zhihu` (Zhihu), `weibo` (Weibo), `toutiao` (Toutiao), `github`
+    - **video-first**: `youtube`, `x` (Twitter), `douyin` (Douyin), `sph` (WeChat Channels), `bili` (Bilibili)
+  - **Key technique when searching for video tutorials**: for AIGC-family platforms (xhs / douyin / sph / bili / gzh etc.) pass `extras={"content_type": "视频"}` to force video-only results; youtube / x are video-first already and need no such parameter. If the research target is clearly a hands-on demo tutorial, prefer video channels.
+  - **Video downloadability**: search results from the four platforms `youtube` / `x` / `douyin` / `sph` automatically carry directly playable/downloadable video links; other platforms return only page URLs.
 - `content_detail(platform, index)` — view a result's full detail and body text by its index from content_search
 - `content_suggest(platform, keyword)` — get related search-suggestion terms
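An illustrative call matching the notes above (keyword is hypothetical):

  `content_search(platform="douyin", keyword="AI 写真教程", max_count=20, extras={"content_type": "视频"})`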
 

+ 52 - 0
examples/process_pipeline/run_metrics.json

@@ -2609,5 +2609,57 @@
       "research"
     ],
     "timestamp": "2026-05-14T17:42:11.272855"
+  },
+  {
+    "index": 109,
+    "requirement": "用ai生成真实摄影的美女写真组图,要求具有真实感,氛围感,人物一致性保持...",
+    "duration_seconds": 3.05,
+    "total_cost_usd": 0.0,
+    "costs_breakdown": {},
+    "trace_ids": {},
+    "errors": [],
+    "active_steps": [
+      "source"
+    ],
+    "timestamp": "2026-05-15T16:08:19.318536"
+  },
+  {
+    "index": 109,
+    "requirement": "用ai生成真实摄影的美女写真组图,要求具有真实感,氛围感,人物一致性保持...",
+    "duration_seconds": 2.21,
+    "total_cost_usd": 0.0,
+    "costs_breakdown": {},
+    "trace_ids": {},
+    "errors": [],
+    "active_steps": [
+      "source"
+    ],
+    "timestamp": "2026-05-15T16:24:20.607244"
+  },
+  {
+    "index": 109,
+    "requirement": "用ai生成真实摄影的美女写真组图,要求具有真实感,氛围感,人物一致性保持...",
+    "duration_seconds": 0.73,
+    "total_cost_usd": 0.0,
+    "costs_breakdown": {},
+    "trace_ids": {},
+    "errors": [],
+    "active_steps": [
+      "source"
+    ],
+    "timestamp": "2026-05-15T16:29:54.897458"
+  },
+  {
+    "index": 109,
+    "requirement": "用ai生成真实摄影的美女写真组图,要求具有真实感,氛围感,人物一致性保持...",
+    "duration_seconds": 0.73,
+    "total_cost_usd": 0.0,
+    "costs_breakdown": {},
+    "trace_ids": {},
+    "errors": [],
+    "active_steps": [
+      "source"
+    ],
+    "timestamp": "2026-05-15T16:31:36.684729"
   }
 ]

+ 2 - 2
examples/process_pipeline/run_pipeline.py

@@ -23,7 +23,7 @@ CLI 速查
 
 Optional arguments:
   --case-index N             only supported by decode-workflow / apply-grounding
-  --platforms xhs,zhihu,gzh,youtube   platform filter for the research step
+  --platforms xhs,zhihu,gzh,youtube,douyin,sph   platform filter for the research step
   --skip-existing            only run a case if it has no decode output yet (incremental mode).
                              Default is full coverage: every run regenerates all cases.
                              Only affects decode-workflow batch mode; single-case mode always re-runs.
@@ -608,7 +608,7 @@ def _parse_args() -> argparse.Namespace:
     parser = argparse.ArgumentParser(description="AIGC Process Pipeline (5-step)")
     parser.add_argument("--index", type=int, required=True,
                         help="Index of requirement in db_requirements.json (0-based)")
-    parser.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube",
+    parser.add_argument("--platforms", type=str, default="xhs,zhihu,gzh,youtube,douyin,sph",
                         help="Comma-separated platforms for research step")
     parser.add_argument("--case-index", type=int, default=None,
                         help="Re-run a single case in decode-workflow / apply-grounding")

+ 121 - 12
examples/process_pipeline/script/evaluate_source_quality.py

@@ -10,10 +10,20 @@ Source.json 质量评估模块
 """
 
 import json
+import re
 from pathlib import Path
 from typing import Dict, List, Tuple
 from datetime import datetime, timedelta
 
+_HTML_TAG_RE = re.compile(r"<[^>]+>")
+
+
+def _strip_html(text) -> str:
+    """Remove inline HTML tags (e.g. <em class="highlight">) from search-result text."""
+    if not text:
+        return ""
+    return _HTML_TAG_RE.sub("", str(text))
+
 
 class SourceQualityEvaluator:
     """Source 数据质量评估器"""
@@ -65,10 +75,18 @@ class SourceQualityEvaluator:
                 "total_fields": int,       # 总字段数
             }
         """
+        # Video posts (content_type=="video" or a non-empty videos field) usually
+        # have no body_text; score them on caption + engagement data so they aren't
+        # all penalized for short bodies.
+        is_video = (
+            post.get("content_type") == "video"
+            or bool(post.get("videos"))
+        )
+
         result = {
+            "mode": "video" if is_video else "text",
             "field_score": 0.0,
-            "text_score": 0.0,
-            "engagement_score": 0.0,
+            "text_score": 0.0,       # video 模式下含义为 title-only (0-15)
+            "engagement_score": 0.0, # video 模式下扩展为 (0-45)
             "total_score": 0.0,
             "grade": "F",
             "issues": [],
@@ -76,20 +94,23 @@ class SourceQualityEvaluator:
             "total_fields": len(self.FIELD_WEIGHTS),
         }
 
         # 1. Field-completeness score (0-40)
         field_score, valid_count = self._evaluate_fields(post)
         result["field_score"] = field_score
         result["valid_fields"] = valid_count
 
-        # 2. Text-quality score (0-40)
-        text_score, text_issues = self._evaluate_text(post)
-        result["text_score"] = text_score
-        result["issues"].extend(text_issues)
-
-        # 3. Engagement score (0-20)
-        engagement_score, engagement_issues = self._evaluate_engagement(post)
-        result["engagement_score"] = engagement_score
-        result["issues"].extend(engagement_issues)
+        # 2 & 3. Text / engagement scoring (video mode skips body length and shifts
+        # the weight to title + duration + engagement)
+        if is_video:
+            title_score, eng_score, issues = self._evaluate_video_signals(post)
+            result["text_score"] = title_score
+            result["engagement_score"] = eng_score
+            result["issues"].extend(issues)
+        else:
+            text_score, text_issues = self._evaluate_text(post)
+            engagement_score, engagement_issues = self._evaluate_engagement(post)
+            result["text_score"] = text_score
+            result["engagement_score"] = engagement_score
+            result["issues"].extend(text_issues + engagement_issues)
 
         # Compute the total score and grade
         result["total_score"] = round(
@@ -99,6 +120,94 @@ class SourceQualityEvaluator:
 
         return result
 
+    # ── video-mode thresholds (mirror the body-length tiers, but in seconds) ──
+    DURATION_THRESHOLDS = {
+        "very_short": 30,    # <30s     -> 5/30
+        "short":      60,    # 30-60s   -> 12/30
+        "fair":       120,   # 60-120s  -> 20/30
+        "good":       300,   # 2-5 min  -> 26/30
+        "long":       1800,  # 5-30 min -> 30/30 (best)
+        # >=1800s (>30 min) -> 22/30 (information density tends to drop)
+    }
+
+    def _evaluate_video_signals(self, post: dict) -> Tuple[float, float, List[str]]:
+        """For video posts: replaces body-length scoring with video-duration scoring.
+
+        Composition: title (0-10) + duration (0-30) + engagement (0-20) = 0-60,
+        mirroring the article-post weights but with duration as the content signal.
+
+        Reads `duration_sec` from the post (populated by search() via
+        transcription.probe_durations_for_posts before scoring). If absent
+        (probe failed / no video URL), duration_score is 0 with an issue noted.
+        """
+        issues: List[str] = []
+
+        # ── title 0-10 ──
+        title = _strip_html(post.get("title", "")).strip()
+        tlen = len(title)
+        if tlen == 0:
+            title_score = 0
+            issues.append("标题为空")
+        elif tlen < 10:
+            title_score = 3
+            issues.append(f"标题过短 ({tlen}字)")
+        elif tlen < 20:
+            title_score = 6
+        else:
+            title_score = 10
+
+        # ── duration 0-30 (replaces body_text length) ──
+        duration = post.get("duration_sec")
+        if not isinstance(duration, (int, float)) or duration <= 0:
+            dur_score = 0
+            issues.append("无视频时长")
+        elif duration < self.DURATION_THRESHOLDS["very_short"]:
+            dur_score = 5
+            issues.append(f"视频极短 ({duration:.0f}s)")
+        elif duration < self.DURATION_THRESHOLDS["short"]:
+            dur_score = 12
+            issues.append(f"视频较短 ({duration:.0f}s)")
+        elif duration < self.DURATION_THRESHOLDS["fair"]:
+            dur_score = 20
+        elif duration < self.DURATION_THRESHOLDS["good"]:
+            dur_score = 26
+        elif duration < self.DURATION_THRESHOLDS["long"]:
+            dur_score = 30
+        else:
+            dur_score = 22
+            issues.append(f"视频较长 ({duration:.0f}s,>30 分钟密度可能下降)")
+
+        # ── engagement 0-20 (same as article posts) ──
+        like_count = post.get("like_count", 0)
+        if not isinstance(like_count, (int, float)):
+            like_count = 0
+        if like_count == 0:
+            like_score = 0
+            issues.append("无点赞数据")
+        elif like_count < 10:
+            like_score = 3
+        elif like_count < 100:
+            like_score = 6
+        elif like_count < 1000:
+            like_score = 8
+        else:
+            like_score = 10
+
+        timestamp = post.get("publish_timestamp", 0)
+        if not isinstance(timestamp, (int, float)):
+            timestamp = 0
+        if timestamp == 0:
+            ts_score = 0
+            issues.append("无发布时间")
+        elif timestamp < self.cutoff_timestamp:
+            ts_score = 2
+            issues.append(f"内容过时(超过{self.time_window_days}天)")
+        else:
+            ts_score = 10
+
+        # in video mode, the text_score field means title + duration (0-40)
+        return float(title_score + dur_score), float(like_score + ts_score), issues
+
     def _evaluate_fields(self, post: dict) -> Tuple[float, int]:
         """评估字段完整性"""
         total_weight = sum(self.FIELD_WEIGHTS.values())
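A worked example of the video branch (hypothetical post; field_score omitted since _evaluate_fields is unchanged):

post = {
    "content_type": "video",
    "title": "用AI生成真实感写真的完整工作流",  # 16 chars -> title_score 6
    "duration_sec": 245.0,                     # 2-5 min tier -> dur_score 26
    "like_count": 350,                         # 100-1000 -> like_score 8
    "publish_timestamp": 1765000000000,        # assumed within window -> ts_score 10
}
# _evaluate_video_signals(post) -> (32.0, 18.0, [])   # text = 6+26, engagement = 8+10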

+ 76 - 24
examples/process_pipeline/script/extract_sources.py

@@ -35,6 +35,12 @@ _URL_PATTERNS = [
     ("zhihu", re.compile(r"zhihu\.com/question/\d+/answer/(\d+)")),
     # gzh (Official Accounts): use __biz, or the whole URL as the id (fallback)
     ("gzh", re.compile(r"mp\.weixin\.qq\.com/s[/?]([^\s\"']+)")),
+    # douyin: https://www.douyin.com/video/{numeric_id} — the captured id matches
+    # douyin post.channel_content_id directly
+    ("douyin", re.compile(r"douyin\.com/video/(\d+)")),
+    # sph (WeChat Channels): post.link is an 'export/{base64-like token}', not an http URL;
+    # tag it as sph here; actual lookup falls back to the ("url", link) index (see main flow)
+    ("sph", re.compile(r"^export/([A-Za-z0-9+/=_\-]+)$")),
 ]
 
 
@@ -102,6 +108,39 @@ DEFAULT_MIN_SCORE = 70.0
 DEFAULT_CUTOFF_DATE = (2025, 10, 1)
 
 
+_TRANSCRIPT_MARKER = "[视频字幕]"
+
+
+def _merge_transcript_into_body(post: Dict[str, Any]) -> Dict[str, Any]:
+    """Return a shallow copy of `post` where `body_text` includes the video transcript.
+
+    For sph/douyin/x video posts whose original body_text is just hashtags or empty,
+    this exposes the real video content (Deepgram transcript or YouTube captions)
+    under the field the frontend reads. The original transcript field is left intact
+    so downstream code that wants it separately still works.
+
+    Idempotent: if body_text already contains the `[视频字幕]` marker, skip.
+    """
+    if not isinstance(post, dict):
+        return post
+    transcript = post.get("video_transcript") or post.get("captions") or ""
+    if not isinstance(transcript, str) or not transcript.strip():
+        return post
+    transcript = transcript.strip()
+    body = post.get("body_text") or post.get("desc") or ""
+    body = body.strip() if isinstance(body, str) else ""
+
+    if body and _TRANSCRIPT_MARKER in body:
+        return post  # 已经合并过
+
+    merged = dict(post)  # shallow copy — body_text 是 top-level field
+    if body:
+        merged["body_text"] = f"{body}\n\n{_TRANSCRIPT_MARKER}\n{transcript}"
+    else:
+        merged["body_text"] = f"{_TRANSCRIPT_MARKER}\n{transcript}"
+    return merged
+
+
 def _is_before_cutoff(source: Dict[str, Any], cutoff_ts: int) -> bool:
     """判断帖子是否早于截止时间戳(秒级)
 
@@ -150,10 +189,14 @@ def _check_filters(
     """
     post = source.get("post", {}) or {}
 
-    # 1. body_text completeness
+    # 1. Content completeness: for video posts, video_transcript counts too (their body_text is usually empty)
     body = post.get("body_text") or post.get("desc") or ""
-    if not isinstance(body, str) or len(body.strip()) < min_body_len:
-        return f"missing_body_text:len={len(body.strip()) if isinstance(body, str) else 0}"
+    transcript = post.get("video_transcript") or post.get("captions") or ""
+    body_len = len(body.strip()) if isinstance(body, str) else 0
+    transcript_len = len(transcript.strip()) if isinstance(transcript, str) else 0
+    total_len = body_len + transcript_len
+    if total_len < min_body_len:
+        return f"missing_body_text:body={body_len},transcript={transcript_len}"
 
     # 2. Agent score
     evaluation = source.get("evaluation")
@@ -370,9 +413,32 @@ def extract_sources_to_json(
         for entry in entries:
             url = entry["url"]
             evaluation = entry.get("evaluation")
-            # Parse the URL into platform and content_id
+            # Parse the URL into platform and content_id (may fail, e.g. sph's export/... tokens)
             parsed = parse_url(url)
-            if not parsed:
+            platform: Optional[str] = parsed[0] if parsed else None
+            cid: Optional[str] = parsed[1] if parsed else None
+
+            # Multi-level matching, in priority order:
+            # 1. exact (platform, content_id) match (when parse_url succeeded)
+            # 2. full-string ("url", link) match (fallback; required for non-http URLs like sph's)
+            # 3. normalized ("norm_url", normalized) match
+            post = None
+            if platform and cid:
+                post = cache_index.get((platform, cid))
+            if not post:
+                post = cache_index.get(("url", url))
+            if not post:
+                norm = _normalize_url(url)
+                if norm:
+                    post = cache_index.get(("norm_url", norm))
+
+            # If parse_url failed but the URL index still hit a post, infer platform / cid from the post
+            if post and (not platform or not cid):
+                platform = platform or post.get("channel") or "unknown"
+                cid = cid or post.get("channel_content_id") or post.get("video_id") or url
+
+            # Neither parsed nor a cache hit: genuinely unmatched
+            if not post and not platform:
                 unmatched.append({
                     "case_file": case_file.name,
                     "url": url,
@@ -380,34 +446,20 @@ def extract_sources_to_json(
                 })
                 continue
 
-            platform, cid = parsed
-            key = (platform, cid)
+            key = (platform, str(cid) if cid else url)
             if key in seen_keys:
                 continue
             seen_keys.add(key)
 
-            # Multi-level matching:
-            # 1. exact (platform, content_id) match
-            # 2. full URL match
-            # 3. normalized URL match
-            post = cache_index.get(key)
-
-            if not post:
-                # 2. full URL
-                post = cache_index.get(("url", url))
-
-            if not post:
-                # 3. normalized URL
-                norm = _normalize_url(url)
-                if norm:
-                    post = cache_index.get(("norm_url", norm))
-
             if post:
                 # Always build case_id from the channel_content_id stored in the cache
                 # so case_id stays consistent with the ID there
                 actual_cid = post.get("channel_content_id") or post.get("video_id") or cid
                 actual_case_id = f"{platform}_{actual_cid}"
 
+                # Merge video_transcript into body_text (the frontend needs it to display video posts)
+                post_for_source = _merge_transcript_into_body(post)
+
                 matched.append({
                     "case_id": actual_case_id,
                     "case_file": case_file.name,
@@ -415,7 +467,7 @@ def extract_sources_to_json(
                     "channel_content_id": str(actual_cid),
                     "source_url": url,
                     "evaluation": evaluation,
-                    "post": post,
+                    "post": post_for_source,
                     "comments": post.get("author_comments", []) or [],
                 })
             else:

+ 23 - 3
examples/process_pipeline/script/generate_case.py

@@ -87,10 +87,30 @@ def _extract_url(post: Dict[str, Any], platform: str) -> str:
 
 
 def _extract_body(post: Dict[str, Any], platform: str) -> str:
-    """字段映射:body_text / description"""
+    """字段映射:body_text / description;视频帖把 video_transcript 也并入。
+
+    抖音 / 视频号等视频平台 post.body_text 通常只是几个 hashtag(甚至为空),
+    而真正的内容在 video_transcript(Deepgram 转写)/ captions(YouTube 官方字幕)。
+    把两者拼起来让下游评分、过滤、agent prompt 都能看到完整内容。
+
+    幂等:如果 body_text 已经被 extract_sources 合并过(含 `[视频字幕]` 标记),
+    直接返回原 body,避免重复 append。
+    """
     if platform == "youtube":
-        return post.get("description") or post.get("body_text") or ""
-    return post.get("body_text") or ""
+        body = post.get("description") or post.get("body_text") or ""
+    else:
+        body = post.get("body_text") or ""
+
+    body = body.strip() if isinstance(body, str) else ""
+    # Already merged → skip to avoid duplicating the append
+    if body and "[视频字幕]" in body:
+        return body
+
+    transcript = post.get("video_transcript") or post.get("captions") or ""
+    transcript = transcript.strip() if isinstance(transcript, str) else ""
+    if transcript and body:
+        return f"{body}\n\n[视频字幕]\n{transcript}"
+    return transcript or body
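Resulting bodies for two illustrative posts:

_extract_body({"body_text": "#tutorial", "video_transcript": "step one ..."}, "douyin")
# -> "#tutorial\n\n[视频字幕]\nstep one ..."
_extract_body({"body_text": "", "captions": "hello"}, "youtube")
# -> "hello"   (transcript alone when the body is empty; youtube checks description first)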
 
 
 def _extract_raw_images(post: Dict[str, Any], platform: str) -> List[str]:

+ 1 - 1
examples/process_pipeline/script/validate_schema.py

@@ -24,7 +24,7 @@ from typing import Dict, List, Optional, Set, Tuple
 
 from .schema_manager import validate_with_schema, get_schema_manager
 
-VALID_PLATFORMS = {"xhs", "youtube", "bili", "x", "zhihu", "gzh"}
+VALID_PLATFORMS = {"xhs", "youtube", "bili", "x", "zhihu", "gzh", "douyin", "sph"}
 
 
 # ── 文件名 → schema 名映射 ──────────────────────────────

+ 3 - 3
examples/process_pipeline/ui/app.js

@@ -2,7 +2,7 @@ let requirements = [];
 let currentSelectedIndex = null;
 let activeRuns = {};
 let statusInterval = null;
-let currentAvailablePlatforms = ['xhs', 'youtube', 'bili', 'x'];
+let currentAvailablePlatforms = ['xhs', 'youtube', 'bili', 'x', 'douyin', 'sph'];
 
 let currentPromptName = null;
 const modalPrompts = document.getElementById('prompts-modal');
@@ -618,10 +618,10 @@ async function fetchRequirementData(index) {
                 .filter(p => p.startsWith('case_') && p !== 'case_detailed' && p !== 'case')
                 .map(p => p.replace('case_', ''));
             if (currentAvailablePlatforms.length === 0) {
-                currentAvailablePlatforms = ['xhs', 'youtube', 'bili', 'x'];
+                currentAvailablePlatforms = ['xhs', 'youtube', 'bili', 'x', 'douyin', 'sph'];
             }
         } else {
-            currentAvailablePlatforms = ['xhs', 'youtube', 'bili', 'x'];
+            currentAvailablePlatforms = ['xhs', 'youtube', 'bili', 'x', 'douyin', 'sph'];
         }
     } catch (e) {
         console.error("Failed to fetch data", e);