# -*- coding: utf-8 -*-
"""mode_workflow · MySQL persistence (the DB is the single source of truth)
================================================================================
Connects to MySQL using the MYSQL_* settings from .env. Four tables:
  search_process —— one row per (query, post): process-direction search + LLM evaluation
  search_tools   —— same structure, tools-direction search results (the direction is
                    encoded by which table is used, no longer by a mode_type column)
  mode_process   —— one row per deconstructed procedure (nested structures such as
                    steps are stored in JSON columns)
  mode_tools     —— one row per deconstructed tool

Key difference from the old fixed_query_eval/db.py: in this system the DB is the primary
store, so write failures raise immediately instead of being "best effort, non-blocking".
The read side stays defensive (returns empty / None).

Usage:
  python db.py init    # create tables (idempotent)
  python db.py check   # print row counts of the four tables
  python db.py clear   # wipe the data of the four tables (TRUNCATE)
"""

import json
import os
import sys
import threading
from datetime import datetime
from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[2]
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv

load_dotenv()

import pymysql
from pymysql.cursors import DictCursor
from dbutils.pooled_db import PooledDB


# ── Connection pool ──────────────────────────────────────────────────────────
# MySQL is a remote RDS; every pymysql.connect() costs a TCP+auth handshake of ~0.5s.
# The old implementation opened a new connection per request, and "opening a post"
# takes 2~3 requests = 2~3 handshakes ≈ 1s. A pool of reused long-lived connections
# pays the handshake only once per connection at pool init; later checkouts are cheap.
# server.py is a ThreadingHTTPServer (one thread per request) and PooledDB is
# thread-safe, which matches that model.
# Note: conn.close() in the fetch_* helpers, on a pooled connection, means
# "return to the pool", not a real disconnect.
_POOL = None
# Guards lazy pool creation. Without it, two concurrent first requests could both see
# _POOL is None and each build a PooledDB; the losing pool (and the connections it
# pre-warmed via mincached) would simply be wasted.
_POOL_LOCK = threading.Lock()


def _pool():
    """Return the process-wide PooledDB, creating it on first use.

    Raises:
        RuntimeError: if MYSQL_HOST is not set in the environment / .env.
    """
    global _POOL
    if _POOL is None:
        with _POOL_LOCK:
            # Re-check under the lock: another thread may have created it meanwhile.
            if _POOL is None:
                if not os.getenv("MYSQL_HOST"):
                    raise RuntimeError("缺 MYSQL_HOST:检查 .env 的 MYSQL_* 配置")
                _POOL = PooledDB(
                    creator=pymysql,
                    mincached=2,         # pre-warm 2 connections so the first click skips a cold handshake
                    maxcached=5,         # upper bound on idle connections kept in the pool
                    maxconnections=20,   # concurrency cap (ThreadingHTTPServer thread count)
                    blocking=True,       # when exhausted, wait instead of raising
                    ping=1,              # ping on checkout, discarding connections RDS has dropped
                    host=os.getenv("MYSQL_HOST"),
                    # An empty "MYSQL_PORT=" line in .env yields "", which int() rejects;
                    # treat it the same as unset.
                    port=int(os.getenv("MYSQL_PORT") or 3306),
                    user=os.getenv("MYSQL_USER"),
                    password=os.getenv("MYSQL_PASSWORD"),
                    database=os.getenv("MYSQL_DATABASE"),
                    charset="utf8mb4",
                    cursorclass=DictCursor,
                    autocommit=True,
                    connect_timeout=10,
                )
    return _POOL


def _conn():
    """Check out a connection from the pool.

    Usage is unchanged from a plain connection: use ``with conn.cursor()``, and call
    ``conn.close()`` to hand the connection back to the pool.
    """
    return _pool().connection()


# ── DDL ──────────────────────────────────────────────────────────────────────
# Mode -> table name maps. The _search_table / _mode_table helpers use them as a whitelist,
# because table names are interpolated into SQL via f-strings (they cannot be bound params).
SEARCH_TABLES = {"process": "search_process", "tools": "search_tools"}
MODE_TABLES = {"process": "mode_process", "tools": "mode_tools"}


def _search_table(mode_or_table):
    """Resolve a mode ("process"/"tools") or a search-table name to a whitelisted table name.

    This whitelist is the SQL-injection guard for callers that interpolate the result
    into SQL with f-strings.

    Raises:
        ValueError: if the input is neither a known mode nor a known search table.
    """
    t = SEARCH_TABLES.get(mode_or_table, mode_or_table)
    if t not in SEARCH_TABLES.values():
        raise ValueError(f"未知搜索表/模式: {mode_or_table!r}")
    return t


def _mode_table(mode_or_table):
    """Resolve a mode ("process"/"tools") or a deconstruction-table name to a whitelisted name.

    Raises:
        ValueError: if the input is neither a known mode nor a known deconstruction table.
    """
    t = MODE_TABLES.get(mode_or_table, mode_or_table)
    if t not in MODE_TABLES.values():
        raise ValueError(f"未知解构表/模式: {mode_or_table!r}")
    return t


def _ddl_search(table, direction):
    """Return the CREATE TABLE IF NOT EXISTS statement for one search table.

    search_process and search_tools share this schema. ``direction`` only appears in the
    table COMMENT. ``table`` is interpolated unescaped, so callers pass literal names.
    """
    return f"""
CREATE TABLE IF NOT EXISTS {table} (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    query_id VARCHAR(32) NOT NULL COMMENT 'q0000',
    query_text VARCHAR(512) NULL,
    case_id VARCHAR(128) NOT NULL COMMENT 'platform_channelContentId',
    platform VARCHAR(32) NULL,
    channel_content_id VARCHAR(128) NULL,
    title VARCHAR(512) NULL,
    url VARCHAR(1024) NULL,
    content_type VARCHAR(32) NULL,
    body LONGTEXT NULL,
    images JSON NULL,
    videos JSON NULL,
    like_count INT NULL,
    publish_time VARCHAR(64) NULL,
    quality_score FLOAT NULL COMMENT 'post._quality_score',
    quality_grade VARCHAR(8) NULL,
    found_by JSON NULL COMMENT '命中的措辞数组',
    knowledge_type JSON NULL COMMENT '["能力","工序","工具"] 子集',
    overall_score FLOAT NULL COMMENT '(相关均值+质量均值)/2',
    llm_evaluation JSON NULL COMMENT '评估全量 blob',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uk_qid_case (query_id, case_id),
    KEY idx_platform (platform)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='搜索+评估结果({direction})';
"""


# Process deconstruction results: one row per procedure. A new version keeps history;
# the unique key (query_id, case_id, version, seq) blocks duplicate rows from
# concurrent or repeated writes.
DDL_PROCESS = """
CREATE TABLE IF NOT EXISTS mode_process (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    query_id VARCHAR(32) NOT NULL,
    case_id VARCHAR(128) NOT NULL,
    platform VARCHAR(32) NULL,
    post_title VARCHAR(512) NULL,
    source JSON NULL COMMENT '解构返回的 source 块',
    procedure_id VARCHAR(16) NULL COMMENT 'p1,p2…',
    name VARCHAR(255) NULL,
    purpose TEXT NULL,
    category VARCHAR(32) NULL COMMENT '产物创造/资产建设/自动化/分析/学习',
    declarations JSON NULL,
    type_registry JSON NULL,
    steps JSON NULL COMMENT '步骤数组全量',
    step_count INT NULL,
    tools_used JSON NULL COMMENT '从 steps[].via 去重提取',
    model VARCHAR(64) NULL,
    version VARCHAR(32) NULL COMMENT 'v_MMDDHHMM,保留历史;link_* 为跨 query 复制(cost=0)',
    cost_usd DECIMAL(10,6) NULL COMMENT '本次解构调用成本(同版本各行相同,聚合需按 case+version 去重)',
    duration_s FLOAT NULL,
    seq SMALLINT NULL COMMENT '帖内序号(0-based);与 (query_id,case_id,version) 组唯一键防并发/重复写',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_q_case_ver_seq (query_id, case_id, version, seq),
    KEY idx_case_ver (case_id, version),
    KEY idx_qid (query_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序解构结果(每行一个工序)';
"""

