db.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. # -*- coding: utf-8 -*-
  2. """fixed_query_eval · MySQL 持久化(双写:本地 json + 数据库)
  3. ================================================================================
  4. 读 .env 的 MYSQL_* 连接 MySQL(仓库 MySQL 约定是 pymysql)。两张表:
  5. fqe_posts —— 每行一个 (query, 帖子):搜索 + llm 评分结果
  6. fqe_tools —— 每行一个解构出的工具:工具解构结果
  7. 设计原则:
  8. - **失败不阻断**:所有写入用 try/except 包,DB 挂了不影响本地 json 写入(文件是主存储)。
  9. - **幂等**:posts 用 (q, case_id) 唯一键 upsert;tools 先按 (q, case_id) 删旧再插新(重新解构会覆盖)。
  10. - 建表:init_tables() 用 CREATE TABLE IF NOT EXISTS,跑 `python db.py init` 即建表。
  11. """
# Standard-library imports.
import os
import json
import sys
from pathlib import Path

# Sixth ancestor directory of this file, pushed to the front of sys.path.
# NOTE(review): presumably the repository root, so that repo-level packages
# can be imported from here — confirm the depth matches the file's location.
PROJECT_ROOT = Path(__file__).resolve().parents[5]
sys.path.insert(0, str(PROJECT_ROOT))

from dotenv import load_dotenv

# Load variables from a .env file into the environment; the MYSQL_* settings
# are read later through os.getenv.
load_dotenv()

# pymysql is optional: when the import fails the name is bound to None, which
# _enabled() checks before any database access is attempted.
# Note that DictCursor stays undefined in that case, so _conn() must only be
# called after _enabled() has returned True.
try:
    import pymysql
    from pymysql.cursors import DictCursor
except ImportError:
    pymysql = None
  25. def _enabled() -> bool:
  26. return pymysql is not None and bool(os.getenv("MYSQL_HOST"))
  27. def _conn():
  28. return pymysql.connect(
  29. host=os.getenv("MYSQL_HOST"),
  30. port=int(os.getenv("MYSQL_PORT", 3306)),
  31. user=os.getenv("MYSQL_USER"),
  32. password=os.getenv("MYSQL_PASSWORD"),
  33. database=os.getenv("MYSQL_DATABASE"),
  34. charset="utf8mb4",
  35. cursorclass=DictCursor,
  36. autocommit=True,
  37. connect_timeout=10,
  38. )
  39. # ── DDL ──────────────────────────────────────────────────────────────────────
