| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
2771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464 |
- """
- 广告决策引擎 — auto_put_ad_mini
- 智能引擎:
- - LLM 推理 + 候选信号(ROI/裂变/CTR/消耗)驱动
- - 三级分类:零消耗待关停(规则)+ 待优化评估(LLM)+ 正常运行(规则)
- - 年龄保护三层架构(冷启动 / 早期成长 / 成熟期)
- """
- import logging
- import sys
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Dict, List, Optional, Tuple
- import numpy as np
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
- _MINI_DIR = Path(__file__).resolve().parent.parent
- if str(_MINI_DIR) not in sys.path:
- sys.path.insert(0, str(_MINI_DIR))
- from config import (
- AUDIENCE_TIER_PATTERNS,
- BID_ADJUSTMENT_ENABLED,
- BID_DOWN_ROI_FACTOR,
- BID_UP_ROI_FACTOR,
- BID_UP_MAX_SPEND,
- BID_CHANGE_MIN_PCT,
- BID_CHANGE_MAX_PCT,
- BID_UP_MIN_PCT,
- BID_UP_MAX_PCT,
- BID_DOWN_MIN_PCT,
- BID_DOWN_MAX_PCT,
- BID_DOWN_MIN_SPEND,
- BID_FLOOR_YUAN,
- BID_CEILING_YUAN,
- COLD_START_DAYS, # ≤3天:冷启动期(极度保护)
- EARLY_GROWTH_DAYS, # 4-7天:早期成长期(可提价)
- AD_AGE_MATURE, # >7天:成熟期(全面调控)
- HIGH_BURN_AGE_THRESHOLD,
- HIGH_BURN_COST_THRESHOLD,
- ROI_LOW_FACTOR,
- ROI_LOW_MIN_YESTERDAY_COST,
- )
- logger = logging.getLogger(__name__)
- # ═══════════════════════════════════════════
- # 策略参数动态加载(阈值不写死在代码中)
- # ═══════════════════════════════════════════
STRATEGY_PARAMS_FILE = _MINI_DIR / "strategy_params.json"


def _load_strategy_params():
    """Load the tunable strategy thresholds used for candidate selection.

    The defaults come from ``config.py``. When ``strategy_params.json`` exists,
    the entries under its top-level ``"params"`` key override those defaults
    key by key, so a file that only specifies some thresholds (or none) still
    yields a complete mapping and callers can index it without ``KeyError``.

    Returns:
        dict: always contains ``ROI_LOW_FACTOR``, ``BID_DOWN_ROI_FACTOR`` and
        ``BID_UP_ROI_FACTOR``; any additional keys found in the file are kept.
        On any load/parse failure a warning is logged and the defaults are
        returned unchanged.
    """
    import json

    # config.py defaults; file values are layered on top of these.
    params = {
        "ROI_LOW_FACTOR": ROI_LOW_FACTOR,
        "BID_DOWN_ROI_FACTOR": BID_DOWN_ROI_FACTOR,
        "BID_UP_ROI_FACTOR": BID_UP_ROI_FACTOR,
    }
    if not STRATEGY_PARAMS_FILE.exists():
        return params

    try:
        # Explicit encoding, matching the other JSON read in this module.
        with open(STRATEGY_PARAMS_FILE, encoding="utf-8") as f:
            data = json.load(f)
        overrides = data.get("params", {})
        if not isinstance(overrides, dict):
            raise ValueError(f"'params' must be an object, got {type(overrides).__name__}")
        # A null value would break the arithmetic done with these thresholds,
        # so it is treated as "not specified" and the default is kept.
        params.update({k: v for k, v in overrides.items() if v is not None})
    except Exception as e:
        # Best-effort by design: a broken file must not stop the engine.
        logger.warning(f"加载strategy_params.json失败,使用config.py默认值: {e}")
    return params
- # ═══════════════════════════════════════════
- # 决策动作类型(扩展支持)
- # ═══════════════════════════════════════════
# Action names a decision may carry (extended set).
VALID_ACTIONS = [
    "pause",            # pause the ad
    "bid_down",         # lower the bid
    "bid_up",           # raise the bid
    "hold",             # keep as is
    "creative_adjust",  # adjust creative direction (has to be carried out manually)
    "observe",          # watch and wait (data unstable or close to a threshold)
    "scale_up",         # scale up: suggest adding ads/creatives (has to be carried out manually)
]
- # ═══════════════════════════════════════════
- # 辅助函数
- # ═══════════════════════════════════════════
def _extract_audience_tier(ad_name: str) -> str:
    """Derive the audience-package tier from an ad name.

    ``AUDIENCE_TIER_PATTERNS`` is walked in order; the tier of the first entry
    that has a pattern occurring in the ad name (case-insensitive substring
    match) is returned.

    Args:
        ad_name: Ad name to inspect. Empty values, ``None`` and float NaN
            (what pandas yields for a missing name) count as "no name".

    Returns:
        The matching tier, or ``"default"`` when there is no name or no
        pattern matches.
    """
    # ``x != x`` is true only for NaN; without this guard a missing name would
    # be matched as the literal text "nan".
    if not ad_name or ad_name != ad_name:
        return "default"

    # Lower-case once instead of once per pattern.
    lowered_name = str(ad_name).lower()
    for tier, patterns in AUDIENCE_TIER_PATTERNS:
        if any(pat.lower() in lowered_name for pat in patterns):
            return tier
    return "default"
def _calculate_ad_age_days(create_time) -> Optional[int]:
    """Return the number of whole days between ``create_time`` and now.

    Args:
        create_time: Creation time as a string or as anything
            ``pd.Timestamp`` accepts. Strings are cut to their first 19
            characters and parsed as ``YYYY-MM-DD HH:MM:SS``,
            ``YYYY-MM-DDTHH:MM:SS`` or ``YYYY-MM-DD``.

    Returns:
        The age in days (``timedelta.days`` of ``now - create_time``), or
        ``None`` when the value is missing or cannot be interpreted.
    """
    if pd.isna(create_time):
        return None
    try:
        if isinstance(create_time, str):
            text = create_time.strip()[:19]
            ct = None
            # Accept the common serialisations instead of a single one, so a
            # date-only or ISO "T" value does not silently lose its age.
            for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d"):
                try:
                    ct = datetime.strptime(text, fmt)
                    break
                except ValueError:
                    continue
            if ct is None:
                return None
        else:
            ct = pd.Timestamp(create_time).to_pydatetime()
        # For a tz-aware value "now" must be taken in the same zone; mixing
        # naive and aware datetimes raises TypeError. ``tzinfo`` is None for
        # naive values, which gives the plain local ``datetime.now()``.
        return (datetime.now(ct.tzinfo) - ct).days
    except Exception:
        # Best-effort helper: callers treat None as "age unknown".
        return None
- # ═══════════════════════════════════════════
- # 衰退检测辅助函数
- # ═══════════════════════════════════════════
def _read_recent_daily_csvs(
    directory: Path, prefix: str, end_dt: datetime, days: int
) -> Optional[pd.DataFrame]:
    """Concatenate ``{prefix}_{YYYYMMDD}.csv`` for the ``days`` days ending at ``end_dt``.

    Every loaded frame gets a ``date`` column holding the ``YYYYMMDD`` string
    taken from its file name. Days without a file are skipped. Returns ``None``
    when no file exists at all.
    """
    frames = []
    for offset in range(days):
        day = (end_dt - timedelta(days=offset)).strftime("%Y%m%d")
        csv_path = directory / f"{prefix}_{day}.csv"
        if csv_path.exists():
            frame = pd.read_csv(csv_path)
            frame["date"] = day
            frames.append(frame)
    if not frames:
        return None
    return pd.concat(frames, ignore_index=True)


