Bläddra i källkod

feat(v3-m5bc): 判定批并发(offset 归位)+ QuotaCapped 配额闸(确定性截断)

M5B:recall_decision 唯一 analyze 循环改 ThreadPoolExecutor 批并发——
judgments[offset]=future.result() 按 offset 归位(与完成序无关);id 编号/三 list 组装/
落盘留主线程按 offset 串行 → 产物逐条等价串行;_safe_analyze 兜底意外异常转 failed
不炸 run;max_workers 经 load_policy 读(兜底4);三处调用方(graph/walk_engine×2)零签名改。

M5C:新建 gemini_quota.QuotaCappedGeminiVideoClient(实现 Protocol,Lock 计数,
analyze 超额返回 _fail 仅作 backstop);run_service.start_run 每 run new 一个 wrapper
注入 deps(per-run 计数跨初始+walk 两次 recall 累计);recall 提交前按 offset 预判截断
(offset>=remaining 直接 gemini_quota_exhausted,consume(实际提交数))——截断边界与
完成序解耦,串/并行一致;命中 cap 发 gemini_quota_exhausted 事件走 run_events.jsonl
(镜像 stage 事件行结构,文件/DB 双模式可观测;原 lifecycle 通道文件模式是 no-op)。
非 wrapper client(单测直调 FakeGemini)经 getattr 兜底不限额。

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Sam Lee 2 dagar sedan
förälder
incheckning
57e9cf6380

+ 78 - 5
content_agent/business_modules/content_discovery/pattern_recall/recall_decision.py

@@ -1,18 +1,20 @@
-"""内容判定(V3-M2C):Gemini 直读视频,产出判定字段写进 pattern_match_result。
+"""内容判定(V3-M2C,M5 批并发):Gemini 直读视频,产出判定字段写进 pattern_match_result。
 
 替换原 decode 异步解构 + 分类树匹配。每条内容调一次 gemini_video_client.analyze,
 把 4 个判定字段(fit_senior_50plus / fit_confidence / relevance_score / reason)写进
 discovered item 的 pattern_match_result,并镜像 fit_senior_50plus 进 content_audience_profile。
