| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489 |
- """
- Claude Code Provider (Anthropic Direct)
- 使用 Anthropic 官方 API 调用 Claude 模型
- 使用 Anthropic Messages API 格式(/v1/messages)
- 环境变量:
- - ANTHROPIC_BASE_URL: API 基础地址(如 https://api.anthropic.com)
- - ANTHROPIC_AUTH_TOKEN: API 密钥
- 注意:
- - 使用 Anthropic 原生 Messages API 格式
- - 响应格式转换为框架统一的 OpenAI 兼容格式
- """
- import os
- import json
- import asyncio
- import logging
- import httpx
- from typing import List, Dict, Any, Optional
- from .usage import TokenUsage
- from .pricing import calculate_cost
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Exception types treated as transient: the request loop in this module
# catches exactly this tuple and retries instead of failing immediately.
_RETRYABLE_EXCEPTIONS = (
    httpx.RemoteProtocolError,
    httpx.ConnectError,
    httpx.ReadTimeout,
    httpx.WriteTimeout,
    httpx.ConnectTimeout,
    httpx.PoolTimeout,
    ConnectionError,
)
# Model-name resolution tables.
# MODEL_EXACT is consulted first with an exact (case-sensitive) key lookup.
# Keys are accepted spellings; values are the model names sent to the API.
MODEL_EXACT = {
    "claude-sonnet-4-6": "claude-sonnet-4-6",
    "claude-sonnet-4.6": "claude-sonnet-4-6",
    "claude-sonnet-4-5-20250929": "claude-sonnet-4-5-20250929",
    "claude-sonnet-4-5": "claude-sonnet-4-5-20250929",
    "claude-sonnet-4.5": "claude-sonnet-4-5-20250929",
    "claude-opus-4-6": "claude-opus-4-6",
    "claude-opus-4-5-20251101": "claude-opus-4-5-20251101",
    "claude-opus-4-5": "claude-opus-4-5-20251101",
    "claude-opus-4-1-20250805": "claude-opus-4-1-20250805",
    "claude-opus-4-1": "claude-opus-4-1-20250805",
    "claude-haiku-4-5-20251001": "claude-haiku-4-5-20251001",
    "claude-haiku-4-5": "claude-haiku-4-5-20251001",
}

# Fuzzy-match rules: (keyword, target model name).
# Tried in order when the exact lookup misses; the first keyword contained in
# the lower-cased model name wins, so entries are ordered from most specific
# to most general. Do not reorder without checking the match results.
MODEL_FUZZY = [
    # Family + version (specific)
    ("sonnet-4-6", "claude-sonnet-4-6"),
    ("sonnet-4.6", "claude-sonnet-4-6"),
    ("sonnet-4-5", "claude-sonnet-4-5-20250929"),
    ("sonnet-4.5", "claude-sonnet-4-5-20250929"),
    ("opus-4-6", "claude-opus-4-6"),
    ("opus-4.6", "claude-opus-4-6"),
    ("opus-4-5", "claude-opus-4-5-20251101"),
    ("opus-4.5", "claude-opus-4-5-20251101"),
    ("opus-4-1", "claude-opus-4-1-20250805"),
    ("opus-4.1", "claude-opus-4-1-20250805"),
    ("haiku-4-5", "claude-haiku-4-5-20251001"),
    ("haiku-4.5", "claude-haiku-4-5-20251001"),
    # Family name only -> the newest version listed in this table
    ("sonnet", "claude-sonnet-4-6"),
    ("opus", "claude-opus-4-6"),
    ("haiku", "claude-haiku-4-5-20251001"),
]
def _resolve_model(model: str) -> str:
    """Map a loosely written model name onto the name sent to the API.

    Accepted spellings include a provider prefix (``anthropic/xxx``), dotted
    versions (``4.5``) and bare family names (``sonnet``).

    Resolution order:
      1. Drop everything up to and including the first ``/``.
      2. Exact, case-sensitive lookup in ``MODEL_EXACT``.
      3. First ``MODEL_FUZZY`` keyword contained in the lower-cased name.
      4. Otherwise return the (prefix-stripped) name unchanged so that the
         API itself reports the unknown model.
    """
    # Step 1: strip the provider prefix, if any.
    _, separator, remainder = model.partition("/")
    bare_name = remainder if separator else model

    # Step 2: exact table.
    try:
        return MODEL_EXACT[bare_name]
    except KeyError:
        pass

    # Step 3: ordered keyword scan (case-insensitive on the model name).
    lowered = bare_name.lower()
    fuzzy_target = next(
        (target for keyword, target in MODEL_FUZZY if keyword in lowered),
        None,
    )
    if fuzzy_target is not None:
        logger.info("模型名模糊匹配: %s → %s", bare_name, fuzzy_target)
        return fuzzy_target

    # Step 4: give up and pass the name through.
    logger.warning("未能匹配模型名: %s, 原样传递", bare_name)
    return bare_name
