Przeglądaj źródła

feat(creative-create): 模块 B 创意搭建端到端 — 接入 piaoquantv xcx/save

命名和跳转路径由 xcx/save 统管:dynamic_creative_name = root_source_id,
mini_program_path = pageUrl。客户端不再本地拼接,服务侧改归因方案自动跟上。
images_add 沉淀到 ad_api 作公共素材库封装(MD5 幂等)。
幂等保护按 adgroup + material_image_id 判等,命中则 skip 新建。
刘立冬 2 tygodni temu
rodzic
commit
a21ca6a430

+ 96 - 0
examples/auto_put_ad_mini/CREATIVE_CREATION_TODO.md

@@ -0,0 +1,96 @@
+# 模块 B 创意搭建 — Code Review TODO
+
+> 来源:2026-06-08 端到端打通后的工程 review。
+> 状态:端到端可单广告挂创意,但生产批量上线前必须做完 P0。
+> 验证基线:`83846793 / 106275052398` 已成功挂上 `10030789982`(已手动删除)。
+
+## P0 — 必须修(影响业务正确性)
+
+### ~~P0-1. `DEFAULT_JUMP_PATH` 改动态生成~~ ✅ 已完成(2026-06-08)
+- 实现方式:接入 piaoquantv `xcx/save` 接口
+- 落地:新建 `tools/landing_plan.py` 封装接口;`creative_creation.py` 删 `DEFAULT_JUMP_PATH` 常量
+- `mini_program_path` 改为来自 `xcx/save` 返回的 `pageUrl`
+- 验证:两次调用 plan_id=8061 → 8062,page_url 每次新生成
+
+### ~~P0-2. `build_creative_name` 防重名~~ ✅ 由 P0-1 副作用解决(2026-06-08)
+- 原方案:客户端加 uuid 进 hash 输入
+- 实际方案:`xcx/save` 返回的 `pageUrl` 内嵌 `rootSourceId`,直接用作 `dynamic_creative_name`
+- 命名生成职责完全交由服务侧管,客户端不再持有命名逻辑
+- 落地:删 `tools/creative_creation.py::build_creative_name` 函数
+
+## P1 — 生产稳定性
+
+### ~~P1-1. `upload_image_to_account` 迁移到 `tools/ad_api.py`~~ ✅ 已完成(2026-06-08)
+- 落地:`tools/ad_api.py` 加 `images_add(account_id, image_url) -> str`(sync public,与 `_post/_get/_check` 同层)
+- `tools/creative_creation.py` 删 `upload_image_to_account`,改 import `from tools.ad_api import images_add`
+- 验证:同账户重复调用返回相同 `image_id`(MD5 幂等)
+- 复用:模块 A 后续挂"广告主图"也用同一函数
+
+### ~~P1-2. 幂等保护(防止重复创建)~~ ✅ 已完成(2026-06-08)
+- 落地:`tools/creative_creation.py::find_existing_creative_by_image(account_id, adgroup_id, image_id)`
+- 判等键:**同 adgroup + 同 material_image_id**(不按 creative_name,因为 name 每次 xcx/save 都新)
+- 接入:`create_creative_for_ad(..., skip_if_exists=True)` 默认开启,POST 前先反查 → 命中直接返回已有 `dynamic_creative_id`
+- 验证:None 路径反查 `image_id=999999999999999` 返回 None(命中路径因当前 adgroup 无创意 skip,逻辑同 None 路径反向)
+- 副作用:每次创建多 1 次 `/dynamic_creatives/get` 调用(QPS 多消耗 1,在 P1-3 接限流时算入)
+
+### ~~P1-3. 接入 `execution_engine` 的 QPS + retry~~ ⏸ 暂缓(2026-06-08 用户决策)
+- 暂缓原因:单账户串行挂创意场景,实际 QPS 远低于腾讯 10/s 硬限,不必预先处理
+- piaoquantv 侧已确认无限流(用户 2026-06-08 确认)
+- 现状评估:链路 3 次腾讯调用(`/dynamic_creatives/get` + `/images/add` + `/dynamic_creatives/add`)+ 1 次 piaoquantv `xcx/save`,单条创意挂载 < 1s
+- 触发条件:**如果**未来真撞限(腾讯返回 limit 错误码)→ 在 `ad_api.py::_post / _get` 加一层全局 sync token bucket 即可(execution_engine.TokenBucket 是 async 版,需要 sync 改造)
+- 不在调用方加 — 改一处而非每个 caller 改
+
+### P1-4. 定义领域异常
+- 现状:全部 `RuntimeError`,调用方无法区分错误类型
+- 方案:
+  - `ImageUploadError(account_id, image_url, tencent_code)`
+  - `MissingBrandError(account_id)`
+  - `CreativeRejectError(adgroup_id, tencent_code, field)`
+
+## P2 — 架构清晰度
+
+### P2-1. 抽 `tools/account_assets_repo.py`
+- 集中:`get_account_brand` + 后续 `get_account_audience_pack` + `get_account_jump_template` 等
+- 现状:DB 访问散落各 tool
+
+### P2-2. 腾讯枚举集中
+- `tools/ad_api.py` 加 `class TencentAdStatus / TencentDeliveryMode / TencentCreativeType`
+- 替换:`"AD_STATUS_NORMAL" / "DELIVERY_MODE_COMPONENT" / "DYNAMIC_CREATIVE_TYPE_PROGRAM"` 等 magic string
+
+### P2-3. 完善 logging 链路
+- 召回 / 上传 / build / POST / 反查 全程 `step=xxx duration_ms=yyy`
+
+### P2-4. 模块 B 主循环 + 协作 hook
+- `find_ads_needing_creatives(account_id) -> list[ad_id]`(filter `system_status=CREATIVE_EMPTY` 或 `creative_count < MIN`)
+- 配合 plan 阶段 f 的 `execute_creation_once.py` 主入口
+
+## P3 — 可演进性
+
+### P3-1. `build_creative_request_body` 引入 `CreativeContext` dataclass
+- 把 brand / jump_path / button_text 等 SOP 字段打包,减少 8 参数
+
+### P3-2. 83846804 账户跑同样流程验证
+- brand 已就绪(`40915884255`),DB 已填,直接可用
+
+### P3-3. 加模块 B README
+- 位置:`tools/README_creative_creation.md`
+- 内容:数据流图 / 接口列表 / 已知腾讯错误码表 / 故障排查清单
+
+### P3-4. 图片 Content-Type 自动检测
+- 现状:硬编码 `image/jpeg`
+- 方案:`mimetypes.guess_type(url)` 或读 magic bytes
+
+## 已知腾讯错误码 — 排查表
+
+| code | 含义 | 修复方向 |
+|---|---|---|
+| 18001 | 缺失必填参数 `image_id` | image 组件不能用 `image_url`,必须先上传拿 `image_id` |
+| 1800269 | 品牌形象必填 | 必须传 `brand` 组件 + `brand_name + brand_image_id` |
+| 1530003 | 图片不存在或已删除 | `image_id` 跨账户不可复用,必须按账户独立上传 |
+
+## 已验证过的非业务事实
+
+- 同账户图片 MD5 幂等:重复上传同图返回相同 `image_id`(无需本地缓存)
+- 跨账户图片不可复用:同图在不同账户下 `image_id` 不同
+- 普通素材审核:`DYNAMIC_CREATIVE_STATUS_PENDING` → 2-4 小时
+- 广告 `system_status` 由 `CREATIVE_EMPTY` 切走有几秒到几分钟延迟(腾讯后端聚合状态)

