# -*- coding: utf-8 -*-
"""fixed_query_eval · MySQL persistence (dual write: local json + database)
================================================================================
Connects to MySQL using the MYSQL_* variables (loaded from .env when
python-dotenv is installed; the repository's MySQL convention is pymysql).
Two tables:
  fqe_posts —— one row per (query, post): search + llm scoring result
  fqe_tools —— one row per deconstructed tool: tool deconstruction result

Design principles:
  - **Failures never block**: every write is wrapped in try/except, so a broken
    DB does not affect the local json write (the files are the primary storage).
  - **Idempotent**: posts are upserted on the (q, case_id) unique key; tools are
    replaced per (q, case_id) — old rows are deleted and the new ones inserted
    inside one transaction (re-running a deconstruction overwrites).
  - Table creation: init_tables() uses CREATE TABLE IF NOT EXISTS; run
    `python db.py init` to create the tables.
"""
import os
import json
import sys
from pathlib import Path

_HERE = Path(__file__).resolve()
# parents[5] is presumably the repository root in the expected checkout layout
# (TODO confirm). Fall back to this file's own directory when the module is
# loaded from a shallower path so that importing it never raises IndexError.
PROJECT_ROOT = _HERE.parents[5] if len(_HERE.parents) > 5 else _HERE.parent
sys.path.insert(0, str(PROJECT_ROOT))

# python-dotenv is treated as optional, like pymysql below: without it the
# MYSQL_* variables must already be present in the process environment,
# otherwise the DB layer simply reports itself as disabled.
try:
    from dotenv import load_dotenv
except ImportError:
    load_dotenv = None

if load_dotenv is not None:
    load_dotenv()

try:
    import pymysql
    from pymysql.cursors import DictCursor
except ImportError:
    pymysql = None
    DictCursor = None


def _enabled() -> bool:
    """Return True when pymysql is importable and MYSQL_HOST is set."""
    return pymysql is not None and bool(os.getenv("MYSQL_HOST"))


def _conn():
    """Open a new autocommit pymysql connection from the MYSQL_* env variables.

    Callers are expected to check ``_enabled()`` first and to close the
    returned connection themselves.
    """
    return pymysql.connect(
        host=os.getenv("MYSQL_HOST"),
        # `or` (not a getenv default) so that an empty MYSQL_PORT= entry also
        # falls back to 3306 instead of raising ValueError in int("").
        port=int(os.getenv("MYSQL_PORT") or 3306),
        user=os.getenv("MYSQL_USER"),
        password=os.getenv("MYSQL_PASSWORD"),
        database=os.getenv("MYSQL_DATABASE"),
        charset="utf8mb4",
        cursorclass=DictCursor,
        autocommit=True,
        connect_timeout=10,
    )


# ── DDL ──────────────────────────────────────────────────────────────────────
DDL_POSTS = """
CREATE TABLE IF NOT EXISTS fqe_posts (
 id BIGINT AUTO_INCREMENT PRIMARY KEY,
 q VARCHAR(16) NOT NULL COMMENT 'query 目录名 q0000',
 query_text VARCHAR(255) NULL COMMENT '基准 query(如 GPT image2 评测)',
 case_id VARCHAR(128) NOT NULL COMMENT 'platform_channelContentId',
 platform VARCHAR(32) NULL,
 channel_content_id VARCHAR(128) NULL,
 title VARCHAR(512) NULL,
 url VARCHAR(1024) NULL,
 body MEDIUMTEXT NULL,
 images JSON NULL COMMENT '图片 URL 数组',
 like_count INT NULL,
 publish_time VARCHAR(64) NULL,
 found_by JSON NULL COMMENT '命中的措辞数组',
 knowledge_type JSON NULL COMMENT 'llm 判定的知识类型',
 overall_score FLOAT NULL COMMENT '综合分(相关性两子项均值,便于排序)',
 llm_evaluation JSON NULL COMMENT '完整评分 blob',
 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
 UNIQUE KEY uk_q_case (q, case_id),
 KEY idx_platform (platform),
 KEY idx_query_text (query_text)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='固定query搜索+评分结果';
"""

