| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- # -*- coding: utf-8 -*-
- """fixed_query_eval · MySQL 持久化(双写:本地 json + 数据库)
- ================================================================================
- 读 .env 的 MYSQL_* 连接 MySQL(仓库 MySQL 约定是 pymysql)。两张表:
- fqe_posts —— 每行一个 (query, 帖子):搜索 + llm 评分结果
- fqe_tools —— 每行一个解构出的工具:工具解构结果
- 设计原则:
- - **失败不阻断**:所有写入用 try/except 包,DB 挂了不影响本地 json 写入(文件是主存储)。
- - **幂等**:posts 用 (q, case_id) 唯一键 upsert;tools 先按 (q, case_id) 删旧再插新(重新解构会覆盖)。
- - 建表:init_tables() 用 CREATE TABLE IF NOT EXISTS,跑 `python db.py init` 即建表。
- """
- import os
- import json
- import sys
- from pathlib import Path
- PROJECT_ROOT = Path(__file__).resolve().parents[5]
- sys.path.insert(0, str(PROJECT_ROOT))
- from dotenv import load_dotenv
- load_dotenv()
- try:
- import pymysql
- from pymysql.cursors import DictCursor
- except ImportError:
- pymysql = None
def _enabled() -> bool:
    """Return True when database access should be attempted at all.

    Both conditions must hold: pymysql imported successfully (it is set to
    None on ImportError) and MYSQL_HOST is set to a non-empty value.
    """
    if pymysql is None:
        return False
    return os.getenv("MYSQL_HOST", "") != ""
def _conn():
    """Open a fresh pymysql connection configured from the MYSQL_* env vars.

    The connection uses utf8mb4, returns rows as dicts (DictCursor), runs in
    autocommit mode and uses a 10 s connect timeout. Callers must check
    ``_enabled()`` first (``pymysql`` is None when its import failed) and are
    responsible for closing the returned connection.
    """
    # `or 3306` rather than a getenv default: an empty `MYSQL_PORT=` line in
    # .env sets the variable to "" (not unset), and int("") would raise.
    port = int(os.getenv("MYSQL_PORT") or 3306)
    return pymysql.connect(
        host=os.getenv("MYSQL_HOST"),
        port=port,
        user=os.getenv("MYSQL_USER"),
        password=os.getenv("MYSQL_PASSWORD"),
        database=os.getenv("MYSQL_DATABASE"),
        charset="utf8mb4",
        cursorclass=DictCursor,
        autocommit=True,
        connect_timeout=10,
    )
# ── DDL ──────────────────────────────────────────────────────────────────────
# Table schemas, executed by init_tables(). Both statements are
# CREATE TABLE IF NOT EXISTS: re-running them is harmless, but any change made
# here is NOT applied to a table that already exists (migrate it by hand).