def _normalize_tool_call_ids(messages: List[Dict[str, Any]], target_prefix: str) -> List[Dict[str, Any]]:
    """
    Rewrite tool-call IDs in a message history into the target provider's format.

    When a conversation is resumed on a different provider, IDs recorded in the
    history may not be accepted by the new API (e.g. ``toolu_xxx`` sent to
    OpenAI, or ``call_xxx`` sent to Anthropic). IDs that already start with
    ``target_prefix + "_"`` are kept; every other ID found on an assistant
    ``tool_calls`` entry is replaced by ``f"{target_prefix}_{n:06x}"``, and
    ``tool`` messages referring to it are updated to match.

    Generated IDs never reuse an ID that is already present in target format,
    so a rewritten call cannot end up sharing an ID with an untouched one.

    Args:
        messages: OpenAI-style message dicts.
        target_prefix: ID prefix of the target provider, without the trailing
            underscore (e.g. ``"toolu"``).

    Returns:
        ``messages`` itself when nothing needs rewriting; otherwise a new list
        in which only the affected messages are shallow-copied. The input is
        never mutated.
    """
    native_marker = target_prefix + "_"

    # Pass 1: classify every assistant tool-call ID as "already in target
    # format" (must not be reused) or "foreign" (needs a replacement).
    taken_ids = set()
    id_map: Dict[str, str] = {}
    for msg in messages:
        if msg.get("role") != "assistant" or not msg.get("tool_calls"):
            continue
        for tc in msg["tool_calls"]:
            old_id = tc.get("id", "")
            if not old_id:
                continue
            if old_id.startswith(native_marker):
                taken_ids.add(old_id)
            else:
                # Placeholder keeps first-seen order; filled in below.
                id_map.setdefault(old_id, "")

    if not id_map:
        return messages  # nothing to rewrite

    # Assign sequential replacement IDs, skipping any value that is already
    # used by a target-format ID elsewhere in the history.
    counter = 0
    for old_id in list(id_map):
        while True:
            candidate = f"{target_prefix}_{counter:06x}"
            counter += 1
            if candidate not in taken_ids:
                break
        id_map[old_id] = candidate

    logger.info("重写 %d 个 tool_call_id (target_prefix=%s)", len(id_map), target_prefix)

    # Pass 2: apply the mapping on shallow copies so the caller's data is untouched.
    result = []
    for msg in messages:
        if msg.get("role") == "assistant" and msg.get("tool_calls"):
            new_tcs = [
                {**tc, "id": id_map[tc.get("id", "")]} if tc.get("id", "") in id_map else tc
                for tc in msg["tool_calls"]
            ]
            result.append({**msg, "tool_calls": new_tcs})
        elif msg.get("role") == "tool" and msg.get("tool_call_id") in id_map:
            result.append({**msg, "tool_call_id": id_map[msg["tool_call_id"]]})
        else:
            result.append(msg)
    return result