# Tool deconstruction results: one row per tool. Same versioning and unique-key scheme
# as mode_process.
DDL_TOOLS = """
CREATE TABLE IF NOT EXISTS mode_tools (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    query_id VARCHAR(32) NOT NULL,
    case_id VARCHAR(128) NOT NULL,
    platform VARCHAR(32) NULL,
    post_title VARCHAR(512) NULL,
    source JSON NULL COMMENT '解构时帖子来源块(tool_extract._row_to_source 产出)',
    tool_name VARCHAR(255) NULL,
    substance_scope JSON NULL COMMENT '实质作用域(数组)',
    form_scope JSON NULL COMMENT '形式作用域(数组或null)',
    creation_layer VARCHAR(32) NULL COMMENT '制作层/创作层',
    source_link VARCHAR(1024) NULL,
    input_desc TEXT NULL,
    output_desc TEXT NULL,
    usage_json JSON NULL,
    cases_json JSON NULL,
    defects_json JSON NULL,
    updated_time VARCHAR(64) NULL COMMENT '工具最新更新时间',
    model VARCHAR(64) NULL,
    version VARCHAR(32) NULL COMMENT 'v_MMDDHHMM;link_* 为跨 query 复制(cost=0)',
    cost_usd DECIMAL(10,6) NULL COMMENT '同 mode_process,聚合按 case+version 去重',
    duration_s FLOAT NULL,
    seq SMALLINT NULL COMMENT '帖内序号(0-based);与 (query_id,case_id,version) 组唯一键防并发/重复写',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE KEY uk_q_case_ver_seq (query_id, case_id, version, seq),
    KEY idx_case_ver (case_id, version),
    KEY idx_qid (query_id),
    KEY idx_tool_name (tool_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
"""

# Ledger of process knowledge "already imported into the knowledge base". It prevents
# duplicate uploads and is used by stages/import_process_knowledge.py.
# Each knowledge item = one procedure of one case (proc_index is 1-based).
# The ledger records the mode_process version at import time:
#   - a changed version (re-deconstructed) means the content changed and should be re-imported;
#   - an unchanged version counts as "already uploaded" and is skipped.
# A DB ledger is used instead of a local file so that switching machines or links does
# not re-write the knowledge base.
# Note: tool knowledge uses the separate tools_ingest_log and does not share this table.
# case_id is the post's physical identity, and the same post may be deconstructed both
# as process and as tools, so a shared table would collide on (case_id, index).
DDL_INGEST_LOG = """
CREATE TABLE IF NOT EXISTS knowledge_ingest_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    case_id VARCHAR(128) NOT NULL,
    proc_index INT NOT NULL COMMENT '工序序号(1-based),对齐导入脚本枚举',
    version VARCHAR(32) NULL COMMENT '导入时 mode_process 版本;变了应重导',
    knowledge_id VARCHAR(128) NULL COMMENT '接口返回的 knowledge_id',
    api_url VARCHAR(255) NULL,
    ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uk_case_proc (case_id, proc_index)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序知识已导入台账(防重复上传)';
"""

# Ledger of tool knowledge "already imported into the knowledge base". Same semantics as
# knowledge_ingest_log, but a separate table for the tools direction; it is used by
# stages/import_tools_knowledge.py.
# Each knowledge item = one tool of one case (tool_index is 1-based). The version is the
# mode_tools version at import time: changed (re-deconstructed) -> re-import;
# unchanged -> "already uploaded", skip.
DDL_TOOLS_INGEST_LOG = """
CREATE TABLE IF NOT EXISTS tools_ingest_log (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    case_id VARCHAR(128) NOT NULL,
    tool_index INT NOT NULL COMMENT '工具序号(1-based),对齐导入脚本枚举',
    version VARCHAR(32) NULL COMMENT '导入时 mode_tools 版本;变了应重导',
    knowledge_id VARCHAR(128) NULL COMMENT '接口返回的 knowledge_id',
    api_url VARCHAR(255) NULL,
    ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uk_case_tool (case_id, tool_index)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具知识已导入台账(防重复上传)';
"""


def _ensure_column(cur, table, column, column_ddl):
    """Idempotently add a column to an existing table; skip if the column already exists.

    MySQL's ADD COLUMN has no IF NOT EXISTS, so the column is looked up in
    information_schema first. ``column_ddl`` is the full definition that follows
    ADD COLUMN, e.g. "source JSON NULL ... AFTER post_title". ``table`` and
    ``column_ddl`` are interpolated unescaped (callers pass literals).
    """
    cur.execute("""SELECT COUNT(*) AS n FROM information_schema.columns
                   WHERE table_schema=DATABASE() AND table_name=%s AND column_name=%s""",
                (table, column))
    if cur.fetchone()["n"] == 0:
        cur.execute(f"ALTER TABLE {table} ADD COLUMN {column_ddl}")


def _ensure_unique_index(cur, table, index_name, cols):
    """Idempotently add a UNIQUE KEY; skip if an index with that name already exists.

    MySQL's ADD INDEX has no IF NOT EXISTS, hence the information_schema lookup.
    ``cols`` is the column list expression, e.g. "query_id, case_id, version, seq".
    The caller must ensure no conflicting rows exist before calling, or the ALTER fails.
    """
    cur.execute("""SELECT COUNT(*) AS n FROM information_schema.statistics
                   WHERE table_schema=DATABASE() AND table_name=%s AND index_name=%s""",
                (table, index_name))
    if cur.fetchone()["n"] == 0:
        cur.execute(f"ALTER TABLE {table} ADD UNIQUE KEY {index_name} ({cols})")


def init_tables():
    """Create all tables (IF NOT EXISTS) and apply the idempotent schema migrations."""
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute(_ddl_search("search_process", "工序方向"))
            cur.execute(_ddl_search("search_tools", "工具方向"))
            cur.execute(DDL_PROCESS)
            cur.execute(DDL_TOOLS)
            cur.execute(DDL_INGEST_LOG)
            cur.execute(DDL_TOOLS_INGEST_LOG)
            # Legacy migration: widen version from VARCHAR(16) to 32 so it can hold the
            # link_v_mopN_* copied versions. MODIFY is idempotent (a no-op at the metadata
            # level if it is already 32), and the tables exist after the DDL above.
            for t in ("mode_process", "mode_tools"):
                cur.execute(f"ALTER TABLE {t} MODIFY COLUMN version VARCHAR(32) NULL")
            # Legacy migration: add the source column to old mode_tools tables. MySQL's
            # ADD COLUMN has no IF NOT EXISTS, so existence is checked via
            # information_schema and the column is only added when missing (idempotent).
            _ensure_column(cur, "mode_tools", "source",
                           "source JSON NULL COMMENT '解构时帖子来源块' AFTER post_title")
            # Legacy migration: add seq (in-post ordinal) and the unique key
            # (query_id,case_id,version,seq), which blocks duplicate rows from
            # concurrent or repeated writes. The order must be:
            #   add column -> backfill -> add unique key.
            # MySQL 5.7 has no window functions, so seq is backfilled in the application,
            # by ascending id within each (query_id,case_id,version) group. Existing data
            # is assumed to have no duplicates at that granularity.
            for t in ("mode_process", "mode_tools"):
                _ensure_column(cur, t, "seq",
                               "seq SMALLINT NULL COMMENT '帖内序号(0-based)' AFTER duration_s")
            for t in ("mode_process", "mode_tools"):
                cur.execute(f"""SELECT id, query_id, case_id, version FROM {t}
                                WHERE seq IS NULL
                                ORDER BY query_id, case_id, version, id""")
                # Rows arrive grouped by (query_id, case_id, version); n restarts at 0
                # for each new group, giving 0-based seq values in id order.
                key, n, ups = None, 0, []
                for r in cur.fetchall():
                    k = (r["query_id"], r["case_id"], r["version"])
                    if k != key:
                        key, n = k, 0
                    ups.append((n, r["id"])); n += 1
                if ups:
                    cur.executemany(f"UPDATE {t} SET seq=%s WHERE id=%s", ups)
                    print(f" ↳ {t}: 回填 seq {len(ups)} 行")
            for t in ("mode_process", "mode_tools"):
                _ensure_unique_index(cur, t, "uk_q_case_ver_seq",
                                     "query_id, case_id, version, seq")
        print("✅ 建表完成:search_process, search_tools, mode_process, mode_tools, "
              "knowledge_ingest_log, tools_ingest_log")
    finally:
        conn.close()


def clear_tables():
    """Wipe the data of the four main tables with TRUNCATE; table structure is kept.

    The two ingest-log tables are not touched.
    """
    conn = _conn()
    try:
        with conn.cursor() as cur:
            for t in ("search_process", "search_tools", "mode_process", "mode_tools"):
                cur.execute(f"TRUNCATE TABLE {t}")
                print(f"🧹 已清空 {t}")
    finally:
        conn.close()


# ── Helpers ──────────────────────────────────────────────────────────────────
def _loads(v, default=None):
    """Normalize a JSON column value: pymysql may return JSON columns as strings.

    Returns ``default`` for None or unparseable input; lists/dicts pass through as-is.
    """
    if v is None:
        return default
    if isinstance(v, (list, dict)):
        return v
    try:
        return json.loads(v)
    except Exception:
        return default


def _j(v):
    """Serialize a value for a JSON column: None stays SQL NULL, everything else is dumped."""
    return None if v is None else json.dumps(v, ensure_ascii=False)


