| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- """
- OpenRouter Provider
- 使用 OpenRouter API 调用各种模型(包括 Claude Sonnet 4.5)
- 支持 OpenAI 兼容的 API 格式
- OpenRouter 转发多种模型,需要根据实际模型处理不同的 usage 格式:
- - OpenAI 模型: prompt_tokens, completion_tokens, completion_tokens_details.reasoning_tokens
- - Claude 模型: input_tokens, output_tokens, cache_creation_input_tokens, cache_read_input_tokens
- - DeepSeek 模型: prompt_tokens, completion_tokens, reasoning_tokens
- """
- import os
- import json
- import asyncio
- import logging
- import httpx
- from typing import List, Dict, Any, Optional
- from .usage import TokenUsage, create_usage_from_response
- from .pricing import calculate_cost
- logger = logging.getLogger(__name__)
- # 可重试的异常类型
- _RETRYABLE_EXCEPTIONS = (
- httpx.RemoteProtocolError, # Server disconnected without sending a response
- httpx.ConnectError,
- httpx.ReadTimeout,
- httpx.WriteTimeout,
- httpx.ConnectTimeout,
- httpx.PoolTimeout,
- ConnectionError,
- )
- def _detect_provider_from_model(model: str) -> str:
- """根据模型名称检测提供商"""
- model_lower = model.lower()
- if model_lower.startswith("anthropic/") or "claude" in model_lower:
- return "anthropic"
- elif model_lower.startswith("openai/") or model_lower.startswith("gpt") or model_lower.startswith("o1") or model_lower.startswith("o3"):
- return "openai"
- elif model_lower.startswith("deepseek/") or "deepseek" in model_lower:
- return "deepseek"
- elif model_lower.startswith("google/") or "gemini" in model_lower:
- return "gemini"
- else:
- return "openai" # 默认使用 OpenAI 格式
def _parse_openrouter_usage(usage: Dict[str, Any], model: str) -> TokenUsage:
    """
    Parse the ``usage`` block returned by OpenRouter.

    OpenRouter forwards many models and returns a differently shaped usage
    payload depending on the underlying provider, so dispatch on the
    provider family detected from the model name.

    Args:
        usage: Raw ``usage`` dict from the OpenRouter response.
        model: OpenRouter model identifier (e.g. "anthropic/claude-sonnet-4.5").

    Returns:
        A normalized TokenUsage.
    """
    provider = _detect_provider_from_model(model)
    # OpenRouter usually returns the OpenAI format, possibly with extra fields.
    if provider == "anthropic":
        # Claude models may carry cache fields nested under
        # prompt_tokens_details. Use `or {}` (not a .get default) because the
        # key can be present with an explicit null, which would otherwise
        # raise AttributeError on the .get calls below.
        prompt_details = usage.get("prompt_tokens_details") or {}
        # Debug: dump the raw usage payload
        if logger.isEnabledFor(logging.DEBUG):
            logger.debug(f"[OpenRouter] Raw usage: {usage}")
            logger.debug(f"[OpenRouter] prompt_tokens_details: {prompt_details}")
        return TokenUsage(
            input_tokens=usage.get("prompt_tokens") or usage.get("input_tokens", 0),
            output_tokens=usage.get("completion_tokens") or usage.get("output_tokens", 0),
            # OpenRouter format: prompt_tokens_details.cached_tokens / cache_write_tokens
            cache_read_tokens=prompt_details.get("cached_tokens", 0),
            cache_creation_tokens=prompt_details.get("cache_write_tokens", 0),
        )
    elif provider == "deepseek":
        # DeepSeek may report reasoning_tokens at the top level.
        return TokenUsage(
            input_tokens=usage.get("prompt_tokens", 0),
            output_tokens=usage.get("completion_tokens", 0),
            reasoning_tokens=usage.get("reasoning_tokens", 0),
        )
    else:
        # OpenAI format (including o1/o3 reasoning_tokens); the walrus check
        # already guards against a null completion_tokens_details.
        reasoning = 0
        if details := usage.get("completion_tokens_details"):
            reasoning = details.get("reasoning_tokens", 0)
        return TokenUsage(
            input_tokens=usage.get("prompt_tokens", 0),
            output_tokens=usage.get("completion_tokens", 0),
            reasoning_tokens=reasoning,
        )
