| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716 |
- """
- 预算计算引擎 — 出价调整与账户评估
- 改造后的三层架构:
- - 数据层:get_ad_performance / get_account_summary — 获取原始数据
- - 计算层:compute_budget_thresholds / classify_ads / compute_bid_adjustment — 确定性计算
- - 执行层:bid_adjustment_execute — 调用 API 执行调整
- 所有策略参数(分位数、决策矩阵、幅度)从策略配置层传入,不在此处硬编码。
- """
- import json
- import logging
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Any, Dict, List, Optional
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
- from examples.auto_put_ad.tools.ad_api import ad_update
- from examples.auto_put_ad.tools.data_query import _get_odps_client
- from examples.auto_put_ad.tools.strategy_config import (
- get_strategy_config,
- get_decision_matrix,
- determine_strategy,
- )
- logger = logging.getLogger(__name__)
- # ===== 内部辅助函数 =====
- def _parse_bizdate(bizdate: str) -> tuple:
- """解析业务日期,返回 (YYYYMMDD, YYYY-MM-DD)"""
- if bizdate in ("yesterday", ""):
- biz = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
- else:
- biz = bizdate.replace("-", "")
- biz_dash = f"{biz[:4]}-{biz[4:6]}-{biz[6:]}"
- return biz, biz_dash
- def _build_efficiency_sql(biz: str, biz_dash: str) -> str:
- """构建昨日效率数据 SQL(广告维度聚合)"""
- return f"""
- SELECT
- a.account_id,
- a.ad_id,
- c.ad_name,
- c.create_time,
- SUM(b.cost/100) AS cost,
- SUM(b.valid_click_count) AS valid_click_count,
- SUM(b.conversions_count) AS conversions_count,
- SUM(t.首层小程序打开数) AS open_count,
- SUM(t.裂变0层回流数) AS fission0_count,
- SUM(t.总回流人数) AS total_return_count
- FROM (
- SELECT
- IF(c.creative_name IS NOT NULL, c.creative_name, t.rootsourceid) AS creative_name,
- t.*
- FROM loghubods.touliu_data t
- LEFT JOIN (
- SELECT DISTINCT creative_name,
- SPLIT(GET_JSON_OBJECT(page_spec,'$.wechat_mini_program_spec.mini_program_path'),'rootSourceId%3D')[1] AS rootsourceid
- FROM loghubods.ad_put_tencent_creative_components a
- LEFT JOIN loghubods.ad_put_tencent_creative_day b ON a.creative_id = b.creative_id
- WHERE page_type = 'PAGE_TYPE_WECHAT_MINI_PROGRAM'
- ) c ON c.rootsourceid = t.rootsourceid
- WHERE t.dt = '{biz}'
- ) t
- LEFT JOIN loghubods.ad_put_tencent_creative_day a ON t.creative_name = a.creative_name
- LEFT JOIN loghubods.ad_put_tencent_ad c ON a.ad_id = c.ad_id
- LEFT JOIN (
- SELECT creative_id, valid_click_count, cost, conversions_count
- FROM (
- SELECT creative_id, valid_click_count, cost, conversions_count,
- ROW_NUMBER() OVER (PARTITION BY creative_id ORDER BY update_time DESC) AS rank
- FROM loghubods.ad_put_tencent_creative_data_day
- WHERE dt = '{biz_dash}'
- ) t WHERE rank = 1
- ) b ON a.creative_id = b.creative_id
- WHERE t.dt = '{biz}'
- AND a.ad_id IS NOT NULL
- AND b.cost IS NOT NULL
- AND b.cost > 0
- GROUP BY a.account_id, a.ad_id, c.ad_name, c.create_time
- """
- # ═════════════════════════════════════════════════════════════
- # 数据层 — 获取原始数据
- # ═════════════════════════════════════════════════════════════
- @tool(description="获取指定账户昨日广告效果数据,包含效率分、消耗、转化数、冷启动信息等")
- async def get_ad_performance(
- account_id: int = 0,
- bizdate: str = "yesterday",
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 拉取昨日广告效果数据(广告维度聚合)。
- Args:
- account_id: 账户ID(传 0 不过滤账户,拉全量)
- bizdate: 业务日期,"yesterday" 或 YYYYMMDD 格式
- Returns:
- 结构化广告列表,每条含: ad_id, account_id, cost, efficiency,
- open_count, conversions_count, create_time, bid_amount, ad_status
- """
- try:
- client = _get_odps_client()
- if client is None:
- return ToolResult(title="get_ad_performance 失败", output="ODPS 客户端未初始化")
- biz, biz_dash = _parse_bizdate(bizdate)
- # 拉取效率数据
- sql_efficiency = _build_efficiency_sql(biz, biz_dash)
- df_eff = client.execute_sql(sql_efficiency)
- if df_eff.empty:
- return ToolResult(title="get_ad_performance", output=f"昨日({biz})无效率数据")
- # 按账户过滤
- if account_id > 0:
- df_eff = df_eff[df_eff["account_id"].astype(float).astype("Int64") == account_id]
- if df_eff.empty:
- return ToolResult(title="get_ad_performance", output=f"账户 {account_id} 昨日({biz})无数据")
- # 拉取当前出价/状态
- ad_ids = [int(x) for x in df_eff["ad_id"].dropna().unique() if str(x) != "nan"]
- ad_ids_str = ",".join(map(str, ad_ids))
- sql_status = f"""
- SELECT ad_id, ad_name, account_id, bid_amount, day_amount, ad_status, optimization_goal
- FROM loghubods.ad_put_tencent_ad
- WHERE ad_id IN ({ad_ids_str})
- """
- df_status = client.execute_sql(sql_status)
- # 合并
- df_eff["ad_id"] = df_eff["ad_id"].astype(float).astype("Int64")
- df_status["ad_id"] = df_status["ad_id"].astype(float).astype("Int64")
- df = pd.merge(
- df_eff,
- df_status[["ad_id", "bid_amount", "day_amount", "ad_status", "optimization_goal"]],
- on="ad_id", how="left",
- )
- # 计算效率分
- df["efficiency"] = df.apply(
- lambda r: r["fission0_count"] / r["cost"]
- if r["cost"] and r["cost"] > 0 and r["fission0_count"] is not None and pd.notna(r["fission0_count"])
- else None,
- axis=1,
- )
- total_cost = float(df["cost"].sum())
- avg_eff = float(df["efficiency"].mean()) if not df["efficiency"].isna().all() else 0
- lines = [
- f"广告效果数据({biz},{len(df)}条广告)",
- f"总消耗: {total_cost:,.0f}元,平均效率分: {avg_eff:.4f}",
- "",
- f"{'广告ID':<12} {'账户ID':<12} {'消耗(元)':>10} {'效率分':>8} {'转化':>5} {'出价(分)':>8} {'状态':<10}",
- "-" * 70,
- ]
- for _, row in df.head(20).iterrows():
- eff_str = f"{row['efficiency']:.4f}" if pd.notna(row["efficiency"]) else "-"
- bid_str = str(int(float(row["bid_amount"]))) if pd.notna(row.get("bid_amount")) else "-"
- conv = int(row["conversions_count"]) if pd.notna(row.get("conversions_count")) else 0
- lines.append(
- f"{int(row['ad_id']):<12} {int(row['account_id']):<12} "
- f"{row['cost']:>10,.0f} {eff_str:>8} {conv:>5} {bid_str:>8} "
- f"{str(row.get('ad_status', '')):>10}"
- )
- if len(df) > 20:
- lines.append(f"... 还有 {len(df) - 20} 条")
- return ToolResult(
- title=f"广告效果数据({len(df)}条,总消耗{total_cost:,.0f}元)",
- output="\n".join(lines),
- metadata={
- "ad_data": df.to_dict("records"),
- "total_cost": total_cost,
- "avg_efficiency": avg_eff,
- "ad_count": len(df),
- "bizdate": biz,
- },
- )
- except Exception as e:
- logger.error("get_ad_performance 失败: %s", e, exc_info=True)
- return ToolResult(title="get_ad_performance 失败", output=str(e))
- @tool(description="获取账户维度昨日汇总数据,包含消耗、效率、广告数、稳定性标签")
- async def get_account_summary(
- bizdate: str = "yesterday",
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 账户级汇总:查询各账户昨日汇总数据,按消耗量判断稳定性。
- Args:
- bizdate: 数据日期,"yesterday" 或 YYYYMMDD 格式
- """
- try:
- client = _get_odps_client()
- if client is None:
- return ToolResult(title="get_account_summary 失败", output="ODPS 客户端未初始化")
- biz, biz_dash = _parse_bizdate(bizdate)
- inner_sql = _build_efficiency_sql(biz, biz_dash)
- sql = f"""
- SELECT
- account_id,
- COUNT(DISTINCT ad_id) AS ad_count,
- SUM(cost) AS total_cost,
- SUM(open_count) AS total_open,
- SUM(fission0_count) AS total_fission0
- FROM ({inner_sql}) t
- GROUP BY account_id
- """
- df = client.execute_sql(sql)
- if df.empty:
- return ToolResult(title="get_account_summary", output=f"昨日({biz})无账户数据")
- df["avg_efficiency"] = df.apply(
- lambda r: r["total_fission0"] / r["total_cost"] if r["total_cost"] and r["total_cost"] > 0 else 0,
- axis=1,
- )
- median_cost = df["total_cost"].median()
- p30_cost = df["total_cost"].quantile(0.30)
- def label_stability(cost):
- if cost >= median_cost:
- return "稳定"
- elif cost >= p30_cost:
- return "一般"
- else:
- return "低量"
- df["stability"] = df["total_cost"].apply(label_stability)
- df = df.sort_values("total_cost", ascending=False).reset_index(drop=True)
- lines = [
- f"账户汇总({biz},共 {len(df)} 个账户)",
- f"消耗中位数: {median_cost:,.0f}元",
- "",
- f"{'账户ID':<15} {'广告数':>6} {'昨日消耗(元)':>12} {'效率分均值':>10} {'稳定性':>6}",
- "-" * 55,
- ]
- for _, row in df.iterrows():
- lines.append(
- f"{int(row['account_id']):<15} {int(row['ad_count']):>6} "
- f"{row['total_cost']:>12,.0f} {row['avg_efficiency']:>10.4f} "
- f"{row['stability']:>6}"
- )
- stable_high_eff = df[(df["stability"] == "稳定") & (df["avg_efficiency"] > df["avg_efficiency"].median())]
- if not stable_high_eff.empty:
- lines += ["", "扩量建议账户(稳定 + 效率分高于中位数):"]
- for _, row in stable_high_eff.iterrows():
- lines.append(f" 账户 {int(row['account_id'])}(消耗 {row['total_cost']:,.0f}元,效率分 {row['avg_efficiency']:.4f})")
- return ToolResult(
- title=f"账户汇总({len(df)}个账户)",
- output="\n".join(lines),
- metadata={
- "accounts": df.to_dict("records"),
- "median_cost": median_cost,
- "bizdate": biz,
- },
- )
- except Exception as e:
- logger.error("get_account_summary 失败: %s", e, exc_info=True)
- return ToolResult(title="get_account_summary 失败", output=str(e))
- # ═════════════════════════════════════════════════════════════
- # 计算层 — 确定性计算,策略参数从外部传入
- # ═════════════════════════════════════════════════════════════
- @tool(description="计算分位数阈值(ROI高/低 + 消耗中位数),分位数参数可自定义")
- async def compute_budget_thresholds(
- ad_data_json: str,
- roi_high_percentile: float = 0.70,
- roi_low_percentile: float = 0.30,
- cost_mid_percentile: float = 0.50,
- min_open_count: int = 100,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 基于广告效果数据计算分位数阈值。
- Args:
- ad_data_json: 广告效果数据 JSON(来自 get_ad_performance 的 metadata.ad_data)
- roi_high_percentile: ROI 高阈值分位数(默认 P70,可调)
- roi_low_percentile: ROI 低阈值分位数(默认 P30,可调)
- cost_mid_percentile: 消耗中位数分位数(默认 P50,可调)
- min_open_count: 有效广告最低打开数(默认 100)
- """
- try:
- ad_data = json.loads(ad_data_json) if isinstance(ad_data_json, str) else ad_data_json
- df = pd.DataFrame(ad_data)
- # 筛选有效广告
- df_valid = df[df["open_count"] >= min_open_count].copy()
- df_nosample = df[df["open_count"] < min_open_count]
- if df_valid.empty:
- return ToolResult(
- title="compute_budget_thresholds",
- output=f"无有效广告(open_count >= {min_open_count})",
- )
- # 确保 efficiency 列有效
- df_valid["efficiency"] = pd.to_numeric(df_valid["efficiency"], errors="coerce")
- df_valid = df_valid.dropna(subset=["efficiency"])
- thresholds = {
- "roi_high": float(df_valid["efficiency"].quantile(roi_high_percentile)),
- "roi_low": float(df_valid["efficiency"].quantile(roi_low_percentile)),
- "cost_mid": float(df_valid["cost"].quantile(cost_mid_percentile)),
- }
- output = (
- f"阈值计算结果(有效广告 {len(df_valid)} 条,样本不足 {len(df_nosample)} 条):\n"
- f" ROI 高阈值 (P{int(roi_high_percentile*100)}): {thresholds['roi_high']:.4f}\n"
- f" ROI 低阈值 (P{int(roi_low_percentile*100)}): {thresholds['roi_low']:.4f}\n"
- f" 消耗中位数 (P{int(cost_mid_percentile*100)}): {thresholds['cost_mid']:.0f}元\n"
- f"\n分位数参数: roi_high=P{int(roi_high_percentile*100)}, "
- f"roi_low=P{int(roi_low_percentile*100)}, "
- f"cost_mid=P{int(cost_mid_percentile*100)}"
- )
- return ToolResult(
- title=f"阈值计算({len(df_valid)}条有效广告)",
- output=output,
- metadata={
- "thresholds": thresholds,
- "valid_count": len(df_valid),
- "nosample_count": len(df_nosample),
- "percentiles_used": {
- "roi_high": roi_high_percentile,
- "roi_low": roi_low_percentile,
- "cost_mid": cost_mid_percentile,
- },
- },
- )
- except Exception as e:
- logger.error("compute_budget_thresholds 失败: %s", e, exc_info=True)
- return ToolResult(title="compute_budget_thresholds 失败", output=str(e))
- @tool(description="将广告按 ROI×跑量 二维象限分类,阈值从外部传入")
- async def classify_ads(
- ad_data_json: str,
- thresholds_json: str,
- min_open_count: int = 100,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 将每条广告分类到 ROI × 跑量 二维象限。
- Args:
- ad_data_json: 广告效果数据 JSON
- thresholds_json: 阈值 JSON,如 {"roi_high": 0.05, "roi_low": 0.02, "cost_mid": 500}
- min_open_count: 有效广告最低打开数
- """
- try:
- ad_data = json.loads(ad_data_json) if isinstance(ad_data_json, str) else ad_data_json
- thresholds = json.loads(thresholds_json) if isinstance(thresholds_json, str) else thresholds_json
- df = pd.DataFrame(ad_data)
- df_valid = df[df["open_count"] >= min_open_count].copy()
- df_nosample = df[df["open_count"] < min_open_count]
- if df_valid.empty:
- return ToolResult(title="classify_ads", output="无有效广告")
- classified = []
- counts = {"high_high": 0, "high_low": 0, "mid_high": 0, "mid_low": 0, "low_high": 0, "low_low": 0}
- for _, row in df_valid.iterrows():
- eff = float(row["efficiency"]) if pd.notna(row.get("efficiency")) else 0.0
- cost = float(row["cost"])
- if eff >= thresholds["roi_high"]:
- roi_level = "high"
- elif eff >= thresholds["roi_low"]:
- roi_level = "mid"
- else:
- roi_level = "low"
- volume_level = "high" if cost >= thresholds["cost_mid"] else "low"
- quadrant = f"{roi_level}_{volume_level}"
- counts[quadrant] = counts.get(quadrant, 0) + 1
- item = row.to_dict()
- item["roi_level"] = roi_level
- item["volume_level"] = volume_level
- item["quadrant"] = quadrant
- classified.append(item)
- lines = [
- f"广告分类结果({len(classified)}条有效,{len(df_nosample)}条样本不足)",
- f"阈值: ROI高={thresholds['roi_high']:.4f}, ROI低={thresholds['roi_low']:.4f}, 消耗中位={thresholds['cost_mid']:.0f}元",
- "",
- "分布:",
- ]
- for q, c in counts.items():
- roi, vol = q.split("_")
- lines.append(f" {roi}ROI + {vol}跑量: {c}条")
- return ToolResult(
- title=f"广告分类({len(classified)}条)",
- output="\n".join(lines),
- metadata={
- "classified_ads": classified,
- "nosample_ads": df_nosample.to_dict("records"),
- "distribution": counts,
- "thresholds_used": thresholds,
- },
- )
- except Exception as e:
- logger.error("classify_ads 失败: %s", e, exc_info=True)
- return ToolResult(title="classify_ads 失败", output=str(e))
- @tool(description="计算出价调整方案,策略参数(决策矩阵、幅度、保护规则)全部从外部传入")
- async def compute_bid_adjustment(
- classified_ads_json: str,
- strategy: str,
- decision_matrix_json: str = "",
- max_increase_pct: float = 0.15,
- max_decrease_pct: float = -0.15,
- protect_cold_start: bool = True,
- cold_start_hours: int = 48,
- cold_start_min_conversions: int = 6,
- protect_compensation: bool = True,
- compensation_min_conversions: int = 6,
- min_bid: int = 10,
- max_bid: int = 10000,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 根据分类结果和策略参数,计算每条广告的出价调整方案。
- Args:
- classified_ads_json: 分类后的广告数据 JSON(来自 classify_ads)
- strategy: 策略名称(aggressive_scale_down / moderate_scale_down / maintain / moderate_scale_up / aggressive_scale_up)
- decision_matrix_json: 决策矩阵 JSON。为空则从配置层加载。
- 格式: {"high_high": ["keep", 0.0], "mid_high": ["decrease", -0.05], ...}
- max_increase_pct: 最大提价幅度(默认 0.15 = +15%)
- max_decrease_pct: 最大降价幅度(默认 -0.15 = -15%)
- protect_cold_start: 是否启用冷启动保护
- cold_start_hours: 冷启动期小时数
- cold_start_min_conversions: 冷启动最少转化数
- protect_compensation: 是否启用赔付保护
- compensation_min_conversions: 赔付门槛转化数
- min_bid: 最低出价(分)
- max_bid: 最高出价(分)
- """
- try:
- classified_ads = json.loads(classified_ads_json) if isinstance(classified_ads_json, str) else classified_ads_json
- # 加载决策矩阵
- if decision_matrix_json:
- matrix = json.loads(decision_matrix_json) if isinstance(decision_matrix_json, str) else decision_matrix_json
- # 转为 tuple
- matrix = {k: tuple(v) for k, v in matrix.items()}
- else:
- config = get_strategy_config()
- matrix = get_decision_matrix(config, strategy)
- results = []
- cold_start_count = 0
- for ad in classified_ads:
- conversions = int(ad.get("conversions_count", 0) or 0)
- quadrant = ad.get("quadrant", f"{ad.get('roi_level', 'mid')}_{ad.get('volume_level', 'low')}")
- # --- 冷启动保护 ---
- is_cold_start = False
- cold_start_reason = ""
- if protect_cold_start:
- create_time = ad.get("create_time")
- if create_time and pd.notna(create_time):
- try:
- if isinstance(create_time, str):
- ct = datetime.strptime(create_time[:19], "%Y-%m-%d %H:%M:%S")
- else:
- ct = pd.Timestamp(create_time).to_pydatetime()
- hours_since = (datetime.now() - ct).total_seconds() / 3600
- if hours_since < cold_start_hours:
- is_cold_start = True
- cold_start_reason = f"冷启动期({hours_since:.0f}h<{cold_start_hours}h)"
- except (ValueError, TypeError):
- pass
- if conversions < cold_start_min_conversions and not is_cold_start:
- is_cold_start = True
- cold_start_reason = f"转化不足({conversions}<{cold_start_min_conversions})"
- if is_cold_start:
- cold_start_count += 1
- action, adj_ratio = "observe", 0.0
- else:
- action, adj_ratio = matrix.get(quadrant, ("keep", 0.0))
- # 限制幅度范围
- if adj_ratio > 0:
- adj_ratio = min(adj_ratio, max_increase_pct)
- elif adj_ratio < 0:
- adj_ratio = max(adj_ratio, max_decrease_pct)
- # --- 赔付保护 ---
- if protect_compensation and action == "close":
- if 3 <= conversions < compensation_min_conversions:
- action = "observe"
- adj_ratio = 0.0
- cold_start_reason = f"接近赔付门槛({conversions}次转化,等待积累到{compensation_min_conversions})"
- # 计算新出价
- bid = ad.get("bid_amount")
- bid_val = float(bid) if bid and pd.notna(bid) else None
- new_bid = None
- if bid_val and action in ("increase", "decrease"):
- new_bid = max(min_bid, min(max_bid, int(bid_val * (1 + adj_ratio))))
- elif bid_val:
- new_bid = int(bid_val)
- results.append({
- "date": datetime.now().strftime("%Y-%m-%d"),
- "ad_id": int(ad.get("ad_id", 0)),
- "ad_name": str(ad.get("ad_name", "")),
- "account_id": int(ad.get("account_id", 0)),
- "roi_level": ad.get("roi_level", ""),
- "volume_level": ad.get("volume_level", ""),
- "quadrant": quadrant,
- "efficiency": round(float(ad.get("efficiency", 0) or 0), 4),
- "cost": round(float(ad.get("cost", 0)), 2),
- "open_count": int(ad.get("open_count", 0)),
- "conversions_count": conversions,
- "is_cold_start": is_cold_start,
- "cold_start_reason": cold_start_reason,
- "current_bid": int(bid_val) if bid_val else None,
- "new_bid": new_bid,
- "adjustment_ratio": f"{adj_ratio:+.0%}" if adj_ratio != 0 else "-",
- "action": action,
- "ad_status": str(ad.get("ad_status", "")),
- })
- # 汇总统计
- action_counts = {}
- for r in results:
- action_counts[r["action"]] = action_counts.get(r["action"], 0) + 1
- action_labels = [
- ("keep", "保持不动"), ("increase", "提价放量"), ("decrease", "降价控量"),
- ("close", "建议关停"), ("observe", "观察不动"),
- ]
- lines = [
- f"出价调整方案({len(results)}条广告)",
- f"策略: {strategy}",
- "",
- ]
- for act, label in action_labels:
- sub = [r for r in results if r["action"] == act]
- if not sub:
- continue
- lines.append(f"【{label}({act})- {len(sub)}个】")
- for item in sub[:5]:
- bid_info = f"出价:{item['current_bid']}→{item['new_bid']}分 {item['adjustment_ratio']}" if item["current_bid"] else "无出价"
- lines.append(
- f" {item['ad_id']} | ROI:{item['roi_level']}/量:{item['volume_level']} | "
- f"效率:{item['efficiency']} | 消耗:{item['cost']:.0f}元 | {bid_info}"
- )
- if len(sub) > 5:
- lines.append(f" ... 还有 {len(sub)-5} 个")
- lines.append("")
- if cold_start_count > 0:
- cold_ads = [r for r in results if r["is_cold_start"]]
- lines.append(f"【冷启动保护 - {cold_start_count}个】")
- for item in cold_ads[:5]:
- lines.append(f" {item['ad_id']} | {item['cold_start_reason']} | 转化:{item['conversions_count']}")
- if cold_start_count > 5:
- lines.append(f" ... 还有 {cold_start_count - 5} 个")
- lines.append("")
- summary_parts = [f"{label}:{action_counts.get(act, 0)}" for act, label in action_labels]
- lines.append(f"合计: {' / '.join(summary_parts)} / 冷启动保护:{cold_start_count}")
- # 添加 JSON 数据块供执行 Agent 使用
- lines.append("\n" + "=" * 60)
- lines.append("执行数据(JSON格式,供执行Agent使用):")
- lines.append("```json")
- lines.append(json.dumps({"adjustment_plan": results}, ensure_ascii=False, indent=2))
- lines.append("```")
- return ToolResult(
- title=f"出价调整方案({len(results)}条,{strategy})",
- output="\n".join(lines),
- metadata={
- "adjustment_plan": results,
- "strategy": strategy,
- "action_counts": action_counts,
- "cold_start_count": cold_start_count,
- "params": {
- "max_increase_pct": max_increase_pct,
- "max_decrease_pct": max_decrease_pct,
- "protect_cold_start": protect_cold_start,
- "protect_compensation": protect_compensation,
- },
- },
- )
- except Exception as e:
- logger.error("compute_bid_adjustment 失败: %s", e, exc_info=True)
- return ToolResult(title="compute_bid_adjustment 失败", output=str(e))
- # ═════════════════════════════════════════════════════════════
- # 执行层 — 调用 API 执行调整
- # ═════════════════════════════════════════════════════════════
- @tool(description="执行出价调整方案,批量调用 API 修改出价")
- async def bid_adjustment_execute(
- adjustment_plan_json: str,
- account_id: int,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 批量执行出价调整。
- Args:
- adjustment_plan_json: 调整方案 JSON 字符串或列表,每项包含 ad_id, new_bid, action
- account_id: 账户ID
- """
- # 解析 JSON(支持字符串或已解析的列表)
- if isinstance(adjustment_plan_json, str):
- try:
- data = json.loads(adjustment_plan_json)
- # 如果是 {"adjustment_plan": [...]} 格式,提取列表
- if isinstance(data, dict) and "adjustment_plan" in data:
- adjustment_plan = data["adjustment_plan"]
- else:
- adjustment_plan = data
- except json.JSONDecodeError as e:
- return ToolResult(
- title="执行失败",
- output=f"JSON 解析失败: {e}",
- error=str(e),
- )
- else:
- adjustment_plan = adjustment_plan_json
- success_count = 0
- failed_count = 0
- errors = []
- for item in adjustment_plan:
- if item["action"] not in ("increase", "decrease"):
- continue
- try:
- await ad_update(
- account_id=account_id,
- adgroup_id=item["ad_id"],
- bid_amount=item["new_bid"]
- )
- success_count += 1
- logger.info("调整出价: ad_id=%s, new_bid=%s", item["ad_id"], item["new_bid"])
- except Exception as e:
- failed_count += 1
- error_msg = f"ad_id={item['ad_id']}: {str(e)}"
- errors.append(error_msg)
- logger.error("执行失败: %s", error_msg)
- output_lines = [
- "执行完成:",
- f"- 成功调整: {success_count} 个",
- f"- 失败: {failed_count} 个",
- ]
- if errors:
- output_lines.append("\n失败详情:")
- for err in errors[:10]:
- output_lines.append(f" {err}")
- if len(errors) > 10:
- output_lines.append(f" ... 还有 {len(errors)-10} 个错误")
- return ToolResult(
- title="出价调整执行结果",
- output="\n".join(output_lines),
- )
- # ═════════════════════════════════════════════════════════════
- # 兼容层 — 保留旧接口,内部调用新工具链
- # ═════════════════════════════════════════════════════════════
- # 保留旧名称引用,避免 config.py 中的工具白名单报错
- account_evaluate = get_account_summary
- budget_calculate_from_data = None # 已废弃,功能拆分到 get_ad_performance + compute_* 工具链
|