需求数据服务

xueyiming b53929d719 增加需求去重和写入 4 дней назад
app b53929d719 增加需求去重和写入 4 дней назад
frontend b53929d719 增加需求去重和写入 4 дней назад
tests b53929d719 增加需求去重和写入 4 дней назад
.dockerignore 2f164b0c61 项目初始化 1 месяц назад
.env.example b53929d719 增加需求去重和写入 4 дней назад
.gitignore a4a0559795 增加需求汇总页面 6 дней назад
Dockerfile 5838be26fe 修改部署文件 1 месяц назад
README.md b53929d719 增加需求去重和写入 4 дней назад
deploy.sh 606a0b6b41 增加部署sh脚本 1 месяц назад
docker-compose.yml 5838be26fe 修改部署文件 1 месяц назад
requirements.txt 23e48b8ba5 增加导出功能 2 недель назад

README.md

denamd_server

一个包含 Web 接口 + 定时任务 的 Python 服务基础骨架。

技术栈

  • FastAPI:提供 Web API
  • APScheduler:提供定时任务调度

快速开始

  1. 安装依赖
pip install -r requirements.txt
  1. (可选)初始化环境变量
cp .env.example .env
  1. 启动服务
uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
  1. 启动前端(本地开发)
cd frontend
npm install
npm run dev

默认前端地址:http://localhost:5173

已封装接口

  • GET /demand/api/v1/health:服务健康检查
  • GET /demand/api/v1/scheduler/status:定时任务状态
  • GET /demand/api/v1/demand-pool:查询 multi_demand_pool_di,支持 strategystart_dtend_dt 筛选与分页(pagepage_size
  • GET /demand/api/v1/demand-pool/strategies:统计策略名列表(可按日期范围过滤)

已封装定时任务

  • heartbeat_job:默认每 1 小时执行一次,输出心跳日志(与其它定时任务频率无关,仅便于确认调度器存活)
  • demand_pool_today_incremental_sync_job:每小时执行一次,当天分区增量同步(插入前按 demand_id 去重)

你可以在 app/scheduler/jobs.py 中继续添加业务任务,并在 app/scheduler/manager.py 里注册。

MySQL 与 ODPS 封装

  • MySQL 封装文件:app/db/mysql.py
    • get_db_session():获取 SQLAlchemy 会话(可用于依赖注入)
  • ODPS 封装文件:app/odps/client.py
    • get_odps_client():获取 ODPS 客户端单例

相关配置在 .env 中维护。

需求池同步任务

  • 同步逻辑文件:app/sync/demand_pool_sync.py
  • 数据源 1:dwd_multi_demand_pool_diextend 写入 MySQL ext_info,其余字段直映射)
  • 数据源 2:dwd_demand_pool_di(默认关闭,DEMAND_POOL_SECONDARY_SYNC_ENABLED=true 时启用;映射规则:strategy=近期需求demand_id=MD5(strategy+demand+dt)demand_name=demandweight=scorevideo_count/video_list 置空,ext_info={}
  • 全量任务:run_full_sync(),默认同步分区 20260507,20260508,20260509
  • 增量任务:run_today_incremental_sync(),每小时同步当天分区
  • 去重策略:ODPS 分区内先按 demand_id 去重,写入 MySQL 时使用唯一键 UPSERT

实验需求池写入(strategy_staging → ODPS)

  • 逻辑文件:app/sync/experiment_demand_pool_write.py
  • 目标表:loghubods.dwd_multi_demand_pool_di_tmp(分区 dt=yyyymmdd
  • 写入方式:PyODPS Tunnel 追加写入open_writer),语义等价于 INSERT INTO ... PARTITION(dt=...) 追加行,非 SQL 执行
  • 定时任务:experiment_demand_pool_hourly_write_job北京时间 03:30 首次、之后每小时 :30、23:30 末次;写入当天 dt 分区
  • 手动触发:
    • API:POST /demand/api/v1/experiment/demand-pool/write?partition_dt=yyyymmdd
    • 脚本:python scripts/run_experiment_demand_pool_write.py --partition-dt 20260611

策略配置实验字段(strategy_config 表独立列)

字段 说明
daily_write_limit 每个策略每天最多写入条数;0 表示不限制,直到 staging 全部写完
priority 跨策略去重优先级,数值越小越优先同优先级策略之间 demand_name 不去重

params JSON 仅保留策略运行阈值等业务参数。

写入规则

  1. 仅处理 active=true 的策略
  2. strategy_stagingweight DESC 选取未写入行
  3. demand_id 已在目标表当天分区存在则跳过
  4. 不同 priority 之间按 demand_name 去重:先写入者优先,后到的其他 priority 一律跳过(即使 priority 数值更小)
  5. 同 priority 之间不去重,可写入同名不同 demand_id 的需求

前端说明

  • 前端目录:frontend
  • 技术栈:React + Vite
  • 页面能力:strategy / dt 筛选查询 + 表格展示
  • 默认请求前缀:/demand/api/v1

Docker 部署(前后端服务)

在项目根目录执行:

docker compose up -d --build

启动后:

  • 前端:http://localhost:8080
  • 后端:http://localhost:8000