-pattern_recall / category_or_element_binding 设为 "matched" 桥接键,仅为 M2→M3 过渡
-(让未重写的旧 hard_gate 不误拒);M3 删除旧门槛后移除这两个桥接键
+M5:analyze 纯 IO 且每条独立,用 ThreadPool 并发执行、按 offset 归位回收;
+id 编号、三个 list 的组装与落盘全部留主线程按 offset 串行 → 产物与串行逐条等价
 """
 
 from __future__ import annotations
 
+from concurrent.futures import ThreadPoolExecutor, as_completed
 from datetime import datetime, timezone
 from typing import Any
 
 from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION
+from content_agent.integrations.walk_graph_json import WalkGraphStore
 from content_agent.interfaces import GeminiVideoClient, RuntimeFileStore
 
 
@@ -31,13 +33,15 @@ def run(
     media_by_content_id = {
         row["platform_content_id"]: row for row in content_media_records
     }
+    judgments = _collect_judgments(
+        discovered_content_items, media_by_content_id, source_context, gemini_video_client
+    )
     evidence_rows: list[dict[str, Any]] = []
     updated_items: list[dict[str, Any]] = []
     updated_bundles: list[dict[str, Any]] = []
     for offset, item in enumerate(discovered_content_items):
-        media = media_by_content_id.get(item["platform_content_id"], {})
         recall_evidence_id = f"recall_{start_index + offset:03d}"
-        judgment = gemini_video_client.analyze(item, media, source_context)
+        judgment = judgments[offset]
         pattern_match_result = _build_pattern_match_result(judgment, recall_evidence_id)
         updated_items.append(_update_discovered_item(item, pattern_match_result))
         updated_bundles.append(
@@ -56,6 +60,75 @@ def run(
     }
 
 
+def _collect_judgments(
+    discovered_content_items: list[dict[str, Any]],
+    media_by_content_id: dict[str, dict[str, Any]],
+    source_context: dict[str, Any],
+    gemini_video_client: GeminiVideoClient,
+) -> list[dict[str, Any]]:
+    """并发执行 analyze,按 offset 归位(与完成顺序无关)→ 结果与串行逐条等价。
+
+    worker 只返回 judgment、不碰共享 list;组装/落盘由调用方主线程按 offset 串行完成。
+    """
+    judgments: list[dict[str, Any]] = [None] * len(discovered_content_items)  # type: ignore[list-item]
+    if not discovered_content_items:
+        return judgments
+    # 配额截断按 offset 在提交前预判(与完成顺序无关)→ 串行/并发截断边界相同。
+    # 非 wrapper client(如单测直调的 FakeGemini)无 remaining_quota → 不限额。
+    remaining = getattr(gemini_video_client, "remaining_quota", lambda: len(discovered_content_items))()
+    submitted = min(remaining, len(discovered_content_items))
+    with ThreadPoolExecutor(max_workers=_resolve_max_workers()) as pool:
+        future_to_offset = {}
+        for offset, item in enumerate(discovered_content_items):
+            if offset >= submitted:
+                judgments[offset] = {
+                    "fit_senior_50plus": False,
+                    "fit_confidence": 0.0,
+                    "relevance_score": 0.0,
+                    "reason": "gemini_quota_exhausted",
+                    "status": "failed",
+                }
+                continue
+            future = pool.submit(
+                _safe_analyze,
+                gemini_video_client,
+                item,
+                media_by_content_id.get(item["platform_content_id"], {}),
+                source_context,
+            )
+            future_to_offset[future] = offset
+        for future in as_completed(future_to_offset):
+            judgments[future_to_offset[future]] = future.result()
+    getattr(gemini_video_client, "consume", lambda count: None)(submitted)
+    return judgments
+
+
+def _safe_analyze(
+    client: GeminiVideoClient,
+    item: dict[str, Any],
+    media: dict[str, Any],
+    source_context: dict[str, Any],
+) -> dict[str, Any]:
+    # analyze 自身失败一律返回 _fail 不抛;这里兜底意外异常,绝不让 future.result() 炸主线程。
+    try:
+        return client.analyze(item, media, source_context)
+    except Exception as exc:
+        return {
+            "fit_senior_50plus": False,
+            "fit_confidence": 0.0,
+            "relevance_score": 0.0,
+            "reason": f"analyze_raised: {type(exc).__name__}",
+            "status": "failed",
+        }
+
+
+def _resolve_max_workers() -> int:
+    try:
+        return int(WalkGraphStore().load_policy()["global"]["gemini_max_workers"])
+    except Exception:
+        return 4
+
+
 def _build_pattern_match_result(judgment: dict[str, Any], recall_evidence_id: str) -> dict[str, Any]:
     return {
         "fit_senior_50plus": bool(judgment.get("fit_senior_50plus", False)),

+ 43 - 0
content_agent/integrations/gemini_quota.py

@@ -0,0 +1,43 @@
+"""单 run Gemini 调用配额闸(V3-M5C)。
+
+实现 GeminiVideoClient Protocol,包装真/假 client 在 run_service 单点注入——
+对 recall/walk_engine/graph 全部 analyze 透明,零签名改。每 run new 一个实例,
+计数跨"初始 recall + walk 内两次 recall"累计。截断的正常路径是 recall 提交前
+按 offset 预判(remaining_quota/consume);analyze 内超额返回 _fail 仅作 backstop。
+"""
+
+from __future__ import annotations
+
+import threading
+from typing import Any
+
+from content_agent.integrations.gemini_video import _fail
+from content_agent.interfaces import GeminiVideoClient
+
+_UNLIMITED = 1_000_000_000
+
+
+class QuotaCappedGeminiVideoClient:
+    def __init__(self, inner: GeminiVideoClient, cap: int | None) -> None:
+        self.inner = inner
+        self.cap = cap
+        self.used = 0
+        self._lock = threading.Lock()
+
+    def remaining_quota(self) -> int:
+        with self._lock:
+            return (self.cap - self.used) if self.cap is not None else _UNLIMITED
+
+    def consume(self, count: int) -> None:
+        with self._lock:
+            self.used += count
+
+    def analyze(
+        self,
+        content: dict[str, Any],
+        media: dict[str, Any],
+        source_context: dict[str, Any],
+    ) -> dict[str, Any]:
+        if self.cap is not None and self.remaining_quota() < 0:
+            return _fail("gemini_quota_exhausted")
+        return self.inner.analyze(content, media, source_context)

+ 34 - 3
content_agent/run_service.py

@@ -7,7 +7,7 @@ from pathlib import Path
 from typing import Any
 from uuid import uuid4
 
-from content_agent.constants import RUNTIME_SCHEMA_VERSION
+from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION, RUNTIME_SCHEMA_VERSION
 from content_agent.business_modules import run_record
 from content_agent.errors import ContentAgentError, ErrorCode, error_from_exception
 from content_agent.graph import RunDependencies, build_run_graph
@@ -15,7 +15,9 @@ from content_agent.integrations.composite_runtime import CompositeRuntimeStore
 from content_agent.integrations.database_runtime import ContentSupplyDbConfig, DatabaseRuntimeStore
 from content_agent.integrations.demand_source import DemandSourceService
 from content_agent.integrations.douyin import CrawapiDouyinClient
+from content_agent.integrations.gemini_quota import QuotaCappedGeminiVideoClient
 from content_agent.integrations.gemini_video import GeminiVideoClient as RealGeminiVideoClient
+from content_agent.integrations.walk_graph_json import WalkGraphStore
 from content_agent.integrations.mock_platform import MockPlatformClient
 from content_agent.integrations.shipinhao import CrawapiShipinhaoClient
 from content_agent.integrations.policy_json import JsonPolicyBundleStore
@@ -32,6 +34,7 @@ from content_agent.interfaces import (
     RuntimeStore,
 )
 from content_agent.models import RunState
+from content_agent.record_payload import with_raw_payload
 from content_agent.schemas import RunStartRequest
 
 
@@ -96,7 +99,7 @@ class RunService:
         )
 
     def start_run(self, request: RunStartRequest) -> RunState:
-        run_id = f"v1_run_{uuid4().hex[:12]}"
+        run_id = request.run_id or f"v1_run_{uuid4().hex[:12]}"
         policy_run_id = f"policy_run_{run_id.removeprefix('v1_run_')}"
         source_ref = self._source_ref_from_request(request)
         initial_state: RunState = {
@@ -136,15 +139,36 @@ class RunService:
                 raw_payload={"source_ref": resolved_source_ref},
             )
 
+            # M5C: 每 run new 一个配额 wrapper(per-run 计数,跨初始+walk 两次 recall 共享)。
+            gemini_client = QuotaCappedGeminiVideoClient(self._gemini_video_client, _gemini_calls_cap())
             deps = RunDependencies(
                 runtime=self.runtime,
                 platform_client=self._platform_client(request.platform, request.platform_mode),
                 policy_store=self.policy_store,
                 query_variant_client=self.query_variant_client,
-                gemini_video_client=self._gemini_video_client,
+                gemini_video_client=gemini_client,
             )
             graph = build_run_graph(deps)
             state = graph.invoke(initial_state)
+            if gemini_client.cap is not None and gemini_client.used >= gemini_client.cap:
+                # 走 run_events.jsonl 通道(同 stage 事件):文件/DB 双模式都可观测。
+                quota_event = with_raw_payload(
+                    {
+                        "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
+                        "run_id": run_id,
+                        "policy_run_id": policy_run_id,
+                        "event_id": "gemini_quota",
+                        "event_type": "gemini_quota_exhausted",
+                        "status": "running",
+                        "input_ref": None,
+                        "output_ref": None,
+                        "error_code": None,
+                        "message": "gemini call quota reached; remaining judgments truncated",
+                        "created_at": _utc_now(),
+                    }
+                )
+                quota_event["raw_payload"].update({"cap": gemini_client.cap, "used": gemini_client.used})
+                self.runtime.append_jsonl(run_id, "run_events.jsonl", [quota_event])
             self._record_success_metadata(state)
             return state
         except Exception as exc:
@@ -589,6 +613,13 @@ def _gemini_video_client_from_env(env: dict[str, str]) -> GeminiVideoClient:
     return RealGeminiVideoClient.from_env(env)
 
 
+def _gemini_calls_cap() -> int | None:
+    try:
+        return WalkGraphStore().load_policy()["global"]["gemini_calls_per_run_cap"]
+    except Exception:
+        return None
+
+
 class _DeterministicGeminiVideoClient:
     """mock/默认判定 client:固定返回适合 50+ 的高分结果,供本地/smoke 无网跑通。"""