def _detect_decay_signals(
    ad_ids: List[int],
    raw_dir: Path,
    ad_status_dir: Path,
    end_date: str
) -> pd.DataFrame:
    """Detect per-ad decay signals: a bid raise or a creative change.

    The most recent 7 days (``end_date`` inclusive) are compared with the
    7 days before them.

    Args:
        ad_ids: Ad IDs to check.
        raw_dir: Directory holding creative-level ``creative_{YYYYMMDD}.csv``
            files (``ad_id`` and ``creative_id`` columns are read).
        ad_status_dir: Directory holding ``ad_status_{YYYYMMDD}.csv`` files
            (``ad_id`` and ``bid_amount`` columns are read).
        end_date: Last day of the recent window, ``YYYYMMDD``.

    Returns:
        DataFrame with one row per entry of ``ad_ids`` and the columns
        ``ad_id``, ``bid_increased_7d``, ``creative_changed_7d``. The schema is
        the same on every path. ``stable_spend_days_30d`` is deliberately not
        part of it; per the original note that value already lives in the
        metrics CSV, so it is no longer recomputed here.
    """
    end_dt = datetime.strptime(end_date, "%Y%m%d")

    creative_df = _read_recent_daily_csvs(raw_dir, "creative", end_dt, 14)
    if creative_df is None:
        logger.warning("无创意数据,无法检测衰退信号")
        # Same columns and one row per ad as the normal return, so the
        # caller's merge behaves identically on both paths.
        return pd.DataFrame({
            "ad_id": ad_ids,
            "bid_increased_7d": [False] * len(ad_ids),
            "creative_changed_7d": [False] * len(ad_ids),
        })
    creative_df = creative_df[creative_df["ad_id"].isin(ad_ids)]

    # Window bounds as YYYYMMDD strings; fixed-width, so string comparison
    # orders them chronologically.
    recent_7d_start = (end_dt - timedelta(days=6)).strftime("%Y%m%d")
    prior_7d_start = (end_dt - timedelta(days=13)).strftime("%Y%m%d")
    prior_7d_end = (end_dt - timedelta(days=7)).strftime("%Y%m%d")

    # Creative change: the set of creative IDs differs between the windows.
    recent_creatives = (
        creative_df[creative_df["date"] >= recent_7d_start]
        .groupby("ad_id")["creative_id"]
        .apply(set)
    )
    prior_creatives = (
        creative_df[
            (creative_df["date"] >= prior_7d_start) & (creative_df["date"] <= prior_7d_end)
        ]
        .groupby("ad_id")["creative_id"]
        .apply(set)
    )
    creative_changed = {}
    for ad_id in ad_ids:
        recent_set = recent_creatives.get(ad_id, set())
        prior_set = prior_creatives.get(ad_id, set())
        # Only a change when the ad has creatives in BOTH windows.
        creative_changed[ad_id] = (
            (recent_set != prior_set) and len(recent_set) > 0 and len(prior_set) > 0
        )

    # Bid raise: max bid of the recent window exceeds max bid of the prior one.
    bid_increased = {ad_id: False for ad_id in ad_ids}
    status_df = _read_recent_daily_csvs(ad_status_dir, "ad_status", end_dt, 14)
    if status_df is None:
        logger.warning("无广告状态数据,无法检测提价")
    else:
        status_df = status_df[status_df["ad_id"].isin(ad_ids)]
        if not status_df.empty:
            recent_bids = (
                status_df[status_df["date"] >= recent_7d_start]
                .groupby("ad_id")["bid_amount"]
                .max()
            )
            prior_bids = (
                status_df[
                    (status_df["date"] >= prior_7d_start) & (status_df["date"] <= prior_7d_end)
                ]
                .groupby("ad_id")["bid_amount"]
                .max()
            )
            for ad_id in ad_ids:
                recent_bid = recent_bids.get(ad_id)
                prior_bid = prior_bids.get(ad_id)
                # Like the creative check, require data in both windows. With
                # a default prior bid of 0, every ad lacking prior-window
                # records would be reported as "bid raised".
                bid_increased[ad_id] = bool(
                    recent_bid is not None
                    and prior_bid is not None
                    and recent_bid > prior_bid
                )

    return pd.DataFrame({
        "ad_id": ad_ids,
        "bid_increased_7d": [bid_increased[ad_id] for ad_id in ad_ids],
        "creative_changed_7d": [creative_changed[ad_id] for ad_id in ad_ids],
    })
