strategy_config.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. """
  2. 策略配置层 — 预算策略参数的持久化、加载、更新、版本管理
  3. 所有策略参数(分位数阈值、决策矩阵、调价幅度、保护规则)
  4. 从 JSON 配置文件加载,不在工具代码中硬编码。
  5. LLM + Skill 决定是否调整参数,工具按参数执行确定性计算。
  6. """
  7. import copy
  8. import json
  9. import logging
  10. from datetime import datetime
  11. from pathlib import Path
  12. from typing import Optional
  13. from agent.tools import tool
  14. from agent.tools.models import ToolContext, ToolResult
  15. logger = logging.getLogger(__name__)
  16. # 配置文件路径
  17. CONFIGS_DIR = Path(__file__).parent.parent / "configs"
  18. ACTIVE_CONFIG = CONFIGS_DIR / "budget_strategy_v1.json"
  19. def _load_config_from_file(path: Path = ACTIVE_CONFIG) -> dict:
  20. """从 JSON 文件加载配置"""
  21. if not path.exists():
  22. raise FileNotFoundError(f"配置文件不存在: {path}")
  23. with open(path, "r", encoding="utf-8") as f:
  24. return json.load(f)
  25. def _save_config_to_file(config: dict, path: Path = ACTIVE_CONFIG):
  26. """保存配置到 JSON 文件"""
  27. path.parent.mkdir(parents=True, exist_ok=True)
  28. with open(path, "w", encoding="utf-8") as f:
  29. json.dump(config, f, ensure_ascii=False, indent=2)
  30. @tool(description="加载当前预算策略配置,包括分位数阈值、决策矩阵、调价幅度、保护规则等")
  31. async def load_strategy_config(
  32. context: Optional[ToolContext] = None,
  33. ) -> ToolResult:
  34. """
  35. 加载当前预算策略配置。
  36. 返回完整的策略配置,包括:
  37. - thresholds: 分位数阈值(roi_high_percentile, roi_low_percentile, cost_mid_percentile)
  38. - decision_matrix: 决策矩阵(5种策略 × 6种象限 → 动作+幅度)
  39. - bid_limits: 出价边界和调价幅度上限
  40. - protection: 保护规则(冷启动、赔付)
  41. - version/last_updated/updated_by/update_reason: 版本元数据
  42. """
  43. try:
  44. config = _load_config_from_file()
  45. # 格式化输出
  46. lines = [
  47. f"策略配置版本: {config['version']}",
  48. f"最后更新: {config['last_updated']}",
  49. f"更新者: {config['updated_by']}",
  50. f"更新原因: {config['update_reason']}",
  51. "",
  52. "=== 分位数阈值 ===",
  53. f" ROI 高阈值: P{int(config['thresholds']['roi_high_percentile']*100)}",
  54. f" ROI 低阈值: P{int(config['thresholds']['roi_low_percentile']*100)}",
  55. f" 消耗中位数: P{int(config['thresholds']['cost_mid_percentile']*100)}",
  56. "",
  57. "=== 调价幅度限制 ===",
  58. f" 最大提价: +{int(config['bid_limits']['max_increase_pct']*100)}%",
  59. f" 最大降价: {int(config['bid_limits']['max_decrease_pct']*100)}%",
  60. f" 出价范围: {config['bid_limits']['min_bid']}分 ~ {config['bid_limits']['max_bid']}分",
  61. "",
  62. "=== 保护规则 ===",
  63. f" 冷启动保护: {'开启' if config['protection']['cold_start_enabled'] else '关闭'}",
  64. f" 冷启动期: {config['protection']['cold_start_hours']}h, 最少转化: {config['protection']['cold_start_min_conversions']}",
  65. f" 赔付保护: {'开启' if config['protection']['compensation_enabled'] else '关闭'}",
  66. f" 赔付门槛: 转化>={config['protection']['compensation_min_conversions']}, CPA偏离>={int(config['protection']['compensation_cpa_deviation_pct']*100)}%",
  67. "",
  68. "=== 决策矩阵(5种策略) ===",
  69. ]
  70. for strategy_name, matrix in config["decision_matrix"].items():
  71. lines.append(f" [{strategy_name}]")
  72. for quadrant, (action, ratio) in matrix.items():
  73. ratio_str = f"{ratio:+.0%}" if ratio != 0 else "-"
  74. lines.append(f" {quadrant}: {action} ({ratio_str})")
  75. output = "\n".join(lines)
  76. return ToolResult(
  77. title="策略配置加载成功",
  78. output=output,
  79. long_term_memory=f"已加载策略配置 {config['version']}({config['last_updated']},{config['update_reason']})",
  80. )
  81. except Exception as e:
  82. logger.error(f"加载策略配置失败: {e}")
  83. return ToolResult(title="策略配置加载失败", output=str(e))
  84. @tool(description="更新预算策略配置的指定字段,记录更新原因和版本历史")
  85. async def update_strategy_config(
  86. updates: str,
  87. reason: str,
  88. context: Optional[ToolContext] = None,
  89. ) -> ToolResult:
  90. """
  91. 更新预算策略配置的指定字段。
  92. Args:
  93. updates: JSON 格式的更新内容,支持嵌套路径。示例:
  94. - 调整分位数: {"thresholds": {"roi_high_percentile": 0.60}}
  95. - 调整决策矩阵某项: {"decision_matrix": {"moderate_scale_down": {"mid_high": ["keep", 0.0]}}}
  96. - 调整调价幅度: {"bid_limits": {"max_decrease_pct": -0.10}}
  97. - 调整保护规则: {"protection": {"cold_start_hours": 72}}
  98. reason: 更新原因,如 "周末流量质量高,ROI阈值放宽到P60"
  99. """
  100. try:
  101. config = _load_config_from_file()
  102. # 解析更新内容
  103. try:
  104. update_dict = json.loads(updates)
  105. except json.JSONDecodeError as e:
  106. return ToolResult(
  107. title="更新失败",
  108. output=f"updates 参数不是合法 JSON: {e}",
  109. )
  110. # 保存更新前的快照到 history
  111. old_version = config["version"]
  112. snapshot = {
  113. "version": old_version,
  114. "timestamp": config["last_updated"],
  115. "updated_by": config["updated_by"],
  116. "reason": config["update_reason"],
  117. "changes": updates,
  118. }
  119. if "history" not in config:
  120. config["history"] = []
  121. config["history"].append(snapshot)
  122. # 只保留最近 50 条历史
  123. config["history"] = config["history"][-50:]
  124. # 递归合并更新
  125. def deep_merge(base: dict, update: dict):
  126. for key, value in update.items():
  127. if key in base and isinstance(base[key], dict) and isinstance(value, dict):
  128. deep_merge(base[key], value)
  129. else:
  130. base[key] = value
  131. deep_merge(config, update_dict)
  132. # 更新版本元数据
  133. version_num = int(old_version.split("v")[1].split(".")[0]) if "v" in old_version else 1
  134. minor = int(old_version.split(".")[-1]) if "." in old_version else 0
  135. config["version"] = f"v{version_num}.{minor + 1}"
  136. config["last_updated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  137. config["updated_by"] = "agent"
  138. config["update_reason"] = reason
  139. # 保存
  140. _save_config_to_file(config)
  141. output = (
  142. f"配置已更新: {old_version} → {config['version']}\n"
  143. f"更新时间: {config['last_updated']}\n"
  144. f"更新原因: {reason}\n"
  145. f"更新内容: {updates}"
  146. )
  147. return ToolResult(
  148. title=f"策略配置更新成功 → {config['version']}",
  149. output=output,
  150. long_term_memory=f"策略配置从 {old_version} 更新到 {config['version']},原因:{reason}",
  151. )
  152. except Exception as e:
  153. logger.error(f"更新策略配置失败: {e}")
  154. return ToolResult(title="策略配置更新失败", output=str(e))
  155. @tool(description="查看预算策略配置的变更历史")
  156. async def get_config_history(
  157. limit: int = 10,
  158. context: Optional[ToolContext] = None,
  159. ) -> ToolResult:
  160. """
  161. 查看预算策略配置的变更历史。
  162. Args:
  163. limit: 返回的历史记录条数,默认 10
  164. """
  165. try:
  166. config = _load_config_from_file()
  167. history = config.get("history", [])
  168. if not history:
  169. return ToolResult(
  170. title="配置变更历史",
  171. output="暂无变更历史,当前为初始版本。",
  172. )
  173. # 取最近 N 条,倒序展示
  174. recent = history[-limit:][::-1]
  175. lines = [f"配置变更历史(最近 {len(recent)} 条)", ""]
  176. for i, entry in enumerate(recent, 1):
  177. lines.append(f"[{i}] {entry['version']} ({entry['timestamp']})")
  178. lines.append(f" 更新者: {entry['updated_by']}")
  179. lines.append(f" 原因: {entry['reason']}")
  180. lines.append(f" 变更: {entry['changes']}")
  181. lines.append("")
  182. return ToolResult(
  183. title=f"配置变更历史({len(recent)}条)",
  184. output="\n".join(lines),
  185. )
  186. except Exception as e:
  187. logger.error(f"获取配置历史失败: {e}")
  188. return ToolResult(title="获取配置历史失败", output=str(e))
  189. # ===== 辅助函数(供其他工具调用) =====
  190. def get_strategy_config() -> dict:
  191. """同步加载策略配置(供内部工具调用)"""
  192. return _load_config_from_file()
  193. def get_decision_matrix(config: dict, strategy: str) -> dict:
  194. """从配置中获取指定策略的决策矩阵
  195. Args:
  196. config: 完整配置字典
  197. strategy: 策略名称,如 "aggressive_scale_down"
  198. Returns:
  199. dict: 象限 → (action, ratio) 的映射
  200. """
  201. matrix_raw = config["decision_matrix"].get(strategy, config["decision_matrix"]["maintain"])
  202. return {k: tuple(v) for k, v in matrix_raw.items()}
  203. def determine_strategy(scale_ratio: float, config: dict) -> str:
  204. """根据缩量/扩量比例和配置确定策略
  205. Args:
  206. scale_ratio: 预算/昨日消耗 比值
  207. config: 完整配置字典
  208. Returns:
  209. 策略名称字符串
  210. """
  211. boundaries = config.get("strategy_boundaries", {
  212. "aggressive_scale_down": [0, 0.70],
  213. "moderate_scale_down": [0.70, 0.95],
  214. "maintain": [0.95, 1.05],
  215. "moderate_scale_up": [1.05, 1.30],
  216. "aggressive_scale_up": [1.30, 999],
  217. })
  218. for strategy_name, (low, high) in boundaries.items():
  219. if low <= scale_ratio < high:
  220. return strategy_name
  221. return "maintain"