| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486 |
- """
- 广告决策引擎 — auto_put_ad_mini
- 智能引擎:
- - LLM 推理 + 候选信号(ROI/裂变/CTR/消耗)驱动
- - 三级分类:零消耗待关停(规则)+ 待优化评估(LLM)+ 正常运行(规则)
- - 年龄保护三层架构(冷启动 / 早期成长 / 成熟期)
- """
- import logging
- import sys
- from datetime import datetime, timedelta
- from pathlib import Path
- from typing import Dict, List, Optional, Tuple
- import numpy as np
- import pandas as pd
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
# Root of the auto_put_ad_mini package: the parent of this module's directory.
_MINI_DIR = Path(__file__).resolve().parent.parent

# Prepend the package root (only once) so the sibling `config` module imported
# below resolves regardless of the process's working directory.
if str(_MINI_DIR) not in sys.path:
    sys.path.insert(0, str(_MINI_DIR))
- from config import (
- AUDIENCE_TIER_PATTERNS,
- BID_ADJUSTMENT_ENABLED,
- BID_DOWN_ROI_FACTOR,
- BID_UP_ROI_FACTOR,
- BID_UP_MAX_SPEND,
- BID_CHANGE_MIN_PCT,
- BID_CHANGE_MAX_PCT,
- BID_UP_MIN_PCT,
- BID_UP_MAX_PCT,
- BID_DOWN_MIN_PCT,
- BID_DOWN_MAX_PCT,
- BID_DOWN_MIN_SPEND,
- BID_FLOOR_YUAN,
- BID_CEILING_YUAN,
- COLD_START_DAYS, # ≤3天:冷启动期(极度保护)
- EARLY_GROWTH_DAYS, # 4-7天:早期成长期(可提价)
- AD_AGE_MATURE, # >7天:成熟期(全面调控)
- HIGH_BURN_AGE_THRESHOLD,
- HIGH_BURN_COST_THRESHOLD,
- ROI_LOW_FACTOR,
- ROI_LOW_MIN_YESTERDAY_COST,
- )
logger = logging.getLogger(__name__)

# ═══════════════════════════════════════════
# Strategy parameters are loaded at runtime (thresholds are not hard-coded)
# ═══════════════════════════════════════════
# Optional JSON override for the ROI / bid thresholds, read by _load_strategy_params.
STRATEGY_PARAMS_FILE = _MINI_DIR.joinpath("strategy_params.json")
def _load_strategy_params(
    params_file: Optional[Path] = None,
    defaults: Optional[Dict] = None,
) -> Dict:
    """Load strategy thresholds, overlaying a JSON override onto config.py defaults.

    The JSON file is expected to look like ``{"params": {"ROI_LOW_FACTOR": ...}}``.
    Keys present (and non-null) there override the defaults; keys missing from it
    keep their config.py value. Callers index ``ROI_LOW_FACTOR``,
    ``BID_DOWN_ROI_FACTOR`` and ``BID_UP_ROI_FACTOR`` directly, so a partial file
    must never drop them. (Previously a file with only some keys, or no
    ``params`` object, returned a partial/empty dict and caused a KeyError later.)

    Args:
        params_file: JSON file to read; defaults to STRATEGY_PARAMS_FILE.
        defaults: baseline values; defaults to the config.py constants.

    Returns:
        A new dict: the defaults updated with the file's non-null ``params``.
        If the file is missing, the defaults alone. If it is unreadable or
        malformed, the defaults alone, with a warning logged.
    """
    import json

    if params_file is None:
        params_file = STRATEGY_PARAMS_FILE
    params_file = Path(params_file)
    if defaults is None:
        defaults = {
            "ROI_LOW_FACTOR": ROI_LOW_FACTOR,
            "BID_DOWN_ROI_FACTOR": BID_DOWN_ROI_FACTOR,
            "BID_UP_ROI_FACTOR": BID_UP_ROI_FACTOR,
        }
    merged = dict(defaults)  # never mutate the caller's / config defaults

    if not params_file.exists():
        return merged
    try:
        with open(params_file, encoding="utf-8") as f:
            data = json.load(f)
        overrides = data.get("params") or {}
        if not isinstance(overrides, dict):
            raise ValueError(f"'params' must be a JSON object, got {type(overrides).__name__}")
    except Exception as e:
        logger.warning(f"加载strategy_params.json失败,使用config.py默认值: {e}")
        return merged

    # A null override would later be multiplied into a threshold; ignore it.
    merged.update({key: value for key, value in overrides.items() if value is not None})
    return merged
- # ═══════════════════════════════════════════
- # 决策动作类型(扩展支持)
- # ═══════════════════════════════════════════
VALID_ACTIONS = [
    "pause",            # pause (shut down) the ad
    "bid_down",         # lower the bid
    "bid_up",           # raise the bid
    "hold",             # keep unchanged
    "creative_adjust",  # adjust the creative direction (must be executed manually)
    "observe",          # wait and watch (unstable data, or close to a threshold)
    "scale_up",         # scale up: suggest new ads/creatives (must be executed manually)
]
- # ═══════════════════════════════════════════
- # 辅助函数
- # ═══════════════════════════════════════════
def _extract_audience_tier(ad_name: str) -> str:
    """Return the audience-package R tier whose pattern occurs in ``ad_name``.

    Matching is a case-insensitive substring test. AUDIENCE_TIER_PATTERNS is
    scanned in order as ``(tier, patterns)`` pairs and the first tier with any
    matching pattern wins. A falsy name, or no match, yields "default".
    """
    if not ad_name:
        return "default"
    haystack = str(ad_name).lower()
    return next(
        (
            tier
            for tier, patterns in AUDIENCE_TIER_PATTERNS
            for pat in patterns
            if pat.lower() in haystack
        ),
        "default",
    )
def _calculate_ad_age_days(create_time) -> Optional[int]:
    """Return the number of whole days from ``create_time`` until now.

    Accepted inputs:
      * "YYYY-MM-DD HH:MM:SS" strings (characters past position 19 are ignored);
      * other date strings pandas can parse, e.g. "YYYY-MM-DD" or ISO
        "YYYY-MM-DDTHH:MM:SS". These previously returned None, which silently
        removed the ad's age protection downstream;
      * datetime-like objects accepted by ``pd.Timestamp``.

    Returns None for missing values (None / NaN / NaT) or anything unparseable.
    Age is measured against ``datetime.now()`` (naive local time), so a
    timezone-aware timestamp cannot be subtracted and also yields None.
    """
    if create_time is None:
        return None
    try:
        if pd.isna(create_time):
            return None
    except (TypeError, ValueError):
        return None  # non-scalar input (e.g. a list): not a usable timestamp
    try:
        if isinstance(create_time, str):
            try:
                created = datetime.strptime(create_time[:19], "%Y-%m-%d %H:%M:%S")
            except ValueError:
                # Fallback for date-only / ISO-8601 strings.
                created = pd.Timestamp(create_time.strip()).to_pydatetime()
        else:
            created = pd.Timestamp(create_time).to_pydatetime()
        return (datetime.now() - created).days
    except Exception:
        return None
- # ═══════════════════════════════════════════
- # 衰退检测辅助函数
- # ═══════════════════════════════════════════
def _load_daily_csvs(directory: Path, prefix: str, end_dt: datetime, days: int) -> pd.DataFrame:
    """Concatenate ``{prefix}_{YYYYMMDD}.csv`` for the ``days`` days ending at ``end_dt``.

    Each row is tagged with a ``date`` column (YYYYMMDD string). Missing files are
    skipped; an empty DataFrame is returned when none exist.
    """
    frames = []
    for offset in range(days):
        day = (end_dt - timedelta(days=offset)).strftime("%Y%m%d")
        csv_path = Path(directory) / f"{prefix}_{day}.csv"
        if csv_path.exists():
            frame = pd.read_csv(csv_path)
            frame["date"] = day
            frames.append(frame)
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()