- # ═══════════════════════════════════════════
- # 智能引擎工具 1:整理待评估广告数据
- # ═══════════════════════════════════════════
@tool(description="智能引擎:整理需要关注的广告数据,供LLM推理决策")
async def get_ads_for_review(
    ctx: ToolContext,
    metrics_csv: str = "",
    end_date: str = "yesterday",
    roi_review_factor: float = 0.8,
    min_spend_for_zero_spend: float = 10.0,
) -> ToolResult:
    """
    Classify ads into three groups and return a structured summary for LLM reasoning.

    No decision is taken here.

    - Zero-spend, to be paused: 7-day average spend below
      ``min_spend_for_zero_spend`` (almost no activity); handled by rule.
    - To be reviewed (candidates): meaningful spend but abnormal metrics (low
      ROI, decay signal, bid-adjustment or scale-up candidate); needs the LLM.
    - Running normally: no abnormal signal; only counted.

    Args:
        ctx: Tool context supplied by the agent framework (not used in the body).
        metrics_csv: Path of the ROI metrics CSV (output of
            calculate_roi_metrics). Defaults to ``outputs/metrics_temp.csv``.
        end_date: End date as ``YYYYMMDD``, or ``"yesterday"``.
        roi_review_factor: An ad whose dynamic ROI is below the overall
            baseline times this factor becomes a review candidate (default 0.8).
        min_spend_for_zero_spend: 7-day average spend (yuan) below which an ad
            counts as zero-spend (default 10.0).

    Returns:
        ToolResult whose ``output`` is a JSON document with the summary, the
        ROI distribution, the thresholds used, zero-spend samples, the review
        candidates and the candidates grouped by audience tier. Any exception
        is logged and reported as a failed ToolResult instead of being raised.
    """
    import json

    try:
        # Strategy thresholds are loaded dynamically rather than hard-coded.
        params = _load_strategy_params()
        if not metrics_csv:
            metrics_csv = str(_MINI_DIR / "outputs" / "metrics_temp.csv")
        df = pd.read_csv(metrics_csv)
        if df.empty:
            return ToolResult(title="get_ads_for_review", output="指标数据为空")
        if end_date == "yesterday":
            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")

        # Pre-filter: SUSPEND / DELETED ads are not sent to the LLM (saves
        # tokens, avoids useless candidates). Per the original note,
        # apply_decisions has a second, fallback filter.
        if "configured_status" in df.columns:
            before = len(df)
            excluded_status = {"AD_STATUS_SUSPEND", "AD_STATUS_DELETED"}
            df = df[~df["configured_status"].isin(excluded_status)].copy()
            dropped = before - len(df)
            if dropped > 0:
                logger.info(f"get_ads_for_review 入口过滤 {dropped} 条 SUSPEND/DELETED 广告")

        # ===== Audience-package level statistics (peer comparison baseline) =====
        logger.info("读取人群包级别统计数据...")
        by_tier_stats = {}
        by_tier_goal = {}
        try:
            portfolio_dir = _MINI_DIR / "outputs" / "portfolio_summary"
            portfolio_file = portfolio_dir / f"portfolio_summary_{end_date}.json"
            if portfolio_file.exists():
                with open(portfolio_file, "r", encoding="utf-8") as f:
                    portfolio_data = json.load(f)
                by_tier_stats = portfolio_data.get("by_audience_tier", {})
                by_tier_goal = portfolio_data.get("by_tier_goal", {})
                logger.info(f"✅ 从 {portfolio_file.name} 加载了 {len(by_tier_stats)} 个人群包 + {len(by_tier_goal)} 个tier+goal组的统计数据")
            else:
                logger.warning(f"未找到 portfolio_summary 文件: {portfolio_file}(请确认 calculate_roi_metrics 已正常运行)")
                by_tier_goal = {}
        except Exception as e:
            # Peer statistics are optional; fall back to empty dicts.
            logger.warning(f"读取人群包统计数据失败,使用空字典兜底: {e}")
            by_tier_stats = {}
            by_tier_goal = {}

        # Ad age in days (None / NaN when create_time cannot be interpreted).
        df["ad_age_days"] = df["create_time"].apply(_calculate_ad_age_days)

        # Decay signals (bid raised / creative changed in the last 7 days).
        raw_dir = _MINI_DIR / "outputs" / "raw"
        ad_status_dir = _MINI_DIR / "outputs" / "ad_status"
        decay_signals = _detect_decay_signals(
            ad_ids=df["ad_id"].tolist(),
            raw_dir=raw_dir,
            ad_status_dir=ad_status_dir,
            end_date=end_date,
        )
        # Merge only the two signal columns. Any other column coming back from
        # the detector (e.g. stable_spend_days_30d) would collide with the
        # metrics column of the same name and be renamed to *_x / *_y.
        signal_columns = ["ad_id", "bid_increased_7d", "creative_changed_7d"]
        decay_signals = decay_signals[
            [col for col in signal_columns if col in decay_signals.columns]
        ]
        df = df.merge(decay_signals, on="ad_id", how="left")
        df["bid_increased_7d"] = df["bid_increased_7d"].fillna(False)
        df["creative_changed_7d"] = df["creative_changed_7d"].fillna(False)
        if "stable_spend_days_30d" not in df.columns:
            # Expected to come from the metrics CSV; degrade to 0 rather than
            # failing the whole tool with a KeyError.
            logger.warning("指标数据缺少 stable_spend_days_30d 列,按 0 处理")
            df["stable_spend_days_30d"] = 0
        df["stable_spend_days_30d"] = df["stable_spend_days_30d"].fillna(0)

        # Overall ROI distribution.
        # NOTE(review): `roi_mean` holds the MEDIAN of the series, not the
        # arithmetic mean; every "mean"-based threshold below uses this value.
        # Confirm this is intentional.
        roi_series = df["动态ROI_7日均值"].dropna()
        roi_mean = float(roi_series.median()) if len(roi_series) > 0 else 0.0
        roi_p25 = float(roi_series.quantile(0.25)) if len(roi_series) > 0 else 0.0
        roi_p50 = float(roi_series.quantile(0.50)) if len(roi_series) > 0 else 0.0
        roi_p75 = float(roi_series.quantile(0.75)) if len(roi_series) > 0 else 0.0
        roi_p90 = float(roi_series.quantile(0.90)) if len(roi_series) > 0 else 0.0

        # Adjustment history, used for the "persistently low ROI -> escalate
        # to pause" check.
        from guardrails import AdjustmentHistory
        adjustment_history = AdjustmentHistory()

        # Classification buckets.
        zero_spend_ads = []    # zero-spend, to be paused
        need_review_ads = []   # to be reviewed
        normal_ads_count = 0   # running normally

        for _, row in df.iterrows():
            cost_7d_avg = float(row.get("cost_7d_avg", 0) or 0)
            f_roi = row.get("动态ROI_7日均值")
            # A DataFrame row yields NaN (not None) for an unknown age.
            # Normalise it so the `is not None` guards below work and
            # int(ad_age) cannot raise on NaN.
            raw_age = row.get("ad_age_days")
            ad_age = None if raw_age is None or pd.isna(raw_age) else int(raw_age)
            bid_inc = bool(row.get("bid_increased_7d", False))
            creative_chg = bool(row.get("creative_changed_7d", False))
            stable_days = float(row.get("stable_spend_days_30d", 0) or 0)
            if pd.isna(stable_days):
                stable_days = 0.0
            bid_amount = float(row.get("bid_amount", 0) or 0)

            # Zero-spend rule: 7-day average spend below the threshold.
            # Age protection tiers:
            #   - <= COLD_START_DAYS (cold start): protected, neither paused nor reviewed
            #   - up to EARLY_GROWTH_DAYS (early growth) + low spend: let through to
            #     candidate evaluation, may hit bid-up branch A ("wake the silent ad")
            #   - older (mature) + low spend: zero-spend rule applies
            if cost_7d_avg < min_spend_for_zero_spend:
                if ad_age is not None and ad_age <= COLD_START_DAYS:
                    normal_ads_count += 1
                    logger.debug(
                        f"广告 {row['ad_id']} 年龄{ad_age}天≤{COLD_START_DAYS}天(冷启动期),"
                        f"虽消耗低({cost_7d_avg:.2f}元),但年龄保护不关停"
                    )
                    continue
                elif ad_age is not None and ad_age <= EARLY_GROWTH_DAYS:
                    # No `continue`: falls through to the bid_up_candidate_a check.
                    logger.debug(
                        f"广告 {row['ad_id']} 年龄{ad_age}天属早期成长期+消耗低({cost_7d_avg:.2f}元),"
                        f"放行评估提价分支A(唤醒沉默)"
                    )
                else:
                    # Mature (or unknown-age) low-spend ad: zero-spend rule applies.
                    zero_spend_ads.append({
                        "ad_id": int(row["ad_id"]),
                        "ad_name": str(row.get("ad_name", "")),
                        "cost_7d_avg": round(cost_7d_avg, 2),
                    })
                    continue

            # Yesterday's spend (spend gate of the pause rule).
            yesterday_cost = float(row.get("yesterday_cost", 0) or 0)

            # Low ROI: below baseline * review factor, with yesterday's spend at
            # least ROI_LOW_MIN_YESTERDAY_COST and the ad past cold start.
            roi_low = (
                (not pd.isna(f_roi))
                and (f_roi < roi_mean * roi_review_factor)
                and yesterday_cost >= ROI_LOW_MIN_YESTERDAY_COST
                and (ad_age is not None and ad_age > COLD_START_DAYS)
            )
            # Decay: historically stable spend, now low, after a bid raise or
            # a creative change.
            decay_signal = (
                stable_days >= 7
                and cost_7d_avg < 100
                and (bid_inc or creative_chg)
            )

            # ===== Fission rate + CTR (inputs for the candidate signals) =====
            ad_fission = row.get("T0裂变系数_7日均值")
            if ad_fission is None or pd.isna(ad_fission):
                ad_fission = None
            else:
                ad_fission = float(ad_fission)

            # Audience tier: prefer the audience_tier column; fall back to the
            # tier parsed from the ad name. A NaN cell counts as missing so it
            # does not become the literal tier "nan".
            raw_tier = row.get("audience_tier")
            tier_known = raw_tier is not None and not pd.isna(raw_tier)
            tier = (str(raw_tier) if tier_known else "") or _extract_audience_tier(
                str(row.get("ad_name", ""))
            )
            tier_stats = by_tier_stats.get(tier, {})
            tier_fission_mean = tier_stats.get("fission_mean")

            # CTR
            ad_view = float(row.get("view_count", 0) or 0)
            ad_click = float(row.get("valid_click_count", 0) or 0)
            ad_ctr = ad_click / ad_view if ad_view > 0 else None
            tier_ctr_mean = tier_stats.get("ctr_mean")

            # ===== Bid-up candidates: two independent branches, OR-ed =====
            # Branch A (spend angle, "wake the silent ad"): early-growth age,
            # average spend below the zero-spend threshold (same threshold as
            # the pass-through above) and CTR not below 80% of the tier mean
            # (skipped when either CTR value is unavailable).
            bid_up_candidate_a = (
                ad_age is not None
                and COLD_START_DAYS < ad_age <= EARLY_GROWTH_DAYS
                and cost_7d_avg < min_spend_for_zero_spend
                and bid_amount > 0
                and (tier_ctr_mean is None or ad_ctr is None
                     or ad_ctr >= tier_ctr_mean * 0.80)
            ) if BID_ADJUSTMENT_ENABLED else False

            # Branch B (ROI + fission angle, "scale a proven ad"): ROI above
            # baseline * BID_UP_ROI_FACTOR, spend below BID_UP_MAX_SPEND, age up
            # to EARLY_GROWTH_DAYS (cold-start ads are excluded further down),
            # fission more than 10% above the tier mean, and CTR not below 80%
            # of the tier mean (peer checks skipped when data is missing).
            bid_up_candidate_b = (
                (not pd.isna(f_roi))
                and f_roi > roi_mean * params["BID_UP_ROI_FACTOR"]
                and cost_7d_avg < BID_UP_MAX_SPEND
                and bid_amount > 0
                and (ad_age is not None and ad_age <= EARLY_GROWTH_DAYS)
                and (tier_fission_mean is None or ad_fission is None
                     or ad_fission > tier_fission_mean * 1.10)
                and (tier_ctr_mean is None or ad_ctr is None
                     or ad_ctr >= tier_ctr_mean * 0.80)
            ) if BID_ADJUSTMENT_ENABLED else False

            bid_up_candidate = bid_up_candidate_a or bid_up_candidate_b

            # Bid-down: ROI below the bid-down line but not yet below the pause
            # line, spend at least BID_DOWN_MIN_SPEND, fission more than 10%
            # below the tier mean (skipped when data is missing).
            bid_down_candidate = (
                (not pd.isna(f_roi))
                and f_roi < roi_mean * params["BID_DOWN_ROI_FACTOR"]
                and f_roi >= roi_mean * params["ROI_LOW_FACTOR"]
                and cost_7d_avg >= BID_DOWN_MIN_SPEND
                and bid_amount > 0
                and (tier_fission_mean is None or ad_fission is None
                     or ad_fission < tier_fission_mean * 0.90)
            ) if BID_ADJUSTMENT_ENABLED else False

            # ===== Persistently low ROI: escalate to pause candidate =====
            # ("pause if ROI stays below the mean after a bid-down")
            persistent_low_roi = False
            if (
                not roi_low  # not already below the pause line
                and (not pd.isna(f_roi))
                and f_roi < roi_mean * params["BID_DOWN_ROI_FACTOR"]
                and yesterday_cost >= ROI_LOW_MIN_YESTERDAY_COST
                and (ad_age is not None and ad_age > COLD_START_DAYS)
            ):
                last_bd_ts = adjustment_history.get_last_bid_down_ts(str(row["ad_id"]))
                if last_bd_ts is not None:
                    days_since_bd = (datetime.now() - last_bd_ts).days
                    if days_since_bd >= 7:
                        # ROI still low 7+ days after the bid-down: escalate.
                        persistent_low_roi = True
                        roi_low = True
                        logger.info(
                            f"广告 {row['ad_id']} 降价后{days_since_bd}天ROI仍低"
                            f"({f_roi:.4f}<{roi_mean * params['BID_DOWN_ROI_FACTOR']:.4f}),升级为关停候选"
                        )

            # Scale-up: mature, stable spend, high spend, ROI at least 90% of baseline.
            scale_up_candidate = (
                ad_age is not None
                and ad_age > 7
                and stable_days >= 7
                and cost_7d_avg > 1000
                and (not pd.isna(f_roi))
                and f_roi >= roi_mean * 0.9
            )

            # ===== Spend-stability gate (mature + unstable -> no negative action) =====
            if ad_age is not None and ad_age > EARLY_GROWTH_DAYS and stable_days < 7:
                if roi_low or decay_signal or bid_down_candidate:
                    logger.debug(
                        f"广告 {row['ad_id']} 成熟期({ad_age}天)但消耗不稳定(稳定天数{stable_days}<7),"
                        f"清除负向信号: roi_low={roi_low}, decay={decay_signal}, bid_down={bid_down_candidate}"
                    )
                roi_low = False
                decay_signal = False
                bid_down_candidate = False

            # ===== Age protection (first priority) =====
            age_protected_skip = False
            if ad_age is not None:
                if ad_age <= COLD_START_DAYS:
                    # Cold start: excluded from every evaluation.
                    normal_ads_count += 1
                    logger.debug(
                        f"广告 {row['ad_id']} 处于冷启动期({ad_age}天≤{COLD_START_DAYS}天),"
                        f"年龄保护规则自动排除(无论是否满足候选条件)"
                    )
                    age_protected_skip = True
                elif ad_age <= EARLY_GROWTH_DAYS:
                    # Early growth: only bid-up / scale-up may be evaluated.
                    # Negative flags are cleared even when a positive flag is set.
                    has_negative_flags = roi_low or decay_signal or bid_down_candidate
                    has_positive_flags = bid_up_candidate or scale_up_candidate
                    if has_negative_flags:
                        logger.debug(
                            f"广告 {row['ad_id']} 处于早期成长期({ad_age}天),"
                            f"年龄保护强制清除负向候选标志:"
                            f"roi_low={roi_low}→False, decay={decay_signal}→False, "
                            f"bid_down={bid_down_candidate}→False"
                        )
                    roi_low = False
                    decay_signal = False
                    bid_down_candidate = False
                    if not has_positive_flags:
                        # Nothing left to evaluate: count as normal.
                        normal_ads_count += 1
                        logger.debug(
                            f"广告 {row['ad_id']} 处于早期成长期({ad_age}天),"
                            f"无提价/扩量候选标志,已排除"
                        )
                        age_protected_skip = True

            if age_protected_skip:
                continue

            # ===== Business rules (second layer) =====
            if roi_low or decay_signal or bid_up_candidate or bid_down_candidate or scale_up_candidate:
                ad_dict = {
                    "ad_id": int(row["ad_id"]),
                    "ad_name": str(row.get("ad_name", "")),
                    "动态ROI_7日均值": round(float(f_roi), 4) if not pd.isna(f_roi) else None,
                    "cost_7d_avg": round(cost_7d_avg, 2),
                    "cost_7d_total": round(float(row.get("cost_7d_total", 0) or 0), 2),
                    "ad_age_days": int(ad_age) if ad_age is not None else None,
                    "bid_increased_7d": bid_inc,
                    "creative_changed_7d": creative_chg,
                    "stable_spend_days_30d": int(stable_days),
                    "bid_amount": round(bid_amount, 2),
                    "bid_candidate": "bid_up" if bid_up_candidate else ("bid_down" if bid_down_candidate else None),
                    "scale_up_candidate": scale_up_candidate,
                    # The ad's own metrics, for comparison with the peer baseline.
                    "ad_fission": round(ad_fission, 4) if ad_fission is not None else None,
                    "ad_ctr": round(ad_ctr, 4) if ad_ctr is not None else None,
                    "yesterday_cost": round(yesterday_cost, 2),
                }

                # audience_tier and roi_valid_days (NaN cells count as missing;
                # int() on NaN would raise).
                ad_dict["audience_tier"] = str(raw_tier) if tier_known else "default"
                roi_valid_days = row.get("roi_valid_days", 0)
                ad_dict["roi_valid_days"] = (
                    0 if pd.isna(roi_valid_days) else int(roi_valid_days or 0)
                )

                # Peer comparison data for the tier recorded on the ad.
                tier = ad_dict.get("audience_tier", "default")
                tier_stats = by_tier_stats.get(tier, {})
                ad_dict["tier_roi_p25"] = tier_stats.get("roi_p25")
                ad_dict["tier_roi_p50"] = tier_stats.get("roi_p50")
                ad_dict["tier_roi_p75"] = tier_stats.get("roi_p75")
                ad_dict["tier_roi_mean"] = tier_stats.get("roi_mean")
                ad_dict["tier_fission_mean"] = tier_stats.get("fission_mean")
                ad_dict["tier_fission_p50"] = tier_stats.get("fission_p50")

                # CTR + peer mean bid (grouped by tier + optimisation goal,
                # falling back to the tier statistics).
                ad_dict["tier_ctr_mean"] = tier_stats.get("ctr_mean")
                ad_goal = str(row.get("广告优化目标", ""))
                tier_goal_key = f"{tier}_{ad_goal}"
                tier_goal_stats = by_tier_goal.get(tier_goal_key, tier_stats)
                tier_bid_mean = tier_goal_stats.get("bid_mean")
                ad_dict["tier_bid_mean"] = tier_bid_mean
                ad_dict["bid_up_target_min"] = round(tier_bid_mean * 1.05, 4) if tier_bid_mean else None
                ad_dict["bid_up_target_max"] = round(tier_bid_mean * 1.10, 4) if tier_bid_mean else None

                # Dynamic thresholds for the LLM, relative to the tier median
                # ROI (overall baseline when the tier has no roi_p50 entry).
                tier_roi_p50 = tier_stats.get("roi_p50", roi_mean)
                # Pause line: 70-75% of the median.
                ad_dict["pause_line_min"] = round(tier_roi_p50 * 0.70, 4) if tier_roi_p50 else None
                ad_dict["pause_line_max"] = round(tier_roi_p50 * 0.75, 4) if tier_roi_p50 else None
                # Bid-down line: 85-90% of the median.
                ad_dict["bid_down_line_min"] = round(tier_roi_p50 * 0.85, 4) if tier_roi_p50 else None
                ad_dict["bid_down_line_max"] = round(tier_roi_p50 * 0.90, 4) if tier_roi_p50 else None
                # Bid-up line: 105-110% of the median.
                ad_dict["bid_up_line_min"] = round(tier_roi_p50 * 1.05, 4) if tier_roi_p50 else None
                ad_dict["bid_up_line_max"] = round(tier_roi_p50 * 1.10, 4) if tier_roi_p50 else None

                # Age segment labels. Ads of unknown age get none of these keys.
                if ad_age is not None:
                    if ad_age <= COLD_START_DAYS:
                        # Cold-start ads are skipped by the age protection
                        # above, so this branch is not expected to run.
                        ad_dict["age_segment"] = "cold_start"
                        ad_dict["age_protection_level"] = "极度保护(冷启动期)"
                        ad_dict["allow_bid_down"] = False
                        ad_dict["allow_bid_up"] = False
                    elif ad_age <= EARLY_GROWTH_DAYS:
                        ad_dict["age_segment"] = "early_growth"
                        ad_dict["age_protection_level"] = "仅允许提价(早期成长期)"
                        ad_dict["allow_bid_down"] = False
                        ad_dict["allow_bid_up"] = True
                        ad_dict["max_bid_down_pct"] = 0
                    else:
                        ad_dict["age_segment"] = "mature"
                        ad_dict["age_protection_level"] = "正常调控(成熟期)"
                        ad_dict["allow_bid_down"] = True
                        ad_dict["allow_bid_up"] = True
                        ad_dict["max_bid_down_pct"] = 0.05

                    # High-burn alert: age above HIGH_BURN_AGE_THRESHOLD and
                    # yesterday's spend above HIGH_BURN_COST_THRESHOLD.
                    # NOTE(review): this re-reads yesterday's spend from the
                    # "前1日消耗" column, while the value used earlier comes
                    # from "yesterday_cost". Confirm which column the metrics
                    # CSV really carries; if "前1日消耗" is absent the alert
                    # never fires.
                    yesterday_cost = float(row.get("前1日消耗", 0) or 0)
                    if ad_age > HIGH_BURN_AGE_THRESHOLD and yesterday_cost > HIGH_BURN_COST_THRESHOLD:
                        ad_dict["high_burn_alert"] = True
                        ad_dict["yesterday_cost"] = round(yesterday_cost, 2)
                    else:
                        ad_dict["high_burn_alert"] = False

                # Allowed adjustment range depends on the candidate type.
                if bid_up_candidate:
                    ad_dict["bid_change_min_pct"] = BID_UP_MIN_PCT
                    ad_dict["bid_change_max_pct"] = BID_UP_MAX_PCT
                elif bid_down_candidate:
                    ad_dict["bid_change_min_pct"] = BID_DOWN_MIN_PCT
                    ad_dict["bid_change_max_pct"] = BID_DOWN_MAX_PCT
                else:
                    # roi_low / decay / scale_up candidates: the LLM may still
                    # suggest a bid change, so give it the generic range.
                    ad_dict["bid_change_min_pct"] = BID_CHANGE_MIN_PCT
                    ad_dict["bid_change_max_pct"] = BID_CHANGE_MAX_PCT

                # Tell the LLM this pause candidate is an escalation.
                if persistent_low_roi:
                    ad_dict["persistent_low_roi"] = True
                    ad_dict["recommendation_hint"] = "该广告降价后≥7天ROI仍低于渠道均值,建议关停"

                need_review_ads.append(ad_dict)
                continue

            # Running normally: no abnormal signal.
            normal_ads_count += 1

        # Group the review candidates by audience tier (for parallel
        # evaluation by sub-agents).
        review_by_tier: Dict[str, List[Dict]] = {}
        for ad in need_review_ads:
            tier = str(ad.get("audience_tier", "default") or "default")
            review_by_tier.setdefault(tier, []).append(ad)

        # Per-tier batch summary, largest batch first.
        tier_batches = sorted(
            [
                {
                    "audience_tier": t,
                    "count": len(ads),
                    "ad_ids": [a.get("ad_id") for a in ads],
                }
                for t, ads in review_by_tier.items()
            ],
            key=lambda x: -x["count"],
        )

        result = {
            "summary": {
                "total": len(df),
                "zero_spend_ads": len(zero_spend_ads),
                "need_review_ads": len(need_review_ads),
                "normal_ads": normal_ads_count,
                "tier_groups": len(review_by_tier),  # number of parallel batches
                "max_batch_size": max((b["count"] for b in tier_batches), default=0),
            },
            "distribution": {
                "roi_mean": round(roi_mean, 4),
                "p25": round(roi_p25, 4),
                "p50": round(roi_p50, 4),
                "p75": round(roi_p75, 4),
                "p90": round(roi_p90, 4),
            },
            "bid_adjustment": {
                "enabled": BID_ADJUSTMENT_ENABLED,
                "bid_down_line": round(roi_mean * params["BID_DOWN_ROI_FACTOR"], 4),
                "bid_up_line": round(roi_mean * params["BID_UP_ROI_FACTOR"], 4),
                "bid_up_max_spend": BID_UP_MAX_SPEND,
                "roi_low_min_yesterday_cost": ROI_LOW_MIN_YESTERDAY_COST,
            },
            "thresholds_used": {
                "ROI_LOW_FACTOR": params["ROI_LOW_FACTOR"],
                "BID_DOWN_ROI_FACTOR": params["BID_DOWN_ROI_FACTOR"],
                "BID_UP_ROI_FACTOR": params["BID_UP_ROI_FACTOR"],
                "BID_UP_MAX_SPEND": BID_UP_MAX_SPEND,
                "ROI_LOW_MIN_YESTERDAY_COST": ROI_LOW_MIN_YESTERDAY_COST,
                "roi_mean": round(roi_mean, 4),
                "pause_line": round(roi_mean * params["ROI_LOW_FACTOR"], 4),
                "bid_down_line": round(roi_mean * params["BID_DOWN_ROI_FACTOR"], 4),
                "bid_up_line": round(roi_mean * params["BID_UP_ROI_FACTOR"], 4),
            },
            # Zero-spend ads are handled by rule; the LLM only gets the count
            # and 10 samples so a long list does not crowd its context.
            "zero_spend_ads_count": len(zero_spend_ads),
            "zero_spend_ads_samples": zero_spend_ads[:10],
            "need_review_ads": need_review_ads,
            "review_by_tier": review_by_tier,
            "tier_batches": tier_batches,
        }
        output_json = json.dumps(result, ensure_ascii=False, indent=2)
        return ToolResult(
            title=(
                f"广告分类(零消耗:{len(zero_spend_ads)} 待评估:{len(need_review_ads)} "
                f"分 {len(review_by_tier)} 个 tier 批次 正常:{normal_ads_count})"
            ),
            output=output_json,
            metadata={
                "total": len(df),
                "zero_spend_ads": len(zero_spend_ads),
                "need_review_ads": len(need_review_ads),
                "normal_ads": normal_ads_count,
                "tier_groups": len(review_by_tier),
                "roi_mean": roi_mean,
                "end_date": end_date,
            },
        )
    except Exception as e:
        # Tool boundary: report the failure to the agent instead of raising.
        logger.error("get_ads_for_review 失败: %s", e, exc_info=True)
        return ToolResult(title="get_ads_for_review 失败", output=str(e))
