Просмотр исходного кода

fix v4 crawler and gate1 failure diagnostics

Sam Lee 4 дней назад
Родитель
Сommit
ee74b223dd

+ 23 - 1
content_agent/business_modules/platform_access.py

@@ -5,6 +5,8 @@ from typing import Any
 from content_agent.errors import ContentAgentError, ErrorCode
 from content_agent.interfaces import PlatformSearchClient
 
+_MAX_EXCEPTION_MESSAGE_LENGTH = 1000
+
 
 def run(
     search_queries: list[dict[str, Any]], platform_client: PlatformSearchClient
@@ -85,7 +87,7 @@ def _query_failure(search_query: dict[str, Any], exc: Exception) -> dict[str, An
     else:
         error_code = ErrorCode.PLATFORM_REQUEST_FAILED.value
         message = "platform query failed"
-        detail = {"exception_type": type(exc).__name__}
+        detail = _exception_detail(exc)
     return {
         "search_query_id": search_query["search_query_id"],
         "search_query": search_query["search_query"],
@@ -97,3 +99,23 @@ def _query_failure(search_query: dict[str, Any], exc: Exception) -> dict[str, An
         "message": message,
         "error_detail": detail,
     }
+
+
+def _exception_detail(exc: Exception) -> dict[str, Any]:
+    detail: dict[str, Any] = {"exception_type": type(exc).__name__}
+    message = _truncate_exception_message(str(exc))
+    if message:
+        detail["exception_message"] = message
+    cause = exc.__cause__ or exc.__context__
+    if cause is not None:
+        detail["cause_exception_type"] = type(cause).__name__
+        cause_message = _truncate_exception_message(str(cause))
+        if cause_message:
+            detail["cause_message"] = cause_message
+    return detail
+
+
+def _truncate_exception_message(message: str) -> str:
+    if len(message) <= _MAX_EXCEPTION_MESSAGE_LENGTH:
+        return message
+    return f"{message[:_MAX_EXCEPTION_MESSAGE_LENGTH]}..."

+ 64 - 7
content_agent/integrations/crawapi_http.py

@@ -20,6 +20,7 @@ from content_agent.errors import ContentAgentError, ErrorCode
 from content_agent.integrations import timeout_config
 
 RATE_LIMIT_MESSAGE_TOKENS = ("限流", "请求频繁", "rate limit", "too many requests")
+_MAX_RESPONSE_SUMMARY_LENGTH = 500
 
 
 class CrawapiTransientError(RuntimeError):
@@ -89,13 +90,26 @@ def post_crawapi_json(
             raise ContentAgentError(
                 ErrorCode.PLATFORM_RATE_LIMITED,
                 f"crawapi {operation} failed: rate_limited",
-                {"operation": operation, "status_code": 429},
+                {
+                    "operation": operation,
+                    "status_code": 429,
+                    "response_summary": _response_summary(exc.response),
+                },
             ) from exc
-        raise RuntimeError(f"crawapi {operation} failed: HTTP {status_code}") from exc
+        raise RuntimeError(
+            f"crawapi {operation} failed: HTTP {status_code}; "
+            f"response={_response_summary(exc.response)}"
+        ) from exc
     except httpx.HTTPError as exc:
-        raise CrawapiTransientError(f"crawapi {operation} failed: network_error") from exc
+        raise CrawapiTransientError(
+            f"crawapi {operation} failed: network_error "
+            f"exception_type={type(exc).__name__} message={_truncate_text(str(exc))}"
+        ) from exc
     except ValueError as exc:
-        raise RuntimeError(f"crawapi {operation} failed: bad_json") from exc
+        response_summary = _response_summary(response) if "response" in locals() else {}
+        raise RuntimeError(
+            f"crawapi {operation} failed: bad_json; response={response_summary}"
+        ) from exc
     if not isinstance(data, dict):
         raise RuntimeError(f"crawapi {operation} failed: bad_response")
     code = data.get("code")
@@ -104,16 +118,59 @@ def post_crawapi_json(
             raise ContentAgentError(
                 ErrorCode.PLATFORM_RATE_LIMITED,
                 f"crawapi {operation} failed: rate_limited",
-                {"operation": operation, "business_code": str(code)},
+                {
+                    "operation": operation,
+                    "business_code": str(code),
+                    "business_message": _business_message(data),
+                },
             )
         if str(code) in transient_business_codes:
             raise CrawapiTransientError(
-                f"crawapi {operation} failed: transient_business_error code={code}"
+                f"crawapi {operation} failed: transient_business_error "
+                f"code={code} message={_business_message(data)}"
             )
-        raise RuntimeError(f"crawapi {operation} failed: business_error")
+        raise RuntimeError(
+            f"crawapi {operation} failed: business_error "
+            f"code={code} message={_business_message(data)} "
+            f"keys={sorted(str(key) for key in data.keys())}"
+        )
     return data
 
 
+def _response_summary(response: httpx.Response | None) -> dict[str, Any]:
+    if response is None:
+        return {}
+    summary: dict[str, Any] = {
+        "status_code": response.status_code,
+        "content_type": response.headers.get("content-type"),
+    }
+    try:
+        data = response.json()
+    except ValueError:
+        text = response.text
+        if text:
+            summary["body_summary"] = _truncate_text(text)
+        return summary
+    if isinstance(data, dict):
+        summary["json_keys"] = sorted(str(key) for key in data.keys())
+        for key in ("code", "status", "msg", "message", "error"):
+            if key in data:
+                summary[key] = _truncate_text(str(data.get(key)))
+    else:
+        summary["json_type"] = type(data).__name__
+    return summary
+
+
+def _business_message(data: dict[str, Any]) -> str:
+    return _truncate_text(str(data.get("msg") or data.get("message") or ""))
+
+
+def _truncate_text(text: str, limit: int = _MAX_RESPONSE_SUMMARY_LENGTH) -> str:
+    if len(text) <= limit:
+        return text
+    return f"{text[:limit]}..."
+
+
 def _load_env_file(env_path: str | Path) -> dict[str, str]:
     path = Path(env_path)
     if not path.exists():

+ 14 - 1
content_agent/integrations/pattern_pg.py

@@ -75,8 +75,21 @@ class PatternPgClient:
         try:
             cur = conn.cursor()
             # connect timeout 只管握手;execute/fetch 用服务端 statement_timeout 兜住,防查询永久阻塞。
-            cur.execute("SET statement_timeout = %s", (int(self.timeout_seconds * 1000),))
+            # pg8000/PG 不支持在 SET statement_timeout 里用 $1 占位符;这里先转 int 再内联。
+            timeout_ms = max(1, int(self.timeout_seconds * 1000))
+            cur.execute(f"SET statement_timeout = {timeout_ms}")
             cur.execute(_LEAF_SQL, (int(execution_id), ids))
             return cur.fetchone() is not None
+        except Exception as exc:
+            raise ContentAgentError(
+                ErrorCode.DB_SCHEMA_NOT_READY,
+                "pattern pg gate1 terminal-element query failed",
+                {
+                    "exception_type": type(exc).__name__,
+                    "error_message": str(exc)[:500],
+                    "execution_id": int(execution_id),
+                    "category_id_count": len(ids),
+                },
+            ) from exc
         finally:
             conn.close()

+ 28 - 0
tests/test_crawapi_http.py

@@ -32,6 +32,15 @@ def _response(status_code, data):
     )
 
 
+def _text_response(status_code, text):
+    return httpx.Response(
+        status_code,
+        text=text,
+        headers={"content-type": "text/plain"},
+        request=httpx.Request("POST", "http://crawapi.test/x"),
+    )
+
+
 def _post(responses, **kwargs):
     return post_crawapi_json(
         http_client=FakeHttpClient(responses),
@@ -76,6 +85,25 @@ def test_bad_response_non_dict_raises_runtime_error():
         _post([_response(200, ["not", "a", "dict"])])
 
 
+def test_http_error_includes_response_summary():
+    with pytest.raises(RuntimeError) as exc:
+        _post([_text_response(502, "Bad Gateway from crawapi")])
+
+    message = str(exc.value)
+    assert "HTTP 502" in message
+    assert "Bad Gateway from crawapi" in message
+
+
+def test_business_error_includes_code_and_message():
+    with pytest.raises(RuntimeError) as exc:
+        _post([_response(200, {"code": 50001, "msg": "source url expired"})])
+
+    message = str(exc.value)
+    assert "business_error" in message
+    assert "code=50001" in message
+    assert "source url expired" in message
+
+
 def test_business_codes_param_classifies_rate_limit():
     assert is_rate_limit_business_error("30005", {}, business_codes={"30005"}) is True
     assert is_rate_limit_business_error("30005", {}, business_codes=set()) is False

+ 86 - 0
tests/test_pattern_pg.py

@@ -0,0 +1,86 @@
+from __future__ import annotations
+
+import sys
+import types
+
+import pytest
+
+from content_agent.errors import ContentAgentError, ErrorCode
+from content_agent.integrations.pattern_pg import PatternPgClient
+
+
+class FakeCursor:
+    def __init__(self, *, fail_on_execute: bool = False) -> None:
+        self.fail_on_execute = fail_on_execute
+        self.executed: list[tuple[str, object | None]] = []
+
+    def execute(self, sql, params=None):
+        self.executed.append((sql, params))
+        if self.fail_on_execute:
+            raise RuntimeError('syntax error at or near "$1"')
+
+    def fetchone(self):
+        return (1,)
+
+
+class FakeConnection:
+    def __init__(self, cursor: FakeCursor) -> None:
+        self._cursor = cursor
+        self.closed = False
+
+    def cursor(self):
+        return self._cursor
+
+    def close(self):
+        self.closed = True
+
+
+def _install_fake_pg8000(monkeypatch, connect_fn):
+    fake_pkg = types.ModuleType("pg8000")
+    fake_pkg.__path__ = []
+    fake_dbapi = types.ModuleType("pg8000.dbapi")
+    fake_dbapi.connect = connect_fn
+    fake_pkg.dbapi = fake_dbapi
+    monkeypatch.setitem(sys.modules, "pg8000", fake_pkg)
+    monkeypatch.setitem(sys.modules, "pg8000.dbapi", fake_dbapi)
+
+
+def _client() -> PatternPgClient:
+    return PatternPgClient(
+        host="127.0.0.1",
+        port=5432,
+        user="u",
+        password="p",
+        database="open_aigc",
+        timeout_seconds=30,
+    )
+
+
+def test_pattern_pg_sets_statement_timeout_without_parameter_placeholder(monkeypatch):
+    cursor = FakeCursor()
+    connection = FakeConnection(cursor)
+    _install_fake_pg8000(monkeypatch, lambda **kwargs: connection)
+
+    assert _client().has_terminal_element(581, [101, 102]) is True
+
+    timeout_sql, timeout_params = cursor.executed[0]
+    assert timeout_sql == "SET statement_timeout = 30000"
+    assert timeout_params is None
+    assert "$1" not in timeout_sql
+    assert connection.closed is True
+
+
+def test_pattern_pg_query_failure_keeps_original_error_message(monkeypatch):
+    cursor = FakeCursor(fail_on_execute=True)
+    connection = FakeConnection(cursor)
+    _install_fake_pg8000(monkeypatch, lambda **kwargs: connection)
+
+    with pytest.raises(ContentAgentError) as exc:
+        _client().has_terminal_element(581, [101])
+
+    assert exc.value.error_code == ErrorCode.DB_SCHEMA_NOT_READY
+    assert exc.value.detail["exception_type"] == "RuntimeError"
+    assert 'syntax error at or near "$1"' in exc.value.detail["error_message"]
+    assert exc.value.detail["execution_id"] == 581
+    assert exc.value.detail["category_id_count"] == 1
+    assert connection.closed is True

+ 33 - 0
tests/test_platform_access.py

@@ -216,6 +216,39 @@ def test_platform_access_counts_runtime_error_as_platform_request_failed():
     failure = result["query_failures"][0]
     assert failure["error_code"] == ErrorCode.PLATFORM_REQUEST_FAILED.value
     assert failure["error_detail"]["exception_type"] == "RuntimeError"
+    assert (
+        failure["error_detail"]["exception_message"]
+        == "crawapi keyword_search failed: HTTP 500"
+    )
+
+
+def test_platform_access_records_runtime_error_cause_message():
+    class CausedFailureClient:
+        def search(self, search_query):
+            cause = ValueError("upstream body was not json")
+            raise RuntimeError("crawapi keyword_search failed: bad_json") from cause
+
+    search_queries = [
+        {
+            "search_query_id": "q_001",
+            "search_query": "坏响应",
+            "search_query_generation_method": "item_single",
+        }
+    ]
+
+    try:
+        platform_access.run(search_queries, CausedFailureClient())
+    except ContentAgentError as exc:
+        failure = exc.detail["query_failures"][0]
+        assert failure["error_detail"]["exception_type"] == "RuntimeError"
+        assert (
+            failure["error_detail"]["exception_message"]
+            == "crawapi keyword_search failed: bad_json"
+        )
+        assert failure["error_detail"]["cause_exception_type"] == "ValueError"
+        assert failure["error_detail"]["cause_message"] == "upstream body was not json"
+    else:
+        raise AssertionError("expected platform request failure")
 
 
 def test_platform_access_accepts_search_only_client():