def _detect_decay_signals(
    ad_ids: List[int],
    raw_dir: Path,
    ad_status_dir: Path,
    end_date: str,
) -> pd.DataFrame:
    """Detect decay signals (recent bid raise, creative swap) per ad.

    Compares the recent window (end_date-6 .. end_date) against the prior window
    (end_date-13 .. end_date-7). Creative data is read from
    ``raw_dir/creative_{YYYYMMDD}.csv`` (needs ``ad_id``, ``creative_id``) and bid
    data from ``ad_status_dir/ad_status_{YYYYMMDD}.csv`` (needs ``ad_id``,
    ``bid_amount``).

    Args:
        ad_ids: ad ids to check; duplicates are collapsed (first-seen order kept)
            so the caller's left merge on ``ad_id`` cannot multiply rows.
        raw_dir: directory of creative-level daily CSVs.
        ad_status_dir: directory of ad-status daily CSVs.
        end_date: last day of the recent window, YYYYMMDD.

    Returns:
        One row per unique ad id, with columns ``ad_id, bid_increased_7d,
        creative_changed_7d``. A flag is True only when the ad has data in BOTH
        windows and, respectively, its max bid rose or its creative set changed.
        (Previously a missing prior bid defaulted to 0, flagging every new ad as
        "bid increased".) ``stable_spend_days_30d`` is not produced here; it
        comes from the metrics CSV. The two signals are computed independently,
        so missing creative data no longer suppresses bid detection.
    """
    end_dt = datetime.strptime(end_date, "%Y%m%d")
    unique_ids = list(dict.fromkeys(ad_ids))

    # Inclusive window bounds; YYYYMMDD strings sort chronologically.
    recent_start = (end_dt - timedelta(days=6)).strftime("%Y%m%d")
    prior_start = (end_dt - timedelta(days=13)).strftime("%Y%m%d")
    prior_end = (end_dt - timedelta(days=7)).strftime("%Y%m%d")

    def _split_windows(frame: pd.DataFrame):
        recent = frame[frame["date"] >= recent_start]
        prior = frame[(frame["date"] >= prior_start) & (frame["date"] <= prior_end)]
        return recent, prior

    # --- Creative change: recent creative set differs from the prior one ---
    creative_changed = {ad_id: False for ad_id in unique_ids}
    creative_df = _load_daily_csvs(raw_dir, "creative", end_dt, 14)
    if creative_df.empty:
        logger.warning("无创意数据,无法检测衰退信号")
    else:
        creative_df = creative_df[creative_df["ad_id"].isin(unique_ids)]
        recent, prior = _split_windows(creative_df)
        recent_sets = recent.groupby("ad_id")["creative_id"].apply(set)
        prior_sets = prior.groupby("ad_id")["creative_id"].apply(set)
        for ad_id in unique_ids:
            recent_set = recent_sets.get(ad_id, set())
            prior_set = prior_sets.get(ad_id, set())
            creative_changed[ad_id] = bool(recent_set and prior_set and recent_set != prior_set)

    # --- Bid increase: recent max bid above the prior max bid ---
    bid_increased = {ad_id: False for ad_id in unique_ids}
    status_df = _load_daily_csvs(ad_status_dir, "ad_status", end_dt, 14)
    if status_df.empty:
        logger.warning("无广告状态数据,无法检测提价")
    else:
        status_df = status_df[status_df["ad_id"].isin(unique_ids)]
        recent, prior = _split_windows(status_df)
        recent_bids = recent.groupby("ad_id")["bid_amount"].max()
        prior_bids = prior.groupby("ad_id")["bid_amount"].max()
        for ad_id in unique_ids:
            # Require a baseline in both windows, mirroring the creative check.
            if ad_id in recent_bids.index and ad_id in prior_bids.index:
                bid_increased[ad_id] = bool(recent_bids[ad_id] > prior_bids[ad_id])

    return pd.DataFrame({
        "ad_id": unique_ids,
        "bid_increased_7d": [bid_increased[ad_id] for ad_id in unique_ids],
        "creative_changed_7d": [creative_changed[ad_id] for ad_id in unique_ids],
    })
- # ═══════════════════════════════════════════
- # 智能引擎工具 1:整理待评估广告数据
- # ═══════════════════════════════════════════
def _cell_float(value) -> Optional[float]:
    """Return a CSV cell as float, or None when it is missing (None/NaN) or not numeric."""
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError):
        return None
    return None if np.isnan(number) else number


def _cell_int(value) -> Optional[int]:
    """Return a CSV cell as a truncated int, or None when missing, NaN or infinite.

    A NaN ad age used to pass the ``is not None`` checks (losing age protection)
    and then crash ``int()`` when the review entry was built.
    """
    number = _cell_float(value)
    if number is None or not np.isfinite(number):
        return None
    return int(number)


def _first_cell_float(row, *keys: str, default: float = 0.0) -> float:
    """Return the first non-missing numeric value among ``keys`` in ``row``, else ``default``."""
    for key in keys:
        number = _cell_float(row.get(key))
        if number is not None:
            return number
    return default


def _resolve_tier(row) -> str:
    """Return a row's audience tier: ``audience_tier`` if set, else the R tier parsed from ``ad_name``."""
    raw = row.get("audience_tier")
    if raw is not None and not pd.isna(raw) and str(raw).strip():
        return str(raw)
    return _extract_audience_tier(str(row.get("ad_name", "")))


def _load_tier_benchmarks(end_date: str) -> Tuple[Dict, Dict]:
    """Load per-tier and per-"{tier}_{goal}" stats from portfolio_summary_{end_date}.json.

    Returns ``(by_audience_tier, by_tier_goal)``. Both are empty dicts when the
    file is missing or unreadable; a warning is logged in that case.
    """
    import json

    portfolio_file = _MINI_DIR / "outputs" / "portfolio_summary" / f"portfolio_summary_{end_date}.json"
    try:
        if not portfolio_file.exists():
            logger.warning(f"未找到 portfolio_summary 文件: {portfolio_file}(请确认 calculate_roi_metrics 已正常运行)")
            return {}, {}
        with open(portfolio_file, "r", encoding="utf-8") as f:
            portfolio_data = json.load(f)
        by_tier_stats = portfolio_data.get("by_audience_tier") or {}
        by_tier_goal = portfolio_data.get("by_tier_goal") or {}
    except Exception as e:
        logger.warning(f"读取人群包统计数据失败,使用空字典兜底: {e}")
        return {}, {}
    logger.info(f"✅ 从 {portfolio_file.name} 加载了 {len(by_tier_stats)} 个人群包 + {len(by_tier_goal)} 个tier+goal组的统计数据")
    return by_tier_stats, by_tier_goal


def _load_adjustment_history():
    """Return ``guardrails.AdjustmentHistory()``, or a no-op stand-in when it cannot be imported."""
    try:
        from examples.auto_put_ad_mini.tools.guardrails import AdjustmentHistory
        return AdjustmentHistory()
    except ImportError:
        from types import SimpleNamespace

        logger.warning("guardrails.AdjustmentHistory 导入失败,跳过调整历史检查")
        # The stand-in must expose every method called on the history; the old
        # one lacked get_last_bid_down_ts, so the fallback path raised AttributeError.
        return SimpleNamespace(
            was_recently_adjusted=lambda *a, **kw: False,
            get_last_bid_down_ts=lambda *a, **kw: None,
        )


def _days_since(ts) -> Optional[int]:
    """Return the whole days elapsed since ``ts``, or None if ``ts`` is None or unusable.

    NOTE(review): the return type of get_last_bid_down_ts is not visible here.
    A datetime is handled directly; numeric values are treated as epoch seconds
    (an assumption suggested by the "_ts" name). Confirm against guardrails.
    """
    if ts is None:
        return None
    try:
        if isinstance(ts, (int, float)) and not isinstance(ts, bool):
            ts = datetime.fromtimestamp(ts)
        return (datetime.now() - ts).days
    except (TypeError, ValueError, OverflowError, OSError) as e:
        logger.warning(f"无法解析上次降价时间 {ts!r},跳过持续低ROI判断: {e}")
        return None


def _age_segment_fields(ad_age: int) -> Dict:
    """Return the age-band label and bid-permission flags attached to a review entry."""
    if ad_age <= COLD_START_DAYS:
        return {
            "age_segment": "cold_start",
            "age_protection_level": "极度保护(冷启动期)",
            "allow_bid_down": False,
            "allow_bid_up": False,
        }
    if ad_age <= EARLY_GROWTH_DAYS:
        return {
            "age_segment": "early_growth",
            "age_protection_level": "仅允许提价(早期成长期)",
            "allow_bid_down": False,
            "allow_bid_up": True,
            "max_bid_down_pct": 0,
        }
    return {
        "age_segment": "mature",
        "age_protection_level": "正常调控(成熟期)",
        "allow_bid_down": True,
        "allow_bid_up": True,
        "max_bid_down_pct": 0.05,  # bid-down cap for mature ads
    }