- # ═══════════════════════════════════════════
- # 智能引擎工具 2:保存 LLM 决策结果
- # ═══════════════════════════════════════════
- @tool(description="智能引擎:接收LLM的决策列表,合并零消耗/正常运行类自动决策,保存为结构化结果")
- async def apply_decisions(
- ctx: ToolContext,
- decisions: str,
- end_date: str = "yesterday",
- metrics_csv: str = "",
- ) -> ToolResult:
- """
- 接收 LLM 的决策,合并【零消耗待关停】(自动关停)和【正常运行】(自动保持)广告,保存到 llm_decisions_{date}.csv。
- 决策分类:
- - 零消耗待关停:7日均消耗 < 10元,几乎无活动 → 规则判断自动关停
- - 待评估(候选):ROI 偏低、衰退信号、出价调整候选 → 智能判断
- - 正常运行:ROI 正常且无异常信号 → 规则判断自动保持
- Args:
- decisions: JSON 字符串,LLM 输出的【待评估(候选)】广告决策列表
- 格式:[{"ad_id": 123, "action": "pause"/"hold"/"bid_up"/"bid_down",
- "dimension": "...", "reason": "...", "confidence": "high"/"medium"/"low"}]
- end_date: 结束日期
- metrics_csv: ROI 指标 CSV 路径(用于获取【零消耗待关停】和【正常运行】广告)
- """
- import json
- try:
- if end_date == "yesterday":
- end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
- # 解析 LLM 决策
- try:
- llm_list = json.loads(decisions)
- except json.JSONDecodeError as e:
- return ToolResult(title="apply_decisions 失败", output=f"decisions 不是合法 JSON: {e}")
- if not isinstance(llm_list, list):
- return ToolResult(title="apply_decisions 失败", output="decisions 必须是 JSON 数组")
- # 加载零消耗待关停广告(规则判断)
- zero_spend_rows = []
- if not metrics_csv:
- metrics_csv = str(_MINI_DIR / "outputs" / "metrics_temp.csv")
- try:
- df_metrics = pd.read_csv(metrics_csv)
- for _, row in df_metrics.iterrows():
- cost_7d_avg = float(row.get("cost_7d_avg", 0) or 0)
- ad_age = row.get("ad_age_days") # 获取广告年龄
- # ⚠️ 年龄保护:≤7天的广告不适用零消耗规则
- if cost_7d_avg < 10.0:
- # 检查广告年龄
- if ad_age is not None and ad_age <= EARLY_GROWTH_DAYS:
- # 4-7天或≤3天:保护,跳过
- logger.debug(
- f"零消耗规则跳过广告 {row['ad_id']}:年龄{ad_age}天≤{EARLY_GROWTH_DAYS}天"
- )
- continue
- # >7天的低消耗广告:正常应用零消耗规则
- # 优化reason表达:避免"0.00元"显示,改用"几乎无消耗"
- if cost_7d_avg == 0:
- reason_text = "7日几乎无消耗,长期无活动"
- else:
- reason_text = f"7日均消耗={cost_7d_avg:.2f}元,长期低消耗"
- zero_spend_rows.append({
- "ad_id": int(row["ad_id"]),
- "action": "pause",
- "dimension": "长期零消耗",
- "reason": reason_text,
- "confidence": "high",
- "source": "规则判断",
- "cost_7d_avg": cost_7d_avg, # 用于排序
- })
- except Exception as e:
- logger.warning("加载零消耗待关停广告失败(跳过): %s", e)
- # 合并 LLM 决策(标注来源 + 添加cost_7d_avg用于排序 + 冷启动期决策过滤)
- for item in llm_list:
- item["source"] = "智能判断"
- # ★ 关键修复:统一 ad_id 为 int,避免 int vs string 导致去重失败
- try:
- item["ad_id"] = int(item["ad_id"])
- except (ValueError, TypeError):
- pass
- ad_id = item.get("ad_id")
- action = item.get("action", "hold")
- # 从metrics中获取广告信息
- try:
- cost_row = df_metrics[df_metrics["ad_id"] == ad_id]
- if not cost_row.empty:
- row_data = cost_row.iloc[0]
- item["cost_7d_avg"] = float(row_data.get("cost_7d_avg", 0) or 0)
- # ===== 年龄保护兜底检查(阶段3)=====
- # 阶段1已做前置过滤,这里仅作兜底检查(理论上不应触发)
- ad_age_days = row_data.get("ad_age_days")
- if ad_age_days is not None:
- if ad_age_days <= COLD_START_DAYS: # ≤3天:冷启动期(极度保护)
- # 所有操作都改为observe
- if action in ["bid_down", "pause", "bid_up"]:
- original_action = action
- original_reason = item.get("reason", "")
- item["action"] = "observe"
- item["reason"] = f"{original_reason}(LLM建议{original_action},但广告处于冷启动期{ad_age_days}天,年龄保护规则自动改为观察)"
- item["confidence"] = "low"
- item["recommended_change_pct"] = None
- logger.error(
- f"⚠️ 兜底检查触发!广告 {ad_id} 处于冷启动期({ad_age_days}天≤{COLD_START_DAYS}天),"
- f"LLM建议 {original_action},已自动转换为 observe。"
- f"这不应该发生(阶段1应已过滤),请检查逻辑!"
- )
- elif ad_age_days <= EARLY_GROWTH_DAYS: # 4-7天:早期成长期(仅允许提价)
- # 不允许降价/关停
- if action in ["bid_down", "pause"]:
- original_action = action
- original_reason = item.get("reason", "")
- item["action"] = "observe"
- item["reason"] = f"{original_reason}(LLM建议{original_action},但广告处于早期成长期{ad_age_days}天,年龄保护规则仅允许提价,改为观察)"
- item["confidence"] = "low"
- item["recommended_change_pct"] = None
- logger.error(
- f"⚠️ 兜底检查触发!广告 {ad_id} 处于早期成长期({ad_age_days}天,4-{EARLY_GROWTH_DAYS}天),"
- f"LLM建议 {original_action},已自动转换为 observe。"
- f"这不应该发生(阶段1应已过滤),请检查逻辑!"
- )
- else:
- item["cost_7d_avg"] = 0.0
- except Exception as e:
- item["cost_7d_avg"] = 0.0
- logger.warning(f"处理广告 {ad_id} 信息时出错: {e}")
- # 加载正常运行广告(规则判断)
- normal_running_rows = []
- try:
- # 收集零消耗和待评估的 ad_id
- zero_spend_ad_ids = {row["ad_id"] for row in zero_spend_rows}
- need_review_ad_ids = {item["ad_id"] for item in llm_list}
- # 正常运行 = 所有广告 - 零消耗 - 待评估
- for _, row in df_metrics.iterrows():
- ad_id = int(row["ad_id"])
- if ad_id not in zero_spend_ad_ids and ad_id not in need_review_ad_ids:
- cost_7d_avg = float(row.get("cost_7d_avg", 0) or 0)
- f_roi = row.get("动态ROI_7日均值")
- ad_age_days = row.get("ad_age_days")
- # 冷启动保护:广告年龄 ≤ 3天(基于决策树)
- if ad_age_days is not None and ad_age_days <= COLD_START_DAYS:
- roi_str = f"{f_roi:.2f}" if not pd.isna(f_roi) else "数据不足"
- normal_running_rows.append({
- "ad_id": ad_id,
- "action": "hold",
- "dimension": "冷启动保护",
- "reason": f"广告年龄{ad_age_days}天 ≤ {COLD_START_DAYS}天(冷启动期),ROI={roi_str},消耗{cost_7d_avg:.2f}元/天,极度保护",
- "confidence": "high",
- "source": "规则判断",
- "cost_7d_avg": cost_7d_avg, # 用于排序
- })
- else:
- # 正常运行
- roi_str = f"{f_roi:.2f}" if not pd.isna(f_roi) else "数据不足"
- normal_running_rows.append({
- "ad_id": ad_id,
- "action": "hold",
- "dimension": "正常运行",
- "reason": f"ROI={roi_str},消耗正常({cost_7d_avg:.2f}元/天),保持当前出价",
- "confidence": "high",
- "source": "规则判断",
- "cost_7d_avg": cost_7d_avg, # 用于排序
- })
- except Exception as e:
- logger.warning("加载正常运行广告失败(跳过): %s", e)
- all_decisions = zero_spend_rows + llm_list + normal_running_rows
- if not all_decisions:
- return ToolResult(title="apply_decisions", output="无决策数据")
- df_out = pd.DataFrame(all_decisions)
- # ★ 统一 ad_id 为 int64,确保后续 merge/去重不因类型不匹配而失败
- df_out["ad_id"] = pd.to_numeric(df_out["ad_id"], errors="coerce").astype("Int64")
- # ===== 去重:同一 ad_id 只保留优先级最高的决策 =====
- # 优先级:智能判断 > 规则判断(LLM 的判断优先于规则默认值)
- source_priority = {"智能判断": 0, "llm_modified": 1, "规则判断": 2}
- df_out["_source_rank"] = df_out["source"].map(source_priority).fillna(9)
- df_out = df_out.sort_values("_source_rank").drop_duplicates(subset=["ad_id"], keep="first")
- df_out = df_out.drop(columns=["_source_rank"])
- logger.info(f"去重后决策数: {len(df_out)}(智能判断优先)")
- # ===== 关键修复:合并 metrics CSV 中的字段 =====
- # 从 metrics CSV 补充 ad_name, ad_age_days, cost_7d_avg, 动态ROI 等字段
- try:
- df_metrics_full = pd.read_csv(metrics_csv)
- df_metrics_full["ad_id"] = pd.to_numeric(df_metrics_full["ad_id"], errors="coerce").astype("Int64")
- # 选择需要合并的列(OUTPUT_COLUMNS中定义的所有列)
- merge_cols = [
- "ad_id", "account_id", "ad_name", "audience_tier", "create_time", "ad_age_days",
- "bid_amount", "yesterday_cost", "yesterday_revenue", "yesterday_roi",
- "cost_7d_total", "cost_7d_avg", "revenue_7d_total",
- "动态ROI", "动态ROI_7日均值", "cost_30d_total", "cost_30d_avg",
- "stable_spend_days_30d", "creative_count", "roi_valid_days"
- ]
- # 只保留存在的列
- merge_cols = [c for c in merge_cols if c in df_metrics_full.columns]
- df_metrics_merge = df_metrics_full[merge_cols]
- # 左连接:保留df_out的所有行,补充字段
- df_out = df_out.merge(df_metrics_merge, on="ad_id", how="left", suffixes=("", "_metrics"))
- logger.info(f"已从 metrics CSV 合并 {len(merge_cols)} 个字段")
- except Exception as e:
- logger.warning(f"合并 metrics 字段失败(决策CSV将缺少扩展字段): {e}")
- # 过滤:已暂停(SUSPEND)或腾讯侧已删除(is_deleted=True,由 sync_ad_status.py 写入)
- # 优先读 now()-1d 的 ad_status(和 sync_ad_status.py、im_approval 保持一致,
- # 因为 is_deleted 是"当前 API 状态快照",不绑定 end_date;且 sync 默认同步 T-1)
- # 若 T-1 CSV 不存在,降级到 end_date 当天的 CSV 作为兜底。
- sync_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
- ad_status_path = _MINI_DIR / "outputs" / "ad_status" / f"ad_status_{sync_date}.csv"
- if not ad_status_path.exists():
- fallback = _MINI_DIR / "outputs" / "ad_status" / f"ad_status_{end_date}.csv"
- if fallback.exists():
- logger.info(f"ad_status T-1 CSV 不存在,降级使用 end_date={end_date} 的快照")
- ad_status_path = fallback
- if ad_status_path.exists():
- try:
- df_status = pd.read_csv(ad_status_path)
- # 1) SUSPEND
- suspended_mask = df_status["ad_status"] == "AD_STATUS_SUSPEND"
- # 2) 腾讯侧已删除(sync_ad_status.py 每日同步回写,向后兼容:列缺失即视为全 False)
- if "is_deleted" in df_status.columns:
- deleted_mask = df_status["is_deleted"].fillna(False).astype(bool)
- else:
- deleted_mask = pd.Series(False, index=df_status.index)
- excluded_ads = set(
- df_status[suspended_mask | deleted_mask]["ad_id"].tolist()
- )
- before_count = len(df_out)
- df_out = df_out[~df_out["ad_id"].isin(excluded_ads)]
- filtered_count = before_count - len(df_out)
- if filtered_count > 0:
- n_suspend = int(suspended_mask.sum())
- n_deleted = int(deleted_mask.sum())
- logger.info(
- f"过滤掉 {filtered_count} 个广告"
- f"(暂停 {n_suspend} + 腾讯侧已删除 {n_deleted})"
- )
- except Exception as e:
- logger.warning(f"加载广告状态数据失败,跳过过滤: {e}")
- # 确保必要列存在
- for col in ["ad_id", "action", "dimension", "reason", "confidence", "source"]:
- if col not in df_out.columns:
- df_out[col] = ""
- # 数值列用 None 而非空字符串,避免 float("") 异常
- for col in ["recommended_change_pct", "current_bid", "recommended_bid", "cost_7d_avg"]:
- if col not in df_out.columns:
- df_out[col] = None
- # 按7日均消耗降序排列(消耗高的广告排在前面,更需要关注)
- if "cost_7d_avg" in df_out.columns:
- df_out["cost_7d_avg"] = pd.to_numeric(df_out["cost_7d_avg"], errors="coerce").fillna(0)
- df_out = df_out.sort_values("cost_7d_avg", ascending=False).reset_index(drop=True)
- # ⚠️ 不再删除 cost_7d_avg,保留所有字段到最终报告
- # 保存
- reports_dir = _MINI_DIR / "outputs" / "reports"
- reports_dir.mkdir(parents=True, exist_ok=True)
- out_path = reports_dir / f"llm_decisions_{end_date}.csv"
- df_out.to_csv(out_path, index=False, encoding="utf-8-sig")
- pause_count = (df_out["action"] == "pause").sum()
- hold_count = (df_out["action"] == "hold").sum()
- bid_up_count = (df_out["action"] == "bid_up").sum()
- bid_down_count = (df_out["action"] == "bid_down").sum()
- output_parts = [
- f"智能引擎决策已保存: {out_path}",
- f" 关停: {pause_count} 个(含零消耗待关停: {len(zero_spend_rows)} 个)",
- f" 保持: {hold_count} 个(含正常运行: {len(normal_running_rows)} 个)",
- ]
- if bid_up_count > 0:
- output_parts.append(f" 提价: {bid_up_count} 个")
- if bid_down_count > 0:
- output_parts.append(f" 降价: {bid_down_count} 个")
- return ToolResult(
- title=f"智能引擎决策已保存({len(df_out)}条)",
- output="\n".join(output_parts),
- metadata={
- "csv_path": str(out_path),
- "total": len(df_out),
- "pause": int(pause_count),
- "hold": int(hold_count),
- "bid_up": int(bid_up_count),
- "bid_down": int(bid_down_count),
- "zero_spend_ads": len(zero_spend_rows),
- "normal_running_ads": len(normal_running_rows),
- "end_date": end_date,
- },
- )
- except Exception as e:
- logger.error("apply_decisions 失败: %s", e, exc_info=True)
- return ToolResult(title="apply_decisions 失败", output=str(e))
- # ═══════════════════════════════════════════
- # 智能引擎工具 3:查询单个广告详情(Mode 2 支撑)
- # ═══════════════════════════════════════════
@tool(description="查询单个广告的当前指标和历史数据")
async def query_ad_detail(
    ctx: ToolContext,
    ad_id: str,
    metrics_csv: str = "",
) -> ToolResult:
    """Return one ad's current metrics plus account-wide ROI context (Mode 2).

    The ad is looked up by ``ad_id`` in the metrics CSV. The ROI baseline is
    the median of the non-null ``动态ROI_7日均值`` column over every ad in that
    file; the pause / bid-down / bid-up lines are that baseline multiplied by
    ``ROI_LOW_FACTOR``, ``BID_DOWN_ROI_FACTOR`` and ``BID_UP_ROI_FACTOR``.

    Args:
        ctx: Tool context (not used by this tool).
        ad_id: Ad ID; any value ``int()`` accepts.
        metrics_csv: Path to the ROI metrics CSV. Empty means
            ``outputs/metrics_temp.csv`` under ``_MINI_DIR``.

    Returns:
        A ToolResult whose output is pretty-printed JSON holding ``ad_detail``
        and ``global_context`` (the same dict is returned as ``metadata``),
        prefixed with a staleness warning when the metrics file was last
        modified more than 24 hours ago. Failures (missing file, invalid
        ``ad_id``, unknown ad, unexpected errors) are reported through the
        ToolResult; nothing is raised.
    """
    import json

    def _number(value, digits):
        # Empty CSV cells arrive as NaN, which is truthy, so the usual
        # `value or 0` idiom lets NaN through and json.dumps then emits the
        # non-standard token `NaN`. Map missing values to 0.0 explicitly.
        if value is None or pd.isna(value):
            return 0.0
        return round(float(value), digits)

    def _text(value):
        # str(NaN) would surface as the literal "nan" in the report.
        if value is None or pd.isna(value):
            return ""
        return str(value)

    try:
        if not metrics_csv:
            metrics_csv = str(_MINI_DIR / "outputs" / "metrics_temp.csv")
        metrics_path = Path(metrics_csv)
        if not metrics_path.exists():
            return ToolResult(
                title="query_ad_detail 失败",
                output=f"指标文件不存在: {metrics_csv},请先执行 calculate_roi_metrics",
            )

        # Freshness check: warn (but still answer) when the file is over a day old.
        age_hours = (datetime.now().timestamp() - metrics_path.stat().st_mtime) / 3600
        freshness_warning = ""
        if age_hours > 24:
            freshness_warning = f"⚠️ 数据已过期({age_hours:.1f}小时前更新),建议先执行 fetch_creative_data + calculate_roi_metrics 刷新数据。\n\n"

        df = pd.read_csv(metrics_csv)

        # A malformed ID is a caller error, not an internal failure: report it
        # directly instead of going through the stack-trace logging path.
        try:
            ad_id_int = int(ad_id)
        except (TypeError, ValueError):
            return ToolResult(
                title="query_ad_detail 失败",
                output=f"ad_id 不是合法的整数: {ad_id}",
            )

        ad_row = df[df["ad_id"] == ad_id_int]
        if ad_row.empty:
            return ToolResult(
                title="query_ad_detail",
                output=f"{freshness_warning}未找到广告 {ad_id},共有 {len(df)} 个广告",
            )
        row = ad_row.iloc[0]

        ad_age_days = _calculate_ad_age_days(row.get("create_time"))

        # Median rather than mean, so a few very high-ROI ads do not drag the
        # baseline (and every threshold derived from it) upwards.
        roi_series = df["动态ROI_7日均值"].dropna()
        roi_baseline = float(roi_series.median()) if len(roi_series) > 0 else 0.0
        roi_low_line = roi_baseline * ROI_LOW_FACTOR
        bid_down_line = roi_baseline * BID_DOWN_ROI_FACTOR
        bid_up_line = roi_baseline * BID_UP_ROI_FACTOR

        f_roi = row.get("动态ROI_7日均值")
        ad_detail = {
            "ad_id": ad_id_int,
            "ad_name": _text(row.get("ad_name", "")),
            "bid_amount": _number(row.get("bid_amount", 0), 2),
            # ROI stays None (not 0) when unknown so it is not mistaken for a real zero.
            "动态ROI_7日均值": round(float(f_roi), 4) if not pd.isna(f_roi) else None,
            "cost_7d_avg": _number(row.get("cost_7d_avg", 0), 2),
            "cost_7d_total": _number(row.get("cost_7d_total", 0), 2),
            "ad_age_days": ad_age_days,
            "configured_status": _text(row.get("configured_status", "")),
        }

        # Intervention signals are best-effort: a failure here is logged and the
        # two flags are simply left out of the answer.
        try:
            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
            raw_dir = _MINI_DIR / "outputs" / "raw"
            ad_status_dir = _MINI_DIR / "outputs" / "ad_status"
            decay_signals = _detect_decay_signals(
                ad_ids=[ad_id_int],
                raw_dir=raw_dir,
                ad_status_dir=ad_status_dir,
                end_date=end_date,
            )
            if not decay_signals.empty:
                ds_row = decay_signals.iloc[0]
                ad_detail["bid_increased_7d"] = bool(ds_row.get("bid_increased_7d", False))
                ad_detail["creative_changed_7d"] = bool(ds_row.get("creative_changed_7d", False))
        except Exception as e:
            logger.warning("检测干预信号失败: %s", e)

        global_context = {
            "全体动态ROI基准(中位数)": round(roi_baseline, 4),
            "ROI关停线": round(roi_low_line, 4),
            "ROI降价线": round(bid_down_line, 4),
            "ROI提价线": round(bid_up_line, 4),
            "提价消耗上限": BID_UP_MAX_SPEND,
            "关停消耗门槛(昨日)": ROI_LOW_MIN_YESTERDAY_COST,
        }
        result = {
            "ad_detail": ad_detail,
            "global_context": global_context,
        }
        output = freshness_warning + json.dumps(result, ensure_ascii=False, indent=2)
        return ToolResult(
            title=f"广告 {ad_id} 详情",
            output=output,
            metadata=result,
        )
    except Exception as e:
        logger.error("query_ad_detail 失败: %s", e, exc_info=True)
        return ToolResult(title="query_ad_detail 失败", output=str(e))
