| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271 |
- """
- 策略配置层 — 预算策略参数的持久化、加载、更新、版本管理
- 所有策略参数(分位数阈值、决策矩阵、调价幅度、保护规则)
- 从 JSON 配置文件加载,不在工具代码中硬编码。
- LLM + Skill 决定是否调整参数,工具按参数执行确定性计算。
- """
- import copy
- import json
- import logging
- from datetime import datetime
- from pathlib import Path
- from typing import Optional
- from agent.tools import tool
- from agent.tools.models import ToolContext, ToolResult
- logger = logging.getLogger(__name__)
- # 配置文件路径
- CONFIGS_DIR = Path(__file__).parent.parent / "configs"
- ACTIVE_CONFIG = CONFIGS_DIR / "budget_strategy_v1.json"
- def _load_config_from_file(path: Path = ACTIVE_CONFIG) -> dict:
- """从 JSON 文件加载配置"""
- if not path.exists():
- raise FileNotFoundError(f"配置文件不存在: {path}")
- with open(path, "r", encoding="utf-8") as f:
- return json.load(f)
- def _save_config_to_file(config: dict, path: Path = ACTIVE_CONFIG):
- """保存配置到 JSON 文件"""
- path.parent.mkdir(parents=True, exist_ok=True)
- with open(path, "w", encoding="utf-8") as f:
- json.dump(config, f, ensure_ascii=False, indent=2)
- @tool(description="加载当前预算策略配置,包括分位数阈值、决策矩阵、调价幅度、保护规则等")
- async def load_strategy_config(
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 加载当前预算策略配置。
- 返回完整的策略配置,包括:
- - thresholds: 分位数阈值(roi_high_percentile, roi_low_percentile, cost_mid_percentile)
- - decision_matrix: 决策矩阵(5种策略 × 6种象限 → 动作+幅度)
- - bid_limits: 出价边界和调价幅度上限
- - protection: 保护规则(冷启动、赔付)
- - version/last_updated/updated_by/update_reason: 版本元数据
- """
- try:
- config = _load_config_from_file()
- # 格式化输出
- lines = [
- f"策略配置版本: {config['version']}",
- f"最后更新: {config['last_updated']}",
- f"更新者: {config['updated_by']}",
- f"更新原因: {config['update_reason']}",
- "",
- "=== 分位数阈值 ===",
- f" ROI 高阈值: P{int(config['thresholds']['roi_high_percentile']*100)}",
- f" ROI 低阈值: P{int(config['thresholds']['roi_low_percentile']*100)}",
- f" 消耗中位数: P{int(config['thresholds']['cost_mid_percentile']*100)}",
- "",
- "=== 调价幅度限制 ===",
- f" 最大提价: +{int(config['bid_limits']['max_increase_pct']*100)}%",
- f" 最大降价: {int(config['bid_limits']['max_decrease_pct']*100)}%",
- f" 出价范围: {config['bid_limits']['min_bid']}分 ~ {config['bid_limits']['max_bid']}分",
- "",
- "=== 保护规则 ===",
- f" 冷启动保护: {'开启' if config['protection']['cold_start_enabled'] else '关闭'}",
- f" 冷启动期: {config['protection']['cold_start_hours']}h, 最少转化: {config['protection']['cold_start_min_conversions']}",
- f" 赔付保护: {'开启' if config['protection']['compensation_enabled'] else '关闭'}",
- f" 赔付门槛: 转化>={config['protection']['compensation_min_conversions']}, CPA偏离>={int(config['protection']['compensation_cpa_deviation_pct']*100)}%",
- "",
- "=== 决策矩阵(5种策略) ===",
- ]
- for strategy_name, matrix in config["decision_matrix"].items():
- lines.append(f" [{strategy_name}]")
- for quadrant, (action, ratio) in matrix.items():
- ratio_str = f"{ratio:+.0%}" if ratio != 0 else "-"
- lines.append(f" {quadrant}: {action} ({ratio_str})")
- output = "\n".join(lines)
- return ToolResult(
- title="策略配置加载成功",
- output=output,
- long_term_memory=f"已加载策略配置 {config['version']}({config['last_updated']},{config['update_reason']})",
- )
- except Exception as e:
- logger.error(f"加载策略配置失败: {e}")
- return ToolResult(title="策略配置加载失败", output=str(e))
- @tool(description="更新预算策略配置的指定字段,记录更新原因和版本历史")
- async def update_strategy_config(
- updates: str,
- reason: str,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 更新预算策略配置的指定字段。
- Args:
- updates: JSON 格式的更新内容,支持嵌套路径。示例:
- - 调整分位数: {"thresholds": {"roi_high_percentile": 0.60}}
- - 调整决策矩阵某项: {"decision_matrix": {"moderate_scale_down": {"mid_high": ["keep", 0.0]}}}
- - 调整调价幅度: {"bid_limits": {"max_decrease_pct": -0.10}}
- - 调整保护规则: {"protection": {"cold_start_hours": 72}}
- reason: 更新原因,如 "周末流量质量高,ROI阈值放宽到P60"
- """
- try:
- config = _load_config_from_file()
- # 解析更新内容
- try:
- update_dict = json.loads(updates)
- except json.JSONDecodeError as e:
- return ToolResult(
- title="更新失败",
- output=f"updates 参数不是合法 JSON: {e}",
- )
- # 保存更新前的快照到 history
- old_version = config["version"]
- snapshot = {
- "version": old_version,
- "timestamp": config["last_updated"],
- "updated_by": config["updated_by"],
- "reason": config["update_reason"],
- "changes": updates,
- }
- if "history" not in config:
- config["history"] = []
- config["history"].append(snapshot)
- # 只保留最近 50 条历史
- config["history"] = config["history"][-50:]
- # 递归合并更新
- def deep_merge(base: dict, update: dict):
- for key, value in update.items():
- if key in base and isinstance(base[key], dict) and isinstance(value, dict):
- deep_merge(base[key], value)
- else:
- base[key] = value
- deep_merge(config, update_dict)
- # 更新版本元数据
- version_num = int(old_version.split("v")[1].split(".")[0]) if "v" in old_version else 1
- minor = int(old_version.split(".")[-1]) if "." in old_version else 0
- config["version"] = f"v{version_num}.{minor + 1}"
- config["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
- config["updated_by"] = "agent"
- config["update_reason"] = reason
- # 保存
- _save_config_to_file(config)
- output = (
- f"配置已更新: {old_version} → {config['version']}\n"
- f"更新时间: {config['last_updated']}\n"
- f"更新原因: {reason}\n"
- f"更新内容: {updates}"
- )
- return ToolResult(
- title=f"策略配置更新成功 → {config['version']}",
- output=output,
- long_term_memory=f"策略配置从 {old_version} 更新到 {config['version']},原因:{reason}",
- )
- except Exception as e:
- logger.error(f"更新策略配置失败: {e}")
- return ToolResult(title="策略配置更新失败", output=str(e))
- @tool(description="查看预算策略配置的变更历史")
- async def get_config_history(
- limit: int = 10,
- context: Optional[ToolContext] = None,
- ) -> ToolResult:
- """
- 查看预算策略配置的变更历史。
- Args:
- limit: 返回的历史记录条数,默认 10
- """
- try:
- config = _load_config_from_file()
- history = config.get("history", [])
- if not history:
- return ToolResult(
- title="配置变更历史",
- output="暂无变更历史,当前为初始版本。",
- )
- # 取最近 N 条,倒序展示
- recent = history[-limit:][::-1]
- lines = [f"配置变更历史(最近 {len(recent)} 条)", ""]
- for i, entry in enumerate(recent, 1):
- lines.append(f"[{i}] {entry['version']} ({entry['timestamp']})")
- lines.append(f" 更新者: {entry['updated_by']}")
- lines.append(f" 原因: {entry['reason']}")
- lines.append(f" 变更: {entry['changes']}")
- lines.append("")
- return ToolResult(
- title=f"配置变更历史({len(recent)}条)",
- output="\n".join(lines),
- )
- except Exception as e:
- logger.error(f"获取配置历史失败: {e}")
- return ToolResult(title="获取配置历史失败", output=str(e))
- # ===== 辅助函数(供其他工具调用) =====
- def get_strategy_config() -> dict:
- """同步加载策略配置(供内部工具调用)"""
- return _load_config_from_file()
- def get_decision_matrix(config: dict, strategy: str) -> dict:
- """从配置中获取指定策略的决策矩阵
- Args:
- config: 完整配置字典
- strategy: 策略名称,如 "aggressive_scale_down"
- Returns:
- dict: 象限 → (action, ratio) 的映射
- """
- matrix_raw = config["decision_matrix"].get(strategy, config["decision_matrix"]["maintain"])
- return {k: tuple(v) for k, v in matrix_raw.items()}
- def determine_strategy(scale_ratio: float, config: dict) -> str:
- """根据缩量/扩量比例和配置确定策略
- Args:
- scale_ratio: 预算/昨日消耗 比值
- config: 完整配置字典
- Returns:
- 策略名称字符串
- """
- boundaries = config.get("strategy_boundaries", {
- "aggressive_scale_down": [0, 0.70],
- "moderate_scale_down": [0.70, 0.95],
- "maintain": [0.95, 1.05],
- "moderate_scale_up": [1.05, 1.30],
- "aggressive_scale_up": [1.30, 999],
- })
- for strategy_name, (low, high) in boundaries.items():
- if low <= scale_ratio < high:
- return strategy_name
- return "maintain"
|