DDL_TOOLS = """
CREATE TABLE IF NOT EXISTS fqe_tools (
 id BIGINT AUTO_INCREMENT PRIMARY KEY,
 q VARCHAR(16) NOT NULL,
 case_id VARCHAR(128) NOT NULL,
 platform VARCHAR(32) NULL,
 post_title VARCHAR(512) NULL,
 tool_name VARCHAR(255) NULL COMMENT '工具名称',
 substance_scope VARCHAR(255) NULL COMMENT '实质作用域',
 form_scope VARCHAR(255) NULL COMMENT '形式作用域',
 creation_layer VARCHAR(32) NULL COMMENT '创作层级:制作层/创作层',
 source_link VARCHAR(1024) NULL COMMENT '来源链接',
 input_desc TEXT NULL COMMENT '输入',
 output_desc TEXT NULL COMMENT '输出',
 usage_json JSON NULL COMMENT '用法数组',
 cases_json JSON NULL COMMENT '案例数组',
 defects_json JSON NULL COMMENT '缺点数组',
 updated_time VARCHAR(64) NULL COMMENT '工具最新更新时间',
 model VARCHAR(64) NULL COMMENT '解构模型',
 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
 KEY idx_q_case (q, case_id),
 KEY idx_tool_name (tool_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
"""


def init_tables():
    """Create both tables (idempotent).

    Returns True once the DDL has run, False when MySQL is not enabled.
    Connection or DDL errors are not caught here and propagate to the caller.
    """
    if not _enabled():
        print("⚠️ MySQL 未启用(缺 pymysql 或 MYSQL_HOST),跳过建表")
        return False
    conn = _conn()
    try:
        with conn.cursor() as cur:
            cur.execute(DDL_POSTS)
            cur.execute(DDL_TOOLS)
        print("✅ 建表完成:fqe_posts, fqe_tools")
        return True
    finally:
        conn.close()


# ── Writes ───────────────────────────────────────────────────────────────────
def _overall_from_eval(e):
    """Derive a rough overall score from an llm evaluation dict.

    The score is the mean of the "得分" values of the two sub-items under
    "相关性", rounded to two decimals. Returns None when neither sub-item has
    a score or when the structure cannot be read.
    """
    try:
        rel = (e or {}).get("相关性") or {}
        vals = []
        for k in ("和内容制作知识相关", "和 query 相关"):
            v = (rel.get(k) or {}).get("得分")
            if v is not None:
                vals.append(float(v))
        return round(sum(vals) / len(vals), 2) if vals else None
    except Exception:
        return None


def _post_row(q, query_text, r):
    """Build the fqe_posts parameter tuple (15 values, column order) for one result."""
    post = r.get("post") or {}
    e = r.get("llm_evaluation") or {}
    return (
        q,
        query_text,
        r.get("case_id"),
        r.get("platform"),
        r.get("channel_content_id"),
        # Truncated to stay inside title VARCHAR(512).
        (post.get("title") or post.get("desc") or "")[:500],
        r.get("source_url"),
        post.get("body_text") or post.get("desc") or "",
        json.dumps(post.get("images") or [], ensure_ascii=False),
        post.get("like_count"),
        # Truncated to publish_time VARCHAR(64).
        str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
        json.dumps(r.get("found_by_queries") or [], ensure_ascii=False),
        json.dumps(e.get("知识类型") or [], ensure_ascii=False),
        _overall_from_eval(e),
        json.dumps(e, ensure_ascii=False),
    )


def upsert_posts(q, query_text, results):
    """Write a batch of search results into fqe_posts, upserting on (q, case_id).

    Args:
        q: query directory name (e.g. "q0000").
        query_text: the baseline query text stored with every row.
        results: iterable of result dicts; each is read via ``.get`` for
            case_id / platform / channel_content_id / source_url /
            found_by_queries / llm_evaluation and a nested "post" dict.

    Returns:
        Number of rows sent to the database, or 0 when MySQL is disabled,
        ``results`` is empty, or anything fails. Never raises: malformed
        results and DB errors are both reported and swallowed so the local
        json write is never blocked.
    """
    if not _enabled() or not results:
        return 0
    sql = """
    INSERT INTO fqe_posts
      (q, query_text, case_id, platform, channel_content_id, title, url, body,
       images, like_count, publish_time, found_by, knowledge_type,
       overall_score, llm_evaluation)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    ON DUPLICATE KEY UPDATE
      query_text=VALUES(query_text), platform=VALUES(platform),
      channel_content_id=VALUES(channel_content_id), title=VALUES(title),
      url=VALUES(url), body=VALUES(body), images=VALUES(images),
      like_count=VALUES(like_count), publish_time=VALUES(publish_time),
      found_by=VALUES(found_by), knowledge_type=VALUES(knowledge_type),
      overall_score=VALUES(overall_score), llm_evaluation=VALUES(llm_evaluation);
    """
    try:
        # Row building lives inside the try: a single malformed result must
        # not propagate an exception to the caller.
        rows = [_post_row(q, query_text, r) for r in results]
        conn = _conn()
        try:
            with conn.cursor() as cur:
                cur.executemany(sql, rows)
            return len(rows)
        finally:
            conn.close()
    except Exception as ex:
        print(f"⚠️ fqe_posts 写库失败(不影响本地 json):{ex}")
        return 0