- # ═══════════════════════════════════════════
- # 智能引擎工具 4:修改已有决策(Mode 3 支撑)
- # ═══════════════════════════════════════════
@tool(description="修改已有决策:修改指定广告的操作或调幅,也可新增决策")
async def modify_decisions(
    ctx: ToolContext,
    modifications: str,
    decisions_csv: str = "",
    end_date: str = "yesterday",
) -> ToolResult:
    """Edit an existing ``llm_decisions_{date}.csv`` in place (Mode 3 feedback).

    Two kinds of modification entries are supported:

    1. Upsert by ad ID -- update the matching row, or append a new row when
       the ad has no decision yet::

           [{"ad_id": "90289631207", "new_action": "bid_down", "new_change_pct": -0.05}]

    2. Batch update by filter (``all_pause`` / ``all_bid_down`` /
       ``all_bid_up`` / ``all_llm``)::

           [{"filter": "all_bid_down", "new_change_pct": -0.03}]

    Optional keys on either kind: ``new_action``, ``new_change_pct``,
    ``new_dimension``, ``new_reason``. A numeric ``new_change_pct`` also
    recomputes ``recommended_bid`` from the ad's ``bid_amount`` in
    ``outputs/metrics_temp.csv``, clamped to
    ``[BID_FLOOR_YUAN, BID_CEILING_YUAN]``; an explicit ``null`` clears the
    percentage and the recommended bid. Every touched row is re-labelled
    ``source="llm_modified"``. Malformed entries are skipped with a warning in
    the change log instead of aborting the whole batch.

    Args:
        ctx: Tool context (not used by this tool).
        modifications: JSON string holding the list of modification entries.
        decisions_csv: Decisions CSV to edit. Empty means the file for
            ``end_date`` under ``outputs/reports``, falling back to the
            lexically greatest ``llm_decisions_*.csv`` there.
        end_date: ``YYYYMMDD`` date used to locate the default CSV;
            ``"yesterday"`` resolves to the previous local day.

    Returns:
        A ToolResult with the change log and the new action distribution.
        Failures are reported through the ToolResult; nothing is raised.
    """
    import glob as glob_mod
    import json
    import math

    # Batch filter -> action it selects. None means "select by source label"
    # (all LLM-originated rows) instead of by action.
    filter_map = {
        "all_pause": "pause",
        "all_bid_down": "bid_down",
        "all_bid_up": "bid_up",
        "all_llm": None,
    }
    # Decision files label LLM rows "智能判断", and rows edited by this tool
    # "llm_modified"; the bare "llm" label is kept for older files. Matching
    # only "llm" meant the all_llm filter never selected anything.
    llm_sources = ["llm", "智能判断", "llm_modified"]

    def _clamped_bid(bid, pct):
        # Apply the relative change, then keep the bid inside the allowed range.
        adjusted = round(bid * (1 + pct), 2)
        return min(max(adjusted, BID_FLOOR_YUAN), BID_CEILING_YUAN)

    def _apply_fields(frame, mask, mod, has_pct, pct, bid_map):
        # Write the requested fields onto every row selected by `mask`.
        if "new_action" in mod:
            frame.loc[mask, "action"] = mod["new_action"]
        if has_pct:
            frame.loc[mask, "recommended_change_pct"] = pct
            if pct is None:
                # Percentage explicitly cleared: a stale bid must not survive it.
                frame.loc[mask, "recommended_bid"] = None
            else:
                for idx in frame[mask].index:
                    raw_id = frame.at[idx, "ad_id"]
                    if pd.isna(raw_id):
                        continue
                    bid = bid_map.get(int(raw_id), 0)
                    # Without a known positive current bid there is nothing to scale.
                    if bid > 0:
                        frame.loc[idx, "recommended_bid"] = _clamped_bid(bid, pct)
                        frame.loc[idx, "current_bid"] = round(bid, 2)
        if "new_dimension" in mod:
            frame.loc[mask, "dimension"] = mod["new_dimension"]
        if "new_reason" in mod:
            frame.loc[mask, "reason"] = mod["new_reason"]
        frame.loc[mask, "source"] = "llm_modified"

    try:
        if end_date == "yesterday":
            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")

        try:
            mod_list = json.loads(modifications)
        except json.JSONDecodeError as e:
            return ToolResult(title="modify_decisions 失败", output=f"modifications 不是合法 JSON: {e}")
        if not isinstance(mod_list, list):
            return ToolResult(title="modify_decisions 失败", output="modifications 必须是 JSON 数组")

        # Locate the decisions CSV: the requested date first, then the newest file.
        if not decisions_csv:
            reports_dir = _MINI_DIR / "outputs" / "reports"
            target_path = reports_dir / f"llm_decisions_{end_date}.csv"
            if target_path.exists():
                decisions_csv = str(target_path)
            else:
                pattern = str(reports_dir / "llm_decisions_*.csv")
                files = sorted(glob_mod.glob(pattern), reverse=True)
                if not files:
                    return ToolResult(
                        title="modify_decisions 失败",
                        output="未找到任何已有决策文件(llm_decisions_*.csv),请先执行全量分析",
                    )
                decisions_csv = files[0]

        decisions_path = Path(decisions_csv)
        if not decisions_path.exists():
            return ToolResult(title="modify_decisions 失败", output=f"决策文件不存在: {decisions_csv}")
        df = pd.read_csv(decisions_csv)
        if df.empty:
            return ToolResult(title="modify_decisions 失败", output="决策文件为空")

        # Current bids are needed to turn a percentage into an absolute bid.
        # Best-effort: without them the percentage is stored but no bid is computed.
        metrics_csv_path = str(_MINI_DIR / "outputs" / "metrics_temp.csv")
        bid_map = {}
        try:
            df_metrics = pd.read_csv(metrics_csv_path)
            bid_map = dict(zip(df_metrics["ad_id"].astype(int), df_metrics["bid_amount"].fillna(0)))
        except Exception as e:
            logger.warning("加载 metrics 获取 bid_amount 失败: %s", e)

        change_log = []
        new_rows = []
        for mod in mod_list:
            if not isinstance(mod, dict) or ("filter" not in mod and "ad_id" not in mod):
                change_log.append(f"⚠️ 修改项缺少 ad_id 或 filter,跳过: {mod}")
                continue

            # Validate the percentage before touching any row, so a bad entry is
            # skipped as a whole rather than half-applied.
            has_pct = "new_change_pct" in mod
            pct = None
            if has_pct and mod["new_change_pct"] is not None:
                try:
                    pct = float(mod["new_change_pct"])
                except (TypeError, ValueError):
                    pct = math.nan
                if not math.isfinite(pct):
                    change_log.append(f"⚠️ new_change_pct 不是合法数字,跳过: {mod}")
                    continue

            if "filter" in mod:
                # Batch update.
                filter_type = mod["filter"]
                if not isinstance(filter_type, str) or filter_type not in filter_map:
                    change_log.append(f"⚠️ 未知 filter: {filter_type},跳过")
                    continue
                target_action = filter_map[filter_type]
                if target_action:
                    mask = df["action"] == target_action
                else:
                    mask = df["source"].isin(llm_sources)
                matched = mask.sum()
                if matched == 0:
                    change_log.append(f"filter={filter_type}: 无匹配行")
                    continue
                _apply_fields(df, mask, mod, has_pct, pct, bid_map)
                change_log.append(f"filter={filter_type}: 修改 {matched} 行")
                continue

            # Upsert by ad ID.
            try:
                target_id = int(mod["ad_id"])
            except (TypeError, ValueError):
                change_log.append(f"⚠️ ad_id 不是合法整数,跳过: {mod}")
                continue
            mask = df["ad_id"] == target_id
            if mask.any():
                if "new_action" in mod:
                    # Read the old action before it is overwritten below.
                    old_action = df.loc[mask, "action"].iloc[0]
                    change_log.append(f"ad_id={target_id}: action {old_action} → {mod['new_action']}")
                if has_pct:
                    change_log.append(f"ad_id={target_id}: change_pct → {mod['new_change_pct']}")
                _apply_fields(df, mask, mod, has_pct, pct, bid_map)
            else:
                new_action = mod.get("new_action", "hold")
                bid = bid_map.get(target_id, 0)
                new_bid = None
                if pct is not None and bid > 0:
                    new_bid = _clamped_bid(bid, pct)
                new_rows.append({
                    "ad_id": target_id,
                    "action": new_action,
                    "dimension": mod.get("new_dimension", "用户指定"),
                    "reason": mod.get("new_reason", "用户定向操作"),
                    "confidence": mod.get("confidence", "high"),
                    "source": "llm_modified",
                    "recommended_change_pct": pct,
                    "current_bid": round(bid, 2) if bid > 0 else None,
                    "recommended_bid": new_bid,
                })
                change_log.append(f"ad_id={target_id}: 新增 action={new_action}")

        # New rows are appended only after all edits, so batch filters in the
        # same request act on the rows that were already in the file.
        if new_rows:
            df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)

        # Overwrite the original file.
        df.to_csv(decisions_csv, index=False, encoding="utf-8-sig")

        action_dist = df["action"].value_counts().to_dict()
        output_parts = [
            f"决策已修改并保存: {decisions_csv}",
            "",
            "修改日志:",
        ]
        output_parts.extend(f" {log}" for log in change_log)
        output_parts.extend([
            "",
            "当前 action 分布:",
        ])
        output_parts.extend(f" {action}: {count} 个" for action, count in action_dist.items())
        output_parts.append(f" 总计: {len(df)} 个")
        return ToolResult(
            title=f"决策修改完成({len(change_log)}项变更)",
            output="\n".join(output_parts),
            metadata={
                "csv_path": str(decisions_csv),
                "changes": len(change_log),
                "action_distribution": action_dist,
                "total": len(df),
            },
        )
    except Exception as e:
        logger.error("modify_decisions 失败: %s", e, exc_info=True)
        return ToolResult(title="modify_decisions 失败", output=str(e))
|