Просмотр исходного кода

feat: 新增工具知识导入、X平台适配与前端视频预览

- 新增db.py中的fetch_adopted_tools_cases函数,查询已采纳且带工具解构的案例
- 新增工具知识批量导入脚本,复用工序导入逻辑,支持Dry-Run、限流和按搜索任务筛选
- 补充extract_sources.py的X平台帖子处理,转换嵌套媒体数组并补全链接
- 为mode_workflow前端页新增视频预览模块,包含样式与渲染逻辑,支持错误降级
刘文武 1 день назад
Родитель
Сommit
f14bc12260

+ 28 - 0
examples/mode_workflow/db.py

@@ -806,6 +806,34 @@ def fetch_adopted_process_cases(query_id=None):
     return sorted(set(cases))
     return sorted(set(cases))
 
 
 
 
+def fetch_adopted_tools_cases(query_id=None):
+    """返回「已采纳且有工具解构」的 case_id 列表(供工具知识上传脚本用)。
+
+    与 fetch_adopted_process_cases 完全同构,只把搜索/解构表换成工具方向:
+    采纳是帖子级属性(评估存在 search_tools),工具解构存在 mode_tools,故二者 JOIN,
+    只取两边都有的 case,再用 is_adopted_rel(口径同 Dashboard)在 Python 侧过滤。
+    query_id 给定时只看该搜索任务下的 case。返回去重、按 case_id 排序的列表。
+    """
+    sql = (f"SELECT DISTINCT s.case_id, s.overall_score, s.publish_time, "
+           f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro "
+           "FROM search_tools s "
+           "JOIN (SELECT DISTINCT case_id FROM mode_tools) m ON s.case_id = m.case_id")
+    params = ()
+    if query_id:
+        sql += " WHERE s.query_id=%s"
+        params = (query_id,)
+    conn = _conn()
+    try:
+        with conn.cursor() as cur:
+            cur.execute(sql, params)
+            rows = cur.fetchall()
+    finally:
+        conn.close()
+    cases = [r["case_id"] for r in rows
+             if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"], r["repro"])]
+    return sorted(set(cases))
+
+
 # ── 评估去重:复用 query 无关分,只重算 query 相关分(search_eval.py 用)──────────
 # ── 评估去重:复用 query 无关分,只重算 query 相关分(search_eval.py 用)──────────
 
 
 def fetch_existing_eval(case_id, table="search_process"):
 def fetch_existing_eval(case_id, table="search_process"):

+ 38 - 0
examples/mode_workflow/index.html

@@ -1362,6 +1362,18 @@
         background: #f3f2ed;
         background: #f3f2ed;
         cursor: zoom-in;
         cursor: zoom-in;
       }
       }