def _json_or_none(value):
    """Serialize ``value`` to JSON text, keeping None as SQL NULL."""
    return json.dumps(value, ensure_ascii=False) if value is not None else None


def _tool_row(q, case_id, model, t, platform, post_title):
    """Build the fqe_tools parameter tuple (16 values, column order) for one tool."""
    return (
        q,
        case_id,
        platform,
        # Truncated to stay inside post_title VARCHAR(512).
        (post_title or "")[:500],
        t.get("工具名称"),
        t.get("实质作用域"),
        t.get("形式作用域"),
        t.get("创作层级"),
        t.get("来源链接"),
        t.get("输入"),
        t.get("输出"),
        _json_or_none(t.get("用法")),
        _json_or_none(t.get("案例")),
        _json_or_none(t.get("缺点")),
        t.get("最新更新时间"),
        model,
    )


def upsert_tools(q, case_id, model, tools, platform=None, post_title=None):
    """Replace the fqe_tools rows of one post with a fresh deconstruction result.

    The old rows for (q, case_id) are deleted and the new ones inserted inside
    a single transaction, so a failed insert rolls back and leaves the previous
    rows intact.

    Args:
        q: query directory name.
        case_id: post identifier.
        model: name of the model that produced the deconstruction.
        tools: list of tool dicts (may be empty or None, which just clears the
            existing rows).
        platform: optional platform stored on every row.
        post_title: optional post title stored on every row.

    Returns:
        Number of tool rows written; 0 when MySQL is disabled or on failure.
        Never raises.
    """
    if not _enabled():
        return 0
    try:
        # Build every row before touching the table so that a malformed tool
        # cannot leave the post with its old rows deleted and nothing inserted.
        rows = [
            _tool_row(q, case_id, model, t, platform, post_title)
            for t in (tools or [])
        ]
        conn = _conn()
        try:
            # The connection is autocommit; an explicit BEGIN makes the
            # DELETE + INSERT pair atomic until commit/rollback.
            conn.begin()
            try:
                with conn.cursor() as cur:
                    cur.execute(
                        "DELETE FROM fqe_tools WHERE q=%s AND case_id=%s",
                        (q, case_id),
                    )
                    if rows:
                        cur.executemany("""
                        INSERT INTO fqe_tools
                          (q, case_id, platform, post_title, tool_name,
                           substance_scope, form_scope, creation_layer,
                           source_link, input_desc, output_desc, usage_json,
                           cases_json, defects_json, updated_time, model)
                        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
                        """, rows)
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            return len(rows)
        finally:
            conn.close()
    except Exception as ex:
        print(f"⚠️ fqe_tools 写库失败(不影响本地 json):{ex}")
        return 0


# ── Reads (when local runs_full has been wiped, the server falls back to the
#    database to rebuild its views) ───────────────────────────────────────────
def _loads(v, default=None):
    """Normalize a JSON column value: it may arrive as a string or already parsed.

    Returns ``default`` for None and for text that fails to parse.
    """
    if v is None:
        return default
    if isinstance(v, (list, dict)):
        return v
    try:
        return json.loads(v)
    except Exception:
        return default


def fetch_posts_grouped():
    """Rebuild ``{q: {"query_text": ..., "results": [r, ...]}}`` from fqe_posts.

    Each ``r`` mirrors the result structure of form_A.json so it can be fed to
    the server's adapt(). Rows are ordered by q, then overall_score descending.
    Returns {} when the DB is disabled, empty, or the read fails.
    """
    if not _enabled():
        return {}
    try:
        conn = _conn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT * FROM fqe_posts ORDER BY q, overall_score DESC")
                rows = cur.fetchall()
        finally:
            conn.close()
    except Exception as ex:
        print(f"⚠️ 读 fqe_posts 失败:{ex}")
        return {}
    out = {}
    for row in rows:
        q = row["q"]
        r = {
            "case_id": row["case_id"],
            "platform": row["platform"],
            "channel_content_id": row["channel_content_id"],
            "source_url": row["url"],
            "found_by_queries": _loads(row["found_by"], []),
            "llm_evaluation": _loads(row["llm_evaluation"], {}),
            "post": {
                "title": row["title"],
                "body_text": row["body"],
                "images": _loads(row["images"], []),
                "like_count": row["like_count"],
                "publish_timestamp": row["publish_time"],
            },
        }
        # query_text is taken from the first row seen for each q.
        out.setdefault(q, {"query_text": row["query_text"], "results": []})["results"].append(r)
    return out