@tool(description="智能引擎:整理需要关注的广告数据,供LLM推理决策")
async def get_ads_for_review(
    ctx: ToolContext = None,
    metrics_csv: str = "",
    end_date: str = "yesterday",
    roi_review_factor: float = 0.8,
    min_spend_for_zero_spend: float = 10.0,
) -> ToolResult:
    """Classify ads into three buckets and return a JSON summary for LLM review.

    No decision is taken here. Buckets:
      * zero-spend (rule-based pause): 7-day average spend below
        ``min_spend_for_zero_spend`` and age above EARLY_GROWTH_DAYS (or unknown);
      * need-review (LLM): low ROI, a decay signal, or a bid-up / bid-down /
        scale-up candidate;
      * normal: everything else (only counted).
    Ads aged <= COLD_START_DAYS are always counted as normal. Ads aged up to
    EARLY_GROWTH_DAYS reach review only through a positive (bid-up / scale-up)
    signal; their negative signals are cleared.

    Args:
        ctx: tool context (unused).
        metrics_csv: ROI metrics CSV produced by calculate_roi_metrics; defaults
            to outputs/metrics_temp.csv.
        end_date: YYYYMMDD, or "yesterday".
        roi_review_factor: ROI below channel P50 x this factor (with yesterday's
            spend >= ROI_LOW_MIN_YESTERDAY_COST and age > COLD_START_DAYS) marks
            the ad as low-ROI.
        min_spend_for_zero_spend: 7-day average spend (yuan) below which an ad
            counts as zero-spend.

    Returns:
        A ToolResult whose ``output`` is the JSON summary (review ads grouped
        into per-audience-tier batches) and whose ``metadata`` holds the counts.
        On any failure, returns a ToolResult titled "get_ads_for_review 失败"
        carrying the error text.
    """
    import json

    try:
        # Strategy thresholds are loaded at runtime (not hard-coded).
        params = _load_strategy_params()

        if not metrics_csv:
            metrics_csv = str(_MINI_DIR / "outputs" / "metrics_temp.csv")
        df = pd.read_csv(metrics_csv)
        if df.empty:
            return ToolResult(title="get_ads_for_review", output="指标数据为空")

        if end_date == "yesterday":
            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")

        # Drop paused/deleted ads before review (saves tokens, avoids useless
        # candidates). apply_decisions filters again as a backstop; use the same
        # status spellings it accepts.
        if "configured_status" in df.columns:
            before = len(df)
            excluded_status = {"AD_STATUS_SUSPEND", "AD_STATUS_DELETED", "SUSPEND", "DELETED"}
            df = df[~df["configured_status"].isin(excluded_status)].copy()
            dropped = before - len(df)
            if dropped > 0:
                logger.info(f"get_ads_for_review 入口过滤 {dropped} 条 SUSPEND/DELETED 广告")

        # No whitelist filtering here: the whitelist only applies at execution time
        # (execute_decisions), so every account's ads still appear for approval.

        # Peer benchmarks per audience tier and per tier+goal.
        logger.info("读取人群包级别统计数据...")
        by_tier_stats, by_tier_goal = _load_tier_benchmarks(end_date)

        # Prefer the CSV's ad_age_days (same end-date basis as the ROI data);
        # recompute from create_time only when the column is absent or all-NaN.
        if "ad_age_days" not in df.columns or df["ad_age_days"].isna().all():
            logger.warning("metrics CSV 缺少 ad_age_days 列,使用 datetime.now() 兜底计算")
            if "create_time" in df.columns:
                df["ad_age_days"] = df["create_time"].apply(_calculate_ad_age_days)
            else:
                df["ad_age_days"] = None

        # Decay signals (recent bid raise / creative swap).
        decay_signals = _detect_decay_signals(
            ad_ids=df["ad_id"].tolist(),
            raw_dir=_MINI_DIR / "outputs" / "raw",
            ad_status_dir=_MINI_DIR / "outputs" / "ad_status",
            end_date=end_date,
        )
        # stable_spend_days_30d belongs to the metrics CSV, and the bid/creative
        # flags to the decay scan. Remove overlaps so merge cannot emit *_x/*_y
        # columns (which made the lookups below raise KeyError). Dedupe ids so
        # the left merge cannot multiply rows.
        if "stable_spend_days_30d" in df.columns:
            decay_signals = decay_signals.drop(columns=["stable_spend_days_30d"], errors="ignore")
        df = df.drop(columns=[c for c in decay_signals.columns if c != "ad_id" and c in df.columns])
        df = df.merge(decay_signals.drop_duplicates(subset="ad_id"), on="ad_id", how="left")
        for column, fill in (
            ("bid_increased_7d", False),
            ("creative_changed_7d", False),
            ("stable_spend_days_30d", 0),
        ):
            df[column] = df[column].fillna(fill) if column in df.columns else fill

        # Channel-wide ROI distribution; the P50 of the 7-day dynamic ROI is the decision baseline.
        roi_series = df["动态ROI_7日均值"].dropna()
        has_roi = len(roi_series) > 0
        channel_roi_p50 = float(roi_series.median()) if has_roi else 0.0
        roi_p25 = float(roi_series.quantile(0.25)) if has_roi else 0.0
        roi_p75 = float(roi_series.quantile(0.75)) if has_roi else 0.0
        roi_p90 = float(roi_series.quantile(0.90)) if has_roi else 0.0

        pause_line = channel_roi_p50 * params["ROI_LOW_FACTOR"]
        bid_down_line = channel_roi_p50 * params["BID_DOWN_ROI_FACTOR"]
        bid_up_line = channel_roi_p50 * params["BID_UP_ROI_FACTOR"]
        roi_review_line = channel_roi_p50 * roi_review_factor

        # Adjustment history drives the "persistent low ROI -> pause" escalation.
        adjustment_history = _load_adjustment_history()

        zero_spend_ads: List[Dict] = []
        need_review_ads: List[Dict] = []
        normal_ads_count = 0

        for _, row in df.iterrows():
            cost_7d_avg = float(row.get("cost_7d_avg", 0) or 0)
            roi = _cell_float(row.get("动态ROI_7日均值"))
            ad_age = _cell_int(row.get("ad_age_days"))
            bid_inc = bool(row.get("bid_increased_7d", False))
            creative_chg = bool(row.get("creative_changed_7d", False))
            stable_days = _cell_float(row.get("stable_spend_days_30d")) or 0.0
            bid_amount = _cell_float(row.get("bid_amount")) or 0.0

            # --- Zero-spend rule, tiered by age ---
            if cost_7d_avg < min_spend_for_zero_spend:
                if ad_age is not None and ad_age <= COLD_START_DAYS:
                    # Cold start: protected, neither paused nor reviewed.
                    normal_ads_count += 1
                    logger.debug(
                        f"广告 {row['ad_id']} 年龄{ad_age}天≤{COLD_START_DAYS}天(冷启动期),"
                        f"虽消耗低({cost_7d_avg:.2f}元),但年龄保护不关停"
                    )
                    continue
                if ad_age is not None and ad_age <= EARLY_GROWTH_DAYS:
                    # Early growth: fall through so bid-up branch A ("wake a silent ad") can fire.
                    logger.debug(
                        f"广告 {row['ad_id']} 年龄{ad_age}天属早期成长期+消耗低({cost_7d_avg:.2f}元),"
                        f"放行评估提价分支A(唤醒沉默)"
                    )
                else:
                    # Mature or unknown age: rule-based pause.
                    zero_spend_ads.append({
                        "ad_id": int(row["ad_id"]),
                        "ad_name": str(row.get("ad_name", "")),
                        "cost_7d_avg": round(cost_7d_avg, 2),
                    })
                    continue

            # Yesterday's spend gates pausing and the high-burn alert. The two checks
            # used to read different columns (yesterday_cost vs 前1日消耗).
            yesterday_cost = _first_cell_float(row, "yesterday_cost", "前1日消耗")

            # Low ROI: below channel P50 x roi_review_factor, with real spend yesterday, past cold start.
            roi_low = (
                roi is not None
                and roi < roi_review_line
                and yesterday_cost >= ROI_LOW_MIN_YESTERDAY_COST
                and ad_age is not None
                and ad_age > COLD_START_DAYS
            )
            # Decay: >= 7 stable-spend days, now under 100/day, after a bid raise or creative swap.
            decay_signal = stable_days >= 7 and cost_7d_avg < 100 and (bid_inc or creative_chg)

            # Peer (same tier) fission / CTR benchmarks.
            ad_fission = _cell_float(row.get("T0裂变系数_7日均值"))
            tier = _resolve_tier(row)
            tier_stats = by_tier_stats.get(tier) or {}
            tier_fission_mean = tier_stats.get("fission_mean")
            tier_ctr_mean = tier_stats.get("ctr_mean")

            ad_view = _cell_float(row.get("view_count")) or 0.0
            ad_click = _cell_float(row.get("valid_click_count")) or 0.0
            ad_ctr = ad_click / ad_view if ad_view > 0 else None

            # Missing benchmark or ad data never blocks a candidate.
            ctr_ok = tier_ctr_mean is None or ad_ctr is None or ad_ctr >= tier_ctr_mean * 0.80
            fission_ok = (
                tier_fission_mean is None
                or ad_fission is None
                or ad_fission > tier_fission_mean * 1.10
            )

            # Bid-up branch A (spend view, "wake a silent ad"): early growth, near-zero
            # spend, CTR at least 80% of peers. The spend cut matches the zero-spend pass-through above.
            bid_up_candidate_a = bool(
                BID_ADJUSTMENT_ENABLED
                and ad_age is not None
                and COLD_START_DAYS < ad_age <= EARLY_GROWTH_DAYS
                and cost_7d_avg < min_spend_for_zero_spend
                and bid_amount > 0
                and ctr_ok
            )
            # Bid-up branch B (quality view, "proven ad, buy more volume"): ROI above the
            # bid-up line, spend under BID_UP_MAX_SPEND, fission > 110% of peers, normal CTR.
            bid_up_candidate_b = bool(
                BID_ADJUSTMENT_ENABLED
                and roi is not None
                and roi > bid_up_line
                and cost_7d_avg < BID_UP_MAX_SPEND
                and bid_amount > 0
                and ad_age is not None
                and ad_age <= EARLY_GROWTH_DAYS
                and fission_ok
                and ctr_ok
            )
            # Either branch suffices (independent heuristics).
            bid_up_candidate = bid_up_candidate_a or bid_up_candidate_b

            # Bid-down pool: ROI between the pause line and the bid-down line with
            # meaningful spend. Fission is left to the LLM via fission_vs_tier.
            bid_down_candidate = bool(
                BID_ADJUSTMENT_ENABLED
                and roi is not None
                and pause_line <= roi < bid_down_line
                and cost_7d_avg >= BID_DOWN_MIN_SPEND
                and bid_amount > 0
            )

            # Persistent low ROI: still below the bid-down line >= 7 days after the last
            # bid-down -> escalate to a pause candidate.
            persistent_low_roi = False
            if (
                not roi_low
                and roi is not None
                and roi < bid_down_line
                and yesterday_cost >= ROI_LOW_MIN_YESTERDAY_COST
                and ad_age is not None
                and ad_age > COLD_START_DAYS
            ):
                days_since_bd = _days_since(adjustment_history.get_last_bid_down_ts(str(row["ad_id"])))
                if days_since_bd is not None and days_since_bd >= 7:
                    persistent_low_roi = True
                    roi_low = True
                    logger.info(
                        f"广告 {row['ad_id']} 降价后{days_since_bd}天ROI仍低"
                        f"({roi:.4f}<{bid_down_line:.4f}),升级为关停候选"
                    )

            # Scale-up: mature, stable, high spend (>1000/day), ROI >= 90% of channel P50.
            scale_up_candidate = bool(
                ad_age is not None
                and ad_age > EARLY_GROWTH_DAYS
                and stable_days >= 7
                and cost_7d_avg > 1000
                and roi is not None
                and roi >= channel_roi_p50 * 0.9
            )

            # Mature but unstable spend -> observe: drop every negative signal.
            if ad_age is not None and ad_age > EARLY_GROWTH_DAYS and stable_days < 7:
                if roi_low or decay_signal or bid_down_candidate:
                    logger.debug(
                        f"广告 {row['ad_id']} 成熟期({ad_age}天)但消耗不稳定(稳定天数{stable_days}<7),"
                        f"清除负向信号: roi_low={roi_low}, decay={decay_signal}, bid_down={bid_down_candidate}"
                    )
                roi_low = decay_signal = bid_down_candidate = persistent_low_roi = False

            # --- Age protection: takes precedence over every candidate flag ---
            if ad_age is not None and ad_age <= COLD_START_DAYS:
                normal_ads_count += 1
                logger.debug(
                    f"广告 {row['ad_id']} 处于冷启动期({ad_age}天≤{COLD_START_DAYS}天),"
                    f"年龄保护规则自动排除(无论是否满足候选条件)"
                )
                continue
            if ad_age is not None and ad_age <= EARLY_GROWTH_DAYS:
                # Early growth: only bid-up / scale-up may be evaluated; clear negatives unconditionally.
                if roi_low or decay_signal or bid_down_candidate:
                    logger.debug(
                        f"广告 {row['ad_id']} 处于早期成长期({ad_age}天),"
                        f"年龄保护强制清除负向候选标志:"
                        f"roi_low={roi_low}→False, decay={decay_signal}→False, "
                        f"bid_down={bid_down_candidate}→False"
                    )
                roi_low = decay_signal = bid_down_candidate = persistent_low_roi = False
                if not (bid_up_candidate or scale_up_candidate):
                    normal_ads_count += 1
                    logger.debug(
                        f"广告 {row['ad_id']} 处于早期成长期({ad_age}天),"
                        f"无提价/扩量候选标志,已排除"
                    )
                    continue

            if not (roi_low or decay_signal or bid_up_candidate or bid_down_candidate or scale_up_candidate):
                # Normal: ROI fine and no anomaly.
                normal_ads_count += 1
                continue

            # --- Build the review entry ---
            if roi is not None and roi < pause_line:
                roi_zone = "below_pause_line"
            elif bid_down_candidate:
                roi_zone = "bid_down_zone"
            elif roi is not None and roi > bid_up_line:
                roi_zone = "above_bid_up_line"
            else:
                roi_zone = "normal"

            if ad_fission is None or tier_fission_mean is None:
                fission_vs_tier = "unknown"
            elif ad_fission >= tier_fission_mean * 1.10:
                fission_vs_tier = "high"
            elif ad_fission < tier_fission_mean * 0.90:
                fission_vs_tier = "low"
            else:
                fission_vs_tier = "normal"

            # Peer bid benchmark per tier+optimization goal, falling back to the tier.
            ad_goal = str(row.get("广告优化目标", ""))
            tier_goal_stats = by_tier_goal.get(f"{tier}_{ad_goal}", tier_stats) or {}
            tier_bid_mean = tier_goal_stats.get("bid_mean")

            ad_dict = {
                "ad_id": int(row["ad_id"]),
                "ad_name": str(row.get("ad_name", "")),
                "动态ROI_7日均值": round(roi, 4) if roi is not None else None,
                "cost_7d_avg": round(cost_7d_avg, 2),
                "cost_7d_total": round(float(row.get("cost_7d_total", 0) or 0), 2),
                "ad_age_days": ad_age,
                "bid_increased_7d": bid_inc,
                "creative_changed_7d": creative_chg,
                "stable_spend_days_30d": int(stable_days),
                "bid_amount": round(bid_amount, 2),
                # Objective signals for the LLM (no pre-baked answer).
                "roi_zone": roi_zone,
                "bid_up_candidate": bid_up_candidate,
                "fission_vs_tier": fission_vs_tier,
                "scale_up_candidate": scale_up_candidate,
                # The ad's own metrics, for comparison with the peer benchmarks.
                "ad_fission": round(ad_fission, 4) if ad_fission is not None else None,
                "ad_ctr": round(ad_ctr, 4) if ad_ctr is not None else None,
                "yesterday_cost": round(yesterday_cost, 2),
                # The same tier used for the candidate checks above (previously resolved differently).
                "audience_tier": tier,
                "roi_valid_days": _cell_int(row.get("roi_valid_days")) or 0,
                # Peer fission/CTR/bid; ROI is compared against the channel, not the tier.
                "tier_fission_mean": tier_stats.get("fission_mean"),
                "tier_fission_p50": tier_stats.get("fission_p50"),
                "tier_ctr_mean": tier_stats.get("ctr_mean"),
                "tier_bid_mean": tier_bid_mean,
                "bid_up_target_min": round(tier_bid_mean * 1.05, 4) if tier_bid_mean else None,
                "bid_up_target_max": round(tier_bid_mean * 1.10, 4) if tier_bid_mean else None,
                # ROI threshold lines derive from the channel P50 only (never tier-level).
                "pause_line": round(pause_line, 4) if channel_roi_p50 else None,
                "bid_down_line": round(bid_down_line, 4) if channel_roi_p50 else None,
                "bid_up_line": round(bid_up_line, 4) if channel_roi_p50 else None,
            }

            if ad_age is not None:
                ad_dict.update(_age_segment_fields(ad_age))
                ad_dict["high_burn_alert"] = bool(
                    ad_age > HIGH_BURN_AGE_THRESHOLD and yesterday_cost > HIGH_BURN_COST_THRESHOLD
                )

            # Bid-change bounds depend on the candidate type; others get the generic range
            # because the LLM may still propose a bid change.
            if bid_up_candidate:
                ad_dict["bid_change_min_pct"] = BID_UP_MIN_PCT
                ad_dict["bid_change_max_pct"] = BID_UP_MAX_PCT
            elif bid_down_candidate:
                ad_dict["bid_change_min_pct"] = BID_DOWN_MIN_PCT
                ad_dict["bid_change_max_pct"] = BID_DOWN_MAX_PCT
            else:
                ad_dict["bid_change_min_pct"] = BID_CHANGE_MIN_PCT
                ad_dict["bid_change_max_pct"] = BID_CHANGE_MAX_PCT

            # Tell the LLM this is an escalated pause candidate.
            if persistent_low_roi:
                ad_dict["persistent_low_roi"] = True
                ad_dict["recommendation_hint"] = "该广告降价后≥7天ROI仍低于渠道均值,建议关停"

            need_review_ads.append(ad_dict)

        # Group review ads per audience tier (one LLM batch per tier, largest first).
        review_by_tier: Dict[str, List[Dict]] = {}
        for ad in need_review_ads:
            tier_key = str(ad.get("audience_tier", "default") or "default")
            review_by_tier.setdefault(tier_key, []).append(ad)
        tier_batches = sorted(
            [{"audience_tier": t, "count": len(ads), "ads": ads} for t, ads in review_by_tier.items()],
            key=lambda batch: -batch["count"],
        )

        result = {
            "summary": {
                "total": len(df),
                "zero_spend_ads": len(zero_spend_ads),
                "need_review_ads": len(need_review_ads),
                "normal_ads": normal_ads_count,
                "tier_groups": len(review_by_tier),
                "max_batch_size": max((b["count"] for b in tier_batches), default=0),
            },
            "distribution": {
                "channel_roi_p50": round(channel_roi_p50, 4),
                "p25": round(roi_p25, 4),
                "p50": round(channel_roi_p50, 4),
                "p75": round(roi_p75, 4),
                "p90": round(roi_p90, 4),
            },
            "bid_adjustment": {
                "enabled": BID_ADJUSTMENT_ENABLED,
                "bid_down_line": round(bid_down_line, 4),
                "bid_up_line": round(bid_up_line, 4),
                "bid_up_max_spend": BID_UP_MAX_SPEND,
                "roi_low_min_yesterday_cost": ROI_LOW_MIN_YESTERDAY_COST,
            },
            "thresholds_used": {
                "ROI_LOW_FACTOR": params["ROI_LOW_FACTOR"],
                "BID_DOWN_ROI_FACTOR": params["BID_DOWN_ROI_FACTOR"],
                "BID_UP_ROI_FACTOR": params["BID_UP_ROI_FACTOR"],
                "BID_UP_MAX_SPEND": BID_UP_MAX_SPEND,
                "ROI_LOW_MIN_YESTERDAY_COST": ROI_LOW_MIN_YESTERDAY_COST,
                "channel_roi_p50": round(channel_roi_p50, 4),
                "pause_line": round(pause_line, 4),
                "bid_down_line": round(bid_down_line, 4),
                "bid_up_line": round(bid_up_line, 4),
            },
            # Zero-spend ads are handled by rule. Pass only the count plus 10 samples
            # so a long list does not crowd the LLM context.
            "zero_spend_ads_count": len(zero_spend_ads),
            "zero_spend_ads_samples": zero_spend_ads[:10],
            # Per-tier batches with full ad data; the LLM processes each batch.
            "tier_batches": tier_batches,
            "need_review_ads_total": len(need_review_ads),
        }

        output_json = json.dumps(result, ensure_ascii=False, indent=2)
        return ToolResult(
            title=(
                f"广告分类(零消耗:{len(zero_spend_ads)} 待评估:{len(need_review_ads)} "
                f"分 {len(review_by_tier)} 个 tier 批次 正常:{normal_ads_count})"
            ),
            output=output_json,
            metadata={
                "total": len(df),
                "zero_spend_ads": len(zero_spend_ads),
                "need_review_ads": len(need_review_ads),
                "normal_ads": normal_ads_count,
                "tier_groups": len(review_by_tier),
                "channel_roi_p50": channel_roi_p50,
                "end_date": end_date,
            },
        )
    except Exception as e:
        logger.error("get_ads_for_review 失败: %s", e, exc_info=True)
        return ToolResult(title="get_ads_for_review 失败", output=str(e))
