"""
Native Anthropic provider.

Calls Claude models directly through the native Anthropic Messages API (or a
reverse proxy that is fully compatible with the native protocol, such as
imds.ai). The message/tool format converters are reused from openrouter.py,
which is also where AgentRunner cache-control markers are written into the
payload.
"""

import os
import asyncio
import logging
import httpx
from typing import List, Dict, Any, Optional

from .pricing import calculate_cost

# Reuse the format converters / interceptors already implemented in openrouter.
from .openrouter import (
    _normalize_tool_call_ids,
    _to_anthropic_messages,
    _to_anthropic_tools,
    _parse_anthropic_response,
    _RETRYABLE_EXCEPTIONS,
)

logger = logging.getLogger(__name__)

_DEFAULT_BASE_URL = "https://api.anthropic.com"
_DEFAULT_MAX_TOKENS = 8192
_MAX_RETRIES = 3

# 15 minutes (raised from an earlier 5-minute value) so that long generations,
# e.g. strategy.json synthesis, do not fail with ReadTimeout.
_REQUEST_TIMEOUT_SECONDS = 900.0

# HTTP statuses worth retrying. 529 is Anthropic's "overloaded_error", which is
# transient in the same way 429/5xx are.
_RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504, 529})


def _mask_secret(secret: str, visible: int = 6) -> str:
    """Return a log-safe preview of *secret*.

    At most ``visible`` leading characters are shown, and never more than a
    quarter of the secret, so short keys are not revealed in full.
    """
    shown = min(visible, len(secret) // 4)
    return f"{secret[:shown]}..."


def _drop_empty_intermediate_assistant_messages(
    anthropic_messages: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Remove non-final assistant messages whose content is empty.

    The native API rejects (HTTP 400) an assistant message with empty content
    unless it is the last message; OpenRouter tolerates it. Such messages are
    dropped here rather than padded. The final message is always kept as-is.
    """
    last_index = len(anthropic_messages) - 1
    kept = []
    for index, message in enumerate(anthropic_messages):
        if message.get("role") == "assistant" and index != last_index:
            content = message.get("content", "")
            if content == "" or content == [] or content is None:
                logger.debug("[Anthropic Native] Dropped empty assistant message at index %d", index)
                continue
        kept.append(message)
    return kept


def _count_cache_control_markers(system_prompt: Any, anthropic_messages: List[Dict[str, Any]]) -> int:
    """Count dict blocks carrying a ``cache_control`` key (debug aid only)."""
    block_lists = []
    if isinstance(system_prompt, list):
        block_lists.append(system_prompt)
    for message in anthropic_messages:
        content = message.get("content", "")
        if isinstance(content, list):
            block_lists.append(content)
    return sum(
        1
        for blocks in block_lists
        for block in blocks
        if isinstance(block, dict) and "cache_control" in block
    )


async def anthropic_native_llm_call(
    messages: List[Dict[str, Any]],
    model: str = "claude-3-5-sonnet-20241022",
    tools: Optional[List[Dict]] = None,
    **kwargs
) -> Dict[str, Any]:
    """Call the native Anthropic Messages API and return a normalized result.

    Credentials come from ``CLAUDE_CODE_KEY`` (preferred) or
    ``ANTHROPIC_API_KEY``; the base URL from ``CLAUDE_CODE_URL`` (preferred),
    ``ANTHROPIC_BASE_URL``, or the public Anthropic endpoint. The
    ``anthropic-version`` header is read from ``ANTHROPIC_VERSION``
    (default ``2023-06-01``).

    Args:
        messages: Conversation messages; they are converted to Anthropic
            format by the helpers imported from openrouter.
        model: Model name. Anything up to and including the first ``/``
            (a namespace prefix such as ``anthropic/``) is stripped.
        tools: Optional tool definitions, converted with
            ``_to_anthropic_tools``.
        **kwargs: Recognized keys are ``max_tokens`` (default 8192),
            ``temperature`` and ``anthropic_beta`` (sent as the
            ``anthropic-beta`` header). ``None`` values for ``max_tokens`` /
            ``temperature`` are treated as "not provided".

    Returns:
        A dict with ``content``, ``tool_calls``, ``prompt_tokens``,
        ``completion_tokens``, ``reasoning_tokens``,
        ``cache_creation_tokens``, ``cache_read_tokens``, ``finish_reason``,
        ``cost`` and the raw ``usage`` object.

    Raises:
        ValueError: If neither API-key environment variable is set.
        httpx.HTTPStatusError: On a non-retryable HTTP error, or when a
            retryable one persists through the final attempt.
        Exception: Any of ``_RETRYABLE_EXCEPTIONS`` that persists through the
            final attempt.
    """
    # Prefer CLAUDE_CODE_KEY; fall back to ANTHROPIC_API_KEY.
    claude_code_key = os.getenv("CLAUDE_CODE_KEY")
    anthropic_key = os.getenv("ANTHROPIC_API_KEY")
    if claude_code_key:
        api_key, key_source = claude_code_key, "CLAUDE_CODE_KEY"
    elif anthropic_key:
        api_key, key_source = anthropic_key, "ANTHROPIC_API_KEY"
    else:
        raise ValueError("CLAUDE_CODE_KEY or ANTHROPIC_API_KEY environment variable not set")

    # Prefer CLAUDE_CODE_URL; fall back to ANTHROPIC_BASE_URL, then the default.
    claude_code_url = os.getenv("CLAUDE_CODE_URL")
    anthropic_url = os.getenv("ANTHROPIC_BASE_URL")
    if claude_code_url:
        base_url, url_source = claude_code_url, "CLAUDE_CODE_URL"
    elif anthropic_url:
        base_url, url_source = anthropic_url, "ANTHROPIC_BASE_URL"
    else:
        base_url, url_source = _DEFAULT_BASE_URL, "default"

    endpoint = f"{base_url.rstrip('/')}/v1/messages"

    # Report the active configuration once per process. Only a short, masked
    # preview of the key is emitted so the secret never lands in logs/stdout.
    if not hasattr(anthropic_native_llm_call, '_logged_config'):
        key_preview = _mask_secret(api_key)
        logger.info("[Anthropic Native] Using %s: %s", key_source, key_preview)
        logger.info("[Anthropic Native] Using %s: %s", url_source, base_url)
        print(f"[Anthropic Native] Using {key_source}: {key_preview}")
        print(f"[Anthropic Native] Using {url_source}: {base_url}")
        anthropic_native_llm_call._logged_config = True

    anthropic_version = os.getenv("ANTHROPIC_VERSION", "2023-06-01")

    # Strip a namespace prefix such as "anthropic/" or "openai/" if present.
    if "/" in model:
        model = model.split("/", 1)[1]

    # Normalize tool-call ids to the "toolu" prefix.
    messages = _normalize_tool_call_ids(messages, "toolu")

    # Convert to Anthropic format (per the module notes, this step is also
    # where cache-control markers are written into the payload).
    system_prompt, anthropic_messages = _to_anthropic_messages(messages)
    anthropic_messages = _drop_empty_intermediate_assistant_messages(anthropic_messages)

    payload: Dict[str, Any] = {
        "model": model,
        "messages": anthropic_messages,
        # An explicit max_tokens=None would be serialized as null and rejected.
        "max_tokens": kwargs.get("max_tokens") or _DEFAULT_MAX_TOKENS,
    }
    if system_prompt is not None:
        payload["system"] = system_prompt
    if tools:
        payload["tools"] = _to_anthropic_tools(tools)
    if kwargs.get("temperature") is not None:
        payload["temperature"] = kwargs["temperature"]

    # Debug aid: confirm cache_control markers are present before sending.
    if logger.isEnabledFor(logging.DEBUG):
        cache_control_count = _count_cache_control_markers(system_prompt, anthropic_messages)
        if cache_control_count > 0:
            logger.debug(f"[Anthropic Native] 发现 {cache_control_count} 个 cache_control 标记,将被发送到原生端点")

    headers = {
        "x-api-key": api_key,
        "anthropic-version": anthropic_version,
        "content-type": "application/json",
    }
    # Allow callers to opt into beta features via the anthropic_beta kwarg.
    if kwargs.get("anthropic_beta"):
        headers["anthropic-beta"] = kwargs["anthropic_beta"]

    last_exception = None
    for attempt in range(_MAX_RETRIES):
        has_retries_left = attempt < _MAX_RETRIES - 1
        async with httpx.AsyncClient(timeout=_REQUEST_TIMEOUT_SECONDS) as client:
            try:
                response = await client.post(endpoint, json=payload, headers=headers)
                response.raise_for_status()
                result = response.json()
                break
            except httpx.HTTPStatusError as e:
                status = e.response.status_code
                error_body = e.response.text
                if status in _RETRYABLE_STATUS_CODES and has_retries_left:
                    wait = 2 ** attempt * 2  # exponential backoff: 2s, 4s, ...
                    logger.warning(
                        "[Anthropic Native] HTTP %d (attempt %d/%d), retrying in %ds: %s",
                        status, attempt + 1, _MAX_RETRIES, wait, error_body[:200],
                    )
                    await asyncio.sleep(wait)
                    last_exception = e
                    continue
                logger.error("[Anthropic Native] HTTP %d error body: %s", status, error_body)
                print(f"[Anthropic Native] API Error {status}: {error_body[:500]}")
                raise
            except _RETRYABLE_EXCEPTIONS as e:
                last_exception = e
                if has_retries_left:
                    wait = 2 ** attempt * 2
                    logger.warning(
                        "[Anthropic Native] %s (attempt %d/%d), retrying in %ds",
                        type(e).__name__, attempt + 1, _MAX_RETRIES, wait,
                    )
                    await asyncio.sleep(wait)
                    continue
                raise
    else:
        # Defensive: every iteration above breaks, continues or raises, so this
        # is only reached if the loop body never produced a result.
        raise last_exception  # type: ignore[misc]

    # Parse the response and extract usage for cost accounting.
    parsed = _parse_anthropic_response(result)
    usage = parsed["usage"]
    cost = calculate_cost(model, usage)

    return {
        "content": parsed["content"],
        "tool_calls": parsed["tool_calls"],
        "prompt_tokens": usage.input_tokens,
        "completion_tokens": usage.output_tokens,
        "reasoning_tokens": usage.reasoning_tokens,
        "cache_creation_tokens": usage.cache_creation_tokens,
        "cache_read_tokens": usage.cache_read_tokens,
        "finish_reason": parsed["finish_reason"],
        "cost": cost,
        "usage": usage,
    }


def create_claude_llm_call(model: str = "claude-3-5-sonnet-20241022"):
    """Create an async LLM-call function bound to a default Anthropic model.

    Args:
        model: Model name such as "claude-3-5-sonnet-20241022". A namespace
            prefix is allowed; it is stripped by ``anthropic_native_llm_call``.

    Returns:
        An async function ``llm_call(messages, model=<bound model>, tools=None,
        **kwargs)`` that forwards to ``anthropic_native_llm_call``; intended to
        be handed to AgentRunner.
    """
    async def llm_call(
        messages: List[Dict[str, Any]],
        model: str = model,
        tools: Optional[List[Dict]] = None,
        **kwargs
    ) -> Dict[str, Any]:
        return await anthropic_native_llm_call(messages, model, tools, **kwargs)

    return llm_call