""" Claude Code Provider (Anthropic Direct) 使用 Anthropic 官方 API 调用 Claude 模型 使用 Anthropic Messages API 格式(/v1/messages) 环境变量: - ANTHROPIC_BASE_URL: API 基础地址(如 https://api.anthropic.com) - ANTHROPIC_AUTH_TOKEN: API 密钥 注意: - 使用 Anthropic 原生 Messages API 格式 - 响应格式转换为框架统一的 OpenAI 兼容格式 """ import os import json import asyncio import logging import httpx from typing import List, Dict, Any, Optional from .usage import TokenUsage from .pricing import calculate_cost logger = logging.getLogger(__name__) # 可重试的异常类型 _RETRYABLE_EXCEPTIONS = ( httpx.RemoteProtocolError, httpx.ConnectError, httpx.ReadTimeout, httpx.WriteTimeout, httpx.ConnectTimeout, httpx.PoolTimeout, ConnectionError, ) # 模糊匹配规则:(关键词, 目标模型名),从精确到宽泛排序 # 精确匹配走 MODEL_EXACT,不命中则按顺序尝试关键词匹配 MODEL_EXACT = { "claude-sonnet-4-6": "claude-sonnet-4-6", "claude-sonnet-4.6": "claude-sonnet-4-6", "claude-sonnet-4-5-20250929": "claude-sonnet-4-5-20250929", "claude-sonnet-4-5": "claude-sonnet-4-5-20250929", "claude-sonnet-4.5": "claude-sonnet-4-5-20250929", "claude-opus-4-6": "claude-opus-4-6", "claude-opus-4-5-20251101": "claude-opus-4-5-20251101", "claude-opus-4-5": "claude-opus-4-5-20251101", "claude-opus-4-1-20250805": "claude-opus-4-1-20250805", "claude-opus-4-1": "claude-opus-4-1-20250805", "claude-haiku-4-5-20251001": "claude-haiku-4-5-20251001", "claude-haiku-4-5": "claude-haiku-4-5-20251001", } MODEL_FUZZY = [ # 版本+家族(精确) ("sonnet-4-6", "claude-sonnet-4-6"), ("sonnet-4.6", "claude-sonnet-4-6"), ("sonnet-4-5", "claude-sonnet-4-5-20250929"), ("sonnet-4.5", "claude-sonnet-4-5-20250929"), ("opus-4-6", "claude-opus-4-6"), ("opus-4.6", "claude-opus-4-6"), ("opus-4-5", "claude-opus-4-5-20251101"), ("opus-4.5", "claude-opus-4-5-20251101"), ("opus-4-1", "claude-opus-4-1-20250805"), ("opus-4.1", "claude-opus-4-1-20250805"), ("haiku-4-5", "claude-haiku-4-5-20251001"), ("haiku-4.5", "claude-haiku-4-5-20251001"), # 仅家族名 → 最新版本 ("sonnet", "claude-sonnet-4-6"), ("opus", "claude-opus-4-6"), ("haiku", "claude-haiku-4-5-20251001"), ] 
# Defaults and retry policy used by the functions below.
_DEFAULT_MAX_TOKENS = 16384
_DEFAULT_IMAGE_MEDIA_TYPE = "image/png"
_REQUEST_TIMEOUT_SECONDS = 300.0
_MAX_RETRIES = 5
# HTTP status codes treated as transient and therefore retried.
_RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504, 524, 529})


def _resolve_model(model: str) -> str:
    """Map an arbitrary model name to the name sent to the Anthropic API.

    Handles provider-prefixed names ("anthropic/xxx"), dotted versions
    ("4.5") and bare family names ("sonnet").

    Args:
        model: Model name as supplied by the caller.

    Returns:
        The mapped model name, or the (prefix-stripped) input unchanged when
        no rule matches, so that the API itself reports the error.
    """
    # 1. Strip a provider prefix (everything up to the first "/").
    if "/" in model:
        model = model.split("/", 1)[1]

    # 2. Exact match (case-sensitive).
    if model in MODEL_EXACT:
        return MODEL_EXACT[model]

    # 3. Fuzzy match (case-insensitive); first matching keyword wins, so the
    #    order of MODEL_FUZZY matters.
    model_lower = model.lower()
    for keyword, target in MODEL_FUZZY:
        if keyword in model_lower:
            logger.info("模型名模糊匹配: %s → %s", model, target)
            return target

    # 4. Fallback: pass the name through untouched.
    logger.warning("未能匹配模型名: %s, 原样传递", model)
    return model


