| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359 |
- """
- KnowHub Server
- Agent 工具使用经验的共享平台。
- FastAPI + SQLite,单文件部署。
- """
- import os
- import sqlite3
- from contextlib import asynccontextmanager
- from datetime import datetime, timezone
- from typing import Optional
- from pathlib import Path
- from fastapi import FastAPI, HTTPException, Query
- from pydantic import BaseModel, Field
- BRAND_NAME = os.getenv("BRAND_NAME", "KnowHub")
- BRAND_API_ENV = os.getenv("BRAND_API_ENV", "KNOWHUB_API")
- BRAND_DB = os.getenv("BRAND_DB", "knowhub.db")
- DB_PATH = Path(__file__).parent / BRAND_DB
- # --- 数据库 ---
- def get_db() -> sqlite3.Connection:
- conn = sqlite3.connect(str(DB_PATH))
- conn.row_factory = sqlite3.Row
- conn.execute("PRAGMA journal_mode=WAL")
- return conn
- def init_db():
- conn = get_db()
- conn.execute("""
- CREATE TABLE IF NOT EXISTS experiences (
- id INTEGER PRIMARY KEY AUTOINCREMENT,
- name TEXT NOT NULL,
- url TEXT DEFAULT '',
- category TEXT DEFAULT '',
- task TEXT NOT NULL,
- score INTEGER CHECK(score BETWEEN 1 AND 5),
- outcome TEXT DEFAULT '',
- tips TEXT DEFAULT '',
- content_id TEXT DEFAULT '',
- submitted_by TEXT DEFAULT '',
- created_at TEXT NOT NULL
- )
- """)
- conn.execute("CREATE INDEX IF NOT EXISTS idx_name ON experiences(name)")
- conn.execute("""
- CREATE TABLE IF NOT EXISTS contents (
- id TEXT PRIMARY KEY,
- title TEXT DEFAULT '',
- body TEXT NOT NULL,
- sort_order INTEGER DEFAULT 0,
- submitted_by TEXT DEFAULT '',
- created_at TEXT NOT NULL
- )
- """)
- conn.commit()
- conn.close()
- # --- Models ---
- class ExperienceIn(BaseModel):
- name: str
- url: str = ""
- category: str = ""
- task: str
- score: int = Field(ge=1, le=5)
- outcome: str = ""
- tips: str = ""
- content_id: str = ""
- submitted_by: str = ""
- class ExperienceOut(BaseModel):
- task: str
- score: int
- outcome: str
- tips: str
- content_id: str
- submitted_by: str
- created_at: str
- class ResourceResult(BaseModel):
- name: str
- url: str
- relevant_experiences: list[ExperienceOut]
- avg_score: float
- experience_count: int
- class SearchResponse(BaseModel):
- results: list[ResourceResult]
- class ResourceDetailResponse(BaseModel):
- name: str
- url: str
- category: str
- avg_score: float
- experience_count: int
- experiences: list[ExperienceOut]
- class ContentIn(BaseModel):
- id: str
- title: str = ""
- body: str
- sort_order: int = 0
- submitted_by: str = ""
- class ContentNode(BaseModel):
- id: str
- title: str
- class ContentOut(BaseModel):
- id: str
- title: str
- body: str
- toc: Optional[ContentNode] = None
- children: list[ContentNode]
- prev: Optional[ContentNode] = None
- next: Optional[ContentNode] = None
- # --- App ---
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- init_db()
- yield
- app = FastAPI(title=BRAND_NAME, lifespan=lifespan)
- def _search_rows(conn: sqlite3.Connection, q: str, category: Optional[str]) -> list[sqlite3.Row]:
- """LIKE 搜索,拆词后 AND 连接,匹配 task + tips + outcome + name"""
- terms = q.split()
- if not terms:
- return []
- conditions = []
- params: list[str] = []
- for term in terms:
- like = f"%{term}%"
- conditions.append(
- "(task LIKE ? OR tips LIKE ? OR outcome LIKE ? OR name LIKE ?)"
- )
- params.extend([like, like, like, like])
- if category:
- conditions.append("category = ?")
- params.append(category)
- sql = (
- "SELECT name, url, category, task, score, outcome, tips, content_id, "
- "submitted_by, created_at FROM experiences WHERE "
- + " AND ".join(conditions)
- + " ORDER BY created_at DESC"
- )
- return conn.execute(sql, params).fetchall()
- def _group_by_resource(rows: list[sqlite3.Row], limit: int) -> list[ResourceResult]:
- """按 name 分组并聚合"""
- groups: dict[str, list[sqlite3.Row]] = {}
- for row in rows:
- name = row["name"]
- if name not in groups:
- groups[name] = []
- groups[name].append(row)
- results = []
- for resource_name, resource_rows in groups.items():
- scores = [r["score"] for r in resource_rows]
- avg = sum(scores) / len(scores)
- results.append(ResourceResult(
- name=resource_name,
- url=resource_rows[0]["url"],
- relevant_experiences=[
- ExperienceOut(
- task=r["task"],
- score=r["score"],
- outcome=r["outcome"],
- tips=r["tips"],
- content_id=r["content_id"],
- submitted_by=r["submitted_by"],
- created_at=r["created_at"],
- )
- for r in resource_rows
- ],
- avg_score=round(avg, 1),
- experience_count=len(resource_rows),
- ))
- results.sort(key=lambda r: r.avg_score * r.experience_count, reverse=True)
- return results[:limit]
- @app.get("/api/search", response_model=SearchResponse)
- def search_experiences(
- q: str = Query(..., min_length=1),
- category: Optional[str] = None,
- limit: int = Query(default=10, ge=1, le=50),
- ):
- conn = get_db()
- try:
- rows = _search_rows(conn, q, category)
- return SearchResponse(results=_group_by_resource(rows, limit))
- finally:
- conn.close()
- @app.post("/api/experience", status_code=201)
- def submit_experience(exp: ExperienceIn):
- conn = get_db()
- try:
- now = datetime.now(timezone.utc).isoformat()
- conn.execute(
- "INSERT INTO experiences"
- "(name, url, category, task, score, outcome, tips, content_id, submitted_by, created_at)"
- " VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
- (exp.name, exp.url, exp.category, exp.task,
- exp.score, exp.outcome, exp.tips, exp.content_id, exp.submitted_by, now),
- )
- conn.commit()
- return {"status": "ok"}
- finally:
- conn.close()
- @app.get("/api/resource/{name}", response_model=ResourceDetailResponse)
- def get_resource_experiences(name: str):
- conn = get_db()
- try:
- rows = conn.execute(
- "SELECT name, url, category, task, score, outcome, tips, content_id, "
- "submitted_by, created_at FROM experiences "
- "WHERE name = ? ORDER BY created_at DESC",
- (name,),
- ).fetchall()
- if not rows:
- raise HTTPException(status_code=404, detail=f"No experiences found for resource: {name}")
- scores = [r["score"] for r in rows]
- avg = sum(scores) / len(scores)
- return ResourceDetailResponse(
- name=name,
- url=rows[0]["url"],
- category=rows[0]["category"],
- avg_score=round(avg, 1),
- experience_count=len(rows),
- experiences=[
- ExperienceOut(
- task=r["task"],
- score=r["score"],
- outcome=r["outcome"],
- tips=r["tips"],
- content_id=r["content_id"],
- submitted_by=r["submitted_by"],
- created_at=r["created_at"],
- )
- for r in rows
- ],
- )
- finally:
- conn.close()
- @app.post("/api/content", status_code=201)
- def submit_content(content: ContentIn):
- conn = get_db()
- try:
- now = datetime.now(timezone.utc).isoformat()
- conn.execute(
- "INSERT OR REPLACE INTO contents"
- "(id, title, body, sort_order, submitted_by, created_at)"
- " VALUES (?, ?, ?, ?, ?, ?)",
- (content.id, content.title, content.body, content.sort_order, content.submitted_by, now),
- )
- conn.commit()
- return {"status": "ok"}
- finally:
- conn.close()
- @app.get("/api/content/{content_id:path}", response_model=ContentOut)
- def get_content(content_id: str):
- conn = get_db()
- try:
- row = conn.execute(
- "SELECT id, title, body, sort_order FROM contents WHERE id = ?",
- (content_id,),
- ).fetchone()
- if not row:
- raise HTTPException(status_code=404, detail=f"Content not found: {content_id}")
- # 计算导航上下文
- root_id = content_id.split("/")[0] if "/" in content_id else content_id
- # TOC (根节点)
- toc = None
- if "/" in content_id:
- toc_row = conn.execute(
- "SELECT id, title FROM contents WHERE id = ?",
- (root_id,),
- ).fetchone()
- if toc_row:
- toc = ContentNode(id=toc_row["id"], title=toc_row["title"])
- # Children (子节点)
- children = []
- children_rows = conn.execute(
- "SELECT id, title FROM contents WHERE id LIKE ? AND id != ? ORDER BY sort_order",
- (f"{content_id}/%", content_id),
- ).fetchall()
- children = [ContentNode(id=r["id"], title=r["title"]) for r in children_rows]
- # Prev/Next (同级节点)
- prev_node = None
- next_node = None
- if "/" in content_id:
- siblings = conn.execute(
- "SELECT id, title, sort_order FROM contents WHERE id LIKE ? AND id NOT LIKE ? ORDER BY sort_order",
- (f"{root_id}/%", f"{root_id}/%/%"),
- ).fetchall()
- for i, sib in enumerate(siblings):
- if sib["id"] == content_id:
- if i > 0:
- prev_node = ContentNode(id=siblings[i-1]["id"], title=siblings[i-1]["title"])
- if i < len(siblings) - 1:
- next_node = ContentNode(id=siblings[i+1]["id"], title=siblings[i+1]["title"])
- break
- return ContentOut(
- id=row["id"],
- title=row["title"],
- body=row["body"],
- toc=toc,
- children=children,
- prev=prev_node,
- next=next_node,
- )
- finally:
- conn.close()
- if __name__ == "__main__":
- import uvicorn
- uvicorn.run(app, host="0.0.0.0", port=8000)
|