db.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611
  1. # -*- coding: utf-8 -*-
  2. """mode_workflow · MySQL 持久化(DB 为唯一事实源)
  3. ================================================================================
  4. 读 .env 的 MYSQL_* 连接 MySQL。四张表:
  5. search_process —— 每行一个 (query, 帖子):工序方向的搜索 + llm 评估结果
  6. search_tools —— 同结构,工具方向的搜索结果(方向由表区分,不再用 mode_type 列)
  7. mode_process —— 每行一个解构出的工序(steps 等嵌套结构存 JSON 列)
  8. mode_tools —— 每行一个解构出的工具
  9. 与旧 fixed_query_eval/db.py 的关键差异:本系统 DB 是主存储,写入失败直接 raise,
  10. 不做"失败不阻断"。读侧保留防御(返回空/None)。
  11. 用法:
  12. python db.py init # 建表(幂等)
  13. python db.py check # 打印四表行数
  14. python db.py clear # 清空四表数据(TRUNCATE)
  15. """
  16. import json
  17. import os
  18. import sys
  19. from datetime import datetime
  20. from pathlib import Path
  21. PROJECT_ROOT = Path(__file__).resolve().parents[2]
  22. sys.path.insert(0, str(PROJECT_ROOT))
  23. from dotenv import load_dotenv
  24. load_dotenv()
  25. import pymysql
  26. from pymysql.cursors import DictCursor
  27. def _conn():
  28. if not os.getenv("MYSQL_HOST"):
  29. raise RuntimeError("缺 MYSQL_HOST:检查 .env 的 MYSQL_* 配置")
  30. return pymysql.connect(
  31. host=os.getenv("MYSQL_HOST"),
  32. port=int(os.getenv("MYSQL_PORT", 3306)),
  33. user=os.getenv("MYSQL_USER"),
  34. password=os.getenv("MYSQL_PASSWORD"),
  35. database=os.getenv("MYSQL_DATABASE"),
  36. charset="utf8mb4", cursorclass=DictCursor,
  37. autocommit=True, connect_timeout=10,
  38. )
  39. # ── DDL ──────────────────────────────────────────────────────────────────────
  40. SEARCH_TABLES = {"process": "search_process", "tools": "search_tools"}
  41. def _search_table(mode_or_table):
  42. """mode(process/tools)或表名 → 合法搜索表名(白名单,防 SQL 注入)。"""
  43. t = SEARCH_TABLES.get(mode_or_table, mode_or_table)
  44. if t not in SEARCH_TABLES.values():
  45. raise ValueError(f"未知搜索表/模式: {mode_or_table!r}")
  46. return t