def _convert_content_to_anthropic(content: Any) -> Any:
    """
    Convert OpenAI-style message content into Anthropic content.

    Non-list content (typically a string) is returned unchanged. In a list,
    every ``image_url`` block becomes an Anthropic ``image`` block:

    * ``data:<media_type>;base64,<data>`` URLs become a ``base64`` source.
      A missing or empty media type falls back to ``image/png``.
    * any other URL becomes a ``url`` source.

    All other blocks (and non-dict items) are passed through untouched.

    Args:
        content: A string, a list of content blocks, or any other value.

    Returns:
        The input itself when it is not a list, otherwise a new list.
    """
    if not isinstance(content, list):
        return content

    converted = []
    for block in content:
        if not isinstance(block, dict) or block.get("type", "") != "image_url":
            converted.append(block)
            continue

        # ``image_url`` is normally {"url": ...} but a bare string is tolerated.
        image_ref = block.get("image_url")
        raw_url = image_ref.get("url") if isinstance(image_ref, dict) else image_ref
        # A missing/None URL becomes "" instead of crashing or turning into "None".
        url = str(raw_url) if raw_url else ""

        if url.startswith("data:"):
            # Inline image: data:<media_type>[;param...];base64,<data>
            header, _, data = url.partition(",")
            media_type = header[len("data:"):].split(";", 1)[0].strip() or "image/png"
            source = {
                "type": "base64",
                "media_type": media_type,
                "data": data,
            }
        else:
            source = {
                "type": "url",
                "url": url,
            }
        converted.append({"type": "image", "source": source})
    return converted
def _system_prompt_as_blocks(prompt: Any) -> List[Any]:
    """Return a system prompt as a list of content blocks (strings become one text block)."""
    if isinstance(prompt, list):
        return list(prompt)
    return [{"type": "text", "text": str(prompt)}]


def _merge_system_prompts(existing: Any, addition: Any) -> Any:
    """Combine two system prompts without dropping either one."""
    if not existing:
        # First (or only) system message: keep it exactly as given.
        return addition
    if not addition:
        return existing
    if isinstance(existing, str) and isinstance(addition, str):
        return f"{existing}\n\n{addition}"
    # At least one side is already a block list (e.g. carries cache_control).
    return _system_prompt_as_blocks(existing) + _system_prompt_as_blocks(addition)


def _tool_call_to_tool_use(tool_call: Dict[str, Any]) -> Dict[str, Any]:
    """Turn one OpenAI ``tool_calls`` entry into an Anthropic ``tool_use`` block."""
    func = tool_call.get("function") or {}
    raw_args = func.get("arguments", "{}")
    try:
        args = json.loads(raw_args) if isinstance(raw_args, str) else raw_args
    except json.JSONDecodeError:
        args = None
    if not isinstance(args, dict):
        # ``input`` has to be a JSON object; undecodable or non-object
        # arguments (null, list, scalar) are replaced by an empty object.
        logger.warning(
            "[Claude Code] tool call %s has non-object arguments, sending empty input",
            tool_call.get("id", ""),
        )
        args = {}
    return {
        "type": "tool_use",
        "id": tool_call.get("id", ""),
        "name": func.get("name", ""),
        "input": args,
    }


