
feat: automatically scan repositories and configure webhooks

tanjingyu 2 weeks ago
Parent
Commit b84ed35bb2
8 changed files with 526 additions and 70 deletions
  1. .env.example (+6 -0)
  2. README.md (+161 -33)
  3. app/config.py (+2 -0)
  4. app/services/gogs_client.py (+83 -9)
  5. app/services/repo_scanner.py (+148 -0)
  6. app/tasks/__init__.py (+0 -0)
  7. app/tasks/scan_repos.py (+102 -0)
  8. 使用指南.md (+24 -28)

+ 6 - 0
.env.example

@@ -10,6 +10,12 @@ GOGS_URL=https://git.yishihui.com
 GOGS_TOKEN=4ae18a8e348dd931e33bf6f752536e9a6fd4d9c3
 GOGS_SECRET=
 
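+# ========== Webhook auto-configuration ==========
+# After the scheduled task scans the repositories, it sets this URL as each repository's webhook callback address
+GOGS_WEBHOOK_URL=http://your-data-nexus-host:8000/webhook
+# Signing secret sent with webhook callbacks (may be left empty)
+GOGS_WEBHOOK_SECRET=
+
 # ========== OSS configuration ==========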
 OSS_ACCESS_KEY_ID=your_access_key_id
 OSS_ACCESS_KEY_SECRET=your_access_key_secret

+ 161 - 33
README.md

@@ -1,16 +1,74 @@
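-# DataNexus - Lightweight Data Hub
+# DataNexus  Lightweight Data Hub
 
-An automated data collection system built on Git webhooks. It automatically extracts, versions, and centrally manages the data that code repositories produce.
+> An automated data collection system built on Git webhooks. It automatically extracts, versions, and centrally manages the data that code repositories produce.
 
-## Features
+## Features
 
-- **Automatic collection** - A Git push triggers data collection automatically; no manual steps required
-- **Versioned storage** - Each commit is stored independently, so history can be traced back
-- **Incremental updates** - Deduplicates by Git SHA and stores only the files that changed
-- **Multi-stage support** - A single repository can define multiple data stages (topic selection, cleaning, analysis, etc.)
-- **REST API** - Provides endpoints to query and download projects, versions, and files
+| Feature | Description |
+|------|------|
+| **Automatic collection** | A Git push triggers data collection automatically; no manual steps required |
+| **Versioned storage** | Each commit is stored independently in OSS, so history can be traced back |
+| **Incremental updates** | Deduplicates by Git SHA and stores only the files that changed |
+| **Multi-stage** | A single repository can define multiple data stages (topic selection, cleaning, analysis…) |
+| **Webhook self-scan** | Periodically scans manageable repositories and configures webhooks automatically; no manual onboarding |
+| **REST API** | Provides endpoints to query and download projects, versions, and files |
 
-## Quick Start
+## 🔄 Workflow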
+
+```
+┌───────────────┐     push    ┌───────────────┐   webhook   ┌──────────────┐
+│  Git repo     │ ──────────▶ │  Gogs Server  │ ──────────▶ │  DataNexus   │
+│ manifest.yaml │             └───────────────┘             │  (FastAPI)   │
+└───────────────┘                                           └──────┬───────┘
+                                                                   │
+                                                      ┌────────────┴────────────┐
+                                                      ▼                         ▼
+                                            ┌──────────────────┐    ┌───────────────────────┐
+                                            │      MySQL       │    │   Alibaba Cloud OSS   │
+                                            │ metadata + index │    │ physical file storage │
+                                            └──────────────────┘    └───────────────────────┘
+```
+
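+**Scheduled scan flow (zero-configuration onboarding):**
+
+```
+scan_repos scheduled task
+      │
+      ├─ 1. List all repositories visible to the user
+      ├─ 2. Keep those with permissions.admin = true
+      ├─ 3. Check whether manifest.yaml exists
+      └─ 4. Configure the webhook automatically (idempotent; skipped if it already exists)
+```
+
+## 📁 Project Structure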
+
+```
+data_nexus/
+├── app/
+│   ├── config.py                  # Environment variables & configuration
+│   ├── database.py                # SQLAlchemy engine & session
+│   ├── main.py                    # FastAPI application entry point & routes
+│   ├── models.py                  # ORM models (Project / DataVersion / DataFile)
+│   ├── schemas.py                 # Pydantic response models
+│   ├── services/
+│   │   ├── gogs_client.py         # Gogs API client (repositories / files / webhooks)
+│   │   ├── oss_client.py          # Alibaba Cloud OSS client
+│   │   ├── repo_scanner.py        # Repository scanning & webhook auto-configuration service
+│   │   ├── storage_service.py     # File change detection & storage logic
+│   │   └── webhook_service.py     # Webhook event handling (parse manifest → collect data)
+│   └── tasks/
+│       └── scan_repos.py          # Scheduled task entry point (repository scan CLI)
+├── tests/
+│   └── test_simulation.py
+├── storage/                       # Local storage directory (gitignored)
+├── .env                           # Environment variables (gitignored)
+├── .env.example                   # Environment variable template
+├── requirements.txt               # Python dependencies
+├── 使用指南.md                     # Onboarding guide for repository users
+└── 轻量级数据中台 (Data-Hub) 实现方案设计文档.md
+```
+
+## 🚀 Quick Start
 
 ### 1. Install dependencies
 
@@ -20,40 +78,73 @@ pip install -r requirements.txt
 
 ### 2. Configure environment variables
 
-Edit the `.env` file:
+Copy the template and edit it:
+
+```bash
+cp .env.example .env
+```
 
-```env
-# Database
+Full list of `.env` settings:
+
+```ini
+# ========== Database ==========
 DB_HOST=localhost
 DB_PORT=3306
 DB_USER=root
 DB_PASSWORD=your_password
 DB_NAME=data_nexus
 
-# Gogs
+# ========== Gogs ==========
 GOGS_URL=https://your-gogs-server.com
-GOGS_TOKEN=your_access_token
-GOGS_SECRET=optional_webhook_secret
+GOGS_TOKEN=your_gogs_api_token
+GOGS_SECRET=                              # Webhook signature verification secret (optional)
+
+# ========== Webhook auto-configuration ==========
+GOGS_WEBHOOK_URL=http://your-host:8000/webhook   # Callback URL written to repositories automatically after a scheduled scan
+GOGS_WEBHOOK_SECRET=                              # Callback signing secret (optional; corresponds to GOGS_SECRET)
 
-# Storage
+# ========== Storage ==========
 STORAGE_ROOT=./storage
+
+# ========== OSS ==========
+OSS_ACCESS_KEY_ID=your_access_key_id
+OSS_ACCESS_KEY_SECRET=your_access_key_secret
+OSS_ENDPOINT=oss-cn-hangzhou.aliyuncs.com
+OSS_BUCKET_NAME=your_bucket_name
+OSS_PREFIX=data_nexus                     # OSS storage prefix
+OSS_CDN_URL=https://your-cdn-domain.com   # CDN domain
 ```
 
-### 3. Start the service
+### 3. Start the API service
 
 ```bash
 uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
 ```
 
-### 4. Configure the webhook
+Or run it directly:
 
-Add a webhook in the Gogs repository settings:
-- URL: `http://your-server:8000/webhook`
-- Content Type: `application/json`
+```bash
+python -m app.main
+```
+
+### 4. Start the scheduled repository scan
 
-### 5. Add manifest.yaml
+Scan all manageable repositories and configure the webhook automatically for those containing `manifest.yaml`:
 
-Create `manifest.yaml` in the repository root:
+```bash
+# Single run
+python -m app.tasks.scan_repos
+
+# Continuous loop (scans every hour by default)
+python -m app.tasks.scan_repos --loop
+
+# Custom scan interval (e.g. every 30 minutes)
+python -m app.tasks.scan_repos --loop --interval 1800
+```
+
+### 5. Repository onboarding (add manifest.yaml)
+
+Create `manifest.yaml` in the Git repository root:
 
 ```yaml
 project_name: "my_project"
@@ -66,16 +157,53 @@ stages:
       - path: "./report.pdf"
 ```
 
-## API
+> 📖 For detailed manifest configuration, see the [User Guide](使用指南.md)
+
+## 📡 API Endpoints
+
+### Webhook
+
+| Method | Path | Description |
+|------|------|------|
+| POST | `/webhook` | Receives Gogs push webhooks; supports HMAC-SHA256 signature verification |
+
+### Projects
+
+| Method | Path | Description |
+|------|------|------|
+| GET | `/projects` | Lists all projects (supports pagination via `?skip=&limit=`) |
+| GET | `/projects/{project_id}` | Gets project details by ID |
+| GET | `/projects/name/{project_name}` | Gets project details by name |
+
+### Versions
+
+| Method | Path | Description |
+|------|------|------|
+| GET | `/projects/{project_id}/versions` | Lists versions (supports filtering via `?stage=`) |
+| GET | `/versions/{version_id}` | Gets the details of a single version |
+| GET | `/versions/{version_id}/files` | Gets the version's file tree (`?flat=true` returns a flat list) |
+
+### Files
+
+| Method | Path | Description |
+|------|------|------|
+| GET | `/files/{file_id}` | Gets file metadata |
+| GET | `/files/{file_id}/url` | Gets the file's CDN download link |
+| GET | `/files/{file_id}/content` | 302 redirect to the CDN download |
+
+## 🛠 Tech Stack
 
-| Endpoint | Method | Description |
-|-----|------|-----|
-| `/webhook` | POST | Receives Gogs webhooks |
-| `/projects` | GET | Lists all projects |
-| `/projects/{id}/versions` | GET | Lists a project's versions |
-| `/versions/{id}/files` | GET | Gets a version's file tree |
-| `/files/{id}/content` | GET | Downloads file content |
+| Component | Technology |
+|------|----------|
+| Web framework | FastAPI |
+| Database | MySQL + SQLAlchemy |
+| HTTP client | httpx (async) |
+| Object storage | Alibaba Cloud OSS (oss2) |
+| CDN | Alibaba Cloud CDN |
+| ID generation | ULID (python-ulid) |
+| Configuration | python-dotenv |
 
-## Documentation
+## 📚 Related Documentation
 
-For the detailed design, see the [Implementation Design Document](轻量级数据中台%20(Data-Hub)%20实现方案设计文档.md)
+- [User Guide](使用指南.md): a concise how-to for people onboarding a repository
+- [Implementation Design Document](轻量级数据中台%20(Data-Hub)%20实现方案设计文档.md): architecture, database modeling, incremental update logic, and other design details
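
Note on the endpoint tables in the README hunk above: the query parameters they list (`skip`, `limit`, `stage`, `flat`) can be exercised with a small URL helper. The following is only a sketch. `BASE_URL`, the helper names, the default `limit` of 20, and the assumption that IDs are strings (the tech stack mentions ULIDs) are illustrative and not taken from the commit.

```python
from typing import Optional
from urllib.parse import urlencode, urljoin

# Assumption: the API service from "Start the API service" listens here.
BASE_URL = "http://localhost:8000"


def build_url(path: str, **params) -> str:
    """Join BASE_URL with an endpoint path and optional query parameters."""
    # Drop parameters left as None so they are not sent as the string "None".
    query = urlencode({k: v for k, v in params.items() if v is not None})
    url = urljoin(BASE_URL + "/", path.lstrip("/"))
    return f"{url}?{query}" if query else url


def list_projects_url(skip: int = 0, limit: int = 20) -> str:
    """URL for GET /projects with pagination."""
    return build_url("/projects", skip=skip, limit=limit)


def list_versions_url(project_id: str, stage: Optional[str] = None) -> str:
    """URL for GET /projects/{project_id}/versions, optionally filtered by stage."""
    return build_url(f"/projects/{project_id}/versions", stage=stage)


def version_files_url(version_id: str, flat: bool = False) -> str:
    """URL for GET /versions/{version_id}/files; flat=True asks for a flat list."""
    return build_url(f"/versions/{version_id}/files", flat="true" if flat else None)
```

The resulting strings can be passed to any HTTP client, for example `httpx.get(list_projects_url(limit=50))`.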

+ 2 - 0
app/config.py

@@ -19,6 +19,8 @@ class Settings:
     GOGS_URL: str = os.getenv("GOGS_URL", "http://localhost:3000")
     GOGS_TOKEN: str = os.getenv("GOGS_TOKEN", "")
     GOGS_SECRET: str = os.getenv("GOGS_SECRET", "") # Webhook secret
+    GOGS_WEBHOOK_URL: str = os.getenv("GOGS_WEBHOOK_URL", "")  # Webhook callback URL for auto-config
+    GOGS_WEBHOOK_SECRET: str = os.getenv("GOGS_WEBHOOK_SECRET", "")  # Webhook signature secret
     
     # Storage
     STORAGE_ROOT: str = os.getenv("STORAGE_ROOT", "/data/storage")
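
Note on `GOGS_WEBHOOK_SECRET`: the README describes `/webhook` as supporting HMAC-SHA256 signature verification, but the handler is not part of this diff. A minimal sketch of such a check is shown below. The function names are hypothetical, and treating an empty secret as "verification disabled" is an assumption based on the "may be left empty" comment in `.env.example`. Gogs is documented to send the hex digest in an `X-Gogs-Signature` header; confirm this against your server version.

```python
import hashlib
import hmac


def sign_payload(secret: str, body: bytes) -> str:
    """Return the hex HMAC-SHA256 digest of the raw request body."""
    return hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, received_signature: str) -> bool:
    """Check a received signature against the one computed locally.

    Assumption: an empty secret means verification is switched off.
    """
    if not secret:
        return True
    expected = sign_payload(secret, body)
    # compare_digest runs in constant time, so a mismatch does not leak
    # how many leading characters were correct.
    return hmac.compare_digest(
        expected.encode("utf-8"), received_signature.encode("utf-8")
    )
```

The digest must be computed over the raw bytes of the request body, before any JSON parsing, because re-serialising the payload can change whitespace and key order.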

+ 83 - 9
app/services/gogs_client.py

@@ -1,9 +1,13 @@
 import httpx
 from app.config import settings
 import logging
+from typing import Optional
 
 logger = logging.getLogger(__name__)
 
+# Default timeout for API requests (seconds)
+_DEFAULT_TIMEOUT = 30.0
+
 
 class GogsClient:
     def __init__(self):
@@ -11,11 +15,81 @@ class GogsClient:
         self.token = settings.GOGS_TOKEN
         self.headers = {"Authorization": f"token {self.token}"}
 
-    async def get_manifest(self, owner: str, repo: str, commit_id: str) -> str:
-        """Fetch manifest.yaml raw content from a specific commit."""
-        # Gogs raw file URL format: /{owner}/{repo}/raw/{ref}/{path}
-        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{commit_id}/manifest.yaml"
-        async with httpx.AsyncClient() as client:
+    # ------------------------------------------------------------------
+    # Repository discovery
+    # ------------------------------------------------------------------
+
+    async def list_user_repos(self) -> list[dict]:
+        """Fetch *all* repositories visible to the authenticated user.
+
+        Gogs paginates with `?page=N` (default 20 per page).
+        We iterate until an empty page is returned.
+        """
+        repos: list[dict] = []
+        page = 1
+
+        async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
+            while True:
+                url = f"{self.base_url}/api/v1/user/repos?page={page}&limit=50"
+                resp = await client.get(url, headers=self.headers)
+                resp.raise_for_status()
+                batch = resp.json()
+                if not batch:
+                    break
+                repos.extend(batch)
+                page += 1
+
+        logger.info(f"Fetched {len(repos)} repositories in total")
+        return repos
+
+    # ------------------------------------------------------------------
+    # Webhook management
+    # ------------------------------------------------------------------
+
+    async def list_repo_webhooks(self, owner: str, repo: str) -> list[dict]:
+        """List all webhooks configured on a repository."""
+        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
+        async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
+            resp = await client.get(url, headers=self.headers)
+            resp.raise_for_status()
+            return resp.json()
+
+    async def create_repo_webhook(
+        self,
+        owner: str,
+        repo: str,
+        webhook_url: str,
+        secret: str = "",
+        events: Optional[list[str]] = None,
+    ) -> dict:
+        """Create a push webhook on a repository.
+
+        Returns the created webhook payload from Gogs.
+        """
+        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/hooks"
+        payload = {
+            "type": "gogs",
+            "config": {
+                "url": webhook_url,
+                "content_type": "json",
+                "secret": secret,
+            },
+            "events": events or ["push"],
+            "active": True,
+        }
+        async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
+            resp = await client.post(url, headers=self.headers, json=payload)
+            resp.raise_for_status()
+            return resp.json()
+
+    # ------------------------------------------------------------------
+    # Manifest / file operations (existing)
+    # ------------------------------------------------------------------
+
+    async def get_manifest(self, owner: str, repo: str, ref: str) -> str | None:
+        """Fetch manifest.yaml raw content from a given ref (commit / branch)."""
+        url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{ref}/manifest.yaml"
+        async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
             resp = await client.get(url, headers=self.headers)
             if resp.status_code == 404:
                 return None
@@ -25,7 +99,7 @@ class GogsClient:
     async def get_tree(self, owner: str, repo: str, commit_id: str, path: str = "") -> list:
         """Get the file tree of a repository."""
         url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={commit_id}"
-        async with httpx.AsyncClient() as client:
+        async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
             resp = await client.get(url, headers=self.headers)
             resp.raise_for_status()
             return resp.json()
@@ -37,7 +111,7 @@ class GogsClient:
         """
         url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{file_path}?ref={commit_id}"
         try:
-            async with httpx.AsyncClient() as client:
+            async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
                 resp = await client.get(url, headers=self.headers)
                 if resp.status_code == 404:
                     return None
@@ -71,7 +145,7 @@ class GogsClient:
             """Recursively fetch directory contents using contents API."""
             url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/contents/{path}?ref={commit_id}"
             try:
-                async with httpx.AsyncClient() as client:
+                async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
                     resp = await client.get(url, headers=self.headers)
                     if resp.status_code == 404:
                         logger.warning(f"Directory not found: {path}")
@@ -103,7 +177,7 @@ class GogsClient:
         """Download raw file content."""
         # Gogs raw file URL format: /{owner}/{repo}/raw/{ref}/{path}
         url = f"{self.base_url}/api/v1/repos/{owner}/{repo}/raw/{commit_id}/{file_path}"
-        async with httpx.AsyncClient() as client:
+        async with httpx.AsyncClient(timeout=_DEFAULT_TIMEOUT) as client:
             resp = await client.get(url, headers=self.headers)
             resp.raise_for_status()
             return resp.content
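
Note on `list_user_repos`: the loop in this hunk stops only when a page comes back empty. If a server ignored the `page` parameter and returned the same list every time, that loop would never finish. Whether a given Gogs version honours `page` and `limit` on `/user/repos` is worth verifying. The sketch below shows one defensive variant, under the assumption that every repository object carries a unique `id` field. `fetch_all_pages`, `FetchPage`, and `max_pages` are hypothetical names, not part of the commit.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical signature: given a 1-based page number, return that page's items.
FetchPage = Callable[[int], Awaitable[list[dict]]]


async def fetch_all_pages(fetch_page: FetchPage, max_pages: int = 1000) -> list[dict]:
    """Collect items page by page until a page is empty or adds nothing new."""
    items: list[dict] = []
    seen_ids: set = set()
    for page in range(1, max_pages + 1):
        batch = await fetch_page(page)
        # Assumption: each item has a unique "id" key.
        fresh = [item for item in batch if item["id"] not in seen_ids]
        if not fresh:
            # Either an empty page, or the server repeated data we already hold.
            break
        seen_ids.update(item["id"] for item in fresh)
        items.extend(fresh)
    return items


if __name__ == "__main__":
    async def two_pages(page: int) -> list[dict]:
        # Stand-in for an HTTP call: two pages of data, then empty pages.
        return {1: [{"id": 1}, {"id": 2}], 2: [{"id": 3}]}.get(page, [])

    print(asyncio.run(fetch_all_pages(two_pages)))
```

The `max_pages` cap is a second safety net, so a misbehaving server cannot keep the scanner busy indefinitely.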

+ 148 - 0
app/services/repo_scanner.py

@@ -0,0 +1,148 @@
+"""
+Repo Scanner Service
+====================
+Scans all Gogs repositories accessible to the authenticated user,
+identifies repos that:
+  1. We have admin permissions on
+  2. Contain a `manifest.yaml` on the default branch
+
+For qualifying repos, it ensures a Data Nexus webhook is configured
+(idempotent — skips repos that already have the webhook).
+"""
+
+import logging
+from dataclasses import dataclass
+
+from app.config import settings
+from app.services.gogs_client import GogsClient
+
+logger = logging.getLogger(__name__)
+
+
+@dataclass(frozen=True)
+class ScanResult:
+    """Lightweight value object summarising one scan run."""
+    total_repos: int
+    admin_repos: int
+    manifest_repos: int
+    webhooks_created: int
+    webhooks_skipped: int
+    errors: int
+
+
+class RepoScanner:
+    """Orchestrates the repo‑scan → webhook‑setup pipeline."""
+
+    def __init__(self, gogs: GogsClient | None = None):
+        self.gogs = gogs or GogsClient()
+        self.webhook_url: str = settings.GOGS_WEBHOOK_URL
+        self.webhook_secret: str = settings.GOGS_WEBHOOK_SECRET
+
+    # ------------------------------------------------------------------
+    # Public API
+    # ------------------------------------------------------------------
+
+    async def scan_and_configure(self) -> ScanResult:
+        """Run a full scan cycle.
+
+        Steps
+        -----
+        1. Fetch all repos visible to the token owner.
+        2. Filter repos where ``permissions.admin == True``.
+        3. For each admin repo, check if ``manifest.yaml`` exists.
+        4. If manifest exists, ensure our webhook is present.
+
+        Returns a :class:`ScanResult` summarising what happened.
+        """
+        if not self.webhook_url:
+            raise ValueError(
+                "GOGS_WEBHOOK_URL is not configured. "
+                "Please set it in .env before running the scanner."
+            )
+
+        all_repos = await self.gogs.list_user_repos()
+        admin_repos = self._filter_admin_repos(all_repos)
+
+        logger.info(
+            f"Found {len(all_repos)} repos total, "
+            f"{len(admin_repos)} with admin permissions"
+        )
+
+        manifest_count = 0
+        created = 0
+        skipped = 0
+        errors = 0
+
+        for repo in admin_repos:
+            owner = repo["owner"]["username"]
+            name = repo["name"]
+            default_branch = repo.get("default_branch", "master")
+
+            try:
+                has_manifest = await self._has_manifest(owner, name, default_branch)
+                if not has_manifest:
+                    logger.debug(f"[{owner}/{name}] No manifest.yaml — skipping")
+                    continue
+
+                manifest_count += 1
+                logger.info(f"[{owner}/{name}] manifest.yaml found ✔")
+
+                already_configured = await self._webhook_already_exists(owner, name)
+                if already_configured:
+                    logger.info(f"[{owner}/{name}] Webhook already configured — skipping")
+                    skipped += 1
+                    continue
+
+                await self._create_webhook(owner, name)
+                created += 1
+                logger.info(f"[{owner}/{name}] Webhook created ✔")
+
+            except Exception as exc:
+                errors += 1
+                logger.error(f"[{owner}/{name}] Error: {exc}", exc_info=True)
+
+        result = ScanResult(
+            total_repos=len(all_repos),
+            admin_repos=len(admin_repos),
+            manifest_repos=manifest_count,
+            webhooks_created=created,
+            webhooks_skipped=skipped,
+            errors=errors,
+        )
+        logger.info(f"Scan complete: {result}")
+        return result
+
+    # ------------------------------------------------------------------
+    # Internal helpers
+    # ------------------------------------------------------------------
+
+    @staticmethod
+    def _filter_admin_repos(repos: list[dict]) -> list[dict]:
+        """Return repos where the authenticated user has admin permissions."""
+        return [
+            r for r in repos
+            if r.get("permissions", {}).get("admin") is True
+        ]
+
+    async def _has_manifest(self, owner: str, repo: str, ref: str) -> bool:
+        """Check whether `manifest.yaml` exists in the repo."""
+        content = await self.gogs.get_manifest(owner, repo, ref)
+        return content is not None
+
+    async def _webhook_already_exists(self, owner: str, repo: str) -> bool:
+        """Return True if our webhook URL is already registered on the repo."""
+        hooks = await self.gogs.list_repo_webhooks(owner, repo)
+        return any(
+            hook.get("config", {}).get("url") == self.webhook_url
+            for hook in hooks
+        )
+
+    async def _create_webhook(self, owner: str, repo: str) -> dict:
+        """Create our Data Nexus push webhook on the repo."""
+        return await self.gogs.create_repo_webhook(
+            owner=owner,
+            repo=repo,
+            webhook_url=self.webhook_url,
+            secret=self.webhook_secret,
+            events=["push"],
+        )
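
Note on the scan pipeline: the per-repository decision in `scan_and_configure` (admin check, manifest check, existing-hook check) can be expressed as a pure function, which makes it easy to unit test without a Gogs server. The sketch below mirrors the checks in this file. The name `decide_action` and the returned labels are hypothetical.

```python
def is_admin(repo: dict) -> bool:
    """True when the repo's permissions block reports admin access."""
    return repo.get("permissions", {}).get("admin") is True


def webhook_exists(hooks: list[dict], webhook_url: str) -> bool:
    """True when any existing hook already points at webhook_url."""
    return any(h.get("config", {}).get("url") == webhook_url for h in hooks)


def decide_action(repo: dict, has_manifest: bool, hooks: list[dict], webhook_url: str) -> str:
    """Return what the scanner would do for one repository."""
    if not is_admin(repo):
        return "skip_not_admin"
    if not has_manifest:
        return "skip_no_manifest"
    if webhook_exists(hooks, webhook_url):
        return "skip_existing"  # idempotent: never create a duplicate hook
    return "create"
```

Because the URL comparison is an exact string match, `http://host:8000/webhook` and `http://host:8000/webhook/` count as different hooks. Changing `GOGS_WEBHOOK_URL` between runs would therefore add a second hook rather than update the first.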

+ 0 - 0
app/tasks/__init__.py


+ 102 - 0
app/tasks/scan_repos.py

@@ -0,0 +1,102 @@
+"""
+Scheduled Task — Repository Scanner
+====================================
+Periodically scans all admin-accessible Gogs repositories,
+detects repos with a ``manifest.yaml``, and auto-configures
+the Data Nexus webhook so data pushes are captured automatically.
+
+Usage
+-----
+Run directly::
+
+    python -m app.tasks.scan_repos          # single run
+    python -m app.tasks.scan_repos --loop   # loop with interval
+
+Or import and call programmatically::
+
+    from app.tasks.scan_repos import run_once
+    await run_once()
+"""
+
+import asyncio
+import argparse
+import logging
+import sys
+
+from app.services.repo_scanner import RepoScanner
+
+# ── Logging ──────────────────────────────────────────────────────────
+logging.basicConfig(
+    level=logging.INFO,
+    format="%(asctime)s  %(levelname)-8s  %(name)s  %(message)s",
+    datefmt="%Y-%m-%d %H:%M:%S",
+)
+logger = logging.getLogger("scan_repos")
+
+# ── Default scan interval (seconds) ─────────────────────────────────
+DEFAULT_INTERVAL_SECONDS = 60 * 60  # 1 hour
+
+
+# ── Core routines ───────────────────────────────────────────────────
+
+
+async def run_once() -> None:
+    """Execute a single scan‑and‑configure cycle."""
+    scanner = RepoScanner()
+    result = await scanner.scan_and_configure()
+
+    logger.info("=" * 60)
+    logger.info("  Scan Summary")
+    logger.info("-" * 60)
+    logger.info(f"  Total repos discovered : {result.total_repos}")
+    logger.info(f"  Admin repos            : {result.admin_repos}")
+    logger.info(f"  With manifest.yaml     : {result.manifest_repos}")
+    logger.info(f"  Webhooks created       : {result.webhooks_created}")
+    logger.info(f"  Webhooks skipped (dup) : {result.webhooks_skipped}")
+    logger.info(f"  Errors                 : {result.errors}")
+    logger.info("=" * 60)
+
+
+async def run_loop(interval: int = DEFAULT_INTERVAL_SECONDS) -> None:
+    """Run the scan repeatedly with a fixed delay between cycles."""
+    logger.info(f"Starting scan loop (interval={interval}s)")
+
+    while True:
+        try:
+            await run_once()
+        except Exception as exc:
+            logger.error(f"Scan cycle failed: {exc}", exc_info=True)
+
+        logger.info(f"Next scan in {interval} seconds …")
+        await asyncio.sleep(interval)
+
+
+# ── CLI entry‑point ─────────────────────────────────────────────────
+
+
+def main() -> None:
+    parser = argparse.ArgumentParser(
+        description="Scan Gogs repos and auto-configure Data Nexus webhooks.",
+    )
+    parser.add_argument(
+        "--loop",
+        action="store_true",
+        help="Run continuously with a fixed interval (default: 1 hour).",
+    )
+    parser.add_argument(
+        "--interval",
+        type=int,
+        default=DEFAULT_INTERVAL_SECONDS,
+        help=f"Interval in seconds between scans (default: {DEFAULT_INTERVAL_SECONDS}).",
+    )
+
+    args = parser.parse_args()
+
+    if args.loop:
+        asyncio.run(run_loop(interval=args.interval))
+    else:
+        asyncio.run(run_once())
+
+
+if __name__ == "__main__":
+    main()
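
Note on `--interval`: the parser in this file accepts any integer, so `--loop --interval 0` would rescan back to back with no pause. One possible guard is a custom argparse type, sketched below. `positive_int` and `build_parser` are hypothetical names, and rejecting non-positive values is a design suggestion rather than behaviour of the commit.

```python
import argparse

DEFAULT_INTERVAL_SECONDS = 60 * 60  # 1 hour, same default as the task module


def positive_int(value: str) -> int:
    """argparse type that rejects zero and negative intervals."""
    number = int(value)  # a ValueError here is reported by argparse as invalid input
    if number <= 0:
        raise argparse.ArgumentTypeError(f"interval must be > 0, got {number}")
    return number


def build_parser() -> argparse.ArgumentParser:
    """Build a parser with the same two flags as scan_repos.main()."""
    parser = argparse.ArgumentParser(
        description="Scan Gogs repos and auto-configure Data Nexus webhooks.",
    )
    parser.add_argument("--loop", action="store_true",
                        help="Run continuously with a fixed interval.")
    parser.add_argument("--interval", type=positive_int,
                        default=DEFAULT_INTERVAL_SECONDS,
                        help="Interval in seconds between scans.")
    return parser
```

With this type in place, `build_parser().parse_args(["--loop", "--interval", "1800"])` yields `loop=True` and `interval=1800`, while `--interval 0` exits with a usage error.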

+ 24 - 28
使用指南.md

@@ -4,6 +4,30 @@
 
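 DataNexus is an automated data collection system. Once `manifest.yaml` is configured in your project, every `git push` makes the system upload the files you specified to the cloud and keep their version history.
 
+## Prerequisite: Repository Permissions
+
+DataNexus needs **admin permission** on your repository so that it can read the configuration and set up the webhook automatically. Depending on who owns your repository, check whether any extra step is required:
+
+### Case 1: The repository belongs to the AIGC or Server organization ✅ No action needed
+
+If your repository was created under the **AIGC** or **Server** organization (that is, its URL looks like `https://git.yishihui.com/AIGC/your-repo` or `https://git.yishihui.com/Server/your-repo`), the system already has permission. **No authorization step is needed**; go straight to the next step.
+
+### Case 2: The repository belongs to a personal account ⚠️ Add a collaborator
+
+If the repository was created under your **personal account** (that is, its URL looks like `https://git.yishihui.com/your-username/your-repo`), you need to grant access manually:
+
+1. Open your repository page and click **Settings**
+2. Select **Collaborators** in the left-hand menu
+3. Search for and add the user **`tanjingyu`**
+4. Set the permission to **Admin**
+5. Click confirm
+
+> 💡 **Why is admin permission needed?** DataNexus needs Admin permission to configure the repository's webhook automatically, which is a prerequisite for triggering automatic data collection. Once the collaborator is added, the system completes the webhook configuration on its next scan; no further manual steps are needed.
+
+> 💡 **Recommended:** Unless you have a specific reason not to, create data-related repositories under the **AIGC** or **Server** organization. This skips the authorization step and works out of the box.
+
+---
+
 ## Quick Start
 
 ### Step 1: Create manifest.yaml in the project root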
@@ -119,34 +143,6 @@ stages:
 
 ---
 
-## Viewing Uploaded Files
-
-### API Endpoints
-
-```bash
-# List all projects
-GET /projects
-
-# List all versions of a project
-GET /projects/{project_id}/versions
-
-# List the files in a version
-GET /versions/{version_id}/files
-
-# Get a file's download link
-GET /files/{file_id}/url
-```
-
-### File Access URL
-
-Uploaded files can be accessed directly through the CDN:
-
-```
-https://res-bj.cybertogether.net/data_nexus/{project_name}/{stage}/{commit_id}/{file_path}
-```
-
----
-
 ## Notes
 
 1. `manifest.yaml` must be placed in the project root