Forráskód Böngészése

feat: route video review media through OSS

Sam Lee 1 hete
szülő
commit
c40e75ffb7

+ 52 - 8
content_agent/business_modules/content_discovery/pattern_recall/recall_decision.py

@@ -39,6 +39,11 @@ def run(
     evidence_rows: list[dict[str, Any]] = []
     updated_items: list[dict[str, Any]] = []
     updated_bundles: list[dict[str, Any]] = []
+    updated_media_records = _update_content_media_records(
+        content_media_records,
+        discovered_content_items,
+        judgments,
+    )
     for offset, item in enumerate(discovered_content_items):
         recall_evidence_id = f"recall_{start_index + offset:03d}"
         judgment = judgments[offset]
@@ -52,9 +57,11 @@ def run(
         )
 
     runtime.append_jsonl(run_id, "pattern_recall_evidence.jsonl", evidence_rows)
+    runtime.append_jsonl(run_id, "content_media_records.jsonl", updated_media_records)
     runtime.append_jsonl(run_id, "discovered_content_items.jsonl", updated_items)
     return {
         "pattern_recall_evidence": evidence_rows,
+        "content_media_records": updated_media_records,
         "discovered_content_items": updated_items,
         "evidence_bundles": updated_bundles,
     }
@@ -73,16 +80,9 @@ def _collect_judgments(
     judgments: list[dict[str, Any]] = [None] * len(discovered_content_items)  # type: ignore[list-item]
     if not discovered_content_items:
         return judgments
-    # 配额截断按 offset 在提交前预判(与完成顺序无关)→ 串行/并发截断边界相同。
-    # 非 wrapper client(如单测直调的 FakeGemini)无 remaining_quota → 不限额。
-    remaining = getattr(gemini_video_client, "remaining_quota", lambda: len(discovered_content_items))()
-    submitted = min(remaining, len(discovered_content_items))
     with ThreadPoolExecutor(max_workers=_resolve_max_workers()) as pool:
         future_to_offset = {}
         for offset, item in enumerate(discovered_content_items):
-            if offset >= submitted:
-                judgments[offset] = _fail("gemini_quota_exhausted")
-                continue
             future = pool.submit(
                 _safe_analyze,
                 gemini_video_client,
@@ -93,10 +93,54 @@ def _collect_judgments(
             future_to_offset[future] = offset
         for future in as_completed(future_to_offset):
             judgments[future_to_offset[future]] = future.result()
-    getattr(gemini_video_client, "consume", lambda count: None)(submitted)
     return judgments
 
 
+def _update_content_media_records(
+    content_media_records: list[dict[str, Any]],
+    discovered_content_items: list[dict[str, Any]],
+    judgments: list[dict[str, Any]],
+) -> list[dict[str, Any]]:
+    updates_by_key: dict[tuple[Any, Any], dict[str, Any]] = {}
+    for item, judgment in zip(discovered_content_items, judgments):
+        update = judgment.get("media_storage_update") if isinstance(judgment, dict) else None
+        if isinstance(update, dict):
+            updates_by_key[(item.get("platform"), item.get("platform_content_id"))] = update
+    if not updates_by_key:
+        return content_media_records
+    updated_records: list[dict[str, Any]] = []
+    for record in content_media_records:
+        key = (record.get("platform"), record.get("platform_content_id"))
+        update = updates_by_key.get(key)
+        if not update:
+            updated_records.append(record)
+            continue
+        raw_payload = dict(record.get("raw_payload") or {})
+        raw_payload.update(
+            {
+                "content_media_status": update.get("content_media_status", record.get("content_media_status")),
+                "oss_url": update.get("oss_url"),
+                "local_path": update.get("local_path"),
+            }
+        )
+        if update.get("failure_reason"):
+            raw_payload["failure_reason"] = update["failure_reason"]
+        update_payload = update.get("raw_payload")
+        if isinstance(update_payload, dict):
+            raw_payload.update(update_payload)
+        merged = {
+            **record,
+            "content_media_status": update.get("content_media_status", record.get("content_media_status")),
+            "oss_url": update.get("oss_url"),
+            "local_path": update.get("local_path"),
+            "raw_payload": {key: value for key, value in raw_payload.items() if value is not None},
+        }
+        if update.get("failure_reason"):
+            merged["failure_reason"] = update["failure_reason"]
+        updated_records.append(merged)
+    return updated_records
+
+
 def _safe_analyze(
     client: GeminiVideoClient,
     item: dict[str, Any],

+ 3 - 13
content_agent/business_modules/m6_acceptance_report.py

@@ -115,13 +115,6 @@ def _gemini_summary(
     run_events: list[dict[str, Any]],
     evidence_rows: list[dict[str, Any]],
 ) -> dict[str, Any]:
-    quota_events = [
-        row
-        for row in run_events
-        if row.get("event_type") == "gemini_quota_exhausted"
-    ]
-    latest_quota = quota_events[-1] if quota_events else {}
-    quota_payload = latest_quota.get("raw_payload") or {}
     summaries = [
         row.get("evidence_summary") or row.get("raw_payload") or {}
         for row in evidence_rows
@@ -148,13 +141,10 @@ def _gemini_summary(
         elif final_status:
             final_failed_count += 1
 
-    used = quota_payload.get("used")
-    if used is None:
-        used = len(summaries)
     return {
-        "used": used,
-        "cap": quota_payload.get("cap"),
-        "quota_exhausted": bool(quota_events),
+        "used": len(summaries),
+        "cap": None,
+        "quota_exhausted": False,
         "successful_count": successful_count,
         "failure_type_counts": dict(failure_type_counts),
         "http_status_counts": dict(http_status_counts),

+ 20 - 0
content_agent/business_modules/result_source_lookup.py

@@ -43,6 +43,7 @@ def run(
         policy_run_id,
         discovered_content_items,
         decision_by_target_id,
+        media_by_platform_content_id,
     )
     author_assets, author_asset_rows, author_role_rows = _build_author_assets(
         run_id,
@@ -179,6 +180,7 @@ def _build_content_assets(
                     "content_media_status": media_by_platform_content_id[
                         platform_content_id
                     ]["content_media_status"],
+                    "media_snapshot": _media_snapshot(media_by_platform_content_id.get(platform_content_id)),
                 },
                 decision,
             )
@@ -222,6 +224,7 @@ def _build_review_records(
                     "content_media_status": media_by_platform_content_id[
                         platform_content_id
                     ]["content_media_status"],
+                    "media_snapshot": _media_snapshot(media_by_platform_content_id.get(platform_content_id)),
                 },
                 decision,
             )
@@ -233,6 +236,7 @@ def _build_reject_records(
     policy_run_id: str,
     discovered_content_items: list[dict[str, Any]],
     decision_by_target_id: dict[str, dict[str, Any]],
+    media_by_platform_content_id: dict[str, dict[str, Any]],
 ) -> list[dict[str, Any]]:
     reject_records: list[dict[str, Any]] = []
     for item in discovered_content_items:
@@ -247,6 +251,7 @@ def _build_reject_records(
                     "policy_run_id": policy_run_id,
                     "main_decision_reason_code": decision["decision_reason_code"],
                     "decision_id": decision["decision_id"],
+                    "media_snapshot": _media_snapshot(media_by_platform_content_id.get(platform_content_id)),
                     "source_evidence": decision["source_evidence"],
                 },
                 decision,
@@ -255,6 +260,21 @@ def _build_reject_records(
     return reject_records
 
 
+def _media_snapshot(media: dict[str, Any] | None) -> dict[str, Any]:
+    if not media:
+        return {}
+    raw_payload = media.get("raw_payload") if isinstance(media.get("raw_payload"), dict) else {}
+    snapshot = {
+        "content_media_status": media.get("content_media_status"),
+        "oss_url": media.get("oss_url"),
+        "local_path": media.get("local_path"),
+        "play_url": media.get("play_url"),
+        "oss_object_key": raw_payload.get("oss_object_key"),
+        "upload_failure_reason": raw_payload.get("upload_failure_reason") or raw_payload.get("failure_reason"),
+    }
+    return {key: value for key, value in snapshot.items() if value is not None}
+
+
 def _with_v4_explanation(
     record: dict[str, Any],
     decision: dict[str, Any],

+ 2 - 2
content_agent/business_modules/walk_engine.py

@@ -222,7 +222,7 @@ def _execute_query_batch(
     )
     return {
         "discovered_content_items": recalled["discovered_content_items"],
-        "content_media_records": discovered["content_media_records"],
+        "content_media_records": recalled["content_media_records"],
         "evidence_bundles": recalled["evidence_bundles"],
         "rule_decisions": decisions,
         "query_failures": result.get("query_failures", []),
@@ -642,7 +642,7 @@ def _expand_authors(
     )
     return {
         "discovered_content_items": recalled["discovered_content_items"],
-        "content_media_records": discovered["content_media_records"],
+        "content_media_records": recalled["content_media_records"],
         "evidence_bundles": recalled["evidence_bundles"],
         "rule_decisions": decisions,
         "search_queries": author_search_queries,

+ 1 - 0
content_agent/integrations/database_runtime.py

@@ -120,6 +120,7 @@ JSON_FILE_PAYLOAD_COLUMNS = {
 }
 
 JSONL_UPSERT_KEYS = {
+    "content_media_records.jsonl": ("run_id", "policy_run_id", "platform", "platform_content_id"),
     "search_queries.jsonl": ("run_id", "policy_run_id", "search_query_id"),
     "pattern_recall_evidence.jsonl": ("run_id", "policy_run_id", "recall_evidence_id"),
     "search_clues.jsonl": ("run_id", "policy_run_id", "clue_id"),

+ 0 - 43
content_agent/integrations/gemini_quota.py

@@ -1,43 +0,0 @@
-"""单 run Gemini 调用配额闸(V3-M5C)。
-
-实现 GeminiVideoClient Protocol,包装真/假 client 在 run_service 单点注入——
-对 recall/walk_engine/graph 全部 analyze 透明,零签名改。每 run new 一个实例,
-计数跨"初始 recall + walk 内两次 recall"累计。截断的正常路径是 recall 提交前
-按 offset 预判(remaining_quota/consume);analyze 内超额返回 _fail 仅作 backstop。
-"""
-
-from __future__ import annotations
-
-import threading
-from typing import Any
-
-from content_agent.integrations.gemini_video import _fail
-from content_agent.interfaces import GeminiVideoClient
-
-_UNLIMITED = 1_000_000_000
-
-
-class QuotaCappedGeminiVideoClient:
-    def __init__(self, inner: GeminiVideoClient, cap: int | None) -> None:
-        self.inner = inner
-        self.cap = cap
-        self.used = 0
-        self._lock = threading.Lock()
-
-    def remaining_quota(self) -> int:
-        with self._lock:
-            return (self.cap - self.used) if self.cap is not None else _UNLIMITED
-
-    def consume(self, count: int) -> None:
-        with self._lock:
-            self.used += count
-
-    def analyze(
-        self,
-        content: dict[str, Any],
-        media: dict[str, Any],
-        source_context: dict[str, Any],
-    ) -> dict[str, Any]:
-        if self.cap is not None and self.remaining_quota() < 0:
-            return _fail("gemini_quota_exhausted")
-        return self.inner.analyze(content, media, source_context)

+ 65 - 29
content_agent/integrations/gemini_video.py

@@ -9,19 +9,15 @@ from __future__ import annotations
 
 import json
 import os
-from pathlib import Path
 from typing import Any, Callable, Mapping
 
 import httpx
 
-from content_agent.integrations import video_fetch
+from content_agent.integrations import oss_upload, video_fetch
 
 DEFAULT_OPENROUTER_BASE_URL = "https://openrouter.ai/api/v1"
 DEFAULT_VIDEO_MODEL = "google/gemini-3-flash-preview"
 DEFAULT_VIDEO_TIMEOUT_SECONDS = 90.0
-# 原片留档目录(2026-06-12 拍板:全量存,含未过审;相对仓库根,服务器同款)。
-DEFAULT_RAW_VIDEO_DIR = "data"
-
 V4_GEMINI_QUERY_RELEVANCE_SCHEMA_VERSION = "v4_gemini_query_relevance.v1"
 
 _SYSTEM_PROMPT = "你是视频内容与搜索需求相关性审核助手。只输出一个 JSON 对象,不要任何解释或 markdown。"
@@ -55,6 +51,12 @@ def _fail(
     }
 
 
+def _with_media_update(result: dict[str, Any], update: dict[str, Any] | None) -> dict[str, Any]:
+    if update:
+        return {**result, "media_storage_update": update}
+    return result
+
+
 def _clamp_score(value: Any) -> float:
     try:
         number = float(value)
@@ -123,16 +125,16 @@ class GeminiVideoClient:
         base_url: str = DEFAULT_OPENROUTER_BASE_URL,
         timeout_seconds: float = DEFAULT_VIDEO_TIMEOUT_SECONDS,
         fetch_fn: Callable[..., str] = video_fetch.fetch_and_compress,
+        oss_upload_fn: Callable[..., dict[str, Any]] | None = None,
         http_post: Callable[..., Any] = httpx.post,
-        raw_video_save_dir: str | None = None,
     ) -> None:
         self.api_key = api_key
         self.model = model
         self.base_url = base_url.rstrip("/")
         self.timeout_seconds = timeout_seconds
         self.fetch_fn = fetch_fn
+        self.oss_upload_fn = oss_upload_fn
         self.http_post = http_post
-        self.raw_video_save_dir = raw_video_save_dir
 
     @classmethod
     def from_env(cls, env: Mapping[str, str] | None = None) -> "GeminiVideoClient":
@@ -145,19 +147,9 @@ class GeminiVideoClient:
             model=source.get("CONTENT_AGENT_VIDEO_LLM_MODEL") or DEFAULT_VIDEO_MODEL,
             base_url=source.get("OPENROUTER_BASE_URL") or DEFAULT_OPENROUTER_BASE_URL,
             timeout_seconds=float(source.get("CONTENT_AGENT_VIDEO_LLM_TIMEOUT_SECONDS") or DEFAULT_VIDEO_TIMEOUT_SECONDS),
-            raw_video_save_dir=DEFAULT_RAW_VIDEO_DIR,
+            oss_upload_fn=oss_upload.upload_video_from_env,
         )
 
-    def _raw_save_path(self, content: dict[str, Any]) -> str | None:
-        """原片留档路径 data/{run_id}/{platform_content_id}.mp4;身份字段缺失则不存。"""
-        if not self.raw_video_save_dir:
-            return None
-        run_id = content.get("run_id")
-        platform_content_id = content.get("platform_content_id")
-        if not run_id or not platform_content_id:
-            return None
-        return str(Path(self.raw_video_save_dir) / str(run_id) / f"{platform_content_id}.mp4")
-
     def analyze(
         self,
         content: dict[str, Any],
@@ -167,15 +159,18 @@ class GeminiVideoClient:
         query_text = _query_text(content, source_context)
         play_url = media.get("play_url")
         if not play_url:
-            return _fail("no_play_url", query_text=query_text)
-        fetch_kwargs: dict[str, Any] = {}
-        save_path = self._raw_save_path(content)
-        if save_path:
-            fetch_kwargs["save_raw_to"] = save_path
+            return _with_media_update(
+                _fail("no_play_url", query_text=query_text),
+                _media_unavailable_update("no_play_url"),
+            )
+        media_update = self._upload_to_oss(play_url, content.get("platform", "douyin"))
         try:
-            data_url = self.fetch_fn(play_url, content.get("platform", "douyin"), **fetch_kwargs)
+            data_url = self.fetch_fn(play_url, content.get("platform", "douyin"))
         except Exception as exc:
-            return _fail("video_fetch_failed", query_text=query_text, exception_type=type(exc).__name__)
+            return _with_media_update(
+                _fail("video_fetch_failed", query_text=query_text, exception_type=type(exc).__name__),
+                media_update or _media_unavailable_update(f"video_fetch_failed:{type(exc).__name__}"),
+            )
 
         messages = [
             {"role": "system", "content": _SYSTEM_PROMPT},
@@ -198,7 +193,7 @@ class GeminiVideoClient:
                     timeout=self.timeout_seconds,
                 )
                 response.raise_for_status()
-                return _parse(response.json(), query_text)
+                return _with_media_update(_parse(response.json(), query_text), media_update)
             except httpx.HTTPError as exc:
                 last_failure = _fail(
                     "gemini_http_error",
@@ -209,7 +204,7 @@ class GeminiVideoClient:
                 )
                 if attempt == 0 and _retryable_http(exc):
                     continue
-                return last_failure
+                return _with_media_update(last_failure, media_update)
             except (KeyError, IndexError, TypeError, ValueError, json.JSONDecodeError) as exc:
                 last_failure = _fail(
                     "gemini_response_invalid",
@@ -219,8 +214,39 @@ class GeminiVideoClient:
                 )
                 if attempt == 0:
                     continue
-                return last_failure
-        return last_failure or _fail("gemini_unknown_error", query_text=query_text)
+                return _with_media_update(last_failure, media_update)
+        return _with_media_update(last_failure or _fail("gemini_unknown_error", query_text=query_text), media_update)
+
+    def _upload_to_oss(self, play_url: str, platform: str) -> dict[str, Any] | None:
+        if not self.oss_upload_fn:
+            return None
+        upload_result = self.oss_upload_fn(
+            play_url,
+            referer=video_fetch._download_headers(platform, None),
+        )
+        if upload_result.get("status") == "ok":
+            raw_payload = {
+                "oss_object_key": upload_result.get("oss_object_key"),
+                "save_oss_timestamp": upload_result.get("save_oss_timestamp"),
+            }
+            return {
+                "content_media_status": "oss_uploaded",
+                "oss_url": upload_result.get("oss_url"),
+                "local_path": None,
+                "raw_payload": {k: v for k, v in raw_payload.items() if v is not None},
+            }
+        raw_payload = {
+            "upload_failure_reason": upload_result.get("failure_type") or "oss_upload_failed",
+            "upload_exception_type": upload_result.get("exception_type"),
+            "upload_http_status_code": upload_result.get("http_status_code"),
+        }
+        return {
+            "content_media_status": "oss_upload_pending",
+            "oss_url": None,
+            "local_path": None,
+            "failure_reason": raw_payload["upload_failure_reason"],
+            "raw_payload": {k: v for k, v in raw_payload.items() if v is not None},
+        }
 
 
 class MissingGeminiVideoClient:
@@ -234,3 +260,13 @@ class MissingGeminiVideoClient:
         source_context: dict[str, Any],
     ) -> dict[str, Any]:
         return _fail("gemini_config_missing", query_text=_query_text(content, source_context), exception_type=self.reason)
+
+
+def _media_unavailable_update(failure_reason: str) -> dict[str, Any]:
+    return {
+        "content_media_status": "unavailable",
+        "oss_url": None,
+        "local_path": None,
+        "failure_reason": failure_reason,
+        "raw_payload": {"failure_reason": failure_reason},
+    }

+ 100 - 0
content_agent/integrations/oss_upload.py

@@ -0,0 +1,100 @@
+from __future__ import annotations
+
+import os
+from typing import Any, Callable, Mapping
+
+import httpx
+
+
+DEFAULT_OSS_UPLOAD_URL = "http://crawler-upload.aiddit.com/crawler/oss/upload_stream"
+DEFAULT_OSS_TIMEOUT_SECONDS = 45.0
+
+
+def upload_video_from_url(
+    src_url: str,
+    *,
+    project: str | None = None,
+    referer: Mapping[str, str] | None = None,
+    use_proxy: bool = True,
+    endpoint: str = DEFAULT_OSS_UPLOAD_URL,
+    timeout_seconds: float = DEFAULT_OSS_TIMEOUT_SECONDS,
+    http_post: Callable[..., Any] = httpx.post,
+) -> dict[str, Any]:
+    if not src_url:
+        return _failure("missing_src_url")
+    payload: dict[str, Any] = {
+        "src_url": src_url,
+        "src_type": "video",
+        "use_proxy": use_proxy,
+    }
+    if project:
+        payload["project"] = project
+    if referer:
+        payload["referer"] = dict(referer)
+    try:
+        response = http_post(endpoint, json=payload, timeout=timeout_seconds)
+        response.raise_for_status()
+        body = response.json()
+    except httpx.HTTPError as exc:
+        return _failure(
+            "oss_upload_http_error",
+            exception_type=type(exc).__name__,
+            http_status_code=_http_status(exc),
+        )
+    except Exception as exc:
+        return _failure("oss_upload_failed", exception_type=type(exc).__name__)
+
+    oss_object = body.get("oss_object") if isinstance(body, dict) else None
+    if not isinstance(oss_object, dict) or not oss_object.get("cdn_url"):
+        return _failure("oss_upload_response_invalid")
+    return {
+        "status": "ok",
+        "oss_url": oss_object.get("cdn_url"),
+        "oss_object_key": oss_object.get("oss_object_key"),
+        "save_oss_timestamp": oss_object.get("save_oss_timestamp"),
+        "raw_payload": body,
+    }
+
+
+def upload_video_from_env(
+    src_url: str,
+    *,
+    referer: Mapping[str, str] | None = None,
+    http_post: Callable[..., Any] = httpx.post,
+) -> dict[str, Any]:
+    return upload_video_from_url(
+        src_url,
+        project=os.environ.get("CONTENT_AGENT_OSS_PROJECT") or None,
+        referer=referer,
+        use_proxy=_env_bool("CONTENT_AGENT_OSS_USE_PROXY", default=True),
+        endpoint=os.environ.get("CONTENT_AGENT_OSS_UPLOAD_URL") or DEFAULT_OSS_UPLOAD_URL,
+        timeout_seconds=float(os.environ.get("CONTENT_AGENT_OSS_TIMEOUT_SECONDS") or DEFAULT_OSS_TIMEOUT_SECONDS),
+        http_post=http_post,
+    )
+
+
+def _failure(
+    failure_type: str,
+    *,
+    exception_type: str | None = None,
+    http_status_code: int | None = None,
+) -> dict[str, Any]:
+    result: dict[str, Any] = {"status": "failed", "failure_type": failure_type}
+    if exception_type:
+        result["exception_type"] = exception_type
+    if http_status_code is not None:
+        result["http_status_code"] = http_status_code
+    return result
+
+
+def _http_status(exc: httpx.HTTPError) -> int | None:
+    response = getattr(exc, "response", None)
+    status_code = getattr(response, "status_code", None)
+    return int(status_code) if isinstance(status_code, int) else None
+
+
+def _env_bool(key: str, *, default: bool) -> bool:
+    value = os.environ.get(key)
+    if value is None or value == "":
+        return default
+    return value.strip().lower() not in {"0", "false", "no", "off"}

+ 3 - 1
content_agent/integrations/runtime_files.py

@@ -49,7 +49,7 @@ class LocalRuntimeFileStore:
     def append_jsonl(self, run_id: str, filename: str, rows: list[dict[str, Any]]) -> Path:
         path = self.run_dir(run_id) / filename
         path.parent.mkdir(parents=True, exist_ok=True)
-        if filename in {"pattern_recall_evidence.jsonl", "search_queries.jsonl"}:
+        if filename in {"content_media_records.jsonl", "pattern_recall_evidence.jsonl", "search_queries.jsonl"}:
             rows = _replace_keyed_rows(
                 self.read_jsonl(run_id, filename),
                 rows,
@@ -156,4 +156,6 @@ def _jsonl_key_fields(filename: str) -> tuple[str, ...]:
         return ("run_id", "policy_run_id", "recall_evidence_id")
     if filename == "search_queries.jsonl":
         return ("run_id", "policy_run_id", "search_query_id")
+    if filename == "content_media_records.jsonl":
+        return ("run_id", "policy_run_id", "platform", "platform_content_id")
     raise ValueError(f"unsupported keyed JSONL file: {filename}")

+ 1 - 15
content_agent/integrations/video_fetch.py

@@ -3,14 +3,13 @@
 从 play_url 下载视频(带平台下载头)→ imageio-ffmpeg 压到 ~4MB 低清 →
 base64 data URL,供 GeminiVideoClient 投喂(OpenRouter image_url)。
 真实下载/压缩只在 M7 live smoke 跑;单测全 mock。
-2026-06-12 拍板:下载成功的原片全量落盘 data/(过没过审都存,play_url 有时效留不住)
+原片不做本地持久化;长期复盘资产走 OSS,这里仅下载到内存并压缩后投喂模型
 """
 
 from __future__ import annotations
 
 import base64
 import subprocess
-from pathlib import Path
 from typing import Any
 
 import httpx
@@ -43,16 +42,6 @@ def _download_headers(platform: str, override: dict[str, str] | None) -> dict[st
     return _PLATFORM_DOWNLOAD_HEADERS.get(platform, {})
 
 
-def _save_raw(save_path: str, raw: bytes) -> None:
-    # 原片留档是 best-effort:磁盘问题绝不影响判定链路。
-    try:
-        path = Path(save_path)
-        path.parent.mkdir(parents=True, exist_ok=True)
-        path.write_bytes(raw)
-    except OSError:
-        pass
-
-
 def _compress(raw: bytes, ffmpeg_exe: str) -> bytes:
     # 超时保护:坏视频会让 ffmpeg 卡死,进而挂住一个判定并发线程(实测正常压缩 ~8s)。
     try:
@@ -79,7 +68,6 @@ def fetch_and_compress(
     http_client: Any | None = None,
     ffmpeg_exe: str | None = None,
     timeout_seconds: float = 90.0,
-    save_raw_to: str | None = None,
 ) -> str:
     if not play_url:
         raise VideoFetchError("missing play_url")
@@ -97,8 +85,6 @@ def fetch_and_compress(
         raise VideoFetchError(f"download failed: {type(exc).__name__}") from exc
     if not raw:
         raise VideoFetchError("empty download")
-    if save_raw_to:
-        _save_raw(save_raw_to, raw)
 
     compressed = _compress(raw, ffmpeg_exe or imageio_ffmpeg.get_ffmpeg_exe())
     if len(compressed) > MAX_INLINE_BYTES:

+ 2 - 46
content_agent/run_service.py

@@ -7,7 +7,7 @@ from pathlib import Path
 from typing import Any
 from uuid import uuid4
 
-from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION, RUNTIME_SCHEMA_VERSION
+from content_agent.constants import RUNTIME_SCHEMA_VERSION
 from content_agent.business_modules import run_record
 from content_agent.errors import ContentAgentError, ErrorCode, error_from_exception
 from content_agent.graph import RunDependencies, build_run_graph
@@ -15,10 +15,8 @@ from content_agent.integrations.composite_runtime import CompositeRuntimeStore
 from content_agent.integrations.database_runtime import ContentSupplyDbConfig, DatabaseRuntimeStore
 from content_agent.integrations.demand_source import DemandSourceService
 from content_agent.integrations.douyin import CrawapiDouyinClient
-from content_agent.integrations.gemini_quota import QuotaCappedGeminiVideoClient
 from content_agent.integrations.gemini_video import GeminiVideoClient as RealGeminiVideoClient
 from content_agent.integrations.kuaishou import CrawapiKuaishouClient
-from content_agent.integrations.walk_graph_json import WalkGraphStore
 from content_agent.integrations.mock_platform import MockPlatformClient
 from content_agent.integrations.shipinhao import CrawapiShipinhaoClient
 from content_agent.integrations.policy_json import JsonPolicyBundleStore
@@ -35,7 +33,6 @@ from content_agent.interfaces import (
     RuntimeStore,
 )
 from content_agent.models import RunState
-from content_agent.record_payload import with_raw_payload
 from content_agent.schemas import RunStartRequest
 
 
@@ -140,36 +137,15 @@ class RunService:
                 raw_payload={"source_ref": resolved_source_ref},
             )
 
-            # M5C: 每 run new 一个配额 wrapper(per-run 计数,跨初始+walk 两次 recall 共享)。
-            gemini_client = QuotaCappedGeminiVideoClient(self._gemini_video_client, _gemini_calls_cap())
             deps = RunDependencies(
                 runtime=self.runtime,
                 platform_client=self._platform_client(request.platform, request.platform_mode),
                 policy_store=self.policy_store,
                 query_variant_client=self.query_variant_client,
-                gemini_video_client=gemini_client,
+                gemini_video_client=self._gemini_video_client,
             )
             graph = build_run_graph(deps)
             state = graph.invoke(initial_state)
-            if gemini_client.cap is not None and gemini_client.used >= gemini_client.cap:
-                # 走 run_events.jsonl 通道(同 stage 事件):文件/DB 双模式都可观测。
-                quota_event = with_raw_payload(
-                    {
-                        "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
-                        "run_id": run_id,
-                        "policy_run_id": policy_run_id,
-                        "event_id": "gemini_quota",
-                        "event_type": "gemini_quota_exhausted",
-                        "status": "running",
-                        "input_ref": None,
-                        "output_ref": None,
-                        "error_code": None,
-                        "message": "gemini call quota reached; remaining judgments truncated",
-                        "created_at": _utc_now(),
-                    }
-                )
-                quota_event["raw_payload"].update({"cap": gemini_client.cap, "used": gemini_client.used})
-                self.runtime.append_jsonl(run_id, "run_events.jsonl", [quota_event])
             self._record_success_metadata(state)
             return state
         except Exception as exc:
@@ -615,26 +591,6 @@ def _gemini_video_client_from_env(env: dict[str, str]) -> GeminiVideoClient:
     return RealGeminiVideoClient.from_env(env)
 
 
-def _gemini_calls_cap() -> int | None:
-    override = _optional_positive_int(os.environ.get("CONTENT_AGENT_GEMINI_CALLS_PER_RUN_CAP"))
-    if override is not None:
-        return override
-    try:
-        return WalkGraphStore().load_policy()["global"]["gemini_calls_per_run_cap"]
-    except Exception:
-        return None
-
-
-def _optional_positive_int(value: str | None) -> int | None:
-    if value is None or value == "":
-        return None
-    try:
-        parsed = int(value)
-    except ValueError:
-        return None
-    return parsed if parsed > 0 else None
-
-
 class _DeterministicGeminiVideoClient:
     """mock/默认判定 client:固定返回 V4 高相关结果,供本地/smoke 无网跑通。"""
 

+ 0 - 1
scripts/run_v4_m6_real_acceptance.py

@@ -20,7 +20,6 @@ from content_agent.schemas import RunStartRequest
 PLATFORMS = ("douyin", "kuaishou", "shipinhao")
 ALLOWED_DATA_ORIGINS = {"production_db", "mixed_with_runtime_export"}
 SMOKE_DEFAULTS = {
-    "CONTENT_AGENT_GEMINI_CALLS_PER_RUN_CAP": "3",
     "CONTENT_AGENT_GEMINI_MAX_WORKERS": "3",
     "CONTENT_AGENT_WALK_MAX_TOTAL_ACTIONS_PER_RUN": "4",
 }

+ 0 - 1
tech_documents/数据接口与来源/walk_policy.json

@@ -7,7 +7,6 @@
     "max_total_actions_per_run": { "value": 60, "provenance": "拍板 2026-06-11:沿用 v1 各边合计上限 60 作全局闸", "tbd": false },
     "max_depth": { "value": 3, "provenance": "v1 各边 max_depth=3", "tbd": false },
     "max_reseed_rounds": { "value": 1, "provenance": "拍板 2026-06-11:单层回灌(tag→query 不二次回灌),v1 等价", "tbd": false },
-    "gemini_calls_per_run_cap": { "value": 200, "provenance": "拍板 2026-06-12:单 run 上限约 $4.4(@$0.022/条),远高于典型 N≈20-40,封顶失控;M7 真负标定", "tbd": false },
     "gemini_max_workers": { "value": 4, "provenance": "拍板 2026-06-12:判定批并发度(IO 密集,不经 crawapi 限流);M7 实测调", "tbd": false }
   },
   "edge_budgets": [

+ 5 - 34
tests/test_concurrency_consistency.py

@@ -2,7 +2,7 @@
 
 固定 run_id(R9)+ 确定性 fake 下,max_workers=1(串行)与 4(并发)产出的核心产物
 剔除时间戳键后逐条相等;Jittered fake 强制乱序完成以暴露 offset 错位;
-配额截断按 offset 预判(与完成序无关);analyze 意外异常经 _safe_analyze 兜底不炸 run。
+analyze 意外异常经 _safe_analyze 兜底不炸 run。
 """
 
 from __future__ import annotations
@@ -11,9 +11,7 @@ from typing import Any
 
 from content_agent.business_modules.content_discovery import pattern_recall
 from content_agent.business_modules.content_discovery.pattern_recall import recall_decision
-from content_agent.integrations.gemini_quota import QuotaCappedGeminiVideoClient
 from content_agent.integrations.runtime_files import LocalRuntimeFileStore
-from content_agent.run_service import RunService
 from tests.gemini_helpers import (
     FakeGeminiVideoClient,
     JitteredFakeGeminiVideoClient,
@@ -84,14 +82,14 @@ def test_jittered_completion_preserves_offset_order(tmp_path):
         assert updated["pattern_match_result"]["pattern_recall_evidence_id"] == f"recall_{i + 1:03d}"
 
 
-def test_quota_cap_deterministic_truncation(tmp_path, monkeypatch):
+def test_all_items_are_submitted_without_quota_truncation(tmp_path, monkeypatch):
     items, media, bundles = _synthetic_recall_inputs(5)
     statuses = {}
     for label, workers in [("serial", 1), ("concurrent", 4)]:
         monkeypatch.setattr(recall_decision, "_resolve_max_workers", lambda workers=workers: workers)
         runtime = LocalRuntimeFileStore(tmp_path / label)
         runtime.prepare_run("run_001")
-        client = QuotaCappedGeminiVideoClient(FakeGeminiVideoClient(), cap=2)
+        client = FakeGeminiVideoClient()
         recalled = pattern_recall.run(
             "run_001", "policy_run_001", items, media, bundles, {}, runtime, client,
         )
@@ -102,36 +100,9 @@ def test_quota_cap_deterministic_truncation(tmp_path, monkeypatch):
             )
             for row in recalled["discovered_content_items"]
         ]
-        assert client.used == 2
-    # 截断边界按 offset 预判:前 2 条真判、后 3 条配额拒,串/并行完全一致。
+        assert len(client.calls) == 5
     assert statuses["serial"] == statuses["concurrent"]
-    assert [status for status, _ in statuses["serial"]] == ["ok", "ok", "failed", "failed", "failed"]
-    assert all(reason == "" for _, reason in statuses["serial"][2:])
-
-
-def test_quota_exhaustion_is_observable(tmp_path, monkeypatch):
-    import content_agent.run_service as run_service_module
-
-    monkeypatch.setattr(run_service_module, "_gemini_calls_cap", lambda: 2)
-    artifacts = replay_case(
-        "sph_caihong",
-        runtime_root=tmp_path / "rt",
-        gemini_video_client=FakeGeminiVideoClient(),
-    )
-    assert artifacts.state["status"] == "success"
-    quota_rows = [
-        row for row in artifacts.files["pattern_recall_evidence.jsonl"]
-        if row["evidence_summary"].get("failure_type") == "gemini_quota_exhausted"
-    ]
-    assert quota_rows
-    assert all(row["evidence_summary"]["final_status"] == "failed" for row in quota_rows)
-    quota_events = [
-        row for row in artifacts.files["run_events.jsonl"]
-        if row["event_type"] == "gemini_quota_exhausted"
-    ]
-    assert len(quota_events) == 1
-    assert quota_events[0]["raw_payload"]["cap"] == 2
-    assert quota_events[0]["raw_payload"]["used"] == 2
+    assert [status for status, _ in statuses["serial"]] == ["ok", "ok", "ok", "ok", "ok"]
 
 
 def test_analyze_exception_does_not_break_run(tmp_path):

+ 36 - 0
tests/test_database_runtime.py

@@ -193,6 +193,42 @@ def test_database_runtime_appends_jsonl_with_raw_payload():
     assert json.loads(values["raw_payload"])["search_query_id"] == "q_001"
 
 
+def test_database_runtime_upserts_content_media_records():
+    connection = FakeConnection()
+    store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)
+
+    store.append_jsonl(
+        "run_001",
+        "content_media_records.jsonl",
+        [
+            {
+                "record_schema_version": "runtime_record.v1",
+                "run_id": "run_001",
+                "policy_run_id": "policy_run_001",
+                "platform": "douyin",
+                "platform_content_id": "content_001",
+                "content_media_status": "oss_uploaded",
+                "content_metadata_source": "douyin_keyword_search",
+                "play_url": "https://source.example/video.mp4",
+                "local_path": None,
+                "oss_url": "https://res.example/video.mp4",
+                "raw_payload": {
+                    "run_id": "run_001",
+                    "platform_content_id": "content_001",
+                    "oss_object_key": "crawler/video/content_001.mp4",
+                },
+                "created_at": "2026-06-15T00:00:00+00:00",
+            }
+        ],
+    )
+
+    sql, _ = connection.statements[-1]
+
+    assert "INSERT INTO `content_agent_content_media_records`" in sql
+    assert "ON DUPLICATE KEY UPDATE" in sql
+    assert "`oss_url` = VALUES(`oss_url`)" in sql
+
+
 def test_database_runtime_upserts_failed_search_query_status():
     connection = FakeConnection()
     store = DatabaseRuntimeStore(_config(), connection_factory=lambda: connection)

+ 45 - 5
tests/test_gemini_video.py

@@ -81,25 +81,65 @@ def test_analyze_parses_json_in_markdown_fence_and_clamps_score():
     assert result["query_relevance_score"] == 100.0
 
 
-def test_analyze_passes_raw_save_path_when_dir_configured(tmp_path):
+def test_analyze_uploads_video_to_oss_and_returns_media_update():
     body = '{"query_relevance_score": 80, "query_relevance_reason": "x"}'
     seen = {}
 
-    def fetch(play_url, platform, **kwargs):
-        seen.update(kwargs)
+    def upload(play_url, **kwargs):
+        seen["upload"] = {"play_url": play_url, **kwargs}
+        return {
+            "status": "ok",
+            "oss_url": "https://res.example/c1.mp4",
+            "oss_object_key": "crawler/video/c1.mp4",
+            "save_oss_timestamp": 123,
+        }
+
+    def fetch(play_url, platform):
+        seen["fetch"] = {"play_url": play_url, "platform": platform}
         return "data:video/mp4;base64,AAAA"
 
     client = GeminiVideoClient(
         api_key="k",
         fetch_fn=fetch,
+        oss_upload_fn=upload,
         http_post=lambda *a, **k: FakeResponse(body),
-        raw_video_save_dir=str(tmp_path),
     )
     item = {**_ITEM, "run_id": "run_1"}
     result = client.analyze(item, _MEDIA, _CTX)
 
-    assert seen["save_raw_to"] == str(tmp_path / "run_1" / "c1.mp4")
+    assert seen["upload"]["play_url"] == "http://v/x"
+    assert seen["upload"]["referer"]["Referer"] == "https://www.douyin.com/"
+    assert seen["fetch"] == {"play_url": "http://v/x", "platform": "douyin"}
     assert result["query_relevance_score"] == 80.0
+    assert result["media_storage_update"] == {
+        "content_media_status": "oss_uploaded",
+        "oss_url": "https://res.example/c1.mp4",
+        "local_path": None,
+        "raw_payload": {
+            "oss_object_key": "crawler/video/c1.mp4",
+            "save_oss_timestamp": 123,
+        },
+    }
+
+
+def test_analyze_keeps_judging_when_oss_upload_fails():
+    body = '{"query_relevance_score": 80, "query_relevance_reason": "x"}'
+    client = GeminiVideoClient(
+        api_key="k",
+        fetch_fn=lambda play_url, platform: "data:video/mp4;base64,AAAA",
+        oss_upload_fn=lambda play_url, **kwargs: {
+            "status": "failed",
+            "failure_type": "oss_upload_http_error",
+            "http_status_code": 500,
+        },
+        http_post=lambda *a, **k: FakeResponse(body),
+    )
+
+    result = client.analyze(_ITEM, _MEDIA, _CTX)
+
+    assert result["final_status"] == "ok"
+    assert result["media_storage_update"]["content_media_status"] == "oss_upload_pending"
+    assert result["media_storage_update"]["failure_reason"] == "oss_upload_http_error"
 
 
 def test_analyze_no_play_url_returns_v4_fail():

+ 65 - 0
tests/test_oss_upload.py

@@ -0,0 +1,65 @@
+from __future__ import annotations
+
+import httpx
+
+from content_agent.integrations.oss_upload import upload_video_from_url
+
+
+class FakeResponse:
+    def __init__(self, body, *, status_code=200):
+        self._body = body
+        self.status_code = status_code
+        self.request = httpx.Request("POST", "http://oss.test/upload")
+
+    def raise_for_status(self):
+        if self.status_code >= 400:
+            response = httpx.Response(self.status_code, request=self.request)
+            raise httpx.HTTPStatusError("bad", request=self.request, response=response)
+
+    def json(self):
+        return self._body
+
+
+def test_upload_video_from_url_parses_oss_object():
+    seen = {}
+
+    def post(*args, **kwargs):
+        seen.update({"args": args, **kwargs})
+        return FakeResponse(
+            {
+                "status": 0,
+                "msg": "ok",
+                "oss_object": {
+                    "cdn_url": "https://res.example/video.mp4",
+                    "oss_object_key": "crawler/video/a.mp4",
+                    "save_oss_timestamp": 123,
+                },
+            }
+        )
+
+    result = upload_video_from_url(
+        "https://source.example/video.mp4",
+        project="content-agent",
+        referer={"Referer": "https://www.douyin.com/"},
+        http_post=post,
+        endpoint="http://oss.test/upload",
+    )
+
+    assert result["status"] == "ok"
+    assert result["oss_url"] == "https://res.example/video.mp4"
+    assert result["oss_object_key"] == "crawler/video/a.mp4"
+    assert seen["json"]["src_type"] == "video"
+    assert seen["json"]["use_proxy"] is True
+    assert seen["json"]["project"] == "content-agent"
+
+
+def test_upload_video_from_url_failure_is_returned_not_raised():
+    result = upload_video_from_url(
+        "https://source.example/video.mp4",
+        http_post=lambda *a, **k: FakeResponse({}, status_code=500),
+        endpoint="http://oss.test/upload",
+    )
+
+    assert result["status"] == "failed"
+    assert result["failure_type"] == "oss_upload_http_error"
+    assert result["http_status_code"] == 500

+ 41 - 0
tests/test_runtime_files.py

@@ -114,6 +114,47 @@ def test_runtime_files_are_parseable_and_consistent(tmp_path):
     assert validation["status"] == "pass"
 
 
+def test_content_media_records_replace_same_video_row(tmp_path):
+    runtime = LocalRuntimeFileStore(tmp_path / "runtime")
+    runtime.prepare_run("run_001")
+    base = {
+        "record_schema_version": "runtime_record.v1",
+        "run_id": "run_001",
+        "policy_run_id": "policy_run_001",
+        "platform": "douyin",
+        "platform_content_id": "content_001",
+        "content_media_status": "metadata_only",
+        "content_metadata_source": "douyin_keyword_search",
+        "play_url": "https://source.example/video.mp4",
+        "local_path": None,
+        "oss_url": None,
+        "raw_payload": {"run_id": "run_001", "platform_content_id": "content_001"},
+        "created_at": "2026-06-15T00:00:00+00:00",
+    }
+    runtime.append_jsonl("run_001", "content_media_records.jsonl", [base])
+    runtime.append_jsonl(
+        "run_001",
+        "content_media_records.jsonl",
+        [
+            {
+                **base,
+                "content_media_status": "oss_uploaded",
+                "oss_url": "https://res.example/video.mp4",
+                "raw_payload": {
+                    **base["raw_payload"],
+                    "oss_object_key": "crawler/video/content_001.mp4",
+                },
+            }
+        ],
+    )
+
+    rows = runtime.read_jsonl("run_001", "content_media_records.jsonl")
+
+    assert len(rows) == 1
+    assert rows[0]["content_media_status"] == "oss_uploaded"
+    assert rows[0]["oss_url"] == "https://res.example/video.mp4"
+
+
 def test_all_platform_query_failure_writes_failed_query_runtime_records(tmp_path):
     service = RunService(
         runtime_root=tmp_path / "runtime" / "v1",

+ 0 - 15
tests/test_video_fetch.py

@@ -68,18 +68,3 @@ def test_fetch_oversize_raises(monkeypatch):
     with pytest.raises(VideoFetchError, match="oversize"):
         fetch_and_compress("http://v/x", "douyin", http_client=FakeHttpClient(), ffmpeg_exe="ffmpeg")
 
-
-def test_fetch_saves_raw_video_when_requested(monkeypatch, tmp_path):
-    # 2026-06-12 拍板: save_raw_to 指定时原片落盘(自动建父目录);默认不落盘。
-    _patch_compress(monkeypatch)
-    save_path = tmp_path / "run_1" / "c1.mp4"
-    fetch_and_compress(
-        "http://v/x", "douyin",
-        http_client=FakeHttpClient(content=b"rawvideo"),
-        ffmpeg_exe="ffmpeg",
-        save_raw_to=str(save_path),
-    )
-    assert save_path.read_bytes() == b"rawvideo"
-
-    fetch_and_compress("http://v/x", "douyin", http_client=FakeHttpClient(), ffmpeg_exe="ffmpeg")
-    assert list(tmp_path.iterdir()) == [save_path.parent]

+ 14 - 4
tests/test_walk_graph_config.py

@@ -31,10 +31,15 @@ def test_policy_unwraps_pinned_values():
     assert low_budget(10) == 5
 
 
-def test_policy_pins_gemini_cap_and_workers():
-    # 2026-06-12 拍板(M5):单 run Gemini 调用上限 200、判定并发度 4
+def test_policy_pins_gemini_workers_without_call_cap():
+    # Gemini 数量上限已取消;仅保留判定并发度
     policy_global = WalkGraphStore().load_policy()["global"]
-    assert policy_global["gemini_calls_per_run_cap"] == 200
+    assert set(policy_global) == {
+        "max_total_actions_per_run",
+        "max_depth",
+        "max_reseed_rounds",
+        "gemini_max_workers",
+    }
     assert policy_global["gemini_max_workers"] == 4
 
 
@@ -46,7 +51,12 @@ def test_policy_allows_smoke_env_overrides(monkeypatch):
 
     assert policy_global["max_total_actions_per_run"] == 4
     assert policy_global["gemini_max_workers"] == 3
-    assert policy_global["gemini_calls_per_run_cap"] == 200
+    assert set(policy_global) == {
+        "max_total_actions_per_run",
+        "max_depth",
+        "max_reseed_rounds",
+        "gemini_max_workers",
+    }
 
 
 def test_policy_edge_budgets_match_decided_values():