def _collect_scores(node):
    """Recursively collect every "得分" value in a nested evaluation blob.

    LLM-emitted scores are mostly strings ("1"/"4"), occasionally numbers (e.g. 时效性 10),
    so each one is parsed with float(). Non-numeric values (e.g. "N/A") are skipped.
    """
    out = []
    if isinstance(node, dict):
        for k, v in node.items():
            if k == "得分":
                try:
                    out.append(float(v))
                except (TypeError, ValueError):
                    pass
            else:
                out.extend(_collect_scores(v))
    elif isinstance(node, list):
        for v in node:
            out.extend(_collect_scores(v))
    return out


def overall_score(e):
    """Compute the overall score for one evaluation.

    The score is (mean of 相关性 scores + mean of 质量 scores) / number of parts that
    actually yielded scores. It is rounded to 2 decimals. Returns None when neither
    part has any score.
    """
    parts = []
    for key in ("相关性", "质量"):
        scores = _collect_scores((e or {}).get(key))
        if scores:
            parts.append(sum(scores) / len(scores))
    return round(sum(parts) / len(parts), 2) if parts else None


def _recency_hard(date_str):
    """Return the hard recency tier (same rule as mode_procedure/server.py:_recency_hard).

    Tiers: <=180 days -> 3, <=730 days -> 2, older -> 1. The first 10 chars of
    publish_time are parsed as YYYY-MM-DD. Returns None if parsing fails; the caller
    then skips this gate. Uses naive local datetime.now().
    """
    try:
        d = datetime.strptime(str(date_str or "")[:10], "%Y-%m-%d")
    except (ValueError, TypeError):
        return None
    days = (datetime.now() - d).days
    if days <= 180:
        return 3
    if days <= 730:
        return 2
    return 1


def _fixed_dim_score(evaluation, name):
    """Read the scalar 质量.固定维度.<name>.得分 as a float.

    A bare scalar stored directly under <name> is also accepted. Returns None when the
    value is missing or non-numeric; the caller then skips this gate.
    """
    v = (((evaluation or {}).get("质量") or {}).get("固定维度") or {}).get(name)
    if isinstance(v, dict):
        v = v.get("得分")
    try:
        return float(v) if v is not None else None
    except (TypeError, ValueError):
        return None


def _impl_score(evaluation):
    """Read the scalar 质量.动态维度.工序.字段完整性.实现完整性.得分 as a float, or None.

    The newer prompt folded the old "可复现性" hard-cap rule into "实现完整性", so the
    adoption gate reads this path for new-format blobs.
    """
    v = ((((((evaluation or {}).get("质量") or {}).get("动态维度") or {})
           .get("工序") or {}).get("字段完整性") or {}).get("实现完整性"))
    if isinstance(v, dict):
        v = v.get("得分")
    try:
        return float(v) if v is not None else None
    except (TypeError, ValueError):
        return None


def _repro_score(evaluation):
    """Return the "reproducible / implementable" score used by the adoption gate.

    It prefers the old-format 可复现性 (固定维度) and falls back to the new-format
    实现完整性 (动态维度.工序). This lets both old and new evaluation blobs be judged
    correctly.
    """
    v = _fixed_dim_score(evaluation, "可复现性")
    return v if v is not None else _impl_score(evaluation)


def is_adopted(overall, evaluation, publish_time):
    """Decide whether a post is adopted (a "hit").

    Semantics match mode_procedure's decision=="report".

    A post is rejected if any of these holds:
      - 和内容制作知识相关 score < 4,
      - reproducibility / implementation-completeness score < 4,
      - published more than two years ago,
      - overall score < 6.
    A missing metric does not take part in the decision. 意图可控性 is scored but not yet
    gated, pending threshold calibration.

    The reproducibility gate supports both schemas (old 可复现性, new 实现完整性; see
    _repro_score).

    Fail-closed: an evaluation error (_error), a missing or empty blob, or an overall score
    that cannot be computed (None) means "not adopted". Posts that could not be evaluated
    must not leak into the hit set. The old fail-open behaviour adopted them because every
    metric lookup came back empty.
    """
    if not isinstance(evaluation, dict) or not evaluation or evaluation.get("_error"):
        return False
    if overall is None:
        return False
    rel = None
    v = ((evaluation or {}).get("相关性") or {}).get("和内容制作知识相关")
    if isinstance(v, dict):
        v = v.get("得分")
    try:
        rel = float(v) if v is not None else None
    except (TypeError, ValueError):
        rel = None
    if rel is not None and rel < 4:
        return False
    repro = _repro_score(evaluation)
    if repro is not None and repro < 4:
        return False
    rh = _recency_hard(publish_time)
    if rh is not None and rh < 2:
        return False
    if overall is not None and float(overall) < 6:
        return False
    return True


def is_adopted_rel(overall, rel, publish_time, repro=None):
    """Lightweight variant of is_adopted that takes pre-extracted scalars.

    ``rel`` (relevance score) and ``repro`` (reproducibility / implementation gate) come
    straight from SQL JSON_EXTRACT; ``repro`` is read via _REPRO_SQL, which handles both
    schemas. The full llm_evaluation blob therefore never has to be transferred or parsed.

    The gates are the same as is_adopted, including fail-closed: an overall score that
    cannot be computed means "not adopted". Failed posts have overall_score stored as NULL.
    """
    if overall is None:
        return False
    try:
        rel = float(rel) if rel is not None else None
    except (TypeError, ValueError):
        rel = None
    if rel is not None and rel < 4:
        return False
    try:
        repro = float(repro) if repro is not None else None
    except (TypeError, ValueError):
        repro = None
    if repro is not None and repro < 4:
        return False
    rh = _recency_hard(publish_time)
    if rh is not None and rh < 2:
        return False
    if overall is not None and float(overall) < 6:
        return False
    return True


# ── search_process / search_tools ────────────────────────────────────────────
def upsert_search_posts(query_id, query_text, results, table="search_process"):
    """Write one batch of search results into a search table.

    Rows are upserted on (query_id, case_id). Returns the number of rows written.

    ``table``: search_process (process direction) or search_tools (tools direction);
    a mode name is also accepted.

    NOTE(review): title is truncated to 500 chars, but query_text (VARCHAR(512)) and
    url (VARCHAR(1024)) are not. Depending on sql_mode, an over-long value may raise
    "Data too long". Confirm whether upstream guarantees their length.
    """
    table = _search_table(table)
    if not results:
        return 0
    rows = []
    for r in results:
        post = r.get("post") or {}
        e = r.get("llm_evaluation") or {}
        rows.append((
            query_id, query_text, r.get("case_id"), r.get("platform"),
            r.get("channel_content_id"),
            (post.get("title") or post.get("desc") or "")[:500],
            r.get("source_url"), post.get("content_type"),
            post.get("body_text") or post.get("desc") or "",
            _j(post.get("images") or []), _j(post.get("videos") or []),
            post.get("like_count"),
            str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
            post.get("_quality_score"), post.get("_quality_grade"),
            _j(r.get("found_by_queries") or []),
            _j(e.get("知识类型") or []),
            overall_score(e),
            _j(e),
        ))
    sql = f"""
        INSERT INTO {table}
          (query_id, query_text, case_id, platform, channel_content_id, title, url,
           content_type, body, images, videos, like_count, publish_time,
           quality_score, quality_grade, found_by, knowledge_type, overall_score,
           llm_evaluation)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
        ON DUPLICATE KEY UPDATE
          query_text=VALUES(query_text), platform=VALUES(platform),
          channel_content_id=VALUES(channel_content_id), title=VALUES(title),
          url=VALUES(url), content_type=VALUES(content_type), body=VALUES(body),
          images=VALUES(images), videos=VALUES(videos), like_count=VALUES(like_count),
          publish_time=VALUES(publish_time), quality_score=VALUES(quality_score),
          quality_grade=VALUES(quality_grade), found_by=VALUES(found_by),
          knowledge_type=VALUES(knowledge_type), overall_score=VALUES(overall_score),
          llm_evaluation=VALUES(llm_evaluation);
    """
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.executemany(sql, rows)
        return len(rows)
    finally:
        conn.close()


# Placeholder-post case_id.
# There is no separate query master table: the query list is derived by aggregating
# search_process on query_id, so a query only appears in the list if it has at least one
# row. To support "register a query without running a search", such queries get one
# sentinel post row carrying only query_id + query_text.
# The sentinel row belongs to no real post, so every "post view / statistics" read path
# filters it out with _REAL_POST: fetch_queries' post_count, fetch_posts, fetch_all_posts,
# count_executed_queries and fetch_dashboard_rows.
# Real searches never use this case_id.
PENDING_CASE_ID = "__pending__"
_REAL_POST = f"case_id <> '{PENDING_CASE_ID}'"