def _normalize_tool_call_ids(messages: List[Dict[str, Any]], target_prefix: str) -> List[Dict[str, Any]]:
    """Rewrite tool-call IDs in the history to the target provider's format.

    When a conversation is continued on a different provider, the IDs stored
    in the history may not be accepted by the target API (e.g. ``toolu_xxx``
    sent to OpenAI, or ``call_xxx`` sent to Anthropic). IDs that already start
    with ``target_prefix + "_"`` are left alone.

    Args:
        messages: OpenAI-format message list. It is never mutated.
        target_prefix: ID prefix expected by the target provider, without the
            trailing underscore (e.g. ``"toolu"``).

    Returns:
        ``messages`` itself when nothing needs rewriting; otherwise a new list
        in which the affected messages and tool calls are shallow copies
        carrying the rewritten IDs.
    """
    native_prefix = target_prefix + "_"

    # Pass 1: split the assistant tool-call IDs into "already native" and
    # "needs rewriting" (kept in order of first appearance).
    taken = set()
    foreign: Dict[str, None] = {}
    for msg in messages:
        if msg.get("role") != "assistant" or not msg.get("tool_calls"):
            continue
        for tc in msg["tool_calls"]:
            old_id = tc.get("id", "")
            if not old_id:
                continue
            if old_id.startswith(native_prefix):
                taken.add(old_id)
            else:
                foreign.setdefault(old_id, None)

    if not foreign:
        return messages  # nothing to rewrite

    # Allocate sequential replacement IDs, skipping any value that is already
    # used by a native ID in the history so two tool calls never share an ID.
    id_map: Dict[str, str] = {}
    counter = 0
    for old_id in foreign:
        new_id = f"{native_prefix}{counter:06x}"
        counter += 1
        while new_id in taken:
            new_id = f"{native_prefix}{counter:06x}"
            counter += 1
        id_map[old_id] = new_id

    logger.info("重写 %d 个 tool_call_id (target_prefix=%s)", len(id_map), target_prefix)

    # Pass 2: rewrite, using shallow copies so the caller's data is untouched.
    result = []
    for msg in messages:
        if msg.get("role") == "assistant" and msg.get("tool_calls"):
            new_tcs = []
            for tc in msg["tool_calls"]:
                old_id = tc.get("id", "")
                if old_id in id_map:
                    new_tcs.append({**tc, "id": id_map[old_id]})
                else:
                    new_tcs.append(tc)
            result.append({**msg, "tool_calls": new_tcs})
        elif msg.get("role") == "tool" and msg.get("tool_call_id") in id_map:
            result.append({**msg, "tool_call_id": id_map[msg["tool_call_id"]]})
        else:
            result.append(msg)
    return result


