""" OpenRouter Provider 使用 OpenRouter API 调用各种模型(包括 Claude Sonnet 4.5) 路由策略: - Claude 模型:走 OpenRouter 的 Anthropic 原生端点(/api/v1/messages), 使用自包含的格式转换逻辑,确保多模态工具结果(截图等)正确传递。 - 其他模型:走 OpenAI 兼容端点(/api/v1/chat/completions)。 OpenRouter 转发多种模型,需要根据实际模型处理不同的 usage 格式: - OpenAI 模型: prompt_tokens, completion_tokens, completion_tokens_details.reasoning_tokens - Claude 模型: input_tokens, output_tokens, cache_creation_input_tokens, cache_read_input_tokens - DeepSeek 模型: prompt_tokens, completion_tokens, reasoning_tokens """ import os import json import asyncio import logging import httpx from pathlib import Path from typing import List, Dict, Any, Optional from .usage import TokenUsage, create_usage_from_response from .pricing import calculate_cost logger = logging.getLogger(__name__) # 可重试的异常类型 _RETRYABLE_EXCEPTIONS = ( httpx.RemoteProtocolError, # Server disconnected without sending a response httpx.ConnectError, httpx.ReadTimeout, httpx.WriteTimeout, httpx.ConnectTimeout, httpx.PoolTimeout, ConnectionError, ) # ── OpenRouter Anthropic endpoint: model name mapping ────────────────────── # Local copy of yescode's model tables so this module is self-contained. _OR_MODEL_EXACT = { "claude-sonnet-4-6": "claude-sonnet-4-6", "claude-sonnet-4.6": "claude-sonnet-4-6", "claude-sonnet-4-5-20250929": "claude-sonnet-4-5-20250929", "claude-sonnet-4-5": "claude-sonnet-4-5-20250929", "claude-sonnet-4.5": "claude-sonnet-4-5-20250929", "claude-opus-4-6": "claude-opus-4-6", "claude-opus-4-5-20251101": "claude-opus-4-5-20251101", "claude-opus-4-5": "claude-opus-4-5-20251101", "claude-opus-4-1-20250805": "claude-opus-4-1-20250805", "claude-opus-4-1": "claude-opus-4-1-20250805", "claude-haiku-4-5-20251001": "claude-haiku-4-5-20251001", "claude-haiku-4-5": "claude-haiku-4-5-20251001", } _OR_MODEL_FUZZY = [ ("sonnet-4-6", "claude-sonnet-4-6"), ("sonnet-4.6", "claude-sonnet-4-6"), ("sonnet-4-5", "claude-sonnet-4-5-20250929"), ("sonnet-4.5", "claude-sonnet-4-5-20250929"), ("opus-4-6", "claude-opus-4-6"), ("opus-4.6", "claude-opus-4-6"), ("opus-4-5", "claude-opus-4-5-20251101"), ("opus-4.5", "claude-opus-4-5-20251101"), ("opus-4-1", "claude-opus-4-1-20250805"), ("opus-4.1", "claude-opus-4-1-20250805"), ("haiku-4-5", "claude-haiku-4-5-20251001"), ("haiku-4.5", "claude-haiku-4-5-20251001"), ("sonnet", "claude-sonnet-4-6"), ("opus", "claude-opus-4-6"), ("haiku", "claude-haiku-4-5-20251001"), ] def _resolve_openrouter_model(model: str) -> str: """Normalize a model name for OpenRouter's Anthropic endpoint. Strips ``anthropic/`` prefix, resolves aliases / dot-notation, and re-prepends ``anthropic/`` for OpenRouter routing. """ # 1. Strip provider prefix bare = model.split("/", 1)[1] if "/" in model else model # 2. Exact match if bare in _OR_MODEL_EXACT: return f"anthropic/{_OR_MODEL_EXACT[bare]}" # 3. Fuzzy keyword match (case-insensitive) bare_lower = bare.lower() for keyword, target in _OR_MODEL_FUZZY: if keyword in bare_lower: logger.info("[OpenRouter] Model fuzzy match: %s → anthropic/%s", model, target) return f"anthropic/{target}" # 4. Fallback – return as-is (let API report the error) logger.warning("[OpenRouter] Could not resolve model name: %s, passing as-is", model) return model # ── OpenRouter Anthropic endpoint: format conversion helpers ─────────────── def _get_image_dimensions(data: bytes) -> Optional[tuple]: """从图片二进制数据的文件头解析宽高,支持 PNG/JPEG。不依赖 PIL。""" try: # PNG: 前 8 字节签名,IHDR chunk 在 16-24 字节存宽高 (big-endian uint32) if data[:8] == b'\x89PNG\r\n\x1a\n' and len(data) >= 24: import struct w, h = struct.unpack('>II', data[16:24]) return (w, h) # JPEG: 扫描 SOF0/SOF2 marker (0xFFC0/0xFFC2) if data[:2] == b'\xff\xd8': import struct i = 2 while i < len(data) - 9: if data[i] != 0xFF: break marker = data[i + 1] if marker in (0xC0, 0xC2): h, w = struct.unpack('>HH', data[i + 5:i + 9]) return (w, h) length = struct.unpack('>H', data[i + 2:i + 4])[0] i += 2 + length except Exception: pass return None def _to_anthropic_content(content: Any) -> Any: """Convert OpenAI-style *content* (string or block list) to Anthropic format. Handles ``image_url`` blocks → Anthropic ``image`` blocks (base64 or url). Passes through ``text`` blocks and ``cache_control`` unchanged. """ if not isinstance(content, list): return content result = [] for block in content: if not isinstance(block, dict): result.append(block) continue if block.get("type") == "image_url": image_url_obj = block.get("image_url", {}) url = image_url_obj.get("url", "") if isinstance(image_url_obj, dict) else str(image_url_obj) if url.startswith("data:"): header, _, data = url.partition(",") media_type = header.split(":")[1].split(";")[0] if ":" in header else "image/png" import base64 as b64mod raw = b64mod.b64decode(data) dims = _get_image_dimensions(raw) img_block = { "type": "image", "source": { "type": "base64", "media_type": media_type, "data": data, }, } if dims: img_block["_image_meta"] = {"width": dims[0], "height": dims[1]} result.append(img_block) else: # 检测本地文件路径,自动转 base64 local_path = Path(url) if local_path.exists() and local_path.is_file(): import base64 as b64mod import mimetypes mime_type, _ = mimetypes.guess_type(str(local_path)) mime_type = mime_type or "image/png" raw = local_path.read_bytes() dims = _get_image_dimensions(raw) b64_data = b64mod.b64encode(raw).decode("ascii") logger.info(f"[OpenRouter] 本地图片自动转 base64: {url} ({len(raw)} bytes)") img_block = { "type": "image", "source": { "type": "base64", "media_type": mime_type, "data": b64_data, }, } if dims: img_block["_image_meta"] = {"width": dims[0], "height": dims[1]} result.append(img_block) else: result.append({ "type": "image", "source": {"type": "url", "url": url}, }) else: result.append(block) return result def _to_anthropic_messages(messages: List[Dict[str, Any]]) -> tuple: """Convert an OpenAI-format message list to Anthropic Messages API format. Returns ``(system_prompt, anthropic_messages)`` where *system_prompt* is ``None`` or a string extracted from ``role=system`` messages, and *anthropic_messages* is the converted list. """ system_prompt = None anthropic_messages: List[Dict[str, Any]] = [] for msg in messages: role = msg.get("role", "") content = msg.get("content", "") if role == "system": system_prompt = content elif role == "user": anthropic_messages.append({ "role": "user", "content": _to_anthropic_content(content), }) elif role == "assistant": tool_calls = msg.get("tool_calls") if tool_calls: content_blocks: List[Dict[str, Any]] = [] if content: converted = _to_anthropic_content(content) if isinstance(converted, list): content_blocks.extend(converted) elif isinstance(converted, str) and converted.strip(): content_blocks.append({"type": "text", "text": converted}) for tc in tool_calls: func = tc.get("function", {}) args_str = func.get("arguments", "{}") try: args = json.loads(args_str) if isinstance(args_str, str) else args_str except json.JSONDecodeError: args = {} content_blocks.append({ "type": "tool_use", "id": tc.get("id", ""), "name": func.get("name", ""), "input": args, }) anthropic_messages.append({"role": "assistant", "content": content_blocks}) else: anthropic_messages.append({"role": "assistant", "content": content}) elif role == "tool": # Split tool result into text-only tool_result + sibling image blocks. # Images nested inside tool_result.content are not reliably passed # through by all proxies (e.g. OpenRouter). Placing them as sibling # content blocks in the same user message is more compatible. converted = _to_anthropic_content(content) text_parts: List[Dict[str, Any]] = [] image_parts: List[Dict[str, Any]] = [] if isinstance(converted, list): for block in converted: if isinstance(block, dict) and block.get("type") == "image": image_parts.append(block) else: text_parts.append(block) elif isinstance(converted, str): text_parts = [{"type": "text", "text": converted}] if converted else [] # tool_result keeps only text content tool_result_block: Dict[str, Any] = { "type": "tool_result", "tool_use_id": msg.get("tool_call_id", ""), } if len(text_parts) == 1 and text_parts[0].get("type") == "text": tool_result_block["content"] = text_parts[0]["text"] elif text_parts: tool_result_block["content"] = text_parts # (omit content key entirely when empty – Anthropic accepts this) # Build the blocks to append: tool_result first, then any images new_blocks = [tool_result_block] + image_parts # Merge consecutive tool results into one user message if (anthropic_messages and anthropic_messages[-1].get("role") == "user" and isinstance(anthropic_messages[-1].get("content"), list) and anthropic_messages[-1]["content"] and anthropic_messages[-1]["content"][0].get("type") == "tool_result"): anthropic_messages[-1]["content"].extend(new_blocks) else: anthropic_messages.append({ "role": "user", "content": new_blocks, }) return system_prompt, anthropic_messages def _to_anthropic_tools(tools: List[Dict]) -> List[Dict]: """Convert OpenAI tool definitions to Anthropic format.""" anthropic_tools = [] for tool in tools: if tool.get("type") == "function": func = tool["function"] anthropic_tools.append({ "name": func.get("name", ""), "description": func.get("description", ""), "input_schema": func.get("parameters", {"type": "object", "properties": {}}), }) return anthropic_tools def _parse_anthropic_response(result: Dict[str, Any]) -> Dict[str, Any]: """Parse an Anthropic Messages API response into the unified format. Returns a dict with keys: content, tool_calls, finish_reason, usage. """ content_blocks = result.get("content", []) text_parts = [] tool_calls = [] for block in content_blocks: if block.get("type") == "text": text_parts.append(block.get("text", "")) elif block.get("type") == "tool_use": tool_calls.append({ "id": block.get("id", ""), "type": "function", "function": { "name": block.get("name", ""), "arguments": json.dumps(block.get("input", {}), ensure_ascii=False), }, }) content = "\n".join(text_parts) stop_reason = result.get("stop_reason", "end_turn") finish_reason_map = { "end_turn": "stop", "tool_use": "tool_calls", "max_tokens": "length", "stop_sequence": "stop", } finish_reason = finish_reason_map.get(stop_reason, stop_reason) raw_usage = result.get("usage", {}) usage = TokenUsage( input_tokens=raw_usage.get("input_tokens", 0), output_tokens=raw_usage.get("output_tokens", 0), cache_creation_tokens=raw_usage.get("cache_creation_input_tokens", 0), cache_read_tokens=raw_usage.get("cache_read_input_tokens", 0), ) return { "content": content, "tool_calls": tool_calls if tool_calls else None, "finish_reason": finish_reason, "usage": usage, } # ── Provider detection / usage parsing ───────────────────────────────────── def _detect_provider_from_model(model: str) -> str: """根据模型名称检测提供商""" model_lower = model.lower() if model_lower.startswith("anthropic/") or "claude" in model_lower: return "anthropic" elif model_lower.startswith("openai/") or model_lower.startswith("gpt") or model_lower.startswith("o1") or model_lower.startswith("o3"): return "openai" elif model_lower.startswith("deepseek/") or "deepseek" in model_lower: return "deepseek" elif model_lower.startswith("google/") or "gemini" in model_lower: return "gemini" else: return "openai" # 默认使用 OpenAI 格式 def _parse_openrouter_usage(usage: Dict[str, Any], model: str) -> TokenUsage: """ 解析 OpenRouter 返回的 usage OpenRouter 会根据底层模型返回不同格式的 usage """ provider = _detect_provider_from_model(model) # OpenRouter 通常返回 OpenAI 格式,但可能包含额外字段 if provider == "anthropic": # Claude 模型可能有缓存字段 # OpenRouter 使用 prompt_tokens_details 嵌套结构 prompt_details = usage.get("prompt_tokens_details", {}) # 调试:打印原始 usage if logger.isEnabledFor(logging.DEBUG): logger.debug(f"[OpenRouter] Raw usage: {usage}") logger.debug(f"[OpenRouter] prompt_tokens_details: {prompt_details}") return TokenUsage( input_tokens=usage.get("prompt_tokens") or usage.get("input_tokens", 0), output_tokens=usage.get("completion_tokens") or usage.get("output_tokens", 0), # OpenRouter 格式:prompt_tokens_details.cached_tokens / cache_write_tokens cache_read_tokens=prompt_details.get("cached_tokens", 0), cache_creation_tokens=prompt_details.get("cache_write_tokens", 0), ) elif provider == "deepseek": # DeepSeek 可能有 reasoning_tokens return TokenUsage( input_tokens=usage.get("prompt_tokens", 0), output_tokens=usage.get("completion_tokens", 0), reasoning_tokens=usage.get("reasoning_tokens", 0), ) else: # OpenAI 格式(包括 o1/o3 的 reasoning_tokens) reasoning = 0 if details := usage.get("completion_tokens_details"): reasoning = details.get("reasoning_tokens", 0) return TokenUsage( input_tokens=usage.get("prompt_tokens", 0), output_tokens=usage.get("completion_tokens", 0), reasoning_tokens=reasoning, ) def _normalize_tool_call_ids(messages: List[Dict[str, Any]], target_prefix: str) -> List[Dict[str, Any]]: """ 将消息历史中的 tool_call_id 统一重写为目标 Provider 的格式。 跨 Provider 续跑时,历史中的 tool_call_id 可能不兼容目标 API (如 Anthropic 的 toolu_xxx 发给 OpenAI,或 OpenAI 的 call_xxx 发给 Anthropic)。 仅在检测到异格式 ID 时才重写,同格式直接跳过。 """ # 第一遍:收集需要重写的 ID id_map: Dict[str, str] = {} counter = 0 for msg in messages: if msg.get("role") == "assistant" and msg.get("tool_calls"): for tc in msg["tool_calls"]: old_id = tc.get("id", "") if old_id and not old_id.startswith(target_prefix + "_"): if old_id not in id_map: id_map[old_id] = f"{target_prefix}_{counter:06x}" counter += 1 if not id_map: return messages # 无需重写 logger.info("重写 %d 个 tool_call_id (target_prefix=%s)", len(id_map), target_prefix) # 第二遍:重写(浅拷贝避免修改原始数据) result = [] for msg in messages: if msg.get("role") == "assistant" and msg.get("tool_calls"): new_tcs = [] for tc in msg["tool_calls"]: old_id = tc.get("id", "") if old_id in id_map: new_tcs.append({**tc, "id": id_map[old_id]}) else: new_tcs.append(tc) result.append({**msg, "tool_calls": new_tcs}) elif msg.get("role") == "tool" and msg.get("tool_call_id") in id_map: result.append({**msg, "tool_call_id": id_map[msg["tool_call_id"]]}) else: result.append(msg) return result async def _openrouter_anthropic_call( messages: List[Dict[str, Any]], model: str, tools: Optional[List[Dict]], api_key: str, **kwargs, ) -> Dict[str, Any]: """ 通过 OpenRouter 的 Anthropic 原生端点调用 Claude 模型。 使用 Anthropic Messages API 格式(/api/v1/messages), 自包含的格式转换逻辑,确保多模态内容(截图等)正确传递。 """ endpoint = "https://openrouter.ai/api/v1/messages" # Resolve model name for OpenRouter (e.g. "claude-sonnet-4.5" → "anthropic/claude-sonnet-4-5-20250929") resolved_model = _resolve_openrouter_model(model) logger.debug("[OpenRouter/Anthropic] model: %s → %s", model, resolved_model) # 跨 Provider 续跑时,重写不兼容的 tool_call_id 为 toolu_ 前缀 messages = _normalize_tool_call_ids(messages, "toolu") # OpenAI 格式 → Anthropic 格式 system_prompt, anthropic_messages = _to_anthropic_messages(messages) # Diagnostic: count image blocks in the payload _img_count = 0 for _m in anthropic_messages: if isinstance(_m.get("content"), list): for _b in _m["content"]: if isinstance(_b, dict) and _b.get("type") == "image": _img_count += 1 if _img_count: logger.info("[OpenRouter/Anthropic] payload contains %d image block(s)", _img_count) print(f"[OpenRouter/Anthropic] payload contains {_img_count} image block(s)") payload: Dict[str, Any] = { "model": resolved_model, "messages": anthropic_messages, "max_tokens": kwargs.get("max_tokens", 16384), } if system_prompt is not None: payload["system"] = system_prompt if tools: payload["tools"] = _to_anthropic_tools(tools) if "temperature" in kwargs: payload["temperature"] = kwargs["temperature"] # Debug: 检查 cache_control 是否存在 if logger.isEnabledFor(logging.DEBUG): cache_control_count = 0 if isinstance(system_prompt, list): for block in system_prompt: if isinstance(block, dict) and "cache_control" in block: cache_control_count += 1 for msg in anthropic_messages: content = msg.get("content", "") if isinstance(content, list): for block in content: if isinstance(block, dict) and "cache_control" in block: cache_control_count += 1 if cache_control_count > 0: logger.debug(f"[OpenRouter/Anthropic] 发现 {cache_control_count} 个 cache_control 标记") headers = { "Authorization": f"Bearer {api_key}", "anthropic-version": "2023-06-01", "content-type": "application/json", "HTTP-Referer": "https://github.com/your-repo", "X-Title": "Agent Framework", } max_retries = 3 last_exception = None for attempt in range(max_retries): async with httpx.AsyncClient(timeout=300.0) as client: try: response = await client.post(endpoint, json=payload, headers=headers) response.raise_for_status() result = response.json() break except httpx.HTTPStatusError as e: status = e.response.status_code error_body = e.response.text if status in (429, 500, 502, 503, 504) and attempt < max_retries - 1: wait = 2 ** attempt * 2 logger.warning( "[OpenRouter/Anthropic] HTTP %d (attempt %d/%d), retrying in %ds: %s", status, attempt + 1, max_retries, wait, error_body[:200], ) await asyncio.sleep(wait) last_exception = e continue # Log AND print error body so it is visible in console output logger.error("[OpenRouter/Anthropic] HTTP %d error body: %s", status, error_body) print(f"[OpenRouter/Anthropic] API Error {status}: {error_body[:500]}") raise except _RETRYABLE_EXCEPTIONS as e: last_exception = e if attempt < max_retries - 1: wait = 2 ** attempt * 2 logger.warning( "[OpenRouter/Anthropic] %s (attempt %d/%d), retrying in %ds", type(e).__name__, attempt + 1, max_retries, wait, ) await asyncio.sleep(wait) continue raise else: raise last_exception # type: ignore[misc] # 解析 Anthropic 响应 → 统一格式 parsed = _parse_anthropic_response(result) usage = parsed["usage"] cost = calculate_cost(model, usage) return { "content": parsed["content"], "tool_calls": parsed["tool_calls"], "prompt_tokens": usage.input_tokens, "completion_tokens": usage.output_tokens, "reasoning_tokens": usage.reasoning_tokens, "cache_creation_tokens": usage.cache_creation_tokens, "cache_read_tokens": usage.cache_read_tokens, "finish_reason": parsed["finish_reason"], "cost": cost, "usage": usage, } async def openrouter_llm_call( messages: List[Dict[str, Any]], model: str = "anthropic/claude-sonnet-4.5", tools: Optional[List[Dict]] = None, **kwargs ) -> Dict[str, Any]: """ OpenRouter LLM 调用函数 Args: messages: OpenAI 格式消息列表 model: 模型名称(如 "anthropic/claude-sonnet-4.5") tools: OpenAI 格式工具定义 **kwargs: 其他参数(temperature, max_tokens 等) Returns: { "content": str, "tool_calls": List[Dict] | None, "prompt_tokens": int, "completion_tokens": int, "finish_reason": str, "cost": float } """ api_key = os.getenv("OPEN_ROUTER_API_KEY") if not api_key: raise ValueError("OPEN_ROUTER_API_KEY environment variable not set") # Claude 模型走 Anthropic 原生端点,其余走 OpenAI 兼容端点 provider = _detect_provider_from_model(model) if provider == "anthropic": logger.debug("[OpenRouter] Routing Claude model to Anthropic native endpoint") return await _openrouter_anthropic_call(messages, model, tools, api_key, **kwargs) base_url = "https://openrouter.ai/api/v1" endpoint = f"{base_url}/chat/completions" # 跨 Provider 续跑时,重写不兼容的 tool_call_id messages = _normalize_tool_call_ids(messages, "call") # 构建请求 payload = { "model": model, "messages": messages, } # 添加可选参数 if tools: payload["tools"] = tools if "temperature" in kwargs: payload["temperature"] = kwargs["temperature"] if "max_tokens" in kwargs: payload["max_tokens"] = kwargs["max_tokens"] # OpenRouter 特定参数 headers = { "Authorization": f"Bearer {api_key}", "HTTP-Referer": "https://github.com/your-repo", # 可选,用于统计 "X-Title": "Agent Framework", # 可选,显示在 OpenRouter dashboard } # 调用 API(带重试) max_retries = 3 last_exception = None for attempt in range(max_retries): async with httpx.AsyncClient(timeout=300.0) as client: try: response = await client.post(endpoint, json=payload, headers=headers) response.raise_for_status() result = response.json() break # 成功,跳出重试循环 except httpx.HTTPStatusError as e: error_body = e.response.text status = e.response.status_code # 429 (rate limit) 和 5xx 可重试 if status in (429, 500, 502, 503, 504) and attempt < max_retries - 1: wait = 2 ** attempt * 2 # 2s, 4s, 8s logger.warning( "[OpenRouter] HTTP %d (attempt %d/%d), retrying in %ds: %s", status, attempt + 1, max_retries, wait, error_body[:200], ) await asyncio.sleep(wait) last_exception = e continue logger.error("[OpenRouter] Error %d: %s", status, error_body) raise except _RETRYABLE_EXCEPTIONS as e: last_exception = e if attempt < max_retries - 1: wait = 2 ** attempt * 2 logger.warning( "[OpenRouter] %s (attempt %d/%d), retrying in %ds", type(e).__name__, attempt + 1, max_retries, wait, ) await asyncio.sleep(wait) continue logger.error("[OpenRouter] Request failed after %d attempts: %s", max_retries, e) raise except Exception as e: logger.error("[OpenRouter] Request failed: %s", e) raise else: # 所有重试都用完 raise last_exception # type: ignore[misc] # 解析响应(OpenAI 格式) choice = result["choices"][0] if result.get("choices") else {} message = choice.get("message", {}) content = message.get("content", "") tool_calls = message.get("tool_calls") finish_reason = choice.get("finish_reason") # stop, length, tool_calls, content_filter 等 # 提取 usage(完整版,根据模型类型解析) raw_usage = result.get("usage", {}) usage = _parse_openrouter_usage(raw_usage, model) # 计算费用 cost = calculate_cost(model, usage) return { "content": content, "tool_calls": tool_calls, "prompt_tokens": usage.input_tokens, "completion_tokens": usage.output_tokens, "reasoning_tokens": usage.reasoning_tokens, "cache_creation_tokens": usage.cache_creation_tokens, "cache_read_tokens": usage.cache_read_tokens, "finish_reason": finish_reason, "cost": cost, "usage": usage, # 完整的 TokenUsage 对象 } def create_openrouter_llm_call( model: str = "anthropic/claude-sonnet-4.5" ): """ 创建 OpenRouter LLM 调用函数 Args: model: 模型名称 - "anthropic/claude-sonnet-4.5" - "anthropic/claude-opus-4.5" - "openai/gpt-4o" 等等 Returns: 异步 LLM 调用函数 """ async def llm_call( messages: List[Dict[str, Any]], model: str = model, tools: Optional[List[Dict]] = None, **kwargs ) -> Dict[str, Any]: return await openrouter_llm_call(messages, model, tools, **kwargs) return llm_call