def add_pending_process_queries(texts):
    """Add query texts to the process query list (search_process) as placeholder queries.

    No search or deconstruction is triggered. Each new query gets one sentinel row
    (case_id=PENDING_CASE_ID, only query_id/query_text filled in).

    Dedup rules:
      1. blank and duplicate texts in the input are dropped, first occurrence kept;
      2. texts whose query_text already exists in search_process (including earlier
         placeholders) are skipped.

    query_id numbering continues from the highest qNNNN across both search_process and
    search_tools, so it never collides with the tools direction.

    Returns:
        (added, skipped) counts; skipped counts only de-duplicated, non-blank texts
        that already existed.

    NOTE(review): the MAX+1 allocation is not atomic. Two concurrent callers could pick
    the same query_id; because every placeholder row shares PENDING_CASE_ID, the second
    INSERT would then fail on uk_qid_case (query_id, case_id).
    """
    seen, cleaned = set(), []
    for t in texts:
        t = (t or "").strip()
        if t and t not in seen:
            seen.add(t)
            cleaned.append(t)
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT DISTINCT query_text FROM search_process WHERE query_text IS NOT NULL")
            existing = {r["query_text"] for r in cur.fetchall()}
            cur.execute("SELECT query_id FROM search_process "
                        "UNION SELECT query_id FROM search_tools")
            # Only ids shaped like "q<digits>" take part in numbering.
            nums = [int(r["query_id"][1:]) for r in cur.fetchall()
                    if r["query_id"] and r["query_id"].startswith("q")
                    and r["query_id"][1:].isdigit()]
            nxt = (max(nums) + 1) if nums else 0
            rows = []
            for t in cleaned:
                if t in existing:
                    continue
                rows.append((f"q{nxt:04d}", t, PENDING_CASE_ID))
                nxt += 1
            if rows:
                cur.executemany(
                    "INSERT INTO search_process (query_id, query_text, case_id) "
                    "VALUES (%s,%s,%s)", rows)
        return len(rows), len(cleaned) - len(rows)
    finally:
        conn.close()


def fetch_queries(mode="process"):
    """Return the query list of one direction's search table, with per-query counts.

    Each entry has post_count, hit_count (adopted posts), process_done and tools_done
    (number of distinct case_ids deconstructed in mode_process / mode_tools).

    _REL_SQL and _REPRO_SQL are module-level SQL fragments not visible in this chunk;
    presumably they are defined further down the file.
    """
    table = _search_table(mode)
    conn = _conn()
    try:
        with conn.cursor() as cur:
            # post_count counts only real posts, not the placeholder sentinel row, so a
            # placeholder query shows 0 posts. GROUP BY still includes the placeholder's
            # query_id, so queries without any search still appear in the list.
            cur.execute(f"""SELECT query_id, MAX(query_text) AS query_text,
                                   COUNT(CASE WHEN {_REAL_POST} THEN 1 END) AS post_count
                            FROM {table} GROUP BY query_id ORDER BY query_id""")
            queries = cur.fetchall()
            # Hit count: computed from rel/repro scalars extracted in SQL. The whole
            # table's llm_evaluation is deliberately NOT pulled; the old version fetched
            # every blob and switching tabs was very slow.
            cur.execute(f"""SELECT query_id, overall_score, publish_time,
                                   {_REL_SQL} AS rel, {_REPRO_SQL} AS repro
                            FROM {table}""")
            hits = {}
            for r in cur.fetchall():
                if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"], r["repro"]):
                    hits[r["query_id"]] = hits.get(r["query_id"], 0) + 1
            cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_process GROUP BY query_id")
            np = {r["query_id"]: r["n"] for r in cur.fetchall()}
            cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_tools GROUP BY query_id")
            nt = {r["query_id"]: r["n"] for r in cur.fetchall()}
    finally:
        conn.close()
    for q in queries:
        q["hit_count"] = hits.get(q["query_id"], 0)
        q["process_done"] = np.get(q["query_id"], 0)
        q["tools_done"] = nt.get(q["query_id"], 0)
    return queries


def fetch_posts(query_id, mode="process"):
    """Return the posts of one query for the list view.

    Only the list columns are fetched, and adopted is derived from scalars extracted in
    SQL. The large body/videos/llm_evaluation columns are NOT pulled: llm_evaluation is
    ~1.5MB per post, and the old SELECT * moved tens of MB over the remote RDS on every
    tab switch / query selection. Full details come on demand from fetch_post.

    Each row gets adopted, has_process, has_tools and has_category. adopted uses
    is_adopted_rel (same gates as is_adopted), with rel/repro read through
    _REL_SQL/_REPRO_SQL. Rows are sorted by overall_score DESC, then id.
    """
    table = _search_table(mode)
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute(f"""SELECT id, query_id, query_text, case_id, platform, channel_content_id,
                                   title, url, content_type, images, like_count, publish_time,
                                   quality_score, quality_grade, found_by, knowledge_type,
                                   overall_score, {_REL_SQL} AS rel, {_REPRO_SQL} AS repro
                            FROM {table} WHERE query_id=%s AND {_REAL_POST}
                            ORDER BY overall_score DESC, id""", (query_id,))
            rows = cur.fetchall()
            cur.execute("SELECT DISTINCT case_id FROM mode_process WHERE query_id=%s", (query_id,))
            hp = {r["case_id"] for r in cur.fetchall()}
            cur.execute("SELECT DISTINCT case_id FROM mode_tools WHERE query_id=%s", (query_id,))
            ht = {r["case_id"] for r in cur.fetchall()}
            # Categorized (process): a case in hp is categorized if any step of any
            # procedure in its latest real version contains substanceMatch. This is the
            # same rule the categorization write-back uses.
            # The LIKE is evaluated in the DB and only 0/1 comes back; steps is never
            # transferred. Aggregation lives in _categorized_from_rows. Looking only at the
            # max-id row is wrong: a procedure row with empty steps never matches and
            # would cause a false "not categorized".
            hc = set()
            if hp:
                ph = ",".join(["%s"] * len(hp))
                cur.execute(f"""SELECT case_id, version, id,
                                       (LEFT(version,5)='link_') AS islink,
                                       (steps LIKE %s) AS cat
                                FROM mode_process WHERE case_id IN ({ph})""",
                            ['%substanceMatch%'] + list(hp))
                hc = _categorized_from_rows(cur.fetchall())
    finally:
        conn.close()
    for r in rows:
        for col in ("images", "found_by", "knowledge_type"):
            r[col] = _loads(r[col])
        r["adopted"] = is_adopted_rel(r["overall_score"], r.pop("rel", None),
                                      r["publish_time"], r.pop("repro", None))
        r["has_process"] = r["case_id"] in hp
        r["has_tools"] = r["case_id"] in ht
        r["has_category"] = r["case_id"] in hc
    return rows


def fetch_post(query_id, case_id, table="search_process"):
    """Return the full row of one post from a search table, or None if absent.

    Used by pipeline scripts to rebuild the post source. The JSON columns (images,
    videos, found_by, knowledge_type, llm_evaluation) are decoded.
    """
    table = _search_table(table)
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute(f"SELECT * FROM {table} WHERE query_id=%s AND case_id=%s",
                        (query_id, case_id))
            row = cur.fetchone()
    finally:
        conn.close()
    if not row:
        return None
    for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
        row[col] = _loads(row[col])
    return row


def fetch_all_posts(mode="process", *, query_ids=None, adopted_only=False,
                    distinct=False, limit=None, offset=0):
    """Return "all posts" of one direction, across every query.

    Slim columns only, same as fetch_posts: body/videos/llm_evaluation are not fetched.
    fetch_posts is limited to a single query; this function covers the whole table by
    default.

    - query_ids: optional list of query_ids. When given, SQL filters with
      WHERE query_id IN (...) instead of loading the whole table. None = all; [] = empty.
    - adopted_only=True: only adopted posts (is_adopted_rel, with rel/repro extracted in
      SQL; no blobs are loaded).
    - distinct=True: de-duplicate by case_id. When one post was found by several queries,
      only the row with the highest overall_score is kept. Rows are already sorted by
      score DESC, so the first occurrence is the highest.
    - limit/offset: pagination (limit=None disables it).

    Returns:
        (total, rows): total is the count after filtering (+ de-duplication); rows is the
        current page slice.
    """
    table = _search_table(mode)
    # Always exclude the placeholder sentinel rows: queries without searches must not
    # show up in post views.
    where, params = f" WHERE {_REAL_POST}", []
    if query_ids is not None:
        if not query_ids:
            return 0, []   # explicit empty list: empty result without hitting the DB
        where += " AND query_id IN (" + ",".join(["%s"] * len(query_ids)) + ")"
        params = list(query_ids)
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute(f"""SELECT id, query_id, query_text, case_id, platform, channel_content_id,
                                   title, url, content_type, images, like_count, publish_time,
                                   quality_score, quality_grade, found_by, knowledge_type,
                                   overall_score, {_REL_SQL} AS rel, {_REPRO_SQL} AS repro
                            FROM {table}{where}
                            ORDER BY overall_score DESC, id""", params)
            rows = cur.fetchall()
            # has_process/has_tools are global: "has this post been deconstructed under
            # any query". Each deconstruction table is queried once.
            cur.execute("SELECT DISTINCT case_id FROM mode_process")
            hp = {r["case_id"] for r in cur.fetchall()}
            cur.execute("SELECT DISTINCT case_id FROM mode_tools")
            ht = {r["case_id"] for r in cur.fetchall()}
    finally:
        conn.close()
    out, seen = [], set()
    for r in rows:
        for col in ("images", "found_by", "knowledge_type"):
            r[col] = _loads(r[col])
        r["adopted"] = is_adopted_rel(r["overall_score"], r.pop("rel", None),
                                      r["publish_time"], r.pop("repro", None))
        if adopted_only and not r["adopted"]:
            continue
        if distinct:
            if r["case_id"] in seen:
                continue
            seen.add(r["case_id"])
        r["has_process"] = r["case_id"] in hp
        r["has_tools"] = r["case_id"] in ht
        out.append(r)
    total = len(out)
    if limit is not None:
        out = out[offset:offset + limit]
    elif offset:
        out = out[offset:]
    return total, out


