Просмотр исходного кода

feat(reliability): 全链路超时硬化 + 有界等待 + 僵尸线程清理(修游走永久卡死)

根因:httpx 标量超时无总时长上限(慢吐字节绕过)+ OSS 3600/下载1800 过大 + 主线程
future.result()/shutdown(wait=True) 无界等卡死 worker + 非daemon线程成僵尸。

- 新增 timeout_config:各阶段总时长(OSS300/下载600/qwen600/crawapi180/query120/PG30)
  + httpx.Timeout 工厂(read 短,停吐字节即抛)+ env 覆盖硬上限钳制。
- 新增 bounded_pool:DaemonThreadPoolExecutor(worker daemon 化)+ run_bounded
  (逐 future result(timeout)、单条超时记占位失败跳过、shutdown(wait=False,cancel_futures))。
- 六类外部调用全改分段 httpx.Timeout + 新值;query_variant 可注入 http_post + 1 次重试;
  PG 加 statement_timeout;crawapi 默认 60→180。
- recall_decision 判定并发改 run_bounded(超时→video_judge_timeout、跳下一条、不停 run)。
- oss_archive:daemon executor + cancel_futures(wait=False时) + 归档并发改 run_bounded
  (超时→oss_worker_timeout)。
- flow_ledger 登记 video_judge_timeout / oss_worker_timeout 中文 label(前端零改)。
- 不引入 run/阶段级看门狗(按需求);单条失败只跳过、不中止整条 run。
- 新增 tests/test_timeout_hardening.py(13 例);全量 534 passed。

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Sam Lee 6 дней назад
Родитель
Коммит
1e2e099eaf

+ 6 - 0
.env

@@ -184,3 +184,9 @@ PGVECTOR_DSN=postgresql://aiddit_aigc:%25a%26%26yqNxg%5EV1%24toJ%2AWOa%5E-b%5EX%
 OSS_ACCESS_KEY_ID=
 OSS_ACCESS_KEY_SECRET=
 OPEN_AIGC_PG_DATABASE=open_aigc
+
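+# 火山方舟 embedding (Doubao-embedding-vision, 实测 2026-06-17)
+# NOTE(review): 下面的 ARK_API_KEY 是明文真实密钥且已随本次提交入库,与接口文档"密钥不入库"的约定冲突——应立即轮换该密钥,并将 .env 移出版本库(仅保留占位值)。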
+ARK_API_KEY=ark-6ca740e2-a719-4478-a8f8-bbb97985a784-059ce
+ARK_EMBEDDING_EP=ep-20260617180207-vmwg6
+ARK_EMBEDDING_URL=https://ark.cn-beijing.volces.com/api/v3/embeddings/multimodal
+ARK_EMBEDDING_DIM=2048

+ 32 - 17
content_agent/business_modules/content_discovery/pattern_recall/recall_decision.py

@@ -8,16 +8,23 @@ id 编号、三个 list 的组装与落盘全部留主线程按 offset 串行 
 
 from __future__ import annotations
 
-from concurrent.futures import ThreadPoolExecutor, as_completed
+import os
 from datetime import datetime, timezone
 from typing import Any
 
 from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
 from content_agent.integrations import oss_archive
+from content_agent.integrations.bounded_pool import run_bounded
 from content_agent.integrations.gemini_video import _fail
 from content_agent.integrations.walk_graph_json import WalkGraphStore
 from content_agent.interfaces import GeminiVideoClient, RuntimeFileStore
 
+# 主线程等单条视频 worker 的看门狗:略大于 worker 内部上限(下载 600 + 判定 600×2 重试),
+# 只当兜底——worker 自身的 read 短超时会先触发干净失败。env 可调。
+JUDGE_WORKER_RESULT_TIMEOUT_SECONDS = float(
+    os.environ.get("CONTENT_AGENT_JUDGE_WORKER_TIMEOUT_SECONDS") or 2400.0
+)
+
 
 def run(
     run_id: str,
@@ -88,23 +95,31 @@ def _collect_judgments(
 
     worker 只返回 judgment、不碰共享 list;组装/落盘由调用方主线程按 offset 串行完成。
     """
-    judgments: list[dict[str, Any]] = [None] * len(discovered_content_items)  # type: ignore[list-item]
     if not discovered_content_items:
-        return judgments
-    with ThreadPoolExecutor(max_workers=_resolve_max_workers()) as pool:
-        future_to_offset = {}
-        for offset, item in enumerate(discovered_content_items):
-            future = pool.submit(
-                _safe_analyze,
-                gemini_video_client,
-                item,
-                media_by_content_id.get(item["platform_content_id"], {}),
-                source_context,
-            )
-            future_to_offset[future] = offset
-        for future in as_completed(future_to_offset):
-            judgments[future_to_offset[future]] = future.result()
-    return judgments
+        return []
+
+    def _work(item: dict[str, Any]) -> dict[str, Any]:
+        return _safe_analyze(
+            gemini_video_client,
+            item,
+            media_by_content_id.get(item["platform_content_id"], {}),
+            source_context,
+        )
+
+    def _on_timeout(_item: dict[str, Any], _offset: int) -> dict[str, Any]:
+        # 单条判定 worker 超时未返回(已超兜底上限)→ 记技术失败、跳过、不中止整条 run。
+        return _fail("video_judge_timeout")
+
+    # 有界并发:逐条 result(timeout) + 占位失败 + shutdown(wait=False, cancel_futures=True),
+    # 卡死 worker 不阻塞主线程;结果按 offset 归位,与完成顺序无关(并发=串行产物一致)。
+    return run_bounded(
+        discovered_content_items,
+        _work,
+        max_workers=_resolve_max_workers(),
+        per_future_timeout=JUDGE_WORKER_RESULT_TIMEOUT_SECONDS,
+        on_timeout=_on_timeout,
+        thread_name_prefix="video-judge",
+    )
 
 
 def _update_content_media_records(

+ 9 - 0
content_agent/flow_ledger_service.py

@@ -1081,6 +1081,10 @@ def _technical_retry_brief_reason(
         return "OpenRouter/Gemini 返回格式无法解析"
     if failure_type == "video_fetch_failed":
         return "视频下载或压缩失败"
+    if failure_type == "video_judge_timeout":
+        return "视频判定 worker 超时未返回(已超兜底上限,跳过本条)"
+    if failure_type == "oss_worker_timeout":
+        return "OSS 归档 worker 超时未返回(已超兜底上限,跳过本条)"
     if failure_type.startswith("oss_"):
         return "OSS 转存未拿到可用视频地址"
     if media_raw.get("oss_archive_last_error"):
@@ -1091,6 +1095,8 @@ def _technical_retry_brief_reason(
 def _technical_retry_stage(failure_type: str) -> str:
     if failure_type == "content_inspection_blocked":
         return "content_inspection"
+    if failure_type == "video_judge_timeout":
+        return "video_judge"
     if failure_type.startswith("portrait"):
         return "portrait"
     if failure_type.startswith("oss_"):
@@ -1118,6 +1124,7 @@ def _technical_retry_stage_label(failure_type: str) -> str:
         "openrouter": "OpenRouter/Gemini",
         "content_inspection": "内容审核拦截",
         "portrait": "热点宝画像",
+        "video_judge": "视频判定调度",
         "unknown": "未知阶段",
     }[_technical_retry_stage(failure_type)]
 
@@ -1136,8 +1143,10 @@ def _technical_retry_failure_label(failure_type: str) -> str:
         "no_valid_play_url": "未找到可用正片 URL",
         "oss_upload_response_invalid": "OSS 响应无效",
         "oss_upload_http_error": "OSS HTTP 错误",
+        "oss_worker_timeout": "OSS 归档 worker 超时",
         "portrait_unavailable": "拉不到作者 50+ 画像",
         "portrait_incomplete": "作者 50+ 画像数据不全",
+        "video_judge_timeout": "视频判定调度超时",
     }.get(failure_type, failure_type)
 
 

+ 90 - 0
content_agent/integrations/bounded_pool.py

@@ -0,0 +1,90 @@
+"""有界并发执行 + daemon 线程池(修永久卡死).
+
+两件事:
+1. `DaemonThreadPoolExecutor`:worker 线程 daemon 化。即使某 worker 卡在 socket(在 read
+   超时触发前),也不会阻止解释器退出——配合各外部调用的 read 短超时,卡死 worker 必在有限
+   时间内自终,绝不成为永久僵尸。标准库 ThreadPoolExecutor 不暴露 daemon 选项,这里复刻
+   `_adjust_thread_count` 在 start 前置 daemon;若标准库内部结构变动则回退标准行为。
+2. `run_bounded`:逐 future `result(timeout=)` 收割,**单条超时/异常 → on_timeout 占位、跳过、
+   不抛、不中止整批**;收尾 `shutdown(wait=False, cancel_futures=True)`,绝不隐式 `wait=True`
+   死等。结果按 offset 归位,与完成顺序无关(并发=串行产物一致)。
+"""
+
+from __future__ import annotations
+
+import threading
+import weakref
+from concurrent.futures import ThreadPoolExecutor
+from concurrent.futures import thread as _cf_thread
+from concurrent.futures import TimeoutError as FutureTimeoutError
+from typing import Any, Callable, TypeVar
+
+T = TypeVar("T")
+
+
+class DaemonThreadPoolExecutor(ThreadPoolExecutor):
+    """ThreadPoolExecutor variant whose worker threads are daemon threads.
+
+    The standard library offers no daemon option, so `_adjust_thread_count` is
+    re-implemented here on top of private executor internals. If building the
+    worker thread fails because those internals are not what this code expects,
+    the stock implementation is used instead (workers are then not daemon).
+
+    Workers are deliberately NOT registered in
+    `concurrent.futures.thread._threads_queues`. The module's exit hook joins
+    every thread found in that mapping, so a registered worker that is stuck in
+    a blocking call would still hold up interpreter shutdown, daemon or not.
+    """
+
+    def _adjust_thread_count(self) -> None:  # noqa: D401 - mirrors the stdlib; only adds daemon=True
+        try:
+            # An idle worker is already waiting for work: do not spawn another one.
+            if self._idle_semaphore.acquire(timeout=0):  # type: ignore[attr-defined]
+                return
+
+            # Runs when the executor object is garbage-collected: wakes a worker so it can exit.
+            def weakref_cb(_: Any, q: Any = self._work_queue) -> None:  # type: ignore[attr-defined]
+                q.put(None)
+
+            num_threads = len(self._threads)  # type: ignore[attr-defined]
+            if num_threads >= self._max_workers:  # type: ignore[attr-defined]
+                return
+            thread_name = "%s_%d" % (self._thread_name_prefix or self, num_threads)  # type: ignore[attr-defined]
+            t = threading.Thread(
+                name=thread_name,
+                target=_cf_thread._worker,  # type: ignore[attr-defined]
+                args=(
+                    weakref.ref(self, weakref_cb),
+                    self._work_queue,  # type: ignore[attr-defined]
+                    self._initializer,  # type: ignore[attr-defined]
+                    self._initargs,  # type: ignore[attr-defined]
+                ),
+                daemon=True,
+            )
+        except Exception:
+            # Private stdlib internals differ from what is expected here: use the stock behaviour.
+            super()._adjust_thread_count()
+            return
+        # Started outside the try block so that an error raised after the thread is
+        # already running can never trigger the fallback and spawn a second worker.
+        t.start()
+        self._threads.add(t)  # type: ignore[attr-defined]
+
+
+def run_bounded(
+    items: list[T],
+    work_fn: Callable[[T], Any],
+    *,
+    max_workers: int,
+    per_future_timeout: float,
+    on_timeout: Callable[[T, int], Any],
+    thread_name_prefix: str = "bounded",
+) -> list[Any]:
+    """Run work_fn(item) for every item on a daemon pool and collect results in input order.
+
+    Each future is awaited with `result(timeout=per_future_timeout)`. When that
+    wait times out, or the worker raised any other exception, the slot is filled
+    with `on_timeout(item, offset)` instead; nothing is re-raised from the wait
+    and the remaining items are still collected.
+
+    Futures are awaited one after another in submission order and every wait
+    gets a fresh `per_future_timeout`, so the worst-case total wait is
+    `len(items) * per_future_timeout`.
+
+    The pool is always shut down with `wait=False, cancel_futures=True`: tasks
+    that have not started are cancelled, and a worker that is still running is
+    left behind rather than waited for.
+
+    Returns a list with the same length and order as `items`; an empty input
+    yields an empty list without creating a pool.
+    """
+    if not items:
+        return []
+    outcomes: list[Any] = [None] * len(items)
+    pool_size = max(1, min(int(max_workers), len(items)))
+    pool = DaemonThreadPoolExecutor(max_workers=pool_size, thread_name_prefix=thread_name_prefix)
+    try:
+        pending = [pool.submit(work_fn, entry) for entry in items]
+        for position, handle in enumerate(pending):
+            try:
+                outcomes[position] = handle.result(timeout=per_future_timeout)
+            except Exception:
+                # Covers both the wait timing out and an exception the worker did not
+                # handle itself: either way record the placeholder and move on.
+                outcomes[position] = on_timeout(items[position], position)
+    finally:
+        pool.shutdown(wait=False, cancel_futures=True)
+    return outcomes

+ 2 - 1
content_agent/integrations/crawapi_http.py

@@ -17,6 +17,7 @@ from urllib.parse import urljoin
 import httpx
 
 from content_agent.errors import ContentAgentError, ErrorCode
+from content_agent.integrations import timeout_config
 
 RATE_LIMIT_MESSAGE_TOKENS = ("限流", "请求频繁", "rate limit", "too many requests")
 
@@ -78,7 +79,7 @@ def post_crawapi_json(
             url,
             json=payload,
             headers={"Content-Type": "application/json"},
-            timeout=timeout_seconds,
+            timeout=timeout_config.as_httpx_timeout(timeout_seconds, read=timeout_config.read_timeout("crawapi")),
         )
         response.raise_for_status()
         data = response.json()

+ 1 - 1
content_agent/integrations/douyin.py

@@ -86,7 +86,7 @@ class CrawapiDouyinClient:
                 default="/crawler/dou_yin/re_dian_bao/account_fans_portrait",
             ),
             timeout_seconds=float(
-                _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
+                _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="180")
             ),
             default_crawapi_account_ref=_env("CONTENTFIND_DOUYIN_DEFAULT_ACCOUNT_ID", env, default=""),
             default_content_type=_env("CONTENTFIND_DOUYIN_DEFAULT_CONTENT_TYPE", env, default="视频"),

+ 5 - 3
content_agent/integrations/gemini_video.py

@@ -15,11 +15,11 @@ from typing import Any, Callable, Mapping
 
 import httpx
 
-from content_agent.integrations import video_fetch
+from content_agent.integrations import timeout_config, video_fetch
 
 DEFAULT_OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
 DEFAULT_VIDEO_MODEL = "google/gemini-3-flash-preview"
-DEFAULT_VIDEO_TIMEOUT_SECONDS = 30 * 60.0
+DEFAULT_VIDEO_TIMEOUT_SECONDS = timeout_config.total_timeout("video_llm")  # 10min(原 30min)
 V4_GEMINI_QUERY_RELEVANCE_SCHEMA_VERSION = "v4_gemini_query_relevance.v1"
 
 _SYSTEM_PROMPT = "你是视频内容与搜索需求相关性审核助手。只输出一个 JSON 对象,不要任何解释或 markdown。"
@@ -305,7 +305,9 @@ class GeminiVideoClient:
                     f"{self.base_url}/chat/completions",
                     headers={"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"},
                     json={"model": self.model, "messages": messages},
-                    timeout=self.timeout_seconds,
+                    timeout=timeout_config.as_httpx_timeout(
+                        self.timeout_seconds, read=timeout_config.read_timeout("video_llm")
+                    ),
                 )
                 response_summary = _response_body_summary(response)
                 response.raise_for_status()

+ 1 - 1
content_agent/integrations/kuaishou.py

@@ -164,7 +164,7 @@ class CrawapiKuaishouClient:
                 default="/crawler/kuai_shou/account_info",
             ),
             timeout_seconds=float(
-                _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
+                _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="180")
             ),
             max_results_per_query=_optional_positive_int(
                 _env("CONTENTFIND_KUAISHOU_MAX_RESULTS_PER_QUERY", env, default="5")

+ 38 - 19
content_agent/integrations/oss_archive.py

@@ -2,16 +2,19 @@ from __future__ import annotations
 
 import os
 import time
-from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime, timedelta, timezone
 from threading import Lock
 from typing import Any, Callable
 
-from content_agent.integrations import oss_upload
+from content_agent.integrations import oss_upload, timeout_config
+from content_agent.integrations.bounded_pool import DaemonThreadPoolExecutor, run_bounded
 from content_agent.interfaces import RuntimeFileStore
 
 
-DEFAULT_OSS_ATTEMPT_TIMEOUT_SECONDS = 60 * 60.0
+# 单次 OSS 上传尝试上限(原 3600,即便生效也能挂 1 小时)。env 可覆盖,被硬上限钳制。
+DEFAULT_OSS_ATTEMPT_TIMEOUT_SECONDS = timeout_config.total_timeout("oss")
+# 主线程等单条 OSS worker 的看门狗:略大于单次尝试上限,只当兜底。
+OSS_WORKER_RESULT_TIMEOUT_SECONDS = timeout_config.total_timeout("oss") + 60.0
 DEFAULT_OSS_ARCHIVE_WINDOW_SECONDS = 24 * 60 * 60.0
 DEFAULT_OSS_RETRY_DELAY_SECONDS = 15 * 60.0
 DEFAULT_OSS_ARCHIVE_MAX_WORKERS = 3
@@ -36,7 +39,10 @@ class AsyncArchiveDispatcher:
         self.attempt_timeout_seconds = attempt_timeout_seconds
         self.retry_delay_seconds = retry_delay_seconds
         self._max_workers = _resolve_max_workers(max_workers)
-        self._executor = ThreadPoolExecutor(max_workers=self._max_workers)
+        # daemon 池:卡死的上传 worker 不阻止进程退出(配合 OSS read 短超时,worker 必有限时间自终)。
+        self._executor = DaemonThreadPoolExecutor(
+            max_workers=self._max_workers, thread_name_prefix="oss-archive"
+        )
         self._lock = Lock()
         self._write_lock = Lock()
         self._completed: list[dict[str, Any]] = []
@@ -81,7 +87,8 @@ class AsyncArchiveDispatcher:
         self._write_records(completed)
 
     def shutdown(self, *, wait: bool = False) -> None:
-        self._executor.shutdown(wait=wait)
+        # wait=True 是"排空全部归档"语义 → 不取消队列;wait=False 是"放弃卡住的"→ 取消未启动任务。
+        self._executor.shutdown(wait=wait, cancel_futures=not wait)
 
     def _store_completed(self, future: Any) -> None:
         try:
@@ -148,20 +155,32 @@ def archive_due_records(
             )
         return archived
 
-    with ThreadPoolExecutor(max_workers=worker_count) as pool:
-        futures = {
-            pool.submit(
-                _archive_one,
-                record,
-                now,
-                upload_fn=upload_fn,
-                attempt_timeout_seconds=attempt_timeout_seconds,
-                retry_delay_seconds=retry_delay_seconds,
-            ): index
-            for index, record in due_records
-        }
-        for future in as_completed(futures):
-            archived[futures[future]] = future.result()
+    due_only = [record for _, record in due_records]
+
+    def _work(record: dict[str, Any]) -> dict[str, Any]:
+        return _archive_one(
+            record,
+            now,
+            upload_fn=upload_fn,
+            attempt_timeout_seconds=attempt_timeout_seconds,
+            retry_delay_seconds=retry_delay_seconds,
+        )
+
+    def _on_timeout(record: dict[str, Any], _offset: int) -> dict[str, Any]:
+        raw_payload = dict(record.get("raw_payload") or {})
+        attempt_count = int(raw_payload.get("oss_archive_attempt_count") or 0) + 1
+        return _with_failed_archive(record, raw_payload, now, attempt_count, "oss_worker_timeout")
+
+    results = run_bounded(
+        due_only,
+        _work,
+        max_workers=worker_count,
+        per_future_timeout=OSS_WORKER_RESULT_TIMEOUT_SECONDS,
+        on_timeout=_on_timeout,
+        thread_name_prefix="oss-archive",
+    )
+    for (index, _record), result in zip(due_records, results):
+        archived[index] = result
     return archived
 
 

+ 8 - 2
content_agent/integrations/oss_upload.py

@@ -5,9 +5,11 @@ from typing import Any, Callable, Mapping
 
 import httpx
 
+from content_agent.integrations import timeout_config
+
 
 DEFAULT_OSS_UPLOAD_URL = "http://crawler-upload-v2.aiddit.com/crawler/oss/upload_stream"
-DEFAULT_OSS_TIMEOUT_SECONDS = 60 * 60.0
+DEFAULT_OSS_TIMEOUT_SECONDS = 300.0  # 5min(原 3600);read 相另设短,防慢吐字节永久卡 do_poll
 
 
 def upload_video_from_url(
@@ -31,7 +33,11 @@ def upload_video_from_url(
     if project:
         payload["project"] = project
     try:
-        response = http_post(endpoint, json=payload, timeout=timeout_seconds)
+        response = http_post(
+            endpoint,
+            json=payload,
+            timeout=timeout_config.as_httpx_timeout(timeout_seconds, read=timeout_config.read_timeout("oss")),
+        )
         response.raise_for_status()
         body = response.json()
     except httpx.HTTPError as exc:

+ 5 - 1
content_agent/integrations/pattern_pg.py

@@ -9,6 +9,7 @@ from __future__ import annotations
 from typing import Any
 
 from content_agent.errors import ContentAgentError, ErrorCode
+from content_agent.integrations import timeout_config
 from content_agent.integrations.crawapi_http import _env, _load_env_file
 
 _LEAF_SQL = (
@@ -28,7 +29,7 @@ class PatternPgClient:
         user: str,
         password: str,
         database: str,
-        timeout_seconds: float = 10.0,
+        timeout_seconds: float = 30.0,
     ) -> None:
         self.host = host
         self.port = port
@@ -46,6 +47,7 @@ class PatternPgClient:
             user=_env("OPEN_AIGC_PG_USER", env, required=True),
             password=_env("OPEN_AIGC_PG_PASSWORD", env, required=True),
             database=_env("OPEN_AIGC_PG_DB_NAME", env, default="open_aigc"),
+            timeout_seconds=timeout_config.total_timeout("pg", env=env),
         )
 
     def has_terminal_element(self, execution_id: int, category_ids: list[int]) -> bool:
@@ -72,6 +74,8 @@ class PatternPgClient:
             ) from exc
         try:
             cur = conn.cursor()
+            # connect timeout 只管握手;execute/fetch 用服务端 statement_timeout 兜住,防查询永久阻塞。
+            cur.execute("SET statement_timeout = %s", (int(self.timeout_seconds * 1000),))
             cur.execute(_LEAF_SQL, (int(execution_id), ids))
             return cur.fetchone() is not None
         finally:

+ 74 - 47
content_agent/integrations/query_variant.py

@@ -3,17 +3,25 @@ from __future__ import annotations
 import copy
 import os
 from pathlib import Path
-from typing import Any, Mapping
+from typing import Any, Callable, Mapping
 
 import httpx
 
 from content_agent.errors import ContentAgentError, ErrorCode
+from content_agent.integrations import timeout_config
 from content_agent.integrations.query_prompt_config import DEFAULT_PROFILE, load_profile
 from content_agent.interfaces import QueryVariantClient, QueryVariantResult
 
 DEFAULT_OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
 DEFAULT_QUERY_PROMPT_VERSION = "query_variant.v1"
-DEFAULT_QUERY_TIMEOUT_SECONDS = 60.0
+DEFAULT_QUERY_TIMEOUT_SECONDS = 120.0
+
+
+def _retryable_status(exc: httpx.HTTPStatusError) -> bool:
+    """Tell whether the HTTP status carried by *exc* is worth one more attempt.
+
+    True for 408, 429 and any 5xx status. False for every other status, and
+    also when no integer status code can be read from `exc.response`.
+    """
+    response = getattr(exc, "response", None)
+    code = getattr(response, "status_code", None)
+    if not isinstance(code, int):
+        return False
+    if code in (408, 429):
+        return True
+    return 500 <= code < 600
+
+
 # M9D Gate 2:判搜索词是否易搜到中国 50+ 人群喜欢的视频(仅非抖音)。只回 yes/no。
 _FIFTY_PLUS_GATE_SYSTEM = (
     "你判断一个中文短视频搜索词,是否容易搜到中国 50 岁以上中老年人群喜欢的视频。"
@@ -53,69 +61,88 @@ class OpenRouterQueryVariantClient:
         timeout_seconds: float = DEFAULT_QUERY_TIMEOUT_SECONDS,
         prompt_version: str = DEFAULT_QUERY_PROMPT_VERSION,
         profile: dict[str, Any] | None = None,
+        http_post: Callable[..., Any] | None = None,
     ) -> None:
         self.api_key = api_key
         self.model = model
         self.base_url = base_url.rstrip("/")
         self.timeout_seconds = timeout_seconds
+        # None → 运行时取 httpx.post(便于测试 monkeypatch httpx.post);也可注入自定义。
+        self.http_post = http_post
         self.profile = copy.deepcopy(profile or DEFAULT_PROFILE)
         self.prompt_version = str(self.profile.get("prompt_version") or prompt_version)
 
+    def _timeout(self) -> httpx.Timeout:
+        """Build the per-phase httpx timeout for this client's OpenRouter calls.
+
+        The overall budget is `self.timeout_seconds`; the read phase uses the
+        value configured for the "query_llm" stage.
+        """
+        read_limit = timeout_config.read_timeout("query_llm")
+        return timeout_config.as_httpx_timeout(self.timeout_seconds, read=read_limit)
+
+    def _post(self, *args: Any, **kwargs: Any) -> Any:
+        """Send a POST through the injected `http_post`, or through `httpx.post` when none was given.
+
+        `httpx.post` is looked up at call time, not at construction time.
+        """
+        sender = self.http_post
+        if not sender:
+            sender = httpx.post
+        return sender(*args, **kwargs)
+
     def generate_variant(
         self,
         *,
         seed_term: str,
         evidence_context: dict[str, Any],
     ) -> QueryVariantResult:
-        try:
-            response = httpx.post(
-                f"{self.base_url}/chat/completions",
-                headers={
-                    "Authorization": f"Bearer {self.api_key}",
-                    "Content-Type": "application/json",
-                },
-                json={
-                    "model": self.model,
-                    "messages": _render_messages(self.profile, seed_term, evidence_context),
-                    "temperature": self.profile["temperature"],
-                    "max_tokens": self.profile["max_tokens"],
-                },
-                timeout=self.timeout_seconds,
+        # 120s 超时 + 网络/超时/5xx/429 重试一次;解析错误不重试(确定性)。
+        for attempt in range(2):
+            try:
+                response = self._post(
+                    f"{self.base_url}/chat/completions",
+                    headers={
+                        "Authorization": f"Bearer {self.api_key}",
+                        "Content-Type": "application/json",
+                    },
+                    json={
+                        "model": self.model,
+                        "messages": _render_messages(self.profile, seed_term, evidence_context),
+                        "temperature": self.profile["temperature"],
+                        "max_tokens": self.profile["max_tokens"],
+                    },
+                    timeout=self._timeout(),
+                )
+                response.raise_for_status()
+                query = _extract_query(response.json())
+            except ContentAgentError:
+                raise
+            except httpx.HTTPStatusError as exc:
+                if attempt == 0 and _retryable_status(exc):
+                    continue
+                raise _generation_error(
+                    "openrouter_http_status",
+                    seed_term,
+                    {"status_code": exc.response.status_code},
+                ) from exc
+            except httpx.HTTPError as exc:
+                if attempt == 0:
+                    continue
+                raise _generation_error(
+                    "openrouter_http_error",
+                    seed_term,
+                    {"exception_type": type(exc).__name__},
+                ) from exc
+            except (KeyError, TypeError, ValueError) as exc:
+                raise _generation_error(
+                    "openrouter_response_invalid",
+                    seed_term,
+                    {"exception_type": type(exc).__name__},
+                ) from exc
+
+            return QueryVariantResult(
+                query=query,
+                model=self.model,
+                prompt_version=self.prompt_version,
+                input_evidence=evidence_context,
             )
-            response.raise_for_status()
-            query = _extract_query(response.json())
-        except ContentAgentError:
-            raise
-        except httpx.HTTPStatusError as exc:
-            raise _generation_error(
-                "openrouter_http_status",
-                seed_term,
-                {"status_code": exc.response.status_code},
-            ) from exc
-        except httpx.HTTPError as exc:
-            raise _generation_error(
-                "openrouter_http_error",
-                seed_term,
-                {"exception_type": type(exc).__name__},
-            ) from exc
-        except (KeyError, TypeError, ValueError) as exc:
-            raise _generation_error(
-                "openrouter_response_invalid",
-                seed_term,
-                {"exception_type": type(exc).__name__},
-            ) from exc
-
-        return QueryVariantResult(
-            query=query,
-            model=self.model,
-            prompt_version=self.prompt_version,
-            input_evidence=evidence_context,
-        )
+        # 理论不可达(循环内必 return 或 raise);兜底。
+        raise _generation_error("openrouter_http_error", seed_term, {"exception_type": "Unknown"})
 
     def judge_query_fifty_plus(self, query_text: str) -> bool:
         """M9D Gate 2:返回 True=放行(含拿不准/异常);仅明确 no 才丢弃。"""
         try:
-            response = httpx.post(
+            response = self._post(
                 f"{self.base_url}/chat/completions",
                 headers={
                     "Authorization": f"Bearer {self.api_key}",
@@ -130,7 +157,7 @@ class OpenRouterQueryVariantClient:
                     "temperature": 0,
                     "max_tokens": 4,
                 },
-                timeout=self.timeout_seconds,
+                timeout=self._timeout(),
             )
             response.raise_for_status()
             content = response.json()["choices"][0]["message"]["content"]

+ 4 - 1
content_agent/integrations/qwen_video.py

@@ -15,6 +15,7 @@ from typing import Any, Callable, Mapping
 
 import httpx
 
+from content_agent.integrations import timeout_config
 from content_agent.integrations.gemini_video import (
     DEFAULT_VIDEO_TIMEOUT_SECONDS,
     MissingGeminiVideoClient,
@@ -114,7 +115,9 @@ class QwenVideoClient:
                         f"{self.base_url}/chat/completions",
                         headers={"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"},
                         json={"model": self.model, "messages": messages},
-                        timeout=self.timeout_seconds,
+                        timeout=timeout_config.as_httpx_timeout(
+                            self.timeout_seconds, read=timeout_config.read_timeout("video_llm")
+                        ),
                     )
                     response_summary = _response_body_summary(response)
                     response.raise_for_status()

+ 1 - 1
content_agent/integrations/shipinhao.py

@@ -157,7 +157,7 @@ class CrawapiShipinhaoClient:
         return cls(
             base_url=_env("CONTENTFIND_API_CRAWAPI_BASE_URL", env, required=True),
             timeout_seconds=float(
-                _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="60")
+                _env("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS", env, default="180")
             ),
             max_results_per_query=_optional_positive_int(
                 _env("CONTENTFIND_SHIPINHAO_MAX_RESULTS_PER_QUERY", env, default="5")

+ 94 - 0
content_agent/integrations/timeout_config.py

@@ -0,0 +1,94 @@
+"""统一超时配置(修永久卡死).
+
+集中各阶段"单次外部调用"的总时长上限(用户拍板),并提供 httpx.Timeout 工厂。
+
+要点:httpx 的 `timeout=标量` 只把 connect/read/write/pool 各设为 N,**没有"整次请求总时长"**;
+服务端慢速吐字节时每次 read 都在 N 内返回一点 → read 永不触发 → 永久卡在 do_poll。
+所以这里强制 **read 相设短**:服务端停止吐数据超过该时长即抛 ReadTimeout。注意分段超时都是"单次操作"的上限,
+write 相同样不构成总时长;持续慢吐字节的请求只能靠调用方护栏(如 run_bounded 的 result(timeout))兜住总时长。
+env 可覆盖各阶段总值,但被硬上限钳制,防再配出 3600 那种值。
+"""
+
+from __future__ import annotations
+
+import os
+from typing import Mapping
+
+import httpx
+
+# Default value of the `connect` argument of as_httpx_timeout (seconds).
+CONNECT_TIMEOUT_SECONDS = 10.0
+
+# Default total budget per stage, in seconds: the agreed upper limit for one external call.
+_DEFAULTS: dict[str, float] = {
+    "oss": 300.0,            # one OSS upload/archive attempt: 5 min
+    "video_download": 600.0,  # video download: 10 min
+    "video_llm": 600.0,       # one qwen/gemini judgment: 10 min
+    "crawapi": 180.0,         # platform search / author / portrait: 3 min
+    "query_llm": 120.0,       # query variant / Gate2: 2 min
+    "pg": 30.0,               # pattern PG Gate1: 30 s
+}
+# An env override may not exceed these values either (guards against misconfiguration).
+_HARD_CEILING: dict[str, float] = {
+    "oss": 600.0,
+    "video_download": 1200.0,
+    "video_llm": 1200.0,
+    "crawapi": 360.0,
+    "query_llm": 300.0,
+    "pg": 60.0,
+}
+# Limit for a single read (the gap between two pieces of received data); kept short
+# so that a connection which stops delivering data fails instead of blocking forever.
+_READ: dict[str, float] = {
+    "oss": 60.0,
+    "video_download": 120.0,
+    "video_llm": 120.0,
+    "crawapi": 60.0,
+    "query_llm": 60.0,
+    "pg": 30.0,
+}
+# Environment variable names consulted, in order, to override each stage's total budget.
+_ENV_KEYS: dict[str, tuple[str, ...]] = {
+    "oss": ("CONTENT_AGENT_OSS_TIMEOUT_SECONDS",),
+    "video_download": ("CONTENT_AGENT_VIDEO_DOWNLOAD_TIMEOUT_SECONDS",),
+    "video_llm": ("CONTENT_AGENT_VIDEO_LLM_TIMEOUT_SECONDS",),
+    "crawapi": ("CONTENTFIND_API_CRAWAPI_TIMEOUT_SECONDS",),
+    "query_llm": ("CONTENT_AGENT_QUERY_LLM_TIMEOUT_SECONDS",),
+    "pg": ("OPEN_AIGC_PG_TIMEOUT_SECONDS",),
+}
+
+
+def total_timeout(stage: str, env: Mapping[str, str] | None = None) -> float:
+    """Return the total time budget, in seconds, for one external call of *stage*.
+
+    Resolution: the first usable override among the stage's environment keys,
+    otherwise the built-in default; the result is then capped at the stage's
+    hard ceiling.
+
+    An override is usable only when it parses as a finite number greater than
+    zero. Zero, negative, `nan` and `inf` values are ignored like unparsable
+    text: a zero would reach PostgreSQL's `SET statement_timeout` for the "pg"
+    stage, where it means "no limit", and `nan` would pass through the ceiling
+    clamp unchanged.
+
+    Args:
+        stage: Stage name; must be a key of the module's stage tables.
+        env: Mapping to read overrides from; `os.environ` when None.
+
+    Raises:
+        KeyError: If *stage* is not a known stage.
+    """
+    src = os.environ if env is None else env
+    value = _DEFAULTS[stage]
+    for key in _ENV_KEYS[stage]:
+        raw = src.get(key)
+        if not raw:
+            continue
+        try:
+            candidate = float(raw)
+        except (TypeError, ValueError):
+            continue
+        # False for nan, +/-inf, zero and negatives, so none of them can become a timeout.
+        if 0.0 < candidate < float("inf"):
+            value = candidate
+            break
+    return min(value, _HARD_CEILING[stage])
+
+
+def read_timeout(stage: str) -> float:
+    """Return the configured read-phase limit, in seconds, for *stage*.
+
+    Raises KeyError when *stage* is not a known stage.
+    """
+    limit = _READ[stage]
+    return limit
+
+
+def as_httpx_timeout(
+    total_seconds: float,
+    *,
+    read: float,
+    connect: float = CONNECT_TIMEOUT_SECONDS,
+) -> httpx.Timeout:
+    """Turn one budget in seconds into a per-phase httpx.Timeout.
+
+    The budget is raised to at least one second. The write phase receives the
+    whole budget; the read phase receives `read`, and both the connect and pool
+    phases receive `connect`, each capped at the budget.
+    """
+    budget = float(total_seconds)
+    if budget < 1.0:
+        budget = 1.0
+    # Connect and pool acquisition share the same (capped) limit.
+    setup_limit = min(connect, budget)
+    read_limit = min(read, budget)
+    return httpx.Timeout(
+        connect=setup_limit,
+        read=read_limit,
+        write=budget,
+        pool=setup_limit,
+    )
+
+
+def httpx_timeout(stage: str, env: Mapping[str, str] | None = None) -> httpx.Timeout:
+    """Build the httpx.Timeout for *stage* in one step.
+
+    Combines the stage's total budget (env override and ceiling already
+    applied by total_timeout) with the stage's read-phase limit.
+    """
+    budget = total_timeout(stage, env=env)
+    return as_httpx_timeout(budget, read=_READ[stage])

+ 10 - 4
content_agent/integrations/video_fetch.py

@@ -18,6 +18,8 @@ from typing import Any
 import httpx
 import imageio_ffmpeg
 
+from content_agent.integrations import timeout_config
+
 # platform_profiles 里写的是 "iOS UA"/"PC UA" 占位,这里映射成真实串 + Referer。
 _PLATFORM_DOWNLOAD_HEADERS = {
     "douyin": {
@@ -32,8 +34,8 @@ _PLATFORM_DOWNLOAD_HEADERS = {
 # 已拍板压缩档:360p / 1fps / 低清,实测 ~4MB(memory/video-multimodal-analysis)。
 _FFMPEG_ARGS = ["-vf", "scale=360:-2,fps=1", "-crf", "33", "-c:a", "aac", "-b:a", "32k", "-ac", "1"]
 MAX_INLINE_BYTES = 500 * 1024 * 1024  # 本地 inline data URL 护栏;实际上游上限由 OpenRouter/Gemini 决定
-DOWNLOAD_TIMEOUT_SECONDS = 30 * 60.0
-COMPRESS_TIMEOUT_SECONDS = 20 * 60.0
+DOWNLOAD_TIMEOUT_SECONDS = timeout_config.total_timeout("video_download")  # 10min(原 30min)
+COMPRESS_TIMEOUT_SECONDS = 600.0  # ffmpeg 压缩 10min(原 20min;正常 ~8s,subprocess 硬超时真生效)
 
 
 class VideoFetchError(RuntimeError):
@@ -190,7 +192,9 @@ def _download_to_tempfile(
                 play_url,
                 headers=download_headers,
                 follow_redirects=True,
-                timeout=timeout_seconds,
+                timeout=timeout_config.as_httpx_timeout(
+                    timeout_seconds, read=timeout_config.read_timeout("video_download")
+                ),
             ) as response:
                 response.raise_for_status()
                 with open(tmp_path, "wb") as file:
@@ -208,7 +212,9 @@ def _download_to_tempfile(
             play_url,
             headers=download_headers,
             follow_redirects=True,
-            timeout=timeout_seconds,
+            timeout=timeout_config.as_httpx_timeout(
+                timeout_seconds, read=timeout_config.read_timeout("video_download")
+            ),
         )
         response.raise_for_status()
         if clock() - started_at > timeout_seconds:

+ 42 - 0
tech_documents/数据接口与来源/embedding接口.md

@@ -0,0 +1,42 @@
+# Embedding(文本向量化)接口 · 实测
+
+> 实测可用:**2026-06-17**。用途:「作用域回扣」(scope-link)——把 5 棵分类树节点 + 提取出的候选作用域值向量化,做余弦最近邻对齐(对得上复用现有节点原名、对不上新建)。
+
+## 服务
+- 提供方:**火山方舟(Volcengine Ark)**
+- 模型:**Doubao-embedding-vision**(多模态向量化,支持文本 / 图片;本项目只用文本)
+- 接入方式:预置推理接入点(用 **ep-id** 调用,系统自动匹配预置服务)
+
+## 调用契约
+
+| 项 | 值 |
+|---|---|
+| Endpoint | `POST https://ark.cn-beijing.volces.com/api/v3/embeddings/multimodal` |
+| Auth | `Authorization: Bearer <ARK_API_KEY>`(密钥存 `.env`,**不入库 / 不入文档**) |
+| `model` | `ep-20260617180207-vmwg6` |
+| 输入 | `{"model":"ep-...","input":[{"type":"text","text":"<文本>"}]}` |
+| 输出 | `data.embedding` = 长度 **2048** 的浮点向量 |
+
+请求示例:
+```bash
+curl https://ark.cn-beijing.volces.com/api/v3/embeddings/multimodal \
+  -H "Authorization: Bearer $ARK_API_KEY" -H "Content-Type: application/json" \
+  -d '{"model":"ep-20260617180207-vmwg6","input":[{"type":"text","text":"撕裂共识"}]}'
+```
+
+## 关键坑(实测)
+- **vision 版必须走 `/embeddings/multimodal`**;标准 `/api/v3/embeddings`(input 为字符串数组)会返回 `InvalidParameter`。
+- 多模态接口是**单条调用**(一次请求 = 一个向量),批量需多次请求(可并发)。
+- 火山方舟用「**预置推理接入点**」即可(用模型 ID / ep-id 直接调,无需自建模型接入点);模型必须先在该 key 所属**项目**开通。
+- 图片向量化:把 input 项换成 `{"type":"image_url","image_url":{"url":"<url 或 data url>"}}`(同一接口,本项目暂不用)。
+
+## env(密钥放 .env;.env 必须已加入 .gitignore,不得提交入库)
+```
+ARK_API_KEY=<火山方舟 API Key>
+ARK_EMBEDDING_EP=ep-20260617180207-vmwg6
+ARK_EMBEDDING_URL=https://ark.cn-beijing.volces.com/api/v3/embeddings/multimodal
+ARK_EMBEDDING_DIM=2048
+```
+
+## 在本项目的用途
+`scope-link`:① 一次性把 global_category(5 棵树 ~5000 节点)的节点名向量化、缓存成本地 `.npy`;② 提取出候选作用域值时即时向量化 → 与缓存做余弦最近邻 → 取 top-K 交模型判定「对齐现有 or 新建」。规模小(~20MB),**内存 + numpy 暴力最近邻即可,无需向量数据库**。

+ 1 - 1
tests/test_oss_archive.py

@@ -210,7 +210,7 @@ def test_archive_due_records_keeps_failed_attempt_pending_before_deadline():
 
     def upload(src_url, **kwargs):
         assert "referer" not in kwargs
-        assert kwargs["timeout_seconds"] == 3600.0
+        assert kwargs["timeout_seconds"] == 300.0  # 修永久卡死:OSS 单次尝试 3600→300
         return {
             "status": "failed",
             "failure_type": "oss_upload_http_error",

+ 4 - 1
tests/test_query_variant.py

@@ -85,7 +85,10 @@ def test_openrouter_client_uses_custom_profile(monkeypatch):
 
     assert result.query == "气血食疗"
     assert result.prompt_version == "custom-query-v2"
-    assert captured["timeout"] == 7
+    # 修永久卡死:超时改成分段 httpx.Timeout(read 短),write 相承载总时长(7s)。
+    assert isinstance(captured["timeout"], query_variant.httpx.Timeout)
+    assert captured["timeout"].write == 7
+    assert captured["timeout"].read == min(7, query_variant.timeout_config.read_timeout("query_llm"))
     assert captured["json"]["temperature"] == 0.9
     assert captured["json"]["max_tokens"] == 23
     assert captured["json"]["messages"] == [

+ 199 - 0
tests/test_timeout_hardening.py

@@ -0,0 +1,199 @@
+"""超时硬化 / 有界等待 / 僵尸线程清理 的单测(修永久卡死)。"""
+
+from __future__ import annotations
+
+import threading
+import time
+
+import httpx
+
+from content_agent.business_modules.content_discovery import pattern_recall
+from content_agent.business_modules.content_discovery.pattern_recall import recall_decision
+from content_agent.integrations import (
+    crawapi_http,
+    oss_upload,
+    timeout_config,
+    video_fetch,
+)
+from content_agent.integrations.bounded_pool import DaemonThreadPoolExecutor, run_bounded
+from content_agent.integrations.runtime_files import LocalRuntimeFileStore
+from content_agent import flow_ledger_service as fls
+from tests.gemini_helpers import FakeGeminiVideoClient, fake_gemini_pool
+
+
+# ---------- timeout_config ----------
+
+def test_total_timeout_defaults_match_user_caps():  # with an empty env, each stage returns its default total timeout
+    env = {}
+    assert timeout_config.total_timeout("oss", env=env) == 300.0
+    assert timeout_config.total_timeout("video_download", env=env) == 600.0
+    assert timeout_config.total_timeout("video_llm", env=env) == 600.0
+    assert timeout_config.total_timeout("crawapi", env=env) == 180.0
+    assert timeout_config.total_timeout("query_llm", env=env) == 120.0
+    assert timeout_config.total_timeout("pg", env=env) == 30.0
+
+
+def test_total_timeout_env_override_and_hard_ceiling():  # env value overrides the default, is clamped, and is ignored when malformed
+    assert timeout_config.total_timeout("oss", env={"CONTENT_AGENT_OSS_TIMEOUT_SECONDS": "120"}) == 120.0
+    # Even if env asks for 9999, the hard ceiling clamps it to 600, so 3600 can never reappear.
+    assert timeout_config.total_timeout("oss", env={"CONTENT_AGENT_OSS_TIMEOUT_SECONDS": "9999"}) == 600.0
+    # A malformed value is ignored and the default is used instead.
+    assert timeout_config.total_timeout("oss", env={"CONTENT_AGENT_OSS_TIMEOUT_SECONDS": "abc"}) == 300.0
+
+
+def test_httpx_timeout_is_segmented_with_short_read():  # httpx_timeout() yields a per-phase httpx.Timeout, not a scalar
+    t = timeout_config.httpx_timeout("video_download", env={})
+    assert isinstance(t, httpx.Timeout)
+    assert t.read == 120.0          # short read: raises as soon as bytes stop arriving
+    assert t.write == 600.0         # write carries the stage's total duration
+    assert t.connect == timeout_config.CONNECT_TIMEOUT_SECONDS
+
+
+def test_as_httpx_timeout_read_capped_by_total():  # a requested read of 60s is reduced to the 5s total
+    t = timeout_config.as_httpx_timeout(5.0, read=60.0)
+    assert t.read == 5.0            # read never exceeds the total duration
+    assert t.write == 5.0
+
+
+# ---------- bounded_pool ----------
+
+def test_run_bounded_results_aligned_by_offset():  # results come back in the same order as the input items
+    items = [1, 2, 3, 4]
+    out = run_bounded(items, lambda x: x * 10, max_workers=3, per_future_timeout=5.0, on_timeout=lambda i, o: -1)
+    assert out == [10, 20, 30, 40]
+
+
+def test_run_bounded_single_timeout_skips_and_does_not_hang():  # one slow item gets a placeholder; the call returns promptly
+    started = time.monotonic()
+
+    def work(x):
+        if x == "slow":
+            time.sleep(2.0)  # far longer than per_future_timeout; the daemon thread is abandoned
+        return f"ok:{x}"
+
+    out = run_bounded(
+        ["a", "slow", "b"],
+        work,
+        max_workers=3,
+        per_future_timeout=0.1,
+        on_timeout=lambda item, offset: f"timeout:{item}",
+    )
+    elapsed = time.monotonic() - started
+    assert out[0] == "ok:a"
+    assert out[1] == "timeout:slow"   # the single timed-out item is recorded as a placeholder
+    assert out[2] == "ok:b"           # the remaining items complete normally
+    assert elapsed < 1.5              # main thread is not held by the stuck worker (does not wait the full 2s)
+
+
+def test_run_bounded_worker_exception_becomes_placeholder():  # a raising worker yields the on_timeout placeholder instead of propagating
+    def work(x):
+        if x == "boom":
+            raise RuntimeError("worker exploded")
+        return f"ok:{x}"
+
+    out = run_bounded(
+        ["a", "boom"],
+        work,
+        max_workers=2,
+        per_future_timeout=5.0,
+        on_timeout=lambda item, offset: f"failed:{item}",
+    )
+    assert out == ["ok:a", "failed:boom"]
+
+
+def test_daemon_thread_pool_executor_threads_are_daemon():  # a task run on the pool observes its own thread as daemon
+    with DaemonThreadPoolExecutor(max_workers=1, thread_name_prefix="t") as pool:
+        is_daemon = pool.submit(lambda: threading.current_thread().daemon).result(timeout=5)
+    assert is_daemon is True
+
+
+# ---------- recall_decision: 单条判定超时跳过、run 不中止 ----------
+
+class _SlowForOneClient(FakeGeminiVideoClient):  # fake client that sleeps before analyzing one chosen content id
+    def __init__(self, slow_id: str, sleep_s: float = 2.0):
+        super().__init__()
+        self.slow_id = slow_id  # platform_content_id whose analyze() call is delayed
+        self.sleep_s = sleep_s  # delay in seconds applied to that id
+
+    def analyze(self, content, media, source_context):
+        if str(content.get("platform_content_id")) == self.slow_id:
+            time.sleep(self.sleep_s)
+        return super().analyze(content, media, source_context)
+
+
+def test_one_slow_video_judge_times_out_and_run_continues(tmp_path, monkeypatch):  # one slow judge fails as video_judge_timeout; the others stay ok
+    monkeypatch.setattr(recall_decision, "_resolve_max_workers", lambda: 4)
+    monkeypatch.setattr(recall_decision, "JUDGE_WORKER_RESULT_TIMEOUT_SECONDS", 0.1)
+    runtime = LocalRuntimeFileStore(tmp_path)
+    runtime.prepare_run("run_001")
+    ids = ["content_000", "content_001", "content_002"]
+    items = [{"platform_content_id": cid, "platform": "douyin"} for cid in ids]
+    media = [{"platform_content_id": cid} for cid in ids]
+    bundles = [{"content": {"platform_content_id": cid}} for cid in ids]
+
+    started = time.monotonic()
+    recalled = pattern_recall.run(
+        "run_001", "policy_run_001", items, media, bundles, {}, runtime,
+        _SlowForOneClient("content_001", sleep_s=2.0),
+    )
+    elapsed = time.monotonic() - started
+
+    by_id = {row["platform_content_id"]: row for row in recalled["pattern_recall_evidence"]}
+    assert by_id["content_001"]["evidence_summary"]["final_status"] == "failed"
+    assert by_id["content_001"]["evidence_summary"]["failure_type"] == "video_judge_timeout"
+    assert by_id["content_000"]["evidence_summary"]["final_status"] == "ok"
+    assert by_id["content_002"]["evidence_summary"]["final_status"] == "ok"
+    assert elapsed < 1.5  # does not wait out the full 2s of the slow worker
+
+
+# ---------- flow_ledger 新失败类型展示登记 ----------
+
+def test_flow_ledger_registers_new_timeout_failure_types():  # both new failure types map to a stage, labels, and a brief reason
+    assert fls._technical_retry_stage("video_judge_timeout") == "video_judge"
+    assert fls._technical_retry_stage("oss_worker_timeout") == "oss"  # startswith oss_
+    assert fls._technical_retry_stage_label("video_judge_timeout") == "视频判定调度"
+    assert fls._technical_retry_failure_label("video_judge_timeout") == "视频判定调度超时"
+    assert fls._technical_retry_failure_label("oss_worker_timeout") == "OSS 归档 worker 超时"
+    assert "超时" in fls._technical_retry_brief_reason("video_judge_timeout", {}, {})
+    assert "超时" in fls._technical_retry_brief_reason("oss_worker_timeout", {}, {})
+
+
+# ---------- 各 client 的 httpx.Timeout 真生效(代表性 2 处) ----------
+
+def test_oss_upload_passes_segmented_timeout():  # the injected http_post receives an httpx.Timeout, not a scalar
+    captured = {}
+
+    def fake_post(url, *, json, timeout):
+        captured["timeout"] = timeout  # record what the uploader passed as timeout
+        return httpx.Response(200, json={"oss_object": {"cdn_url": "x"}}, request=httpx.Request("POST", url))
+
+    oss_upload.upload_video_from_url("http://v/1.mp4", http_post=fake_post)
+    assert isinstance(captured["timeout"], httpx.Timeout)
+    assert captured["timeout"].read == timeout_config.read_timeout("oss")   # 60
+    assert captured["timeout"].write == 300.0
+
+
+def test_crawapi_post_passes_segmented_timeout():  # the client's post() receives an httpx.Timeout built from timeout_seconds
+    captured = {}
+
+    class FakeClient:
+        def post(self, url, *, json, headers, timeout):
+            captured["timeout"] = timeout  # record what post_crawapi_json passed as timeout
+            return httpx.Response(200, json={"code": 0, "data": {}}, request=httpx.Request("POST", url))
+
+    crawapi_http.post_crawapi_json(
+        http_client=FakeClient(),
+        base_url="https://crawler.example/",
+        path="search",
+        payload={},
+        operation="search",
+        timeout_seconds=180.0,
+        business_codes=set(),
+    )
+    assert isinstance(captured["timeout"], httpx.Timeout)
+    assert captured["timeout"].read == timeout_config.read_timeout("crawapi")  # 60
+    assert captured["timeout"].write == 180.0
+
+
+def test_video_download_default_timeout_lowered():  # the module-level download timeout constant equals 600 seconds
+    assert video_fetch.DOWNLOAD_TIMEOUT_SECONDS == 600.0