tanjingyu 3 settimane fa
commit
f0669247a0

+ 14 - 0
.env.example

@@ -0,0 +1,14 @@
+# ========== 数据库配置 ==========
+# NOTE: example file only — never commit real credentials or tokens here.
+DB_HOST=your-db-host.example.com
+DB_PORT=3306
+DB_USER=your_db_user
+DB_PASSWORD=your_db_password
+DB_NAME=data_nexus
+
+# ========== Gogs 配置 ==========
+GOGS_URL=https://your-gogs-server.example.com
+GOGS_TOKEN=your_gogs_access_token
+GOGS_SECRET=
+
+# ========== 存储配置 ==========
+STORAGE_ROOT=./storage

+ 40 - 0
.gitignore

@@ -0,0 +1,40 @@
+# Storage
+storage/
+
+# IDE
+.idea/
+.vscode/
+*.iml
+
+# Claude
+.claude/
+
+# Python
+__pycache__/
+*.py[cod]
+*.pyo
+.Python
+*.egg-info/
+dist/
+build/
+.eggs/
+*.egg
+.venv/
+venv/
+env/
+
+# Environment
+.env
+
+# Logs
+*.log
+logs/
+
+# OS
+.DS_Store
+Thumbs.db
+
+# Test
+.pytest_cache/
+.coverage
+htmlcov/

+ 81 - 0
README.md

@@ -0,0 +1,81 @@
+# DataNexus - 轻量级数据中台
+
+基于 Git Webhook 的自动化数据归集系统,实现代码仓库产出数据的自动提取、版本化存储和统一管理。
+
+## 功能特性
+
+- **自动归集** - Git Push 触发自动数据采集,无需手动操作
+- **版本化存储** - 每次 Commit 独立存储,支持历史回溯
+- **增量更新** - 基于 Git SHA 智能去重,只存储变化的文件
+- **多 Stage 支持** - 单仓库可配置多个数据环节(选题、清洗、分析等)
+- **REST API** - 提供项目、版本、文件的查询和下载接口
+
+## 快速开始
+
+### 1. 安装依赖
+
+```bash
+pip install -r requirements.txt
+```
+
+### 2. 配置环境变量
+
+编辑 `.env` 文件:
+
+```env
+# 数据库
+DB_HOST=localhost
+DB_PORT=3306
+DB_USER=root
+DB_PASSWORD=your_password
+DB_NAME=data_nexus
+
+# Gogs
+GOGS_URL=https://your-gogs-server.com
+GOGS_TOKEN=your_access_token
+GOGS_SECRET=optional_webhook_secret
+
+# 存储
+STORAGE_ROOT=./storage
+```
+
+### 3. 启动服务
+
+```bash
+uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
+```
+
+### 4. 配置 Webhook
+
+在 Gogs 仓库设置中添加 Webhook:
+- URL: `http://your-server:8000/webhook`
+- Content Type: `application/json`
+
+### 5. 添加 manifest.yaml
+
+在仓库根目录创建 `manifest.yaml`:
+
+```yaml
+project_name: "my_project"
+
+stages:
+  - name: "data_collection"
+    outputs:
+      - path: "./results/"
+        pattern: "*.csv"
+      - path: "./report.pdf"
+```
+
+## API 接口
+
+| 接口 | 方法 | 说明 |
+|-----|------|-----|
+| `/webhook` | POST | 接收 Gogs Webhook |
+| `/projects` | GET | 列出所有项目 |
+| `/projects/{id}/versions` | GET | 列出项目版本 |
+| `/versions/{id}/files` | GET | 获取版本文件树 |
+| `/files/{id}/content` | GET | 下载文件内容 |
+
+## 文档
+
+详细设计请参阅 [实现方案设计文档](轻量级数据中台%20(Data-Hub)%20实现方案设计文档.md)

+ 26 - 0
app/config.py

@@ -0,0 +1,26 @@
import os
from urllib.parse import quote_plus

from dotenv import load_dotenv
+
+load_dotenv()
+
class Settings:
    """Application configuration read from environment variables at import time.

    Defaults target local development; production values come from .env
    (loaded by load_dotenv above) or the process environment.
    """

    PROJECT_NAME: str = "Data Nexus"
    VERSION: str = "0.1.0"

    # Database
    DB_USER: str = os.getenv("DB_USER", "root")
    DB_PASSWORD: str = os.getenv("DB_PASSWORD", "")
    DB_HOST: str = os.getenv("DB_HOST", "localhost")
    DB_PORT: str = os.getenv("DB_PORT", "3306")
    DB_NAME: str = os.getenv("DB_NAME", "data_nexus")
    # URL-encode the credentials so passwords containing reserved URL
    # characters (e.g. '#', '@', '/') do not corrupt the SQLAlchemy URL.
    DATABASE_URL: str = (
        f"mysql+pymysql://{quote_plus(DB_USER)}:{quote_plus(DB_PASSWORD)}"
        f"@{DB_HOST}:{DB_PORT}/{DB_NAME}"
    )

    # Gogs
    GOGS_URL: str = os.getenv("GOGS_URL", "http://localhost:3000")
    GOGS_TOKEN: str = os.getenv("GOGS_TOKEN", "")
    GOGS_SECRET: str = os.getenv("GOGS_SECRET", "")  # Webhook secret (empty disables verification)

    # Storage
    STORAGE_ROOT: str = os.getenv("STORAGE_ROOT", "/data/storage")

settings = Settings()

+ 16 - 0
app/database.py