def count_executed_queries(mode="process"):
    """Return the number of "executed" queries for a direction.

    This is the number of distinct query_ids that have at least one real (non-placeholder)
    post row in the search table.

    Note: a search with zero results writes no rows, so it is not counted. The metric
    means "queries that produced results".
    """
    table = _search_table(mode)
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute(f"SELECT COUNT(DISTINCT query_id) AS n FROM {table} WHERE {_REAL_POST}")
            return cur.fetchone()["n"]
    finally:
        conn.close()


# ── mode_process ─────────────────────────────────────────────────────────────
def replace_process(query_id, case_id, platform, post_title, payload,
                    model, version, cost_usd, duration_s):
    """Write one version of a post's process deconstruction (payload = {source, procedures}).

    Existing rows for (case_id, version) are deleted, regardless of query_id, and the new
    ones inserted inside one transaction. Re-running the same version is therefore
    idempotent, while other versions are kept as history. Each procedure's tools_used is
    the de-duplicated list of its steps' "via" values, in first-seen order. seq is the
    procedure's index in payload["procedures"].

    Returns:
        The number of procedures written.

    Raises:
        Any DB error, after rolling back the transaction.
    """
    source = payload.get("source")
    procedures = payload.get("procedures") or []
    conn = _conn()
    try:
        # DELETE+INSERT in one transaction; together with uk_q_case_ver_seq, concurrent
        # or repeated writes cannot leave duplicate rows behind.
        conn.begin()
        with conn.cursor() as cur:
            cur.execute("DELETE FROM mode_process WHERE case_id=%s AND version=%s",
                        (case_id, version))
            if procedures:
                rows = []
                for i, p in enumerate(procedures):
                    steps = p.get("steps") or []
                    vias = []
                    for s in steps:
                        v = s.get("via")
                        if v and v not in vias:
                            vias.append(v)
                    rows.append((
                        query_id, case_id, platform, (post_title or "")[:500], _j(source),
                        p.get("id"), (p.get("name") or "")[:250], p.get("purpose"),
                        p.get("category"), _j(p.get("declarations")),
                        _j(p.get("type_registry")), _j(steps), len(steps), _j(vias),
                        model, version, cost_usd, duration_s, i,
                    ))
                cur.executemany("""
                    INSERT INTO mode_process
                      (query_id, case_id, platform, post_title, source, procedure_id, name,
                       purpose, category, declarations, type_registry, steps, step_count,
                       tools_used, model, version, cost_usd, duration_s, seq)
                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                """, rows)
        conn.commit()
        return len(procedures)
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def fetch_process_versions(case_id):
    """List the mode_process versions of a case with their row count and model.

    Real versions come before link_* copies; within each group, the version with the
    largest id comes first.
    """
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
                           FROM mode_process WHERE case_id=%s
                           GROUP BY version
                           ORDER BY (LEFT(version,5)='link_') ASC, MAX(id) DESC""", (case_id,))
            return cur.fetchall()
    finally:
        conn.close()


def fetch_process(case_id, version=None):
    """Rebuild {case_id, version, model, source, procedures:[...]} for one case.

    version=None picks the latest version, preferring real versions over link_* copies,
    then the highest id. Returns None when the case has no rows. Procedures are ordered
    by id.

    NOTE(review): this orders by id, while fetch_process_by_query / fetch_all_process
    order by (seq, id). The two agree as long as seq follows insertion order, as the
    seq backfill and replace_process do.
    """
    conn = _conn()
    try:
        with conn.cursor() as cur:
            if version is None:
                cur.execute("""SELECT version FROM mode_process WHERE case_id=%s
                               ORDER BY (LEFT(version,5)='link_') ASC, id DESC
                               LIMIT 1""", (case_id,))
                row = cur.fetchone()
                if not row:
                    return None
                version = row["version"]
            cur.execute("""SELECT * FROM mode_process WHERE case_id=%s AND version=%s
                           ORDER BY id""", (case_id, version))
            rows = cur.fetchall()
    finally:
        conn.close()
    return _proc_payload(case_id, version, rows)


def fetch_all_process(case_ids=None, lite=False):
    """Batch-fetch the process deconstructions of many posts in one query.

    For each case, the latest real version is used (link_* ranked last).

    - case_ids: optional list of case_ids. None = every case that has a deconstruction;
      [] = empty result, returned immediately.
    - lite: True uses the slim projection (large fields dropped, values truncated) for the
      process-library flat table's fast first paint.

    Returns:
        {case_id: _proc_payload(...)}. Case_ids without any deconstruction rows are absent.
    """
    if case_ids is not None and not case_ids:
        return {}
    where, params = "", []
    if case_ids is not None:
        where = " WHERE case_id IN (" + ",".join(["%s"] * len(case_ids)) + ")"
        params = list(case_ids)
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute(f"SELECT * FROM mode_process{where} ORDER BY case_id, id", params)
            rows = cur.fetchall()
    finally:
        conn.close()
    by_case = {}
    for r in rows:
        by_case.setdefault(r["case_id"], []).append(r)
    out = {}
    for cid, crows in by_case.items():
        # Version choice: non-link_ first (is_real=True ranks higher), then largest id.
        # Same rule as fetch_process.
        best = max(crows, key=lambda r: (not str(r["version"]).startswith("link_"), r["id"]))
        ver = best["version"]
        # NULL seq is treated as 0 for ordering; id breaks ties.
        vrows = sorted((r for r in crows if r["version"] == ver),
                       key=lambda r: (r["seq"] if r["seq"] is not None else 0, r["id"]))
        out[cid] = _proc_payload(cid, ver, vrows, lite=lite)
    return out


def fetch_process_by_query(query_id, case_id, version=None):
    """Like fetch_process, but scoped to one (query_id, case_id) pair.

    Used by category-match, where post_id=query_id and knowledge_id=case_id.
    version=None picks the latest real version under that (query_id, case_id), with
    link_* ranked last. Procedures are ordered by (seq, id). Returns None when no rows
    exist.
    """
    conn = _conn()
    try:
        with conn.cursor() as cur:
            if version is None:
                cur.execute("""SELECT version FROM mode_process
                               WHERE query_id=%s AND case_id=%s
                               ORDER BY (LEFT(version,5)='link_') ASC, id DESC
                               LIMIT 1""", (query_id, case_id))
                row = cur.fetchone()
                if not row:
                    return None
                version = row["version"]
            cur.execute("""SELECT * FROM mode_process
                           WHERE query_id=%s AND case_id=%s AND version=%s
                           ORDER BY seq, id""", (query_id, case_id, version))
            rows = cur.fetchall()
    finally:
        conn.close()
    return _proc_payload(case_id, version, rows)


def update_process_steps_by_query(query_id, case_id, version, steps_in_order):
    """Overwrite the steps JSON column of every row of one (query_id, case_id, version).

    steps_in_order must be in the same order as the procedures returned by
    fetch_process_by_query (both ordered by seq, id). Rows are matched one-to-one by row
    id, which stays correct even when seq has gaps.

    Returns:
        The number of rows updated.

    Raises:
        ValueError: if the row count differs from len(steps_in_order); the transaction
            is rolled back. Any DB error is re-raised after rollback as well.
    """
    conn = _conn()
    try:
        conn.begin()
        with conn.cursor() as cur:
            cur.execute("""SELECT id FROM mode_process
                           WHERE query_id=%s AND case_id=%s AND version=%s
                           ORDER BY seq, id""", (query_id, case_id, version))
            ids = [r["id"] for r in cur.fetchall()]
            if len(ids) != len(steps_in_order):
                raise ValueError(f"行数({len(ids)})与工序数({len(steps_in_order)})不一致")
            n = 0
            for row_id, steps in zip(ids, steps_in_order):
                cur.execute("UPDATE mode_process SET steps=%s WHERE id=%s",
                            (_j(steps), row_id))
                n += cur.rowcount
        conn.commit()
        return n
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def update_process_steps(case_id, version, steps_in_order):
    """Overwrite the steps JSON column of every row of one (case_id, version), any query_id.

    This uses the same scope as fetch_process / fetch_extract (a case's given version).
    That keeps the categorization write-back on the version the frontend shows through
    /api/extract; otherwise link_* copied posts would be written to the wrong version and
    the frontend would not see the change.

    steps_in_order must match fetch_process(case_id, version).procedures order (by id).

    Returns:
        The number of rows updated.

    Raises:
        ValueError: if the row count differs from len(steps_in_order); the transaction
            is rolled back. Any DB error is re-raised after rollback as well.
    """
    conn = _conn()
    try:
        conn.begin()
        with conn.cursor() as cur:
            cur.execute("""SELECT id FROM mode_process
                           WHERE case_id=%s AND version=%s
                           ORDER BY id""", (case_id, version))
            ids = [r["id"] for r in cur.fetchall()]
            if len(ids) != len(steps_in_order):
                raise ValueError(f"行数({len(ids)})与工序数({len(steps_in_order)})不一致")
            n = 0
            for row_id, steps in zip(ids, steps_in_order):
                cur.execute("UPDATE mode_process SET steps=%s WHERE id=%s",
                            (_j(steps), row_id))
                n += cur.rowcount
        conn.commit()
        return n
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def _categorized_from_rows(rows):
    """Return the set of categorized case_ids.

    ``rows`` is a list of {case_id, version, id, islink (0/1), cat (0/1 or NULL)} dicts.

    Rule: for each case take the latest real version (real before link_*, then largest
    id). The case is categorized if ANY row of that version has cat=1.

    Key point: do not look only at the max-id row. A procedure may have empty steps
    (step_count=0); its row never contains substanceMatch, and if it happens to have the
    largest id the whole case would be misjudged as uncategorized. That bug is why this
    helper exists.
    """
    best, has = {}, {}
    for r in rows:
        c, v = r["case_id"], r["version"]
        sk = (1 if r["islink"] else 0, -r["id"])   # real (islink=0) first, then larger id; take min
        if c not in best or sk < best[c][0]:
            best[c] = (sk, v)
        k = (c, v)
        has[k] = has.get(k, False) or bool(r["cat"])
    return {c for c, (sk, v) in best.items() if has.get((c, v))}


def fetch_categorized_cases(case_ids, mode="process"):
    """Return the subset of case_ids that are already categorized.

    A case counts as categorized when some step of its latest real version (link_*
    ranked last) contains substanceMatch; every non-empty step that went through
    categorization carries that key. This matches the categorization write-back and the
    frontend display. The frontend uses it to decide "everything categorized -> offer
    re-categorization". Only meaningful for the process direction (mode_process).
    Returns an empty set for empty input.
    """
    if not case_ids:
        return set()
    table = _mode_table(mode)
    ph = ",".join(["%s"] * len(case_ids))
    conn = _conn()
    try:
        with conn.cursor() as cur:
            # Fetch (case_id, version, id, islink, cat) per row. The steps LIKE runs in the
            # DB, so the large steps column is never transferred.
            cur.execute(f"""SELECT case_id, version, id,
                                   (LEFT(version,5)='link_') AS islink,
                                   (steps LIKE %s) AS cat
                            FROM {table} WHERE case_id IN ({ph})""",
                        ['%substanceMatch%'] + list(case_ids))
            rows = cur.fetchall()
    finally:
        conn.close()
    return _categorized_from_rows(rows)


_LITE_VALUE_LIMIT = 300  # lite mode: byte limit for truncating input/output values


def _trunc(s, limit=_LITE_VALUE_LIMIT):
    """Truncate a string to at most ``limit`` UTF-8 bytes, appending "…" when cut.

    Multi-byte characters split at the boundary are dropped rather than corrupted.
    Non-string values are returned unchanged.
    """
    if not isinstance(s, str):
        return s
    b = s.encode("utf-8")
    if len(b) <= limit:
        return s
    return b[:limit].decode("utf-8", "ignore") + "…"


def _lite_steps(steps):
    """Lite mode: truncate only the "value" of each step's inputs/outputs entries, in place.

    All other fields are kept, because the flat table and grouping use them. Non-list
    input is returned unchanged; the same list object is returned.
    """
    if not isinstance(steps, list):
        return steps
    for st in steps:
        if not isinstance(st, dict):
            continue
        for io_key in ("inputs", "outputs"):
            ios = st.get(io_key)
            if isinstance(ios, list):
                for io in ios:
                    if isinstance(io, dict) and "value" in io:
                        io["value"] = _trunc(io.get("value"))
    return steps


def _proc_payload(case_id, version, rows, lite=False):
    """Convert mode_process rows into {case_id, version, ..., procedures:[...]}.

    Post-level fields (platform, title, model, cost, duration, source) are taken from the
    first row. Returns None when ``rows`` is empty.

    lite=True: the process-library flat table only needs procedures[].{id, name, steps}.
    The large fields (source/declarations/type_registry/tools_used) are dropped and
    input/output values are truncated. Full values are lazy-loaded per case through
    /api/process when the frontend expands a row.
    """
    if not rows:
        return None
    if lite:
        procedures = [{
            "id": r["procedure_id"],
            "name": r["name"],
            "steps": _lite_steps(_loads(r["steps"], [])),
        } for r in rows]
        return {"case_id": case_id, "version": version,
                "title": rows[0]["post_title"], "procedures": procedures}
    procedures = [{
        "id": r["procedure_id"],
        "name": r["name"],
        "purpose": r["purpose"],
        "category": r["category"],
        "declarations": _loads(r["declarations"]),
        "type_registry": _loads(r["type_registry"]),
        "steps": _loads(r["steps"], []),
        "tools_used": _loads(r["tools_used"], []),
    } for r in rows]
    return {"case_id": case_id, "version": version,
            "platform": rows[0]["platform"], "title": rows[0]["post_title"],
            "model": rows[0]["model"],
            "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
            "duration_s": rows[0]["duration_s"],
            "source": _loads(rows[0]["source"]),
            "procedures": procedures}


# ── mode_tools ───────────────────────────────────────────────────────────────
def replace_tools(query_id, case_id, platform, post_title, tools,
                  model, version, cost_usd, duration_s, source=None):
    """Write one version of a post's tool deconstruction.

    Same semantics as replace_process: delete (case_id, version), then insert, in one
    transaction. seq is the tool's index in ``tools``.

    source: the post's source block. As in mode_process it is repeated on every row, and
    the knowledge-upload script uses it to rebuild the source.

    Returns:
        The number of tools written.

    Raises:
        Any DB error, after rolling back the transaction.
    """
    src = _j(source)
    conn = _conn()
    try:
        # DELETE+INSERT in one transaction; together with uk_q_case_ver_seq, concurrent
        # or repeated writes cannot leave duplicate rows behind.
        conn.begin()
        with conn.cursor() as cur:
            cur.execute("DELETE FROM mode_tools WHERE case_id=%s AND version=%s",
                        (case_id, version))
            if tools:
                rows = [(
                    query_id, case_id, platform, (post_title or "")[:500], src,
                    (t.get("工具名称") or "")[:250],
                    _j(t.get("实质作用域")), _j(t.get("形式作用域")),
                    t.get("创作层级"), t.get("来源链接"),
                    t.get("输入"), t.get("输出"),
                    _j(t.get("用法")), _j(t.get("案例")), _j(t.get("缺点")),
                    t.get("最新更新时间"),
                    model, version, cost_usd, duration_s, i,
                ) for i, t in enumerate(tools)]
                cur.executemany("""
                    INSERT INTO mode_tools
                      (query_id, case_id, platform, post_title, source, tool_name,
                       substance_scope, form_scope, creation_layer, source_link,
                       input_desc, output_desc, usage_json, cases_json, defects_json,
                       updated_time, model, version, cost_usd, duration_s, seq)
                    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                """, rows)
        conn.commit()
        return len(tools)
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def fetch_tools_versions(case_id):
    """List the mode_tools versions of a case with their row count and model.

    Real versions come before link_* copies; within each group, the version with the
    largest id comes first.
    """
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
                           FROM mode_tools WHERE case_id=%s
                           GROUP BY version
                           ORDER BY (LEFT(version,5)='link_') ASC, MAX(id) DESC""", (case_id,))
            return cur.fetchall()
    finally:
        conn.close()