+ 50 - 0
examples/auto_put_ad_mini/tools/ad_api.py

@@ -15,6 +15,7 @@
   TENCENT_AD_BASE_URL       API base,默认 https://api.e.qq.com/v3.0
 """
 
+import hashlib
 import json
 import logging
 import os
@@ -190,6 +191,55 @@ def _check(resp: Dict[str, Any], op: str) -> Dict[str, Any]:
     return resp.get("data") or {}
 
 
+# ===== 素材库(Image)— 公共能力,供创意/广告/品牌 共用 =====
+
+def images_add(account_id: int, image_url: str, timeout: int = 60) -> str:
+    """下载图片 → MD5 → POST /v3.0/images/add (multipart) → 返回 image_id (str)。
+
+    腾讯 MD5 幂等(已实测):同账户重复上传同图返回相同 image_id,**无需本地缓存**。
+    跨账户不复用:同图在不同账户下 image_id 不同。
+
+    Args:
+        account_id: 腾讯广告主账号 ID
+        image_url: 图片公网可下载 URL
+
+    Returns:
+        image_id (str) — 可直接填入 creative_components.image[].value.image_id
+                       或 creative_components.brand[].value.brand_image_id
+
+    Raises:
+        RuntimeError: 下载失败 / 腾讯 reject(含 code + message_cn)
+    """
+    resp = httpx.get(image_url, timeout=TIMEOUT)
+    resp.raise_for_status()
+    img_bytes = resp.content
+    img_md5 = hashlib.md5(img_bytes).hexdigest()
+
+    params = dict(_common_params(account_id))
+    user_token = _get_user_token_for_account(account_id)
+    if user_token:
+        params["user_token"] = user_token
+    url = f"{BASE_URL}/images/add?{urlencode(params)}"
+
+    files = {"file": ("image.jpg", img_bytes, "image/jpeg")}
+    data = {
+        "account_id": str(account_id),
+        "signature": img_md5,
+        "upload_type": "UPLOAD_TYPE_FILE",
+    }
+    logger.info(
+        "[images_add] account=%d url=%s md5=%s size=%d",
+        account_id, image_url[:60], img_md5, len(img_bytes),
+    )
+    api_resp = httpx.post(url, files=files, data=data, timeout=timeout).json()
+    if api_resp.get("code") != 0:
+        raise RuntimeError(
+            f"images_add 失败 account={account_id} url={image_url} "
+            f"code={api_resp.get('code')} msg={api_resp.get('message_cn') or api_resp.get('message')}"
+        )
+    return str(api_resp["data"]["image_id"])
+
+
 # ===== 广告(Ad)— 3.0 顶层单位 =====
 
 @tool(description="创建广告(腾讯广告3.0顶层单位,含营销目标/定向/出价/预算,对应API: /adgroups/add)")

+ 274 - 0
examples/auto_put_ad_mini/tools/creative_creation.py

@@ -0,0 +1,274 @@
+"""创意搭建主入口(模块 B)。
+
+数据流(2026-06-08 用户确认 + 端到端打通):
+  承接视频(piaoquantv) → 多路召回素材(vector) → top 1 → 上传素材图(MD5 幂等)
+  → 调 xcx/save 注册落地计划(拿 page_url + creative_name)→ 读账户 brand
+  → 构造 body → POST /dynamic_creatives/add → 绑到广告
+
+决策落地(已验证):
+- image 组件:**必须用 image_id**(image_url 会 reject code=18001)
+  → 实现:下载 cover URL → MD5 → POST /v3.0/images/add multipart → 拿 image_id
+  → MD5 幂等:同账户重复上传返回同 ID(无需本地缓存)
+- brand 组件:**必填**(漏传会 reject code=1800269)
+  → brand_image_id 跨账户不可复用(reject code=1530003)
+  → 从 DB account_whitelist 读账户级 brand_name/brand_image_id
+- dynamic_creative_type: DYNAMIC_CREATIVE_TYPE_PROGRAM(参考样本 9744753978)
+- description: 素材 title 一条(MVP)
+- jump_info / 命名:**piaoquantv xcx/save 接口统一管**
+  → page_url(mini_program_path)和 root_source_id(creative_name)都来自服务侧
+  → 客户端不本地拼接、不本地命名,保证归因锚点跨系统一致
+- 配置 NORMAL 状态,挂上后腾讯进入 PENDING(普通素材审核 2-4 小时)
+"""
+
+import logging
+from dataclasses import asdict
+from typing import Optional
+
+from config import MARKETING_CARRIER_GH_ID
+from tools.ad_api import _check, _get, _post, images_add
+from tools.landing_plan import LandingPlanResult, create_landing_plan
+from tools.material_recall import Material, recall_materials_for_video
+from tools.video_recall import (
+    LandingVideo,
+    fetch_landing_videos_for_account,
+    get_account_crowd_package,
+)
+
+logger = logging.getLogger(__name__)
+
+
+def _pick_image_url(material: Material) -> str:
+    """从素材里取 image URL — material.cover 优先,fallback 到 raw.imageList[0]"""
+    if material.cover:
+        return material.cover
+    images = (material.raw or {}).get("imageList") or []
+    return images[0] if images else ""
+
+
+def find_existing_creative_by_image(
+    account_id: int, adgroup_id: int, material_image_id: str,
+) -> Optional[int]:
+    """查 adgroup 下是否已有创意挂了这个素材 image_id(幂等检查)。
+
+    判等键(2026-06-08 决策):**同 adgroup + 同 material_image_id**。
+    - 不按 creative_name 判等:name = root_source_id,每次 xcx/save 都新,等于没做
+    - 按 image_id 判等:同 image 在同 adgroup 下挂多条会被腾讯模型降权曝光
+
+    Returns:
+        已有创意的 dynamic_creative_id;无则 None。
+    """
+    resp = _get("/dynamic_creatives/get", {
+        "account_id": account_id, "page": 1, "page_size": 100,
+        "filtering": [{
+            "field": "adgroup_id", "operator": "IN", "values": [str(adgroup_id)],
+        }],
+        "fields": ["dynamic_creative_id", "creative_components"],
+    })
+    items = (resp.get("data") or {}).get("list") or []
+    for c in items:
+        images = (c.get("creative_components") or {}).get("image") or []
+        for img in images:
+            existing_id = ((img.get("value") or {}).get("image_id")) or ""
+            if str(existing_id) == str(material_image_id):
+                cid = int(c.get("dynamic_creative_id"))
+                logger.info(
+                    "[idempotency] adgroup=%d image_id=%s 已挂创意 creative_id=%d,skip 新建",
+                    adgroup_id, material_image_id, cid,
+                )
+                return cid
+    return None
+
+
+def get_account_brand(account_id: int) -> dict:
+    """从 DB account_whitelist 读账户级 brand 资产。
+
+    返回 {"brand_name": str, "brand_image_id": str}。
+    跨账户 brand_image_id 不可复用,所以一定按 account_id 取。
+    """
+    from db.connection import get_connection
+
+    conn = get_connection()
+    try:
+        with conn.cursor() as cur:
+            cur.execute(
+                "SELECT brand_name, brand_image_id FROM account_whitelist WHERE account_id=%s",
+                (account_id,),
+            )
+            row = cur.fetchone()
+    finally:
+        conn.close()
+
+    if not row or not row.get("brand_name") or not row.get("brand_image_id"):
+        raise RuntimeError(
+            f"account {account_id} 未在 account_whitelist 配置 brand_name/brand_image_id,"
+            f"无法挂创意(腾讯 brand 组件必填)"
+        )
+    return {
+        "brand_name": row["brand_name"],
+        "brand_image_id": str(row["brand_image_id"]),
+    }
+
+
+def build_creative_request_body(
+    account_id: int,
+    adgroup_id: int,
+    landing: LandingVideo,
+    material: Material,
+    material_image_id: str,
+    brand_name: str,
+    brand_image_id: str,
+    creative_name: str,
+    jump_path: str,
+) -> dict:
+    """生成 /v3.0/dynamic_creatives/add 的请求 body(纯函数,无 I/O)。
+
+    Args:
+        material_image_id: 已上传到当前账户的素材图 image_id
+        brand_name / brand_image_id: 账户级 brand 资产(已上传)
+        creative_name: 由 xcx/save 返回的 root_source_id(归因锚点)
+        jump_path: 由 xcx/save 返回的 page_url(小程序跳转路径)
+    """
+    description_content = material.title or landing.title or "查看详情"
+    jump_spec = {
+        "page_type": "PAGE_TYPE_WECHAT_MINI_PROGRAM",
+        "page_spec": {
+            "wechat_mini_program_spec": {
+                "mini_program_id": MARKETING_CARRIER_GH_ID,
+                "mini_program_path": jump_path,
+            }
+        },
+    }
+
+    return {
+        "account_id": account_id,
+        "adgroup_id": adgroup_id,
+        "dynamic_creative_name": creative_name,
+        "delivery_mode": "DELIVERY_MODE_COMPONENT",
+        "dynamic_creative_type": "DYNAMIC_CREATIVE_TYPE_PROGRAM",
+        "configured_status": "AD_STATUS_NORMAL",
+        "creative_components": {
+            "description": [{"value": {"content": description_content}}],
+            "image": [{"value": {"image_id": material_image_id}}],
+            "brand": [{"value": {
+                "brand_name": brand_name,
+                "brand_image_id": brand_image_id,
+            }}],
+            "action_button": [{"value": {"button_text": "查看详情"}}],
+            "jump_info": [{"value": jump_spec}],
+            "main_jump_info": [{"value": jump_spec}],
+        },
+    }
+
+
+def _pick_landing_and_materials(account_id: int) -> tuple[LandingVideo, list[Material]]:
+    """召回链:承接视频 → 多路素材召回 → 取第一个有素材命中的 landing 及其 top 素材。
+
+    出口:确保 landing.point_type+standard_element 都有值(否则多路召回会全 skip)。
+    """
+    videos = fetch_landing_videos_for_account(account_id, page_size=50)
+    valid = [v for v in videos if v.point_type and v.standard_element]
+    if not valid:
+        raise RuntimeError(
+            f"account={account_id} 无 pointType+standardElement 都有值的承接视频"
+        )
+
+    for v in valid:
+        materials = recall_materials_for_video(v, final_top_n=5)
+        if materials:
+            return v, materials
+
+    raise RuntimeError(
+        f"account={account_id} 前 {len(valid)} 条承接视频都召回 0 素材"
+    )
+
+
+def preview_for_account(account_id: int, adgroup_id: int) -> dict:
+    """承接视频 → 召回素材 → top 1 → 上传素材图 → 注册落地计划 → 读 brand → 构造 body。
+
+    **会真实调:**
+    - 腾讯 /images/add(MD5 幂等)
+    - piaoquantv xcx/save(每次新建一条计划)
+
+    body 是真实可投放的(只差 POST /dynamic_creatives/add)。
+    """
+    landing, materials = _pick_landing_and_materials(account_id)
+    top_material = materials[0]
+
+    image_url = _pick_image_url(top_material)
+    if not image_url:
+        raise ValueError(
+            f"素材 {top_material.material_id} 既无 cover 也无 imageList,无法构造 image 组件"
+        )
+    material_image_id = images_add(account_id, image_url)
+
+    crowd_package = get_account_crowd_package(account_id)
+    plan = create_landing_plan(crowd_package, landing)
+
+    brand = get_account_brand(account_id)
+
+    body = build_creative_request_body(
+        account_id=account_id, adgroup_id=adgroup_id,
+        landing=landing, material=top_material,
+        material_image_id=material_image_id,
+        brand_name=brand["brand_name"],
+        brand_image_id=brand["brand_image_id"],
+        creative_name=plan.root_source_id,
+        jump_path=plan.page_url,
+    )
+    return {
+        "landing": asdict(landing),
+        "material": asdict(top_material),
+        "material_image_id": material_image_id,
+        "brand": brand,
+        "landing_plan": asdict(plan),
+        "body": body,
+    }
+
+
+def create_creative_for_ad(
+    account_id: int, adgroup_id: int,
+    landing: LandingVideo, material: Material,
+    skip_if_exists: bool = True,
+) -> int:
+    """编排:上传素材图 → 幂等检查 → 注册落地计划 → 读 brand → build body → POST。
+
+    Args:
+        skip_if_exists: True(默认) — 同 adgroup + 同 material_image_id 已有创意时直接返回已有 ID,
+                        跳过 xcx/save 注册和腾讯 POST,节省审核额度 + 避免模型降权。
+                        False — 强制新建(用于 A/B 测试不同归因锚点的场景)。
+
+    Returns:
+        dynamic_creative_id(新建或已有)
+    """
+    image_url = _pick_image_url(material)
+    if not image_url:
+        raise ValueError(
+            f"素材 {material.material_id} 既无 cover 也无 imageList,无法构造 image 组件"
+        )
+    material_image_id = images_add(account_id, image_url)
+
+    if skip_if_exists:
+        existing_cid = find_existing_creative_by_image(account_id, adgroup_id, material_image_id)
+        if existing_cid:
+            return existing_cid
+
+    crowd_package = get_account_crowd_package(account_id)
+    plan = create_landing_plan(crowd_package, landing)
+
+    brand = get_account_brand(account_id)
+
+    body = build_creative_request_body(
+        account_id, adgroup_id, landing, material,
+        material_image_id=material_image_id,
+        brand_name=brand["brand_name"],
+        brand_image_id=brand["brand_image_id"],
+        creative_name=plan.root_source_id,
+        jump_path=plan.page_url,
+    )
+    logger.info(
+        "[creative_creation] POST /dynamic_creatives/add adgroup=%d name=%s image_id=%s plan_id=%d",
+        adgroup_id, body["dynamic_creative_name"], material_image_id, plan.plan_id,
+    )
+    resp = _post("/dynamic_creatives/add", body)
+    data = _check(resp, "creative_create")
+    return data.get("dynamic_creative_id")

+ 166 - 0
examples/auto_put_ad_mini/tools/landing_plan.py

@@ -0,0 +1,166 @@
+"""落地页计划注册接口(piaoquantv `xcx/save`)适配层。
+
+业务含义(2026-06-08 用户确认):
+  每条腾讯创意挂上去前,先到 piaoquantv 服务侧"注册一个落地计划",
+  服务侧返回:
+    - pageUrl:小程序跳转路径,**直接作为 creative_components.jump_info.mini_program_path**
+    - pageUrl 里嵌入了服务侧生成的 rootSourceId,**直接作为 dynamic_creative_name**(归因锚点)
+
+接口契约(严格按用户给的 curl,不增不减字段):
+  POST https://tp-open.piaoquantv.com/contentPlatform/plan/xcx/save
+  header: token: <fixed>
+  body: {
+    "audiencePackage": str,              # 账户级 crowd_package
+    "videoList": [                       # 单条逐次调,不批量
+      {"videoId": int, "title": str, "cover": str, "video": str, "experimentId": str}
+    ]
+  }
+
+重复调行为(用户确认):**新建** — 同 video 重复调返回不同 plan_id + 不同 rootSourceId + 不同 shareId。
+  客户端无需缓存 plan_id;每条腾讯创意对应一次 xcx/save 调用。
+"""
+
+import logging
+import os
+from dataclasses import dataclass, field
+from typing import Optional
+from urllib.parse import parse_qs, unquote
+
+import httpx
+
+from tools.video_recall import LandingVideo, PIAOQUANTV_TOKEN
+
+logger = logging.getLogger(__name__)
+
+XCX_SAVE_URL = os.getenv(
+    "PIAOQUANTV_XCX_SAVE_URL",
+    "https://tp-open.piaoquantv.com/contentPlatform/plan/xcx/save",
+)
+
+
+@dataclass
+class LandingPlanResult:
+    """xcx/save 返回的落地计划。
+
+    业务核心字段:
+      page_url       — 直接传给腾讯创意的 mini_program_path
+      root_source_id — 直接用作 dynamic_creative_name(归因锚点)
+      share_id       — pageUrl 里的 rootShareId/shareId(实测两者相同)
+    """
+
+    plan_id: int                     # piaoquantv 平台计划 ID(自增,8059)
+    page_url: str                    # `pages/category?jumpPage=...`
+    root_source_id: str              # 从 pageUrl 解析 — 也用作 creative_name
+    share_id: str                    # 从 pageUrl 解析
+    video_id: int                    # 入参回显,便于追溯
+    audience_package: str
+    raw: dict = field(default_factory=dict, repr=False)
+
+
+def _parse_inner_query(page_url: str) -> dict:
+    """解析 pageUrl 内层 query。
+
+    pageUrl 是双层 URL:
+      外层:`pages/category?jumpPage=<URL-encoded inner>`
+      内层:`pages/user-videos?fromGzh=1&rootShareId=...&id=...&shareId=...&rootSourceId=...`
+
+    返回:`{key: value}`(取 parse_qs 的第一个值)
+    """
+    q_idx = page_url.find("?")
+    if q_idx < 0:
+        raise ValueError(f"pageUrl 无 query 段: {page_url}")
+    outer = page_url[q_idx + 1:]
+    if not outer.startswith("jumpPage="):
+        raise ValueError(f"pageUrl 外层 query 不是 jumpPage=: {outer[:80]}")
+    inner_encoded = outer[len("jumpPage="):]
+    inner_url = unquote(inner_encoded)
+
+    inner_q_idx = inner_url.find("?")
+    if inner_q_idx < 0:
+        raise ValueError(f"jumpPage 解码后无 query: {inner_url}")
+    inner_query = inner_url[inner_q_idx + 1:]
+
+    parsed = parse_qs(inner_query, keep_blank_values=True)
+    return {k: v[0] for k, v in parsed.items() if v}
+
+
+def create_landing_plan(
+    crowd_package: str,
+    video: LandingVideo,
+    timeout: int = 30,
+) -> LandingPlanResult:
+    """调 xcx/save 注册一个落地计划,返回可直接挂创意的 pageUrl + creative_name。
+
+    Args:
+        crowd_package: 账户级 crowd_package(来自 account_whitelist)
+        video: LandingVideo(必须含 video_id/title/cover_url/video_url/experiment_id)
+    """
+    if not video.experiment_id:
+        raise ValueError(
+            f"video_id={video.video_id} 缺 experiment_id,xcx/save 入参不完整"
+        )
+
+    body = {
+        "audiencePackage": crowd_package,
+        "videoList": [{
+            "videoId": video.video_id,
+            "title": video.title,
+            "cover": video.cover_url,
+            "video": video.video_url,
+            "experimentId": video.experiment_id,
+        }],
+    }
+    headers = {
+        "Content-Type": "application/json",
+        "token": PIAOQUANTV_TOKEN,
+        "Accept": "application/json, text/plain, */*",
+    }
+
+    logger.info(
+        "[landing_plan] POST xcx/save crowd=%r videoId=%d experimentId=%s",
+        crowd_package, video.video_id, video.experiment_id,
+    )
+    resp = httpx.post(XCX_SAVE_URL, json=body, headers=headers, timeout=timeout)
+    resp.raise_for_status()
+    data = resp.json()
+
+    if data.get("code") != 0 or not data.get("success"):
+        raise RuntimeError(
+            f"xcx/save 失败 videoId={video.video_id} "
+            f"code={data.get('code')} msg={data.get('msg')}"
+        )
+
+    items = data.get("data") or []
+    if not items:
+        raise RuntimeError(
+            f"xcx/save 返回 data 为空 videoId={video.video_id}"
+        )
+    item = items[0]
+
+    page_url = item.get("pageUrl") or ""
+    if not page_url:
+        raise RuntimeError(
+            f"xcx/save 返回 pageUrl 为空 videoId={video.video_id} item={item}"
+        )
+
+    inner = _parse_inner_query(page_url)
+    root_source_id = inner.get("rootSourceId") or ""
+    share_id = inner.get("shareId") or ""
+    if not root_source_id:
+        raise RuntimeError(
+            f"pageUrl 内层未解析出 rootSourceId: {page_url}"
+        )
+
+    logger.info(
+        "[landing_plan] OK plan_id=%s rootSourceId=%s",
+        item.get("id"), root_source_id,
+    )
+    return LandingPlanResult(
+        plan_id=int(item.get("id") or 0),
+        page_url=page_url,
+        root_source_id=root_source_id,
+        share_id=share_id,
+        video_id=video.video_id,
+        audience_package=crowd_package,
+        raw=item,
+    )

+ 261 - 0
examples/auto_put_ad_mini/tools/material_recall.py

@@ -0,0 +1,261 @@
+"""创意素材召回接口适配层(vector 服务)。
+
+业务模型(用户 2026-06-08 确认):
+  对一个承接视频(LandingVideo)执行 3 种策略召回素材,合并去重,按 score 取 top N。
+
+接口:POST https://api-internal.piaoquantv.com/videoVector/recallTest/matchByText
+鉴权:不需要(内部 API)
+
+本质(用户 2026-06-08 确认):**多路召回 → 排序 → 去重**
+
+实测确认素材库的 4 个有效 configCode(VIDEO_TITLE 和 ALL 对 MATERIAL 模态返回 0):
+  - VIDEO_TOPIC      (选题向量)
+  - VIDEO_KEYPOINT   (关键点向量)
+  - VIDEO_INSPIRATION(灵感点向量)
+  - VIDEO_PURPOSE    (目的点向量)
+
+策略:对一个承接视频,选合适的 queryText,**串行调 4 个 configCode**,合并按 materialId 去重(取 max score),按 score 降序返回 top N。
+
+queryText 选择优先级(选第一个非空且非占位符 "-"):
+  1. standard_element(标准化元素,实测效果最好)
+  2. demand_content_topic(选题,如果非 "-")
+  3. title(承接视频标题,兜底)
+"""
+
+import logging
+import os
+from dataclasses import dataclass, field
+from typing import List, Optional
+
+import httpx
+
+from tools.video_recall import LandingVideo
+
+logger = logging.getLogger(__name__)
+
+VECTOR_BASE = os.getenv(
+    "VECTOR_BASE_URL",
+    "https://api-internal.piaoquantv.com/videoVector",
+)
+VECTOR_MATCH_BY_TEXT = f"{VECTOR_BASE}/recallTest/matchByText"
+VECTOR_ALL_CONFIG_CODES = f"{VECTOR_BASE}/videoSearch/getAllConfigCodes"
+
+
+def fetch_all_config_codes() -> dict[str, str]:
+    """动态获取 vector 服务支持的全部 configCode → 中文名。
+
+    返回 {configCode: 中文名}。
+    召回前先调一次,避免硬编码列表过时。
+    """
+    resp = httpx.get(VECTOR_ALL_CONFIG_CODES, timeout=15)
+    resp.raise_for_status()
+    data = resp.json()
+    if data.get("code") not in (0, 200):
+        raise RuntimeError(
+            f"getAllConfigCodes 失败:code={data.get('code')} msg={data.get('msg')}"
+        )
+    return data.get("data") or {}
+
+# 字段维度策略(2026-06-08 用户最终确认)
+# - 维度 1 选题:       queryText=demand_content_topic, configCode=VIDEO_TOPIC
+# - 维度 2 标准化元素: queryText=standard_element, configCode 按 point_type **唯一对应**:
+#     - "灵感点" → INSPIRATION_SUBSTANCE
+#     - "关键点" → KEYPOINT_SUBSTANCE
+#     - "目的点" → PURPOSE_SUBSTANCE
+#     - 其他/空    → 跳过(没办法确定走哪一路,no-guessing)
+# - 维度 3 标题:       不做(VIDEO_TITLE 对 MATERIAL 实测返回 0)
+POINT_TYPE_TO_SUBSTANCE = {
+    "灵感点": "INSPIRATION_SUBSTANCE",
+    "关键点": "KEYPOINT_SUBSTANCE",
+    "目的点": "PURPOSE_SUBSTANCE",
+}
+
+# fallback(动态接口失败时用)
+MATERIAL_EFFECTIVE_CONFIG_CODES = ["VIDEO_TOPIC"] + list(POINT_TYPE_TO_SUBSTANCE.values())
+
+# 占位符值视为无效(piaoquantv 数据里 demandContentTopic 大量为 "-")
+PLACEHOLDER_VALUES = {"", "-", None}
+
+# 单 configCode 召回 top N(合并前)
+DEFAULT_PER_CC_TOP_N = 100
+# 最终合并去重后返回 top N
+DEFAULT_FINAL_TOP_N = 20
+
+
+@dataclass
+class Material:
+    """召回的创意素材。
+
+    注意:`material_id` 是 vector 服务的素材 ID
+    (内部=数字串,外部=32位 MD5)。**与腾讯 video_id/cover_id 的映射机制待用户后续给。**
+    """
+
+    material_id: str           # 素材原始 ID
+    score: float               # 向量相似度(max if 多 configCode 命中)
+    rank_score: Optional[float] = None    # 精排综合分(未精排时 null)
+    title: str = ""
+    cover: str = ""
+    video_url: str = ""
+    # 命中过的所有 configCode(用于调试/可解释性)
+    hit_config_codes: List[str] = field(default_factory=list)
+    # 原始 item dict(以备后用)
+    raw: dict = field(default_factory=dict, repr=False)
+
+
+def _call_match_by_text(
+    query_text: str,
+    config_code: str,
+    material_top_n: int = DEFAULT_PER_CC_TOP_N,
+    source_labels: Optional[List[str]] = None,
+    timeout: int = 30,
+) -> List[dict]:
+    """单次调 matchByText,只要 MATERIAL 模态,返回原始 items 列表。"""
+    body = {
+        "queryText": query_text,
+        "configCode": config_code,
+        "modalities": ["MATERIAL"],
+        "videoTopN": 0,
+        "articleTopN": 0,
+        "materialTopN": material_top_n,
+        "topN": material_top_n,
+        "displayK": material_top_n,
+    }
+    if source_labels:
+        body["sourceLabels"] = source_labels
+
+    logger.info(
+        "[material_recall] matchByText q=%r configCode=%s topN=%d",
+        query_text[:40], config_code, material_top_n,
+    )
+    resp = httpx.post(
+        VECTOR_MATCH_BY_TEXT,
+        json=body,
+        headers={"content-type": "application/json", "accept": "application/json"},
+        timeout=timeout,
+    )
+    resp.raise_for_status()
+    data = resp.json()
+
+    code = data.get("code")
+    # CommonResponse 可能用 0 或 200 表示成功;以 data 字段为准
+    if code not in (0, 200, "0", "200"):
+        raise RuntimeError(
+            f"vector matchByText 失败:code={code} msg={data.get('msg') or data.get('message')}"
+        )
+
+    payload = data.get("data") or {}
+    items = payload.get("items") or []
+    # 只留 MATERIAL 模态(防御性 — 万一返回混入)
+    return [it for it in items if it.get("modality") == "MATERIAL"]
+
+
+def _pick_query_text(landing: LandingVideo) -> Optional[str]:
+    """按优先级选 queryText:standard_element > demand_content_topic > title。
+    占位符 "-" / 空串视为无效。"""
+    for cand in (landing.standard_element, landing.demand_content_topic, landing.title):
+        if cand and cand not in PLACEHOLDER_VALUES:
+            return cand
+    return None
+
+
+def _build_strategies(landing: LandingVideo) -> list[tuple[str, str, str]]:
+    """根据承接视频字段构造召回策略(用户 2026-06-08 最终确认)。
+
+    返回 [(queryText, configCode, strategy_name), ...]
+    每条策略 = 1 次接口调用
+    """
+    strategies = []
+    # 维度 1: 选题
+    if landing.demand_content_topic and landing.demand_content_topic not in PLACEHOLDER_VALUES:
+        strategies.append((landing.demand_content_topic, "VIDEO_TOPIC", "选题"))
+    # 维度 2: 标准化元素 — point_type 决定走哪个 SUBSTANCE
+    if landing.standard_element and landing.standard_element not in PLACEHOLDER_VALUES:
+        cc = POINT_TYPE_TO_SUBSTANCE.get(landing.point_type)
+        if cc:
+            strategies.append((landing.standard_element, cc, f"标准化元素-{landing.point_type}"))
+        else:
+            logger.info(
+                "[material_recall] standard_element 有值但 point_type=%r 不在映射中,"
+                "标准化元素维度跳过",
+                landing.point_type,
+            )
+    return strategies
+
+
+def recall_materials_for_video(
+    landing: LandingVideo,
+    final_top_n: int = DEFAULT_FINAL_TOP_N,
+    per_cc_top_n: int = DEFAULT_PER_CC_TOP_N,
+    source_labels: Optional[List[str]] = None,
+) -> List[Material]:
+    """按字段维度分组召回(2026-06-08 用户确认)。
+
+    - 选题维度       : queryText=demand_content_topic, configCode=VIDEO_TOPIC
+    - 标准化元素维度 : queryText=standard_element, configCode 按 point_type 唯一对应
+    - 标题维度       : 不做(VIDEO_TITLE 对 MATERIAL 模态实测返回 0)
+
+    合并同 materialId 取 max(score),按 score 降序返回 top N。
+    """
+    strategies = _build_strategies(landing)
+    if not strategies:
+        logger.warning(
+            "[material_recall] landing video_id=%d 无有效策略(选题/标准化元素字段都缺),返回空",
+            landing.video_id,
+        )
+        return []
+
+    logger.info(
+        "[material_recall] landing video_id=%d 走 %d 个策略",
+        landing.video_id, len(strategies),
+    )
+
+    merged: dict[str, Material] = {}
+    for query_text, cc, name in strategies:
+        logger.info("[material_recall]   策略=%s q=%r configCode=%s",
+                    name, query_text[:30], cc)
+        try:
+            items = _call_match_by_text(
+                query_text=query_text, config_code=cc,
+                material_top_n=per_cc_top_n,
+                source_labels=source_labels,
+            )
+        except Exception as e:
+            logger.error("[material_recall] 策略 %s 失败,跳过:%s", name, e)
+            continue
+
+        logger.info("[material_recall]     → 返回 %d 条", len(items))
+        for it in items:
+            mid = it.get("materialId") or (str(it["id"]) if it.get("id") is not None else None)
+            if not mid:
+                continue
+            score = float(it.get("score") or 0.0)
+            rank = it.get("rankScore")
+            existing = merged.get(mid)
+            if existing is None:
+                merged[mid] = Material(
+                    material_id=str(mid),
+                    score=score,
+                    rank_score=float(rank) if rank is not None else None,
+                    title=it.get("title") or "",
+                    cover=it.get("cover") or "",
+                    video_url=it.get("videoUrl") or "",
+                    hit_config_codes=[cc],
+                    raw=it,
+                )
+            else:
+                # 取 max score;hit_config_codes 追加
+                if score > existing.score:
+                    existing.score = score
+                if cc not in existing.hit_config_codes:
+                    existing.hit_config_codes.append(cc)
+                # rank_score 取首次非空值(策略间应一致;若不一致以首次为准,后续可改)
+                if existing.rank_score is None and rank is not None:
+                    existing.rank_score = float(rank)
+
+    # 按 score 降序
+    out = sorted(merged.values(), key=lambda m: m.score, reverse=True)
+    logger.info(
+        "[material_recall] landing video_id=%d 三策略合并去重 → %d 条,返回 top %d",
+        landing.video_id, len(out), final_top_n,
+    )
+    return out[:final_top_n]

+ 185 - 0
examples/auto_put_ad_mini/tools/video_recall.py

@@ -0,0 +1,185 @@
+"""承接视频获取接口适配层(piaoquantv API)。
+
+业务模型(用户 2026-06-08 确认):
+  - "承接视频" = 用户点开创意/进入小程序后看到的视频内容
+  - 接口已按 ROV 降序 + 访问量过滤,我们拿前 N 即可,**无需自己排序**
+  - 这一步**不是**创意素材召回 — 创意素材在后续 material_recall 步用本视频字段
+    (title / standardElement / categoryName / demandContentTitle 等)做 query 召回
+
+接口:POST https://tp-open.piaoquantv.com/contentPlatform/plan/videoContentList
+认证:token header(沿用用户给的值,可通过 env PIAOQUANTV_TOKEN 覆盖)
+
+字段参考(2026-06-08 反推自 sample 调用):
+  返回 data.objs[*] 39 个字段,关键:
+    videoId        : 业务侧 8 位 video ID(注:非腾讯素材库 video_id,11 位)
+    title / cover / video : 标题 + 封面 URL + 视频 URL
+    score / rov / sim / visitUv : 推荐分(已排序)
+    standardElement / categoryName / demandContentTitle / dimension : 语义维度
+"""
+
+import logging
+import os
+from dataclasses import dataclass, field
+from typing import List, Optional
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+PIAOQUANTV_VIDEO_API = "https://tp-open.piaoquantv.com/contentPlatform/plan/videoContentList"
+PIAOQUANTV_TOKEN = os.getenv(
+    "PIAOQUANTV_TOKEN",
+    "f6e07dba7fe3476cb31fd3733d607c5c",  # 用户 2026-06-08 提供
+)
+
+
+@dataclass
+class LandingVideo:
+    """承接视频(landing video)— 用户进小程序后看到的视频。
+
+    注意:`video_id` 是 piaoquantv 业务侧 ID(8 位),**不**是腾讯素材库 video_id(11 位)。
+    挂创意时腾讯 creative_components.video 需要的是腾讯 video_id,二者映射逻辑待补。
+    """
+
+    video_id: int
+    title: str
+    cover_url: str
+    video_url: str
+    # 推荐分数(接口已用 rov 降序)
+    score: float
+    rov: float
+    sim: float
+    visit_uv: int
+    # 召回素材时用的语义维度(对应 material_recall 三策略)
+    standard_element: str           # 例:"煽动性"
+    category_name: str              # 例:"情感强度控制"
+    demand_content_title: str       # 关联原视频标题
+    demand_content_topic: str       # 关联原视频选题(对应 VIDEO_TOPIC 召回)
+    demand_content_id: str          # 关联原视频 ID
+    demand_type: str                # 需求类型
+    point_type: str                 # 点类型("关键点"/"灵感点"/"目的点"),决定 standardElement 召回的 configCode
+    dimension: str                  # 例:"传播的头部"
+    experiment_id: str              # 实验 ID(透传给 xcx/save,接口会嵌入 pageUrl)
+    # 原始 raw(以备未来用其他字段)
+    raw: dict = field(default_factory=dict, repr=False)
+
+
+def fetch_landing_videos(
+    crowd_package: str,
+    page_size: int = 10,
+    page_num: int = 1,
+    video_business_type: int = 5,    # type=5 = 小程序投流(用户确认)
+    source: str = "prior",            # 投放人群需求类型(用户确认)
+    title_filter: str = "",
+    sort: int = 0,
+    timeout: int = 30,
+) -> List[LandingVideo]:
+    """从 piaoquantv 拉承接视频列表(按 ROV 降序,直接取 top N)。
+
+    Args:
+        crowd_package: 人群包名,例:"泛人群" / "回流330以上人群"
+        page_size: 取前 K 条
+        page_num: 分页(默认 1)
+        video_business_type: 5=小程序投流(默认,SOP 不变)
+        source: 投放人群需求类型(默认 "prior")
+        title_filter: 标题过滤(空则不过滤)
+    """
+    body = {
+        "title": title_filter,
+        "sort": sort,
+        "type": video_business_type,
+        "pageNum": page_num,
+        "pageSize": page_size,
+        "source": source,
+        "crowdPackage": crowd_package,
+    }
+    headers = {
+        "content-type": "application/json",
+        "token": PIAOQUANTV_TOKEN,
+        "accept": "application/json",
+    }
+
+    logger.info(
+        "[video_recall] fetch crowd=%r page=%d size=%d", crowd_package, page_num, page_size,
+    )
+    resp = httpx.post(PIAOQUANTV_VIDEO_API, json=body, headers=headers, timeout=timeout)
+    resp.raise_for_status()
+    data = resp.json()
+
+    if data.get("code") != 0 or not data.get("success"):
+        raise RuntimeError(
+            f"piaoquantv 接口失败:code={data.get('code')} msg={data.get('msg')}"
+        )
+
+    payload = data.get("data") or {}
+    objs = payload.get("objs") or []
+
+    videos: List[LandingVideo] = []
+    for obj in objs:
+        if not isinstance(obj, dict) or "videoId" not in obj:
+            logger.warning("[video_recall] 跳过缺 videoId 的项:%s", obj)
+            continue
+        videos.append(
+            LandingVideo(
+                video_id=int(obj["videoId"]),
+                title=obj.get("title") or "",
+                cover_url=obj.get("cover") or "",
+                video_url=obj.get("video") or "",
+                score=float(obj.get("score") or 0.0),
+                rov=float(obj.get("rov") or 0.0),
+                sim=float(obj.get("sim") or 0.0),
+                visit_uv=int(obj.get("visitUv") or 0),
+                standard_element=obj.get("standardElement") or "",
+                category_name=obj.get("categoryName") or "",
+                demand_content_title=obj.get("demandContentTitle") or "",
+                demand_content_topic=obj.get("demandContentTopic") or "",
+                demand_content_id=obj.get("demandContentId") or "",
+                demand_type=obj.get("demandType") or "",
+                point_type=obj.get("pointType") or "",
+                dimension=obj.get("dimension") or "",
+                experiment_id=obj.get("experimentId") or "",
+                raw=obj,
+            )
+        )
+
+    logger.info(
+        "[video_recall] 返回 %d 条承接视频(总池 %d / 当前页 %d)",
+        len(videos), payload.get("totalSize") or 0, payload.get("currentPage") or 1,
+    )
+    return videos
+
+
+def get_account_crowd_package(account_id: int) -> str:
+    """从 account_whitelist 读账户级 crowd_package。
+
+    crowd_package 决定:
+      - videoContentList 拉哪批承接视频(本模块)
+      - xcx/save 注册落地计划的 audiencePackage(landing_plan 模块)
+    """
+    from db.connection import get_connection
+
+    conn = get_connection()
+    try:
+        with conn.cursor() as cur:
+            cur.execute(
+                "SELECT crowd_package FROM account_whitelist WHERE account_id=%s",
+                (account_id,),
+            )
+            row = cur.fetchone()
+    finally:
+        conn.close()
+
+    if not row or not row.get("crowd_package"):
+        raise ValueError(
+            f"account_id {account_id} 的 crowd_package 未在 account_whitelist 配置"
+        )
+    return row["crowd_package"]
+
+
+def fetch_landing_videos_for_account(
+    account_id: int,
+    page_size: int = 10,
+) -> List[LandingVideo]:
+    """根据账户的 crowd_package 字段拉视频。"""
+    crowd_package = get_account_crowd_package(account_id)
+    return fetch_landing_videos(crowd_package=crowd_package, page_size=page_size)