db.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. # -*- coding: utf-8 -*-
  2. """fixed_query_eval · MySQL 持久化(双写:本地 json + 数据库)
  3. ================================================================================
  4. 读 .env 的 MYSQL_* 连接 MySQL(仓库 MySQL 约定是 pymysql)。两张表:
  5. fqe_posts —— 每行一个 (query, 帖子):搜索 + llm 评分结果
  6. fqe_tools —— 每行一个解构出的工具:工具解构结果
  7. 设计原则:
  8. - **失败不阻断**:所有写入用 try/except 包,DB 挂了不影响本地 json 写入(文件是主存储)。
  9. - **幂等**:posts 用 (q, case_id) 唯一键 upsert;tools 按 (q, case_id, version) 删旧再插新(同一版本重跑会覆盖,其它版本作为历史保留)。
  10. - 建表:init_tables() 用 CREATE TABLE IF NOT EXISTS,跑 `python db.py init` 即建表。
  11. """
  12. import os
  13. import json
  14. import sys
  15. from pathlib import Path
# Make the repository root importable: parents[5] is five levels above the directory
# containing this file. NOTE(review): nothing in this file imports from PROJECT_ROOT;
# presumably kept for project-local imports elsewhere — confirm it is still needed.
PROJECT_ROOT = Path(__file__).resolve().parents[5]
sys.path.insert(0, str(PROJECT_ROOT))
from dotenv import load_dotenv
# Load a .env file (python-dotenv default lookup) into os.environ so the MYSQL_*
# settings read by _enabled() / _conn() are visible.
load_dotenv()
# pymysql is optional: when it is missing, pymysql is None, _enabled() returns False and
# the DB functions below return early (the local json files remain the primary store).
try:
    import pymysql
    from pymysql.cursors import DictCursor
except ImportError:
    pymysql = None
  25. def _enabled() -> bool:
  26. return pymysql is not None and bool(os.getenv("MYSQL_HOST"))
  27. def _conn():
  28. return pymysql.connect(
  29. host=os.getenv("MYSQL_HOST"),
  30. port=int(os.getenv("MYSQL_PORT", 3306)),
  31. user=os.getenv("MYSQL_USER"),
  32. password=os.getenv("MYSQL_PASSWORD"),
  33. database=os.getenv("MYSQL_DATABASE"),
  34. charset="utf8mb4",
  35. cursorclass=DictCursor,
  36. autocommit=True,
  37. connect_timeout=10,
  38. )
  39. # ── DDL ──────────────────────────────────────────────────────────────────────