def fetch_tools(case_id, version=None):
    """Rebuild {case_id, version, model, tool_count, tools:[...]} for one case.

    version=None picks the latest version, preferring real versions over link_* copies,
    then the highest id. Returns None when the case has no rows. Tools are ordered by id.
    """
    conn = _conn()
    try:
        with conn.cursor() as cur:
            if version is None:
                cur.execute("""SELECT version FROM mode_tools WHERE case_id=%s
                               ORDER BY (LEFT(version,5)='link_') ASC, id DESC
                               LIMIT 1""", (case_id,))
                row = cur.fetchone()
                if not row:
                    return None
                version = row["version"]
            cur.execute("""SELECT * FROM mode_tools WHERE case_id=%s AND version=%s
                           ORDER BY id""", (case_id, version))
            rows = cur.fetchall()
    finally:
        conn.close()
    return _tools_payload(case_id, version, rows)


def _tools_payload(case_id, version, rows):
    """Convert mode_tools rows into {case_id, version, ..., tools:[...]}.

    Tool dicts use the original Chinese keys that replace_tools reads. Post-level fields
    come from the first row; a missing "source" column reads as None. Returns None when
    ``rows`` is empty.
    """
    if not rows:
        return None
    tools = [{
        "工具名称": r["tool_name"],
        "实质作用域": _loads(r["substance_scope"]),
        "形式作用域": _loads(r["form_scope"]),
        "创作层级": r["creation_layer"],
        "来源链接": r["source_link"],
        "输入": r["input_desc"],
        "输出": r["output_desc"],
        "用法": _loads(r["usage_json"]),
        "案例": _loads(r["cases_json"]),
        "缺点": _loads(r["defects_json"]),
        "最新更新时间": r["updated_time"],
    } for r in rows]
    return {"case_id": case_id, "version": version,
            "platform": rows[0]["platform"], "title": rows[0]["post_title"],
            "model": rows[0]["model"],
            "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
            "duration_s": rows[0]["duration_s"],
            "source": _loads(rows[0].get("source")),
            "tool_count": len(tools), "tools": tools}


