Sfoglia il codice sorgente

fix(openrouter): 200 但 body 非 JSON 时退避重试,堵住静默丢帖

OpenRouter 共享 Anthropic 池限流时偶发返回 HTTP 200 但 body 是空/
SSE 残片,raise_for_status() 拦不住,response.json() 抛 JSONDecodeError。
该异常不在循环的 except 集合里(只接 HTTPStatusError / 网络类),直接
穿透 provider,被 llm_helper 外层误记成泛化 "LLM 调用异常",且 provider
内不退避重试 —— 限流成簇时整帖被丢。工序解构两批各丢约 50% 即源于此。

修复:新增 except json.JSONDecodeError,与 429/5xx 同等处理(退避重试,
耗尽抛清晰 RuntimeError)。附 tests/test_openrouter_bad_json_retry.py
复现(改前报错与线上日志逐字一致)并验证。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
刘文武 3 giorni fa
parent
commit
97e8c7f427
2 ha cambiato i file con 142 aggiunte e 0 eliminazioni
  1. 21 0
      agent/llm/openrouter.py
  2. 121 0
      tests/test_openrouter_bad_json_retry.py

+ 21 - 0
agent/llm/openrouter.py

@@ -665,6 +665,27 @@ async def _openrouter_anthropic_call(
                 print(f"[OpenRouter/Anthropic] API Error {status}: {error_body[:500]}")
                 print(f"[OpenRouter/Anthropic] API Error {status}: {error_body[:500]}")
                 raise
                 raise
 
 
+            except json.JSONDecodeError as e:
+                # HTTP 200 但 body 不是合法 JSON —— raise_for_status() 拦不住。上游共享池
+                # 限流/代理常返回空 body 或 SSE 残片,response.json() 在此抛 JSONDecodeError。
+                # 与 429/5xx 同类(瞬时上游故障):退避重试;耗尽则抛清晰错误,避免裸
+                # JSONDecodeError 穿透到上层被误记成泛化的 "LLM 调用异常"。
+                last_exception = e
+                body_preview = (getattr(response, "text", "") or "")[:200]
+                if attempt < max_retries - 1:
+                    wait = 2 ** attempt * 2
+                    logger.warning(
+                        "[OpenRouter/Anthropic] 200 但 body 非 JSON (attempt %d/%d), retrying in %ds: %r",
+                        attempt + 1, max_retries, wait, body_preview,
+                    )
+                    await asyncio.sleep(wait)
+                    continue
+                logger.error("[OpenRouter/Anthropic] 200 但 body 非 JSON,重试耗尽: %r", body_preview)
+                raise RuntimeError(
+                    f"[OpenRouter/Anthropic] 200 但 body 非合法 JSON (疑上游限流), "
+                    f"body[:200]={body_preview!r}"
+                ) from e
+
             except _RETRYABLE_EXCEPTIONS as e:
             except _RETRYABLE_EXCEPTIONS as e:
                 last_exception = e
                 last_exception = e
                 if attempt < max_retries - 1:
                 if attempt < max_retries - 1:

+ 121 - 0
tests/test_openrouter_bad_json_retry.py

@@ -0,0 +1,121 @@
+# -*- coding: utf-8 -*-
+"""复现 + 验证:OpenRouter 返回 HTTP 200 但 body 非合法 JSON 时,
+_openrouter_anthropic_call 应像 429/5xx 一样退避重试,而非让 json.JSONDecodeError
+穿透到上层被误记成 "LLM 调用异常"。
+
+根因见 agent/llm/openrouter.py:648 (response.json() 未保护)。
+标准库 test,直接 `python tests/test_openrouter_bad_json_retry.py` 跑。
+"""
+import asyncio
+import json
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
+
+import agent.llm.openrouter as orm
+
+
+# Well-formed response payload that _FakeResp.json() returns once it has
+# stopped raising JSONDecodeError.
+VALID_RESULT = {
+    "content": [{"type": "text", "text": '{"ok": 1}'}],
+    "stop_reason": "end_turn",
+    "usage": {"input_tokens": 1, "output_tokens": 1},
+}
+
+
+class _FakeResp:
+    """Stand-in for an HTTP 200 response whose body may not be valid JSON.
+
+    ``json()`` raises ``json.JSONDecodeError`` on its first ``raise_times``
+    calls and returns ``VALID_RESULT`` on every call after that.
+    """
+
+    def __init__(self, raise_times: int):
+        # json() raises JSONDecodeError for the first raise_times calls, then
+        # returns a valid response.
+        self._raise_times = raise_times
+        self._calls = 0
+        self.status_code = 200
+        self.text = "   "  # simulates a non-JSON body fragment
+
+    def raise_for_status(self):
+        # Never raises: this fake always reports status 200.
+        return None
+
+    def json(self):
+        # Count every call so the first raise_times of them can fail.
+        self._calls += 1
+        if self._calls <= self._raise_times:
+            raise json.JSONDecodeError("Expecting value", "   ", 3)
+        return VALID_RESULT
+
+
+class _FakeClient:
+    """Minimal async-context-manager stand-in for an HTTP client.
+
+    Intended to be created once per attempt (presumably by the code under
+    test). ``post()`` returns whatever ``_FakeClient._resp_factory()``
+    produces, so that class attribute must be assigned before use; the
+    behaviour of the returned response's ``json()`` is therefore decided by
+    an external counter.
+    """
+    def __init__(self, *a, **k):
+        # Accept and ignore any constructor arguments.
+        pass
+
+    async def __aenter__(self):
+        return self
+
+    async def __aexit__(self, *a):
+        # Returning False lets exceptions raised inside the `async with` propagate.
+        return False
+
+    async def post(self, *a, **k):
+        # Request arguments are ignored; the response comes from the factory.
+        return _FakeClient._resp_factory()
+
+
+def _run(raise_times: int):
+    """Call ``orm._openrouter_anthropic_call`` once against fake HTTP responses.
+
+    The first ``raise_times`` responses handed out raise
+    ``json.JSONDecodeError`` from ``json()``; every later response parses
+    successfully. ``orm.httpx.AsyncClient`` and ``orm.asyncio.sleep`` are
+    replaced for the duration of the call and restored afterwards.
+
+    Returns whatever the call returns; any exception it raises propagates.
+    """
+    # A fresh response is built for every post() (simulating that each attempt
+    # is a brand-new HTTP request).
+    resp_holder = {"n": 0}
+
+    def factory():
+        # The first raise_times requests get a response whose json() raises;
+        # later ones succeed.
+        idx = resp_holder["n"]
+        resp_holder["n"] += 1
+        return _FakeResp(raise_times=1 if idx < raise_times else 0)
+
+    _FakeClient._resp_factory = staticmethod(factory)
+
+    async def _nosleep(*_a, **_k):  # skip the real back-off wait (must not call the replaced asyncio.sleep again)
+        return None
+
+    # Keep the originals so they can be put back in the finally block.
+    orig_client = orm.httpx.AsyncClient
+    orig_sleep = orm.asyncio.sleep
+    orm.httpx.AsyncClient = _FakeClient
+    orm.asyncio.sleep = _nosleep
+    try:
+        return asyncio.run(orm._openrouter_anthropic_call(
+            messages=[{"role": "user", "content": "hi"}],
+            model="anthropic/claude-sonnet-4-6",
+            tools=None,
+            api_key="dummy",
+        ))
+    finally:
+        orm.httpx.AsyncClient = orig_client
+        orm.asyncio.sleep = orig_sleep
+
+
+def main():
+    """Run both scenarios, print one result line per case, and exit 1 on failure.
+
+    Case 1 expects a transient bad-JSON response to be retried until the call
+    succeeds. Case 2 expects persistent bad-JSON responses to end in a
+    ``RuntimeError`` rather than a bare ``json.JSONDecodeError`` or any other
+    unrelated exception.
+    """
+    failures = []
+
+    # Case 1: the first 2 responses are non-JSON and the 3rd is valid, so the
+    # call should back off, retry, and succeed.
+    try:
+        out = _run(raise_times=2)
+        # Explicit raise instead of `assert`: asserts are stripped under
+        # `python -O`, which would make this case pass without checking anything.
+        if out["content"] != '{"ok": 1}':
+            raise AssertionError(out)
+        print("✅ case1 一过性 bad-JSON:退避重试后成功")
+    except Exception as e:
+        failures.append(f"case1 FAILED: {type(e).__name__}: {e}")
+        print(f"❌ case1 FAILED: {type(e).__name__}: {e}")
+
+    # Case 2: all 3 responses are non-JSON, so retries are exhausted and the
+    # call must raise RuntimeError.
+    try:
+        _run(raise_times=3)
+        failures.append("case2 FAILED: 预期重试耗尽抛错,但没有抛")
+        print("❌ case2 FAILED: 预期抛错但没抛")
+    except json.JSONDecodeError as e:
+        failures.append(f"case2 FAILED: 仍是裸 JSONDecodeError 穿透: {e}")
+        print(f"❌ case2 FAILED: 裸 JSONDecodeError 穿透 → {e}")
+    except RuntimeError as e:
+        print(f"✅ case2 持续 bad-JSON:重试耗尽抛清晰错误 [{type(e).__name__}] {str(e)[:80]}")
+    except Exception as e:
+        # Any other exception type (TypeError, KeyError, ...) signals a bug in
+        # the code under test or in this harness, not the intended clear error.
+        failures.append(f"case2 FAILED: 预期 RuntimeError,实际 {type(e).__name__}: {e}")
+        print(f"❌ case2 FAILED: 预期 RuntimeError,实际 {type(e).__name__}: {e}")
+
+    print("-" * 50)
+    if failures:
+        print(f"FAIL ({len(failures)})")
+        sys.exit(1)
+    print("ALL PASS")
+
+if __name__ == "__main__":
+    main()