- def _normalize_tool_call_ids(messages: List[Dict[str, Any]], target_prefix: str) -> List[Dict[str, Any]]:
- """
- 将消息历史中的 tool_call_id 统一重写为目标 Provider 的格式。
- 跨 Provider 续跑时,历史中的 tool_call_id 可能不兼容目标 API
- (如 Anthropic 的 toolu_xxx 发给 OpenAI,或 OpenAI 的 call_xxx 发给 Anthropic)。
- 仅在检测到异格式 ID 时才重写,同格式直接跳过。
- """
- # 第一遍:收集需要重写的 ID
- id_map: Dict[str, str] = {}
- counter = 0
- for msg in messages:
- if msg.get("role") == "assistant" and msg.get("tool_calls"):
- for tc in msg["tool_calls"]:
- old_id = tc.get("id", "")
- if old_id and not old_id.startswith(target_prefix + "_"):
- if old_id not in id_map:
- id_map[old_id] = f"{target_prefix}_{counter:06x}"
- counter += 1
- if not id_map:
- return messages # 无需重写
- logger.info("重写 %d 个 tool_call_id (target_prefix=%s)", len(id_map), target_prefix)
- # 第二遍:重写(浅拷贝避免修改原始数据)
- result = []
- for msg in messages:
- if msg.get("role") == "assistant" and msg.get("tool_calls"):
- new_tcs = []
- for tc in msg["tool_calls"]:
- old_id = tc.get("id", "")
- if old_id in id_map:
- new_tcs.append({**tc, "id": id_map[old_id]})
- else:
- new_tcs.append(tc)
- result.append({**msg, "tool_calls": new_tcs})
- elif msg.get("role") == "tool" and msg.get("tool_call_id") in id_map:
- result.append({**msg, "tool_call_id": id_map[msg["tool_call_id"]]})
- else:
- result.append(msg)
- return result
async def openrouter_llm_call(
    messages: List[Dict[str, Any]],
    model: str = "anthropic/claude-sonnet-4.5",
    tools: Optional[List[Dict]] = None,
    **kwargs
) -> Dict[str, Any]:
    """
    Call an LLM through the OpenRouter chat-completions API.

    Transient failures (connection errors/timeouts, HTTP 429 and 5xx) are
    retried up to 3 times with exponential backoff (2s, 4s, 8s).

    Args:
        messages: OpenAI-format message list.
        model: Model name (e.g. "anthropic/claude-sonnet-4.5").
        tools: OpenAI-format tool definitions.
        **kwargs: Extra parameters (temperature, max_tokens, ...).

    Returns:
        {
            "content": str,
            "tool_calls": List[Dict] | None,
            "prompt_tokens": int,
            "completion_tokens": int,
            "reasoning_tokens": int,
            "cache_creation_tokens": int,
            "cache_read_tokens": int,
            "finish_reason": str,
            "cost": float,
            "usage": TokenUsage,
        }

    Raises:
        ValueError: If OPEN_ROUTER_API_KEY is not set.
        httpx.HTTPStatusError: On non-retryable HTTP errors or once retries
            are exhausted.
    """
    api_key = os.getenv("OPEN_ROUTER_API_KEY")
    if not api_key:
        raise ValueError("OPEN_ROUTER_API_KEY environment variable not set")
    base_url = "https://openrouter.ai/api/v1"
    endpoint = f"{base_url}/chat/completions"
    # When resuming across providers, rewrite incompatible tool_call_ids.
    messages = _normalize_tool_call_ids(messages, "call")
    # Build the request payload
    payload = {
        "model": model,
        "messages": messages,
    }
    # Optional parameters
    if tools:
        payload["tools"] = tools
    if "temperature" in kwargs:
        payload["temperature"] = kwargs["temperature"]
    if "max_tokens" in kwargs:
        payload["max_tokens"] = kwargs["max_tokens"]
    # For Anthropic models, pin the provider so prompt caching takes effect.
    if "anthropic" in model.lower() or "claude" in model.lower():
        payload["provider"] = {
            "only": ["Anthropic"],
            "allow_fallbacks": False,
            "require_parameters": True
        }
        logger.debug("[OpenRouter] Locked provider to Anthropic for caching support")
    # OpenRouter-specific headers
    headers = {
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://github.com/your-repo",  # optional, used for stats
        "X-Title": "Agent Framework",  # optional, shown in the OpenRouter dashboard
    }
    # Call the API (with retries)
    max_retries = 3
    last_exception = None
    for attempt in range(max_retries):
        async with httpx.AsyncClient(timeout=300.0) as client:
            try:
                response = await client.post(endpoint, json=payload, headers=headers)
                response.raise_for_status()
                result = response.json()
                break  # success -- leave the retry loop
            except httpx.HTTPStatusError as e:
                error_body = e.response.text
                status = e.response.status_code
                # 429 (rate limit) and 5xx are retryable
                if status in (429, 500, 502, 503, 504) and attempt < max_retries - 1:
                    wait = 2 ** attempt * 2  # 2s, 4s, 8s
                    logger.warning(
                        "[OpenRouter] HTTP %d (attempt %d/%d), retrying in %ds: %s",
                        status, attempt + 1, max_retries, wait, error_body[:200],
                    )
                    await asyncio.sleep(wait)
                    last_exception = e
                    continue
                logger.error("[OpenRouter] Error %d: %s", status, error_body)
                raise
            except _RETRYABLE_EXCEPTIONS as e:
                last_exception = e
                if attempt < max_retries - 1:
                    wait = 2 ** attempt * 2
                    logger.warning(
                        "[OpenRouter] %s (attempt %d/%d), retrying in %ds",
                        type(e).__name__, attempt + 1, max_retries, wait,
                    )
                    await asyncio.sleep(wait)
                    continue
                logger.error("[OpenRouter] Request failed after %d attempts: %s", max_retries, e)
                raise
            except Exception as e:
                logger.error("[OpenRouter] Request failed: %s", e)
                raise
    else:
        # All retries used up (defensive; the last attempt normally re-raises
        # before the loop can finish without a break).
        raise last_exception  # type: ignore[misc]
    # Parse the response (OpenAI format)
    choice = result["choices"][0] if result.get("choices") else {}
    message = choice.get("message", {})
    content = message.get("content", "")
    tool_calls = message.get("tool_calls")
    finish_reason = choice.get("finish_reason")  # stop, length, tool_calls, content_filter, ...
    # Extract usage (full version, parsed according to the model family)
    raw_usage = result.get("usage", {})
    usage = _parse_openrouter_usage(raw_usage, model)
    # Compute the cost
    cost = calculate_cost(model, usage)
    return {
        "content": content,
        "tool_calls": tool_calls,
        "prompt_tokens": usage.input_tokens,
        "completion_tokens": usage.output_tokens,
        "reasoning_tokens": usage.reasoning_tokens,
        "cache_creation_tokens": usage.cache_creation_tokens,
        "cache_read_tokens": usage.cache_read_tokens,
        "finish_reason": finish_reason,
        "cost": cost,
        "usage": usage,  # the full TokenUsage object
    }
def create_openrouter_llm_call(
    model: str = "anthropic/claude-sonnet-4.5"
):
    """
    Build an async LLM-call function bound to a default OpenRouter model.

    Args:
        model: Default model name, e.g.
            - "anthropic/claude-sonnet-4.5"
            - "anthropic/claude-opus-4.5"
            - "openai/gpt-4o"

    Returns:
        An async callable with the same signature as openrouter_llm_call,
        defaulting to the bound model.
    """
    default_model = model

    async def llm_call(
        messages: List[Dict[str, Any]],
        model: str = default_model,
        tools: Optional[List[Dict]] = None,
        **kwargs
    ) -> Dict[str, Any]:
        # Delegate straight to the module-level implementation.
        return await openrouter_llm_call(messages, model, tools, **kwargs)

    return llm_call
|