# ── Combined "open a post" query (single connection, fewest round-trips; each
#    round-trip to the remote RDS costs ~80ms, so this is optimized by count) ──
def fetch_extract(mode, case_id, version=None):
    """Fetch the version list and the deconstruction detail in one go.

    Both queries run on the same pooled connection to minimize round-trips.

    mode: "process" / "tools".

    Returns:
        {versions, data, missing}. data is the payload of the requested version, or of
        the first listed version when ``version`` is falsy. missing is True when there is
        no payload.

    NOTE(review): unlike the other helpers, mode is not validated; any value other than
    "tools" (including typos) silently falls back to the process table.
    """
    is_proc = mode != "tools"
    mtable = _mode_table("process" if is_proc else "tools")
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute(f"""SELECT version, COUNT(*) AS n, MAX(model) AS model
                            FROM {mtable} WHERE case_id=%s
                            GROUP BY version
                            ORDER BY (LEFT(version,5)='link_') ASC, MAX(id) DESC""", (case_id,))
            versions = cur.fetchall()
            # Detail: the version list above is already ordered latest-first, so its first
            # entry stands in for a separate "latest version" SELECT, saving a round-trip.
            # An explicitly requested version is used as-is.
            target = version or (versions[0]["version"] if versions else None)
            rows = []
            if target is not None:
                cur.execute(f"SELECT * FROM {mtable} WHERE case_id=%s AND version=%s ORDER BY id",
                            (case_id, target))
                rows = cur.fetchall()
    finally:
        conn.close()
    payload = (_proc_payload if is_proc else _tools_payload)(case_id, target, rows)
    return {"versions": versions, "data": payload, "missing": payload is None}


# ── Cross-query de-duplication / link copying (plan A: de-duplicate before
#    deconstructing, so the same post is not paid for twice) ──────────────────
# case_id is the post's physical identity (platform_channelContentId) and does not
# depend on the query. When several queries find the same post, it only needs one real
# deconstruction; the other queries get link_* copied rows (cost=0) to complete the
# association.
def latest_real_version(case_id, mode="process"):
    """Return the case's newest real (non-link_*) deconstruction row, or None.

    Any query counts; link_* rows are copies, not sources. The result is
    {"version", "query_id"} of the highest-id row, or None. It drives the
    de-duplication check before a deconstruction.
    """
    table = _mode_table(mode)
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute(f"""SELECT version, query_id FROM {table}
                            WHERE case_id=%s AND LEFT(version,5) <> 'link_'
                            ORDER BY id DESC LIMIT 1""", (case_id,))
            return cur.fetchone()
    finally:
        conn.close()