# fqe_posts: one row per (q, case_id) search result plus its LLM evaluation.
# uk_q_case is the unique key that the ON DUPLICATE KEY UPDATE upsert hits.
DDL_POSTS = """
CREATE TABLE IF NOT EXISTS fqe_posts (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
q VARCHAR(16) NOT NULL COMMENT 'query 目录名 q0000',
query_text VARCHAR(255) NULL COMMENT '基准 query(如 GPT image2 评测)',
case_id VARCHAR(128) NOT NULL COMMENT 'platform_channelContentId',
platform VARCHAR(32) NULL,
channel_content_id VARCHAR(128) NULL,
title VARCHAR(512) NULL,
url VARCHAR(1024) NULL,
body MEDIUMTEXT NULL,
images JSON NULL COMMENT '图片 URL 数组',
like_count INT NULL,
publish_time VARCHAR(64) NULL,
found_by JSON NULL COMMENT '命中的措辞数组',
knowledge_type JSON NULL COMMENT 'llm 判定的知识类型',
overall_score FLOAT NULL COMMENT '综合分(相关性两子项均值,便于排序)',
llm_evaluation JSON NULL COMMENT '完整评分 blob',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
UNIQUE KEY uk_q_case (q, case_id),
KEY idx_platform (platform),
KEY idx_query_text (query_text)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='固定query搜索+评分结果';
"""
# fqe_tools: one row per extracted tool. idx_q_case is NOT unique because one
# post yields many tools; re-extraction replaces a post's rows by deleting on
# (q, case_id) and inserting the new set.
DDL_TOOLS = """
CREATE TABLE IF NOT EXISTS fqe_tools (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
q VARCHAR(16) NOT NULL,
case_id VARCHAR(128) NOT NULL,
platform VARCHAR(32) NULL,
post_title VARCHAR(512) NULL,
tool_name VARCHAR(255) NULL COMMENT '工具名称',
substance_scope VARCHAR(255) NULL COMMENT '实质作用域',
form_scope VARCHAR(255) NULL COMMENT '形式作用域',
creation_layer VARCHAR(32) NULL COMMENT '创作层级:制作层/创作层',
source_link VARCHAR(1024) NULL COMMENT '来源链接',
input_desc TEXT NULL COMMENT '输入',
output_desc TEXT NULL COMMENT '输出',
usage_json JSON NULL COMMENT '用法数组',
cases_json JSON NULL COMMENT '案例数组',
defects_json JSON NULL COMMENT '缺点数组',
updated_time VARCHAR(64) NULL COMMENT '工具最新更新时间',
model VARCHAR(64) NULL COMMENT '解构模型',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
KEY idx_q_case (q, case_id),
KEY idx_tool_name (tool_name)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='工具解构结果(每行一个工具)';
"""
def init_tables():
    """Create fqe_posts and fqe_tools if they do not exist yet (idempotent).

    Returns False after printing a warning when MySQL is disabled, and True
    once both DDL statements ran. Connection or DDL errors propagate to the
    caller; the connection is always closed.
    """
    if _enabled():
        conn = _conn()
        try:
            with conn.cursor() as cur:
                for ddl in (DDL_POSTS, DDL_TOOLS):
                    cur.execute(ddl)
            print("✅ 建表完成:fqe_posts, fqe_tools")
            return True
        finally:
            conn.close()
    print("⚠️ MySQL 未启用(缺 pymysql 或 MYSQL_HOST),跳过建表")
    return False
- # ── 写入 ─────────────────────────────────────────────────────────────────────
def _overall_from_eval(e):
    """Approximate an overall score from a mod-schema LLM evaluation.

    Averages ``e["相关性"][k]["得分"]`` over the two relevance sub-items
    k in ("和内容制作知识相关", "和 query 相关"), rounded to 2 decimals.

    Each sub-score is validated on its own: a missing, unparseable or
    non-finite value is skipped instead of discarding the other, valid one.
    Non-finite values must never be returned, because pymysql refuses to bind
    nan/inf and that would fail the whole batch insert that uses this score.

    Returns:
        The rounded mean of the usable sub-scores, or None when there are none
        (including when ``e`` or its "相关性" entry is not a dict).
    """
    import math

    rel = e.get("相关性") if isinstance(e, dict) else None
    if not isinstance(rel, dict):
        return None
    vals = []
    for k in ("和内容制作知识相关", "和 query 相关"):
        item = rel.get(k)
        if not isinstance(item, dict):
            continue
        try:
            v = float(item.get("得分"))
        except (TypeError, ValueError, OverflowError):
            continue  # missing (None) or non-numeric score
        if math.isfinite(v):
            vals.append(v)
    return round(sum(vals) / len(vals), 2) if vals else None
def _post_text(value, limit):
    """Return ``value`` as str cut to ``limit`` chars (its VARCHAR width); None stays NULL."""
    if value is None:
        return None
    return str(value)[:limit]


def _post_int(value):
    """Coerce a like count for the INT column; unparseable values (e.g. '1.2万') become NULL."""
    if value is None or isinstance(value, int):
        return value
    try:
        return int(float(value))
    except (TypeError, ValueError, OverflowError):
        return None