@@ -0,0 +1,16 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from app.config import settings

# Engine and session factory are created once at import time.
# NOTE(review): sqlalchemy.ext.declarative.declarative_base is deprecated in
# SQLAlchemy 1.4+ in favour of sqlalchemy.orm.declarative_base — confirm the
# pinned SQLAlchemy version before switching (requirements.txt is unpinned).
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# Declarative base class for all ORM models (see app/models.py).
Base = declarative_base()

def get_db():
    """FastAPI dependency yielding a DB session, closed when the request ends."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

+ 218 - 0
app/main.py

@@ -0,0 +1,218 @@
+from fastapi import FastAPI, BackgroundTasks, Request, Depends, HTTPException, Header
+from sqlalchemy.orm import Session
+from typing import List, Optional
+from app.config import settings
+from app.database import engine, Base, get_db
+from app.services.webhook_service import WebhookService
+from app.models import Project, DataVersion, DataFile
+from app import schemas
+import logging
+import os
+import hmac
+import hashlib
+
# Create all ORM tables on import (no migration tooling is used here).
Base.metadata.create_all(bind=engine)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="Data Nexus", version="0.1.0")
+
+
def build_file_tree(files: List[DataFile]) -> list:
    """Turn a flat list of file records into a nested folder/file tree.

    Each folder node is {"name", "type": "folder", "children": [...]};
    each file node carries id, size, file_type and sha. Within a level,
    folders sort before files, each group alphabetically.
    """
    root: dict = {}

    # Pass 1: index every record under its parent directories.
    for record in files:
        segments = record.relative_path.split("/")
        node = root
        for folder in segments[:-1]:
            node = node.setdefault(folder, {})
        node.setdefault("_files", []).append({
            "name": segments[-1],
            "type": "file",
            "id": record.id,
            "size": record.file_size,
            "file_type": record.file_type,
            "sha": record.file_sha,
        })

    # Pass 2: flatten each dict level into a sorted list of entries.
    def to_entries(node: dict) -> list:
        entries = []
        for name, child in node.items():
            if name == "_files":
                entries.extend(child)
            else:
                entries.append({
                    "name": name,
                    "type": "folder",
                    "children": to_entries(child),
                })
        entries.sort(key=lambda e: (0 if e["type"] == "folder" else 1, e["name"]))
        return entries

    return to_entries(root)
+
@app.get("/")
def read_root():
    """Landing endpoint; doubles as a trivial liveness check."""
    greeting = {"message": "Welcome to Data Nexus API"}
    return greeting
+
+
def verify_webhook_signature(payload_body: bytes, signature: str, secret: Optional[str] = None) -> bool:
    """Verify a Gogs webhook HMAC-SHA256 signature.

    Args:
        payload_body: Raw request body bytes.
        signature: Value of the ``X-Gogs-Signature`` header (may be None).
        secret: Shared webhook secret; defaults to ``settings.GOGS_SECRET``.

    Returns:
        True if the signature matches, or if no secret is configured.
    """
    if secret is None:
        secret = settings.GOGS_SECRET
    if not secret:
        return True  # No secret configured, skip verification
    if not signature:
        return False
    expected = hmac.new(
        secret.encode(),
        payload_body,
        hashlib.sha256
    ).hexdigest()
    # Gogs sends the bare hex digest in X-Gogs-Signature; GitHub-style
    # senders prefix it with "sha256=". Accept both so a plain Gogs
    # signature is no longer always rejected.
    return hmac.compare_digest(expected, signature) or hmac.compare_digest(
        f"sha256={expected}", signature
    )
+
+
@app.post("/webhook")
async def webhook_handler(
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    x_gogs_signature: Optional[str] = Header(None)
):
    """Receive a Gogs push webhook and schedule background processing.

    Returns 200 immediately so Gogs does not time out or retry; the
    actual data collection runs in a background task.
    """
    body = await request.body()

    # Verify signature if secret is configured
    if settings.GOGS_SECRET and not verify_webhook_signature(body, x_gogs_signature):
        raise HTTPException(status_code=401, detail="Invalid signature")

    try:
        import json
        payload = json.loads(body)
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON")

    # Process in background
    # NOTE(review): recent FastAPI versions finalize yield-dependencies (and
    # close this request-scoped `db` session) before background tasks run —
    # confirm the session is still usable inside process_webhook.
    service = WebhookService(db)
    background_tasks.add_task(service.process_webhook, payload)

    return {"status": "ok", "message": "Webhook received"}
+
+
+# ==================== Project APIs ====================
+
@app.get("/projects", response_model=List[schemas.ProjectOut])
def list_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """Return a paginated list of all projects."""
    return db.query(Project).offset(skip).limit(limit).all()
+
+
@app.get("/projects/{project_id}", response_model=schemas.ProjectOut)
def get_project(project_id: int, db: Session = Depends(get_db)):
    """Get a single project by ID; 404 if it does not exist."""
    found = db.query(Project).filter(Project.id == project_id).first()
    if found is None:
        raise HTTPException(status_code=404, detail="Project not found")
    return found
+
+
@app.get("/projects/name/{project_name}", response_model=schemas.ProjectOut)
def get_project_by_name(project_name: str, db: Session = Depends(get_db)):
    """Get a project by its unique name; 404 if it does not exist."""
    found = db.query(Project).filter(Project.project_name == project_name).first()
    if found is None:
        raise HTTPException(status_code=404, detail="Project not found")
    return found
+
+
+# ==================== Version APIs ====================
+
@app.get("/projects/{project_id}/versions", response_model=List[schemas.DataVersionOut])
def list_versions(
    project_id: int,
    stage: Optional[str] = None,
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db)
):
    """List versions for a project, newest first, optionally filtered by stage."""
    criteria = [DataVersion.project_id == project_id]
    if stage:
        criteria.append(DataVersion.stage == stage)
    return (
        db.query(DataVersion)
        .filter(*criteria)
        .order_by(DataVersion.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )
+
+
@app.get("/versions/{version_id}", response_model=schemas.DataVersionOut)
def get_version(version_id: int, db: Session = Depends(get_db)):
    """Get a single version by ID; 404 if it does not exist."""
    record = db.query(DataVersion).filter(DataVersion.id == version_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="Version not found")
    return record
+
+
@app.get("/versions/{version_id}/files")
def get_version_files(version_id: int, flat: bool = False, db: Session = Depends(get_db)):
    """
    Get files for a version.
    - flat=False (default): Returns tree structure
    - flat=True: Returns flat list
    """
    exists = db.query(DataVersion).filter(DataVersion.id == version_id).first()
    if exists is None:
        raise HTTPException(status_code=404, detail="Version not found")

    records = db.query(DataFile).filter(DataFile.version_id == version_id).all()

    if flat:
        return [schemas.DataFileOut.model_validate(r) for r in records]
    return build_file_tree(records)
+
+
+# ==================== File APIs ====================
+
+from fastapi.responses import FileResponse
+
@app.get("/files/{file_id}", response_model=schemas.DataFileOut)
def get_file_info(file_id: int, db: Session = Depends(get_db)):
    """Get file metadata; 404 if the record does not exist."""
    record = db.query(DataFile).filter(DataFile.id == file_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="File not found")
    return record
+
+
@app.get("/files/{file_id}/content")
def get_file_content(file_id: int, db: Session = Depends(get_db)):
    """Download the stored file content as an attachment."""
    record = db.query(DataFile).filter(DataFile.id == file_id).first()
    if record is None:
        raise HTTPException(status_code=404, detail="File not found")

    storage_path = record.storage_path
    if not storage_path or not os.path.exists(storage_path):
        raise HTTPException(status_code=404, detail="Physical file not found")

    import mimetypes
    guessed_type, _ = mimetypes.guess_type(record.relative_path)
    return FileResponse(
        storage_path,
        media_type=guessed_type,
        filename=os.path.basename(record.relative_path)
    )
+
if __name__ == "__main__":
    # Dev convenience entry point; in production run uvicorn directly.
    import uvicorn
    uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True)

+ 42 - 0
app/models.py

@@ -0,0 +1,42 @@
+from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, BigInteger
+from sqlalchemy.orm import relationship
+from sqlalchemy.sql import func
+from app.database import Base
+
class Project(Base):
    """A logical project aggregating data versions across Git pushes."""

    __tablename__ = "projects"

    id = Column(Integer, primary_key=True, index=True)
    # Unique identifier taken from manifest.yaml's project_name.
    project_name = Column(String(100), unique=True, nullable=False, index=True)
    description = Column(Text, nullable=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    versions = relationship("DataVersion", back_populates="project")
+
class DataVersion(Base):
    """One snapshot per (project, stage, commit), created on each push."""

    __tablename__ = "data_versions"

    id = Column(Integer, primary_key=True, index=True)
    project_id = Column(Integer, ForeignKey("projects.id"))
    stage = Column(String(50), nullable=False)  # stage name from manifest.yaml
    commit_id = Column(String(64), nullable=False)  # Git commit SHA of the push
    author = Column(String(50))  # pusher username from the webhook payload
    manifest_snapshot = Column(Text)  # raw manifest.yaml content at this commit
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    project = relationship("Project", back_populates="versions")
    files = relationship("DataFile", back_populates="version")
+
class DataFile(Base):
    """A stored file for a DataVersion; only changed files are recorded."""

    __tablename__ = "data_files"

    id = Column(Integer, primary_key=True, index=True)
    version_id = Column(Integer, ForeignKey("data_versions.id"))
    relative_path = Column(String(255))  # path inside the source repository
    storage_path = Column(String(500))  # absolute path on the storage backend
    file_size = Column(BigInteger)  # size in bytes
    file_type = Column(String(20))  # file extension including the dot
    file_sha = Column(String(64), index=True)  # Git Blob SHA for deduplication
    created_at = Column(DateTime(timezone=True), server_default=func.now())

    version = relationship("DataVersion", back_populates="files")

+ 68 - 0
app/schemas.py

@@ -0,0 +1,68 @@
+from pydantic import BaseModel
+from typing import List, Optional
+from datetime import datetime
+
+
class ProjectBase(BaseModel):
    """Shared project fields."""
    project_name: str
    description: Optional[str] = None


class ProjectCreate(ProjectBase):
    """Payload for creating a project (currently identical to the base)."""
    pass


class ProjectOut(ProjectBase):
    """Project as returned by the API."""
    id: int
    created_at: datetime

    class Config:
        # Allow validation directly from SQLAlchemy ORM objects (pydantic v2).
        from_attributes = True


# Keep old name for backward compatibility
Project = ProjectOut
+
+
class DataFileBase(BaseModel):
    """Shared file metadata fields."""
    relative_path: str
    file_size: int
    file_type: str
    file_sha: str


class DataFileOut(DataFileBase):
    """File record as returned by the API."""
    id: int
    storage_path: str
    created_at: datetime

    class Config:
        # Allow validation directly from SQLAlchemy ORM objects (pydantic v2).
        from_attributes = True


# Keep old name for backward compatibility
DataFile = DataFileOut
+
+
class DataVersionBase(BaseModel):
    """Shared version fields."""
    stage: str
    commit_id: str
    author: Optional[str] = None
    manifest_snapshot: Optional[str] = None


class DataVersionOut(DataVersionBase):
    """Version record as returned by the API."""
    id: int
    project_id: int
    created_at: datetime

    class Config:
        # Allow validation directly from SQLAlchemy ORM objects (pydantic v2).
        from_attributes = True


class DataVersionWithFiles(DataVersionOut):
    """Version record plus its associated file records."""
    files: List[DataFileOut] = []


# Keep old name for backward compatibility
DataVersion = DataVersionOut

+ 0 - 0
app/services/__init__.py


+ 52 - 0
app/services/gogs_client.py

@@ -0,0 +1,52 @@
+import httpx
+from app.config import settings
+import logging
+
+logger = logging.getLogger(__name__)
+
+
class GogsClient:
    """Minimal async client for the Gogs REST API endpoints DataNexus needs."""

    def __init__(self):
        self.base_url = settings.GOGS_URL.rstrip('/')
        self.token = settings.GOGS_TOKEN
        self.headers = {"Authorization": f"token {self.token}"}

    async def get_manifest(self, owner: str, repo: str, commit_id: str) -> str:
        """Fetch manifest.yaml raw content from a specific commit, or None if absent."""
        # Gogs raw file URL format: /{owner}/{repo}/raw/{ref}/{path}
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{commit_id}/manifest.yaml"
        async with httpx.AsyncClient() as http:
            response = await http.get(url, headers=self.headers)
            if response.status_code == 404:
                return None
            response.raise_for_status()
            return response.text

    async def get_tree(self, owner: str, repo: str, commit_id: str, path: str = "") -> list:
        """Get the file tree of a repository (contents endpoint, one level)."""
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={commit_id}"
        async with httpx.AsyncClient() as http:
            response = await http.get(url, headers=self.headers)
            response.raise_for_status()
            return response.json()

    async def get_recursive_tree(self, owner: str, repo: str, commit_id: str) -> dict:
        """Get the full recursive git tree; returns an empty tree on HTTP errors."""
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/git/trees/{commit_id}?recursive=1"
        try:
            async with httpx.AsyncClient() as http:
                response = await http.get(url, headers=self.headers)
                response.raise_for_status()
                return response.json()
        except httpx.HTTPStatusError as e:
            logger.error(f"Failed to get recursive tree: {e}")
            return {"tree": []}

    async def get_file_content(self, owner: str, repo: str, commit_id: str, file_path: str) -> bytes:
        """Download raw file content as bytes."""
        # Gogs raw file URL format: /{owner}/{repo}/raw/{ref}/{path}
        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{commit_id}/{file_path}"
        async with httpx.AsyncClient() as http:
            response = await http.get(url, headers=self.headers)
            response.raise_for_status()
            return response.content

+ 90 - 0
app/services/storage_service.py

@@ -0,0 +1,90 @@
+import os
+import shutil
+import hashlib
+from sqlalchemy.orm import Session
+from app.models import Project, DataVersion, DataFile
+from app.config import settings
+from app.services.gogs_client import GogsClient
+import logging
+
+logger = logging.getLogger(__name__)
+
class StorageService:
    """Persists projects/versions/file records and writes physical files to disk."""

    def __init__(self, db: Session, gogs_client: GogsClient):
        self.db = db
        self.gogs = gogs_client
        self.storage_root = settings.STORAGE_ROOT

    def get_or_create_project(self, project_name: str, description: str = None) -> Project:
        """Return the project with this name, creating it on first sight."""
        project = self.db.query(Project).filter(Project.project_name == project_name).first()
        if not project:
            project = Project(project_name=project_name, description=description)
            self.db.add(project)
            self.db.commit()
            self.db.refresh(project)
        return project

    def create_version(self, project_id: int, stage: str, commit_id: str, author: str, manifest: str) -> DataVersion:
        """Create and persist a DataVersion row for one (project, stage, commit)."""
        version = DataVersion(
            project_id=project_id,
            stage=stage,
            commit_id=commit_id,
            author=author,
            manifest_snapshot=manifest
        )
        self.db.add(version)
        self.db.commit()
        self.db.refresh(version)
        return version

    async def process_file_with_sha(self, version: DataVersion, relative_path: str, file_sha: str, owner: str, repo: str):
        """Download and record a file only if its Git blob SHA changed.

        Files whose SHA equals the latest recorded SHA for the same
        project + stage + path are skipped and not recorded.
        """
        # Look up the most recent record for the same project + stage +
        # file path, joined through the version relationship.
        last_file = (
            self.db.query(DataFile)
            .join(DataVersion)
            .filter(
                DataVersion.project_id == version.project_id,
                DataVersion.stage == version.stage,
                DataFile.relative_path == relative_path
            )
            .order_by(DataVersion.created_at.desc())
            .first()
        )

        if last_file and last_file.file_sha == file_sha:
            # File unchanged: skip without recording anything.
            logger.info(f"File {relative_path} (SHA: {file_sha}) unchanged. Skipping.")
            return

        # File is new or changed: download it and record it.
        logger.info(f"File {relative_path} (SHA: {file_sha}) changed. Downloading.")
        content = await self.gogs.get_file_content(owner, repo, version.commit_id, relative_path)
        file_size = len(content)

        # Layout: <root>/<project>/<stage>/<commit>/<relative_path>.
        # NOTE(review): relative_path comes from the Git tree listing; this
        # assumes it cannot contain ".." segments — confirm upstream.
        project_name = version.project.project_name
        safe_path = os.path.join(
            self.storage_root,
            project_name,
            version.stage,
            version.commit_id,
            relative_path.replace("/", os.sep)
        )

        os.makedirs(os.path.dirname(safe_path), exist_ok=True)

        with open(safe_path, "wb") as f:
            f.write(content)

        # Create the database record.
        new_file = DataFile(
            version_id=version.id,
            relative_path=relative_path,
            storage_path=safe_path,
            file_size=file_size,
            file_type=os.path.splitext(relative_path)[1],
            file_sha=file_sha
        )
        self.db.add(new_file)
        self.db.commit()

+ 157 - 0
app/services/webhook_service.py

@@ -0,0 +1,157 @@
+import yaml
+import logging
+import fnmatch
+from sqlalchemy.orm import Session
+from app.models import Project, DataVersion
+from app.services.gogs_client import GogsClient
+from app.services.storage_service import StorageService
+
+logger = logging.getLogger(__name__)
+
+
def normalize_path(path: str) -> str:
    """Strip surrounding whitespace and a leading './' from a manifest path."""
    cleaned = path.strip()
    return cleaned[2:] if cleaned.startswith("./") else cleaned
+
+
def is_directory_pattern(path: str) -> bool:
    """Return True when the manifest path denotes a directory (trailing '/')."""
    return path[-1:] == "/"
+
+
class WebhookService:
    """Coordinates end-to-end processing of one Gogs push webhook."""

    def __init__(self, db: Session):
        self.db = db
        self.gogs = GogsClient()
        self.storage = StorageService(db, self.gogs)

    async def process_webhook(self, payload: dict):
        """Handle a push payload: manifest, project, versions, file sync.

        Runs inside a background task, so problems are logged and the
        method returns silently instead of raising to the webhook caller.
        """
        # 1. Parse payload
        ref = payload.get("ref")
        if not ref:
            logger.warning("No ref in payload")
            return

        after_sha = payload.get("after")
        repo = payload.get("repository", {})
        repo_name = repo.get("name")
        owner = repo.get("owner", {}).get("username")
        pusher = payload.get("pusher", {})
        author_name = pusher.get("username", "unknown")

        if not after_sha or not repo_name or not owner:
            logger.error("Invalid payload: missing essential fields")
            return

        logger.info(f"Processing push for {owner}/{repo_name} commit {after_sha}")

        # 2. Get manifest (repositories without one simply opt out)
        manifest_content = await self.gogs.get_manifest(owner, repo_name, after_sha)
        if not manifest_content:
            logger.info("No manifest.yaml found. Skipping.")
            return

        try:
            manifest = yaml.safe_load(manifest_content)
        except yaml.YAMLError as e:
            logger.error(f"Failed to parse manifest: {e}")
            return

        # 3. Validation
        project_name = manifest.get("project_name")
        if not project_name:
            logger.error("Manifest missing project_name")
            return

        # 4. Get or create project
        project = self.storage.get_or_create_project(project_name)

        # 5. Get file tree (fetched once, shared by all stages)
        try:
            tree_data = await self.gogs.get_recursive_tree(owner, repo_name, after_sha)
            tree_files = {
                item['path']: item
                for item in tree_data.get('tree', [])
                if item['type'] == 'blob'
            }
        except Exception as e:
            logger.error(f"Failed to get file tree: {e}", exc_info=True)
            return

        # 6. Process stages (support both old and new format)
        stages = manifest.get("stages", [])

        # Backward compatibility: old single-stage format with top-level
        # "stage"/"outputs" keys is wrapped into the new list form.
        if not stages and manifest.get("stage"):
            stages = [{
                "name": manifest.get("stage"),
                "outputs": manifest.get("outputs", [])
            }]

        if not stages:
            logger.error("Manifest missing stages configuration")
            return

        for stage_config in stages:
            stage_name = stage_config.get("name")
            outputs = stage_config.get("outputs", [])

            if not stage_name:
                logger.warning("Stage missing name, skipping")
                continue

            # Idempotency: a (project, stage, commit) triple is processed once.
            existing_version = self.db.query(DataVersion).filter(
                DataVersion.project_id == project.id,
                DataVersion.stage == stage_name,
                DataVersion.commit_id == after_sha
            ).first()

            if existing_version:
                logger.info(f"Stage '{stage_name}' already processed. Skipping.")
                continue

            # Create version for this stage
            version = self.storage.create_version(
                project.id, stage_name, after_sha, author_name, manifest_content
            )
            logger.info(f"Processing stage '{stage_name}' with {len(outputs)} output rules")

            # Process outputs for this stage
            await self._process_outputs(version, outputs, tree_files, owner, repo_name)

    async def _process_outputs(
        self, version, outputs: list, tree_files: dict, owner: str, repo_name: str
    ):
        """Process output rules and download matching files.

        A rule with a trailing '/' matches files under that directory whose
        remaining path satisfies `pattern`; other rules match exactly one path.
        """
        for output in outputs:
            raw_path_pattern = output.get("path", "")
            match_pattern = output.get("pattern", "*")

            path_pattern = normalize_path(raw_path_pattern)
            is_dir = is_directory_pattern(raw_path_pattern)

            for file_path, file_info in tree_files.items():
                is_match = False

                if is_dir:
                    dir_prefix = path_pattern.rstrip("/") + "/"
                    if file_path.startswith(dir_prefix):
                        # Glob is applied to the path relative to the directory.
                        rel_name = file_path[len(dir_prefix):]
                        if rel_name and fnmatch.fnmatch(rel_name, match_pattern):
                            is_match = True
                else:
                    # Exact-path rule.
                    if file_path == path_pattern:
                        is_match = True

                if is_match:
                    try:
                        file_sha = file_info.get("sha")
                        await self.storage.process_file_with_sha(
                            version, file_path, file_sha, owner, repo_name
                        )
                    except Exception as e:
                        # One failing file must not abort the rest of the batch.
                        logger.error(f"Failed to process file {file_path}: {e}")

+ 7 - 0
requirements.txt

@@ -0,0 +1,7 @@
+fastapi
+uvicorn
+httpx
+sqlalchemy
+pymysql
+python-dotenv
+pyyaml

+ 149 - 0
tests/test_simulation.py

@@ -0,0 +1,149 @@
+import asyncio
+import os
+import sys
+import shutil
+import json
+from unittest.mock import MagicMock, AsyncMock, patch
+
+# Add project root to path
+sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
+
+from app.database import Base, engine, SessionLocal
+from app.services.webhook_service import WebhookService
+from app.models import Project, DataVersion, DataFile
+
# Setup helpers
def setup_db():
    """Drop and recreate all tables so each run starts from a clean schema."""
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)

def teardown_storage():
    """Remove the on-disk test storage directory if present."""
    if os.path.exists("test_storage"):
        shutil.rmtree("test_storage")

# Mock fixtures
# Old single-stage manifest format (also exercises backward compatibility).
manifest_yaml = """
project_name: "test_project"
stage: "test_stage"
outputs:
  - path: "results/data.csv"
  - path: "images/"
    pattern: "*.png"