# DDL for fqe_posts: one row per (q, case_id), i.e. one search hit plus its LLM evaluation.
# uk_q_case (q, case_id) is the unique key that upsert_posts' ON DUPLICATE KEY UPDATE
# relies on for idempotent re-writes. upsert_posts truncates title to 500 chars to fit
# VARCHAR(512).
# NOTE: CREATE TABLE IF NOT EXISTS does nothing for an existing table, so edits to this
# DDL are not applied to databases that already have fqe_posts (a manual ALTER is needed).
DDL_POSTS = """
CREATE TABLE IF NOT EXISTS fqe_posts (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
q VARCHAR(16) NOT NULL COMMENT 'query 目录名 q0000',
query_text VARCHAR(255) NULL COMMENT '基准 query(如 GPT image2 评测)',
case_id VARCHAR(128) NOT NULL COMMENT 'platform_channelContentId',
platform VARCHAR(32) NULL,
channel_content_id VARCHAR(128) NULL,
title VARCHAR(512) NULL,
url VARCHAR(1024) NULL,
body MEDIUMTEXT NULL,
images JSON NULL COMMENT '图片 URL 数组',
like_count INT NULL,
publish_time VARCHAR(64) NULL,
found_by JSON NULL COMMENT '命中的措辞数组',
knowledge_type JSON NULL COMMENT 'llm 判定的知识类型',
overall_score FLOAT NULL COMMENT '综合分(相关性两子项均值,便于排序)',
llm_evaluation JSON NULL COMMENT '完整评分 blob',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_q_case (q, case_id),
KEY idx_platform (platform),
KEY idx_query_text (query_text)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='固定query搜索+评分结果';
"""
# DDL for fqe_tools: one row per extracted tool. There is no unique key because several
# deconstruction versions of the same (q, case_id) coexist, told apart by `version`
# (see the column comment). upsert_tools replaces rows per (q, case_id, version), which
# idx_q_case_ver serves.
# NOTE(review): idx_q_case is a left prefix of idx_q_case_ver and looks redundant.
# NOTE(review): version is v_MMDDHHMM (no year) and readers order it as a string, so the
# "latest" version may be picked wrongly across a year boundary — confirm this is acceptable.
DDL_TOOLS = """
CREATE TABLE IF NOT EXISTS fqe_tools (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
q VARCHAR(16) NOT NULL,
case_id VARCHAR(128) NOT NULL,
platform VARCHAR(32) NULL,
post_title VARCHAR(512) NULL,
tool_name VARCHAR(255) NULL COMMENT '工具名称',
substance_scope VARCHAR(255) NULL COMMENT '实质作用域',
form_scope VARCHAR(255) NULL COMMENT '形式作用域',
creation_layer VARCHAR(32) NULL COMMENT '创作层级:制作层/创作层',
source_link VARCHAR(1024) NULL COMMENT '来源链接',
input_desc TEXT NULL COMMENT '输入',
output_desc TEXT NULL COMMENT '输出',
usage_json JSON NULL COMMENT '用法数组',
cases_json JSON NULL COMMENT '案例数组',
defects_json JSON NULL COMMENT '缺点数组',
updated_time VARCHAR(64) NULL COMMENT '工具最新更新时间',
model VARCHAR(64) NULL COMMENT '解构模型',
version VARCHAR(16) NULL COMMENT '解构版本号 v_MMDDHHMM(每次解构生成;保留历史,多版本共存)',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
KEY idx_q_case (q, case_id),
KEY idx_q_case_ver (q, case_id, version),
KEY idx_tool_name (tool_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
"""
  91. def init_tables():
  92. """建表(幂等)。"""
  93. if not _enabled():
  94. print("⚠️ MySQL 未启用(缺 pymysql 或 MYSQL_HOST),跳过建表")
  95. return False
  96. conn = _conn()
  97. try:
  98. with conn.cursor() as cur:
  99. cur.execute(DDL_POSTS)
  100. cur.execute(DDL_TOOLS)
  101. print("✅ 建表完成:fqe_posts, fqe_tools")
  102. return True
  103. finally:
  104. conn.close()
  105. # ── 写入 ─────────────────────────────────────────────────────────────────────
  106. def _overall_from_eval(e):
  107. """从 mod schema 评分粗算综合分(相关性两子项均值)。算不出返回 None。"""
  108. try:
  109. rel = (e or {}).get("相关性") or {}
  110. vals = []
  111. for k in ("和内容制作知识相关", "和 query 相关"):
  112. v = (rel.get(k) or {}).get("得分")
  113. if v is not None:
  114. vals.append(float(v))
  115. return round(sum(vals) / len(vals), 2) if vals else None
  116. except Exception:
  117. return None
  118. def upsert_posts(q, query_text, results):
  119. """把一组搜索结果写入 fqe_posts(按 (q, case_id) upsert)。返回写入条数;失败返回 0。"""
  120. if not _enabled() or not results:
  121. return 0
  122. rows = []
  123. for r in results:
  124. post = r.get("post") or {}
  125. e = r.get("llm_evaluation") or {}
  126. rows.append((
  127. q, query_text, r.get("case_id"), r.get("platform"), r.get("channel_content_id"),
  128. (post.get("title") or post.get("desc") or "")[:500],
  129. r.get("source_url"),
  130. post.get("body_text") or post.get("desc") or "",
  131. json.dumps(post.get("images") or [], ensure_ascii=False),
  132. post.get("like_count"),
  133. str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
  134. json.dumps(r.get("found_by_queries") or [], ensure_ascii=False),
  135. json.dumps(e.get("知识类型") or [], ensure_ascii=False),
  136. _overall_from_eval(e),
  137. json.dumps(e, ensure_ascii=False),
  138. ))
  139. sql = """
  140. INSERT INTO fqe_posts
  141. (q, query_text, case_id, platform, channel_content_id, title, url, body,
  142. images, like_count, publish_time, found_by, knowledge_type, overall_score, llm_evaluation)
  143. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  144. ON DUPLICATE KEY UPDATE
  145. query_text=VALUES(query_text), platform=VALUES(platform),
  146. channel_content_id=VALUES(channel_content_id), title=VALUES(title), url=VALUES(url),
  147. body=VALUES(body), images=VALUES(images), like_count=VALUES(like_count),
  148. publish_time=VALUES(publish_time), found_by=VALUES(found_by),
  149. knowledge_type=VALUES(knowledge_type), overall_score=VALUES(overall_score),
  150. llm_evaluation=VALUES(llm_evaluation);
  151. """
  152. try:
  153. conn = _conn()
  154. try:
  155. with conn.cursor() as cur:
  156. cur.executemany(sql, rows)
  157. return len(rows)
  158. finally:
  159. conn.close()
  160. except Exception as ex:
  161. print(f"⚠️ fqe_posts 写库失败(不影响本地 json):{ex}")
  162. return 0
  163. def upsert_tools(q, case_id, model, tools, version, platform=None, post_title=None):
  164. """写入一帖某个版本的工具解构结果。**保留历史**:不删其它版本,只删本 (q,case_id,version)
  165. 的旧行再插(保证同一版本重跑幂等)。失败返回 0。"""
  166. if not _enabled():
  167. return 0
  168. try:
  169. conn = _conn()
  170. try:
  171. with conn.cursor() as cur:
  172. cur.execute("DELETE FROM fqe_tools WHERE q=%s AND case_id=%s AND version=%s",
  173. (q, case_id, version))
  174. if tools:
  175. rows = [(
  176. q, case_id, platform, (post_title or "")[:500],
  177. t.get("工具名称"), t.get("实质作用域"), t.get("形式作用域"),
  178. t.get("创作层级"), t.get("来源链接"), t.get("输入"), t.get("输出"),
  179. json.dumps(t.get("用法"), ensure_ascii=False) if t.get("用法") is not None else None,
  180. json.dumps(t.get("案例"), ensure_ascii=False) if t.get("案例") is not None else None,
  181. json.dumps(t.get("缺点"), ensure_ascii=False) if t.get("缺点") is not None else None,
  182. t.get("最新更新时间"), model, version,
  183. ) for t in tools]
  184. cur.executemany("""
  185. INSERT INTO fqe_tools
  186. (q, case_id, platform, post_title, tool_name, substance_scope, form_scope,
  187. creation_layer, source_link, input_desc, output_desc,
  188. usage_json, cases_json, defects_json, updated_time, model, version)
  189. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  190. """, rows)
  191. return len(tools)
  192. finally:
  193. conn.close()
  194. except Exception as ex:
  195. print(f"⚠️ fqe_tools 写库失败(不影响本地 json):{ex}")
  196. return 0
  197. # ── 读取(本地 runs_full 被清空时,server 回退读库重建视图)────────────────────────
  198. def _loads(v, default=None):
  199. """pymysql 的 JSON 列可能返回字符串,统一解析。"""
  200. if v is None:
  201. return default
  202. if isinstance(v, (list, dict)):
  203. return v
  204. try:
  205. return json.loads(v)
  206. except Exception:
  207. return default
  208. def fetch_posts_grouped():
  209. """从 fqe_posts 重建 {q: {query_text, results:[r...]}}(r 的结构对齐 form_A.json 的 result,
  210. 便于喂给 server 的 adapt())。库不可用/空/异常返回 {}。"""
  211. if not _enabled():
  212. return {}
  213. try:
  214. conn = _conn()
  215. try:
  216. with conn.cursor() as cur:
  217. cur.execute("SELECT * FROM fqe_posts ORDER BY q, overall_score DESC")
  218. rows = cur.fetchall()
  219. finally:
  220. conn.close()
  221. except Exception as ex:
  222. print(f"⚠️ 读 fqe_posts 失败:{ex}")
  223. return {}
  224. out = {}
  225. for row in rows:
  226. q = row["q"]
  227. r = {
  228. "case_id": row["case_id"], "platform": row["platform"],
  229. "channel_content_id": row["channel_content_id"], "source_url": row["url"],
  230. "found_by_queries": _loads(row["found_by"], []),
  231. "llm_evaluation": _loads(row["llm_evaluation"], {}),
  232. "post": {
  233. "title": row["title"], "body_text": row["body"],
  234. "images": _loads(row["images"], []), "like_count": row["like_count"],
  235. "publish_timestamp": row["publish_time"],
  236. },
  237. }
  238. out.setdefault(q, {"query_text": row["query_text"], "results": []})["results"].append(r)
  239. return out
  240. def fetch_tool_versions(q, case_id):
  241. """列出某帖的全部解构版本(降序,最新在前)。无则返回 []。"""
  242. if not _enabled():
  243. return []
  244. try:
  245. conn = _conn()
  246. try:
  247. with conn.cursor() as cur:
  248. cur.execute("SELECT DISTINCT version FROM fqe_tools WHERE q=%s AND case_id=%s "
  249. "ORDER BY version DESC", (q, case_id))
  250. return [r["version"] for r in cur.fetchall() if r["version"]]
  251. finally:
  252. conn.close()
  253. except Exception as ex:
  254. print(f"⚠️ 读 fqe_tools 版本失败:{ex}")
  255. return []
  256. def fetch_tools(q, case_id, version=None):
  257. """从 fqe_tools 重建 {case_id, version, model, tool_count, tools:[...]}。
  258. version=None 取最新版本(v_MMDDHHMM 字符串降序);指定则取该版本。无记录返回 None。"""
  259. if not _enabled():
  260. return None
  261. try:
  262. conn = _conn()
  263. try:
  264. with conn.cursor() as cur:
  265. if version is None:
  266. cur.execute("SELECT version FROM fqe_tools WHERE q=%s AND case_id=%s "
  267. "ORDER BY version DESC, id DESC LIMIT 1", (q, case_id))
  268. row = cur.fetchone()
  269. if not row:
  270. return None
  271. version = row["version"]
  272. cur.execute("SELECT * FROM fqe_tools WHERE q=%s AND case_id=%s AND version=%s "
  273. "ORDER BY id", (q, case_id, version))
  274. rows = cur.fetchall()
  275. finally:
  276. conn.close()
  277. except Exception as ex:
  278. print(f"⚠️ 读 fqe_tools 失败:{ex}")
  279. return None
  280. if not rows:
  281. return None
  282. tools = [{
  283. "工具名称": r["tool_name"], "实质作用域": r["substance_scope"],
  284. "形式作用域": r["form_scope"], "创作层级": r["creation_layer"],
  285. "来源链接": r["source_link"], "输入": r["input_desc"], "输出": r["output_desc"],
  286. "用法": _loads(r["usage_json"]), "案例": _loads(r["cases_json"]),
  287. "缺点": _loads(r["defects_json"]), "最新更新时间": r["updated_time"],
  288. } for r in rows]
  289. return {"case_id": case_id, "version": version, "platform": rows[0]["platform"],
  290. "title": rows[0]["post_title"], "model": rows[0]["model"],
  291. "tool_count": len(tools), "tools": tools}
  292. def has_tools(q, case_id):
  293. """库里是否有该帖的工具解构记录。"""
  294. if not _enabled():
  295. return False
  296. try:
  297. conn = _conn()
  298. try:
  299. with conn.cursor() as cur:
  300. cur.execute("SELECT 1 FROM fqe_tools WHERE q=%s AND case_id=%s LIMIT 1", (q, case_id))
  301. return cur.fetchone() is not None
  302. finally:
  303. conn.close()
  304. except Exception:
  305. return False
  306. def rebuild_local(runs_dir=None):
  307. """从数据库重建本地 runs_full/{q}/form_A.json 和 tools/{case_id}.json。
  308. 用于本地文件丢失后恢复(界面回退只让界面有数据;提取脚本仍读本地文件,故提供本命令)。"""
  309. if not _enabled():
  310. print("⚠️ MySQL 未启用,无法重建"); return
  311. runs_dir = Path(runs_dir) if runs_dir else (Path(__file__).resolve().parent / "runs_full")
  312. grouped = fetch_posts_grouped()
  313. np = nt = 0
  314. for q, g in grouped.items():
  315. d = {"form": "A", "query": g["query_text"], "original_q": g["query_text"] or "",
  316. "platforms": [], "total": len(g["results"]), "failed": 0, "results": g["results"]}
  317. p = runs_dir / q / "form_A.json"
  318. p.parent.mkdir(parents=True, exist_ok=True)
  319. p.write_text(json.dumps(d, ensure_ascii=False, indent=2), encoding="utf-8")
  320. np += 1
  321. # tools
  322. try:
  323. conn = _conn()
  324. try:
  325. with conn.cursor() as cur:
  326. cur.execute("SELECT DISTINCT q, case_id FROM fqe_tools")
  327. pairs = cur.fetchall()
  328. finally:
  329. conn.close()
  330. for row in pairs:
  331. data = fetch_tools(row["q"], row["case_id"])
  332. if data:
  333. tp = runs_dir / row["q"] / "tools" / f"{row['case_id']}.json"
  334. tp.parent.mkdir(parents=True, exist_ok=True)
  335. tp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
  336. nt += 1
  337. except Exception as ex:
  338. print(f"⚠️ 重建 tools 失败:{ex}")
  339. print(f"✅ 从库重建本地:{np} 个 query 的 form_A.json,{nt} 帖工具结果 → {runs_dir}")
  340. if __name__ == "__main__":
  341. cmd = sys.argv[1] if len(sys.argv) > 1 else ""
  342. if cmd == "init":
  343. init_tables()
  344. elif cmd == "rebuild":
  345. rebuild_local()
  346. else:
  347. print("用法:\n python db.py init # 建表\n python db.py rebuild # 从库重建本地 runs_full(本地文件丢失后恢复)")