db.py 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. # -*- coding: utf-8 -*-
  2. """mode_workflow · MySQL 持久化(DB 为唯一事实源)
  3. ================================================================================
  4. 读 .env 的 MYSQL_* 连接 MySQL。四张表:
  5. search_process —— 每行一个 (query, 帖子):工序方向的搜索 + llm 评估结果
  6. search_tools —— 同结构,工具方向的搜索结果(方向由表区分,不再用 mode_type 列)
  7. mode_process —— 每行一个解构出的工序(steps 等嵌套结构存 JSON 列)
  8. mode_tools —— 每行一个解构出的工具
  9. 与旧 fixed_query_eval/db.py 的关键差异:本系统 DB 是主存储,写入失败直接 raise,
  10. 不做"失败不阻断"。读侧保留防御(返回空/None)。
  11. 用法:
  12. python db.py init # 建表(幂等)
  13. python db.py check # 打印四表行数
  14. python db.py clear # 清空四表数据(TRUNCATE)
  15. """
  16. import json
  17. import os
  18. import sys
  19. from datetime import datetime
  20. from pathlib import Path
  21. PROJECT_ROOT = Path(__file__).resolve().parents[2]
  22. sys.path.insert(0, str(PROJECT_ROOT))
  23. from dotenv import load_dotenv
  24. load_dotenv()
  25. import pymysql
  26. from pymysql.cursors import DictCursor
  27. from dbutils.pooled_db import PooledDB
  28. # ── 连接池 ──────────────────────────────────────────────────────────────────
  29. # MySQL 是远程 RDS,每次 pymysql.connect() 的 TCP+鉴权握手 ~0.5s。旧实现每个
  30. # 请求新建一条连接,一次"点开帖子"要 2~3 个请求 = 2~3 次握手 ≈ 1s。改用连接池
  31. # 复用长连接后,握手只在池初始化时各发生一次,后续取连接近乎零开销。
  32. # server.py 是 ThreadingHTTPServer(每请求一线程),PooledDB 线程安全,正好匹配。
  33. # 注意:fetch_* 里的 conn.close() 在池连接上语义是"归还池中"而非真正断开。
  34. _POOL = None
  35. def _pool():
  36. global _POOL
  37. if _POOL is None:
  38. if not os.getenv("MYSQL_HOST"):
  39. raise RuntimeError("缺 MYSQL_HOST:检查 .env 的 MYSQL_* 配置")
  40. _POOL = PooledDB(
  41. creator=pymysql,
  42. mincached=2, # 启动即预热 2 条,首点不再吃冷握手
  43. maxcached=5, # 空闲保留上限
  44. maxconnections=20, # 并发上限(ThreadingHTTPServer 线程数)
  45. blocking=True, # 连接耗尽时等待而非报错
  46. ping=1, # 取用前 ping,自动剔除被 RDS 掐断的死连接
  47. host=os.getenv("MYSQL_HOST"),
  48. port=int(os.getenv("MYSQL_PORT", 3306)),
  49. user=os.getenv("MYSQL_USER"),
  50. password=os.getenv("MYSQL_PASSWORD"),
  51. database=os.getenv("MYSQL_DATABASE"),
  52. charset="utf8mb4", cursorclass=DictCursor,
  53. autocommit=True, connect_timeout=10,
  54. )
  55. return _POOL
  56. def _conn():
  57. """从池取一条连接;用法不变(with cursor / conn.close() 归还池)。"""
  58. return _pool().connection()
  59. # ── DDL ──────────────────────────────────────────────────────────────────────
  60. SEARCH_TABLES = {"process": "search_process", "tools": "search_tools"}
  61. MODE_TABLES = {"process": "mode_process", "tools": "mode_tools"}
  62. def _search_table(mode_or_table):
  63. """mode(process/tools)或表名 → 合法搜索表名(白名单,防 SQL 注入)。"""
  64. t = SEARCH_TABLES.get(mode_or_table, mode_or_table)
  65. if t not in SEARCH_TABLES.values():
  66. raise ValueError(f"未知搜索表/模式: {mode_or_table!r}")
  67. return t
  68. def _mode_table(mode_or_table):
  69. """mode(process/tools)或表名 → 合法解构表名(白名单,防 SQL 注入)。"""
  70. t = MODE_TABLES.get(mode_or_table, mode_or_table)
  71. if t not in MODE_TABLES.values():
  72. raise ValueError(f"未知解构表/模式: {mode_or_table!r}")
  73. return t
  74. def _ddl_search(table, direction):
  75. return f"""
  76. CREATE TABLE IF NOT EXISTS {table} (
  77. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  78. query_id VARCHAR(32) NOT NULL COMMENT 'q0000',
  79. query_text VARCHAR(512) NULL,
  80. case_id VARCHAR(128) NOT NULL COMMENT 'platform_channelContentId',
  81. platform VARCHAR(32) NULL,
  82. channel_content_id VARCHAR(128) NULL,
  83. title VARCHAR(512) NULL,
  84. url VARCHAR(1024) NULL,
  85. content_type VARCHAR(32) NULL,
  86. body LONGTEXT NULL,
  87. images JSON NULL,
  88. videos JSON NULL,
  89. like_count INT NULL,
  90. publish_time VARCHAR(64) NULL,
  91. quality_score FLOAT NULL COMMENT 'post._quality_score',
  92. quality_grade VARCHAR(8) NULL,
  93. found_by JSON NULL COMMENT '命中的措辞数组',
  94. knowledge_type JSON NULL COMMENT '["能力","工序","工具"] 子集',
  95. overall_score FLOAT NULL COMMENT '(相关均值+质量均值)/2',
  96. llm_evaluation JSON NULL COMMENT '评估全量 blob',
  97. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  98. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  99. UNIQUE KEY uk_qid_case (query_id, case_id),
  100. KEY idx_platform (platform)
  101. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='搜索+评估结果({direction})';
  102. """
  103. DDL_PROCESS = """
  104. CREATE TABLE IF NOT EXISTS mode_process (
  105. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  106. query_id VARCHAR(32) NOT NULL,
  107. case_id VARCHAR(128) NOT NULL,
  108. platform VARCHAR(32) NULL,
  109. post_title VARCHAR(512) NULL,
  110. source JSON NULL COMMENT '解构返回的 source 块',
  111. procedure_id VARCHAR(16) NULL COMMENT 'p1,p2…',
  112. name VARCHAR(255) NULL,
  113. purpose TEXT NULL,
  114. category VARCHAR(32) NULL COMMENT '产物创造/资产建设/自动化/分析/学习',
  115. declarations JSON NULL,
  116. type_registry JSON NULL,
  117. steps JSON NULL COMMENT '步骤数组全量',
  118. step_count INT NULL,
  119. tools_used JSON NULL COMMENT '从 steps[].via 去重提取',
  120. model VARCHAR(64) NULL,
  121. version VARCHAR(32) NULL COMMENT 'v_MMDDHHMM,保留历史;link_* 为跨 query 复制(cost=0)',
  122. cost_usd DECIMAL(10,6) NULL COMMENT '本次解构调用成本(同版本各行相同,聚合需按 case+version 去重)',
  123. duration_s FLOAT NULL,
  124. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  125. KEY idx_case_ver (case_id, version),
  126. KEY idx_qid (query_id)
  127. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序解构结果(每行一个工序)';
  128. """
  129. DDL_TOOLS = """
  130. CREATE TABLE IF NOT EXISTS mode_tools (
  131. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  132. query_id VARCHAR(32) NOT NULL,
  133. case_id VARCHAR(128) NOT NULL,
  134. platform VARCHAR(32) NULL,
  135. post_title VARCHAR(512) NULL,
  136. tool_name VARCHAR(255) NULL,
  137. substance_scope JSON NULL COMMENT '实质作用域(数组)',
  138. form_scope JSON NULL COMMENT '形式作用域(数组或null)',
  139. creation_layer VARCHAR(32) NULL COMMENT '制作层/创作层',
  140. source_link VARCHAR(1024) NULL,
  141. input_desc TEXT NULL,
  142. output_desc TEXT NULL,
  143. usage_json JSON NULL,
  144. cases_json JSON NULL,
  145. defects_json JSON NULL,
  146. updated_time VARCHAR(64) NULL COMMENT '工具最新更新时间',
  147. model VARCHAR(64) NULL,
  148. version VARCHAR(32) NULL COMMENT 'v_MMDDHHMM;link_* 为跨 query 复制(cost=0)',
  149. cost_usd DECIMAL(10,6) NULL COMMENT '同 mode_process,聚合按 case+version 去重',
  150. duration_s FLOAT NULL,
  151. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  152. KEY idx_case_ver (case_id, version),
  153. KEY idx_qid (query_id),
  154. KEY idx_tool_name (tool_name)
  155. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
  156. """
  157. # 工序知识「已导入知识库」台账:防重复上传(import_process_knowledge.py 用)。
  158. # 每条知识 = 某 case 的某个工序(proc_index 1-based)。记录导入时的 mode_process 版本:
  159. # 版本变了(重解构)说明内容已变,应重导;版本不变即视为「已传过」,跳过。
  160. # 选 DB 台账而非本地文件,是为了换机器/换链接后也不会重复写知识库。
  161. DDL_INGEST_LOG = """
  162. CREATE TABLE IF NOT EXISTS knowledge_ingest_log (
  163. id BIGINT AUTO_INCREMENT PRIMARY KEY,
  164. case_id VARCHAR(128) NOT NULL,
  165. proc_index INT NOT NULL COMMENT '工序序号(1-based),对齐导入脚本枚举',
  166. version VARCHAR(32) NULL COMMENT '导入时 mode_process 版本;变了应重导',
  167. knowledge_id VARCHAR(128) NULL COMMENT '接口返回的 knowledge_id',
  168. api_url VARCHAR(255) NULL,
  169. ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  170. updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  171. UNIQUE KEY uk_case_proc (case_id, proc_index)
  172. ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序知识已导入台账(防重复上传)';
  173. """
  174. def init_tables():
  175. conn = _conn()
  176. try:
  177. with conn.cursor() as cur:
  178. cur.execute(_ddl_search("search_process", "工序方向"))
  179. cur.execute(_ddl_search("search_tools", "工具方向"))
  180. cur.execute(DDL_PROCESS)
  181. cur.execute(DDL_TOOLS)
  182. cur.execute(DDL_INGEST_LOG)
  183. # 历史库迁移:version 由 VARCHAR(16) 放宽到 32,容纳 link_v_mopN_* 复制版本。
  184. # MODIFY 幂等(已是 32 则 MySQL 元数据无操作),建表后表必存在,可安全执行。
  185. for t in ("mode_process", "mode_tools"):
  186. cur.execute(f"ALTER TABLE {t} MODIFY COLUMN version VARCHAR(32) NULL")
  187. print("✅ 建表完成:search_process, search_tools, mode_process, mode_tools, knowledge_ingest_log")
  188. finally:
  189. conn.close()
  190. def clear_tables():
  191. """清空四张表的数据(TRUNCATE,表结构保留)。"""
  192. conn = _conn()
  193. try:
  194. with conn.cursor() as cur:
  195. for t in ("search_process", "search_tools", "mode_process", "mode_tools"):
  196. cur.execute(f"TRUNCATE TABLE {t}")
  197. print(f"🧹 已清空 {t}")
  198. finally:
  199. conn.close()
  200. # ── 工具函数 ──────────────────────────────────────────────────────────────────
  201. def _loads(v, default=None):
  202. """pymysql 的 JSON 列可能返回字符串,统一解析。"""
  203. if v is None:
  204. return default
  205. if isinstance(v, (list, dict)):
  206. return v
  207. try:
  208. return json.loads(v)
  209. except Exception:
  210. return default
  211. def _j(v):
  212. """写入 JSON 列:None 保持 NULL,其余 dumps。"""
  213. return None if v is None else json.dumps(v, ensure_ascii=False)
  214. def _collect_scores(node):
  215. """递归收集嵌套评估里所有「得分」。LLM 直出的得分多为字符串("1"/"4"),
  216. 个别为数字(如 时效性 10),统一按 float 解析;非数值(如 "N/A")跳过不计入。"""
  217. out = []
  218. if isinstance(node, dict):
  219. for k, v in node.items():
  220. if k == "得分":
  221. try:
  222. out.append(float(v))
  223. except (TypeError, ValueError):
  224. pass
  225. else:
  226. out.extend(_collect_scores(v))
  227. elif isinstance(node, list):
  228. for v in node:
  229. out.extend(_collect_scores(v))
  230. return out
  231. def overall_score(e):
  232. """综合分 = (相关性各项均值 + 质量各项均值) / 可得部分数。算不出返回 None。"""
  233. parts = []
  234. for key in ("相关性", "质量"):
  235. scores = _collect_scores((e or {}).get(key))
  236. if scores:
  237. parts.append(sum(scores) / len(scores))
  238. return round(sum(parts) / len(parts), 2) if parts else None
  239. def _recency_hard(date_str):
  240. """硬时效(同 mode_procedure/server.py:_recency_hard):半年内=3 / 两年内=2 / 更早=1。
  241. publish_time 头 10 字符按 YYYY-MM-DD 解析,失败返回 None(不参与判定)。"""
  242. try:
  243. d = datetime.strptime(str(date_str or "")[:10], "%Y-%m-%d")
  244. except (ValueError, TypeError):
  245. return None
  246. days = (datetime.now() - d).days
  247. if days <= 180:
  248. return 3
  249. if days <= 730:
  250. return 2
  251. return 1
  252. def _fixed_dim_score(evaluation, name):
  253. """取 质量.固定维度.<name>.得分 标量,缺失/非数值返回 None(不参与判定)。"""
  254. v = (((evaluation or {}).get("质量") or {}).get("固定维度") or {}).get(name)
  255. if isinstance(v, dict):
  256. v = v.get("得分")
  257. try:
  258. return float(v) if v is not None else None
  259. except (TypeError, ValueError):
  260. return None
  261. def is_adopted(overall, evaluation, publish_time):
  262. """采纳/命中判定,口径对齐 mode_procedure 的 decision=="report":
  263. 制作相关性<4、可复现性<4、发布超两年、综合分<6 —— 任一命中即不采纳;指标缺失不参与判定。
  264. (意图可控性暂只采分不设门槛,留待阈值标定后再开。)"""
  265. rel = None
  266. v = ((evaluation or {}).get("相关性") or {}).get("和内容制作知识相关")
  267. if isinstance(v, dict):
  268. v = v.get("得分")
  269. try:
  270. rel = float(v) if v is not None else None
  271. except (TypeError, ValueError):
  272. rel = None
  273. if rel is not None and rel < 4:
  274. return False
  275. repro = _fixed_dim_score(evaluation, "可复现性")
  276. if repro is not None and repro < 4:
  277. return False
  278. rh = _recency_hard(publish_time)
  279. if rh is not None and rh < 2:
  280. return False
  281. if overall is not None and float(overall) < 6:
  282. return False
  283. return True
  284. def is_adopted_rel(overall, rel, publish_time, repro=None):
  285. """is_adopted 的轻量版:相关性得分(rel)、可复现性(repro)已由 SQL JSON_EXTRACT 直接取出,
  286. 无需传输/解析整块 llm_evaluation。判定口径与 is_adopted 完全一致。"""
  287. try:
  288. rel = float(rel) if rel is not None else None
  289. except (TypeError, ValueError):
  290. rel = None
  291. if rel is not None and rel < 4:
  292. return False
  293. try:
  294. repro = float(repro) if repro is not None else None
  295. except (TypeError, ValueError):
  296. repro = None
  297. if repro is not None and repro < 4:
  298. return False
  299. rh = _recency_hard(publish_time)
  300. if rh is not None and rh < 2:
  301. return False
  302. if overall is not None and float(overall) < 6:
  303. return False
  304. return True
  305. # ── search_process / search_tools ────────────────────────────────────────────
  306. def upsert_search_posts(query_id, query_text, results, table="search_process"):
  307. """一组搜索结果写入指定搜索表(按 (query_id, case_id) upsert)。返回写入条数。
  308. table:search_process(工序方向) / search_tools(工具方向)。"""
  309. table = _search_table(table)
  310. if not results:
  311. return 0
  312. rows = []
  313. for r in results:
  314. post = r.get("post") or {}
  315. e = r.get("llm_evaluation") or {}
  316. rows.append((
  317. query_id, query_text, r.get("case_id"), r.get("platform"),
  318. r.get("channel_content_id"),
  319. (post.get("title") or post.get("desc") or "")[:500],
  320. r.get("source_url"), post.get("content_type"),
  321. post.get("body_text") or post.get("desc") or "",
  322. _j(post.get("images") or []), _j(post.get("videos") or []),
  323. post.get("like_count"),
  324. str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
  325. post.get("_quality_score"), post.get("_quality_grade"),
  326. _j(r.get("found_by_queries") or []),
  327. _j(e.get("知识类型") or []),
  328. overall_score(e),
  329. _j(e),
  330. ))
  331. sql = f"""
  332. INSERT INTO {table}
  333. (query_id, query_text, case_id, platform, channel_content_id, title, url,
  334. content_type, body, images, videos, like_count, publish_time,
  335. quality_score, quality_grade, found_by, knowledge_type,
  336. overall_score, llm_evaluation)
  337. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  338. ON DUPLICATE KEY UPDATE
  339. query_text=VALUES(query_text), platform=VALUES(platform),
  340. channel_content_id=VALUES(channel_content_id), title=VALUES(title), url=VALUES(url),
  341. content_type=VALUES(content_type), body=VALUES(body), images=VALUES(images),
  342. videos=VALUES(videos), like_count=VALUES(like_count), publish_time=VALUES(publish_time),
  343. quality_score=VALUES(quality_score), quality_grade=VALUES(quality_grade),
  344. found_by=VALUES(found_by), knowledge_type=VALUES(knowledge_type),
  345. overall_score=VALUES(overall_score), llm_evaluation=VALUES(llm_evaluation);
  346. """
  347. conn = _conn()
  348. try:
  349. with conn.cursor() as cur:
  350. cur.executemany(sql, rows)
  351. return len(rows)
  352. finally:
  353. conn.close()
  354. def fetch_queries(mode="process"):
  355. """某方向搜索表的 query 列表 + 帖子数 + 采纳/命中数 + 解构进度。"""
  356. table = _search_table(mode)
  357. conn = _conn()
  358. try:
  359. with conn.cursor() as cur:
  360. cur.execute(f"""SELECT query_id, MAX(query_text) AS query_text,
  361. COUNT(*) AS post_count
  362. FROM {table} GROUP BY query_id ORDER BY query_id""")
  363. queries = cur.fetchall()
  364. cur.execute(f"""SELECT query_id, overall_score, llm_evaluation, publish_time
  365. FROM {table}""")
  366. hits = {}
  367. for r in cur.fetchall():
  368. if is_adopted(r["overall_score"], _loads(r["llm_evaluation"]), r["publish_time"]):
  369. hits[r["query_id"]] = hits.get(r["query_id"], 0) + 1
  370. cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_process GROUP BY query_id")
  371. np = {r["query_id"]: r["n"] for r in cur.fetchall()}
  372. cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_tools GROUP BY query_id")
  373. nt = {r["query_id"]: r["n"] for r in cur.fetchall()}
  374. finally:
  375. conn.close()
  376. for q in queries:
  377. q["hit_count"] = hits.get(q["query_id"], 0)
  378. q["process_done"] = np.get(q["query_id"], 0)
  379. q["tools_done"] = nt.get(q["query_id"], 0)
  380. return queries
  381. def fetch_posts(query_id, mode="process"):
  382. """某方向搜索表里某 query 的全部帖子(JSON 列已解析),带 has_process/has_tools 标记。"""
  383. table = _search_table(mode)
  384. conn = _conn()
  385. try:
  386. with conn.cursor() as cur:
  387. cur.execute(f"""SELECT * FROM {table} WHERE query_id=%s
  388. ORDER BY overall_score DESC, id""", (query_id,))
  389. rows = cur.fetchall()
  390. cur.execute("SELECT DISTINCT case_id FROM mode_process WHERE query_id=%s", (query_id,))
  391. hp = {r["case_id"] for r in cur.fetchall()}
  392. cur.execute("SELECT DISTINCT case_id FROM mode_tools WHERE query_id=%s", (query_id,))
  393. ht = {r["case_id"] for r in cur.fetchall()}
  394. finally:
  395. conn.close()
  396. for r in rows:
  397. for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
  398. r[col] = _loads(r[col])
  399. r["adopted"] = is_adopted(r["overall_score"], r["llm_evaluation"], r["publish_time"])
  400. r["has_process"] = r["case_id"] in hp
  401. r["has_tools"] = r["case_id"] in ht
  402. r.pop("created_at", None); r.pop("updated_at", None)
  403. return rows
  404. def fetch_post(query_id, case_id, table="search_process"):
  405. """指定搜索表的单帖完整行(给 pipeline 脚本重建 source 用)。无则 None。"""
  406. table = _search_table(table)
  407. conn = _conn()
  408. try:
  409. with conn.cursor() as cur:
  410. cur.execute(f"SELECT * FROM {table} WHERE query_id=%s AND case_id=%s",
  411. (query_id, case_id))
  412. row = cur.fetchone()
  413. finally:
  414. conn.close()
  415. if not row:
  416. return None
  417. for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
  418. row[col] = _loads(row[col])
  419. return row
  420. # ── mode_process ─────────────────────────────────────────────────────────────
  421. def replace_process(query_id, case_id, platform, post_title, payload,
  422. model, version, cost_usd, duration_s):
  423. """写入一帖某版本的工序解构结果(payload = {source, procedures})。
  424. 删 (case_id, version) 旧行再插,同版本重跑幂等、跨版本保留历史。返回工序条数。"""
  425. source = payload.get("source")
  426. procedures = payload.get("procedures") or []
  427. conn = _conn()
  428. try:
  429. with conn.cursor() as cur:
  430. cur.execute("DELETE FROM mode_process WHERE case_id=%s AND version=%s",
  431. (case_id, version))
  432. if procedures:
  433. rows = []
  434. for p in procedures:
  435. steps = p.get("steps") or []
  436. vias = []
  437. for s in steps:
  438. v = s.get("via")
  439. if v and v not in vias:
  440. vias.append(v)
  441. rows.append((
  442. query_id, case_id, platform, (post_title or "")[:500],
  443. _j(source), p.get("id"), (p.get("name") or "")[:250],
  444. p.get("purpose"), p.get("category"),
  445. _j(p.get("declarations")), _j(p.get("type_registry")),
  446. _j(steps), len(steps), _j(vias),
  447. model, version, cost_usd, duration_s,
  448. ))
  449. cur.executemany("""
  450. INSERT INTO mode_process
  451. (query_id, case_id, platform, post_title, source, procedure_id, name,
  452. purpose, category, declarations, type_registry, steps, step_count,
  453. tools_used, model, version, cost_usd, duration_s)
  454. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  455. """, rows)
  456. return len(procedures)
  457. finally:
  458. conn.close()
  459. def fetch_process_versions(case_id):
  460. conn = _conn()
  461. try:
  462. with conn.cursor() as cur:
  463. cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
  464. FROM mode_process WHERE case_id=%s
  465. GROUP BY version ORDER BY version DESC""", (case_id,))
  466. return cur.fetchall()
  467. finally:
  468. conn.close()
  469. def fetch_process(case_id, version=None):
  470. """重建 {case_id, version, model, source, procedures:[...]}。version=None 取最新。"""
  471. conn = _conn()
  472. try:
  473. with conn.cursor() as cur:
  474. if version is None:
  475. cur.execute("""SELECT version FROM mode_process WHERE case_id=%s
  476. ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
  477. row = cur.fetchone()
  478. if not row:
  479. return None
  480. version = row["version"]
  481. cur.execute("""SELECT * FROM mode_process WHERE case_id=%s AND version=%s
  482. ORDER BY id""", (case_id, version))
  483. rows = cur.fetchall()
  484. finally:
  485. conn.close()
  486. return _proc_payload(case_id, version, rows)
  487. def _proc_payload(case_id, version, rows):
  488. """mode_process 行集 → {case_id, version, …, procedures:[...]}。无行返回 None。"""
  489. if not rows:
  490. return None
  491. procedures = [{
  492. "id": r["procedure_id"], "name": r["name"], "purpose": r["purpose"],
  493. "category": r["category"], "declarations": _loads(r["declarations"]),
  494. "type_registry": _loads(r["type_registry"]), "steps": _loads(r["steps"], []),
  495. "tools_used": _loads(r["tools_used"], []),
  496. } for r in rows]
  497. return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
  498. "title": rows[0]["post_title"], "model": rows[0]["model"],
  499. "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
  500. "duration_s": rows[0]["duration_s"],
  501. "source": _loads(rows[0]["source"]), "procedures": procedures}
  502. # ── mode_tools ───────────────────────────────────────────────────────────────
  503. def replace_tools(query_id, case_id, platform, post_title, tools,
  504. model, version, cost_usd, duration_s):
  505. """写入一帖某版本的工具解构结果。语义同 replace_process。返回工具条数。"""
  506. conn = _conn()
  507. try:
  508. with conn.cursor() as cur:
  509. cur.execute("DELETE FROM mode_tools WHERE case_id=%s AND version=%s",
  510. (case_id, version))
  511. if tools:
  512. rows = [(
  513. query_id, case_id, platform, (post_title or "")[:500],
  514. (t.get("工具名称") or "")[:250],
  515. _j(t.get("实质作用域")), _j(t.get("形式作用域")),
  516. t.get("创作层级"), t.get("来源链接"), t.get("输入"), t.get("输出"),
  517. _j(t.get("用法")), _j(t.get("案例")), _j(t.get("缺点")),
  518. t.get("最新更新时间"), model, version, cost_usd, duration_s,
  519. ) for t in tools]
  520. cur.executemany("""
  521. INSERT INTO mode_tools
  522. (query_id, case_id, platform, post_title, tool_name, substance_scope,
  523. form_scope, creation_layer, source_link, input_desc, output_desc,
  524. usage_json, cases_json, defects_json, updated_time, model, version,
  525. cost_usd, duration_s)
  526. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  527. """, rows)
  528. return len(tools)
  529. finally:
  530. conn.close()
  531. def fetch_tools_versions(case_id):
  532. conn = _conn()
  533. try:
  534. with conn.cursor() as cur:
  535. cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
  536. FROM mode_tools WHERE case_id=%s
  537. GROUP BY version ORDER BY version DESC""", (case_id,))
  538. return cur.fetchall()
  539. finally:
  540. conn.close()
  541. def fetch_tools(case_id, version=None):
  542. """重建 {case_id, version, model, tool_count, tools:[...]}。version=None 取最新。"""
  543. conn = _conn()
  544. try:
  545. with conn.cursor() as cur:
  546. if version is None:
  547. cur.execute("""SELECT version FROM mode_tools WHERE case_id=%s
  548. ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
  549. row = cur.fetchone()
  550. if not row:
  551. return None
  552. version = row["version"]
  553. cur.execute("""SELECT * FROM mode_tools WHERE case_id=%s AND version=%s
  554. ORDER BY id""", (case_id, version))
  555. rows = cur.fetchall()
  556. finally:
  557. conn.close()
  558. return _tools_payload(case_id, version, rows)
  559. def _tools_payload(case_id, version, rows):
  560. """mode_tools 行集 → {case_id, version, …, tools:[...]}。无行返回 None。"""
  561. if not rows:
  562. return None
  563. tools = [{
  564. "工具名称": r["tool_name"], "实质作用域": _loads(r["substance_scope"]),
  565. "形式作用域": _loads(r["form_scope"]), "创作层级": r["creation_layer"],
  566. "来源链接": r["source_link"], "输入": r["input_desc"], "输出": r["output_desc"],
  567. "用法": _loads(r["usage_json"]), "案例": _loads(r["cases_json"]),
  568. "缺点": _loads(r["defects_json"]), "最新更新时间": r["updated_time"],
  569. } for r in rows]
  570. return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
  571. "title": rows[0]["post_title"], "model": rows[0]["model"],
  572. "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
  573. "duration_s": rows[0]["duration_s"],
  574. "tool_count": len(tools), "tools": tools}
  575. # ── 点击帖子合一查询(单连接,最少往返;远程 RDS 每次往返 ~80ms,故按次数优化)──
  576. def fetch_extract(mode, case_id, version=None):
  577. """一次取版本列表 + 解构详情,复用同一条池连接、最少往返。
  578. 返回 {versions, data, missing}。mode: process / tools。"""
  579. is_proc = mode != "tools"
  580. mtable = _mode_table("process" if is_proc else "tools")
  581. conn = _conn()
  582. try:
  583. with conn.cursor() as cur:
  584. cur.execute(f"""SELECT version, COUNT(*) AS n, MAX(model) AS model
  585. FROM {mtable} WHERE case_id=%s
  586. GROUP BY version ORDER BY version DESC""", (case_id,))
  587. versions = cur.fetchall()
  588. # 详情:把"取最新版本"折进同一条 SQL,版本指定时直接用;省一次往返。
  589. target = version or (versions[0]["version"] if versions else None)
  590. rows = []
  591. if target is not None:
  592. cur.execute(f"SELECT * FROM {mtable} WHERE case_id=%s AND version=%s ORDER BY id",
  593. (case_id, target))
  594. rows = cur.fetchall()
  595. finally:
  596. conn.close()
  597. payload = (_proc_payload if is_proc else _tools_payload)(case_id, target, rows)
  598. return {"versions": versions, "data": payload, "missing": payload is None}
  599. # ── 跨 query 去重 / link 复制(方案A:解构前先去重,避免重复花钱)──────────────
  600. # case_id 是帖子物理身份(platform_channelContentId),与 query 无关。同一帖被多个
  601. # query 搜到时只需真实解构一次;其余 query 用 link_* 复制行补齐关联(cost=0)。
  602. def latest_real_version(case_id, mode="process"):
  603. """该 case 是否已有「真实」解构(任意 query;link_* 是复制品,不算源)。
  604. 返回最新一行 {"version","query_id"} 或 None。给解构前去重判定用。"""
  605. table = _mode_table(mode)
  606. conn = _conn()
  607. try:
  608. with conn.cursor() as cur:
  609. cur.execute(f"""SELECT version, query_id FROM {table}
  610. WHERE case_id=%s AND LEFT(version,5) <> 'link_'
  611. ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
  612. return cur.fetchone()
  613. finally:
  614. conn.close()
  615. def link_process(query_id, case_id, mode="process"):
  616. """把 case 在别处最新「真实」版本的解构行复制到目标 query
  617. (version='link_'+源版本, cost_usd=0)。幂等(先删目标同版本)。
  618. 返回复制行数;该 case 从未真实解构过则返回 0(无源可复制)。"""
  619. table = _mode_table(mode)
  620. conn = _conn()
  621. try:
  622. with conn.cursor() as cur:
  623. cur.execute(f"""SELECT version FROM {table}
  624. WHERE case_id=%s AND LEFT(version,5) <> 'link_'
  625. ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
  626. r = cur.fetchone()
  627. if not r:
  628. return 0
  629. srcver = r["version"]
  630. newver = ("link_" + srcver)[:32] # version 列 VARCHAR(32)
  631. # 复制除自增 id / 时间戳外的全部列,改写 query_id / version / cost。
  632. cur.execute(f"SHOW COLUMNS FROM {table}")
  633. cols = [c["Field"] for c in cur.fetchall()
  634. if c["Field"] not in ("id", "created_at", "updated_at")]
  635. cur.execute(f"SELECT {','.join(cols)} FROM {table} WHERE case_id=%s AND version=%s",
  636. (case_id, srcver))
  637. rows = cur.fetchall()
  638. cur.execute(f"DELETE FROM {table} WHERE query_id=%s AND case_id=%s AND version=%s",
  639. (query_id, case_id, newver))
  640. for row in rows:
  641. row = dict(row)
  642. row["query_id"] = query_id
  643. row["version"] = newver
  644. row["cost_usd"] = 0
  645. cur.execute(
  646. f"INSERT INTO {table} ({','.join(cols)}) VALUES ({','.join(['%s']*len(cols))})",
  647. [row[k] for k in cols])
  648. return len(rows)
  649. finally:
  650. conn.close()
  651. # ── Dashboard 原始行(指标计算在 server.py)─────────────────────────────────────
  652. # 采纳判定只需「和内容制作知识相关」的得分,用 SQL JSON_EXTRACT 直取这一个标量,
  653. # 避免把整块 llm_evaluation(本库 ~1.5MB)拉到 Python 再解析。得分可能直接是数字,
  654. # 也可能裹在 {"得分": x} 里,COALESCE 两条路径覆盖两种存法,口径同 is_adopted。
  655. _REL_SQL = ("JSON_UNQUOTE(COALESCE("
  656. "JSON_EXTRACT(llm_evaluation,'$.\"相关性\".\"和内容制作知识相关\".\"得分\"'),"
  657. "JSON_EXTRACT(llm_evaluation,'$.\"相关性\".\"和内容制作知识相关\"')))")
  658. # 可复现性门槛同样需要标量直取(口径同 is_adopted 的 _fixed_dim_score)。
  659. _REPRO_SQL = ("JSON_UNQUOTE(COALESCE("
  660. "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"固定维度\".\"可复现性\".\"得分\"'),"
  661. "JSON_EXTRACT(llm_evaluation,'$.\"质量\".\"固定维度\".\"可复现性\"')))")
  662. def fetch_adopted_process_cases(query_id=None):
  663. """返回「已采纳且有工序解构」的 case_id 列表(供知识上传脚本用)。
  664. 采纳是帖子级属性(评估存在 search_process),工序解构存在 mode_process,故二者 JOIN:
  665. 只取两边都有的 case,再用 is_adopted_rel(口径同 Dashboard)在 Python 侧过滤。
  666. relevance 得分由 _REL_SQL 直取标量,不传整块 llm_evaluation。
  667. query_id 给定时只看该搜索任务下的 case。返回去重、按 case_id 排序的列表。
  668. """
  669. sql = (f"SELECT DISTINCT s.case_id, s.overall_score, s.publish_time, "
  670. f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro "
  671. "FROM search_process s "
  672. "JOIN (SELECT DISTINCT case_id FROM mode_process) m ON s.case_id = m.case_id")
  673. params = ()
  674. if query_id:
  675. sql += " WHERE s.query_id=%s"
  676. params = (query_id,)
  677. conn = _conn()
  678. try:
  679. with conn.cursor() as cur:
  680. cur.execute(sql, params)
  681. rows = cur.fetchall()
  682. finally:
  683. conn.close()
  684. cases = [r["case_id"] for r in rows
  685. if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"], r["repro"])]
  686. return sorted(set(cases))
  687. # ── 评估去重:复用 query 无关分,只重算 query 相关分(search_eval.py 用)──────────
  688. def fetch_existing_eval(case_id, table="search_process"):
  689. """返回该 case 在搜索表里最近一条「有效」评估 blob(任意 query)。
  690. 评估去重用:同帖在别的相似 query 下评过时,复用其 query 无关分(质量/通用相关/时效),
  691. 只重算「和 query 相关」。无有效评估(全是 _error 或没评过)返回 None。
  692. 取最近若干条逐一挑出首个非 error、结构完整的 blob。"""
  693. table = _search_table(table)
  694. conn = _conn()
  695. try:
  696. with conn.cursor() as cur:
  697. cur.execute(f"""SELECT llm_evaluation FROM {table}
  698. WHERE case_id=%s AND llm_evaluation IS NOT NULL
  699. ORDER BY updated_at DESC, id DESC LIMIT 5""", (case_id,))
  700. rows = cur.fetchall()
  701. finally:
  702. conn.close()
  703. for r in rows:
  704. e = _loads(r["llm_evaluation"])
  705. if isinstance(e, dict) and not e.get("_error") and isinstance(e.get("相关性"), dict):
  706. return e
  707. return None
  708. # ── 上传去重:知识库已导入台账(import_process_knowledge.py 用)────────────────
  709. def fetch_ingested_map(case_id):
  710. """返回 {proc_index: version} —— 该 case 各工序已导入知识库的版本。空表示没传过。"""
  711. conn = _conn()
  712. try:
  713. with conn.cursor() as cur:
  714. cur.execute("SELECT proc_index, version FROM knowledge_ingest_log WHERE case_id=%s",
  715. (case_id,))
  716. return {r["proc_index"]: r["version"] for r in cur.fetchall()}
  717. finally:
  718. conn.close()
  719. def mark_ingested(case_id, proc_index, version, knowledge_id=None, api_url=None):
  720. """记一条「已导入」台账(case_id+proc_index 唯一,重导同序号则更新版本/knowledge_id)。"""
  721. conn = _conn()
  722. try:
  723. with conn.cursor() as cur:
  724. cur.execute("""INSERT INTO knowledge_ingest_log
  725. (case_id, proc_index, version, knowledge_id, api_url)
  726. VALUES (%s,%s,%s,%s,%s)
  727. ON DUPLICATE KEY UPDATE version=VALUES(version),
  728. knowledge_id=VALUES(knowledge_id), api_url=VALUES(api_url)""",
  729. (case_id, proc_index, version, knowledge_id, api_url))
  730. finally:
  731. conn.close()
  732. def fetch_dashboard_rows():
  733. """拉 Dashboard 计算所需的轻量行。数据量级:百~千行,Python 聚合足够。
  734. 优化:① 不传 llm_evaluation 整块,SQL 只取采纳判定要的相关性得分;
  735. ② steps 只取每个 case 的最新版本(覆盖度只看最新版),历史/link_ 版本不传 steps。"""
  736. conn = _conn()
  737. try:
  738. with conn.cursor() as cur:
  739. # 进度分母走「采纳」口径;mode 标方向(工序帖来自 search_process)。
  740. cols = (f"query_id, case_id, platform, overall_score, publish_time, "
  741. f"{_REL_SQL} AS rel, {_REPRO_SQL} AS repro")
  742. cur.execute(f"SELECT {cols} FROM search_process")
  743. posts = cur.fetchall()
  744. for p in posts:
  745. p["mode"] = "process"
  746. cur.execute(f"SELECT {cols} FROM search_tools")
  747. st = cur.fetchall()
  748. for p in st:
  749. p["mode"] = "tools"
  750. posts += st
  751. # 成本/耗时按全部版本计;steps 仅最新版需要 → 非最新版只回 NULL,省传输。
  752. cur.execute("""SELECT p.case_id, p.version, p.cost_usd, p.duration_s, p.created_at,
  753. CASE WHEN p.version = m.maxv THEN p.steps END AS steps
  754. FROM mode_process p
  755. JOIN (SELECT case_id, MAX(version) AS maxv
  756. FROM mode_process GROUP BY case_id) m
  757. ON p.case_id = m.case_id
  758. ORDER BY p.id""")
  759. procs = cur.fetchall()
  760. cur.execute("""SELECT case_id, version, tool_name, substance_scope,
  761. form_scope, cost_usd, duration_s, created_at
  762. FROM mode_tools""")
  763. tools = cur.fetchall()
  764. finally:
  765. conn.close()
  766. for p in posts:
  767. # 采纳判定:口径同帖子列表(is_adopted),作为「需解构」分母依据
  768. p["adopted"] = is_adopted_rel(p["overall_score"], p["rel"], p["publish_time"], p["repro"])
  769. for r in procs:
  770. r["steps"] = _loads(r["steps"], [])
  771. r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
  772. r["created_at"] = str(r["created_at"]) if r["created_at"] else None
  773. for r in tools:
  774. r["substance_scope"] = _loads(r["substance_scope"], [])
  775. r["form_scope"] = _loads(r["form_scope"], [])
  776. r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
  777. r["created_at"] = str(r["created_at"]) if r["created_at"] else None
  778. return posts, procs, tools
  779. def check():
  780. conn = _conn()
  781. try:
  782. with conn.cursor() as cur:
  783. for t in ("search_process", "search_tools", "mode_process", "mode_tools"):
  784. cur.execute(f"SELECT COUNT(*) AS n FROM {t}")
  785. print(f"{t}: {cur.fetchone()['n']} 行")
  786. finally:
  787. conn.close()
  788. if __name__ == "__main__":
  789. cmd = sys.argv[1] if len(sys.argv) > 1 else ""
  790. if cmd == "init":
  791. init_tables()
  792. elif cmd == "check":
  793. check()
  794. elif cmd == "clear":
  795. clear_tables()
  796. else:
  797. print("用法:\n python db.py init # 建表\n python db.py check # 四表行数\n python db.py clear # 清空四表数据")