def _ddl_search(table, direction):
    """Build the ``CREATE TABLE IF NOT EXISTS`` statement for a search-result table.

    Each row is one (query, post) pair, unique on ``(query_id, case_id)``.

    Args:
        table: Table name, formatted verbatim into the statement.
        direction: Label formatted into the table-level COMMENT.

    Returns:
        The DDL statement as a string.
    """
    # NOTE(review): both arguments are interpolated directly into the SQL text;
    # the only caller visible in this file (init_tables) passes string literals.
    return f"""
CREATE TABLE IF NOT EXISTS {table} (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    query_id VARCHAR(32) NOT NULL COMMENT 'q0000',
    query_text VARCHAR(512) NULL,
    case_id VARCHAR(128) NOT NULL COMMENT 'platform_channelContentId',
    platform VARCHAR(32) NULL,
    channel_content_id VARCHAR(128) NULL,
    title VARCHAR(512) NULL,
    url VARCHAR(1024) NULL,
    content_type VARCHAR(32) NULL,
    body LONGTEXT NULL,
    images JSON NULL,
    videos JSON NULL,
    like_count INT NULL,
    publish_time VARCHAR(64) NULL,
    quality_score FLOAT NULL COMMENT 'post._quality_score',
    quality_grade VARCHAR(8) NULL,
    found_by JSON NULL COMMENT '命中的措辞数组',
    knowledge_type JSON NULL COMMENT '["能力","工序","工具"] 子集',
    overall_score FLOAT NULL COMMENT '(相关均值+质量均值)/2',
    llm_evaluation JSON NULL COMMENT '评估全量 blob',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    UNIQUE KEY uk_qid_case (query_id, case_id),
    KEY idx_platform (platform)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='搜索+评估结果({direction})';
"""
# DDL for mode_process: one row per procedure extracted from a post. Nested
# structures (source, declarations, type_registry, steps, tools_used) are stored
# in JSON columns. The table has no unique key; rows are located through the
# (case_id, version) index.
DDL_PROCESS = """
CREATE TABLE IF NOT EXISTS mode_process (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    query_id VARCHAR(32) NOT NULL,
    case_id VARCHAR(128) NOT NULL,
    platform VARCHAR(32) NULL,
    post_title VARCHAR(512) NULL,
    source JSON NULL COMMENT '解构返回的 source 块',
    procedure_id VARCHAR(16) NULL COMMENT 'p1,p2…',
    name VARCHAR(255) NULL,
    purpose TEXT NULL,
    category VARCHAR(32) NULL COMMENT '产物创造/资产建设/自动化/分析/学习',
    declarations JSON NULL,
    type_registry JSON NULL,
    steps JSON NULL COMMENT '步骤数组全量',
    step_count INT NULL,
    tools_used JSON NULL COMMENT '从 steps[].via 去重提取',
    model VARCHAR(64) NULL,
    version VARCHAR(16) NULL COMMENT 'v_MMDDHHMM,保留历史',
    cost_usd DECIMAL(10,6) NULL COMMENT '本次解构调用成本(同版本各行相同,聚合需按 case+version 去重)',
    duration_s FLOAT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    KEY idx_case_ver (case_id, version),
    KEY idx_qid (query_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工序解构结果(每行一个工序)';
"""
# DDL for mode_tools: one row per tool extracted from a post. Scope, usage,
# cases and defects are stored in JSON columns. Like mode_process, the table
# has no unique key and is indexed on (case_id, version), query_id and tool_name.
DDL_TOOLS = """
CREATE TABLE IF NOT EXISTS mode_tools (
    id BIGINT AUTO_INCREMENT PRIMARY KEY,
    query_id VARCHAR(32) NOT NULL,
    case_id VARCHAR(128) NOT NULL,
    platform VARCHAR(32) NULL,
    post_title VARCHAR(512) NULL,
    tool_name VARCHAR(255) NULL,
    substance_scope JSON NULL COMMENT '实质作用域(数组)',
    form_scope JSON NULL COMMENT '形式作用域(数组或null)',
    creation_layer VARCHAR(32) NULL COMMENT '制作层/创作层',
    source_link VARCHAR(1024) NULL,
    input_desc TEXT NULL,
    output_desc TEXT NULL,
    usage_json JSON NULL,
    cases_json JSON NULL,
    defects_json JSON NULL,
    updated_time VARCHAR(64) NULL COMMENT '工具最新更新时间',
    model VARCHAR(64) NULL,
    version VARCHAR(16) NULL,
    cost_usd DECIMAL(10,6) NULL COMMENT '同 mode_process,聚合按 case+version 去重',
    duration_s FLOAT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    KEY idx_case_ver (case_id, version),
    KEY idx_qid (query_id),
    KEY idx_tool_name (tool_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
"""
  130. def init_tables():
  131. conn = _conn()
  132. try:
  133. with conn.cursor() as cur:
  134. cur.execute(_ddl_search("search_process", "工序方向"))
  135. cur.execute(_ddl_search("search_tools", "工具方向"))
  136. cur.execute(DDL_PROCESS)
  137. cur.execute(DDL_TOOLS)
  138. print("✅ 建表完成:search_process, search_tools, mode_process, mode_tools")
  139. finally:
  140. conn.close()
  141. def clear_tables():
  142. """清空四张表的数据(TRUNCATE,表结构保留)。"""
  143. conn = _conn()
  144. try:
  145. with conn.cursor() as cur:
  146. for t in ("search_process", "search_tools", "mode_process", "mode_tools"):
  147. cur.execute(f"TRUNCATE TABLE {t}")
  148. print(f"🧹 已清空 {t}")
  149. finally:
  150. conn.close()
  151. # ── 工具函数 ──────────────────────────────────────────────────────────────────
  152. def _loads(v, default=None):
  153. """pymysql 的 JSON 列可能返回字符串,统一解析。"""
  154. if v is None:
  155. return default
  156. if isinstance(v, (list, dict)):
  157. return v
  158. try:
  159. return json.loads(v)
  160. except Exception:
  161. return default
  162. def _j(v):
  163. """写入 JSON 列:None 保持 NULL,其余 dumps。"""
  164. return None if v is None else json.dumps(v, ensure_ascii=False)
  165. def _collect_scores(node):
  166. """递归收集嵌套评估里所有「得分」。LLM 直出的得分多为字符串("1"/"4"),
  167. 个别为数字(如 时效性 10),统一按 float 解析;非数值(如 "N/A")跳过不计入。"""
  168. out = []
  169. if isinstance(node, dict):
  170. for k, v in node.items():
  171. if k == "得分":
  172. try:
  173. out.append(float(v))
  174. except (TypeError, ValueError):
  175. pass
  176. else:
  177. out.extend(_collect_scores(v))
  178. elif isinstance(node, list):
  179. for v in node:
  180. out.extend(_collect_scores(v))
  181. return out
  182. def overall_score(e):
  183. """综合分 = (相关性各项均值 + 质量各项均值) / 可得部分数。算不出返回 None。"""
  184. parts = []
  185. for key in ("相关性", "质量"):
  186. scores = _collect_scores((e or {}).get(key))
  187. if scores:
  188. parts.append(sum(scores) / len(scores))
  189. return round(sum(parts) / len(parts), 2) if parts else None
  190. def _recency_hard(date_str):
  191. """硬时效(同 mode_procedure/server.py:_recency_hard):半年内=3 / 两年内=2 / 更早=1。
  192. publish_time 头 10 字符按 YYYY-MM-DD 解析,失败返回 None(不参与判定)。"""
  193. try:
  194. d = datetime.strptime(str(date_str or "")[:10], "%Y-%m-%d")
  195. except (ValueError, TypeError):
  196. return None
  197. days = (datetime.now() - d).days
  198. if days <= 180:
  199. return 3
  200. if days <= 730:
  201. return 2
  202. return 1
  203. def is_adopted(overall, evaluation, publish_time):
  204. """采纳/命中判定,口径对齐 mode_procedure 的 decision=="report":
  205. 制作相关性<4、发布超两年、综合分<6 —— 任一命中即不采纳;指标缺失不参与判定。"""
  206. rel = None
  207. v = ((evaluation or {}).get("相关性") or {}).get("和内容制作知识相关")
  208. if isinstance(v, dict):
  209. v = v.get("得分")
  210. try:
  211. rel = float(v) if v is not None else None
  212. except (TypeError, ValueError):
  213. rel = None
  214. if rel is not None and rel < 4:
  215. return False
  216. rh = _recency_hard(publish_time)
  217. if rh is not None and rh < 2:
  218. return False
  219. if overall is not None and float(overall) < 6:
  220. return False
  221. return True
  222. # ── search_process / search_tools ────────────────────────────────────────────
  223. def upsert_search_posts(query_id, query_text, results, table="search_process"):
  224. """一组搜索结果写入指定搜索表(按 (query_id, case_id) upsert)。返回写入条数。
  225. table:search_process(工序方向) / search_tools(工具方向)。"""
  226. table = _search_table(table)
  227. if not results:
  228. return 0
  229. rows = []
  230. for r in results:
  231. post = r.get("post") or {}
  232. e = r.get("llm_evaluation") or {}
  233. rows.append((
  234. query_id, query_text, r.get("case_id"), r.get("platform"),
  235. r.get("channel_content_id"),
  236. (post.get("title") or post.get("desc") or "")[:500],
  237. r.get("source_url"), post.get("content_type"),
  238. post.get("body_text") or post.get("desc") or "",
  239. _j(post.get("images") or []), _j(post.get("videos") or []),
  240. post.get("like_count"),
  241. str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
  242. post.get("_quality_score"), post.get("_quality_grade"),
  243. _j(r.get("found_by_queries") or []),
  244. _j(e.get("知识类型") or []),
  245. overall_score(e),
  246. _j(e),
  247. ))
  248. sql = f"""
  249. INSERT INTO {table}
  250. (query_id, query_text, case_id, platform, channel_content_id, title, url,
  251. content_type, body, images, videos, like_count, publish_time,
  252. quality_score, quality_grade, found_by, knowledge_type,
  253. overall_score, llm_evaluation)
  254. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  255. ON DUPLICATE KEY UPDATE
  256. query_text=VALUES(query_text), platform=VALUES(platform),
  257. channel_content_id=VALUES(channel_content_id), title=VALUES(title), url=VALUES(url),
  258. content_type=VALUES(content_type), body=VALUES(body), images=VALUES(images),
  259. videos=VALUES(videos), like_count=VALUES(like_count), publish_time=VALUES(publish_time),
  260. quality_score=VALUES(quality_score), quality_grade=VALUES(quality_grade),
  261. found_by=VALUES(found_by), knowledge_type=VALUES(knowledge_type),
  262. overall_score=VALUES(overall_score), llm_evaluation=VALUES(llm_evaluation);
  263. """
  264. conn = _conn()
  265. try:
  266. with conn.cursor() as cur:
  267. cur.executemany(sql, rows)
  268. return len(rows)
  269. finally:
  270. conn.close()
  271. def fetch_queries(mode="process"):
  272. """某方向搜索表的 query 列表 + 帖子数 + 采纳/命中数 + 解构进度。"""
  273. table = _search_table(mode)
  274. conn = _conn()
  275. try:
  276. with conn.cursor() as cur:
  277. cur.execute(f"""SELECT query_id, MAX(query_text) AS query_text,
  278. COUNT(*) AS post_count
  279. FROM {table} GROUP BY query_id ORDER BY query_id""")
  280. queries = cur.fetchall()
  281. cur.execute(f"""SELECT query_id, overall_score, llm_evaluation, publish_time
  282. FROM {table}""")
  283. hits = {}
  284. for r in cur.fetchall():
  285. if is_adopted(r["overall_score"], _loads(r["llm_evaluation"]), r["publish_time"]):
  286. hits[r["query_id"]] = hits.get(r["query_id"], 0) + 1
  287. cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_process GROUP BY query_id")
  288. np = {r["query_id"]: r["n"] for r in cur.fetchall()}
  289. cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_tools GROUP BY query_id")
  290. nt = {r["query_id"]: r["n"] for r in cur.fetchall()}
  291. finally:
  292. conn.close()
  293. for q in queries:
  294. q["hit_count"] = hits.get(q["query_id"], 0)
  295. q["process_done"] = np.get(q["query_id"], 0)
  296. q["tools_done"] = nt.get(q["query_id"], 0)
  297. return queries
  298. def fetch_posts(query_id, mode="process"):
  299. """某方向搜索表里某 query 的全部帖子(JSON 列已解析),带 has_process/has_tools 标记。"""
  300. table = _search_table(mode)
  301. conn = _conn()
  302. try:
  303. with conn.cursor() as cur:
  304. cur.execute(f"""SELECT * FROM {table} WHERE query_id=%s
  305. ORDER BY overall_score DESC, id""", (query_id,))
  306. rows = cur.fetchall()
  307. cur.execute("SELECT DISTINCT case_id FROM mode_process WHERE query_id=%s", (query_id,))
  308. hp = {r["case_id"] for r in cur.fetchall()}
  309. cur.execute("SELECT DISTINCT case_id FROM mode_tools WHERE query_id=%s", (query_id,))
  310. ht = {r["case_id"] for r in cur.fetchall()}
  311. finally:
  312. conn.close()
  313. for r in rows:
  314. for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
  315. r[col] = _loads(r[col])
  316. r["adopted"] = is_adopted(r["overall_score"], r["llm_evaluation"], r["publish_time"])
  317. r["has_process"] = r["case_id"] in hp
  318. r["has_tools"] = r["case_id"] in ht
  319. r.pop("created_at", None); r.pop("updated_at", None)
  320. return rows
  321. def fetch_post(query_id, case_id, table="search_process"):
  322. """指定搜索表的单帖完整行(给 pipeline 脚本重建 source 用)。无则 None。"""
  323. table = _search_table(table)
  324. conn = _conn()
  325. try:
  326. with conn.cursor() as cur:
  327. cur.execute(f"SELECT * FROM {table} WHERE query_id=%s AND case_id=%s",
  328. (query_id, case_id))
  329. row = cur.fetchone()
  330. finally:
  331. conn.close()
  332. if not row:
  333. return None
  334. for col in ("images", "videos", "found_by", "knowledge_type", "llm_evaluation"):
  335. row[col] = _loads(row[col])
  336. return row
  337. # ── mode_process ─────────────────────────────────────────────────────────────
  338. def replace_process(query_id, case_id, platform, post_title, payload,
  339. model, version, cost_usd, duration_s):
  340. """写入一帖某版本的工序解构结果(payload = {source, procedures})。
  341. 删 (case_id, version) 旧行再插,同版本重跑幂等、跨版本保留历史。返回工序条数。"""
  342. source = payload.get("source")
  343. procedures = payload.get("procedures") or []
  344. conn = _conn()
  345. try:
  346. with conn.cursor() as cur:
  347. cur.execute("DELETE FROM mode_process WHERE case_id=%s AND version=%s",
  348. (case_id, version))
  349. if procedures:
  350. rows = []
  351. for p in procedures:
  352. steps = p.get("steps") or []
  353. vias = []
  354. for s in steps:
  355. v = s.get("via")
  356. if v and v not in vias:
  357. vias.append(v)
  358. rows.append((
  359. query_id, case_id, platform, (post_title or "")[:500],
  360. _j(source), p.get("id"), (p.get("name") or "")[:250],
  361. p.get("purpose"), p.get("category"),
  362. _j(p.get("declarations")), _j(p.get("type_registry")),
  363. _j(steps), len(steps), _j(vias),
  364. model, version, cost_usd, duration_s,
  365. ))
  366. cur.executemany("""
  367. INSERT INTO mode_process
  368. (query_id, case_id, platform, post_title, source, procedure_id, name,
  369. purpose, category, declarations, type_registry, steps, step_count,
  370. tools_used, model, version, cost_usd, duration_s)
  371. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  372. """, rows)
  373. return len(procedures)
  374. finally:
  375. conn.close()
  376. def fetch_process_versions(case_id):
  377. conn = _conn()
  378. try:
  379. with conn.cursor() as cur:
  380. cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
  381. FROM mode_process WHERE case_id=%s
  382. GROUP BY version ORDER BY version DESC""", (case_id,))
  383. return cur.fetchall()
  384. finally:
  385. conn.close()
  386. def fetch_process(case_id, version=None):
  387. """重建 {case_id, version, model, source, procedures:[...]}。version=None 取最新。"""
  388. conn = _conn()
  389. try:
  390. with conn.cursor() as cur:
  391. if version is None:
  392. cur.execute("""SELECT version FROM mode_process WHERE case_id=%s
  393. ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
  394. row = cur.fetchone()
  395. if not row:
  396. return None
  397. version = row["version"]
  398. cur.execute("""SELECT * FROM mode_process WHERE case_id=%s AND version=%s
  399. ORDER BY id""", (case_id, version))
  400. rows = cur.fetchall()
  401. finally:
  402. conn.close()
  403. if not rows:
  404. return None
  405. procedures = [{
  406. "id": r["procedure_id"], "name": r["name"], "purpose": r["purpose"],
  407. "category": r["category"], "declarations": _loads(r["declarations"]),
  408. "type_registry": _loads(r["type_registry"]), "steps": _loads(r["steps"], []),
  409. "tools_used": _loads(r["tools_used"], []),
  410. } for r in rows]
  411. return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
  412. "title": rows[0]["post_title"], "model": rows[0]["model"],
  413. "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
  414. "duration_s": rows[0]["duration_s"],
  415. "source": _loads(rows[0]["source"]), "procedures": procedures}
  416. # ── mode_tools ───────────────────────────────────────────────────────────────
  417. def replace_tools(query_id, case_id, platform, post_title, tools,
  418. model, version, cost_usd, duration_s):
  419. """写入一帖某版本的工具解构结果。语义同 replace_process。返回工具条数。"""
  420. conn = _conn()
  421. try:
  422. with conn.cursor() as cur:
  423. cur.execute("DELETE FROM mode_tools WHERE case_id=%s AND version=%s",
  424. (case_id, version))
  425. if tools:
  426. rows = [(
  427. query_id, case_id, platform, (post_title or "")[:500],
  428. (t.get("工具名称") or "")[:250],
  429. _j(t.get("实质作用域")), _j(t.get("形式作用域")),
  430. t.get("创作层级"), t.get("来源链接"), t.get("输入"), t.get("输出"),
  431. _j(t.get("用法")), _j(t.get("案例")), _j(t.get("缺点")),
  432. t.get("最新更新时间"), model, version, cost_usd, duration_s,
  433. ) for t in tools]
  434. cur.executemany("""
  435. INSERT INTO mode_tools
  436. (query_id, case_id, platform, post_title, tool_name, substance_scope,
  437. form_scope, creation_layer, source_link, input_desc, output_desc,
  438. usage_json, cases_json, defects_json, updated_time, model, version,
  439. cost_usd, duration_s)
  440. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  441. """, rows)
  442. return len(tools)
  443. finally:
  444. conn.close()
  445. def fetch_tools_versions(case_id):
  446. conn = _conn()
  447. try:
  448. with conn.cursor() as cur:
  449. cur.execute("""SELECT version, COUNT(*) AS n, MAX(model) AS model
  450. FROM mode_tools WHERE case_id=%s
  451. GROUP BY version ORDER BY version DESC""", (case_id,))
  452. return cur.fetchall()
  453. finally:
  454. conn.close()
  455. def fetch_tools(case_id, version=None):
  456. """重建 {case_id, version, model, tool_count, tools:[...]}。version=None 取最新。"""
  457. conn = _conn()
  458. try:
  459. with conn.cursor() as cur:
  460. if version is None:
  461. cur.execute("""SELECT version FROM mode_tools WHERE case_id=%s
  462. ORDER BY version DESC, id DESC LIMIT 1""", (case_id,))
  463. row = cur.fetchone()
  464. if not row:
  465. return None
  466. version = row["version"]
  467. cur.execute("""SELECT * FROM mode_tools WHERE case_id=%s AND version=%s
  468. ORDER BY id""", (case_id, version))
  469. rows = cur.fetchall()
  470. finally:
  471. conn.close()
  472. if not rows:
  473. return None
  474. tools = [{
  475. "工具名称": r["tool_name"], "实质作用域": _loads(r["substance_scope"]),
  476. "形式作用域": _loads(r["form_scope"]), "创作层级": r["creation_layer"],
  477. "来源链接": r["source_link"], "输入": r["input_desc"], "输出": r["output_desc"],
  478. "用法": _loads(r["usage_json"]), "案例": _loads(r["cases_json"]),
  479. "缺点": _loads(r["defects_json"]), "最新更新时间": r["updated_time"],
  480. } for r in rows]
  481. return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
  482. "title": rows[0]["post_title"], "model": rows[0]["model"],
  483. "cost_usd": float(rows[0]["cost_usd"]) if rows[0]["cost_usd"] is not None else None,
  484. "duration_s": rows[0]["duration_s"],
  485. "tool_count": len(tools), "tools": tools}
  486. # ── Dashboard 原始行(指标计算在 server.py)─────────────────────────────────────
  487. def fetch_dashboard_rows():
  488. """拉 Dashboard 计算所需的轻量行。数据量级:百~千行,Python 聚合足够。"""
  489. conn = _conn()
  490. try:
  491. with conn.cursor() as cur:
  492. # 进度分母走「采纳」口径,需带上 is_adopted 判定所需字段;
  493. # mode 标方向(工序帖来自 search_process,工具帖来自 search_tools)。
  494. cols = ("query_id, case_id, platform, knowledge_type, "
  495. "overall_score, publish_time, llm_evaluation")
  496. cur.execute(f"SELECT {cols} FROM search_process")
  497. posts = cur.fetchall()
  498. for p in posts:
  499. p["mode"] = "process"
  500. cur.execute(f"SELECT {cols} FROM search_tools")
  501. st = cur.fetchall()
  502. for p in st:
  503. p["mode"] = "tools"
  504. posts += st
  505. cur.execute("""SELECT case_id, version, steps, tools_used, cost_usd,
  506. duration_s, created_at FROM mode_process""")
  507. procs = cur.fetchall()
  508. cur.execute("""SELECT case_id, version, tool_name, substance_scope,
  509. form_scope, cost_usd, duration_s, created_at
  510. FROM mode_tools""")
  511. tools = cur.fetchall()
  512. finally:
  513. conn.close()
  514. for p in posts:
  515. p["knowledge_type"] = _loads(p["knowledge_type"], [])
  516. # 采纳判定:口径同帖子列表(is_adopted),作为「需解构」分母依据
  517. p["adopted"] = is_adopted(
  518. p["overall_score"], _loads(p["llm_evaluation"]), p["publish_time"])
  519. for r in procs:
  520. r["steps"] = _loads(r["steps"], [])
  521. r["tools_used"] = _loads(r["tools_used"], [])
  522. r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
  523. r["created_at"] = str(r["created_at"]) if r["created_at"] else None
  524. for r in tools:
  525. r["substance_scope"] = _loads(r["substance_scope"], [])
  526. r["form_scope"] = _loads(r["form_scope"], [])
  527. r["cost_usd"] = float(r["cost_usd"]) if r["cost_usd"] is not None else None
  528. r["created_at"] = str(r["created_at"]) if r["created_at"] else None
  529. return posts, procs, tools
  530. def check():
  531. conn = _conn()
  532. try:
  533. with conn.cursor() as cur:
  534. for t in ("search_process", "search_tools", "mode_process", "mode_tools"):
  535. cur.execute(f"SELECT COUNT(*) AS n FROM {t}")
  536. print(f"{t}: {cur.fetchone()['n']} 行")
  537. finally:
  538. conn.close()
  539. if __name__ == "__main__":
  540. cmd = sys.argv[1] if len(sys.argv) > 1 else ""
  541. if cmd == "init":
  542. init_tables()
  543. elif cmd == "check":
  544. check()
  545. elif cmd == "clear":
  546. clear_tables()
  547. else:
  548. print("用法:\n python db.py init # 建表\n python db.py check # 四表行数\n python db.py clear # 清空四表数据")