Ver Fonte

perf(mode_workflow): fetch_queries 改 SQL 标量算采纳(不拉整表 blob)+ server 缓存 query 列表(任务完成失效)

刘文武 há 6 dias
pai
commit
ada64ace0f
2 ficheiros alterados com 25 adições e 4 exclusões
  1. +4 -3
      examples/mode_workflow/db.py
  2. +21 -1
      examples/mode_workflow/server.py

+ 4 - 3
examples/mode_workflow/db.py

@@ -512,11 +512,12 @@ def fetch_queries(mode="process"):
                                    COUNT(*) AS post_count
                             FROM {table} GROUP BY query_id ORDER BY query_id""")
             queries = cur.fetchall()
-            cur.execute(f"""SELECT query_id, overall_score, llm_evaluation, publish_time
-                            FROM {table}""")
+            # 采纳数:SQL 直取 rel/repro 标量算,**不拉整表 llm_evaluation**(旧版全表 blob,切 tab 巨慢)
+            cur.execute(f"""SELECT query_id, overall_score, publish_time,
+                                   {_REL_SQL} AS rel, {_REPRO_SQL} AS repro FROM {table}""")
             hits = {}
             for r in cur.fetchall():
-                if is_adopted(r["overall_score"], _loads(r["llm_evaluation"]), r["publish_time"]):
+                if is_adopted_rel(r["overall_score"], r["rel"], r["publish_time"], r["repro"]):
                     hits[r["query_id"]] = hits.get(r["query_id"], 0) + 1
             cur.execute("SELECT query_id, COUNT(DISTINCT case_id) AS n FROM mode_process GROUP BY query_id")
             np = {r["query_id"]: r["n"] for r in cur.fetchall()}

+ 21 - 1
examples/mode_workflow/server.py

@@ -216,6 +216,8 @@ _DASH_TTL = 60.0   # 秒
 def _invalidate_dashboard():  # zero the dashboard cache timestamp and empty the per-mode query cache
     with _DASH_LOCK:
         _DASH_CACHE["ts"] = 0.0
+    with _QUERIES_LOCK:        # the query list likewise only changes when a task finishes, so drop it too
+        _QUERIES_CACHE.clear()
 
 
 def _dashboard_cached():
@@ -229,6 +231,24 @@ def _dashboard_cached():
     return data
 
 
+# ── query list cache ──────────────────────────────────────────────────────────
+# /api/queries runs a full-table JSON_EXTRACT to count adoptions per group (~2s against the remote RDS); the data likewise only changes when a task finishes.
+# Cached per mode and invalidated together with the dashboard when a task ends (see _invalidate_dashboard); the TTL is a fallback for out-of-band DB changes.
+_QUERIES_CACHE = {}        # mode -> {"data": [...], "ts": float}
+_QUERIES_LOCK = threading.Lock()
+
+
+def _queries_cached(mode):  # return db.fetch_queries(mode), reusing the cached copy for this mode while it is younger than _DASH_TTL
+    with _QUERIES_LOCK:
+        c = _QUERIES_CACHE.get(mode)
+        if c and time.monotonic() - c["ts"] < _DASH_TTL:
+            return c["data"]
+    data = db.fetch_queries(mode)   # computed outside the lock so other requests are not blocked; NOTE(review): concurrent misses each run their own query
+    with _QUERIES_LOCK:
+        _QUERIES_CACHE[mode] = {"data": data, "ts": time.monotonic()}  # NOTE(review): a fetch started before _invalidate_dashboard() clears the cache can still be stored here afterwards — confirm serving it until the TTL expires is acceptable
+    return data
+
+
 def _dashboard():
     posts, procs, tools = db.fetch_dashboard_rows()
 
@@ -494,7 +514,7 @@ class Handler(BaseHTTPRequestHandler):
             elif u.path == "/api/dashboard":
                 self._json_etag(_dashboard_cached())
             elif u.path == "/api/queries":
-                self._json(db.fetch_queries(qs.get("mode", "process")))
+                self._json_etag(_queries_cached(qs.get("mode", "process")))
             elif u.path == "/api/posts":
                 self._json(db.fetch_posts(qs.get("query_id", ""), qs.get("mode", "process")))
             elif u.path == "/api/post":