- # ═══════════════════════════════════════════
- # 智能引擎工具 2:保存 LLM 决策结果
- # ═══════════════════════════════════════════
@tool(description="智能引擎:接收LLM的决策列表,合并零消耗/正常运行类自动决策,保存为结构化结果")
async def apply_decisions(
    ctx: ToolContext = None,
    decisions: str = "",
    end_date: str = "yesterday",
    metrics_csv: str = "",
) -> ToolResult:
    """Merge LLM decisions with rule-based ones and save them to ``llm_decisions_{date}.csv``.

    Decision categories:
      - Zero-spend (rule, source "规则判断"): active ads whose 7-day average
        spend is below 10 yuan and whose age is unknown or above
        ``EARLY_GROWTH_DAYS`` -> ``pause``.
      - Needs review (LLM, source "智能判断"): the entries passed in
        ``decisions``. A fallback age guard rewrites actions young ads may not
        take (age <= COLD_START_DAYS: bid_up/bid_down/pause; age <=
        EARLY_GROWTH_DAYS: bid_down/pause) to ``observe``.
      - Normal running (rule, source "规则判断"): every other active ad in the
        metrics CSV -> ``hold`` (dimension "冷启动保护" when age <= COLD_START_DAYS).

    When an ad_id appears more than once, the LLM decision wins over the rule
    decision; among equal-priority duplicates the first one wins. Ads that are
    suspended/deleted per the metrics CSV ``configured_status`` or per the
    ad_status snapshot CSV are excluded from the output.

    Args:
        ctx: Tool context (not used by this function).
        decisions: JSON array of LLM decisions, e.g.
            ``[{"ad_id": 123, "action": "pause"/"hold"/"bid_up"/"bid_down",
            "dimension": "...", "reason": "...", "confidence": "high"/"medium"/"low"}]``.
            Entries that are not JSON objects or have no ``ad_id`` are skipped
            with a warning instead of aborting the whole run.
        end_date: ``YYYYMMDD``; ``"yesterday"`` resolves to today minus one day.
            Used in the output file name and as the ad_status fallback date.
        metrics_csv: ROI metrics CSV path (default ``outputs/metrics_temp.csv``),
            source of the zero-spend / normal-running ads and of the extra columns.

    Returns:
        ToolResult whose metadata holds the CSV path and per-action counts, or a
        ToolResult titled ``"apply_decisions 失败"`` describing the error.
    """
    import json

    # configured_status values that mean the ad no longer runs (compared case-insensitively).
    inactive_statuses = ("AD_STATUS_SUSPEND", "AD_STATUS_DELETED", "SUSPEND", "DELETED")

    def _is_inactive(row) -> bool:
        """Return True when a metrics row's configured_status marks it suspended/deleted."""
        return str(row.get("configured_status", "")).upper() in inactive_statuses

    def _apply_age_guard(item: dict, ad_age_days) -> None:
        """Fallback age protection: rewrite LLM actions that young ads may not take to "observe".

        Stage 1 is expected to have filtered such ads already, so a hit is logged as an error.
        A missing/NaN age leaves the decision unchanged.
        """
        if pd.isna(ad_age_days):
            return
        ad_id = item.get("ad_id")
        original_action = item.get("action", "hold")
        original_reason = item.get("reason", "")
        if ad_age_days <= COLD_START_DAYS:
            # Cold start: no bid change and no pause allowed
            if original_action not in ("bid_down", "pause", "bid_up"):
                return
            item["reason"] = f"{original_reason}(LLM建议{original_action},但广告处于冷启动期{ad_age_days}天,年龄保护规则自动改为观察)"
            log_msg = (
                f"⚠️ 兜底检查触发!广告 {ad_id} 处于冷启动期({ad_age_days}天≤{COLD_START_DAYS}天),"
                f"LLM建议 {original_action},已自动转换为 observe。"
                f"这不应该发生(阶段1应已过滤),请检查逻辑!"
            )
        elif ad_age_days <= EARLY_GROWTH_DAYS:
            # Early growth: only bid_up is allowed, no bid_down / pause
            if original_action not in ("bid_down", "pause"):
                return
            item["reason"] = f"{original_reason}(LLM建议{original_action},但广告处于早期成长期{ad_age_days}天,年龄保护规则仅允许提价,改为观察)"
            log_msg = (
                f"⚠️ 兜底检查触发!广告 {ad_id} 处于早期成长期({ad_age_days}天,4-{EARLY_GROWTH_DAYS}天),"
                f"LLM建议 {original_action},已自动转换为 observe。"
                f"这不应该发生(阶段1应已过滤),请检查逻辑!"
            )
        else:
            return
        item["action"] = "observe"
        item["confidence"] = "low"
        item["recommended_change_pct"] = None
        logger.error(log_msg)

    try:
        if end_date == "yesterday":
            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")

        # ---- Parse LLM decisions ----
        try:
            parsed = json.loads(decisions)
        except json.JSONDecodeError as e:
            return ToolResult(title="apply_decisions 失败", output=f"decisions 不是合法 JSON: {e}")
        if not isinstance(parsed, list):
            return ToolResult(title="apply_decisions 失败", output="decisions 必须是 JSON 数组")
        # One malformed entry (not an object, or no ad_id) used to abort the whole
        # run with KeyError/TypeError; skip it so the valid decisions are still saved.
        llm_list = [item for item in parsed if isinstance(item, dict) and "ad_id" in item]
        skipped_invalid = len(parsed) - len(llm_list)
        if skipped_invalid:
            logger.warning("跳过 %d 条无效的 LLM 决策(不是对象或缺少 ad_id)", skipped_invalid)

        if not metrics_csv:
            metrics_csv = str(_MINI_DIR / "outputs" / "metrics_temp.csv")
        # Stays None if the metrics CSV cannot be read; later steps check this
        # explicitly instead of failing on an unbound name.
        df_metrics = None

        # ---- Zero-spend ads (rule-based pause) ----
        zero_spend_rows = []
        try:
            df_metrics = pd.read_csv(metrics_csv)
            for _, row in df_metrics.iterrows():
                # Skip ads that are already suspended/deleted (avoid duplicate decisions)
                if _is_inactive(row):
                    continue
                cost_7d_avg = float(row.get("cost_7d_avg", 0) or 0)
                # A NaN spend is not treated as "low spend"
                if pd.isna(cost_7d_avg) or cost_7d_avg >= 10.0:
                    continue
                ad_age = row.get("ad_age_days")
                # Age guard: ads <= EARLY_GROWTH_DAYS old are exempt; an unknown age is not
                if pd.notna(ad_age) and ad_age <= EARLY_GROWTH_DAYS:
                    logger.debug(
                        f"零消耗规则跳过广告 {row['ad_id']}:年龄{ad_age}天≤{EARLY_GROWTH_DAYS}天"
                    )
                    continue
                # Avoid showing "0.00元" for ads with no spend at all
                if cost_7d_avg == 0:
                    reason_text = "7日几乎无消耗,长期无活动"
                else:
                    reason_text = f"7日均消耗={cost_7d_avg:.2f}元,长期低消耗"
                zero_spend_rows.append({
                    "ad_id": int(row["ad_id"]),
                    "action": "pause",
                    "dimension": "长期零消耗",
                    "reason": reason_text,
                    "confidence": "high",
                    "source": "规则判断",
                    "cost_7d_avg": cost_7d_avg,  # used for sorting
                })
        except Exception as e:
            logger.warning("加载零消耗待关停广告失败(跳过): %s", e)

        # ---- LLM decisions: tag source, attach cost_7d_avg, apply the age guard ----
        for item in llm_list:
            item["source"] = "智能判断"
            # Normalize ad_id to int so int-vs-str mismatches don't break dedup
            try:
                item["ad_id"] = int(item["ad_id"])
            except (ValueError, TypeError):
                pass
            ad_id = item["ad_id"]
            item["cost_7d_avg"] = 0.0
            if df_metrics is None:
                continue
            try:
                cost_row = df_metrics[df_metrics["ad_id"] == ad_id]
                if cost_row.empty:
                    continue
                row_data = cost_row.iloc[0]
                item["cost_7d_avg"] = float(row_data.get("cost_7d_avg", 0) or 0)
                _apply_age_guard(item, row_data.get("ad_age_days"))
            except Exception as e:
                item["cost_7d_avg"] = 0.0
                logger.warning(f"处理广告 {ad_id} 信息时出错: {e}")

        # ---- Normal-running ads (rule-based hold) ----
        # normal = all active ads - zero-spend - LLM-reviewed
        normal_running_rows = []
        if df_metrics is None:
            logger.warning("加载正常运行广告失败(跳过): %s", f"无法读取 metrics CSV: {metrics_csv}")
        else:
            try:
                zero_spend_ad_ids = {r["ad_id"] for r in zero_spend_rows}
                need_review_ad_ids = {item["ad_id"] for item in llm_list}
                for _, row in df_metrics.iterrows():
                    ad_id = int(row["ad_id"])
                    # Same status filter as the zero-spend scan (also catches historical
                    # statuses that cache enrichment copied over NORMAL)
                    if _is_inactive(row):
                        continue
                    if ad_id in zero_spend_ad_ids or ad_id in need_review_ad_ids:
                        continue
                    cost_7d_avg = float(row.get("cost_7d_avg", 0) or 0)
                    dynamic_roi_7d = row.get("动态ROI_7日均值")
                    ad_age_days = row.get("ad_age_days")
                    roi_str = f"{dynamic_roi_7d:.2f}" if not pd.isna(dynamic_roi_7d) else "数据不足"
                    if pd.notna(ad_age_days) and ad_age_days <= COLD_START_DAYS:
                        # Cold-start protection
                        dimension = "冷启动保护"
                        reason = f"广告年龄{ad_age_days}天 ≤ {COLD_START_DAYS}天(冷启动期),ROI={roi_str},消耗{cost_7d_avg:.2f}元/天,极度保护"
                    else:
                        dimension = "正常运行"
                        reason = f"ROI={roi_str},消耗正常({cost_7d_avg:.2f}元/天),保持当前出价"
                    normal_running_rows.append({
                        "ad_id": ad_id,
                        "action": "hold",
                        "dimension": dimension,
                        "reason": reason,
                        "confidence": "high",
                        "source": "规则判断",
                        "cost_7d_avg": cost_7d_avg,  # used for sorting
                    })
            except Exception as e:
                logger.warning("加载正常运行广告失败(跳过): %s", e)

        all_decisions = zero_spend_rows + llm_list + normal_running_rows
        if not all_decisions:
            return ToolResult(title="apply_decisions", output="无决策数据")

        df_out = pd.DataFrame(all_decisions)
        # Nullable Int64 ad_id so merge/dedup never fail on int-vs-str mismatches
        df_out["ad_id"] = pd.to_numeric(df_out["ad_id"], errors="coerce").astype("Int64")

        # ---- Dedup: one decision per ad_id, LLM judgement beats rule default ----
        source_priority = {"智能判断": 0, "llm_modified": 1, "规则判断": 2}
        df_out["_source_rank"] = df_out["source"].map(source_priority).fillna(9)
        # Stable sort: among equal-rank duplicates the earliest decision is kept deterministically
        df_out = (
            df_out.sort_values("_source_rank", kind="mergesort")
            .drop_duplicates(subset=["ad_id"], keep="first")
            .drop(columns=["_source_rank"])
        )
        logger.info(f"去重后决策数: {len(df_out)}(智能判断优先)")

        # ---- Enrich with metrics columns (ad_name, ad_age_days, 动态ROI, ...) ----
        if df_metrics is None:
            logger.warning("合并 metrics 字段失败(决策CSV将缺少扩展字段): metrics CSV 未加载")
        else:
            try:
                df_metrics_full = df_metrics.copy()
                df_metrics_full["ad_id"] = pd.to_numeric(df_metrics_full["ad_id"], errors="coerce").astype("Int64")
                wanted_cols = [
                    "ad_id", "account_id", "ad_name", "audience_tier", "create_time", "ad_age_days",
                    "bid_amount", "yesterday_cost", "yesterday_revenue", "yesterday_roi",
                    "cost_7d_total", "cost_7d_avg", "revenue_7d_total",
                    "动态ROI", "动态ROI_7日均值", "cost_30d_total", "cost_30d_avg",
                    "stable_spend_days_30d", "creative_count", "roi_valid_days",
                ]
                merge_cols = [c for c in wanted_cols if c in df_metrics_full.columns]
                # Left join keeps every decision row; clashing metric columns get "_metrics"
                df_out = df_out.merge(
                    df_metrics_full[merge_cols], on="ad_id", how="left", suffixes=("", "_metrics")
                )
                logger.info(f"已从 metrics CSV 合并 {len(merge_cols)} 个字段")
            except Exception as e:
                logger.warning(f"合并 metrics 字段失败(决策CSV将缺少扩展字段): {e}")

        # ---- Drop suspended ads and ads deleted on the Tencent side ----
        # is_deleted is written by sync_ad_status.py. It is a snapshot of the
        # current API state, not tied to end_date, and sync defaults to T-1, so
        # prefer the now()-1d file (consistent with sync_ad_status.py /
        # im_approval). Fall back to the end_date file when the T-1 file is missing.
        sync_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
        ad_status_path = _MINI_DIR / "outputs" / "ad_status" / f"ad_status_{sync_date}.csv"
        if not ad_status_path.exists():
            fallback = _MINI_DIR / "outputs" / "ad_status" / f"ad_status_{end_date}.csv"
            if fallback.exists():
                logger.info(f"ad_status T-1 CSV 不存在,降级使用 end_date={end_date} 的快照")
                ad_status_path = fallback
        if ad_status_path.exists():
            try:
                df_status = pd.read_csv(ad_status_path)
                suspended_mask = df_status["ad_status"] == "AD_STATUS_SUSPEND"
                # Backward compatible: a missing is_deleted column means nothing is deleted
                if "is_deleted" in df_status.columns:
                    deleted_mask = df_status["is_deleted"].fillna(False).astype(bool)
                else:
                    deleted_mask = pd.Series(False, index=df_status.index)
                suspended_ids = set(df_status.loc[suspended_mask, "ad_id"].tolist())
                deleted_ids = set(df_status.loc[deleted_mask, "ad_id"].tolist())
                removed_mask = df_out["ad_id"].isin(suspended_ids | deleted_ids)
                removed_ids = df_out.loc[removed_mask, "ad_id"]
                df_out = df_out[~removed_mask]
                filtered_count = int(removed_mask.sum())
                if filtered_count > 0:
                    # Count only decisions actually removed (an ad may be in both sets)
                    n_suspend = int(removed_ids.isin(suspended_ids).sum())
                    n_deleted = int(removed_ids.isin(deleted_ids).sum())
                    logger.info(
                        f"过滤掉 {filtered_count} 个广告"
                        f"(暂停 {n_suspend} + 腾讯侧已删除 {n_deleted})"
                    )
            except Exception as e:
                logger.warning(f"加载广告状态数据失败,跳过过滤: {e}")

        # ---- Guarantee the output schema ----
        for col in ["ad_id", "action", "dimension", "reason", "confidence", "source"]:
            if col not in df_out.columns:
                df_out[col] = ""
        # Numeric columns default to None (not "") so float() on them never fails downstream
        for col in ["recommended_change_pct", "current_bid", "recommended_bid", "cost_7d_avg"]:
            if col not in df_out.columns:
                df_out[col] = None

        # Highest 7-day spend first: those ads need the most attention. cost_7d_avg is kept.
        df_out["cost_7d_avg"] = pd.to_numeric(df_out["cost_7d_avg"], errors="coerce").fillna(0)
        df_out = df_out.sort_values("cost_7d_avg", ascending=False).reset_index(drop=True)

        # ---- Save ----
        reports_dir = _MINI_DIR / "outputs" / "reports"
        reports_dir.mkdir(parents=True, exist_ok=True)
        out_path = reports_dir / f"llm_decisions_{end_date}.csv"
        df_out.to_csv(out_path, index=False, encoding="utf-8-sig")

        pause_count = int((df_out["action"] == "pause").sum())
        hold_count = int((df_out["action"] == "hold").sum())
        bid_up_count = int((df_out["action"] == "bid_up").sum())
        bid_down_count = int((df_out["action"] == "bid_down").sum())
        # "observe" is produced by the age guard; it was previously missing from the summary
        observe_count = int((df_out["action"] == "observe").sum())

        output_parts = [
            f"智能引擎决策已保存: {out_path}",
            f" 关停: {pause_count} 个(含零消耗待关停: {len(zero_spend_rows)} 个)",
            f" 保持: {hold_count} 个(含正常运行: {len(normal_running_rows)} 个)",
        ]
        if bid_up_count > 0:
            output_parts.append(f" 提价: {bid_up_count} 个")
        if bid_down_count > 0:
            output_parts.append(f" 降价: {bid_down_count} 个")
        if observe_count > 0:
            output_parts.append(f" 观察: {observe_count} 个")
        if skipped_invalid > 0:
            output_parts.append(f" 跳过无效决策: {skipped_invalid} 条")

        return ToolResult(
            title=f"智能引擎决策已保存({len(df_out)}条)",
            output="\n".join(output_parts),
            metadata={
                "csv_path": str(out_path),
                "total": len(df_out),
                "pause": pause_count,
                "hold": hold_count,
                "bid_up": bid_up_count,
                "bid_down": bid_down_count,
                "observe": observe_count,
                "zero_spend_ads": len(zero_spend_rows),
                "normal_running_ads": len(normal_running_rows),
                "skipped_invalid": skipped_invalid,
                "end_date": end_date,
            },
        )
    except Exception as e:
        logger.error("apply_decisions 失败: %s", e, exc_info=True)
        return ToolResult(title="apply_decisions 失败", output=str(e))