def _convert_messages_to_anthropic(messages: List[Dict[str, Any]]) -> tuple:
    """
    Convert OpenAI-style messages into Anthropic Messages API form.

    * ``system`` messages are lifted out of the list; several system messages
      are merged in order instead of the last one replacing the others.
    * ``user`` content goes through ``_convert_content_to_anthropic``.
    * ``assistant`` messages with ``tool_calls`` become a block list: optional
      text/content blocks followed by one ``tool_use`` block per call.
      Assistant messages without tool calls keep their content as is.
    * ``tool`` messages become ``tool_result`` blocks inside a ``user``
      message; consecutive tool results are merged into the same message.
    * Messages with any other role are ignored.

    Args:
        messages: OpenAI-style message dicts.

    Returns:
        ``(system_prompt, anthropic_messages)``; ``system_prompt`` is ``None``
        when the input has no system message.
    """
    system_prompt = None
    anthropic_messages = []

    for msg in messages:
        role = msg.get("role", "")
        content = msg.get("content", "")

        if role == "system":
            # The system prompt is a top-level request field, not a message.
            system_prompt = _merge_system_prompts(system_prompt, content)

        elif role == "user":
            anthropic_messages.append(
                {"role": "user", "content": _convert_content_to_anthropic(content)}
            )

        elif role == "assistant":
            assistant_msg = {"role": "assistant"}
            tool_calls = msg.get("tool_calls")
            if tool_calls:
                content_blocks = []
                if content:
                    # Content may already be a block list (e.g. with
                    # cache_control) or a plain string; never wrap a list
                    # inside a text block.
                    converted = _convert_content_to_anthropic(content)
                    if isinstance(converted, list):
                        content_blocks.extend(converted)
                    elif isinstance(converted, str) and converted.strip():
                        content_blocks.append({"type": "text", "text": converted})
                content_blocks.extend(_tool_call_to_tool_use(tc) for tc in tool_calls)
                assistant_msg["content"] = content_blocks
            else:
                assistant_msg["content"] = content
            anthropic_messages.append(assistant_msg)

        elif role == "tool":
            tool_result_block = {
                "type": "tool_result",
                "tool_use_id": msg.get("tool_call_id", ""),
                # A tool that returned nothing is sent as an empty string, not null.
                "content": _convert_content_to_anthropic("" if content is None else content),
            }
            previous = anthropic_messages[-1] if anthropic_messages else None
            # All results answering one assistant turn must share one user message.
            if (previous is not None
                    and previous.get("role") == "user"
                    and isinstance(previous.get("content"), list)
                    and previous["content"]
                    and previous["content"][0].get("type") == "tool_result"):
                previous["content"].append(tool_result_block)
            else:
                anthropic_messages.append({
                    "role": "user",
                    "content": [tool_result_block],
                })

    return system_prompt, anthropic_messages
def _convert_tools_to_anthropic(tools: List[Dict]) -> List[Dict]:
    """Convert OpenAI tool definitions into Anthropic tool definitions.

    Only entries with ``type == "function"`` are converted; everything else is
    skipped. A missing, ``None`` or empty ``parameters`` value is replaced by
    an empty object schema, and a missing or ``None`` description by ``""``,
    so every emitted tool carries a usable ``input_schema``.

    Args:
        tools: OpenAI-style tool definitions.

    Returns:
        A list of ``{"name", "description", "input_schema"}`` dicts.

    Raises:
        KeyError: if a ``function``-type entry has no ``function`` key.
    """
    anthropic_tools = []
    for tool in tools:
        if tool.get("type") != "function":
            continue
        func = tool["function"]
        anthropic_tools.append({
            "name": func.get("name", ""),
            "description": func.get("description") or "",
            # ``or`` also covers an explicit null / empty schema.
            "input_schema": func.get("parameters") or {"type": "object", "properties": {}},
        })
    return anthropic_tools
def _parse_anthropic_response(result: Dict[str, Any]) -> Dict[str, Any]:
    """
    Convert an Anthropic Messages API response into the framework's format.

    Expected response shape::

        {
            "id": "msg_...",
            "type": "message",
            "role": "assistant",
            "content": [{"type": "text", "text": "..."}, {"type": "tool_use", ...}],
            "usage": {"input_tokens": ..., "output_tokens": ...},
            "stop_reason": "end_turn" | "tool_use" | "max_tokens"
        }

    Text blocks are joined with newlines, ``tool_use`` blocks become
    OpenAI-style ``tool_calls`` (arguments serialised as a JSON string), and
    other block types are ignored. Missing or null ``content``, ``usage`` and
    individual token counts are treated as empty / zero.

    Args:
        result: Decoded JSON body of the API response.

    Returns:
        Dict with ``content`` (str), ``tool_calls`` (list or ``None``),
        ``finish_reason`` and ``usage`` (a ``TokenUsage``).
    """
    text_parts = []
    tool_calls = []
    # ``or []`` guards against an explicit null as well as a missing key.
    for block in result.get("content") or []:
        block_type = block.get("type")
        if block_type == "text":
            text_parts.append(block.get("text") or "")
        elif block_type == "tool_use":
            # OpenAI carries arguments as a JSON string, not an object.
            tool_calls.append({
                "id": block.get("id", ""),
                "type": "function",
                "function": {
                    "name": block.get("name", ""),
                    "arguments": json.dumps(block.get("input", {}), ensure_ascii=False),
                },
            })
    content = "\n".join(text_parts)

    # Map Anthropic stop reasons onto OpenAI finish reasons; unknown values
    # are passed through unchanged.
    stop_reason = result.get("stop_reason", "end_turn")
    finish_reason_map = {
        "end_turn": "stop",
        "tool_use": "tool_calls",
        "max_tokens": "length",
        "stop_sequence": "stop",
    }
    finish_reason = finish_reason_map.get(stop_reason, stop_reason)

    # Token counts may be absent or explicitly null (notably the cache
    # fields); ``or 0`` normalises both cases to zero.
    raw_usage = result.get("usage") or {}
    usage = TokenUsage(
        input_tokens=raw_usage.get("input_tokens") or 0,
        output_tokens=raw_usage.get("output_tokens") or 0,
        cache_creation_tokens=raw_usage.get("cache_creation_input_tokens") or 0,
        cache_read_tokens=raw_usage.get("cache_read_input_tokens") or 0,
    )

    return {
        "content": content,
        "tool_calls": tool_calls if tool_calls else None,
        "finish_reason": finish_reason,
        "usage": usage,
    }
