""" KnowHub Server Agent 工具使用经验的共享平台。 FastAPI + SQLite,单文件部署。 """ import os import sqlite3 from contextlib import asynccontextmanager from datetime import datetime, timezone from typing import Optional from pathlib import Path from fastapi import FastAPI, HTTPException, Query from pydantic import BaseModel, Field BRAND_NAME = os.getenv("BRAND_NAME", "KnowHub") BRAND_API_ENV = os.getenv("BRAND_API_ENV", "KNOWHUB_API") BRAND_DB = os.getenv("BRAND_DB", "knowhub.db") DB_PATH = Path(__file__).parent / BRAND_DB # --- 数据库 --- def get_db() -> sqlite3.Connection: conn = sqlite3.connect(str(DB_PATH)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA journal_mode=WAL") return conn def init_db(): conn = get_db() conn.execute(""" CREATE TABLE IF NOT EXISTS experiences ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL, url TEXT DEFAULT '', category TEXT DEFAULT '', task TEXT NOT NULL, score INTEGER CHECK(score BETWEEN 1 AND 5), outcome TEXT DEFAULT '', tips TEXT DEFAULT '', content_id TEXT DEFAULT '', submitted_by TEXT DEFAULT '', created_at TEXT NOT NULL ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_name ON experiences(name)") conn.execute(""" CREATE TABLE IF NOT EXISTS contents ( id TEXT PRIMARY KEY, title TEXT DEFAULT '', body TEXT NOT NULL, sort_order INTEGER DEFAULT 0, submitted_by TEXT DEFAULT '', created_at TEXT NOT NULL ) """) conn.commit() conn.close() # --- Models --- class ExperienceIn(BaseModel): name: str url: str = "" category: str = "" task: str score: int = Field(ge=1, le=5) outcome: str = "" tips: str = "" content_id: str = "" submitted_by: str = "" class ExperienceOut(BaseModel): task: str score: int outcome: str tips: str content_id: str submitted_by: str created_at: str class ResourceResult(BaseModel): name: str url: str relevant_experiences: list[ExperienceOut] avg_score: float experience_count: int class SearchResponse(BaseModel): results: list[ResourceResult] class ResourceDetailResponse(BaseModel): name: str url: str category: str avg_score: float experience_count: int experiences: list[ExperienceOut] class ContentIn(BaseModel): id: str title: str = "" body: str sort_order: int = 0 submitted_by: str = "" class ContentNode(BaseModel): id: str title: str class ContentOut(BaseModel): id: str title: str body: str toc: Optional[ContentNode] = None children: list[ContentNode] prev: Optional[ContentNode] = None next: Optional[ContentNode] = None # --- App --- @asynccontextmanager async def lifespan(app: FastAPI): init_db() yield app = FastAPI(title=BRAND_NAME, lifespan=lifespan) def _search_rows(conn: sqlite3.Connection, q: str, category: Optional[str]) -> list[sqlite3.Row]: """LIKE 搜索,拆词后 AND 连接,匹配 task + tips + outcome + name""" terms = q.split() if not terms: return [] conditions = [] params: list[str] = [] for term in terms: like = f"%{term}%" conditions.append( "(task LIKE ? OR tips LIKE ? OR outcome LIKE ? OR name LIKE ?)" ) params.extend([like, like, like, like]) if category: conditions.append("category = ?") params.append(category) sql = ( "SELECT name, url, category, task, score, outcome, tips, content_id, " "submitted_by, created_at FROM experiences WHERE " + " AND ".join(conditions) + " ORDER BY created_at DESC" ) return conn.execute(sql, params).fetchall() def _group_by_resource(rows: list[sqlite3.Row], limit: int) -> list[ResourceResult]: """按 name 分组并聚合""" groups: dict[str, list[sqlite3.Row]] = {} for row in rows: name = row["name"] if name not in groups: groups[name] = [] groups[name].append(row) results = [] for resource_name, resource_rows in groups.items(): scores = [r["score"] for r in resource_rows] avg = sum(scores) / len(scores) results.append(ResourceResult( name=resource_name, url=resource_rows[0]["url"], relevant_experiences=[ ExperienceOut( task=r["task"], score=r["score"], outcome=r["outcome"], tips=r["tips"], content_id=r["content_id"], submitted_by=r["submitted_by"], created_at=r["created_at"], ) for r in resource_rows ], avg_score=round(avg, 1), experience_count=len(resource_rows), )) results.sort(key=lambda r: r.avg_score * r.experience_count, reverse=True) return results[:limit] @app.get("/api/search", response_model=SearchResponse) def search_experiences( q: str = Query(..., min_length=1), category: Optional[str] = None, limit: int = Query(default=10, ge=1, le=50), ): conn = get_db() try: rows = _search_rows(conn, q, category) return SearchResponse(results=_group_by_resource(rows, limit)) finally: conn.close() @app.post("/api/experience", status_code=201) def submit_experience(exp: ExperienceIn): conn = get_db() try: now = datetime.now(timezone.utc).isoformat() conn.execute( "INSERT INTO experiences" "(name, url, category, task, score, outcome, tips, content_id, submitted_by, created_at)" " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", (exp.name, exp.url, exp.category, exp.task, exp.score, exp.outcome, exp.tips, exp.content_id, exp.submitted_by, now), ) conn.commit() return {"status": "ok"} finally: conn.close() @app.get("/api/resource/{name}", response_model=ResourceDetailResponse) def get_resource_experiences(name: str): conn = get_db() try: rows = conn.execute( "SELECT name, url, category, task, score, outcome, tips, content_id, " "submitted_by, created_at FROM experiences " "WHERE name = ? ORDER BY created_at DESC", (name,), ).fetchall() if not rows: raise HTTPException(status_code=404, detail=f"No experiences found for resource: {name}") scores = [r["score"] for r in rows] avg = sum(scores) / len(scores) return ResourceDetailResponse( name=name, url=rows[0]["url"], category=rows[0]["category"], avg_score=round(avg, 1), experience_count=len(rows), experiences=[ ExperienceOut( task=r["task"], score=r["score"], outcome=r["outcome"], tips=r["tips"], content_id=r["content_id"], submitted_by=r["submitted_by"], created_at=r["created_at"], ) for r in rows ], ) finally: conn.close() @app.post("/api/content", status_code=201) def submit_content(content: ContentIn): conn = get_db() try: now = datetime.now(timezone.utc).isoformat() conn.execute( "INSERT OR REPLACE INTO contents" "(id, title, body, sort_order, submitted_by, created_at)" " VALUES (?, ?, ?, ?, ?, ?)", (content.id, content.title, content.body, content.sort_order, content.submitted_by, now), ) conn.commit() return {"status": "ok"} finally: conn.close() @app.get("/api/content/{content_id:path}", response_model=ContentOut) def get_content(content_id: str): conn = get_db() try: row = conn.execute( "SELECT id, title, body, sort_order FROM contents WHERE id = ?", (content_id,), ).fetchone() if not row: raise HTTPException(status_code=404, detail=f"Content not found: {content_id}") # 计算导航上下文 root_id = content_id.split("/")[0] if "/" in content_id else content_id # TOC (根节点) toc = None if "/" in content_id: toc_row = conn.execute( "SELECT id, title FROM contents WHERE id = ?", (root_id,), ).fetchone() if toc_row: toc = ContentNode(id=toc_row["id"], title=toc_row["title"]) # Children (子节点) children = [] children_rows = conn.execute( "SELECT id, title FROM contents WHERE id LIKE ? AND id != ? ORDER BY sort_order", (f"{content_id}/%", content_id), ).fetchall() children = [ContentNode(id=r["id"], title=r["title"]) for r in children_rows] # Prev/Next (同级节点) prev_node = None next_node = None if "/" in content_id: siblings = conn.execute( "SELECT id, title, sort_order FROM contents WHERE id LIKE ? AND id NOT LIKE ? ORDER BY sort_order", (f"{root_id}/%", f"{root_id}/%/%"), ).fetchall() for i, sib in enumerate(siblings): if sib["id"] == content_id: if i > 0: prev_node = ContentNode(id=siblings[i-1]["id"], title=siblings[i-1]["title"]) if i < len(siblings) - 1: next_node = ContentNode(id=siblings[i+1]["id"], title=siblings[i+1]["title"]) break return ContentOut( id=row["id"], title=row["title"], body=row["body"], toc=toc, children=children, prev=prev_node, next=next_node, ) finally: conn.close() if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)