def _post_row(q, query_text, r):
    """Build the fqe_posts parameter tuple for one search result ``r``.

    Raises ValueError when ``r`` has no case_id (a NOT NULL key column), and
    AttributeError/TypeError/ValueError when ``r`` or its nested parts are
    malformed or not JSON-serializable.
    """
    case_id = r.get("case_id")
    if case_id is None:
        raise ValueError("missing case_id")
    post = r.get("post") or {}
    e = r.get("llm_evaluation") or {}
    return (
        q,
        _post_text(query_text, 255),
        case_id,
        _post_text(r.get("platform"), 32),
        _post_text(r.get("channel_content_id"), 128),
        _post_text(post.get("title") or post.get("desc") or "", 500),
        _post_text(r.get("source_url"), 1024),
        post.get("body_text") or post.get("desc") or "",
        json.dumps(post.get("images") or [], ensure_ascii=False),
        _post_int(post.get("like_count")),
        str(post.get("publish_time") or post.get("publish_timestamp") or "")[:64],
        json.dumps(r.get("found_by_queries") or [], ensure_ascii=False),
        json.dumps(e.get("知识类型") or [], ensure_ascii=False),
        _overall_from_eval(e),
        json.dumps(e, ensure_ascii=False),
    )


def upsert_posts(q, query_text, results):
    """Upsert a batch of search results into fqe_posts, keyed on (q, case_id).

    Best effort, following the module's "failure never blocks" rule. Results
    that cannot become a row (no case_id, non-dict parts, unserializable
    evaluation) are skipped with a warning instead of raising into the caller.
    VARCHAR values are clipped to their column widths so one long value cannot
    fail the whole batch under strict SQL mode. Any DB error is printed and
    swallowed.

    Args:
        q: query directory name, e.g. "q0000".
        query_text: the baseline query text, stored on every row.
        results: result dicts shaped like form_A.json results (keys such as
            case_id, platform, post, llm_evaluation, found_by_queries).

    Returns:
        Number of rows sent to the DB; 0 when disabled, when there is nothing
        to write, or on a DB error.
    """
    if not _enabled() or not results:
        return 0
    rows, skipped = [], 0
    for r in results:
        try:
            rows.append(_post_row(q, query_text, r))
        except (AttributeError, TypeError, ValueError):
            skipped += 1
    if skipped:
        print(f"⚠️ fqe_posts 跳过 {skipped} 条无法入库的结果(缺 case_id 或结构异常)")
    if not rows:
        return 0
    sql = """
    INSERT INTO fqe_posts
    (q, query_text, case_id, platform, channel_content_id, title, url, body,
    images, like_count, publish_time, found_by, knowledge_type, overall_score, llm_evaluation)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    ON DUPLICATE KEY UPDATE
    query_text=VALUES(query_text), platform=VALUES(platform),
    channel_content_id=VALUES(channel_content_id), title=VALUES(title), url=VALUES(url),
    body=VALUES(body), images=VALUES(images), like_count=VALUES(like_count),
    publish_time=VALUES(publish_time), found_by=VALUES(found_by),
    knowledge_type=VALUES(knowledge_type), overall_score=VALUES(overall_score),
    llm_evaluation=VALUES(llm_evaluation);
    """
    try:
        conn = _conn()
        try:
            with conn.cursor() as cur:
                cur.executemany(sql, rows)
            return len(rows)
        finally:
            conn.close()
    except Exception as ex:
        print(f"⚠️ fqe_posts 写库失败(不影响本地 json):{ex}")
        return 0
def _tool_text(value, limit=None):
    """Coerce an LLM-produced tool field into DB-safe text.

    None stays None (NULL). list/dict values are JSON-encoded, because pymysql
    cannot bind them to a scalar column. Other non-str scalars go through
    str(). With ``limit`` the text is cut to that many characters (the
    column's VARCHAR width).
    """
    if value is None:
        return None
    if isinstance(value, (list, dict)):
        value = json.dumps(value, ensure_ascii=False)
    elif not isinstance(value, str):
        value = str(value)
    return value[:limit] if limit is not None else value


def _tool_json(value):
    """JSON-encode a tool field for a JSON column; None stays NULL."""
    return None if value is None else json.dumps(value, ensure_ascii=False)