+      .pd-videos {
+        display: grid;
+        grid-template-columns: 1fr;
+        gap: 8px;
+      }
+      .pd-videos video {
+        width: 100%;
+        max-height: 320px;
+        border: 1px solid var(--line);
+        border-radius: 10px;
+        background: #000;
+      }
       /* ── 图片预览灯箱 ── */
       /* ── 图片预览灯箱 ── */
       dialog.lightbox {
       dialog.lightbox {
         position: fixed;
         position: fixed;
@@ -2223,6 +2235,18 @@
               class="pd-images"
               class="pd-images"
               id="pd-images"
               id="pd-images"
             ></div>
             ></div>
+            <div
+              class="pd-sec-title"
+              id="pd-videos-title"
+              style="display: none"
+            >
+              视频预览
+            </div>
+            <div
+              class="pd-videos"
+              id="pd-videos"
+              style="display: none"
+            ></div>
             <div
             <div
               class="pd-tags"
               class="pd-tags"
               id="pd-tags"
               id="pd-tags"
@@ -3050,6 +3074,20 @@
               .join("")
               .join("")
           : '<p style="color:var(--ink-faint);font-size:12px">搜索详情未返回图片。</p>';
           : '<p style="color:var(--ink-faint);font-size:12px">搜索详情未返回图片。</p>';
         lb.imgs = imgs;
         lb.imgs = imgs;
+        // 视频预览:仅当 videos 有值时显示模块
+        const vids = (p.videos || []).filter(Boolean);
+        const showVids = vids.length > 0;
+        $("#pd-videos-title").style.display = showVids ? "" : "none";
+        const pdVideos = $("#pd-videos");
+        pdVideos.style.display = showVids ? "" : "none";
+        pdVideos.innerHTML = showVids
+          ? vids
+              .map(
+                (s) =>
+                  `<video src="${esc(imgDirect(s))}" referrerpolicy="no-referrer" controls preload="metadata" playsinline onerror="this.onerror=null;this.src='${esc(imgProxy(s))}'"></video>`,
+              )
+              .join("")
+          : "";
         $("#pd-tags").innerHTML = [
         $("#pd-tags").innerHTML = [
           ...(p.knowledge_type || []).map((t) => "类型:" + t),
           ...(p.knowledge_type || []).map((t) => "类型:" + t),
           ...(p.found_by || []).map((q) => "命中:" + q),
           ...(p.found_by || []).map((q) => "命中:" + q),

+ 356 - 0
examples/mode_workflow/stages/import_tools_knowledge.py

@@ -0,0 +1,356 @@
+"""
+把数据库里「已采纳」的工具解构(mode_tools)批量导入到知识导入接口。
+
+与 stages/import_process_knowledge.py 完全同构,只把数据方向从「工序」换成「工具」:
+  参考实现 —— Downloads/import/tools_knowledge/main.py:扫 data/*/tools/*.json 本地文件。
+  本脚本   —— 从 MySQL 取数:db.fetch_adopted_tools_cases() 拿采纳 case_id,
+              再 db.fetch_tools(case_id) 重建 {…, tools:[...]}(取最新版本)。
+两边「每个 tool → 一条知识」的口径一致,故 payload 组装逻辑直接复用参考实现。
+
+字段映射(同参考实现 tools_knowledge/main.py):
+  source.id          ← case_id(DB 主键)
+  source.source_type ← 固定 "post"
+  source.title       ← mode_tools.post_title
+  每个 tool → 一条知识:
+    title          ← tool.工具名称;为空回退 "来源标题 — 工具N"
+    content        ← 整个 tool 对象的 JSON 串(原封不动传过去)
+    dim_attributes ← 固定 ["how工具"]   dim_creations ← 固定 ["制作"]
+    scopes         ← 实质作用域(substance)/形式作用域(form)按顿号拆分去重
+    custom_ext     ← 工具名称→工具、输入、输出 去重
+
+采纳口径:db.is_adopted_rel(相关性<4 / 实现完整性<4 / 发布超两年 / 综合分<6 任一命中即不采纳)。
+
+去重台账:复用 knowledge_ingest_log(case_id, proc_index)。工具与工序共用此表,故工具的
+index 统一加 TOOL_INDEX_BASE 偏移,避免同一 case 既有工序又有工具时键冲突。
+
+用法:
+    python stages/import_tools_knowledge.py                      # 真实导入(采纳工具全量)
+    python stages/import_tools_knowledge.py --dry-run            # 只组装+打印,不调接口
+    python stages/import_tools_knowledge.py --dry-run --verbose  # 打印完整 payload JSON
+    python stages/import_tools_knowledge.py --query-id q0001     # 只传某搜索任务下的采纳 case
+    python stages/import_tools_knowledge.py --limit 5            # 只处理前 5 个 case(调试)
+    python stages/import_tools_knowledge.py --api-url http://... # 指定后端地址
+    python stages/import_tools_knowledge.py --delay 200          # 每次调用间隔 200ms
+"""
+
+import argparse
+import json
+import logging
+import sys
+import time
+
+import requests
+
+# 本脚本归档在 stages/ 子目录,补 mode_workflow/ 到 sys.path 以裸 import db
+from pathlib import Path
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
+import db
+
+# ── 配置(对齐参考实现 tools_knowledge/main.py)────────────────────────────────
+
+DEFAULT_API_URL = "http://47.236.83.130:8001"
+INGEST_ENDPOINT = "/api/v1/knowledge/ingest"
+
+DIM_ATTRIBUTES = ["how工具"]
+DIM_CREATIONS = ["制作"]
+
+EXT_KEY_TOOL = "工具"
+
+# 工具知识在共享台账 knowledge_ingest_log 里的 index 偏移量,避免与工序(1-based)冲突。
+TOOL_INDEX_BASE = 100000
+
+# ── 日志 ──────────────────────────────────────────────────────────────────────
+
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s [%(levelname)s] %(message)s",
+    datefmt="%H:%M:%S",
+)
+logger = logging.getLogger(__name__)
+
+
+# ── 数据来源:从 DB 取采纳工具(替代参考实现的本地文件扫描)──────────────────────
+
+def iter_cases_from_db(query_id=None, limit=None):
+    """产出 (case_id, source_data, tools, version)。
+
+    先取采纳 case_id 列表,再逐个 fetch_tools 重建解构详情(取最新版本)。
+    fetch_tools 返回 None(无解构行)或 tools 为空的 case 自动跳过。
+    source_data 用 mode_tools 自带的 platform/post_title 拼装(mode_tools 不存 source 块)。
+    version 用于上传去重:同 case 同工具、版本未变即视为已传过,跳过。
+    """
+    case_ids = db.fetch_adopted_tools_cases(query_id)
+    if limit:
+        case_ids = case_ids[:limit]
+    for case_id in case_ids:
+        payload = db.fetch_tools(case_id)   # 最新版本
+        if not payload:
+            continue
+        source_data = {
+            "title": payload.get("title"),
+            "platform": payload.get("platform"),
+        }
+        yield (case_id, source_data,
+               (payload.get("tools") or []), payload.get("version"))
+
+
+# ── 作用域提取(原样复用参考实现)──────────────────────────────────────────────
+
+def _split_values(raw):
+    """按顿号分割,括号内的顿号不作为分隔符,结果去重保序。
+      "高保真线框图、UI设计稿"          → ["高保真线框图", "UI设计稿"]
+      "修改后的照片(发型、服装)、二次元服装" → ["修改后的照片(发型、服装)", "二次元服装"]
+    """
+    parts, current, depth = [], [], 0
+    for ch in raw:
+        if ch in ("(", "("):
+            depth += 1
+            current.append(ch)
+        elif ch in (")", ")"):
+            depth -= 1
+            current.append(ch)
+        elif ch == "、" and depth == 0:
+            part = "".join(current).strip()
+            if part:
+                parts.append(part)
+            current = []
+        else:
+            current.append(ch)
+    part = "".join(current).strip()
+    if part:
+        parts.append(part)
+
+    seen, result = set(), []
+    for p in parts:
+        if p not in seen:
+            seen.add(p)
+            result.append(p)
+    return result
+
+
+def _iter_scope_items(raw):
+    """实质/形式作用域在 mode_tools 里存为 JSON 数组(也兼容旧的顿号串):
+    先归一为字符串列表,再对每个元素按顿号拆分。逐个产出拆分后的标量值。"""
+    if raw is None:
+        return
+    items = raw if isinstance(raw, list) else [raw]
+    for item in items:
+        text = str(item).strip()
+        if not text:
+            continue
+        for value in _split_values(text):
+            yield value
+
+
+def build_scopes(tool):
+    """从工具的实质作用域(substance)/形式作用域(form)各自去重,返回 scope 列表。"""
+    seen_sub, seen_form, scopes = set(), set(), []
+    for sub in _iter_scope_items(tool.get("实质作用域")):
+        if sub not in seen_sub:
+            scopes.append({"scope_type": "substance", "value": sub})
+            seen_sub.add(sub)
+    for form in _iter_scope_items(tool.get("形式作用域")):
+        if form not in seen_form:
+            scopes.append({"scope_type": "form", "value": form})
+            seen_form.add(form)
+    return scopes
+
+
+def build_custom_ext(tool):
+    """从工具 JSON 提取工具名称、输入、输出,构造 custom_ext 条目(同参考实现)。"""
+    entries = []
+
+    name = (tool.get("工具名称") or "").strip()
+    if name:
+        entries.append({"key": EXT_KEY_TOOL, "type": "str", "value": name})
+
+    for key in ("输入", "输出"):
+        raw = tool.get(key)
+        if not raw or not str(raw).strip():
+            continue
+        for value in _split_values(str(raw)):
+            entries.append({"key": key, "type": "str", "value": value})
+
+    return entries
+
+
+# ── 单条 payload 组装(复用参考实现;source_id 直接用 case_id)─────────────────────
+
+def build_payload(source_id, source_data, tool, tool_index):
+    source_title = (source_data.get("title") or "").strip()
+    tool_name = (tool.get("工具名称") or "").strip()
+
+    if tool_name:
+        knowledge_title = tool_name
+    elif source_title:
+        knowledge_title = f"{source_title} — 工具{tool_index}"
+    else:
+        knowledge_title = f"工具{tool_index}"
+
+    content = json.dumps(tool, ensure_ascii=False)
+
+    source_metadata = {
+        "platform": source_data.get("platform") or "",
+        "url": tool.get("来源链接") or None,
+        "creation_layer": tool.get("创作层级") or "",
+        "updated_time": tool.get("最新更新时间") or "",
+    }
+
+    payload = {
+        "source": {
+            "id": source_id,
+            "source_type": "post",
+            "title": source_title or None,
+            "author": None,
+            "source_metadata": source_metadata,
+        },
+        "title": knowledge_title[:512],
+        "content": content,
+        "dim_attributes": DIM_ATTRIBUTES,
+        "dim_creations": DIM_CREATIONS,
+    }
+
+    scopes = build_scopes(tool)
+    if scopes:
+        payload["scopes"] = scopes
+    custom_ext = build_custom_ext(tool)
+    if custom_ext:
+        payload["custom_ext"] = custom_ext
+    return payload
+
+
+# ── 单条写入(原样复用参考实现)────────────────────────────────────────────────
+
+def ingest_one(api_url, payload, dry_run):
+    """调用导入接口写入一条知识,返回 (success, info_message, knowledge_id)。"""
+    if dry_run:
+        return True, "(dry-run, skipped)", None
+
+    url = api_url.rstrip("/") + INGEST_ENDPOINT
+    try:
+        resp = requests.post(url, json=payload, timeout=30)
+        if resp.status_code == 201:
+            kid = resp.json().get("knowledge_id", "?")
+            return True, f"knowledge_id={kid}", kid
+        try:
+            detail = resp.json().get("detail", resp.text[:300])
+        except Exception:
+            detail = resp.text[:300]
+        return False, f"HTTP {resp.status_code}: {detail}", None
+    except requests.Timeout:
+        return False, "超时(30s)", None
+    except requests.RequestException as exc:
+        return False, str(exc), None
+
+
+# ── 主循环 ────────────────────────────────────────────────────────────────────
+
+def run(api_url, dry_run, verbose, delay_ms, query_id, limit, force):
+    cases = list(iter_cases_from_db(query_id, limit))
+    if not cases:
+        scope = f"(query_id={query_id})" if query_id else ""
+        logger.error("DB 中未发现任何「已采纳且有工具解构」的 case %s", scope)
+        sys.exit(1)
+
+    mode_tag = "  [DRY-RUN]" if dry_run else ""
+    force_tag = "  [FORCE]" if force else ""
+    logger.info("发现 %d 个采纳 case。目标接口:%s%s%s", len(cases), api_url, mode_tag, force_tag)
+
+    ok_count = fail_count = skip_count = dup_count = 0
+
+    for case_id, source_data, tools, version in cases:
+        logger.info("── %s", case_id)
+        if not tools:
+            logger.warning("  无 tools,跳过")
+            skip_count += 1
+            continue
+        # 去重台账:该 case 各工具已导入的版本。force 时清空(强制重导)。
+        ingested = {} if force else db.fetch_ingested_map(case_id)
+        logger.info("  source_id=%-45s  tools=%d  version=%s", case_id, len(tools), version)
+
+        for idx, tool in enumerate(tools, 1):
+            ledger_index = TOOL_INDEX_BASE + idx
+            # 已导入且版本未变 → 跳过,杜绝重复上传(版本变了说明重解构过,应重导)。
+            if not force and ingested.get(ledger_index) == version:
+                logger.info("  ♻️ [%d/%d] 已导入(版本 %s),跳过", idx, len(tools), version)
+                dup_count += 1
+                continue
+
+            payload = build_payload(case_id, source_data, tool, idx)
+            title = payload["title"]
+            n_scopes = len(payload.get("scopes", []))
+            n_ext = len(payload.get("custom_ext", []))
+
+            if dry_run and verbose:
+                print(f"\n{'=' * 60}")
+                print(f"[{case_id}] 工具 {idx}/{len(tools)}")
+                print(json.dumps(payload, ensure_ascii=False, indent=2))
+
+            ok, msg, kid = ingest_one(api_url, payload, dry_run)
+            status_icon = "✓" if ok else "✗"
+            level = logging.INFO if ok else logging.WARNING
+            logger.log(
+                level,
+                "  %s [%d/%d] title=%r  scopes=%d  ext=%d  %s",
+                status_icon, idx, len(tools),
+                title[:40], n_scopes, n_ext, msg,
+            )
+
+            if ok:
+                ok_count += 1
+                # 仅真实导入成功才记台账(dry-run 不写,免污染去重状态)
+                if not dry_run:
+                    try:
+                        db.mark_ingested(case_id, ledger_index, version, kid, api_url)
+                    except Exception as exc:
+                        logger.warning("  ⚠️ 台账写入失败(不影响本次导入):%s", exc)
+            else:
+                fail_count += 1
+
+            if delay_ms > 0 and not dry_run:
+                time.sleep(delay_ms / 1000)
+
+    logger.info(
+        "完成。成功=%d  失败=%d  无工具跳过=%d  已传过跳过=%d  合计导入=%d",
+        ok_count, fail_count, skip_count, dup_count, ok_count,
+    )
+    if fail_count:
+        sys.exit(1)
+
+
+# ── CLI ───────────────────────────────────────────────────────────────────────
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="把 DB 中已采纳的工具解构(mode_tools)批量导入知识接口",
+        formatter_class=argparse.RawDescriptionHelpFormatter,
+    )
+    parser.add_argument("--api-url", default=DEFAULT_API_URL, metavar="URL",
+                        help=f"后端 API 根地址(默认:{DEFAULT_API_URL})")
+    parser.add_argument("--dry-run", action="store_true",
+                        help="仅从 DB 取数并组装 payload,不实际调用接口")
+    parser.add_argument("--verbose", "-v", action="store_true",
+                        help="dry-run 时打印完整 payload JSON")
+    parser.add_argument("--delay", type=int, default=100, metavar="MS",
+                        help="两次 API 调用之间的间隔毫秒数(默认:100)")
+    parser.add_argument("--query-id", default=None, metavar="QID",
+                        help="只导入该搜索任务(query_id)下的采纳 case")
+    parser.add_argument("--limit", type=int, default=None, metavar="N",
+                        help="只处理前 N 个 case(调试用)")
+    parser.add_argument("--force", action="store_true",
+                        help="忽略去重台账,强制重导(换 prompt/模型、需覆盖时用)")
+
+    args = parser.parse_args()
+    run(
+        api_url=args.api_url,
+        dry_run=args.dry_run,
+        verbose=args.verbose,
+        delay_ms=args.delay,
+        query_id=args.query_id,
+        limit=args.limit,
+        force=args.force,
+    )
+
+
+if __name__ == "__main__":
+    main()

+ 16 - 0
examples/process_pipeline/script/extract_sources.py

@@ -383,6 +383,22 @@ def _normalize_post_in_place(platform: str, post: Dict[str, Any]) -> None:
             _normalize_sph_post(post)
             _normalize_sph_post(post)
         except Exception:
         except Exception:
             pass
             pass
+    elif platform == "x":
+        # X 爬虫返回 image_url_list/video_url_list(对象数组),通用 schema 用
+        # images/videos(扁平 URL 数组)。拍平补齐,否则下游(配图评估/入库)取不到图。
+        if "images" not in post:
+            post["images"] = [i.get("image_url") for i in (post.get("image_url_list") or [])
+                              if isinstance(i, dict) and i.get("image_url")]
+        if "videos" not in post:
+            post["videos"] = [v.get("video_url") for v in (post.get("video_url_list") or [])
+                              if isinstance(v, dict) and v.get("video_url")]
+        # X 的原文链接在 content_link;link/url 为空,补上(下游 source_url 取 link)。
+        if not post.get("link"):
+            cid = post.get("channel_content_id")
+            acct = post.get("channel_account_name")
+            post["link"] = post.get("content_link") or (
+                f"https://x.com/{acct}/status/{cid}" if (acct and cid)
+                else (f"https://x.com/i/status/{cid}" if cid else ""))
 
 
 
 
 def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]:
 def build_cache_index(cache_dir: Path, trace_ids: Optional[List[str]] = None) -> Dict[Tuple[str, str], Dict[str, Any]]: