| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- """
- Native Anthropic Provider
- 直接使用 Anthropic 原生 API (或完全兼容原生协议的反代如 imds.ai) 调用 Claude 模型。
- 复用 openrouter.py 中的格式化拦截模块(无缝支持 AgentRunner Cache Control)。
- """
- import os
- import asyncio
- import logging
- import httpx
- from typing import List, Dict, Any, Optional
- from .pricing import calculate_cost
# Reuse the format-conversion / interception helpers already implemented inside openrouter
- from .openrouter import (
- _normalize_tool_call_ids,
- _to_anthropic_messages,
- _to_anthropic_tools,
- _parse_anthropic_response,
- _RETRYABLE_EXCEPTIONS
- )
- logger = logging.getLogger(__name__)
def _resolve_api_key() -> tuple:
    """Return (api_key, env_var_name), preferring CLAUDE_CODE_KEY over ANTHROPIC_API_KEY.

    Raises:
        ValueError: If neither environment variable is set.
    """
    for env_name in ("CLAUDE_CODE_KEY", "ANTHROPIC_API_KEY"):
        value = os.getenv(env_name)
        if value:
            return value, env_name
    raise ValueError("CLAUDE_CODE_KEY or ANTHROPIC_API_KEY environment variable not set")


def _resolve_base_url() -> tuple:
    """Return (base_url, source_label), preferring CLAUDE_CODE_URL over ANTHROPIC_BASE_URL.

    Falls back to the official Anthropic endpoint when neither variable is set.
    """
    for env_name in ("CLAUDE_CODE_URL", "ANTHROPIC_BASE_URL"):
        value = os.getenv(env_name)
        if value:
            return value, env_name
    return "https://api.anthropic.com", "default"


def _drop_empty_interior_assistant_messages(
    anthropic_messages: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Remove non-final assistant messages whose content is empty.

    The native Anthropic API strictly rejects (HTTP 400) assistant messages
    with empty content anywhere except the last position, while OpenRouter
    tolerates them — so sanitize before sending. Empty means "", [] or None
    (such messages are usually leftovers from historical bugs).
    """
    last_index = len(anthropic_messages) - 1
    sanitized: List[Dict[str, Any]] = []
    for i, msg in enumerate(anthropic_messages):
        if msg.get("role") == "assistant" and i != last_index:
            content = msg.get("content", "")
            if content is None or content == "" or content == []:
                logger.debug("[Anthropic Native] Dropped empty assistant message at index %d", i)
                continue
        sanitized.append(msg)
    return sanitized


def _count_cache_control_marks(
    system_prompt: Any,
    anthropic_messages: List[Dict[str, Any]],
) -> int:
    """Count cache_control markers across the system prompt and message content blocks."""
    blocks: List[Any] = list(system_prompt) if isinstance(system_prompt, list) else []
    for msg in anthropic_messages:
        content = msg.get("content", "")
        if isinstance(content, list):
            blocks.extend(content)
    return sum(1 for block in blocks if isinstance(block, dict) and "cache_control" in block)


async def _post_with_retries(
    endpoint: str,
    payload: Dict[str, Any],
    headers: Dict[str, str],
    max_retries: int = 3,
) -> Dict[str, Any]:
    """POST ``payload`` to ``endpoint`` and return the parsed JSON body.

    Retries transient failures (HTTP 429/5xx and the connection-level
    exceptions in ``_RETRYABLE_EXCEPTIONS``) with exponential backoff
    (2s, 4s, 8s, ...). Non-retryable errors, or the final failed attempt,
    re-raise the underlying exception. A single client is reused across
    attempts instead of being rebuilt each time.

    Raises:
        httpx.HTTPStatusError: Non-retryable status or retries exhausted.
        Exception: Any exhausted ``_RETRYABLE_EXCEPTIONS`` member.
    """
    # 900 s (15 min) instead of httpx's default: long generations
    # (e.g. synthesizing strategy.json) can otherwise hit ReadTimeout.
    async with httpx.AsyncClient(timeout=900.0) as client:
        for attempt in range(max_retries):
            try:
                response = await client.post(endpoint, json=payload, headers=headers)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                status = e.response.status_code
                error_body = e.response.text
                if status in (429, 500, 502, 503, 504) and attempt < max_retries - 1:
                    wait = 2 ** attempt * 2
                    logger.warning(
                        "[Anthropic Native] HTTP %d (attempt %d/%d), retrying in %ds: %s",
                        status, attempt + 1, max_retries, wait, error_body[:200],
                    )
                    await asyncio.sleep(wait)
                    continue
                # Non-retryable status, or retries exhausted: surface the error.
                logger.error("[Anthropic Native] HTTP %d error body: %s", status, error_body)
                print(f"[Anthropic Native] API Error {status}: {error_body[:500]}")
                raise
            except _RETRYABLE_EXCEPTIONS as e:
                if attempt < max_retries - 1:
                    wait = 2 ** attempt * 2
                    logger.warning(
                        "[Anthropic Native] %s (attempt %d/%d), retrying in %ds",
                        type(e).__name__, attempt + 1, max_retries, wait,
                    )
                    await asyncio.sleep(wait)
                    continue
                raise
    # Every loop iteration either returns or raises; this line is unreachable.
    raise RuntimeError("retry loop exited without result")


async def anthropic_native_llm_call(
    messages: List[Dict[str, Any]],
    model: str = "claude-3-5-sonnet-20241022",
    tools: Optional[List[Dict]] = None,
    **kwargs,
) -> Dict[str, Any]:
    """Call the native Anthropic Messages API (or a protocol-compatible proxy).

    Credentials come from CLAUDE_CODE_KEY/CLAUDE_CODE_URL, falling back to
    ANTHROPIC_API_KEY/ANTHROPIC_BASE_URL, then the public endpoint.

    Args:
        messages: OpenAI-style chat messages; converted to Anthropic format
            (cache_control markers are injected by the shared converter).
        model: Model name; a namespace prefix such as "anthropic/" is stripped.
        tools: Optional OpenAI-style tool definitions.
        **kwargs: Recognized keys: ``max_tokens`` (default 8192),
            ``temperature``, ``anthropic_beta`` (sent as the anthropic-beta header).

    Returns:
        Dict with content, tool_calls, token counts, finish_reason, cost and usage.

    Raises:
        ValueError: If no API key environment variable is set.
        httpx.HTTPStatusError: On unrecoverable API errors.
    """
    api_key, key_source = _resolve_api_key()
    base_url, url_source = _resolve_base_url()
    endpoint = f"{base_url.rstrip('/')}/v1/messages"

    # Log the resolved configuration only on the first call of the process.
    if not hasattr(anthropic_native_llm_call, "_logged_config"):
        logger.info(f"[Anthropic Native] Using {key_source}: {api_key[:20]}...")
        logger.info(f"[Anthropic Native] Using {url_source}: {base_url}")
        print(f"[Anthropic Native] Using {key_source}: {api_key[:20]}...")
        print(f"[Anthropic Native] Using {url_source}: {base_url}")
        anthropic_native_llm_call._logged_config = True

    anthropic_version = os.getenv("ANTHROPIC_VERSION", "2023-06-01")

    # Strip a namespace prefix such as "anthropic/" or "openai/" if one was passed.
    if "/" in model:
        model = model.split("/", 1)[1]

    # Normalize tool-call id prefixes to "toolu", then convert to Anthropic
    # format (the converter also injects cache_control into the payload).
    messages = _normalize_tool_call_ids(messages, "toolu")
    system_prompt, anthropic_messages = _to_anthropic_messages(messages)
    anthropic_messages = _drop_empty_interior_assistant_messages(anthropic_messages)

    payload: Dict[str, Any] = {
        "model": model,
        "messages": anthropic_messages,
        "max_tokens": kwargs.get("max_tokens", 8192),
    }
    if system_prompt is not None:
        payload["system"] = system_prompt
    if tools:
        payload["tools"] = _to_anthropic_tools(tools)
    if "temperature" in kwargs:
        payload["temperature"] = kwargs["temperature"]

    # Debug: report how many cache_control markers will reach the native endpoint.
    if logger.isEnabledFor(logging.DEBUG):
        cache_control_count = _count_cache_control_marks(system_prompt, anthropic_messages)
        if cache_control_count > 0:
            logger.debug(f"[Anthropic Native] 发现 {cache_control_count} 个 cache_control 标记,将被发送到原生端点")

    headers = {
        "x-api-key": api_key,
        "anthropic-version": anthropic_version,
        "content-type": "application/json",
    }
    # Allow callers to opt into beta features via the anthropic-beta header.
    if kwargs.get("anthropic_beta"):
        headers["anthropic-beta"] = kwargs["anthropic_beta"]

    result = await _post_with_retries(endpoint, payload, headers)

    # Parse the response and extract usage / cost figures.
    parsed = _parse_anthropic_response(result)
    usage = parsed["usage"]
    cost = calculate_cost(model, usage)
    return {
        "content": parsed["content"],
        "tool_calls": parsed["tool_calls"],
        "prompt_tokens": usage.input_tokens,
        "completion_tokens": usage.output_tokens,
        "reasoning_tokens": usage.reasoning_tokens,
        "cache_creation_tokens": usage.cache_creation_tokens,
        "cache_read_tokens": usage.cache_read_tokens,
        "finish_reason": parsed["finish_reason"],
        "cost": cost,
        "usage": usage,
    }
def create_claude_llm_call(model: str = "claude-3-5-sonnet-20241022"):
    """Build an async LLM-call function bound to a specific Claude model.

    Args:
        model: Model name such as "claude-3-5-sonnet-20241022". A namespace
            prefix (e.g. "anthropic/") is accepted and stripped downstream.

    Returns:
        An async callable with the (messages, model, tools, **kwargs)
        signature expected by AgentRunner.
    """

    async def llm_call(
        messages: List[Dict[str, Any]],
        model: str = model,
        tools: Optional[List[Dict]] = None,
        **kwargs,
    ) -> Dict[str, Any]:
        # Delegate directly to the native-API entry point; the bound model
        # is captured as the default so callers may still override it.
        return await anthropic_native_llm_call(messages, model, tools, **kwargs)

    return llm_call
|