# Schema of fqe_posts: one row per (q, case_id) pair, enforced by the unique
# key uk_q_case. Holds the post fields, the matched query phrasings and the
# LLM evaluation (stored as JSON columns), plus secondary indexes on platform
# and query_text. CREATE TABLE IF NOT EXISTS makes re-running it harmless.
DDL_POSTS = """
CREATE TABLE IF NOT EXISTS fqe_posts (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
q VARCHAR(16) NOT NULL COMMENT 'query 目录名 q0000',
query_text VARCHAR(255) NULL COMMENT '基准 query(如 GPT image2 评测)',
case_id VARCHAR(128) NOT NULL COMMENT 'platform_channelContentId',
platform VARCHAR(32) NULL,
channel_content_id VARCHAR(128) NULL,
title VARCHAR(512) NULL,
url VARCHAR(1024) NULL,
body MEDIUMTEXT NULL,
images JSON NULL COMMENT '图片 URL 数组',
like_count INT NULL,
publish_time VARCHAR(64) NULL,
found_by JSON NULL COMMENT '命中的措辞数组',
knowledge_type JSON NULL COMMENT 'llm 判定的知识类型',
overall_score FLOAT NULL COMMENT '综合分(相关性两子项均值,便于排序)',
llm_evaluation JSON NULL COMMENT '完整评分 blob',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_q_case (q, case_id),
KEY idx_platform (platform),
KEY idx_query_text (query_text)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='固定query搜索+评分结果';
"""
# Schema of fqe_tools: one row per tool extracted from a post. (q, case_id) is
# only a plain (non-unique) index here, so a single post can own several tool
# rows. The usage / cases / defects lists are stored as JSON columns.
DDL_TOOLS = """
CREATE TABLE IF NOT EXISTS fqe_tools (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
q VARCHAR(16) NOT NULL,
case_id VARCHAR(128) NOT NULL,
platform VARCHAR(32) NULL,
post_title VARCHAR(512) NULL,
tool_name VARCHAR(255) NULL COMMENT '工具名称',
substance_scope VARCHAR(255) NULL COMMENT '实质作用域',
form_scope VARCHAR(255) NULL COMMENT '形式作用域',
creation_layer VARCHAR(32) NULL COMMENT '创作层级:制作层/创作层',
source_link VARCHAR(1024) NULL COMMENT '来源链接',
input_desc TEXT NULL COMMENT '输入',
output_desc TEXT NULL COMMENT '输出',
usage_json JSON NULL COMMENT '用法数组',
cases_json JSON NULL COMMENT '案例数组',
defects_json JSON NULL COMMENT '缺点数组',
updated_time VARCHAR(64) NULL COMMENT '工具最新更新时间',
model VARCHAR(64) NULL COMMENT '解构模型',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
KEY idx_q_case (q, case_id),
KEY idx_tool_name (tool_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
"""
  89. def init_tables():
  90. """建表(幂等)。"""
  91. if not _enabled():
  92. print("⚠️ MySQL 未启用(缺 pymysql 或 MYSQL_HOST),跳过建表")
  93. return False
  94. conn = _conn()
  95. try:
  96. with conn.cursor() as cur:
  97. cur.execute(DDL_POSTS)
  98. cur.execute(DDL_TOOLS)
  99. print("✅ 建表完成:fqe_posts, fqe_tools")
  100. return True
  101. finally:
  102. conn.close()
  103. # ── 写入 ─────────────────────────────────────────────────────────────────────
  104. def _overall_from_eval(e):
  105. """从 mod schema 评分粗算综合分(相关性两子项均值)。算不出返回 None。"""
  106. try:
  107. rel = (e or {}).get("相关性") or {}
  108. vals = []
  109. for k in ("和内容制作知识相关", "和 query 相关"):
  110. v = (rel.get(k) or {}).get("得分")
  111. if v is not None:
  112. vals.append(float(v))
  113. return round(sum(vals) / len(vals), 2) if vals else None
  114. except Exception:
  115. return None
  116. def upsert_posts(q, query_text, results):
  117. """把一组搜索结果写入 fqe_posts(按 (q, case_id) upsert)。返回写入条数;失败返回 0。"""
  118. if not _enabled() or not results:
  119. return 0
  120. rows = []
  121. for r in results:
  122. post = r.get("post") or {}
  123. e = r.get("llm_evaluation") or {}
  124. rows.append((
  125. q, query_text, r.get("case_id"), r.get("platform"), r.get("channel_content_id"),
  126. (post.get("title") or post.get("desc") or "")[:500],
  127. r.get("source_url"),
  128. post.get("body_text") or post.get("desc") or "",
  129. json.dumps(post.get("images") or [], ensure_ascii=False),
  130. post.get("like_count"),
  131. str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
  132. json.dumps(r.get("found_by_queries") or [], ensure_ascii=False),
  133. json.dumps(e.get("知识类型") or [], ensure_ascii=False),
  134. _overall_from_eval(e),
  135. json.dumps(e, ensure_ascii=False),
  136. ))
  137. sql = """
  138. INSERT INTO fqe_posts
  139. (q, query_text, case_id, platform, channel_content_id, title, url, body,
  140. images, like_count, publish_time, found_by, knowledge_type, overall_score, llm_evaluation)
  141. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  142. ON DUPLICATE KEY UPDATE
  143. query_text=VALUES(query_text), platform=VALUES(platform),
  144. channel_content_id=VALUES(channel_content_id), title=VALUES(title), url=VALUES(url),
  145. body=VALUES(body), images=VALUES(images), like_count=VALUES(like_count),
  146. publish_time=VALUES(publish_time), found_by=VALUES(found_by),
  147. knowledge_type=VALUES(knowledge_type), overall_score=VALUES(overall_score),
  148. llm_evaluation=VALUES(llm_evaluation);
  149. """
  150. try:
  151. conn = _conn()
  152. try:
  153. with conn.cursor() as cur:
  154. cur.executemany(sql, rows)
  155. return len(rows)
  156. finally:
  157. conn.close()
  158. except Exception as ex:
  159. print(f"⚠️ fqe_posts 写库失败(不影响本地 json):{ex}")
  160. return 0
  161. def upsert_tools(q, case_id, model, tools, platform=None, post_title=None):
  162. """把一帖的工具解构结果写入 fqe_tools(先删该 (q,case_id) 旧行再插)。失败返回 0。"""
  163. if not _enabled():
  164. return 0
  165. try:
  166. conn = _conn()
  167. try:
  168. with conn.cursor() as cur:
  169. cur.execute("DELETE FROM fqe_tools WHERE q=%s AND case_id=%s", (q, case_id))
  170. if tools:
  171. rows = [(
  172. q, case_id, platform, (post_title or "")[:500],
  173. t.get("工具名称"), t.get("实质作用域"), t.get("形式作用域"),
  174. t.get("创作层级"), t.get("来源链接"), t.get("输入"), t.get("输出"),
  175. json.dumps(t.get("用法"), ensure_ascii=False) if t.get("用法") is not None else None,
  176. json.dumps(t.get("案例"), ensure_ascii=False) if t.get("案例") is not None else None,
  177. json.dumps(t.get("缺点"), ensure_ascii=False) if t.get("缺点") is not None else None,
  178. t.get("最新更新时间"), model,
  179. ) for t in tools]
  180. cur.executemany("""
  181. INSERT INTO fqe_tools
  182. (q, case_id, platform, post_title, tool_name, substance_scope, form_scope,
  183. creation_layer, source_link, input_desc, output_desc,
  184. usage_json, cases_json, defects_json, updated_time, model)
  185. VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
  186. """, rows)
  187. return len(tools)
  188. finally:
  189. conn.close()
  190. except Exception as ex:
  191. print(f"⚠️ fqe_tools 写库失败(不影响本地 json):{ex}")
  192. return 0
  193. # ── 读取(本地 runs_full 被清空时,server 回退读库重建视图)────────────────────────
  194. def _loads(v, default=None):
  195. """pymysql 的 JSON 列可能返回字符串,统一解析。"""
  196. if v is None:
  197. return default
  198. if isinstance(v, (list, dict)):
  199. return v
  200. try:
  201. return json.loads(v)
  202. except Exception:
  203. return default
  204. def fetch_posts_grouped():
  205. """从 fqe_posts 重建 {q: {query_text, results:[r...]}}(r 的结构对齐 form_A.json 的 result,
  206. 便于喂给 server 的 adapt())。库不可用/空/异常返回 {}。"""
  207. if not _enabled():
  208. return {}
  209. try:
  210. conn = _conn()
  211. try:
  212. with conn.cursor() as cur:
  213. cur.execute("SELECT * FROM fqe_posts ORDER BY q, overall_score DESC")
  214. rows = cur.fetchall()
  215. finally:
  216. conn.close()
  217. except Exception as ex:
  218. print(f"⚠️ 读 fqe_posts 失败:{ex}")
  219. return {}
  220. out = {}
  221. for row in rows:
  222. q = row["q"]
  223. r = {
  224. "case_id": row["case_id"], "platform": row["platform"],
  225. "channel_content_id": row["channel_content_id"], "source_url": row["url"],
  226. "found_by_queries": _loads(row["found_by"], []),
  227. "llm_evaluation": _loads(row["llm_evaluation"], {}),
  228. "post": {
  229. "title": row["title"], "body_text": row["body"],
  230. "images": _loads(row["images"], []), "like_count": row["like_count"],
  231. "publish_timestamp": row["publish_time"],
  232. },
  233. }
  234. out.setdefault(q, {"query_text": row["query_text"], "results": []})["results"].append(r)
  235. return out
  236. def fetch_tools(q, case_id):
  237. """从 fqe_tools 重建 {case_id, model, tool_count, tools:[...]}(对齐本地 tools/{case_id}.json)。
  238. 无记录返回 None。"""
  239. if not _enabled():
  240. return None
  241. try:
  242. conn = _conn()
  243. try:
  244. with conn.cursor() as cur:
  245. cur.execute("SELECT * FROM fqe_tools WHERE q=%s AND case_id=%s ORDER BY id", (q, case_id))
  246. rows = cur.fetchall()
  247. finally:
  248. conn.close()
  249. except Exception as ex:
  250. print(f"⚠️ 读 fqe_tools 失败:{ex}")
  251. return None
  252. if not rows:
  253. return None
  254. tools = [{
  255. "工具名称": r["tool_name"], "实质作用域": r["substance_scope"],
  256. "形式作用域": r["form_scope"], "创作层级": r["creation_layer"],
  257. "来源链接": r["source_link"], "输入": r["input_desc"], "输出": r["output_desc"],
  258. "用法": _loads(r["usage_json"]), "案例": _loads(r["cases_json"]),
  259. "缺点": _loads(r["defects_json"]), "最新更新时间": r["updated_time"],
  260. } for r in rows]
  261. return {"case_id": case_id, "platform": rows[0]["platform"],
  262. "title": rows[0]["post_title"], "model": rows[0]["model"],
  263. "tool_count": len(tools), "tools": tools}
  264. def has_tools(q, case_id):
  265. """库里是否有该帖的工具解构记录。"""
  266. if not _enabled():
  267. return False
  268. try:
  269. conn = _conn()
  270. try:
  271. with conn.cursor() as cur:
  272. cur.execute("SELECT 1 FROM fqe_tools WHERE q=%s AND case_id=%s LIMIT 1", (q, case_id))
  273. return cur.fetchone() is not None
  274. finally:
  275. conn.close()
  276. except Exception:
  277. return False
  278. def rebuild_local(runs_dir=None):
  279. """从数据库重建本地 runs_full/{q}/form_A.json 和 tools/{case_id}.json。
  280. 用于本地文件丢失后恢复(界面回退只让界面有数据;提取脚本仍读本地文件,故提供本命令)。"""
  281. if not _enabled():
  282. print("⚠️ MySQL 未启用,无法重建"); return
  283. runs_dir = Path(runs_dir) if runs_dir else (Path(__file__).resolve().parent / "runs_full")
  284. grouped = fetch_posts_grouped()
  285. np = nt = 0
  286. for q, g in grouped.items():
  287. d = {"form": "A", "query": g["query_text"], "original_q": g["query_text"] or "",
  288. "platforms": [], "total": len(g["results"]), "failed": 0, "results": g["results"]}
  289. p = runs_dir / q / "form_A.json"
  290. p.parent.mkdir(parents=True, exist_ok=True)
  291. p.write_text(json.dumps(d, ensure_ascii=False, indent=2), encoding="utf-8")
  292. np += 1
  293. # tools
  294. try:
  295. conn = _conn()
  296. try:
  297. with conn.cursor() as cur:
  298. cur.execute("SELECT DISTINCT q, case_id FROM fqe_tools")
  299. pairs = cur.fetchall()
  300. finally:
  301. conn.close()
  302. for row in pairs:
  303. data = fetch_tools(row["q"], row["case_id"])
  304. if data:
  305. tp = runs_dir / row["q"] / "tools" / f"{row['case_id']}.json"
  306. tp.parent.mkdir(parents=True, exist_ok=True)
  307. tp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
  308. nt += 1
  309. except Exception as ex:
  310. print(f"⚠️ 重建 tools 失败:{ex}")
  311. print(f"✅ 从库重建本地:{np} 个 query 的 form_A.json,{nt} 帖工具结果 → {runs_dir}")
  312. if __name__ == "__main__":
  313. cmd = sys.argv[1] if len(sys.argv) > 1 else ""
  314. if cmd == "init":
  315. init_tables()
  316. elif cmd == "rebuild":
  317. rebuild_local()
  318. else:
  319. print("用法:\n python db.py init # 建表\n python db.py rebuild # 从库重建本地 runs_full(本地文件丢失后恢复)")