README.md 7.4 KB

DataNexus — 轻量级数据中台

基于 Git Webhook 的自动化数据归集系统,实现代码仓库产出数据的自动提取、版本化存储和统一管理。

✨ 功能特性

特性 说明
自动归集 Git Push 触发自动数据采集,无需手动操作
版本化存储 每次 Commit 独立存储至 OSS,支持历史回溯
增量更新 基于 Git SHA 智能去重,只存储变化的文件
多 Stage 单仓库可配置多个数据环节(选题、清洗、分析…)
Webhook 自扫描 定时扫描可管理仓库,自动配置 Webhook,零手动接入
REST API 提供项目、版本、文件的查询和下载接口

🔄 工作流程

┌──────────────┐     push      ┌──────────────┐   webhook    ┌──────────────┐
│  Git 仓库     │ ──────────▶  │  Gogs Server  │ ──────────▶ │  DataNexus   │
│ manifest.yaml │              └──────────────┘              │  (FastAPI)   │
└──────────────┘                                             └──────┬───────┘
                                                                    │
                                                       ┌────────────┴────────────┐
                                                       ▼                         ▼
                                                ┌─────────────┐          ┌──────────────┐
                                                │   MySQL      │          │  阿里云 OSS   │
                                                │  元数据 + 索引 │          │  文件物理存储  │
                                                └─────────────┘          └──────────────┘

定时扫描流程(零配置接入):

scan_repos 定时任务
      │
      ├─ 1. 列出用户所有可见仓库
      ├─ 2. 筛选 permissions.admin = true
      ├─ 3. 检查 manifest.yaml 是否存在
      └─ 4. 自动配置 Webhook(幂等,已存在则跳过)

📁 项目结构

data_nexus/
├── app/
│   ├── config.py                  # 环境变量 & 配置中心
│   ├── database.py                # SQLAlchemy 引擎 & Session
│   ├── main.py                    # FastAPI 应用入口 & 路由
│   ├── models.py                  # ORM 模型(Project / DataVersion / DataFile)
│   ├── schemas.py                 # Pydantic 响应模型
│   ├── services/
│   │   ├── gogs_client.py         # Gogs API 客户端(仓库/文件/Webhook)
│   │   ├── oss_client.py          # 阿里云 OSS 客户端
│   │   ├── repo_scanner.py        # 仓库扫描 & Webhook 自动配置服务
│   │   ├── storage_service.py     # 文件变更检测 & 存储逻辑
│   │   └── webhook_service.py     # Webhook 事件处理(解析 manifest → 归集数据)
│   └── tasks/
│       └── scan_repos.py          # 定时任务入口(仓库扫描 CLI)
├── tests/
│   └── test_simulation.py
├── storage/                       # 本地存储目录(已 gitignore)
├── .env                           # 环境变量(已 gitignore)
├── .env.example                   # 环境变量模板
├── requirements.txt               # Python 依赖
├── 使用指南.md                     # 面向仓库使用者的接入指南
└── 轻量级数据中台 (Data-Hub) 实现方案设计文档.md

🚀 快速开始

1. 安装依赖

pip install -r requirements.txt

2. 配置环境变量

复制模板并编辑:

cp .env.example .env

.env 完整配置项:

# ========== 数据库配置 ==========
DB_HOST=localhost
DB_PORT=3306
DB_USER=root
DB_PASSWORD=your_password
DB_NAME=data_nexus

# ========== Gogs 配置 ==========
GOGS_URL=https://your-gogs-server.com
GOGS_TOKEN=your_gogs_api_token
GOGS_SECRET=                              # Webhook 签名验证密钥(可选)

# ========== Webhook 自动配置 ==========
GOGS_WEBHOOK_URL=http://your-host:8000/webhook   # 定时扫描后自动写入仓库的回调地址
GOGS_WEBHOOK_SECRET=                              # 回调签名密钥(可选,与 GOGS_SECRET 对应)

# ========== 存储配置 ==========
STORAGE_ROOT=./storage

# ========== OSS 配置 ==========
OSS_ACCESS_KEY_ID=your_access_key_id
OSS_ACCESS_KEY_SECRET=your_access_key_secret
OSS_ENDPOINT=oss-cn-hangzhou.aliyuncs.com
OSS_BUCKET_NAME=your_bucket_name
OSS_PREFIX=data_nexus                     # OSS 存储前缀
OSS_CDN_URL=https://your-cdn-domain.com   # CDN 加速域名

3. 启动 API 服务

uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload

或直接运行:

python -m app.main

4. 启动仓库扫描定时任务

扫描所有可管理仓库,为包含 manifest.yaml 的仓库自动配置 Webhook:

# 单次执行
python -m app.tasks.scan_repos

# 持续循环(默认每 1 小时扫描一次)
python -m app.tasks.scan_repos --loop

# 自定义扫描间隔(如每 30 分钟)
python -m app.tasks.scan_repos --loop --interval 1800

5. 仓库接入(添加 manifest.yaml)

在 Git 仓库根目录创建 manifest.yaml

project_name: "my_project"

stages:
  - name: "data_collection"
    outputs:
      - path: "./results/"
        pattern: "*.csv"
      - path: "./report.pdf"

📖 详细的 manifest 配置说明请参阅 使用指南

📡 API 接口

Webhook

方法 路径 说明
POST /webhook 接收 Gogs Push Webhook,支持 HMAC-SHA256 签名验证

项目

方法 路径 说明
GET /projects 列出所有项目(支持分页 ?skip=&limit=
GET /projects/{project_id} 按 ID 获取项目详情
GET /projects/name/{project_name} 按名称获取项目详情

版本

方法 路径 说明
GET /projects/{project_id}/versions 列出版本(支持 ?stage= 过滤)
GET /versions/{version_id} 获取单个版本详情
GET /versions/{version_id}/files 获取版本文件树(?flat=true 返回扁平列表)

文件

方法 路径 说明
GET /files/{file_id} 获取文件元数据
GET /files/{file_id}/url 获取文件 CDN 下载链接
GET /files/{file_id}/content 302 重定向至 CDN 下载

🛠 技术栈

组件 技术选型
Web 框架 FastAPI
数据库 MySQL + SQLAlchemy
HTTP 客户端 httpx(异步)
对象存储 阿里云 OSS(oss2)
CDN 加速 阿里云 CDN
ID 生成 ULID(python-ulid)
配置管理 python-dotenv

📚 相关文档