Przeglądaj źródła

feat:数据库表结构优化、使用指南.md

tanjingyu 3 tygodni temu
rodzic
commit
84a4172bf6

+ 4 - 4
app/main.py

@@ -129,7 +129,7 @@ def list_projects(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)
 
 
 @app.get("/projects/{project_id}", response_model=schemas.ProjectOut)
-def get_project(project_id: int, db: Session = Depends(get_db)):
+def get_project(project_id: str, db: Session = Depends(get_db)):
     """Get a single project by ID."""
     project = db.query(Project).filter(Project.id == project_id).first()
     if not project:
@@ -150,7 +150,7 @@ def get_project_by_name(project_name: str, db: Session = Depends(get_db)):
 
 @app.get("/projects/{project_id}/versions", response_model=List[schemas.DataVersionOut])
 def list_versions(
-    project_id: int,
+    project_id: str,
     stage: Optional[str] = None,
     skip: int = 0,
     limit: int = 100,
@@ -165,7 +165,7 @@ def list_versions(
 
 
 @app.get("/versions/{version_id}", response_model=schemas.DataVersionOut)
-def get_version(version_id: int, db: Session = Depends(get_db)):
+def get_version(version_id: str, db: Session = Depends(get_db)):
     """Get a single version by ID."""
     version = db.query(DataVersion).filter(DataVersion.id == version_id).first()
     if not version:
@@ -174,7 +174,7 @@ def get_version(version_id: int, db: Session = Depends(get_db)):
 
 
 @app.get("/versions/{version_id}/files")
-def get_version_files(version_id: int, flat: bool = False, db: Session = Depends(get_db)):
+def get_version_files(version_id: str, flat: bool = False, db: Session = Depends(get_db)):
     """
     Get files for a version.
     - flat=False (default): Returns tree structure

+ 14 - 5
app/models.py

@@ -1,23 +1,31 @@
 from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, BigInteger
 from sqlalchemy.orm import relationship
 from sqlalchemy.sql import func
+from ulid import ULID
 from app.database import Base
 
+
+def generate_ulid() -> str:
+    """Generate a new ULID string."""
+    return str(ULID())
+
+
 class Project(Base):
     __tablename__ = "projects"
 
-    id = Column(Integer, primary_key=True, index=True)
+    id = Column(String(26), primary_key=True, default=generate_ulid)
     project_name = Column(String(100), unique=True, nullable=False, index=True)
     description = Column(Text, nullable=True)
     created_at = Column(DateTime(timezone=True), server_default=func.now())
 
     versions = relationship("DataVersion", back_populates="project")
 
+
 class DataVersion(Base):
     __tablename__ = "data_versions"
 
-    id = Column(Integer, primary_key=True, index=True)
-    project_id = Column(Integer, ForeignKey("projects.id"))
+    id = Column(String(26), primary_key=True, default=generate_ulid)
+    project_id = Column(String(26), ForeignKey("projects.id"))
     stage = Column(String(50), nullable=False)
     commit_id = Column(String(64), nullable=False)
     author = Column(String(50))
@@ -27,11 +35,12 @@ class DataVersion(Base):
     project = relationship("Project", back_populates="versions")
     files = relationship("DataFile", back_populates="version")
 
+
 class DataFile(Base):
     __tablename__ = "data_files"
 
-    id = Column(Integer, primary_key=True, index=True)
-    version_id = Column(Integer, ForeignKey("data_versions.id"))
+    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
+    version_id = Column(String(26), ForeignKey("data_versions.id"))
     relative_path = Column(String(255))
     storage_path = Column(String(500))
     file_size = Column(BigInteger)

+ 3 - 3
app/schemas.py

@@ -13,7 +13,7 @@ class ProjectCreate(ProjectBase):
 
 
 class ProjectOut(ProjectBase):
-    id: int
+    id: str
     created_at: datetime
 
     class Config:
@@ -52,8 +52,8 @@ class DataVersionBase(BaseModel):
 
 
 class DataVersionOut(DataVersionBase):
-    id: int
-    project_id: int
+    id: str
+    project_id: str
     created_at: datetime
 
     class Config:

+ 1 - 1
app/services/storage_service.py

@@ -22,7 +22,7 @@ class StorageService:
             self.db.refresh(project)
         return project
 
-    def create_version(self, project_id: int, stage: str, commit_id: str, author: str, manifest: str) -> DataVersion:
+    def create_version(self, project_id: str, stage: str, commit_id: str, author: str, manifest: str) -> DataVersion:
         version = DataVersion(
             project_id=project_id,
             stage=stage,

+ 2 - 1
requirements.txt

@@ -5,4 +5,5 @@ sqlalchemy
 pymysql
 python-dotenv
 pyyaml
-oss2
+oss2
+python-ulid

+ 158 - 0
使用指南.md

@@ -0,0 +1,158 @@
+# DataNexus 使用指南
+
+## 这是什么?
+
+DataNexus 是一个数据自动归集系统。只要你在项目中配置好 `manifest.yaml`,每次 `git push` 后,系统会自动把你指定的文件上传到云端,并保留历史版本。
+
+## 快速开始
+
+### 第一步:在项目根目录创建 manifest.yaml
+
+```yaml
+project_name: "你的项目名"
+
+stages:
+  - name: "环节名称"
+    outputs:
+      - path: "./要上传的文件或目录/"
+```
+
+### 第二步:正常 git push
+
+```bash
+git add .
+git commit -m "your message"
+git push
+```
+
+完成!系统会自动处理剩下的事情。
+
+---
+
+## manifest.yaml 配置详解
+
+### 基础示例
+
+```yaml
+project_name: "topic_research"    # 项目唯一标识(必填)
+
+stages:
+  - name: "selection"             # 环节名称(必填)
+    outputs:
+      - path: "./results/report.csv"           # 上传单个文件
+      - path: "./output_images/"               # 上传整个目录
+```
+
+### 完整示例(多环节)
+
+```yaml
+project_name: "topic_research"
+
+stages:
+  # 环节1:选题
+  - name: "selection"
+    outputs:
+      - path: "./results/daily_report.csv"
+      - path: "./output_images/"
+        pattern: "*.png"                       # 只上传 png 文件
+
+  # 环节2:数据清洗
+  - name: "cleaning"
+    outputs:
+      - path: "./cleaned_data/"
+        pattern: "*.csv"
+
+  # 环节3:分析报告
+  - name: "analysis"
+    outputs:
+      - path: "./reports/"
+```
+
+### 配置说明
+
+| 字段 | 必填 | 说明 |
+|------|------|------|
+| `project_name` | ✅ | 项目唯一标识,建议用英文 |
+| `stages` | ✅ | 环节列表 |
+| `stages[].name` | ✅ | 环节名称,如 selection、cleaning、analysis |
+| `stages[].outputs` | ✅ | 要上传的文件/目录列表 |
+| `outputs[].path` | ✅ | 文件或目录路径(相对于项目根目录) |
+| `outputs[].pattern` | ❌ | 文件匹配模式,默认 `*`(匹配所有) |
+
+### path 写法
+
+```yaml
+# 单个文件
+- path: "./data/result.csv"
+
+# 整个目录(注意结尾的 /)
+- path: "./output/"
+
+# 带匹配模式的目录
+- path: "./images/"
+  pattern: "*.png"          # 只匹配 png 文件
+```
+
+---
+
+## 常见问题
+
+### Q: 每次 push 都会上传所有文件吗?
+
+不会。系统会对比文件的 SHA 值,只有内容发生变化的文件才会被上传。
+
+### Q: 历史版本会被覆盖吗?
+
+不会。每次 commit 的文件都会独立存储,可以随时查看历史版本。
+
+### Q: 文件大小有限制吗?
+
+建议单个文件不超过 500MB。
+
+### Q: 支持哪些文件类型?
+
+支持所有文件类型:csv、xlsx、png、pdf、json 等。
+
+### Q: 目录下的子目录会被上传吗?
+
+会。配置目录路径后,会递归上传该目录下的所有文件(包括子目录)。
+
+---
+
+## 查看已上传的文件
+
+### API 接口
+
+```bash
+# 查看所有项目
+GET /projects
+
+# 查看项目的所有版本
+GET /projects/{project_id}/versions
+
+# 查看某个版本的文件列表
+GET /versions/{version_id}/files
+
+# 获取文件下载链接
+GET /files/{file_id}/url
+```
+
+### 文件访问地址
+
+上传的文件可以通过 CDN 直接访问:
+
+```
+https://res-bj.cybertogether.net/data_nexus/{project_name}/{stage}/{commit_id}/{file_path}
+```
+
+---
+
+## 注意事项
+
+1. `manifest.yaml` 必须放在项目根目录
+2. `path` 路径是相对于项目根目录的
+3. 目录路径必须以 `/` 结尾
+4. 只有 `git push` 才会触发上传,本地 commit 不会
+5. 同一个 commit 重复 push 不会重复上传(幂等性)
+
+---

+ 138 - 61
轻量级数据中台 (Data-Hub) 实现方案设计文档.md

@@ -7,7 +7,7 @@
 *   **版本管理真空:** 覆盖式更新导致历史数据无法追溯,依赖关系容易崩溃。
 
 ## 2. 解决目标 (Objectives)
-构建一个“非侵入式”的轻量化中台,实现:
+构建一个"非侵入式"的轻量化中台,实现:
 1.  **自动归集:** 只要代码 Push 到 Git,系统自动提取该环节产出的数据。
 2.  **版本化:** 每次提交产生的成果都被唯一标记,互不覆盖。
 3.  **标准化:** 建立统一的数据目录结构。
@@ -15,7 +15,7 @@
 
 ## 3. 核心约定 (Social Contract / Agreements)
 为了实现自动化,团队成员需要达成以下三点共识:
-1.  **根目录配置文件:** 每个仓库根目录必须包含 `manifest.yaml`,声明哪些数据需要“上云”
+1.  **根目录配置文件:** 每个仓库根目录必须包含 `manifest.yaml`,声明哪些数据需要"上云"
 2.  **结果文件落盘:** 代码运行后,结果必须产出到仓库目录内的指定位置(不支持读取仓库外的绝对路径)。
 3.  **必须执行 Git Push:** 只有 Push 动作会触发中台的数据采集。
 
@@ -26,17 +26,21 @@
 
 ### 4.2 存储方案
 *   **元数据存储:** 使用 **MySQL** 记录项目、环节、版本、文件索引。
-*   **物理存储(二选一):**
-    *   **方案 A(推荐初始使用):** **服务器本地文件系统**。直接写入服务器磁盘(如 `/data/storage`),简单高效,适合文本和小文件。
-    *   **方案 B(进阶):** **对象存储 (OSS/MinIO)**。如果未来文件量大或需要可视化预览更方便,可无缝迁移至 MinIO。
-*   **核心原则:** 数据库只存“路径”和“元数据”,不存文件内容。
+*   **物理存储:** 使用 **阿里云 OSS + CDN** 存储文件内容。
+    *   文件上传到 OSS,通过 CDN 加速访问
+    *   CDN 域名:`https://res-bj.cybertogether.net`
+    *   访问方式:`{CDN_URL}/{OSS_KEY}`
+*   **核心原则:** 数据库只存"OSS Key"和"元数据",不存文件内容。
 
-### 4.3 数据获取机制 (核心变更)
+### 4.3 数据获取机制 (按需获取)
 *   **弃用 `git clone`:** 全量克隆效率低且浪费空间。
-*   **采用 Gogs REST API:**
+*   **弃用全量文件树:** 不再获取整个仓库的文件树,避免大仓库性能问题。
+*   **采用按需获取策略:**
     1.  通过 API 获取 `manifest.yaml` (Raw Content)。
-    2.  根据 Manifest 解析出文件列表。
-    3.  通过 API 获取文件 Git SHA,**仅下载发生变更的文件**。
+    2.  解析 Manifest 获取 `outputs` 配置。
+    3.  **单文件配置**:直接调用 Contents API 获取该文件信息(包含 SHA)。
+    4.  **目录配置**:仅获取该目录下的文件树,递归遍历子目录。
+    5.  根据 SHA 判断是否需要下载。
 
 ### 4.4 增量更新逻辑 (Smart Deduplication)
 为了节省存储空间并提高效率,采用 **Git Blob SHA** 进行指纹比对。
@@ -54,31 +58,29 @@
 *   查询某文件的历史版本时,通过 `relative_path` 向前查询 `data_files` 表即可。
 
 ### 4.5 存储结构可视化 (Visualization)
-最终在服务器磁盘(或 OSS Bucket)上的目录结构将是完全扁平且语义化的,通过 **Commit ID** 实现版本物理隔离。
+最终在 OSS Bucket 上的目录结构将是完全扁平且语义化的,通过 **Commit ID** 实现版本物理隔离。
 
-**目录树示例:**
+**OSS Key 结构:**
 ```text
-/opt/datahub/storage/
-├── topic_research/              <-- 项目名 (Project Name)
-│   ├── selection/               <-- 环节名 (Stage)
-│   │   ├── a1b2c3d4/            <-- [版本1] Commit ID (2023-10-01)
-│   │   │   ├── daily_report.csv
-│   │   │   └── output_images/
-│   │   │       ├── 001.png
-│   │   │       └── 002.png
-│   │   │
-│   │   └── e5f6g7h8/            <-- [版本2] Commit ID (2023-10-05)
-│   │       ├── daily_report.csv
-│   │       └── output_images/
-│   │           ├── 001.png
-│   │           └── 003.png
-│   │
-│   └── cleaning/                <-- 另一个环节
-│       └── ...
-└── ...
-```
-*   **物理隔离:** 即使两个 versions 的 `daily_report.csv` 同名,它们也分别位于不同的 commit 文件夹下的,互不冲突。
-*   **版本回溯:** 数据库中存储 `Commit ID -> /path/to/file` 的映射,想要回滚只需查库找到对应的文件夹即可。
+{prefix}/{project_name}/{stage}/{commit_id}/{relative_path}
+```
+
+**示例:**
+```text
+data_nexus/topic_research/selection/a1b2c3d4/daily_report.csv
+data_nexus/topic_research/selection/a1b2c3d4/output_images/001.png
+data_nexus/topic_research/selection/e5f6g7h8/daily_report.csv
+```
+
+**访问 URL:**
+```
+https://res-bj.cybertogether.net/data_nexus/topic_research/selection/a1b2c3d4/daily_report.csv
+```
+
+### 4.6 并发处理
+*   **异步处理:** Webhook 请求立即返回,文件处理在后台异步执行。
+*   **独立 Session:** 每个后台任务创建独立的数据库 Session,避免请求结束后 Session 被关闭的问题。
+*   **多仓库并发:** 支持多个仓库同时推送 Webhook,各自独立处理。
 
 ## 5. 详细设计 (Detailed Design)
 
@@ -118,34 +120,41 @@ outputs:
 ```
 
 ### 5.2 数据库建模 (MySQL)
+
+**ID 策略:**
+*   `projects` 和 `data_versions` 表使用 **ULID**(26 位字符串),便于数据迁移和分布式场景。
+*   `data_files` 表使用自增 ID,因为文件记录量大且通常跟随 version 迁移。
+
 ```sql
 CREATE TABLE `projects` (
-  `id` INT PRIMARY KEY AUTO_INCREMENT,
+  `id` VARCHAR(26) PRIMARY KEY,           -- ULID
   `project_name` VARCHAR(100) NOT NULL UNIQUE,
   `description` TEXT,
   `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP
 );
 
 CREATE TABLE `data_versions` (
-  `id` INT PRIMARY KEY AUTO_INCREMENT,
-  `project_id` INT,
+  `id` VARCHAR(26) PRIMARY KEY,           -- ULID
+  `project_id` VARCHAR(26),               -- 外键关联 projects.id
   `stage` VARCHAR(50) NOT NULL,
-  `commit_id` VARCHAR(64) NOT NULL, -- Git 的 Commit Hash
+  `commit_id` VARCHAR(64) NOT NULL,       -- Git 的 Commit Hash
   `author` VARCHAR(50),
-  `manifest_snapshot` TEXT,        -- 存储当时的 manifest.yaml 内容
+  `manifest_snapshot` TEXT,               -- 存储当时的 manifest.yaml 内容
   `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
-  INDEX(project_id, stage)
+  INDEX(project_id, stage),
+  FOREIGN KEY (project_id) REFERENCES projects(id)
 );
 
 CREATE TABLE `data_files` (
-  `id` INT PRIMARY KEY AUTO_INCREMENT,
-  `version_id` INT,
-  `relative_path` VARCHAR(255),    -- 原始相对路径
-  `storage_path` VARCHAR(500),     -- 在服务器上的绝对存储路径
+  `id` INT PRIMARY KEY AUTO_INCREMENT,    -- 自增 ID
+  `version_id` VARCHAR(26),               -- 外键关联 data_versions.id
+  `relative_path` VARCHAR(255),           -- 原始相对路径
+  `storage_path` VARCHAR(500),            -- OSS Key
   `file_size` BIGINT,
-  `file_type` VARCHAR(20),         -- 扩展名
-  `file_sha` VARCHAR(64),          -- [新增] 文件的 Git Blob SHA,用于去重
+  `file_type` VARCHAR(20),                -- 扩展名
+  `file_sha` VARCHAR(64),                 -- 文件的 Git Blob SHA,用于去重
   `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+  INDEX(file_sha),
   FOREIGN KEY (version_id) REFERENCES data_versions(id)
 );
 ```
@@ -153,27 +162,95 @@ CREATE TABLE `data_files` (
 ### 5.3 中台后端逻辑流 (WorkFlow)
 中台应用接收到 Gogs Webhook 请求后,执行以下步骤:
 
-1.  **接收事件:** 获取仓库信息 (`owner`, `repo`) 和 `commit_id`。
-2.  **获取清单 (API):**
+1.  **接收事件:** 获取仓库信息 (`owner`, `repo`) 和 `commit_id`,立即返回响应。
+2.  **后台处理:** 在独立的后台任务中执行以下操作:
+3.  **获取清单 (API):**
     *   调用 Gogs API: `GET /{owner}/{repo}/raw/{commit_id}/manifest.yaml`
     *   若响应 404,则该次提交不包含数据,直接结束。
-3.  **解析清单:** 读取 YAML,解析出 `project_name` 和 `stages` 配置。
-4.  **获取文件树:** 调用 Gogs Tree API 获取该 commit 下所有文件及其 Blob SHA。
+4.  **解析清单:** 读取 YAML,解析出 `project_name` 和 `stages` 配置。
 5.  **遍历 Stages:** 对每个 stage 执行以下操作:
-    *   创建 `data_versions` 记录。
-    *   遍历该 stage 的 `outputs` 配置,匹配文件树中的文件。
-6.  **变更检测与处理:** 对每个匹配的文件:
+    *   **幂等性检查:** 查询是否已存在相同 project + stage + commit_id 的记录,若存在则跳过。
+    *   创建 `data_versions` 记录(自动生成 ULID)。
+    *   遍历该 stage 的 `outputs` 配置。
+6.  **按需获取文件信息:**
+    *   **单文件**:调用 Contents API 获取文件信息(包含 SHA)。
+    *   **目录**:调用 Contents API 递归获取目录下所有文件。
+7.  **变更检测与处理:** 对每个匹配的文件:
     *   **查询历史:** 在 `data_files` 表中查找同一项目 + 同一 stage + 同一文件路径的**最新一条记录**。
     *   **对比 SHA:**
         *   **如果 SHA 相同:** 文件未变更,**跳过不记录**。
-        *   **如果 SHA 不同(或无历史):** 文件有变更,执行下载并在 `data_files` 表中新增记录。
-7.  **文件下载与落盘:**
-    *   仅当文件发生变更时,调用 Raw API 下载内容。
-    *   将下载的数据流写入本地磁盘。
-    *   **路径隔离:** 严格按照 `/{project}/{stage}/{commit_id}/{filename}` 隔离。
+        *   **如果 SHA 不同(或无历史):** 文件有变更,执行下载并上传到 OSS。
+8.  **文件上传:**
+    *   调用 Raw API 下载文件内容。
+    *   上传到 OSS,Key 格式:`{prefix}/{project}/{stage}/{commit_id}/{relative_path}`
+    *   在 `data_files` 表中新增记录,`storage_path` 存储 OSS Key。
+
+## 6. API 接口设计
+
+### 6.1 Webhook 接口
+```
+POST /webhook
+```
+接收 Gogs Push 事件,支持 HMAC-SHA256 签名验证。
+
+### 6.2 项目接口
+```
+GET /projects                    # 列出所有项目
+GET /projects/{project_id}       # 获取单个项目(ID 为 ULID)
+GET /projects/name/{name}        # 按名称获取项目
+```
+
+### 6.3 版本接口
+```
+GET /projects/{project_id}/versions?stage=xxx  # 列出项目版本,可按 stage 过滤
+GET /versions/{version_id}                      # 获取单个版本(ID 为 ULID)
+GET /versions/{version_id}/files?flat=true      # 获取版本文件(树形/扁平)
+```
+
+### 6.4 文件接口
+```
+GET /files/{file_id}            # 获取文件元数据(ID 为自增整数)
+GET /files/{file_id}/url        # 获取文件 CDN URL
+GET /files/{file_id}/content    # 重定向到 CDN URL 下载
+```
+
+## 7. 配置项
+
+### 7.1 环境变量
+```bash
+# 数据库配置
+DB_HOST=localhost
+DB_PORT=3306
+DB_USER=root
+DB_PASSWORD=xxx
+DB_NAME=data_nexus
+
+# Gogs 配置
+GOGS_URL=https://git.example.com
+GOGS_TOKEN=xxx
+GOGS_SECRET=                    # Webhook 签名密钥(可选)
+
+# OSS 配置
+OSS_ACCESS_KEY_ID=xxx
+OSS_ACCESS_KEY_SECRET=xxx
+OSS_ENDPOINT=oss-cn-hangzhou.aliyuncs.com
+OSS_BUCKET_NAME=xxx
+OSS_PREFIX=data_nexus
+OSS_CDN_URL=https://res-bj.cybertogether.net
+```
+
+## 8. 约定细节补充 (Constraints)
+*   **幂等性:** 同一 Commit ID + Stage 若重复触发,系统会检查数据库,若已存在则跳过。
+*   **安全性:** 使用 Gogs Token 进行 API 认证,支持 Webhook 签名验证。
+*   **大文件:** 建议单文件大小控制在 500MB 以内。OSS 支持大文件,但下载时间会较长。
 
+## 9. 技术栈
 
-## 6. 约定细节补充 (Constraints)
-*   **文件冲突:** 同一 Commit ID 若重复触发,系统应先检查数据库,若已存在则跳过,防止重复占用空间。
-*   **安全性:** 中台服务器需要配置好访问 Gogs 的 SSH Key,以便有权限拉取私有仓库代码。
-*   **大文件:** 考虑到仅使用 MySQL,单文件大小建议控制在 500MB 以内。如果未来有超大文件(如几个GB),建议再考虑挂载 NAS。
+| 组件 | 技术选型 |
+|------|----------|
+| Web 框架 | FastAPI |
+| 数据库 | MySQL + SQLAlchemy |
+| HTTP 客户端 | httpx (异步) |
+| 对象存储 | 阿里云 OSS |
+| ID 生成 | ULID (python-ulid) |
+| 配置管理 | python-dotenv |