def fetch_tools(q, case_id):
    """Rebuild ``{case_id, platform, title, model, tool_count, tools}`` from fqe_tools.

    The shape mirrors the local tools/{case_id}.json file. platform, title and
    model are taken from the first row (ordered by id). Returns None when the
    DB is disabled, the read fails, or no rows exist for (q, case_id).
    """
    if not _enabled():
        return None
    try:
        conn = _conn()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT * FROM fqe_tools WHERE q=%s AND case_id=%s ORDER BY id",
                    (q, case_id),
                )
                rows = cur.fetchall()
        finally:
            conn.close()
    except Exception as ex:
        print(f"⚠️ 读 fqe_tools 失败:{ex}")
        return None
    if not rows:
        return None
    tools = [{
        "工具名称": r["tool_name"],
        "实质作用域": r["substance_scope"],
        "形式作用域": r["form_scope"],
        "创作层级": r["creation_layer"],
        "来源链接": r["source_link"],
        "输入": r["input_desc"],
        "输出": r["output_desc"],
        "用法": _loads(r["usage_json"]),
        "案例": _loads(r["cases_json"]),
        "缺点": _loads(r["defects_json"]),
        "最新更新时间": r["updated_time"],
    } for r in rows]
    return {
        "case_id": case_id,
        "platform": rows[0]["platform"],
        "title": rows[0]["post_title"],
        "model": rows[0]["model"],
        "tool_count": len(tools),
        "tools": tools,
    }


def has_tools(q, case_id):
    """Return True when fqe_tools holds at least one row for (q, case_id).

    Returns False when the DB is disabled or the lookup fails; a failure is
    reported on stdout like the other readers instead of being hidden.
    """
    if not _enabled():
        return False
    try:
        conn = _conn()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT 1 FROM fqe_tools WHERE q=%s AND case_id=%s LIMIT 1",
                    (q, case_id),
                )
                return cur.fetchone() is not None
        finally:
            conn.close()
    except Exception as ex:
        print(f"⚠️ 读 fqe_tools 失败:{ex}")
        return False


def rebuild_local(runs_dir=None):
    """Rebuild local runs_full/{q}/form_A.json and tools/{case_id}.json from the DB.

    Meant for recovery after the local files were lost (the UI fallback only
    gives the UI data; the extraction scripts still read local files, hence
    this command).

    Args:
        runs_dir: target directory; defaults to ``runs_full`` next to this file.
    """
    if not _enabled():
        print("⚠️ MySQL 未启用,无法重建")
        return
    runs_dir = Path(runs_dir) if runs_dir else (Path(__file__).resolve().parent / "runs_full")
    grouped = fetch_posts_grouped()
    n_queries = n_tool_posts = 0
    for q, g in grouped.items():
        d = {
            "form": "A",
            "query": g["query_text"],
            "original_q": g["query_text"] or "",
            "platforms": [],
            "total": len(g["results"]),
            "failed": 0,
            "results": g["results"],
        }
        p = runs_dir / q / "form_A.json"
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(json.dumps(d, ensure_ascii=False, indent=2), encoding="utf-8")
        n_queries += 1
    # tools
    try:
        conn = _conn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT DISTINCT q, case_id FROM fqe_tools")
                pairs = cur.fetchall()
        finally:
            conn.close()
        for row in pairs:
            # fetch_tools opens its own connection and returns None on failure,
            # in which case that post is skipped.
            data = fetch_tools(row["q"], row["case_id"])
            if data:
                tp = runs_dir / row["q"] / "tools" / f"{row['case_id']}.json"
                tp.parent.mkdir(parents=True, exist_ok=True)
                tp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
                n_tool_posts += 1
    except Exception as ex:
        print(f"⚠️ 重建 tools 失败:{ex}")
    print(f"✅ 从库重建本地:{n_queries} 个 query 的 form_A.json,{n_tool_posts} 帖工具结果 → {runs_dir}")


if __name__ == "__main__":
    cmd = sys.argv[1] if len(sys.argv) > 1 else ""
    if cmd == "init":
        init_tables()
    elif cmd == "rebuild":
        rebuild_local()
    else:
        print("用法:\n python db.py init # 建表\n python db.py rebuild # 从库重建本地 runs_full(本地文件丢失后恢复)")