# denamd_server 一个包含 `Web 接口 + 定时任务` 的 Python 服务基础骨架。 ## 技术栈 - FastAPI:提供 Web API - APScheduler:提供定时任务调度 ## 快速开始 1. 安装依赖 ```bash pip install -r requirements.txt ``` 2. (可选)初始化环境变量 ```bash cp .env.example .env ``` 3. 启动服务 ```bash uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload ``` 4. 启动前端(本地开发) ```bash cd frontend npm install npm run dev ``` 默认前端地址:`http://localhost:5173` ## 已封装接口 - `GET /demand/api/v1/health`:服务健康检查 - `GET /demand/api/v1/scheduler/status`:定时任务状态 - `GET /demand/api/v1/demand-pool`:查询 `multi_demand_pool_di`,支持 `strategy`、`start_dt`、`end_dt` 筛选与分页(`page`、`page_size`) - `GET /demand/api/v1/demand-pool/strategies`:统计策略名列表(可按日期范围过滤) ## 已封装定时任务 - `heartbeat_job`:默认每 1 小时执行一次,输出心跳日志(与其它定时任务频率无关,仅便于确认调度器存活) - `demand_pool_today_incremental_sync_job`:每小时执行一次,当天分区增量同步(插入前按 `demand_id` 去重) 你可以在 `app/scheduler/jobs.py` 中继续添加业务任务,并在 `app/scheduler/manager.py` 里注册。 ## MySQL 与 ODPS 封装 - MySQL 封装文件:`app/db/mysql.py` - `get_db_session()`:获取 SQLAlchemy 会话(可用于依赖注入) - ODPS 封装文件:`app/odps/client.py` - `get_odps_client()`:获取 ODPS 客户端单例 相关配置在 `.env` 中维护。 ## 需求池同步任务 - 同步逻辑文件:`app/sync/demand_pool_sync.py` - 数据源 1:`dwd_multi_demand_pool_di`(`extend` 写入 MySQL `ext_info`,其余字段直映射) - 数据源 2:`dwd_demand_pool_di`(默认关闭,`DEMAND_POOL_SECONDARY_SYNC_ENABLED=true` 时启用;映射规则:`strategy=近期需求`,`demand_id=MD5(strategy+demand+dt)`,`demand_name=demand`,`weight=score`,`video_count/video_list` 置空,`ext_info={}`) - 全量任务:`run_full_sync()`,默认同步分区 `20260507,20260508,20260509` - 增量任务:`run_today_incremental_sync()`,每小时同步当天分区 - 去重策略:ODPS 分区内先按 `demand_id` 去重,写入 MySQL 时使用唯一键 UPSERT ## 实验需求池写入(strategy_staging → ODPS) - 逻辑文件:`app/sync/experiment_demand_pool_write.py` - 目标表:`loghubods.dwd_multi_demand_pool_di_tmp`(分区 `dt=yyyymmdd`) - 写入方式:PyODPS Tunnel **追加写入**(`open_writer`),语义等价于 `INSERT INTO ... PARTITION(dt=...)` 追加行,非 SQL 执行 - 定时任务:`experiment_demand_pool_hourly_write_job`,**北京时间** 03:30 首次、之后每小时 `:30`、23:30 末次;写入当天 `dt` 分区 - 手动触发: - API:`POST /demand/api/v1/experiment/demand-pool/write?partition_dt=yyyymmdd` - 脚本:`python scripts/run_experiment_demand_pool_write.py --partition-dt 20260611` ### 策略配置实验字段(`strategy_config` 表独立列) | 字段 | 说明 | |------|------| | `daily_write_limit` | 每个策略每天最多写入条数;`0` 表示不限制,直到 staging 全部写完 | | `priority` | 跨策略去重优先级,**数值越小越优先**;**同优先级**策略之间 `demand_name` 不去重 | `params` JSON 仅保留策略运行阈值等业务参数。 ### 写入规则 1. 仅处理 `active=true` 的策略 2. 从 `strategy_staging` 按 `weight DESC` 选取未写入行 3. `demand_id` 已在目标表当天分区存在则跳过 4. 不同 priority 之间按 `demand_name` 去重:**先写入者优先**,后到的其他 priority 一律跳过(即使 priority 数值更小) 5. 同 priority 之间不去重,可写入同名不同 `demand_id` 的需求 ## 前端说明 - 前端目录:`frontend` - 技术栈:React + Vite - 页面能力:`strategy` / `dt` 筛选查询 + 表格展示 - 默认请求前缀:`/demand/api/v1` ## Docker 部署(前后端服务) 在项目根目录执行: ```bash docker compose up -d --build ``` 启动后: - 前端:`http://localhost:8080` - 后端:`http://localhost:8000`