db.py 68 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451
  1. # -*- coding: utf-8 -*-
  2. """mode_workflow · MySQL 持久化(DB 为唯一事实源)
  3. ================================================================================
  4. 读 .env 的 MYSQL_* 连接 MySQL。四张表:
  5. search_process —— 每行一个 (query, 帖子):工序方向的搜索 + llm 评估结果
  6. search_tools —— 同结构,工具方向的搜索结果(方向由表区分,不再用 mode_type 列)
  7. mode_process —— 每行一个解构出的工序(steps 等嵌套结构存 JSON 列)
  8. mode_tools —— 每行一个解构出的工具
  9. 与旧 fixed_query_eval/db.py 的关键差异:本系统 DB 是主存储,写入失败直接 raise,
  10. 不做"失败不阻断"。读侧保留防御(返回空/None)。
  11. 用法:
  12. python db.py init # 建表(幂等)
  13. python db.py check # 打印四表行数
  14. python db.py clear # 清空四表数据(TRUNCATE)
  15. """
  16. import json
  17. import os
  18. import sys
  19. from datetime import datetime
  20. from pathlib import Path
  21. PROJECT_ROOT = Path(__file__).resolve().parents[2]
  22. sys.path.insert(0, str(PROJECT_ROOT))
  23. from dotenv import load_dotenv
  24. load_dotenv()
  25. import pymysql
  26. from pymysql.cursors import DictCursor
  27. from dbutils.pooled_db import PooledDB
  28. # ── 连接池 ──────────────────────────────────────────────────────────────────
  29. # MySQL 是远程 RDS,每次 pymysql.connect() 的 TCP+鉴权握手 ~0.5s。旧实现每个
  30. # 请求新建一条连接,一次"点开帖子"要 2~3 个请求 = 2~3 次握手 ≈ 1s。改用连接池
  31. # 复用长连接后,握手只在池初始化时各发生一次,后续取连接近乎零开销。
  32. # server.py 是 ThreadingHTTPServer(每请求一线程),PooledDB 线程安全,正好匹配。
  33. # 注意:fetch_* 里的 conn.close() 在池连接上语义是"归还池中"而非真正断开。
  34. _POOL = None
  35. def _pool():
  36. global _POOL
  37. if _POOL is None:
  38. if not os.getenv("MYSQL_HOST"):
  39. raise RuntimeError("缺 MYSQL_HOST:检查 .env 的 MYSQL_* 配置")
  40. _POOL = PooledDB(
  41. creator=pymysql,
  42. mincached=2, # 启动即预热 2 条,首点不再吃冷握手
  43. maxcached=5, # 空闲保留上限
  44. maxconnections=20, # 并发上限(ThreadingHTTPServer 线程数)
  45. blocking=True, # 连接耗尽时等待而非报错
  46. ping=1, # 取用前 ping,自动剔除被 RDS 掐断的死连接
  47. host=os.getenv("MYSQL_HOST"),
  48. port=int(os.getenv("MYSQL_PORT", 3306)),
  49. user=os.getenv("MYSQL_USER"),
  50. password=os.getenv("MYSQL_PASSWORD"),
  51. database=os.getenv("MYSQL_DATABASE"),
  52. charset="utf8mb4", cursorclass=DictCursor,
  53. autocommit=True, connect_timeout=10,
  54. )
  55. return _POOL
  56. def _conn():
  57. """从池取一条连接;用法不变(with cursor / conn.close() 归还池)。"""
  58. return _pool().connection()
  59. # ── DDL ──────────────────────────────────────────────────────────────────────
  60. SEARCH_TABLES = {"process": "search_process", "tools": "search_tools"}
  61. MODE_TABLES = {"process": "mode_process", "tools": "mode_tools"}
  62. def _search_table(mode_or_table):
  63. """mode(process/tools)或表名 → 合法搜索表名(白名单,防 SQL 注入)。"""
  64. t = SEARCH_TABLES.get(mode_or_table, mode_or_table)
  65. if t not in SEARCH_TABLES.values():
  66. raise ValueError(f"未知搜索表/模式: {mode_or_table!r}")
  67. return t
  68. def _mode_table(mode_or_table):
  69. """mode(process/tools)或表名 → 合法解构表名(白名单,防 SQL 注入)。"""
  70. t = MODE_TABLES.get(mode_or_table, mode_or_table)
  71. if t not in MODE_TABLES.values():
  72. raise ValueError(f"未知解构表/模式: {mode_or_table!r}")
  73. return t
  74. def _ddl_search(table, direction):
  75. return f"""
  76. CREATE TABLE IF NOT EXISTS {table} (
  77. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  78. query_id VARCHAR(32) NOT NULL COMMENT 'q0000',
  79. query_text VARCHAR(512) NULL,
  80. case_id VARCHAR(128) NOT NULL COMMENT 'platform_channelContentId',
  81. platform VARCHAR(32) NULL,
  82. channel_content_id VARCHAR(128) NULL,
  83. title VARCHAR(512) NULL,
  84. url VARCHAR(1024) NULL,
  85. content_type VARCHAR(32) NULL,
  86. body LONGTEXT NULL,
  87. images JSON NULL,
  88. videos JSON NULL,
  89. like_count INT NULL,
  90. publish_time VARCHAR(64) NULL,
  91. quality_score FLOAT NULL COMMENT 'post._quality_score',
  92. quality_grade VARCHAR(8) NULL,
  93. found_by JSON NULL COMMENT '命中的措辞数组',
  94. knowledge_type JSON NULL COMMENT '["能力","工序","工具"] 子集',
  95. overall_score FLOAT NULL COMMENT '(相关均值+质量均值)/2',
  96. llm_evaluation JSON NULL COMMENT '评估全量 blob',
  97. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  98. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  99. UNIQUE KEY uk_qid_case (query_id, case_id),
  100. KEY idx_platform (platform)
  101. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='搜索+评估结果({direction})';
  102. """
  103. DDL_PROCESS = """
  104. CREATE TABLE IF NOT EXISTS mode_process (
  105. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  106. query_id VARCHAR(32) NOT NULL,
  107. case_id VARCHAR(128) NOT NULL,
  108. platform VARCHAR(32) NULL,
  109. post_title VARCHAR(512) NULL,
  110. source JSON NULL COMMENT '解构返回的 source 块',
  111. procedure_id VARCHAR(16) NULL COMMENT 'p1,p2…',
  112. name VARCHAR(255) NULL,
  113. purpose TEXT NULL,
  114. category VARCHAR(32) NULL COMMENT '产物创造/资产建设/自动化/分析/学习',
  115. declarations JSON NULL,
  116. type_registry JSON NULL,
  117. steps JSON NULL COMMENT '步骤数组全量',
  118. step_count INT NULL,
  119. tools_used JSON NULL COMMENT '从 steps[].via 去重提取',
  120. model VARCHAR(64) NULL,
  121. version VARCHAR(32) NULL COMMENT 'v_MMDDHHMM,保留历史;link_* 为跨 query 复制(cost=0)',
  122. cost_usd DECIMAL(10,6) NULL COMMENT '本次解构调用成本(同版本各行相同,聚合需按 case+version 去重)',
  123. duration_s FLOAT NULL,
  124. seq SMALLINT NULL COMMENT '帖内序号(0-based);与 (query_id,case_id,version) 组唯一键防并发/重复写',
  125. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  126. UNIQUE KEY uk_q_case_ver_seq (query_id, case_id, version, seq),
  127. KEY idx_case_ver (case_id, version),
  128. KEY idx_qid (query_id)
  129. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序解构结果(每行一个工序)';
  130. """
  131. DDL_TOOLS = """
  132. CREATE TABLE IF NOT EXISTS mode_tools (
  133. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  134. query_id VARCHAR(32) NOT NULL,
  135. case_id VARCHAR(128) NOT NULL,
  136. platform VARCHAR(32) NULL,
  137. post_title VARCHAR(512) NULL,
  138. source JSON NULL COMMENT '解构时帖子来源块(tool_extract._row_to_source 产出)',
  139. tool_name VARCHAR(255) NULL,
  140. substance_scope JSON NULL COMMENT '实质作用域(数组)',
  141. form_scope JSON NULL COMMENT '形式作用域(数组或null)',
  142. creation_layer VARCHAR(32) NULL COMMENT '制作层/创作层',
  143. source_link VARCHAR(1024) NULL,
  144. input_desc TEXT NULL,
  145. output_desc TEXT NULL,
  146. usage_json JSON NULL,
  147. cases_json JSON NULL,
  148. defects_json JSON NULL,
  149. updated_time VARCHAR(64) NULL COMMENT '工具最新更新时间',
  150. model VARCHAR(64) NULL,
  151. version VARCHAR(32) NULL COMMENT 'v_MMDDHHMM;link_* 为跨 query 复制(cost=0)',
  152. cost_usd DECIMAL(10,6) NULL COMMENT '同 mode_process,聚合按 case+version 去重',
  153. duration_s FLOAT NULL,
  154. seq SMALLINT NULL COMMENT '帖内序号(0-based);与 (query_id,case_id,version) 组唯一键防并发/重复写',
  155. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  156. UNIQUE KEY uk_q_case_ver_seq (query_id, case_id, version, seq),
  157. KEY idx_case_ver (case_id, version),
  158. KEY idx_qid (query_id),
  159. KEY idx_tool_name (tool_name)
  160. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
  161. """
  162. # 工序知识「已导入知识库」台账:防重复上传(stages/import_process_knowledge.py 用)。
  163. # 每条知识 = 某 case 的某个工序(proc_index 1-based)。记录导入时的 mode_process 版本:
  164. # 版本变了(重解构)说明内容已变,应重导;版本不变即视为「已传过」,跳过。
  165. # 选 DB 台账而非本地文件,是为了换机器/换链接后也不会重复写知识库。
  166. # 注:工具知识用独立的 tools_ingest_log,不与本表混用(case_id 是帖子物理身份,
  167. # 同帖可能既被工序解构又被工具解构,共表会在 (case_id, index) 上撞键)。
  168. DDL_INGEST_LOG = """
  169. CREATE TABLE IF NOT EXISTS knowledge_ingest_log (
  170. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  171. case_id VARCHAR(128) NOT NULL,
  172. proc_index INT NOT NULL COMMENT '工序序号(1-based),对齐导入脚本枚举',
  173. version VARCHAR(32) NULL COMMENT '导入时 mode_process 版本;变了应重导',
  174. knowledge_id VARCHAR(128) NULL COMMENT '接口返回的 knowledge_id',
  175. api_url VARCHAR(255) NULL,
  176. ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  177. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  178. UNIQUE KEY uk_case_proc (case_id, proc_index)
  179. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序知识已导入台账(防重复上传)';
  180. """
  181. # 工具知识「已导入知识库」台账:语义同 knowledge_ingest_log,但针对工具方向独立成表
  182. # (stages/import_tools_knowledge.py 用)。每条知识 = 某 case 的某个工具(tool_index 1-based),
  183. # 版本记录导入时的 mode_tools 版本;变了(重解构)应重导,不变即「已传过」跳过。
  184. DDL_TOOLS_INGEST_LOG = """
  185. CREATE TABLE IF NOT EXISTS tools_ingest_log (
  186. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  187. case_id VARCHAR(128) NOT NULL,
  188. tool_index INT NOT NULL COMMENT '工具序号(1-based),对齐导入脚本枚举',
  189. version VARCHAR(32) NULL COMMENT '导入时 mode_tools 版本;变了应重导',
  190. knowledge_id VARCHAR(128) NULL COMMENT '接口返回的 knowledge_id',
  191. api_url VARCHAR(255) NULL,
  192. ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  193. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  194. UNIQUE KEY uk_case_tool (case_id, tool_index)
  195. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具知识已导入台账(防重复上传)';
  196. """
  197. def _ensure_column(cur, table, column, column_ddl):
  198. """给已存在的表幂等补列:列已存在则跳过(MySQL ADD COLUMN 无 IF NOT EXISTS)。
  199. column_ddl 为 ADD COLUMN 后的完整定义,如 \"source JSON NULL ... AFTER post_title\"。"""
  200. cur.execute("""SELECT COUNT(*) AS n FROM information_schema.columns
  201. WHERE table_schema=DATABASE() AND table_name=%s AND column_name=%s""",
  202. (table, column))
  203. if cur.fetchone()["n"] == 0:
  204. cur.execute(f"ALTER TABLE {table} ADD COLUMN {column_ddl}")
  205. def _ensure_unique_index(cur, table, index_name, cols):
  206. """幂等加唯一索引:已存在则跳过(MySQL ADD INDEX 无 IF NOT EXISTS)。
  207. cols 为列表达式,如 "query_id, case_id, version, seq"。加之前需保证无冲突数据。"""
  208. cur.execute("""SELECT COUNT(*) AS n FROM information_schema.statistics
  209. WHERE table_schema=DATABASE() AND table_name=%s AND index_name=%s""",
  210. (table, index_name))
  211. if cur.fetchone()["n"] == 0:
  212. cur.execute(f"ALTER TABLE {table} ADD UNIQUE KEY {index_name} ({cols})")
  213. def init_tables():
  214. conn = _conn()
  215. try:
  216. with conn.cursor() as cur:
  217. cur.execute(_ddl_search("search_process", "工序方向"))
  218. cur.execute(_ddl_search("search_tools", "工具方向"))
  219. cur.execute(DDL_PROCESS)
  220. cur.execute(DDL_TOOLS)
  221. cur.execute(DDL_INGEST_LOG)
  222. cur.execute(DDL_TOOLS_INGEST_LOG)
  223. # 历史库迁移:version 由 VARCHAR(16) 放宽到 32,容纳 link_v_mopN_* 复制版本。
  224. # MODIFY 幂等(已是 32 则 MySQL 元数据无操作),建表后表必存在,可安全执行。
  225. for t in ("mode_process", "mode_tools"):
  226. cur.execute(f"ALTER TABLE {t} MODIFY COLUMN version VARCHAR(32) NULL")
  227. # 历史库迁移:给老 mode_tools 补 source 列(MySQL 的 ADD COLUMN 无 IF NOT EXISTS,
  228. # 故先查 information_schema 判存在,缺了才 ADD,幂等)。
  229. _ensure_column(cur, "mode_tools", "source",
  230. "source JSON NULL COMMENT '解构时帖子来源块' AFTER post_title")
  231. # 历史库迁移:加 seq(帖内序号)+ (query_id,case_id,version,seq) 唯一键,防并发/重复
  232. # 写入产生重复行。顺序必须是 加列 → 回填 → 加唯一键。MySQL 5.7 无窗口函数,seq 在
  233. # 应用层按 (query_id,case_id,version) 内 id 升序回填(现有数据该粒度已无重复)。
  234. for t in ("mode_process", "mode_tools"):
  235. _ensure_column(cur, t, "seq",
  236. "seq SMALLINT NULL COMMENT '帖内序号(0-based)' AFTER duration_s")
  237. for t in ("mode_process", "mode_tools"):
  238. cur.execute(f"""SELECT id, query_id, case_id, version FROM {t}
  239. WHERE seq IS NULL ORDER BY query_id, case_id, version, id""")
  240. key, n, ups = None, 0, []
  241. for r in cur.fetchall():
  242. k = (r["query_id"], r["case_id"], r["version"])
  243. if k != key:
  244. key, n = k, 0
  245. ups.append((n, r["id"])); n += 1
  246. if ups:
  247. cur.executemany(f"UPDATE {t} SET seq=%s WHERE id=%s", ups)
  248. print(f" ↳ {t}: 回填 seq {len(ups)} 行")
  249. for t in ("mode_process", "mode_tools"):
  250. _ensure_unique_index(cur, t, "uk_q_case_ver_seq",
  251. "query_id, case_id, version, seq")
  252. print("✅ 建表完成:search_process, search_tools, mode_process, mode_tools, "
  253. "knowledge_ingest_log, tools_ingest_log")
  254. finally:
  255. conn.close()
  256. def clear_tables():
  257. """清空四张表的数据(TRUNCATE,表结构保留)。"""
  258. conn = _conn()
  259. try:
  260. with conn.cursor() as cur:
  261. for t in ("search_process", "search_tools", "mode_process", "mode_tools"):
  262. cur.execute(f"TRUNCATE TABLE {t}")
  263. print(f"🧹 已清空 {t}")
  264. finally:
  265. conn.close()
  266. # ── 工具函数 ──────────────────────────────────────────────────────────────────
  267. def _loads(v, default=None):
  268. """pymysql 的 JSON 列可能返回字符串,统一解析。"""
  269. if v is None:
  270. return default
  271. if isinstance(v, (list, dict)):
  272. return v
  273. try:
  274. return json.loads(v)
  275. except Exception:
  276. return default
  277. def _j(v):
  278. """写入 JSON 列:None 保持 NULL,其余 dumps。"""
  279. return None if v is None else json.dumps(v, ensure_ascii=False)
  280. def _collect_scores(node):
  281. """递归收集嵌套评估里所有「得分」。LLM 直出的得分多为字符串("1"/"4"),
  282. 个别为数字(如 时效性 10),统一按 float 解析;非数值(如 "N/A")跳过不计入。"""
  283. out = []
  284. if isinstance(node, dict):
  285. for k, v in node.items():
  286. if k == "得分":
  287. try:
  288. out.append(float(v))
  289. except (TypeError, ValueError):
  290. pass
  291. else:
  292. out.extend(_collect_scores(v))
  293. elif isinstance(node, list):
  294. for v in node:
  295. out.extend(_collect_scores(v))
  296. return out
  297. def overall_score(e):
  298. """综合分 = (相关性各项均值 + 质量各项均值) / 可得部分数。算不出返回 None。"""
  299. parts = []
  300. for key in ("相关性", "质量"):
  301. scores = _collect_scores((e or {}).get(key))
  302. if scores:
  303. parts.append(sum(scores) / len(scores))
  304. return round(sum(parts) / len(parts), 2) if parts else None
  305. def _recency_hard(date_str):
  306. """硬时效(同 mode_procedure/server.py:_recency_hard):半年内=3 / 两年内=2 / 更早=1。
  307. publish_time 头 10 字符按 YYYY-MM-DD 解析,失败返回 None(不参与判定)。"""
  308. try:
  309. d = datetime.strptime(str(date_str or "")[:10], "%Y-%m-%d")
  310. except (ValueError, TypeError):
  311. return None
  312. days = (datetime.now() - d).days
  313. if days <= 180:
  314. return 3
  315. if days <= 730:
  316. return 2
  317. return 1
  318. def _fixed_dim_score(evaluation, name):
  319. """取 质量.固定维度.<name>.得分 标量,缺失/非数值返回 None(不参与判定)。"""
  320. v = (((evaluation or {}).get("质量") or {}).get("固定维度") or {}).get(name)
  321. if isinstance(v, dict):
  322. v = v.get("得分")
  323. try:
  324. return float(v) if v is not None else None
  325. except (TypeError, ValueError):
  326. return None
  327. def _impl_score(evaluation):
  328. """取 质量.动态维度.工序.字段完整性.实现完整性.得分 标量,缺失/非数值返回 None。
  329. 新版 prompt 把旧「可复现性」的硬封顶规则并入了「实现完整性」,故采纳门槛改读此处。"""
  330. v = ((((((evaluation or {}).get("质量") or {}).get("动态维度") or {})
  331. .get("工序") or {}).get("字段完整性") or {}).get("实现完整性"))
  332. if isinstance(v, dict):
  333. v = v.get("得分")
  334. try:
  335. return float(v) if v is not None else None
  336. except (TypeError, ValueError):
  337. return None
  338. def _repro_score(evaluation):
  339. """采纳门槛用的「可复现/可实现」得分:优先旧版「可复现性」(固定维度),
  340. 缺失则回退新版「实现完整性」(动态维度.工序)。这样新旧两套评估 blob 都能正确判定。"""
  341. v = _fixed_dim_score(evaluation, "可复现性")
  342. return v if v is not None else _impl_score(evaluation)
  343. def is_adopted(overall, evaluation, publish_time):
  344. """采纳/命中判定,口径对齐 mode_procedure 的 decision=="report":
  345. 制作相关性<4、可复现/实现完整性<4、发布超两年、综合分<6 —— 任一命中即不采纳;指标缺失不参与判定。
  346. (意图可控性暂只采分不设门槛,留待阈值标定后再开。)
  347. 可复现/实现门槛兼容新旧 schema:旧版读「可复现性」,新版读「实现完整性」(见 _repro_score)。
  348. fail-closed:评估失败(_error)、blob 缺失/为空、或综合分算不出(None)→ 直接判不采纳。
  349. 评不出的帖子不该混进命中集(此前 fail-open 会因各指标取不到值而误判采纳)。"""
  350. if not isinstance(evaluation, dict) or not evaluation or evaluation.get("_error"):
  351. return False
  352. if overall is None:
  353. return False
  354. rel = None
  355. v = ((evaluation or {}).get("相关性") or {}).get("和内容制作知识相关")
  356. if isinstance(v, dict):
  357. v = v.get("得分")
  358. try:
  359. rel = float(v) if v is not None else None
  360. except (TypeError, ValueError):
  361. rel = None
  362. if rel is not None and rel < 4:
  363. return False
  364. repro = _repro_score(evaluation)
  365. if repro is not None and repro < 4:
  366. return False
  367. rh = _recency_hard(publish_time)
  368. if rh is not None and rh < 2:
  369. return False
  370. if overall is not None and float(overall) < 6:
  371. return False
  372. return True
  373. def is_adopted_rel(overall, rel, publish_time, repro=None):
  374. """is_adopted 的轻量版:相关性得分(rel)、可复现/实现门槛(repro)已由 SQL JSON_EXTRACT
  375. 直接取出(repro 由 _REPRO_SQL 兼容新旧 schema 取值),无需传输/解析整块 llm_evaluation。
  376. 判定口径与 is_adopted 完全一致(含 fail-closed:综合分算不出→不采纳;失败帖的 overall_score 列为 NULL)。"""
  377. if overall is None:
  378. return False
  379. try:
  380. rel = float(rel) if rel is not None else None
  381. except (TypeError, ValueError):
  382. rel = None
  383. if rel is not None and rel < 4:
  384. return False
  385. try:
  386. repro = float(repro) if repro is not None else None
  387. except (TypeError, ValueError):
  388. repro = None
  389. if repro is not None and repro < 4:
  390. return False
  391. rh = _recency_hard(publish_time)
  392. if rh is not None and rh < 2:
  393. return False
  394. if overall is not None and float(overall) < 6:
  395. return False
  396. return True
  397. # ── search_process / search_tools ────────────────────────────────────────────
  398. def upsert_search_posts(query_id, query_text, results, table="search_process"):
  399. """一组搜索结果写入指定搜索表(按 (query_id, case_id) upsert)。返回写入条数。
  400. table:search_process(工序方向) / search_tools(工具方向)。"""
  401. table = _search_table(table)
  402. if not results:
  403. return 0
  404. rows = []
  405. for r in results:
  406. post = r.get("post") or {}
  407. e = r.get("llm_evaluation") or {}
  408. rows.append((
  409. query_id, query_text, r.get("case_id"), r.get("platform"),
  410. r.get("channel_content_id"),
  411. (post.get("title") or post.get("desc") or "")[:500],
  412. r.get("source_url"), post.get("content_type"),
  413. post.get("body_text") or post.get("desc") or "",
  414. _j(post.get("images") or []), _j(post.get("videos") or []),
  415. post.get("like_count"),
  416. str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
  417. post.get("_quality_score"), post.get("_quality_grade"),
  418. _j(r.get("found_by_queries") or []),
  419. _j(e.get("知识类型") or []),
  420. overall_score(e),
  421. _j(e),
  422. ))
  423. sql = f"""
  424. INSERT INTO {table}
  425. (query_id, query_text, case_id, platform, channel_content_id, title, url,
  426. content_type, body, images, videos, like_count, publish_time,
  427. quality_score, quality_grade, found_by, knowledge_type,
  428. overall_score, llm_evaluation)
  429. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  430. ON DUPLICATE KEY UPDATE
  431. query_text=VALUES(query_text), platform=VALUES(platform),
  432. channel_content_id=VALUES(channel_content_id), title=VALUES(title), url=VALUES(url),
  433. content_type=VALUES(content_type), body=VALUES(body), images=VALUES(images),
  434. videos=VALUES(videos), like_count=VALUES(like_count), publish_time=VALUES(publish_time),
  435. quality_score=VALUES(quality_score), quality_grade=VALUES(quality_grade),
  436. found_by=VALUES(found_by), knowledge_type=VALUES(knowledge_type),
  437. overall_score=VALUES(overall_score), llm_evaluation=VALUES(llm_evaluation);
  438. """
  439. conn = _conn()
  440. try:
  441. with conn.cursor() as cur:
  442. cur.executemany(sql, rows)
  443. return len(rows)
  444. finally:
  445. conn.close()
  446. # 占位帖 case_id:query 列表由 search_process 按 query_id 聚合得出(无独立 query 主表),
  447. # 一个 query 要进列表必须至少有一行。为支持「只登记 query、不触发搜索」,给这类 query 写
  448. # 一行哨兵帖,只承载 query_id+query_text。该哨兵行不属于任何真实帖子,故所有「帖子视图 /
  449. # 统计」读取点都用 _REAL_POST 过滤掉它(fetch_queries 的 post_count、fetch_posts、
  450. # fetch_all_posts、count_executed_queries、fetch_dashboard_rows)。真搜不会用到此 case_id。
  451. PENDING_CASE_ID = "__pending__"
  452. _REAL_POST = f"case_id <> '{PENDING_CASE_ID}'"
  453. def add_pending_process_queries(texts):
  454. """把一批 query 词作为「占位 query」加入工序 query 列表(search_process),不触发搜索/解构。
  455. 每条新增写一行哨兵帖(case_id=PENDING_CASE_ID,只填 query_id/query_text)。
  456. 去重:① 文件内重复保序去重;② query_text 已存在于 search_process(含此前占位)则跳过。
  457. query_id 跨 process/tools 统一续号,避免与工具方向撞号。返回 (added, skipped)。"""
  458. seen, cleaned = set(), []
  459. for t in texts:
  460. t = (t or "").strip()
  461. if t and t not in seen:
  462. seen.add(t)
  463. cleaned.append(t)
  464. conn = _conn()
  465. try:
  466. with conn.cursor() as cur:
  467. cur.execute("SELECT DISTINCT query_text FROM search_process WHERE query_text IS NOT NULL")
  468. existing = {r["query_text"] for r in cur.fetchall()}
  469. cur.execute("SELECT query_id FROM search_process "
  470. "UNION SELECT query_id FROM search_tools")
  471. nums = [int(r["query_id"][1:]) for r in cur.fetchall()
  472. if r["query_id"] and r["query_id"].startswith("q") and r["query_id"][1:].isdigit()]
  473. nxt = (max(nums) + 1) if nums else 0
  474. rows = []
  475. for t in cleaned:
  476. if t in existing:
  477. continue
  478. rows.append((f"q{nxt:04d}", t, PENDING_CASE_ID))
  479. nxt += 1
  480. if rows:
  481. cur.executemany(
  482. "INSERT INTO search_process (query_id, query_text, case_id) "
  483. "VALUES (%s,%s,%s)", rows)
  484. return len(rows), len(cleaned) - len(rows)
  485. finally:
  486. conn.close()
  487. def fetch_queries(mode="process"):
  488. """某方向搜索表的 query 列表 + 帖子数 + 采纳/命中数 + 解构进度。"""
  489. table = _search_table(mode)
  490. conn = _conn()
  491. try:
  492. with conn.cursor() as cur:
  493. # post_count 只数真实帖,占位哨兵行不计(占位 query 显示为 0 帖);
  494. # GROUP BY 仍含占位 query_id,故无搜索的 query 也会出现在列表里。
  495. cur.execute(f"""SELECT query_id, MAX(query_text) AS query_text,
  496. COUNT(CASE WHEN {_REAL_POST} THEN 1 END) AS post_count
  497. FROM {table} GROUP BY query_id ORDER BY query_id""")
  498. queries = cur.fetchall()
  499. # 采纳数:SQL 直取 rel/repro 标量算,**不拉整表 llm_evaluation**(旧版全表 blob,切 tab 巨慢)
  500. cur.execute(f"""SELECT query_id, overall_score, publish_time,
  501. {_REL_SQL} AS rel, {_REPRO_SQL} AS repro FROM {table}""")
  502. hits = {}
  503. for r in cur.fetchall():
  504. if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"], r["repro"]):
  505. hits[r["query_id"]] = hits.get(r["query_id"], 0) + 1
  506. cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_process GROUP BY query_id")
  507. np = {r["query_id"]: r["n"] for r in cur.fetchall()}
  508. cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_tools GROUP BY query_id")
  509. nt = {r["query_id"]: r["n"] for r in cur.fetchall()}
  510. finally:
  511. conn.close()
  512. for q in queries:
  513. q["hit_count"] = hits.get(q["query_id"], 0)
  514. q["process_done"] = np.get(q["query_id"], 0)
  515. q["tools_done"] = nt.get(q["query_id"], 0)
  516. return queries
  517. def fetch_posts(query_id, mode="process"):
  518. """列表用:只取列表所需列 + SQL 直取 adopted 标量,**不拉 body/videos/llm_evaluation 大字段**
  519. (llm_evaluation ~1.5MB/帖,旧版 SELECT * 切 tab/选 query 要几十 MB 过远程 RDS,故慢)。
  520. 正文/评分等详情按需走 fetch_post。带 adopted/has_process/has_tools;adopted 口径用
  521. is_adopted_rel(与 is_adopted 完全一致,rel/repro 由 _REL_SQL/_REPRO_SQL 直取标量)。"""
  522. table = _search_table(mode)
  523. conn = _conn()
  524. try:
  525. with conn.cursor() as cur:
  526. cur.execute(f"""SELECT id, query_id, query_text, case_id, platform, channel_content_id,
  527. title, url, content_type, images, like_count, publish_time,
  528. quality_score, quality_grade, found_by, knowledge_type, overall_score,
  529. {_REL_SQL} AS rel, {_REPRO_SQL} AS repro
  530. FROM {table} WHERE query_id=%s AND {_REAL_POST}
  531. ORDER BY overall_score DESC, id""", (query_id,))
  532. rows = cur.fetchall()
  533. cur.execute("SELECT DISTINCT case_id FROM mode_process WHERE query_id=%s", (query_id,))
  534. hp = {r["case_id"] for r in cur.fetchall()}
  535. cur.execute("SELECT DISTINCT case_id FROM mode_tools WHERE query_id=%s", (query_id,))
  536. ht = {r["case_id"] for r in cur.fetchall()}
  537. # 已归类(工序):hp 中各 case 最新真实版的工序里有任一步骤含 substanceMatch(与归类回写
  538. # 同口径)。库端 LIKE 算、只回 0/1,不拉 steps。聚合逻辑见 _categorized_from_rows
  539. # (不能只看 id 最大行——空 steps 的 procedure 行永远不含,会误判)。
  540. hc = set()
  541. if hp:
  542. ph = ",".join(["%s"] * len(hp))
  543. cur.execute(f"""SELECT case_id, version, id,
  544. (LEFT(version,5)='link_') AS islink, (steps LIKE %s) AS cat
  545. FROM mode_process WHERE case_id IN ({ph})""",
  546. ['%substanceMatch%'] + list(hp))
  547. hc = _categorized_from_rows(cur.fetchall())
  548. finally:
  549. conn.close()
  550. for r in rows:
  551. for col in ("images", "found_by", "knowledge_type"):
  552. r[col] = _loads(r[col])
  553. r["adopted"] = is_adopted_rel(r["overall_score"], r.pop("rel", None),
  554. r["publish_time"], r.pop("repro", None))
  555. r["has_process"] = r["case_id"] in hp
  556. r["has_tools"] = r["case_id"] in ht
  557. r["has_category"] = r["case_id"] in hc
  558. return rows
  559. def fetch_post(query_id, case_id, table="search_process"):
  560. """指定搜索表的单帖完整行(给 pipeline 脚本重建 source 用)。无则 None。"""
  561. table = _search_table(table)
  562. conn = _conn()
  563. try:
  564. with conn.cursor() as cur:
  565. cur.execute(f"SELECT * FROM {table} WHERE query_id=%s AND case_id=%s",
  566. (query_id, case_id))
  567. row = cur.fetchone()
  568. finally:
  569. conn.close()
  570. if not row:
  571. return None
  572. for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
  573. row[col] = _loads(row[col])
  574. return row
  575. def fetch_all_posts(mode="process", *, query_ids=None, case_ids=None, adopted_only=False, distinct=False,
  576. limit=None, offset=0):
  577. """某方向「全部帖子」:跨所有 query 的列表(瘦身列,口径同 fetch_posts,不拉
  578. body/videos/llm_evaluation 大字段)。fetch_posts 限定单 query,本函数默认取全表。
  579. - query_ids:选填 query_id 列表,传了就 WHERE query_id IN(...) 只取这些 query
  580. 的帖子(SQL 层过滤,不拉全表);None=全部,[]=空结果。
  581. - adopted_only=True:只返回采纳帖(is_adopted_rel 口径,rel/repro 由
  582. _REL_SQL/_REPRO_SQL 直取标量算,不拉整表 blob)。
  583. - distinct=True:按 case_id 去重(同一帖被多个 query 搜到时,只保留
  584. overall_score 最高的一行——已按 score 降序,取首次出现即最高分)。
  585. - limit/offset:分页(limit=None 不分页)。
  586. 返回 (total, rows):total 为过滤(+去重)后的总条数,rows 为本页切片。"""
  587. table = _search_table(mode)
  588. # 始终排除占位哨兵行(无搜索的 query 不在帖子视图里出现)
  589. where, params = f" WHERE {_REAL_POST}", []
  590. if query_ids is not None:
  591. if not query_ids:
  592. return 0, [] # 显式空列表:直接空结果,不必查库
  593. where += " AND query_id IN (" + ",".join(["%s"] * len(query_ids)) + ")"
  594. params = list(query_ids)
  595. if case_ids is not None:
  596. # case_ids:选填 case_id 列表(知识归类联动用——按「分类树节点的 knowledge_ids」
  597. # 取被归类进该节点的帖子,而非按 query 搜索结果)。None=不过滤,[]=空结果。
  598. if not case_ids:
  599. return 0, []
  600. where += " AND case_id IN (" + ",".join(["%s"] * len(case_ids)) + ")"
  601. params += list(case_ids)
  602. conn = _conn()
  603. try:
  604. with conn.cursor() as cur:
  605. cur.execute(f"""SELECT id, query_id, query_text, case_id, platform, channel_content_id,
  606. title, url, content_type, images, like_count, publish_time,
  607. quality_score, quality_grade, found_by, knowledge_type, overall_score,
  608. {_REL_SQL} AS rel, {_REPRO_SQL} AS repro
  609. FROM {table}{where}
  610. ORDER BY overall_score DESC, id""", params)
  611. rows = cur.fetchall()
  612. # has_process/has_tools 全局判定:跨 query 的「该帖是否已解构」,两张解构表各取一次
  613. cur.execute("SELECT DISTINCT case_id FROM mode_process")
  614. hp = {r["case_id"] for r in cur.fetchall()}
  615. cur.execute("SELECT DISTINCT case_id FROM mode_tools")
  616. ht = {r["case_id"] for r in cur.fetchall()}
  617. finally:
  618. conn.close()
  619. out, seen = [], set()
  620. for r in rows:
  621. for col in ("images", "found_by", "knowledge_type"):
  622. r[col] = _loads(r[col])
  623. r["adopted"] = is_adopted_rel(r["overall_score"], r.pop("rel", None),
  624. r["publish_time"], r.pop("repro", None))
  625. if adopted_only and not r["adopted"]:
  626. continue
  627. if distinct:
  628. if r["case_id"] in seen:
  629. continue
  630. seen.add(r["case_id"])
  631. r["has_process"] = r["case_id"] in hp
  632. r["has_tools"] = r["case_id"] in ht
  633. out.append(r)
  634. total = len(out)
  635. if limit is not None:
  636. out = out[offset:offset + limit]
  637. elif offset:
  638. out = out[offset:]
  639. return total, out
  640. def count_executed_queries(mode="process"):
  641. """该方向「已执行」的 query 数 = 搜索表里出现过的 distinct query_id 个数。
  642. 注:一次搜索若 0 命中则不写任何行,故不计入(口径为「已产出结果的 query」)。"""
  643. table = _search_table(mode)
  644. conn = _conn()
  645. try:
  646. with conn.cursor() as cur:
  647. cur.execute(f"SELECT COUNT(DISTINCT query_id) AS n FROM {table} WHERE {_REAL_POST}")
  648. return cur.fetchone()["n"]
  649. finally:
  650. conn.close()
  651. # ── mode_process ─────────────────────────────────────────────────────────────
  652. def replace_process(query_id, case_id, platform, post_title, payload,
  653. model, version, cost_usd, duration_s):
  654. """写入一帖某版本的工序解构结果(payload = {source, procedures})。
  655. 删 (case_id, version) 旧行再插,同版本重跑幂等、跨版本保留历史。返回工序条数。"""
  656. source = payload.get("source")
  657. procedures = payload.get("procedures") or []
  658. conn = _conn()
  659. try:
  660. conn.begin() # DELETE+INSERT 原子化:配合 uk_q_case_ver_seq,并发/重复写入不会留下重复行
  661. with conn.cursor() as cur:
  662. cur.execute("DELETE FROM mode_process WHERE case_id=%s AND version=%s",
  663. (case_id, version))
  664. if procedures:
  665. rows = []
  666. for i, p in enumerate(procedures):
  667. steps = p.get("steps") or []
  668. vias = []
  669. for s in steps:
  670. v = s.get("via")
  671. if v and v not in vias:
  672. vias.append(v)
  673. rows.append((
  674. query_id, case_id, platform, (post_title or "")[:500],
  675. _j(source), p.get("id"), (p.get("name") or "")[:250],
  676. p.get("purpose"), p.get("category"),
  677. _j(p.get("declarations")), _j(p.get("type_registry")),
  678. _j(steps), len(steps), _j(vias),
  679. model, version, cost_usd, duration_s, i,
  680. ))
  681. cur.executemany("""
  682. INSERT INTO mode_process
  683. (query_id, case_id, platform, post_title, source, procedure_id, name,
  684. purpose, category, declarations, type_registry, steps, step_count,
  685. tools_used, model, version, cost_usd, duration_s, seq)
  686. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  687. """, rows)
  688. conn.commit()
  689. return len(procedures)
  690. except Exception:
  691. conn.rollback()
  692. raise
  693. finally:
  694. conn.close()
  695. def fetch_process_versions(case_id):
  696. conn = _conn()
  697. try:
  698. with conn.cursor() as cur:
  699. cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
  700. FROM mode_process WHERE case_id=%s
  701. GROUP BY version
  702. ORDER BY (LEFT(version,5)='link_') ASC, MAX(id) DESC""", (case_id,))
  703. return cur.fetchall()
  704. finally:
  705. conn.close()
  706. def fetch_process(case_id, version=None):
  707. """重建 {case_id, version, model, source, procedures:[...]}。version=None 取最新。"""
  708. conn = _conn()
  709. try:
  710. with conn.cursor() as cur:
  711. if version is None:
  712. cur.execute("""SELECT version FROM mode_process WHERE case_id=%s
  713. ORDER BY (LEFT(version,5)='link_') ASC, id DESC LIMIT 1""", (case_id,))
  714. row = cur.fetchone()
  715. if not row:
  716. return None
  717. version = row["version"]
  718. cur.execute("""SELECT * FROM mode_process WHERE case_id=%s AND version=%s
  719. ORDER BY id""", (case_id, version))
  720. rows = cur.fetchall()
  721. finally:
  722. conn.close()
  723. return _proc_payload(case_id, version, rows)
  724. def fetch_all_process(case_ids=None, lite=False):
  725. """批量取多帖工序解构(每帖取最新真实版,link_ 排后),一次查询拍平。
  726. - case_ids:选填 case_id 列表;None=全表所有有解构的帖,[]=空(直接返回空)。
  727. - lite:True 走精简投影(丢大字段 + 截断 value),供工序库平铺表快速首屏。
  728. 返回 {case_id: _proc_payload(...)};无解构记录的 case_id 不出现在结果里。"""
  729. if case_ids is not None and not case_ids:
  730. return {}
  731. where, params = "", []
  732. if case_ids is not None:
  733. where = " WHERE case_id IN (" + ",".join(["%s"] * len(case_ids)) + ")"
  734. params = list(case_ids)
  735. conn = _conn()
  736. try:
  737. with conn.cursor() as cur:
  738. cur.execute(f"SELECT * FROM mode_process{where} ORDER BY case_id, id", params)
  739. rows = cur.fetchall()
  740. finally:
  741. conn.close()
  742. by_case = {}
  743. for r in rows:
  744. by_case.setdefault(r["case_id"], []).append(r)
  745. out = {}
  746. for cid, crows in by_case.items():
  747. # 选版本:非 link_ 优先(is_real=True 排前),再按 id 最大——口径同 fetch_process
  748. best = max(crows, key=lambda r: (not str(r["version"]).startswith("link_"), r["id"]))
  749. ver = best["version"]
  750. vrows = sorted((r for r in crows if r["version"] == ver),
  751. key=lambda r: (r["seq"] if r["seq"] is not None else 0, r["id"]))
  752. out[cid] = _proc_payload(cid, ver, vrows, lite=lite)
  753. return out
  754. def fetch_process_by_query(query_id, case_id, version=None):
  755. """同 fetch_process,但用 (query_id, case_id) 精确定位某 query 下该帖的工序
  756. (category-match 用:post_id=query_id / knowledge_id=case_id)。
  757. version=None 取该 (query_id, case_id) 下最新真实版(link_ 排后)。无行返回 None。"""
  758. conn = _conn()
  759. try:
  760. with conn.cursor() as cur:
  761. if version is None:
  762. cur.execute("""SELECT version FROM mode_process WHERE query_id=%s AND case_id=%s
  763. ORDER BY (LEFT(version,5)='link_') ASC, id DESC LIMIT 1""",
  764. (query_id, case_id))
  765. row = cur.fetchone()
  766. if not row:
  767. return None
  768. version = row["version"]
  769. cur.execute("""SELECT * FROM mode_process WHERE query_id=%s AND case_id=%s AND version=%s
  770. ORDER BY seq, id""", (query_id, case_id, version))
  771. rows = cur.fetchall()
  772. finally:
  773. conn.close()
  774. return _proc_payload(case_id, version, rows)
  775. def update_process_steps_by_query(query_id, case_id, version, steps_in_order):
  776. """按工序顺序覆盖某 (query_id, case_id, version) 各行的 steps JSON 列。
  777. steps_in_order 必须与 fetch_process_by_query 返回的 procedures 同序(均按 seq, id 升序);
  778. 按行 id 一一对应更新,稳健于 seq 不连续。行数与工序数不符则报错回滚。返回更新行数。"""
  779. conn = _conn()
  780. try:
  781. conn.begin()
  782. with conn.cursor() as cur:
  783. cur.execute("""SELECT id FROM mode_process
  784. WHERE query_id=%s AND case_id=%s AND version=%s
  785. ORDER BY seq, id""", (query_id, case_id, version))
  786. ids = [r["id"] for r in cur.fetchall()]
  787. if len(ids) != len(steps_in_order):
  788. raise ValueError(f"行数({len(ids)})与工序数({len(steps_in_order)})不一致")
  789. n = 0
  790. for row_id, steps in zip(ids, steps_in_order):
  791. cur.execute("UPDATE mode_process SET steps=%s WHERE id=%s", (_j(steps), row_id))
  792. n += cur.rowcount
  793. conn.commit()
  794. return n
  795. except Exception:
  796. conn.rollback()
  797. raise
  798. finally:
  799. conn.close()
  800. def update_process_steps(case_id, version, steps_in_order):
  801. """按工序顺序覆盖某 (case_id, version) 各行的 steps JSON 列(不限 query_id)。
  802. 与 fetch_process / fetch_extract 同口径(按 case 的某版本),保证归类回写的版本
  803. 与前端 /api/extract 展示的版本一致(否则 link_ 复制帖会写错版本、前端看不到)。
  804. steps_in_order 须与 fetch_process(case_id, version).procedures 同序(按 id 升序)。
  805. 行数与工序数不符则报错回滚。返回更新行数。"""
  806. conn = _conn()
  807. try:
  808. conn.begin()
  809. with conn.cursor() as cur:
  810. cur.execute("""SELECT id FROM mode_process WHERE case_id=%s AND version=%s
  811. ORDER BY id""", (case_id, version))
  812. ids = [r["id"] for r in cur.fetchall()]
  813. if len(ids) != len(steps_in_order):
  814. raise ValueError(f"行数({len(ids)})与工序数({len(steps_in_order)})不一致")
  815. n = 0
  816. for row_id, steps in zip(ids, steps_in_order):
  817. cur.execute("UPDATE mode_process SET steps=%s WHERE id=%s", (_j(steps), row_id))
  818. n += cur.rowcount
  819. conn.commit()
  820. return n
  821. except Exception:
  822. conn.rollback()
  823. raise
  824. finally:
  825. conn.close()
  826. def _categorized_from_rows(rows):
  827. """rows:[{case_id, version, id, islink(0/1), cat(0/1)}]。返回已归类 case 集合。
  828. 口径:每 case 取最新真实版(真实版优先、id 最大),该版本**任一行** cat=1 即已归类。
  829. 关键——不能只看「id 最大的那一行」:工序里可能有 steps 为空的 procedure(step_count=0),
  830. 其行永远不含 substanceMatch,若恰好 id 最大会误判整条 case 未归类(见该函数修复缘由)。"""
  831. best, has = {}, {}
  832. for r in rows:
  833. c, v = r["case_id"], r["version"]
  834. sk = (1 if r["islink"] else 0, -r["id"]) # 真实版(islink=0)优先,其次 id 大;取 min
  835. if c not in best or sk < best[c][0]:
  836. best[c] = (sk, v)
  837. k = (c, v)
  838. has[k] = has.get(k, False) or bool(r["cat"])
  839. return {c for c, (sk, v) in best.items() if has.get((c, v))}
  840. def fetch_categorized_cases(case_ids, mode="process"):
  841. """返回 case_ids 中「已归类」的子集:该 case 最新真实版(link_ 排后)的工序里有任一步骤
  842. 含 substanceMatch(归类跑过的非空 step 一定带此 key)。与归类回写/前端展示同口径。
  843. 供前端判断「是否已全部归类 → 提示重新归类」。仅工序方向有意义(mode_process)。"""
  844. if not case_ids:
  845. return set()
  846. table = _mode_table(mode)
  847. ph = ",".join(["%s"] * len(case_ids))
  848. conn = _conn()
  849. try:
  850. with conn.cursor() as cur:
  851. # 拉各行的 (case_id, version, id, islink, cat);steps LIKE 在库端算,不拉 steps 大字段。
  852. cur.execute(f"""SELECT case_id, version, id,
  853. (LEFT(version,5)='link_') AS islink, (steps LIKE %s) AS cat
  854. FROM {table} WHERE case_id IN ({ph})""",
  855. ['%substanceMatch%'] + list(case_ids))
  856. rows = cur.fetchall()
  857. finally:
  858. conn.close()
  859. return _categorized_from_rows(rows)
  860. _LITE_VALUE_LIMIT = 300 # lite 模式 输入/输出 value 截断字节上限
  861. def _trunc(s, limit=_LITE_VALUE_LIMIT):
  862. """按 UTF-8 字节截断长文本(不切坏多字节字符),超限追加省略号。非字符串原样返回。"""
  863. if not isinstance(s, str):
  864. return s
  865. b = s.encode("utf-8")
  866. if len(b) <= limit:
  867. return s
  868. return b[:limit].decode("utf-8", "ignore") + "…"
  869. def _lite_steps(steps):
  870. """lite 模式:仅截断每步 inputs/outputs 的 value(其余字段供平铺表/分组用,保留)。"""
  871. if not isinstance(steps, list):
  872. return steps
  873. for st in steps:
  874. if not isinstance(st, dict):
  875. continue
  876. for io_key in ("inputs", "outputs"):
  877. ios = st.get(io_key)
  878. if isinstance(ios, list):
  879. for io in ios:
  880. if isinstance(io, dict) and "value" in io:
  881. io["value"] = _trunc(io.get("value"))
  882. return steps
  883. def _proc_payload(case_id, version, rows, lite=False):
  884. """mode_process 行集 → {case_id, version, …, procedures:[...]}。无行返回 None。
  885. lite=True:工序库平铺表只需 procedures[].{id,name,steps},丢弃大字段
  886. (source/declarations/type_registry/tools_used)并截断输入/输出 value;
  887. 完整值由前端展开时按 case 调 /api/process 懒加载。"""
  888. if not rows:
  889. return None
  890. if lite:
  891. procedures = [{
  892. "id": r["procedure_id"], "name": r["name"],
  893. "steps": _lite_steps(_loads(r["steps"], [])),
  894. } for r in rows]
  895. return {"case_id": case_id, "version": version,
  896. "title": rows[0]["post_title"], "procedures": procedures}
  897. procedures = [{
  898. "id": r["procedure_id"], "name": r["name"], "purpose": r["purpose"],
  899. "category": r["category"], "declarations": _loads(r["declarations"]),
  900. "type_registry": _loads(r["type_registry"]), "steps": _loads(r["steps"], []),
  901. "tools_used": _loads(r["tools_used"], []),
  902. } for r in rows]
  903. return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
  904. "title": rows[0]["post_title"], "model": rows[0]["model"],
  905. "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
  906. "duration_s": rows[0]["duration_s"],
  907. "source": _loads(rows[0]["source"]), "procedures": procedures}
  908. # ── mode_tools ───────────────────────────────────────────────────────────────
  909. def replace_tools(query_id, case_id, platform, post_title, tools,
  910. model, version, cost_usd, duration_s, source=None):
  911. """写入一帖某版本的工具解构结果。语义同 replace_process。返回工具条数。
  912. source:帖子来源块(同 mode_process,每行重复存),供知识上传脚本重建 source 用。"""
  913. src = _j(source)
  914. conn = _conn()
  915. try:
  916. conn.begin() # DELETE+INSERT 原子化:配合 uk_q_case_ver_seq,并发/重复写入不会留下重复行
  917. with conn.cursor() as cur:
  918. cur.execute("DELETE FROM mode_tools WHERE case_id=%s AND version=%s",
  919. (case_id, version))
  920. if tools:
  921. rows = [(
  922. query_id, case_id, platform, (post_title or "")[:500], src,
  923. (t.get("工具名称") or "")[:250],
  924. _j(t.get("实质作用域")), _j(t.get("形式作用域")),
  925. t.get("创作层级"), t.get("来源链接"), t.get("输入"), t.get("输出"),
  926. _j(t.get("用法")), _j(t.get("案例")), _j(t.get("缺点")),
  927. t.get("最新更新时间"), model, version, cost_usd, duration_s, i,
  928. ) for i, t in enumerate(tools)]
  929. cur.executemany("""
  930. INSERT INTO mode_tools
  931. (query_id, case_id, platform, post_title, source, tool_name, substance_scope,
  932. form_scope, creation_layer, source_link, input_desc, output_desc,
  933. usage_json, cases_json, defects_json, updated_time, model, version,
  934. cost_usd, duration_s, seq)
  935. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  936. """, rows)
  937. conn.commit()
  938. return len(tools)
  939. except Exception:
  940. conn.rollback()
  941. raise
  942. finally:
  943. conn.close()
  944. def fetch_tools_versions(case_id):
  945. conn = _conn()
  946. try:
  947. with conn.cursor() as cur:
  948. cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
  949. FROM mode_tools WHERE case_id=%s
  950. GROUP BY version
  951. ORDER BY (LEFT(version,5)='link_') ASC, MAX(id) DESC""", (case_id,))
  952. return cur.fetchall()
  953. finally:
  954. conn.close()
  955. def fetch_tools(case_id, version=None):
  956. """重建 {case_id, version, model, tool_count, tools:[...]}。version=None 取最新。"""
  957. conn = _conn()
  958. try:
  959. with conn.cursor() as cur:
  960. if version is None:
  961. cur.execute("""SELECT version FROM mode_tools WHERE case_id=%s
  962. ORDER BY (LEFT(version,5)='link_') ASC, id DESC LIMIT 1""", (case_id,))
  963. row = cur.fetchone()
  964. if not row:
  965. return None
  966. version = row["version"]
  967. cur.execute("""SELECT * FROM mode_tools WHERE case_id=%s AND version=%s
  968. ORDER BY id""", (case_id, version))
  969. rows = cur.fetchall()
  970. finally:
  971. conn.close()
  972. return _tools_payload(case_id, version, rows)
  973. def _tools_payload(case_id, version, rows):
  974. """mode_tools 行集 → {case_id, version, …, tools:[...]}。无行返回 None。"""
  975. if not rows:
  976. return None
  977. tools = [{
  978. "工具名称": r["tool_name"], "实质作用域": _loads(r["substance_scope"]),
  979. "形式作用域": _loads(r["form_scope"]), "创作层级": r["creation_layer"],
  980. "来源链接": r["source_link"], "输入": r["input_desc"], "输出": r["output_desc"],
  981. "用法": _loads(r["usage_json"]), "案例": _loads(r["cases_json"]),
  982. "缺点": _loads(r["defects_json"]), "最新更新时间": r["updated_time"],
  983. } for r in rows]
  984. return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
  985. "title": rows[0]["post_title"], "model": rows[0]["model"],
  986. "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
  987. "duration_s": rows[0]["duration_s"],
  988. "source": _loads(rows[0].get("source")),
  989. "tool_count": len(tools), "tools": tools}
  990. # ── 点击帖子合一查询(单连接,最少往返;远程 RDS 每次往返 ~80ms,故按次数优化)──
  991. def fetch_extract(mode, case_id, version=None):
  992. """一次取版本列表 + 解构详情,复用同一条池连接、最少往返。
  993. 返回 {versions, data, missing}。mode: process / tools。"""
  994. is_proc = mode != "tools"
  995. mtable = _mode_table("process" if is_proc else "tools")
  996. conn = _conn()
  997. try:
  998. with conn.cursor() as cur:
  999. cur.execute(f"""SELECT version, COUNT(*) AS n, MAX(model) AS model
  1000. FROM {mtable} WHERE case_id=%s
  1001. GROUP BY version
  1002. ORDER BY (LEFT(version,5)='link_') ASC, MAX(id) DESC""", (case_id,))
  1003. versions = cur.fetchall()
  1004. # 详情:把"取最新版本"折进同一条 SQL,版本指定时直接用;省一次往返。
  1005. target = version or (versions[0]["version"] if versions else None)
  1006. rows = []
  1007. if target is not None:
  1008. cur.execute(f"SELECT * FROM {mtable} WHERE case_id=%s AND version=%s ORDER BY id",
  1009. (case_id, target))
  1010. rows = cur.fetchall()
  1011. finally:
  1012. conn.close()
  1013. payload = (_proc_payload if is_proc else _tools_payload)(case_id, target, rows)
  1014. return {"versions": versions, "data": payload, "missing": payload is None}
  1015. # ── 跨 query 去重 / link 复制(方案A:解构前先去重,避免重复花钱)──────────────
  1016. # case_id 是帖子物理身份(platform_channelContentId),与 query 无关。同一帖被多个
  1017. # query 搜到时只需真实解构一次;其余 query 用 link_* 复制行补齐关联(cost=0)。
  1018. def latest_real_version(case_id, mode="process"):
  1019. """该 case 是否已有「真实」解构(任意 query;link_* 是复制品,不算源)。
  1020. 返回最新一行 {"version","query_id"} 或 None。给解构前去重判定用。"""
  1021. table = _mode_table(mode)
  1022. conn = _conn()
  1023. try:
  1024. with conn.cursor() as cur:
  1025. cur.execute(f"""SELECT version, query_id FROM {table}
  1026. WHERE case_id=%s AND LEFT(version,5) <> 'link_'
  1027. ORDER BY id DESC LIMIT 1""", (case_id,))
  1028. return cur.fetchone()
  1029. finally:
  1030. conn.close()
  1031. def link_process(query_id, case_id, mode="process"):
  1032. """把 case 在别处最新「真实」版本的解构行复制到目标 query
  1033. (version='link_'+源版本, cost_usd=0)。幂等(先删目标同版本)。
  1034. 返回复制行数;该 case 从未真实解构过则返回 0(无源可复制)。"""
  1035. table = _mode_table(mode)
  1036. conn = _conn()
  1037. try:
  1038. with conn.cursor() as cur:
  1039. cur.execute(f"""SELECT version FROM {table}
  1040. WHERE case_id=%s AND LEFT(version,5) <> 'link_'
  1041. ORDER BY id DESC LIMIT 1""", (case_id,))
  1042. r = cur.fetchone()
  1043. if not r:
  1044. return 0
  1045. srcver = r["version"]
  1046. newver = ("link_" + srcver)[:32] # version 列 VARCHAR(32)
  1047. # 复制除自增 id / 时间戳外的全部列,改写 query_id / version / cost。
  1048. cur.execute(f"SHOW COLUMNS FROM {table}")
  1049. cols = [c["Field"] for c in cur.fetchall()
  1050. if c["Field"] not in ("id", "created_at", "updated_at")]
  1051. cur.execute(f"SELECT {','.join(cols)} FROM {table} WHERE case_id=%s AND version=%s",
  1052. (case_id, srcver))
  1053. rows = cur.fetchall()
  1054. cur.execute(f"DELETE FROM {table} WHERE query_id=%s AND case_id=%s AND version=%s",
  1055. (query_id, case_id, newver))
  1056. for row in rows:
  1057. row = dict(row)
  1058. row["query_id"] = query_id
  1059. row["version"] = newver
  1060. row["cost_usd"] = 0
  1061. cur.execute(
  1062. f"INSERT INTO {table} ({','.join(cols)}) VALUES ({','.join(['%s']*len(cols))})",
  1063. [row[k] for k in cols])
  1064. return len(rows)
  1065. finally:
  1066. conn.close()
  1067. # ── Dashboard 原始行(指标计算在 server.py)─────────────────────────────────────
  1068. # 采纳判定只需「和内容制作知识相关」的得分,用 SQL JSON_EXTRACT 直取这一个标量,
  1069. # 避免把整块 llm_evaluation(本库 ~1.5MB)拉到 Python 再解析。得分可能直接是数字,
  1070. # 也可能裹在 {"得分": x} 里,COALESCE 两条路径覆盖两种存法,口径同 is_adopted。
  1071. _REL_SQL = ("JSON_UNQUOTE(COALESCE("
  1072. "JSON_EXTRACT(llm_evaluation,'$.\"相关性\".\"和内容制作知识相关\".\"得分\"'),"
  1073. "JSON_EXTRACT(llm_evaluation,'$.\"相关性\".\"和内容制作知识相关\"')))")
  1074. # 可复现/实现门槛标量直取(口径同 is_adopted 的 _repro_score):兼容新旧 schema——
  1075. # 旧版「质量.固定维度.可复现性」,新版「质量.动态维度.工序.字段完整性.实现完整性」,COALESCE 依次回退。
  1076. _REPRO_SQL = ("JSON_UNQUOTE(COALESCE("
  1077. "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"固定维度\".\"可复现性\".\"得分\"'),"
  1078. "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"固定维度\".\"可复现性\"'),"
  1079. "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"动态维度\".\"工序\".\"字段完整性\".\"实现完整性\".\"得分\"'),"
  1080. "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"动态维度\".\"工序\".\"字段完整性\".\"实现完整性\"')))")
  1081. def fetch_adopted_process_cases(query_id=None):
  1082. """返回「已采纳且有工序解构」的 case_id 列表(供知识上传脚本用)。
  1083. 采纳是帖子级属性(评估存在 search_process),工序解构存在 mode_process,故二者 JOIN:
  1084. 只取两边都有的 case,再用 is_adopted_rel(口径同 Dashboard)在 Python 侧过滤。
  1085. relevance 得分由 _REL_SQL 直取标量,不传整块 llm_evaluation。
  1086. query_id 给定时只看该搜索任务下的 case。返回去重、按 case_id 排序的列表。
  1087. """
  1088. sql = (f"SELECT DISTINCT s.case_id, s.overall_score, s.publish_time, "
  1089. f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro "
  1090. "FROM search_process s "
  1091. "JOIN (SELECT DISTINCT case_id FROM mode_process) m ON s.case_id = m.case_id")
  1092. params = ()
  1093. if query_id:
  1094. sql += " WHERE s.query_id=%s"
  1095. params = (query_id,)
  1096. conn = _conn()
  1097. try:
  1098. with conn.cursor() as cur:
  1099. cur.execute(sql, params)
  1100. rows = cur.fetchall()
  1101. finally:
  1102. conn.close()
  1103. cases = [r["case_id"] for r in rows
  1104. if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"], r["repro"])]
  1105. return sorted(set(cases))
  1106. def fetch_adopted_tools_cases(query_id=None):
  1107. """返回「已采纳且有工具解构」的 case_id 列表(供工具知识上传脚本用)。
  1108. 与 fetch_adopted_process_cases 完全同构,只把搜索/解构表换成工具方向:
  1109. 采纳是帖子级属性(评估存在 search_tools),工具解构存在 mode_tools,故二者 JOIN,
  1110. 只取两边都有的 case,再用 is_adopted_rel(口径同 Dashboard)在 Python 侧过滤。
  1111. query_id 给定时只看该搜索任务下的 case。返回去重、按 case_id 排序的列表。
  1112. """
  1113. sql = (f"SELECT DISTINCT s.case_id, s.overall_score, s.publish_time, "
  1114. f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro "
  1115. "FROM search_tools s "
  1116. "JOIN (SELECT DISTINCT case_id FROM mode_tools) m ON s.case_id = m.case_id")
  1117. params = ()
  1118. if query_id:
  1119. sql += " WHERE s.query_id=%s"
  1120. params = (query_id,)
  1121. conn = _conn()
  1122. try:
  1123. with conn.cursor() as cur:
  1124. cur.execute(sql, params)
  1125. rows = cur.fetchall()
  1126. finally:
  1127. conn.close()
  1128. cases = [r["case_id"] for r in rows
  1129. if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"], r["repro"])]
  1130. return sorted(set(cases))
  1131. def route_tables(knowledge_types):
  1132. """知识类型标签 → 落表列表(有序去重)。
  1133. 工序/能力 → search_process;工具 → search_tools;两者都含写两表;空/None 兜底 search_process。
  1134. 评估是统一一套(同一 llm_evaluation blob),故同帖落多表不重复打分,只是多写一行。"""
  1135. kt = set(knowledge_types or [])
  1136. tables = []
  1137. if kt & {"工具"}:
  1138. tables.append("search_tools")
  1139. if (kt & {"工序", "能力"}) or not tables: # 工序/能力,或没命中任何已知标签 → 兜底 process
  1140. tables.insert(0, "search_process")
  1141. return tables
  1142. # ── 评估去重:复用 query 无关分,只重算 query 相关分(search_eval.py 用)──────────
  1143. def fetch_existing_eval(case_id, table="search_process"):
  1144. """返回该 case 在搜索表里最近一条「有效」评估 blob(任意 query)。
  1145. 评估去重用:同帖在别的相似 query 下评过时,复用其 query 无关分(质量/通用相关/时效),
  1146. 只重算「和 query 相关」。无有效评估(全是 _error 或没评过)返回 None。
  1147. 取最近若干条逐一挑出首个非 error、结构完整的 blob。"""
  1148. table = _search_table(table)
  1149. conn = _conn()
  1150. try:
  1151. with conn.cursor() as cur:
  1152. cur.execute(f"""SELECT llm_evaluation FROM {table}
  1153. WHERE case_id=%s AND llm_evaluation IS NOT NULL
  1154. ORDER BY updated_at DESC, id DESC LIMIT 5""", (case_id,))
  1155. rows = cur.fetchall()
  1156. finally:
  1157. conn.close()
  1158. for r in rows:
  1159. e = _loads(r["llm_evaluation"])
  1160. if isinstance(e, dict) and not e.get("_error") and isinstance(e.get("相关性"), dict):
  1161. return e
  1162. return None
  1163. def fetch_existing_eval_any(case_id):
  1164. """跨两张搜索表找该 case 最近一条有效评估 blob。
  1165. 评估与表无关(统一一套),任一表评过即可复用,避免同帖在两表各评一次。无则 None。"""
  1166. for table in ("search_process", "search_tools"):
  1167. e = fetch_existing_eval(case_id, table)
  1168. if e:
  1169. return e
  1170. return None
  1171. def update_post_eval(query_id, case_id, evaluation, table="search_process"):
  1172. """用新的评估 blob 覆盖某 (query, case) 行的 llm_evaluation,并同步重算派生列
  1173. overall_score、knowledge_type(口径同 upsert_search_posts)。返回受影响行数。"""
  1174. table = _search_table(table)
  1175. overall = overall_score(evaluation)
  1176. ktype = evaluation.get("知识类型") if isinstance(evaluation, dict) else None
  1177. conn = _conn()
  1178. try:
  1179. with conn.cursor() as cur:
  1180. n = cur.execute(
  1181. f"UPDATE {table} SET llm_evaluation=%s, overall_score=%s, knowledge_type=%s "
  1182. "WHERE query_id=%s AND case_id=%s",
  1183. (_j(evaluation), overall, _j(ktype), query_id, case_id))
  1184. return n
  1185. finally:
  1186. conn.close()
  1187. # ── 上传去重:知识库已导入台账(stages/import_process_knowledge.py 用)────────────────
  1188. def fetch_ingested_map(case_id):
  1189. """返回 {proc_index: version} —— 该 case 各工序已导入知识库的版本。空表示没传过。"""
  1190. conn = _conn()
  1191. try:
  1192. with conn.cursor() as cur:
  1193. cur.execute("SELECT proc_index, version FROM knowledge_ingest_log WHERE case_id=%s",
  1194. (case_id,))
  1195. return {r["proc_index"]: r["version"] for r in cur.fetchall()}
  1196. finally:
  1197. conn.close()
  1198. def mark_ingested(case_id, proc_index, version, knowledge_id=None, api_url=None):
  1199. """记一条「已导入」台账(case_id+proc_index 唯一,重导同序号则更新版本/knowledge_id)。"""
  1200. conn = _conn()
  1201. try:
  1202. with conn.cursor() as cur:
  1203. cur.execute("""INSERT INTO knowledge_ingest_log
  1204. (case_id, proc_index, version, knowledge_id, api_url)
  1205. VALUES (%s,%s,%s,%s,%s)
  1206. ON DUPLICATE KEY UPDATE version=VALUES(version),
  1207. knowledge_id=VALUES(knowledge_id), api_url=VALUES(api_url)""",
  1208. (case_id, proc_index, version, knowledge_id, api_url))
  1209. finally:
  1210. conn.close()
  1211. def fetch_tools_ingested_map(case_id):
  1212. """返回 {tool_index: version} —— 该 case 各工具已导入知识库的版本。空表示没传过。
  1213. 工具方向独立台账(tools_ingest_log),与工序的 knowledge_ingest_log 互不干扰。"""
  1214. conn = _conn()
  1215. try:
  1216. with conn.cursor() as cur:
  1217. cur.execute("SELECT tool_index, version FROM tools_ingest_log WHERE case_id=%s",
  1218. (case_id,))
  1219. return {r["tool_index"]: r["version"] for r in cur.fetchall()}
  1220. finally:
  1221. conn.close()
  1222. def mark_tools_ingested(case_id, tool_index, version, knowledge_id=None, api_url=None):
  1223. """记一条工具「已导入」台账(case_id+tool_index 唯一,重导同序号则更新版本/knowledge_id)。"""
  1224. conn = _conn()
  1225. try:
  1226. with conn.cursor() as cur:
  1227. cur.execute("""INSERT INTO tools_ingest_log
  1228. (case_id, tool_index, version, knowledge_id, api_url)
  1229. VALUES (%s,%s,%s,%s,%s)
  1230. ON DUPLICATE KEY UPDATE version=VALUES(version),
  1231. knowledge_id=VALUES(knowledge_id), api_url=VALUES(api_url)""",
  1232. (case_id, tool_index, version, knowledge_id, api_url))
  1233. finally:
  1234. conn.close()
  1235. def fetch_dashboard_rows():
  1236. """拉 Dashboard 计算所需的轻量行。数据量级:百~千行,Python 聚合足够。
  1237. 优化:① 不传 llm_evaluation 整块,SQL 只取采纳判定要的相关性得分;
  1238. ② steps 只取每个 case 的最新版本(覆盖度只看最新版),历史/link_ 版本不传 steps。"""
  1239. conn = _conn()
  1240. try:
  1241. with conn.cursor() as cur:
  1242. # 进度分母走「采纳」口径;mode 标方向(工序帖来自 search_process)。
  1243. cols = (f"query_id, case_id, platform, overall_score, publish_time, "
  1244. f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro")
  1245. cur.execute(f"SELECT {cols} FROM search_process WHERE {_REAL_POST}")
  1246. posts = cur.fetchall()
  1247. for p in posts:
  1248. p["mode"] = "process"
  1249. cur.execute(f"SELECT {cols} FROM search_tools")
  1250. st = cur.fetchall()
  1251. for p in st:
  1252. p["mode"] = "tools"
  1253. posts += st
  1254. # 成本/耗时按全部版本计;steps 仅最新版需要 → 非最新版只回 NULL,省传输。
  1255. cur.execute("""SELECT p.id, p.case_id, p.version, p.cost_usd, p.duration_s, p.created_at,
  1256. CASE WHEN p.version = m.maxv THEN p.steps END AS steps
  1257. FROM mode_process p
  1258. JOIN (SELECT t.case_id, t.version AS maxv FROM mode_process t
  1259. JOIN (SELECT case_id, MAX(id) AS mid FROM mode_process
  1260. WHERE LEFT(version,5) <> 'link_' GROUP BY case_id) x
  1261. ON t.id = x.mid) m
  1262. ON p.case_id = m.case_id
  1263. ORDER BY p.id""")
  1264. procs = cur.fetchall()
  1265. cur.execute("""SELECT id, case_id, version, tool_name, substance_scope,
  1266. form_scope, cost_usd, duration_s, created_at
  1267. FROM mode_tools""")
  1268. tools = cur.fetchall()
  1269. finally:
  1270. conn.close()
  1271. for p in posts:
  1272. # 采纳判定:口径同帖子列表(is_adopted),作为「需解构」分母依据
  1273. p["adopted"] = is_adopted_rel(p["overall_score"], p["rel"], p["publish_time"], p["repro"])
  1274. for r in procs:
  1275. r["steps"] = _loads(r["steps"], [])
  1276. r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
  1277. r["created_at"] = str(r["created_at"]) if r["created_at"] else None
  1278. for r in tools:
  1279. r["substance_scope"] = _loads(r["substance_scope"], [])
  1280. r["form_scope"] = _loads(r["form_scope"], [])
  1281. r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
  1282. r["created_at"] = str(r["created_at"]) if r["created_at"] else None
  1283. return posts, procs, tools
  1284. def check():
  1285. conn = _conn()
  1286. try:
  1287. with conn.cursor() as cur:
  1288. for t in ("search_process", "search_tools", "mode_process", "mode_tools"):
  1289. cur.execute(f"SELECT COUNT(*) AS n FROM {t}")
  1290. print(f"{t}: {cur.fetchone()['n']} 行")
  1291. finally:
  1292. conn.close()
  1293. if __name__ == "__main__":
  1294. cmd = sys.argv[1] if len(sys.argv) > 1 else ""
  1295. if cmd == "init":
  1296. init_tables()
  1297. elif cmd == "check":
  1298. check()
  1299. elif cmd == "clear":
  1300. clear_tables()
  1301. else:
  1302. print("用法:\n python db.py init # 建表\n python db.py check # 四表行数\n python db.py clear # 清空四表数据")