def upsert_tools(q, case_id, model, tools, platform=None, post_title=None):
    """Replace one post's extracted tools in fqe_tools.

    Deletes every existing (q, case_id) row and inserts one row per tool dict,
    all in a single transaction. If anything fails, the delete is rolled back,
    so a failed re-extraction never wipes the previously stored tools. Rows are
    built before the database is touched.

    Args:
        q: query directory name.
        case_id: post id the tools were extracted from.
        model: extraction model name, stored on every row.
        tools: list of tool dicts keyed by the Chinese field names. Non-dict
            entries are skipped. An empty list or None just clears the post.
        platform, post_title: post info copied onto each row.

    Returns:
        Number of tool rows written; 0 when disabled or on any error (a
        warning is printed and the local json stays the primary store).
    """
    if not _enabled():
        return 0
    insert_sql = """
    INSERT INTO fqe_tools
    (q, case_id, platform, post_title, tool_name, substance_scope, form_scope,
    creation_layer, source_link, input_desc, output_desc,
    usage_json, cases_json, defects_json, updated_time, model)
    VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    """
    try:
        rows = [
            (
                q, case_id, _tool_text(platform, 32), _tool_text(post_title or "", 500),
                _tool_text(t.get("工具名称"), 255),
                _tool_text(t.get("实质作用域"), 255),
                _tool_text(t.get("形式作用域"), 255),
                _tool_text(t.get("创作层级"), 32),
                _tool_text(t.get("来源链接"), 1024),
                _tool_text(t.get("输入")),
                _tool_text(t.get("输出")),
                _tool_json(t.get("用法")),
                _tool_json(t.get("案例")),
                _tool_json(t.get("缺点")),
                _tool_text(t.get("最新更新时间"), 64),
                _tool_text(model, 64),
            )
            for t in (tools or [])
            if isinstance(t, dict)
        ]
        conn = _conn()
        try:
            # The connection is autocommit; an explicit BEGIN makes the
            # DELETE and the INSERT commit (or roll back) together.
            conn.begin()
            try:
                with conn.cursor() as cur:
                    cur.execute("DELETE FROM fqe_tools WHERE q=%s AND case_id=%s", (q, case_id))
                    if rows:
                        cur.executemany(insert_sql, rows)
                conn.commit()
            except Exception:
                conn.rollback()
                raise
        finally:
            conn.close()
        return len(rows)
    except Exception as ex:
        print(f"⚠️ fqe_tools 写库失败(不影响本地 json):{ex}")
        return 0
- # ── 读取(本地 runs_full 被清空时,server 回退读库重建视图)────────────────────────
def _loads(v, default=None):
    """Decode a JSON column value as returned by pymysql.

    Depending on the driver, a JSON column may arrive as text or as an already
    decoded list/dict. None, and anything json.loads rejects, yield ``default``.
    Decoded containers are passed through untouched.
    """
    if v is None:
        return default
    try:
        return v if isinstance(v, (list, dict)) else json.loads(v)
    except Exception:
        return default
def _result_from_post_row(row):
    """Turn one fqe_posts row back into a form_A.json-style result dict."""
    return {
        "case_id": row["case_id"],
        "platform": row["platform"],
        "channel_content_id": row["channel_content_id"],
        "source_url": row["url"],
        "found_by_queries": _loads(row["found_by"], []),
        "llm_evaluation": _loads(row["llm_evaluation"], {}),
        "post": {
            "title": row["title"],
            "body_text": row["body"],
            "images": _loads(row["images"], []),
            "like_count": row["like_count"],
            "publish_timestamp": row["publish_time"],
        },
    }


def fetch_posts_grouped():
    """Rebuild ``{q: {"query_text": ..., "results": [...]}}`` from fqe_posts.

    Each result mirrors a form_A.json result, so the server's adapt() can
    consume it. Rows are read ordered by q, then overall_score descending, and
    query_text is taken from the first row seen for each q. Returns {} when
    MySQL is disabled, the table is empty, or the read fails (a warning is
    printed in that case).
    """
    if not _enabled():
        return {}
    try:
        conn = _conn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT * FROM fqe_posts ORDER BY q, overall_score DESC")
                rows = cur.fetchall()
        finally:
            conn.close()
    except Exception as ex:
        print(f"⚠️ 读 fqe_posts 失败:{ex}")
        return {}

    grouped = {}
    for row in rows:
        key = row["q"]
        result = _result_from_post_row(row)
        if key not in grouped:
            grouped[key] = {"query_text": row["query_text"], "results": []}
        grouped[key]["results"].append(result)
    return grouped
# (tool-dict key, fqe_tools column, column holds JSON text?) — kept in this
# order so the rebuilt tool dicts have a stable key order.
_TOOL_FIELD_MAP = (
    ("工具名称", "tool_name", False),
    ("实质作用域", "substance_scope", False),
    ("形式作用域", "form_scope", False),
    ("创作层级", "creation_layer", False),
    ("来源链接", "source_link", False),
    ("输入", "input_desc", False),
    ("输出", "output_desc", False),
    ("用法", "usage_json", True),
    ("案例", "cases_json", True),
    ("缺点", "defects_json", True),
    ("最新更新时间", "updated_time", False),
)