"""

# Fake Gogs recursive-tree response: data.csv and plot.png should match the
# manifest rules; ignore.txt (wrong glob) and README.md (no rule) should not.
mock_tree_response = {
    "tree": [
        {"path": "results/data.csv", "type": "blob", "sha": "sha123", "mode": "100644"},
        {"path": "images/plot.png", "type": "blob", "sha": "sha456", "mode": "100644"},
        {"path": "images/ignore.txt", "type": "blob", "sha": "sha789", "mode": "100644"},
        {"path": "README.md", "type": "blob", "sha": "sha000", "mode": "100644"}
    ]
}
+
async def run_test():
    """End-to-end simulation: webhook ingest, pattern filtering, deduplication."""
    print("Setting up test environment...")
    setup_db()
    teardown_storage()

    # Redirect physical storage into a throwaway directory.
    with patch("app.config.settings.STORAGE_ROOT", "test_storage"):

        db = SessionLocal()
        service = WebhookService(db)

        # Replace all GogsClient network calls with deterministic mocks.
        service.gogs.get_manifest = AsyncMock(return_value=manifest_yaml)
        service.gogs.get_recursive_tree = AsyncMock(return_value=mock_tree_response)

        async def mock_get_file(owner, repo, commit, path):
            # Deterministic fake file contents per path.
            if path == "results/data.csv":
                return b"csv_data"
            if path == "images/plot.png":
                return b"png_data"
            return b"other_data"

        service.gogs.get_file_content = AsyncMock(side_effect=mock_get_file)

        # Minimal Gogs push payload.
        payload = {
            "ref": "refs/heads/master",
            "after": "commit_sha_abc",
            "repository": {
                "name": "my-repo",
                "owner": {"username": "my-user"}
            },
            "pusher": {"username": "test-author"}
        }

        print("Processing webhook...")
        await service.process_webhook(payload)

        # Verification
        print("Verifying results...")

        # Check Project
        project = db.query(Project).filter_by(project_name="test_project").first()
        assert project is not None
        print("[PASS] Project created")

        # Check Version
        version = db.query(DataVersion).filter_by(commit_id="commit_sha_abc").first()
        assert version is not None
        assert version.stage == "test_stage"
        print("[PASS] Version created")

        # Check Files
        files = db.query(DataFile).filter_by(version_id=version.id).all()
        file_paths = [f.relative_path for f in files]
        print(f"Stored files: {file_paths}")

        assert "results/data.csv" in file_paths
        assert "images/plot.png" in file_paths
        # images/ignore.txt matches the "images/" directory rule but not the
        # "*.png" glob, so pattern filtering must have excluded it.
        if "images/ignore.txt" in file_paths:
             print("[FAIL] 'images/ignore.txt' should not be included")
        else:
             print("[PASS] Pattern filtering worked")

        # Check physical files were written to disk.
        for f in files:
            if not os.path.exists(f.storage_path):
                print(f"[FAIL] {f.storage_path} does not exist")
            else:
                 print(f"[PASS] File {f.relative_path} stored at {f.storage_path}")

        # Test Deduplication
        print("\nTesting Deduplication...")
        # Simulate a second push with identical content (same blob SHAs).
        payload2 = payload.copy()
        payload2["after"] = "commit_sha_def" # New commit

        # get_recursive_tree is still mocked with the same SHAs, so every
        # file should be treated as "unchanged" this time around.

        # Reset get_file_content mock to track calls
        service.gogs.get_file_content.reset_mock()

        await service.process_webhook(payload2)

        version2 = db.query(DataVersion).filter_by(commit_id="commit_sha_def").first()
        assert version2 is not None
        print("[PASS] Version 2 created")

        # get_file_content must NOT have been called: identical SHAs mean
        # every file was deduplicated and no downloads happened.
        if service.gogs.get_file_content.called:
            print("[FAIL] Deduplication failed: files were downloaded again")
            print(service.gogs.get_file_content.mock_calls)
        else:
            print("[PASS] Deduplication worked: download skipped")

        db.close()
        teardown_storage()
        print("\nAll tests passed!")

if __name__ == "__main__":
    asyncio.run(run_test())

+ 179 - 0
轻量级数据中台 (Data-Hub) 实现方案设计文档.md

@@ -0,0 +1,179 @@
+# DataNexus - 轻量级数据中台设计文档 (Lightweight Data Hub)
+
+## 1. 背景与现状 (Background & Status Quo)
+目前团队在选题、调研等多个环节独立开展工作。每个环节有独立的 Git 仓库(部署在私有 Gogs 上),但环节之间存在数据依赖。
+*   **数据孤岛:** 过程数据留在个人本地,依赖方需通过私聊手动获取。
+*   **规范缺失:** 产出文件的路径、命名、数量均不统一。
+*   **版本管理真空:** 覆盖式更新导致历史数据无法追溯,依赖关系容易崩溃。
+
+## 2. 解决目标 (Objectives)
+构建一个“非侵入式”的轻量化中台,实现:
+1.  **自动归集:** 只要代码 Push 到 Git,系统自动提取该环节产出的数据。
+2.  **版本化:** 每次提交产生的成果都被唯一标记,互不覆盖。
+3.  **标准化:** 建立统一的数据目录结构。
+4.  **去工具化:** 研发人员无需安装 CLI,只需遵守简单的文件契约。
+
+## 3. 核心约定 (Social Contract / Agreements)
+为了实现自动化,团队成员需要达成以下三点共识:
+1.  **根目录配置文件:** 每个仓库根目录必须包含 `manifest.yaml`,声明哪些数据需要“上云”。
+2.  **结果文件落盘:** 代码运行后,结果必须产出到仓库目录内的指定位置(不支持读取仓库外的绝对路径)。
+3.  **必须执行 Git Push:** 只有 Push 动作会触发中台的数据采集。
+
+## 4. 技术实现思路 (Technical Architecture)
+
+### 4.1 触发机制
+*   **Gogs Webhook:** 在 Gogs 仓库配置 `push` 事件的 Webhook,指向中台工程的 API 接口。
+
+### 4.2 存储方案
+*   **元数据存储:** 使用 **MySQL** 记录项目、环节、版本、文件索引。
+*   **物理存储(二选一):**
+    *   **方案 A(推荐初始使用):** **服务器本地文件系统**。直接写入服务器磁盘(如 `/data/storage`),简单高效,适合文本和小文件。
+    *   **方案 B(进阶):** **对象存储 (OSS/MinIO)**。如果未来文件量大或需要可视化预览更方便,可无缝迁移至 MinIO。
+*   **核心原则:** 数据库只存“路径”和“元数据”,不存文件内容。
+
+### 4.3 数据获取机制 (核心变更)
+*   **弃用 `git clone`:** 全量克隆效率低且浪费空间。
+*   **采用 Gogs REST API:**
+    1.  通过 API 获取 `manifest.yaml` (Raw Content)。
+    2.  根据 Manifest 解析出文件列表。
+    3.  通过 API 获取文件 Git SHA,**仅下载发生变更的文件**。
+
+### 4.4 增量更新逻辑 (Smart Deduplication)
+为了节省存储空间并提高效率,采用 **Git Blob SHA** 进行指纹比对。
+
+*   **原理:** Gogs API 返回的每个文件都有唯一的 `sha` 字段(Git Blob SHA)。
+*   **对比范围:** 同一项目 + 同一 Stage + 同一文件路径,查询最新一条记录的 SHA。
+*   **逻辑:**
+    *   如果 `current_sha == last_sha`:文件未变化,**跳过不记录**。
+    *   如果 `current_sha != last_sha`(或无历史记录):文件有变化,**下载并记录**。
+*   **ABA 问题处理:** 由于只对比"最新一条记录",即使文件内容 A→B→A 变化,第三次仍会被正确识别为"有变化"并记录。
+
+**数据记录原则:**
+*   `data_versions` 表:每次 push(commit)创建一条记录,代表一个版本快照。
+*   `data_files` 表:只记录本次 commit 中**实际发生变化的文件**,未变化的文件不记录。
+*   查询某文件的历史版本时,通过 `relative_path` 向前查询 `data_files` 表即可。
+
+### 4.5 存储结构可视化 (Visualization)
+最终在服务器磁盘(或 OSS Bucket)上的目录结构将是完全扁平且语义化的,通过 **Commit ID** 实现版本物理隔离。
+
+**目录树示例:**
+```text
+/opt/datahub/storage/
+├── topic_research/              <-- 项目名 (Project Name)
+│   ├── selection/               <-- 环节名 (Stage)
+│   │   ├── a1b2c3d4/            <-- [版本1] Commit ID (2023-10-01)
+│   │   │   ├── daily_report.csv
+│   │   │   └── output_images/
+│   │   │       ├── 001.png
+│   │   │       └── 002.png
+│   │   │
+│   │   └── e5f6g7h8/            <-- [版本2] Commit ID (2023-10-05)
+│   │       ├── daily_report.csv
+│   │       └── output_images/
+│   │           ├── 001.png
+│   │           └── 003.png
+│   │
+│   └── cleaning/                <-- 另一个环节
+│       └── ...
+└── ...
+```
+*   **物理隔离:** 即使两个版本的 `daily_report.csv` 同名,它们也分别位于不同的 commit 文件夹下,互不冲突。
+*   **版本回溯:** 数据库中存储 `Commit ID -> /path/to/file` 的映射,想要回滚只需查库找到对应的文件夹即可。
+
+## 5. 详细设计 (Detailed Design)
+
+### 5.1 manifest.yaml 规范
+支持指定单个文件或整个文件夹,支持多 stage 配置。
+
+**多 Stage 格式(推荐):**
+```yaml
+project_name: "topic_research"  # 项目唯一标识
+
+stages:
+  - name: "selection"           # 环节1:选题
+    outputs:
+      - path: "./results/daily_report.csv"  # 指定单个文件
+      - path: "./output_images/"            # 指定整个文件夹
+        pattern: "*.png"                    # 可选:匹配模式,默认 *
+
+  - name: "cleaning"            # 环节2:清洗
+    outputs:
+      - path: "./cleaned_data/"
+        pattern: "*.csv"
+
+  - name: "analysis"            # 环节3:分析
+    outputs:
+      - path: "./reports/"
+```
+
+**单 Stage 格式(向后兼容):**
+```yaml
+project_name: "topic_research"  # 项目唯一标识
+stage: "selection"              # 环节标识(如:选题、清洗、分析)
+
+outputs:
+  - path: "./results/daily_report.csv"  # 指定单个文件
+  - path: "./output_images/"            # 指定整个文件夹
+    pattern: "*.png"                    # 可选:匹配模式,默认 *
+```
+
+### 5.2 数据库建模 (MySQL)
+```sql
+CREATE TABLE `projects` (
+  `id` INT PRIMARY KEY AUTO_INCREMENT,
+  `project_name` VARCHAR(100) NOT NULL UNIQUE,
+  `description` TEXT,
+  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
+);
+
+CREATE TABLE `data_versions` (
+  `id` INT PRIMARY KEY AUTO_INCREMENT,
+  `project_id` INT,
+  `stage` VARCHAR(50) NOT NULL,
+  `commit_id` VARCHAR(64) NOT NULL, -- Git 的 Commit Hash
+  `author` VARCHAR(50),
+  `manifest_snapshot` TEXT,        -- 存储当时的 manifest.yaml 内容
+  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+  INDEX(project_id, stage)
+);
+
+CREATE TABLE `data_files` (
+  `id` INT PRIMARY KEY AUTO_INCREMENT,
+  `version_id` INT,
+  `relative_path` VARCHAR(255),    -- 原始相对路径
+  `storage_path` VARCHAR(500),     -- 在服务器上的绝对存储路径
+  `file_size` BIGINT,
+  `file_type` VARCHAR(20),         -- 扩展名
+  `file_sha` VARCHAR(64),          -- [新增] 文件的 Git Blob SHA,用于去重
+  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+  FOREIGN KEY (version_id) REFERENCES data_versions(id)
+);
+```
+
+### 5.3 中台后端逻辑流 (WorkFlow)
+中台应用接收到 Gogs Webhook 请求后,执行以下步骤:
+
+1.  **接收事件:** 获取仓库信息 (`owner`, `repo`) 和 `commit_id`。
+2.  **获取清单 (API):**
+    *   调用 Gogs API: `GET /api/v1/repos/{owner}/{repo}/raw/{commit_id}/manifest.yaml`
+    *   若响应 404,则该次提交不包含数据,直接结束。
+3.  **解析清单:** 读取 YAML,解析出 `project_name` 和 `stages` 配置。
+4.  **获取文件树:** 调用 Gogs Tree API 获取该 commit 下所有文件及其 Blob SHA。
+5.  **遍历 Stages:** 对每个 stage 执行以下操作:
+    *   创建 `data_versions` 记录。
+    *   遍历该 stage 的 `outputs` 配置,匹配文件树中的文件。
+6.  **变更检测与处理:** 对每个匹配的文件:
+    *   **查询历史:** 在 `data_files` 表中查找同一项目 + 同一 stage + 同一文件路径的**最新一条记录**。
+    *   **对比 SHA:**
+        *   **如果 SHA 相同:** 文件未变更,**跳过不记录**。
+        *   **如果 SHA 不同(或无历史):** 文件有变更,执行下载并在 `data_files` 表中新增记录。
+7.  **文件下载与落盘:**
+    *   仅当文件发生变更时,调用 Raw API 下载内容。
+    *   将下载的数据流写入本地磁盘。
+    *   **路径隔离:** 严格按照 `/{project}/{stage}/{commit_id}/{relative_path}` 隔离。
+
+
+## 6. 约定细节补充 (Constraints)
+*   **文件冲突:** 同一 Commit ID 若重复触发,系统应先检查数据库,若已存在则跳过,防止重复占用空间。
+*   **安全性:** 中台服务器需要配置好访问 Gogs 的 Access Token(见 4.3,已弃用 `git clone`,无需 SSH Key),以便有权限通过 REST API 读取私有仓库内容。
+*   **大文件:** 考虑到仅使用 MySQL,单文件大小建议控制在 500MB 以内。如果未来有超大文件(如几个GB),建议再考虑挂载 NAS。