- # ═══════════════════════════════════════════
- # 智能引擎工具 3:查询单个广告详情(Mode 2 支撑)
- # ═══════════════════════════════════════════
@tool(description="查询单个广告的当前指标和历史数据")
async def query_ad_detail(
    ctx: ToolContext = None,
    ad_id: str = "",
    metrics_csv: str = "",
) -> ToolResult:
    """Look up one ad's current metrics plus channel-wide ROI thresholds (Mode 2 targeted ops).

    Args:
        ctx: Tool context (not used by this function).
        ad_id: Ad ID as a string or number; surrounding whitespace is ignored.
        metrics_csv: ROI metrics CSV path (default ``outputs/metrics_temp.csv``).

    Returns:
        ToolResult whose output is the JSON of ``{"ad_detail", "global_context"}``,
        prefixed with a staleness warning when the CSV was modified more than 24
        hours ago, and whose metadata is the same dict. A missing CSV or an
        invalid ``ad_id`` yields a ToolResult titled ``"query_ad_detail 失败"``;
        an unknown ad yields a "not found" message.
    """
    import json
    import os

    def _as_float(value, ndigits: int) -> float:
        """Round a numeric cell; missing/NaN becomes 0.0 so json.dumps never emits NaN."""
        if pd.isna(value):
            return 0.0
        return round(float(value or 0), ndigits)

    def _as_text(value) -> str:
        """Stringify a cell; missing/NaN becomes "" instead of "nan"."""
        return "" if pd.isna(value) else str(value)

    def _as_flag(value) -> bool:
        """Boolean cell; missing/NaN is False (bool(float('nan')) would be True)."""
        return False if pd.isna(value) else bool(value)

    try:
        if not metrics_csv:
            metrics_csv = str(_MINI_DIR / "outputs" / "metrics_temp.csv")
        metrics_path = Path(metrics_csv)
        if not metrics_path.exists():
            return ToolResult(
                title="query_ad_detail 失败",
                output=f"指标文件不存在: {metrics_csv},请先执行 calculate_roi_metrics",
            )

        # Validate ad_id up front so a bad value gets a clear message instead of a raw int() error
        try:
            ad_id_int = int(str(ad_id).strip())
        except ValueError:
            return ToolResult(
                title="query_ad_detail 失败",
                output=f"ad_id 不是合法的整数: {ad_id!r}",
            )

        # Data freshness, based on the metrics file's modification time
        age_hours = (datetime.now().timestamp() - os.path.getmtime(metrics_path)) / 3600
        freshness_warning = ""
        if age_hours > 24:
            freshness_warning = f"⚠️ 数据已过期({age_hours:.1f}小时前更新),建议先执行 fetch_creative_data + calculate_roi_metrics 刷新数据。\n\n"

        df = pd.read_csv(metrics_csv)
        ad_row = df[df["ad_id"] == ad_id_int]
        if ad_row.empty:
            return ToolResult(
                title="query_ad_detail",
                output=f"{freshness_warning}未找到广告 {ad_id},共有 {len(df)} 个广告",
            )
        row = ad_row.iloc[0]
        ad_age_days = _calculate_ad_age_days(row.get("create_time"))

        # Channel ROI baseline: median, so a few very high-ROI ads don't inflate it
        roi_col = "动态ROI_7日均值"
        roi_series = df[roi_col].dropna() if roi_col in df.columns else pd.Series(dtype=float)
        channel_roi_p50 = float(roi_series.median()) if len(roi_series) > 0 else 0.0
        roi_low_line = channel_roi_p50 * ROI_LOW_FACTOR
        bid_down_line = channel_roi_p50 * BID_DOWN_ROI_FACTOR
        bid_up_line = channel_roi_p50 * BID_UP_ROI_FACTOR

        dynamic_roi_7d = row.get(roi_col)
        ad_detail = {
            "ad_id": ad_id_int,
            "ad_name": _as_text(row.get("ad_name", "")),
            "bid_amount": _as_float(row.get("bid_amount", 0), 2),
            "动态ROI_7日均值": round(float(dynamic_roi_7d), 4) if not pd.isna(dynamic_roi_7d) else None,
            "cost_7d_avg": _as_float(row.get("cost_7d_avg", 0), 2),
            "cost_7d_total": _as_float(row.get("cost_7d_total", 0), 2),
            "ad_age_days": ad_age_days,
            "configured_status": _as_text(row.get("configured_status", "")),
        }

        # Intervention signals (recent bid raise / creative change); best effort
        try:
            signal_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
            decay_signals = _detect_decay_signals(
                ad_ids=[ad_id_int],
                raw_dir=_MINI_DIR / "outputs" / "raw",
                ad_status_dir=_MINI_DIR / "outputs" / "ad_status",
                end_date=signal_date,
            )
            if not decay_signals.empty:
                ds_row = decay_signals.iloc[0]
                ad_detail["bid_increased_7d"] = _as_flag(ds_row.get("bid_increased_7d", False))
                ad_detail["creative_changed_7d"] = _as_flag(ds_row.get("creative_changed_7d", False))
        except Exception as e:
            logger.warning("检测干预信号失败: %s", e)

        global_context = {
            "全体动态ROI基准(中位数)": round(channel_roi_p50, 4),
            "ROI关停线": round(roi_low_line, 4),
            "ROI降价线": round(bid_down_line, 4),
            "ROI提价线": round(bid_up_line, 4),
            "提价消耗上限": BID_UP_MAX_SPEND,
            "关停消耗门槛(昨日)": ROI_LOW_MIN_YESTERDAY_COST,
        }
        result = {
            "ad_detail": ad_detail,
            "global_context": global_context,
        }
        output = freshness_warning + json.dumps(result, ensure_ascii=False, indent=2)
        return ToolResult(
            title=f"广告 {ad_id} 详情",
            output=output,
            metadata=result,
        )
    except Exception as e:
        logger.error("query_ad_detail 失败: %s", e, exc_info=True)
        return ToolResult(title="query_ad_detail 失败", output=str(e))