async def claude_code_llm_call(
    messages: List[Dict[str, Any]],
    model: str = "claude-sonnet-4.5",
    tools: Optional[List[Dict]] = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Send one chat request to the Anthropic Messages API and return a unified result.

    The OpenAI-style input is converted to Anthropic form, POSTed to
    ``{ANTHROPIC_BASE_URL}/v1/messages`` and the response is converted back.
    The request is attempted up to 5 times: HTTP statuses 429, 500, 502, 503,
    504, 524 and 529 and the exceptions in ``_RETRYABLE_EXCEPTIONS`` are
    retried after ``2 ** attempt * 2`` seconds (2, 4, 8, 16).

    Args:
        messages: OpenAI-style message list.
        model: Model name in any spelling accepted by ``_resolve_model``
            (e.g. "claude-sonnet-4.5").
        tools: Optional OpenAI-style tool definitions.
        **kwargs: Only ``max_tokens`` (default 16384) and ``temperature`` are
            read; other keys are ignored.

    Returns:
        Dict with keys ``content``, ``tool_calls``, ``prompt_tokens``,
        ``completion_tokens``, ``reasoning_tokens``, ``cache_creation_tokens``,
        ``cache_read_tokens``, ``finish_reason``, ``cost`` and ``usage``.

    Raises:
        ValueError: if ANTHROPIC_BASE_URL or ANTHROPIC_AUTH_TOKEN is unset or empty.
        httpx.HTTPStatusError: on a non-retryable status, or a retryable one
            on the final attempt.
        Exception: any other error from the request is logged and re-raised.
    """
    # Earlier configuration source, kept for reference only (not read):
    # base_url = os.getenv("YESCODE_BASE_URL")
    # api_key = os.getenv("YESCODE_API_KEY")
    base_url = os.getenv("ANTHROPIC_BASE_URL")
    api_key = os.getenv("ANTHROPIC_AUTH_TOKEN")
    if not base_url:
        raise ValueError("ANTHROPIC_BASE_URL environment variable not set")
    if not api_key:
        raise ValueError("ANTHROPIC_AUTH_TOKEN environment variable not set")
    # Avoid a double slash when the configured base URL ends with "/".
    base_url = base_url.rstrip("/")
    endpoint = f"{base_url}/v1/messages"

    # Resolve the caller's model spelling to the name sent to the API.
    api_model = _resolve_model(model)

    # When resuming a conversation started on another provider, rewrite
    # tool-call IDs that are not in "toolu_" format.
    messages = _normalize_tool_call_ids(messages, "toolu")

    # Convert the message list (system prompt is returned separately).
    system_prompt, anthropic_messages = _convert_messages_to_anthropic(messages)

    # Build the request body.
    payload = {
        "model": api_model,
        "messages": anthropic_messages,
        "max_tokens": kwargs.get("max_tokens", 16384),
    }
    if system_prompt:
        payload["system"] = system_prompt
    if tools:
        payload["tools"] = _convert_tools_to_anthropic(tools)
    if "temperature" in kwargs:
        payload["temperature"] = kwargs["temperature"]

    headers = {
        "x-api-key": api_key,
        "content-type": "application/json",
        "anthropic-version": "2023-06-01",
        "user-agent": "claude-code/1.0.0",
    }

    # Call the API with retries. A new client is opened for every attempt.
    max_retries = 5
    last_exception = None
    for attempt in range(max_retries):
        async with httpx.AsyncClient(timeout=300.0) as client:
            try:
                response = await client.post(endpoint, json=payload, headers=headers)
                response.raise_for_status()
                result = response.json()
                break  # success: leave the retry loop
            except httpx.HTTPStatusError as e:
                error_body = e.response.text
                status = e.response.status_code
                # Retry only the listed statuses, and never after the last attempt.
                if status in (429, 500, 502, 503, 504, 524, 529) and attempt < max_retries - 1:
                    wait = 2 ** attempt * 2
                    logger.warning(
                        "[Claude Code] HTTP %d (attempt %d/%d), retrying in %ds: %s",
                        status, attempt + 1, max_retries, wait, error_body[:200],
                    )
                    await asyncio.sleep(wait)
                    last_exception = e
                    continue
                logger.error("[Claude Code] Error %d: %s", status, error_body)
                # NOTE(review): this print duplicates the logger.error above on stdout.
                print(f"[Claude Code] API Error {status}: {error_body[:500]}")
                raise
            except _RETRYABLE_EXCEPTIONS as e:
                last_exception = e
                if attempt < max_retries - 1:
                    wait = 2 ** attempt * 2
                    logger.warning(
                        "[Claude Code] %s (attempt %d/%d), retrying in %ds",
                        type(e).__name__, attempt + 1, max_retries, wait,
                    )
                    await asyncio.sleep(wait)
                    continue
                logger.error("[Claude Code] Request failed after %d attempts: %s", max_retries, e)
                raise
            except Exception as e:
                # Anything else (including a response body that is not JSON)
                # is not retried.
                logger.error("[Claude Code] Request failed: %s", e)
                raise
    else:
        # for/else: runs only if the loop finishes without ``break``. Every
        # final attempt above either breaks or raises, so this is a safety net.
        raise last_exception  # type: ignore[misc]

    # Convert the Anthropic response into the unified format.
    parsed = _parse_anthropic_response(result)
    usage = parsed["usage"]

    # Compute the cost.
    # NOTE(review): this passes the caller's ``model`` string, not the
    # resolved ``api_model`` — confirm calculate_cost accepts aliases/prefixes.
    cost = calculate_cost(model, usage)

    return {
        "content": parsed["content"],
        "tool_calls": parsed["tool_calls"],
        "prompt_tokens": usage.input_tokens,
        "completion_tokens": usage.output_tokens,
        # reasoning_tokens is never set in this module; presumably a
        # TokenUsage default — TODO confirm.
        "reasoning_tokens": usage.reasoning_tokens,
        "cache_creation_tokens": usage.cache_creation_tokens,
        "cache_read_tokens": usage.cache_read_tokens,
        "finish_reason": parsed["finish_reason"],
        "cost": cost,
        "usage": usage,
    }
def create_claude_code_llm_call(
    model: str = "claude-sonnet-4.5"
):
    """
    Build an async LLM-call function bound to a default model.

    Args:
        model: Model name used when the returned function is called without
            an explicit ``model`` (e.g. "claude-sonnet-4.5").

    Returns:
        An async function ``llm_call(messages, model=<default>, tools=None,
        **kwargs)`` that forwards its arguments to ``claude_code_llm_call``.
    """
    bound_default = model

    async def llm_call(
        messages: List[Dict[str, Any]],
        model: str = bound_default,
        tools: Optional[List[Dict]] = None,
        **kwargs
    ) -> Dict[str, Any]:
        # Thin pass-through; the default model is captured at factory time.
        response = await claude_code_llm_call(
            messages=messages,
            model=model,
            tools=tools,
            **kwargs
        )
        return response

    return llm_call
|