def fetch_tools(q, case_id):
    """Rebuild one post's tool-extraction result from fqe_tools.

    Returns ``{case_id, platform, title, model, tool_count, tools}``, meant to
    match the local tools/{case_id}.json. platform/title/model are read from
    the lowest-id row. Returns None when MySQL is disabled, the read fails (a
    warning is printed), or the post has no rows.
    """
    if not _enabled():
        return None
    try:
        conn = _conn()
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "SELECT * FROM fqe_tools WHERE q=%s AND case_id=%s ORDER BY id",
                    (q, case_id),
                )
                rows = cur.fetchall()
        finally:
            conn.close()
    except Exception as ex:
        print(f"⚠️ 读 fqe_tools 失败:{ex}")
        return None
    if not rows:
        return None

    tools = [
        {key: (_loads(row[column]) if is_json else row[column])
         for key, column, is_json in _TOOL_FIELD_MAP}
        for row in rows
    ]
    first = rows[0]
    return {
        "case_id": case_id,
        "platform": first["platform"],
        "title": first["post_title"],
        "model": first["model"],
        "tool_count": len(tools),
        "tools": tools,
    }
def has_tools(q, case_id):
    """Return True if fqe_tools holds at least one row for (q, case_id).

    Returns False when MySQL is disabled or on any DB error.
    """
    if not _enabled():
        return False
    found = False
    try:
        conn = _conn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT 1 FROM fqe_tools WHERE q=%s AND case_id=%s LIMIT 1", (q, case_id))
                found = cur.fetchone() is not None
        finally:
            conn.close()
    except Exception:
        return False
    return found
def _write_json_file(path, payload):
    """Write ``payload`` as pretty UTF-8 JSON to ``path``, creating parent dirs."""
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")


def rebuild_local(runs_dir=None):
    """Recreate runs_full/{q}/form_A.json and runs_full/{q}/tools/{case_id}.json from MySQL.

    Intended for recovery after the local files are lost. The server's DB
    fallback only feeds the UI, while the extraction scripts still read local
    files. Existing files at those paths are overwritten. ``runs_dir`` defaults
    to the runs_full directory next to this file. A failure while rebuilding
    tools is printed and the summary line is still emitted.
    """
    if not _enabled():
        print("⚠️ MySQL 未启用,无法重建")
        return
    base = Path(runs_dir) if runs_dir else Path(__file__).resolve().parent / "runs_full"

    query_count = 0
    for q, group in fetch_posts_grouped().items():
        results = group["results"]
        form = {
            "form": "A",
            "query": group["query_text"],
            "original_q": group["query_text"] or "",
            "platforms": [],
            "total": len(results),
            "failed": 0,
            "results": results,
        }
        _write_json_file(base / q / "form_A.json", form)
        query_count += 1

    tool_count = 0
    try:
        conn = _conn()
        try:
            with conn.cursor() as cur:
                cur.execute("SELECT DISTINCT q, case_id FROM fqe_tools")
                pairs = cur.fetchall()
        finally:
            conn.close()
        for pair in pairs:
            payload = fetch_tools(pair["q"], pair["case_id"])
            if not payload:
                continue
            _write_json_file(base / pair["q"] / "tools" / f"{pair['case_id']}.json", payload)
            tool_count += 1
    except Exception as ex:
        print(f"⚠️ 重建 tools 失败:{ex}")
    print(f"✅ 从库重建本地:{query_count} 个 query 的 form_A.json,{tool_count} 帖工具结果 → {base}")
if __name__ == "__main__":
    # CLI entry: `init` creates the tables, `rebuild` restores local files;
    # anything else (or no argument) prints usage.
    _cli_commands = {"init": init_tables, "rebuild": rebuild_local}
    _cli_args = sys.argv[1:]
    _cli_action = _cli_commands.get(_cli_args[0] if _cli_args else "")
    if _cli_action is not None:
        _cli_action()
    else:
        print("用法:\n python db.py init # 建表\n python db.py rebuild # 从库重建本地 runs_full(本地文件丢失后恢复)")
|