- # ═══════════════════════════════════════════
- # 智能引擎工具 4:修改已有决策(Mode 3 支撑)
- # ═══════════════════════════════════════════
@tool(description="修改已有决策:修改指定广告的操作或调幅,也可新增决策")
async def modify_decisions(
    ctx: ToolContext = None,
    modifications: str = "",
    decisions_csv: str = "",
    end_date: str = "yesterday",
) -> ToolResult:
    """Edit decisions in an existing ``llm_decisions_{date}.csv`` (Mode 3 feedback).

    Two modification forms are supported:
      1. Per-ad upsert by ``ad_id`` (edits the row, or appends a new one):
         ``[{"ad_id": "90289631207", "new_action": "bid_down", "new_change_pct": -0.05}]``
      2. Bulk edit by ``filter``:
         ``[{"filter": "all_bid_down", "new_change_pct": -0.03}]``
         Supported filters: ``all_pause`` / ``all_bid_down`` / ``all_bid_up`` /
         ``all_llm`` (rows whose source is an LLM decision: "智能判断",
         "llm_modified", or legacy "llm").

    Optional fields: ``new_action``, ``new_change_pct``, ``new_dimension``,
    ``new_reason`` (plus ``confidence`` for newly added rows). When
    ``new_change_pct`` is given and the ad's bid is known from
    ``outputs/metrics_temp.csv``, ``recommended_bid`` is recomputed and clamped
    to [BID_FLOOR_YUAN, BID_CEILING_YUAN]. Edited rows get source "llm_modified".
    Malformed entries are logged in the change log and skipped.

    Args:
        ctx: Tool context (not used by this function).
        modifications: JSON array string of modifications (see above).
        decisions_csv: Decision CSV path; by default the ``end_date`` file, else
            the lexicographically latest ``llm_decisions_*.csv``.
        end_date: ``YYYYMMDD`` or ``"yesterday"``; used to locate the default CSV.

    Returns:
        ToolResult with the change log and the new action distribution; the CSV
        is overwritten in place.
    """
    import json
    import glob as glob_mod

    # Source values of LLM-originated rows: apply_decisions writes "智能判断",
    # this tool writes "llm_modified", and "llm" is kept for older files.
    llm_sources = ("智能判断", "llm_modified", "llm")
    filter_map = {
        "all_pause": "pause",
        "all_bid_down": "bid_down",
        "all_bid_up": "bid_up",
        "all_llm": None,  # all LLM decisions (matched by source, not action)
    }

    def _recalc_bid(bid, change_pct):
        """Apply change_pct to bid, rounded to 2 decimals and clamped to [floor, ceiling]."""
        new_bid = round(bid * (1 + change_pct), 2)
        new_bid = max(new_bid, BID_FLOOR_YUAN)
        return min(new_bid, BID_CEILING_YUAN)

    def _parse_pct(raw):
        """Return raw as a float, or None when it is missing or not numeric."""
        if raw is None:
            return None
        try:
            return float(raw)
        except (TypeError, ValueError):
            return None

    try:
        if end_date == "yesterday":
            end_date = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")

        try:
            mod_list = json.loads(modifications)
        except json.JSONDecodeError as e:
            return ToolResult(title="modify_decisions 失败", output=f"modifications 不是合法 JSON: {e}")
        if not isinstance(mod_list, list):
            return ToolResult(title="modify_decisions 失败", output="modifications 必须是 JSON 数组")

        # Locate the decision CSV: the end_date file first, then the latest one
        if not decisions_csv:
            reports_dir = _MINI_DIR / "outputs" / "reports"
            target_path = reports_dir / f"llm_decisions_{end_date}.csv"
            if target_path.exists():
                decisions_csv = str(target_path)
            else:
                files = sorted(glob_mod.glob(str(reports_dir / "llm_decisions_*.csv")), reverse=True)
                if not files:
                    return ToolResult(
                        title="modify_decisions 失败",
                        output="未找到任何已有决策文件(llm_decisions_*.csv),请先执行全量分析",
                    )
                decisions_csv = files[0]
        decisions_path = Path(decisions_csv)
        if not decisions_path.exists():
            return ToolResult(title="modify_decisions 失败", output=f"决策文件不存在: {decisions_csv}")
        df = pd.read_csv(decisions_csv)
        if df.empty:
            return ToolResult(title="modify_decisions 失败", output="决策文件为空")

        # Current bids from metrics, used to recompute recommended_bid
        metrics_csv_path = str(_MINI_DIR / "outputs" / "metrics_temp.csv")
        bid_map = {}
        try:
            df_metrics = pd.read_csv(metrics_csv_path)
            bid_map = dict(zip(df_metrics["ad_id"].astype(int), df_metrics["bid_amount"].fillna(0)))
        except Exception as e:
            logger.warning("加载 metrics 获取 bid_amount 失败: %s", e)

        change_log = []
        new_rows = []
        for mod in mod_list:
            if not isinstance(mod, dict):
                change_log.append(f"⚠️ 修改项缺少 ad_id 或 filter,跳过: {mod}")
                continue

            # A non-numeric new_change_pct used to crash the whole call; ignore just that field
            raw_pct = mod.get("new_change_pct")
            change_pct = _parse_pct(raw_pct)
            if raw_pct is not None and change_pct is None:
                change_log.append(f"⚠️ new_change_pct 不是合法数字: {raw_pct},已忽略")

            if "filter" in mod:
                # Bulk edit
                filter_type = mod["filter"]
                if filter_type not in filter_map:
                    change_log.append(f"⚠️ 未知 filter: {filter_type},跳过")
                    continue
                target_action = filter_map[filter_type]
                if target_action:
                    mask = df["action"] == target_action
                else:
                    mask = df["source"].isin(llm_sources)
                matched = int(mask.sum())
                if matched == 0:
                    change_log.append(f"filter={filter_type}: 无匹配行")
                    continue
                if "new_action" in mod:
                    df.loc[mask, "action"] = mod["new_action"]
                if change_pct is not None:
                    df.loc[mask, "recommended_change_pct"] = change_pct
                    for idx in df[mask].index:
                        ad_val = df.at[idx, "ad_id"]
                        bid = bid_map.get(int(ad_val), 0) if pd.notna(ad_val) else 0
                        if bid > 0:
                            df.at[idx, "recommended_bid"] = _recalc_bid(bid, change_pct)
                            df.at[idx, "current_bid"] = round(bid, 2)
                if "new_dimension" in mod:
                    df.loc[mask, "dimension"] = mod["new_dimension"]
                if "new_reason" in mod:
                    df.loc[mask, "reason"] = mod["new_reason"]
                df.loc[mask, "source"] = "llm_modified"
                change_log.append(f"filter={filter_type}: 修改 {matched} 行")

            elif "ad_id" in mod:
                # Exact edit / insert (upsert)
                try:
                    target_id = int(mod["ad_id"])
                except (TypeError, ValueError):
                    change_log.append(f"⚠️ ad_id 不是合法整数: {mod['ad_id']},跳过")
                    continue
                bid = bid_map.get(target_id, 0)
                mask = df["ad_id"] == target_id
                if mask.any():
                    if "new_action" in mod:
                        old_action = df.loc[mask, "action"].iloc[0]
                        df.loc[mask, "action"] = mod["new_action"]
                        change_log.append(f"ad_id={target_id}: action {old_action} → {mod['new_action']}")
                    if change_pct is not None:
                        df.loc[mask, "recommended_change_pct"] = change_pct
                        if bid > 0:
                            df.loc[mask, "recommended_bid"] = _recalc_bid(bid, change_pct)
                            df.loc[mask, "current_bid"] = round(bid, 2)
                        change_log.append(f"ad_id={target_id}: change_pct → {change_pct}")
                    if "new_dimension" in mod:
                        df.loc[mask, "dimension"] = mod["new_dimension"]
                    if "new_reason" in mod:
                        df.loc[mask, "reason"] = mod["new_reason"]
                    df.loc[mask, "source"] = "llm_modified"
                else:
                    new_action = mod.get("new_action", "hold")
                    new_bid = _recalc_bid(bid, change_pct) if change_pct is not None and bid > 0 else None
                    new_rows.append({
                        "ad_id": target_id,
                        "action": new_action,
                        "dimension": mod.get("new_dimension", "用户指定"),
                        "reason": mod.get("new_reason", "用户定向操作"),
                        "confidence": mod.get("confidence", "high"),
                        "source": "llm_modified",
                        "recommended_change_pct": change_pct,
                        "current_bid": round(bid, 2) if bid > 0 else None,
                        "recommended_bid": new_bid,
                    })
                    change_log.append(f"ad_id={target_id}: 新增 action={new_action}")
            else:
                change_log.append(f"⚠️ 修改项缺少 ad_id 或 filter,跳过: {mod}")

        if new_rows:
            df = pd.concat([df, pd.DataFrame(new_rows)], ignore_index=True)

        # Overwrite the original file
        df.to_csv(decisions_csv, index=False, encoding="utf-8-sig")

        # Native ints so the metadata is JSON-serializable
        action_dist = {str(k): int(v) for k, v in df["action"].value_counts().items()}
        output_parts = [
            f"决策已修改并保存: {decisions_csv}",
            "",
            "修改日志:",
        ]
        output_parts.extend(f" {log}" for log in change_log)
        output_parts.extend([
            "",
            "当前 action 分布:",
        ])
        output_parts.extend(f" {action}: {count} 个" for action, count in action_dist.items())
        output_parts.append(f" 总计: {len(df)} 个")
        return ToolResult(
            title=f"决策修改完成({len(change_log)}项变更)",
            output="\n".join(output_parts),
            metadata={
                "csv_path": str(decisions_csv),
                "changes": len(change_log),
                "action_distribution": action_dist,
                "total": len(df),
            },
        )
    except Exception as e:
        logger.error("modify_decisions 失败: %s", e, exc_info=True)
        return ToolResult(title="modify_decisions 失败", output=str(e))
|