def _image_url_block_to_anthropic(block: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one OpenAI ``image_url`` block into an Anthropic ``image`` block."""
    image_url_obj = block.get("image_url", {})
    # "image_url" may be either {"url": ...} or the URL itself.
    url = image_url_obj.get("url", "") if isinstance(image_url_obj, dict) else str(image_url_obj)

    if url.startswith("data:"):
        # Inline image: "data:<media type>;base64,<payload>".
        header, _, data = url.partition(",")
        # An omitted media type ("data:;base64,...") falls back to the default
        # instead of producing an empty media_type.
        media_type = header[len("data:"):].split(";", 1)[0] or _DEFAULT_IMAGE_MEDIA_TYPE
        source = {"type": "base64", "media_type": media_type, "data": data}
    else:
        source = {"type": "url", "url": url}

    image_block: Dict[str, Any] = {"type": "image", "source": source}
    # Keep a cache marker attached to the block rather than silently dropping it.
    if "cache_control" in block:
        image_block["cache_control"] = block["cache_control"]
    return image_block


def _convert_content_to_anthropic(content: Any) -> Any:
    """Convert OpenAI-format message content to Anthropic format.

    Only ``image_url`` blocks are transformed (into Anthropic ``image``
    blocks); every other block is passed through unchanged.

    Args:
        content: A string, a list of content blocks, or any other value.

    Returns:
        A new list when ``content`` is a list; otherwise ``content`` itself.
    """
    if not isinstance(content, list):
        return content

    return [
        _image_url_block_to_anthropic(block)
        if isinstance(block, dict) and block.get("type", "") == "image_url"
        else block
        for block in content
    ]


def _merge_system_prompt(existing: Any, addition: Any) -> Any:
    """Combine system prompts so that a later system message does not discard an earlier one."""
    if not existing:
        return addition
    if not addition:
        return existing
    if isinstance(existing, str) and isinstance(addition, str):
        return f"{existing}\n\n{addition}"

    # At least one side is a block list: express both as block lists.
    def as_blocks(prompt: Any) -> List[Any]:
        if isinstance(prompt, list):
            return list(prompt)
        return [{"type": "text", "text": str(prompt)}]

    return as_blocks(existing) + as_blocks(addition)


def _parse_tool_arguments(raw_args: Any) -> Dict[str, Any]:
    """Return tool-call arguments as a dict; anything unparsable or non-object becomes ``{}``."""
    if isinstance(raw_args, str):
        try:
            raw_args = json.loads(raw_args)
        except json.JSONDecodeError:
            return {}
    return raw_args if isinstance(raw_args, dict) else {}


def _tool_call_to_block(tool_call: Dict[str, Any]) -> Dict[str, Any]:
    """Convert one OpenAI tool call into an Anthropic ``tool_use`` block."""
    func = tool_call.get("function") or {}
    return {
        "type": "tool_use",
        "id": tool_call.get("id", ""),
        "name": func.get("name", ""),
        "input": _parse_tool_arguments(func.get("arguments", "{}")),
    }


def _is_tool_result_message(message: Dict[str, Any]) -> bool:
    """Return True when ``message`` is a user message whose first block is a ``tool_result``."""
    if message.get("role") != "user":
        return False
    blocks = message.get("content")
    if not isinstance(blocks, list) or not blocks:
        return False
    first = blocks[0]
    return isinstance(first, dict) and first.get("type") == "tool_result"


def _convert_messages_to_anthropic(messages: List[Dict[str, Any]]) -> tuple:
    """Convert OpenAI-format messages to Anthropic Messages API format.

    - ``system`` messages are lifted out into the top-level system prompt;
      several system messages are concatenated in order.
    - ``assistant`` tool calls become ``tool_use`` content blocks.
    - ``tool`` messages become ``tool_result`` blocks; consecutive results are
      merged into a single user message.
    - Messages with any other role are ignored.

    Args:
        messages: OpenAI-format message list (not mutated).

    Returns:
        ``(system_prompt, anthropic_messages)``. ``system_prompt`` is ``None``
        when there is no system message.
    """
    system_prompt = None
    anthropic_messages: List[Dict[str, Any]] = []

    for msg in messages:
        role = msg.get("role", "")
        content = msg.get("content", "")
        if content is None:
            # An explicit None is treated like a missing key.
            content = ""

        if role == "system":
            # Anthropic takes the system prompt as a top-level parameter.
            system_prompt = _merge_system_prompt(system_prompt, content)

        elif role == "user":
            anthropic_messages.append({"role": "user", "content": _convert_content_to_anthropic(content)})

        elif role == "assistant":
            tool_calls = msg.get("tool_calls")
            if not tool_calls:
                anthropic_messages.append({"role": "assistant", "content": content})
                continue

            content_blocks: List[Any] = []
            if content:
                # content may be a plain string or an already-expanded block
                # list (e.g. carrying cache_control). Handle both so that we
                # never emit {"type": "text", "text": [...]}.
                converted = _convert_content_to_anthropic(content)
                if isinstance(converted, list):
                    content_blocks.extend(converted)
                elif isinstance(converted, str) and converted.strip():
                    content_blocks.append({"type": "text", "text": converted})
            content_blocks.extend(_tool_call_to_block(tc) for tc in tool_calls)
            anthropic_messages.append({"role": "assistant", "content": content_blocks})

        elif role == "tool":
            # OpenAI tool result -> Anthropic tool_result. All results that
            # answer the same assistant turn go into one user message.
            tool_result_block = {
                "type": "tool_result",
                "tool_use_id": msg.get("tool_call_id", ""),
                "content": _convert_content_to_anthropic(content),
            }
            if anthropic_messages and _is_tool_result_message(anthropic_messages[-1]):
                anthropic_messages[-1]["content"].append(tool_result_block)
            else:
                anthropic_messages.append({"role": "user", "content": [tool_result_block]})

    return system_prompt, anthropic_messages


def _convert_tools_to_anthropic(tools: List[Dict]) -> List[Dict]:
    """Convert OpenAI tool definitions to Anthropic format.

    Entries whose ``type`` is not ``"function"`` are skipped. A missing, null
    or empty ``parameters`` schema is replaced by an empty object schema.

    Raises:
        KeyError: If a ``"function"``-typed tool has no ``"function"`` entry.
    """
    anthropic_tools = []
    for tool in tools:
        if tool.get("type") != "function":
            continue
        func = tool["function"]
        anthropic_tools.append({
            "name": func.get("name", ""),
            "description": func.get("description", ""),
            "input_schema": func.get("parameters") or {"type": "object", "properties": {}},
        })
    return anthropic_tools


def _parse_anthropic_response(result: Dict[str, Any]) -> Dict[str, Any]:
    """Convert an Anthropic Messages API response to the framework's unified format.

    Expected response shape::

        {
            "id": "msg_...",
            "type": "message",
            "role": "assistant",
            "content": [{"type": "text", "text": "..."}, {"type": "tool_use", ...}],
            "usage": {"input_tokens": ..., "output_tokens": ...},
            "stop_reason": "end_turn" | "tool_use" | "max_tokens"
        }

    Returns:
        A dict with ``content`` (text blocks joined by newlines),
        ``tool_calls`` (OpenAI-style list, or ``None`` when there are none),
        ``finish_reason`` and ``usage`` (a ``TokenUsage``).
    """
    text_parts = []
    tool_calls = []
    # "content" / "usage" may be absent or null; treat both as empty.
    for block in result.get("content") or []:
        block_type = block.get("type")
        if block_type == "text":
            text_parts.append(block.get("text", ""))
        elif block_type == "tool_use":
            # Re-encode as an OpenAI-style tool call (arguments as a JSON string).
            tool_calls.append({
                "id": block.get("id", ""),
                "type": "function",
                "function": {
                    "name": block.get("name", ""),
                    "arguments": json.dumps(block.get("input", {}), ensure_ascii=False),
                },
            })

    # Map stop_reason to an OpenAI-style finish_reason; unknown values pass through.
    stop_reason = result.get("stop_reason") or "end_turn"
    finish_reason_map = {
        "end_turn": "stop",
        "tool_use": "tool_calls",
        "max_tokens": "length",
        "stop_sequence": "stop",
    }
    finish_reason = finish_reason_map.get(stop_reason, stop_reason)

    # Usage in Anthropic's native field names; null counters count as 0.
    raw_usage = result.get("usage") or {}
    usage = TokenUsage(
        input_tokens=raw_usage.get("input_tokens") or 0,
        output_tokens=raw_usage.get("output_tokens") or 0,
        cache_creation_tokens=raw_usage.get("cache_creation_input_tokens") or 0,
        cache_read_tokens=raw_usage.get("cache_read_input_tokens") or 0,
    )

    return {
        "content": "\n".join(text_parts),
        "tool_calls": tool_calls if tool_calls else None,
        "finish_reason": finish_reason,
        "usage": usage,
    }


async def _post_with_retry(endpoint: str, payload: Dict[str, Any], headers: Dict[str, str]) -> Dict[str, Any]:
    """POST ``payload`` as JSON and return the decoded JSON response, retrying transient failures.

    Retries up to ``_MAX_RETRIES`` attempts in total on the status codes in
    ``_RETRYABLE_STATUS_CODES`` and on ``_RETRYABLE_EXCEPTIONS``, sleeping
    2, 4, 8, ... seconds between attempts. Any other error, or a retryable
    one on the final attempt, is logged and re-raised.
    """
    for attempt in range(_MAX_RETRIES):
        can_retry = attempt < _MAX_RETRIES - 1
        wait = 2 ** attempt * 2

        # A fresh client per attempt; it is closed before the backoff sleep.
        async with httpx.AsyncClient(timeout=_REQUEST_TIMEOUT_SECONDS) as client:
            try:
                response = await client.post(endpoint, json=payload, headers=headers)
                response.raise_for_status()
                return response.json()
            except httpx.HTTPStatusError as e:
                error_body = e.response.text
                status = e.response.status_code
                if not (can_retry and status in _RETRYABLE_STATUS_CODES):
                    logger.error("[Claude Code] Error %d: %s", status, error_body)
                    # Also echoed to stdout, as before.
                    print(f"[Claude Code] API Error {status}: {error_body[:500]}")
                    raise
                logger.warning(
                    "[Claude Code] HTTP %d (attempt %d/%d), retrying in %ds: %s",
                    status, attempt + 1, _MAX_RETRIES, wait, error_body[:200],
                )
            except _RETRYABLE_EXCEPTIONS as e:
                if not can_retry:
                    logger.error("[Claude Code] Request failed after %d attempts: %s", _MAX_RETRIES, e)
                    raise
                logger.warning(
                    "[Claude Code] %s (attempt %d/%d), retrying in %ds",
                    type(e).__name__, attempt + 1, _MAX_RETRIES, wait,
                )
            except Exception as e:
                logger.error("[Claude Code] Request failed: %s", e)
                raise

        await asyncio.sleep(wait)

    # Every path through the final attempt returns or raises; this is a guard only.
    raise RuntimeError("[Claude Code] retry loop ended without a response")


async def claude_code_llm_call(
    messages: List[Dict[str, Any]],
    model: str = "claude-sonnet-4.5",
    tools: Optional[List[Dict]] = None,
    **kwargs
) -> Dict[str, Any]:
    """Call a Claude model through the Anthropic Messages API.

    Args:
        messages: OpenAI-format message list.
        model: Model name (e.g. "claude-sonnet-4.5"); resolved through
            ``_resolve_model`` before being sent.
        tools: OpenAI-format tool definitions.
        **kwargs: Optional ``max_tokens`` (default 16384) and ``temperature``.
            A value of ``None`` is treated as "not supplied". Other keys are
            ignored.

    Returns:
        A dict with ``content``, ``tool_calls``, ``prompt_tokens``,
        ``completion_tokens``, ``reasoning_tokens``, ``cache_creation_tokens``,
        ``cache_read_tokens``, ``finish_reason``, ``cost`` and ``usage``.

    Raises:
        ValueError: If ANTHROPIC_BASE_URL or ANTHROPIC_AUTH_TOKEN is not set.
        httpx.HTTPStatusError: On a non-retryable HTTP error, or a retryable
            one that persists through the last attempt.
        Exception: Any other request error is logged and re-raised.
    """
    base_url = os.getenv("ANTHROPIC_BASE_URL")
    api_key = os.getenv("ANTHROPIC_AUTH_TOKEN")

    if not base_url:
        raise ValueError("ANTHROPIC_BASE_URL environment variable not set")
    if not api_key:
        raise ValueError("ANTHROPIC_AUTH_TOKEN environment variable not set")

    endpoint = f"{base_url.rstrip('/')}/v1/messages"

    api_model = _resolve_model(model)

    # When continuing a conversation from another provider, rewrite
    # incompatible tool-call IDs before converting the message format.
    messages = _normalize_tool_call_ids(messages, "toolu")
    system_prompt, anthropic_messages = _convert_messages_to_anthropic(messages)

    payload: Dict[str, Any] = {
        "model": api_model,
        "messages": anthropic_messages,
        "max_tokens": kwargs.get("max_tokens") or _DEFAULT_MAX_TOKENS,
    }
    if system_prompt:
        payload["system"] = system_prompt

    anthropic_tools = _convert_tools_to_anthropic(tools) if tools else []
    if anthropic_tools:
        payload["tools"] = anthropic_tools

    if kwargs.get("temperature") is not None:
        payload["temperature"] = kwargs["temperature"]

    headers = {
        "x-api-key": api_key,
        "content-type": "application/json",
        "anthropic-version": "2023-06-01",
        "user-agent": "claude-code/1.0.0",
    }

    result = await _post_with_retry(endpoint, payload, headers)

    # Convert the Anthropic response to the unified format.
    parsed = _parse_anthropic_response(result)
    usage = parsed["usage"]

    # Cost is computed from the caller-supplied model name, not the resolved one.
    cost = calculate_cost(model, usage)

    return {
        "content": parsed["content"],
        "tool_calls": parsed["tool_calls"],
        "prompt_tokens": usage.input_tokens,
        "completion_tokens": usage.output_tokens,
        "reasoning_tokens": usage.reasoning_tokens,
        "cache_creation_tokens": usage.cache_creation_tokens,
        "cache_read_tokens": usage.cache_read_tokens,
        "finish_reason": parsed["finish_reason"],
        "cost": cost,
        "usage": usage,
    }


def create_claude_code_llm_call(
    model: str = "claude-sonnet-4.5"
):
    """Create an async LLM call function bound to a default model.

    Args:
        model: Default model name (e.g. "claude-sonnet-4.5"). The returned
            function still accepts a per-call ``model`` override.

    Returns:
        An async function with the same signature as ``claude_code_llm_call``.
    """
    async def llm_call(
        messages: List[Dict[str, Any]],
        model: str = model,
        tools: Optional[List[Dict]] = None,
        **kwargs
    ) -> Dict[str, Any]:
        return await claude_code_llm_call(messages, model, tools, **kwargs)

    return llm_call