def link_process(query_id,
case_id, mode="process"): """把 case 在别处最新「真实」版本的解构行复制到目标 query (version='link_'+源版本, cost_usd=0)。幂等(先删目标同版本)。 返回复制行数;该 case 从未真实解构过则返回 0(无源可复制)。""" table = _mode_table(mode) conn = _conn() try: with conn.cursor() as cur: cur.execute(f"""SELECT version FROM {table} WHERE case_id=%s AND LEFT(version,5) <> 'link_' ORDER BY id DESC LIMIT 1""", (case_id,)) r = cur.fetchone() if not r: return 0 srcver = r["version"] newver = ("link_" + srcver)[:32] # version 列 VARCHAR(32) # 复制除自增 id / 时间戳外的全部列,改写 query_id / version / cost。 cur.execute(f"SHOW COLUMNS FROM {table}") cols = [c["Field"] for c in cur.fetchall() if c["Field"] not in ("id", "created_at", "updated_at")] cur.execute(f"SELECT {','.join(cols)} FROM {table} WHERE case_id=%s AND version=%s", (case_id, srcver)) rows = cur.fetchall() cur.execute(f"DELETE FROM {table} WHERE query_id=%s AND case_id=%s AND version=%s", (query_id, case_id, newver)) for row in rows: row = dict(row) row["query_id"] = query_id row["version"] = newver row["cost_usd"] = 0 cur.execute( f"INSERT INTO {table} ({','.join(cols)}) VALUES ({','.join(['%s']*len(cols))})", [row[k] for k in cols]) return len(rows) finally: conn.close() # ── Dashboard 原始行(指标计算在 server.py)───────────────────────────────────── # 采纳判定只需「和内容制作知识相关」的得分,用 SQL JSON_EXTRACT 直取这一个标量, # 避免把整块 llm_evaluation(本库 ~1.5MB)拉到 Python 再解析。得分可能直接是数字, # 也可能裹在 {"得分": x} 里,COALESCE 两条路径覆盖两种存法,口径同 is_adopted。 _REL_SQL = ("JSON_UNQUOTE(COALESCE(" "JSON_EXTRACT(llm_evaluation,'$.\"相关性\".\"和内容制作知识相关\".\"得分\"')," "JSON_EXTRACT(llm_evaluation,'$.\"相关性\".\"和内容制作知识相关\"')))") # 可复现/实现门槛标量直取(口径同 is_adopted 的 _repro_score):兼容新旧 schema—— # 旧版「质量.固定维度.可复现性」,新版「质量.动态维度.工序.字段完整性.实现完整性」,COALESCE 依次回退。 _REPRO_SQL = ("JSON_UNQUOTE(COALESCE(" "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"固定维度\".\"可复现性\".\"得分\"')," "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"固定维度\".\"可复现性\"')," "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"动态维度\".\"工序\".\"字段完整性\".\"实现完整性\".\"得分\"')," 
"JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"动态维度\".\"工序\".\"字段完整性\".\"实现完整性\"')))") def fetch_adopted_process_cases(query_id=None): """返回「已采纳且有工序解构」的 case_id 列表(供知识上传脚本用)。 采纳是帖子级属性(评估存在 search_process),工序解构存在 mode_process,故二者 JOIN: 只取两边都有的 case,再用 is_adopted_rel(口径同 Dashboard)在 Python 侧过滤。 relevance 得分由 _REL_SQL 直取标量,不传整块 llm_evaluation。 query_id 给定时只看该搜索任务下的 case。返回去重、按 case_id 排序的列表。 """ sql = (f"SELECT DISTINCT s.case_id, s.overall_score, s.publish_time, " f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro " "FROM search_process s " "JOIN (SELECT DISTINCT case_id FROM mode_process) m ON s.case_id = m.case_id") params = () if query_id: sql += " WHERE s.query_id=%s" params = (query_id,) conn = _conn() try: with conn.cursor() as cur: cur.execute(sql, params) rows = cur.fetchall() finally: conn.close() cases = [r["case_id"] for r in rows if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"], r["repro"])] return sorted(set(cases)) def fetch_adopted_tools_cases(query_id=None): """返回「已采纳且有工具解构」的 case_id 列表(供工具知识上传脚本用)。 与 fetch_adopted_process_cases 完全同构,只把搜索/解构表换成工具方向: 采纳是帖子级属性(评估存在 search_tools),工具解构存在 mode_tools,故二者 JOIN, 只取两边都有的 case,再用 is_adopted_rel(口径同 Dashboard)在 Python 侧过滤。 query_id 给定时只看该搜索任务下的 case。返回去重、按 case_id 排序的列表。 """ sql = (f"SELECT DISTINCT s.case_id, s.overall_score, s.publish_time, " f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro " "FROM search_tools s " "JOIN (SELECT DISTINCT case_id FROM mode_tools) m ON s.case_id = m.case_id") params = () if query_id: sql += " WHERE s.query_id=%s" params = (query_id,) conn = _conn() try: with conn.cursor() as cur: cur.execute(sql, params) rows = cur.fetchall() finally: conn.close() cases = [r["case_id"] for r in rows if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"], r["repro"])] return sorted(set(cases)) def route_tables(knowledge_types): """知识类型标签 → 落表列表(有序去重)。 工序/能力 → search_process;工具 → search_tools;两者都含写两表;空/None 兜底 search_process。 评估是统一一套(同一 llm_evaluation blob),故同帖落多表不重复打分,只是多写一行。""" kt = 
set(knowledge_types or []) tables = [] if kt & {"工具"}: tables.append("search_tools") if (kt & {"工序", "能力"}) or not tables: # 工序/能力,或没命中任何已知标签 → 兜底 process tables.insert(0, "search_process") return tables # ── 评估去重:复用 query 无关分,只重算 query 相关分(search_eval.py 用)────────── def fetch_existing_eval(case_id, table="search_process"): """返回该 case 在搜索表里最近一条「有效」评估 blob(任意 query)。 评估去重用:同帖在别的相似 query 下评过时,复用其 query 无关分(质量/通用相关/时效), 只重算「和 query 相关」。无有效评估(全是 _error 或没评过)返回 None。 取最近若干条逐一挑出首个非 error、结构完整的 blob。""" table = _search_table(table) conn = _conn() try: with conn.cursor() as cur: cur.execute(f"""SELECT llm_evaluation FROM {table} WHERE case_id=%s AND llm_evaluation IS NOT NULL ORDER BY updated_at DESC, id DESC LIMIT 5""", (case_id,)) rows = cur.fetchall() finally: conn.close() for r in rows: e = _loads(r["llm_evaluation"]) if isinstance(e, dict) and not e.get("_error") and isinstance(e.get("相关性"), dict): return e return None def fetch_existing_eval_any(case_id): """跨两张搜索表找该 case 最近一条有效评估 blob。 评估与表无关(统一一套),任一表评过即可复用,避免同帖在两表各评一次。无则 None。""" for table in ("search_process", "search_tools"): e = fetch_existing_eval(case_id, table) if e: return e return None def update_post_eval(query_id, case_id, evaluation, table="search_process"): """用新的评估 blob 覆盖某 (query, case) 行的 llm_evaluation,并同步重算派生列 overall_score、knowledge_type(口径同 upsert_search_posts)。返回受影响行数。""" table = _search_table(table) overall = overall_score(evaluation) ktype = evaluation.get("知识类型") if isinstance(evaluation, dict) else None conn = _conn() try: with conn.cursor() as cur: n = cur.execute( f"UPDATE {table} SET llm_evaluation=%s, overall_score=%s, knowledge_type=%s " "WHERE query_id=%s AND case_id=%s", (_j(evaluation), overall, _j(ktype), query_id, case_id)) return n finally: conn.close() # ── 上传去重:知识库已导入台账(stages/import_process_knowledge.py 用)──────────────── def fetch_ingested_map(case_id): """返回 {proc_index: version} —— 该 case 各工序已导入知识库的版本。空表示没传过。""" conn = _conn() try: with conn.cursor() as cur: 
cur.execute("SELECT proc_index, version FROM knowledge_ingest_log WHERE case_id=%s", (case_id,)) return {r["proc_index"]: r["version"] for r in cur.fetchall()} finally: conn.close() def mark_ingested(case_id, proc_index, version, knowledge_id=None, api_url=None): """记一条「已导入」台账(case_id+proc_index 唯一,重导同序号则更新版本/knowledge_id)。""" conn = _conn() try: with conn.cursor() as cur: cur.execute("""INSERT INTO knowledge_ingest_log (case_id, proc_index, version, knowledge_id, api_url) VALUES (%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE version=VALUES(version), knowledge_id=VALUES(knowledge_id), api_url=VALUES(api_url)""", (case_id, proc_index, version, knowledge_id, api_url)) finally: conn.close() def fetch_tools_ingested_map(case_id): """返回 {tool_index: version} —— 该 case 各工具已导入知识库的版本。空表示没传过。 工具方向独立台账(tools_ingest_log),与工序的 knowledge_ingest_log 互不干扰。""" conn = _conn() try: with conn.cursor() as cur: cur.execute("SELECT tool_index, version FROM tools_ingest_log WHERE case_id=%s", (case_id,)) return {r["tool_index"]: r["version"] for r in cur.fetchall()} finally: conn.close() def mark_tools_ingested(case_id, tool_index, version, knowledge_id=None, api_url=None): """记一条工具「已导入」台账(case_id+tool_index 唯一,重导同序号则更新版本/knowledge_id)。""" conn = _conn() try: with conn.cursor() as cur: cur.execute("""INSERT INTO tools_ingest_log (case_id, tool_index, version, knowledge_id, api_url) VALUES (%s,%s,%s,%s,%s) ON DUPLICATE KEY UPDATE version=VALUES(version), knowledge_id=VALUES(knowledge_id), api_url=VALUES(api_url)""", (case_id, tool_index, version, knowledge_id, api_url)) finally: conn.close() def fetch_dashboard_rows(): """拉 Dashboard 计算所需的轻量行。数据量级:百~千行,Python 聚合足够。 优化:① 不传 llm_evaluation 整块,SQL 只取采纳判定要的相关性得分; ② steps 只取每个 case 的最新版本(覆盖度只看最新版),历史/link_ 版本不传 steps。""" conn = _conn() try: with conn.cursor() as cur: # 进度分母走「采纳」口径;mode 标方向(工序帖来自 search_process)。 cols = (f"query_id, case_id, platform, overall_score, publish_time, " f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro") cur.execute(f"SELECT 
{cols} FROM search_process WHERE {_REAL_POST}") posts = cur.fetchall() for p in posts: p["mode"] = "process" cur.execute(f"SELECT {cols} FROM search_tools") st = cur.fetchall() for p in st: p["mode"] = "tools" posts += st # 成本/耗时按全部版本计;steps 仅最新版需要 → 非最新版只回 NULL,省传输。 cur.execute("""SELECT p.id, p.case_id, p.version, p.cost_usd, p.duration_s, p.created_at, CASE WHEN p.version = m.maxv THEN p.steps END AS steps FROM mode_process p JOIN (SELECT t.case_id, t.version AS maxv FROM mode_process t JOIN (SELECT case_id, MAX(id) AS mid FROM mode_process WHERE LEFT(version,5) <> 'link_' GROUP BY case_id) x ON t.id = x.mid) m ON p.case_id = m.case_id ORDER BY p.id""") procs = cur.fetchall() cur.execute("""SELECT id, case_id, version, tool_name, substance_scope, form_scope, cost_usd, duration_s, created_at FROM mode_tools""") tools = cur.fetchall() finally: conn.close() for p in posts: # 采纳判定:口径同帖子列表(is_adopted),作为「需解构」分母依据 p["adopted"] = is_adopted_rel(p["overall_score"], p["rel"], p["publish_time"], p["repro"]) for r in procs: r["steps"] = _loads(r["steps"], []) r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None r["created_at"] = str(r["created_at"]) if r["created_at"] else None for r in tools: r["substance_scope"] = _loads(r["substance_scope"], []) r["form_scope"] = _loads(r["form_scope"], []) r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None r["created_at"] = str(r["created_at"]) if r["created_at"] else None return posts, procs, tools def check(): conn = _conn() try: with conn.cursor() as cur: for t in ("search_process", "search_tools", "mode_process", "mode_tools"): cur.execute(f"SELECT COUNT(*) AS n FROM {t}") print(f"{t}: {cur.fetchone()['n']} 行") finally: conn.close() if __name__ == "__main__": cmd = sys.argv[1] if len(sys.argv) > 1 else "" if cmd == "init": init_tables() elif cmd == "check": check() elif cmd == "clear": clear_tables() else: print("用法:\n python db.py init # 建表\n python db.py check # 四表行数\n python db.py 
clear # 清空四表数据")