run_service.py 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. from __future__ import annotations
  2. import os
  3. from collections import Counter
  4. from datetime import datetime, timezone
  5. from pathlib import Path
  6. from typing import Any
  7. from uuid import uuid4
  8. from content_agent.constants import RUNTIME_RECORD_SCHEMA_VERSION, RUNTIME_SCHEMA_VERSION
  9. from content_agent.business_modules import run_record
  10. from content_agent.errors import ContentAgentError, ErrorCode, error_from_exception
  11. from content_agent.graph import RunDependencies, build_run_graph
  12. from content_agent.integrations.composite_runtime import CompositeRuntimeStore
  13. from content_agent.integrations.database_runtime import ContentSupplyDbConfig, DatabaseRuntimeStore
  14. from content_agent.integrations.demand_source import DemandSourceService
  15. from content_agent.integrations.douyin import CrawapiDouyinClient
  16. from content_agent.integrations.gemini_quota import QuotaCappedGeminiVideoClient
  17. from content_agent.integrations.gemini_video import GeminiVideoClient as RealGeminiVideoClient
  18. from content_agent.integrations.walk_graph_json import WalkGraphStore
  19. from content_agent.integrations.mock_platform import MockPlatformClient
  20. from content_agent.integrations.shipinhao import CrawapiShipinhaoClient
  21. from content_agent.integrations.policy_json import JsonPolicyBundleStore
  22. from content_agent.integrations.query_variant import (
  23. MissingQueryVariantClient,
  24. query_variant_client_from_env,
  25. )
  26. from content_agent.integrations.runtime_files import LocalRuntimeFileStore
  27. from content_agent.interfaces import (
  28. GeminiVideoClient,
  29. PlatformSearchClient,
  30. PolicyBundleStore,
  31. QueryVariantClient,
  32. RuntimeStore,
  33. )
  34. from content_agent.models import RunState
  35. from content_agent.record_payload import with_raw_payload
  36. from content_agent.schemas import RunStartRequest
  37. class RunService:
  38. def __init__(
  39. self,
  40. runtime_root: Path | str = Path("runtime/v1"),
  41. runtime: RuntimeStore | None = None,
  42. policy_store: PolicyBundleStore | None = None,
  43. demand_source: DemandSourceService | None = None,
  44. query_variant_client: QueryVariantClient | None = None,
  45. gemini_video_client: GeminiVideoClient | None = None,
  46. ) -> None:
  47. self.runtime = runtime or LocalRuntimeFileStore(runtime_root)
  48. self.policy_store = policy_store or JsonPolicyBundleStore(Path("."))
  49. self.demand_source = demand_source
  50. self.query_variant_client = query_variant_client or MissingQueryVariantClient(
  51. "query variant client is not configured"
  52. )
  53. self._gemini_video_client = gemini_video_client or _DeterministicGeminiVideoClient()
  54. @classmethod
  55. def from_env(cls, runtime_root: Path | str = Path("runtime/v1")) -> "RunService":
  56. local_runtime = LocalRuntimeFileStore(runtime_root)
  57. env = _merged_project_env()
  58. query_variant_client = query_variant_client_from_env(env)
  59. gemini_video_client = _gemini_video_client_from_env(env)
  60. db_runtime_enabled = _env_enabled("CONTENT_AGENT_DB_RUNTIME_ENABLED")
  61. try:
  62. config = ContentSupplyDbConfig.from_env()
  63. except Exception as exc:
  64. if not db_runtime_enabled:
  65. return cls(
  66. runtime_root=runtime_root,
  67. runtime=local_runtime,
  68. query_variant_client=query_variant_client,
  69. gemini_video_client=gemini_video_client,
  70. )
  71. raise ContentAgentError(
  72. ErrorCode.DB_CONFIG_MISSING,
  73. "content supply db config is missing",
  74. {"exception_type": type(exc).__name__},
  75. ) from exc
  76. demand_source = DemandSourceService(config)
  77. if not db_runtime_enabled:
  78. return cls(
  79. runtime_root=runtime_root,
  80. runtime=local_runtime,
  81. demand_source=demand_source,
  82. query_variant_client=query_variant_client,
  83. gemini_video_client=gemini_video_client,
  84. )
  85. db_runtime = DatabaseRuntimeStore(config)
  86. return cls(
  87. runtime_root=runtime_root,
  88. runtime=CompositeRuntimeStore(db_runtime, local_runtime),
  89. demand_source=demand_source,
  90. query_variant_client=query_variant_client,
  91. gemini_video_client=gemini_video_client,
  92. )
  93. def start_run(self, request: RunStartRequest) -> RunState:
  94. run_id = request.run_id or f"v1_run_{uuid4().hex[:12]}"
  95. policy_run_id = f"policy_run_{run_id.removeprefix('v1_run_')}"
  96. source_ref = self._source_ref_from_request(request)
  97. initial_state: RunState = {
  98. "run_id": run_id,
  99. "policy_run_id": policy_run_id,
  100. "schema_version": RUNTIME_SCHEMA_VERSION,
  101. "platform": request.platform,
  102. "platform_mode": request.platform_mode,
  103. "source": request.source,
  104. "strategy_version": request.strategy_version,
  105. "current_step": "start",
  106. "status": "running",
  107. "errors": [],
  108. }
  109. try:
  110. self.runtime.prepare_run(run_id)
  111. self._create_run_record(run_id, request, source_ref)
  112. source = self._resolve_source(request)
  113. resolved_source_ref = self._source_ref_from_resolved_source(source, source_ref)
  114. if resolved_source_ref != source_ref:
  115. self.runtime.update_run_record(
  116. run_id,
  117. {
  118. "demand_content_id": _demand_content_id_from_source(source),
  119. "source_ref": resolved_source_ref,
  120. },
  121. )
  122. initial_state["source"] = source
  123. self._append_lifecycle_event(
  124. run_id,
  125. policy_run_id,
  126. event_id="lifecycle_start",
  127. event_type="run_started",
  128. status="running",
  129. message="run started",
  130. raw_payload={"source_ref": resolved_source_ref},
  131. )
  132. # M5C: 每 run new 一个配额 wrapper(per-run 计数,跨初始+walk 两次 recall 共享)。
  133. gemini_client = QuotaCappedGeminiVideoClient(self._gemini_video_client, _gemini_calls_cap())
  134. deps = RunDependencies(
  135. runtime=self.runtime,
  136. platform_client=self._platform_client(request.platform, request.platform_mode),
  137. policy_store=self.policy_store,
  138. query_variant_client=self.query_variant_client,
  139. gemini_video_client=gemini_client,
  140. )
  141. graph = build_run_graph(deps)
  142. state = graph.invoke(initial_state)
  143. if gemini_client.cap is not None and gemini_client.used >= gemini_client.cap:
  144. # 走 run_events.jsonl 通道(同 stage 事件):文件/DB 双模式都可观测。
  145. quota_event = with_raw_payload(
  146. {
  147. "record_schema_version": RUNTIME_RECORD_SCHEMA_VERSION,
  148. "run_id": run_id,
  149. "policy_run_id": policy_run_id,
  150. "event_id": "gemini_quota",
  151. "event_type": "gemini_quota_exhausted",
  152. "status": "running",
  153. "input_ref": None,
  154. "output_ref": None,
  155. "error_code": None,
  156. "message": "gemini call quota reached; remaining judgments truncated",
  157. "created_at": _utc_now(),
  158. }
  159. )
  160. quota_event["raw_payload"].update({"cap": gemini_client.cap, "used": gemini_client.used})
  161. self.runtime.append_jsonl(run_id, "run_events.jsonl", [quota_event])
  162. self._record_success_metadata(state)
  163. return state
  164. except Exception as exc:
  165. error = self._classify_error(exc)
  166. failed_state = self._failed_state(initial_state, error)
  167. self._record_failure_metadata(
  168. failed_state,
  169. request,
  170. source_ref,
  171. error,
  172. )
  173. return failed_state
  174. def _source_ref_from_request(self, request: RunStartRequest) -> dict[str, Any]:
  175. if request.demand_content_id is not None:
  176. return {
  177. "source_type": "demand_content",
  178. "demand_content_id": request.demand_content_id,
  179. }
  180. if request.run_label:
  181. return {
  182. "source_type": "demand_content",
  183. "run_label": request.run_label,
  184. }
  185. if request.source:
  186. return {"source_type": "local_source", "source": request.source}
  187. return {
  188. "source_type": "demand_content_default",
  189. "selector": "pg_pattern_v2_passed_first",
  190. }
  191. def _resolve_source(self, request: RunStartRequest) -> str | dict[str, Any] | None:
  192. if request.demand_content_id is not None:
  193. if not self.demand_source:
  194. raise ContentAgentError(
  195. ErrorCode.DB_CONFIG_MISSING,
  196. "demand source db is not configured",
  197. {"selector": "demand_content_id"},
  198. )
  199. source = self.demand_source.get_by_id(request.demand_content_id)
  200. return source
  201. if request.run_label:
  202. if not self.demand_source:
  203. raise ContentAgentError(
  204. ErrorCode.DB_CONFIG_MISSING,
  205. "demand source db is not configured",
  206. {"selector": "run_label"},
  207. )
  208. source = self.demand_source.get_by_run_label(request.run_label)
  209. return source
  210. if request.source:
  211. return request.source
  212. if not self.demand_source:
  213. raise ContentAgentError(
  214. ErrorCode.DB_CONFIG_MISSING,
  215. "demand source db is not configured",
  216. {"selector": "default_pg_pattern_v2_passed"},
  217. )
  218. return self.demand_source.get_default_pg_pattern_source()
  219. def _source_ref_from_resolved_source(
  220. self,
  221. source: str | dict[str, Any] | None,
  222. source_ref: dict[str, Any],
  223. ) -> dict[str, Any]:
  224. if source_ref.get("source_type") != "demand_content_default" or not isinstance(
  225. source,
  226. dict,
  227. ):
  228. return source_ref
  229. demand_content_id = _demand_content_id_from_source(source)
  230. resolved = dict(source_ref)
  231. if demand_content_id is not None:
  232. resolved["demand_content_id"] = demand_content_id
  233. return resolved
  234. def _create_run_record(
  235. self,
  236. run_id: str,
  237. request: RunStartRequest,
  238. source_ref: dict[str, Any],
  239. ) -> None:
  240. self.runtime.create_run_record(
  241. {
  242. "run_id": run_id,
  243. "demand_content_id": request.demand_content_id,
  244. "run_label": request.run_label,
  245. "platform": request.platform,
  246. "platform_mode": request.platform_mode,
  247. "strategy_version": request.strategy_version,
  248. "status": "running",
  249. "current_step": "start",
  250. "source_ref": source_ref,
  251. "started_at": _utc_now(),
  252. }
  253. )
  254. def _record_success_metadata(self, state: RunState) -> None:
  255. final_status = "partial_success" if state.get("query_failures") else "success"
  256. state["status"] = final_status
  257. self.runtime.record_policy_run(_policy_run_record_from_state(state))
  258. validation = self.validate_run(state["run_id"])
  259. self._update_final_output_validation(state["run_id"], validation)
  260. self.runtime.update_run_record(
  261. state["run_id"],
  262. {
  263. "status": final_status,
  264. "current_step": state.get("current_step", "review_strategy"),
  265. "validation_status": validation["status"],
  266. "completed_at": _utc_now(),
  267. },
  268. )
  269. self._append_lifecycle_event(
  270. state["run_id"],
  271. state["policy_run_id"],
  272. event_id="lifecycle_success",
  273. event_type="run_succeeded",
  274. status=final_status,
  275. message="run succeeded"
  276. if final_status == "success"
  277. else "run partially succeeded",
  278. output_ref="final_output.json",
  279. raw_payload={
  280. "validation_status": validation["status"],
  281. "policy_bundle_id": state.get("policy_bundle_id"),
  282. "query_failures": state.get("query_failures", []),
  283. },
  284. )
  285. def _update_final_output_validation(self, run_id: str, validation: dict[str, Any]) -> None:
  286. final_output = self.runtime.read_json(run_id, "final_output.json")
  287. validation_status = validation["status"]
  288. findings = validation.get("findings", [])
  289. final_output["validation_status"] = validation_status
  290. summary = final_output.setdefault("summary", {})
  291. summary["run_path_complete"] = validation_status == "pass"
  292. summary["trace_complete"] = validation_status == "pass"
  293. summary["validation_findings_summary"] = [
  294. finding.get("message", str(finding)) if isinstance(finding, dict) else str(finding)
  295. for finding in findings
  296. ]
  297. self.runtime.update_json(run_id, "final_output.json", final_output)
  298. def _record_failure_metadata(
  299. self,
  300. state: RunState,
  301. request: RunStartRequest,
  302. source_ref: dict[str, Any],
  303. error: ContentAgentError,
  304. ) -> None:
  305. run_id = state["run_id"]
  306. policy_run_id = state["policy_run_id"]
  307. try:
  308. self.runtime.update_run_record(
  309. run_id,
  310. {
  311. "status": "failed",
  312. "current_step": state.get("current_step", "failed"),
  313. "error_code": error.error_code.value,
  314. "error_message": error.message,
  315. "error_detail": error.detail,
  316. "completed_at": _utc_now(),
  317. },
  318. )
  319. self._record_platform_query_failure_details(run_id, policy_run_id, error)
  320. self._append_lifecycle_event(
  321. run_id,
  322. policy_run_id,
  323. event_id="lifecycle_failed",
  324. event_type="run_failed",
  325. status="failed",
  326. message=error.message,
  327. error_code=error.error_code.value,
  328. raw_payload={
  329. "error_detail": error.detail,
  330. "source_ref": source_ref,
  331. "platform_mode": request.platform_mode,
  332. },
  333. )
  334. except Exception:
  335. # Preserve the original run failure; DB failure here is already reflected by the state.
  336. return
  337. def _record_platform_query_failure_details(
  338. self,
  339. run_id: str,
  340. policy_run_id: str,
  341. error: ContentAgentError,
  342. ) -> None:
  343. query_failures = _query_failures_from_error(error)
  344. if not query_failures:
  345. return
  346. try:
  347. search_queries = self.runtime.read_jsonl(run_id, "search_queries.jsonl")
  348. except FileNotFoundError:
  349. return
  350. if not search_queries:
  351. return
  352. records = run_record.build_platform_query_failure_records(
  353. run_id,
  354. policy_run_id,
  355. search_queries,
  356. query_failures,
  357. )
  358. self.runtime.append_jsonl(run_id, "search_queries.jsonl", records["search_queries"])
  359. self.runtime.append_jsonl(run_id, "search_clues.jsonl", records["search_clues"])
  360. self.runtime.append_jsonl(run_id, "run_events.jsonl", records["run_events"])
  361. def _append_lifecycle_event(
  362. self,
  363. run_id: str,
  364. policy_run_id: str,
  365. *,
  366. event_id: str,
  367. event_type: str,
  368. status: str,
  369. message: str,
  370. input_ref: str | None = None,
  371. output_ref: str | None = None,
  372. error_code: str | None = None,
  373. raw_payload: dict[str, Any] | None = None,
  374. ) -> None:
  375. self.runtime.append_run_event_records(
  376. run_id,
  377. policy_run_id,
  378. [
  379. {
  380. "event_id": event_id,
  381. "event_type": event_type,
  382. "status": status,
  383. "input_ref": input_ref,
  384. "output_ref": output_ref,
  385. "error_code": error_code,
  386. "message": message,
  387. "raw_payload": raw_payload or {},
  388. "created_at": _utc_now(),
  389. }
  390. ],
  391. )
  392. def _failed_state(self, initial_state: RunState, error: ContentAgentError) -> RunState:
  393. return {
  394. **initial_state,
  395. "current_step": "failed",
  396. "status": "failed",
  397. "error_code": error.error_code.value,
  398. "error_message": error.message,
  399. "error_detail": error.detail,
  400. "http_status_code": error.status_code,
  401. "errors": [error.message],
  402. }
  403. def _classify_error(self, exc: Exception) -> ContentAgentError:
  404. if isinstance(exc, ContentAgentError):
  405. return exc
  406. if isinstance(exc, FileNotFoundError):
  407. return ContentAgentError(
  408. ErrorCode.INVALID_SOURCE,
  409. "source file not found",
  410. {"exception_type": type(exc).__name__},
  411. status_code=400,
  412. )
  413. if isinstance(exc, ValueError) and "unknown strategy_version" in str(exc):
  414. return ContentAgentError(
  415. ErrorCode.POLICY_BUNDLE_NOT_FOUND,
  416. "policy bundle not found",
  417. {"exception_type": type(exc).__name__},
  418. status_code=400,
  419. )
  420. if isinstance(exc, ValueError) and "evidence_pack" in str(exc):
  421. return ContentAgentError(
  422. ErrorCode.INVALID_SOURCE,
  423. "invalid source",
  424. {"exception_type": type(exc).__name__},
  425. status_code=400,
  426. )
  427. return error_from_exception(exc, detail={"exception_type": type(exc).__name__})
  428. def _platform_client(self, platform: str, platform_mode: str) -> PlatformSearchClient:
  429. if platform_mode == "mock":
  430. return MockPlatformClient()
  431. if platform_mode == "real":
  432. real_clients = {
  433. "douyin": CrawapiDouyinClient.from_env,
  434. "shipinhao": CrawapiShipinhaoClient.from_env,
  435. }
  436. builder = real_clients.get(platform)
  437. if builder is None:
  438. raise ContentAgentError(
  439. ErrorCode.INVALID_REQUEST,
  440. "unsupported real platform",
  441. {"platform": platform},
  442. status_code=400,
  443. )
  444. try:
  445. return builder()
  446. except Exception as exc:
  447. raise ContentAgentError(
  448. ErrorCode.PLATFORM_CONFIG_MISSING,
  449. "platform config missing",
  450. {"exception_type": type(exc).__name__},
  451. ) from exc
  452. raise ContentAgentError(
  453. ErrorCode.INVALID_REQUEST,
  454. "unsupported platform_mode",
  455. {"platform_mode": platform_mode},
  456. status_code=400,
  457. )
  458. def get_summary(self, run_id: str) -> dict:
  459. final_output_exists = (self.runtime.run_dir(run_id) / "final_output.json").exists()
  460. status = self._summary_status(run_id, final_output_exists)
  461. validation = self.validate_run(run_id) if final_output_exists else {"status": "fail"}
  462. policy_run_id = None
  463. if final_output_exists:
  464. policy_run_id = self.runtime.read_json(run_id, "final_output.json").get("policy_run_id")
  465. return {
  466. "run_id": run_id,
  467. "policy_run_id": policy_run_id,
  468. "status": status,
  469. "current_step": "review_strategy" if final_output_exists else "unknown",
  470. "output_dir": str(self.runtime.run_dir(run_id)),
  471. "files": self.runtime.file_status(run_id),
  472. "validation_status": validation["status"],
  473. "errors": [],
  474. }
  475. def _summary_status(self, run_id: str, final_output_exists: bool) -> str:
  476. try:
  477. run_events = self.runtime.read_jsonl(run_id, "run_events.jsonl")
  478. lifecycle_events = [
  479. event for event in run_events if str(event.get("event_id", "")).startswith("lifecycle_")
  480. ]
  481. except Exception:
  482. run_events = []
  483. lifecycle_events = []
  484. if lifecycle_events:
  485. return lifecycle_events[-1].get("status") or (
  486. "success" if final_output_exists else "failed"
  487. )
  488. if final_output_exists and any(
  489. event.get("event_type") == "platform_query_failed" for event in run_events
  490. ):
  491. return "partial_success"
  492. return "success" if final_output_exists else "failed"
  493. def read_jsonl(self, run_id: str, filename: str) -> list[dict]:
  494. return self.runtime.read_jsonl(run_id, filename)
  495. def read_json(self, run_id: str, filename: str) -> dict:
  496. return self.runtime.read_json(run_id, filename)
  497. def strategy_review(self, run_id: str) -> dict:
  498. try:
  499. return self.runtime.read_json(run_id, "strategy_review.json")
  500. except FileNotFoundError:
  501. policy_run_id = None
  502. try:
  503. policy_run_id = self.runtime.read_json(run_id, "final_output.json").get(
  504. "policy_run_id"
  505. )
  506. except FileNotFoundError:
  507. pass
  508. return {
  509. "schema_version": RUNTIME_SCHEMA_VERSION,
  510. "run_id": run_id,
  511. "policy_run_id": policy_run_id,
  512. "review_status": "not_generated",
  513. }
  514. def regenerate_strategy_review(self, run_id: str) -> dict:
  515. from content_agent.business_modules.learning_review import run
  516. final_output = self.runtime.read_json(run_id, "final_output.json")
  517. timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S%f")
  518. review_id = f"review_{final_output['policy_run_id']}_{timestamp}"
  519. return run(run_id, final_output["policy_run_id"], self.runtime, review_id=review_id)
  520. def validate_run(self, run_id: str) -> dict:
  521. from content_agent.business_modules.run_record import validate_run
  522. return validate_run(run_id, self.runtime)
  523. def _policy_run_record_from_state(state: RunState) -> dict[str, Any]:
  524. policy_bundle = state.get("policy_bundle") or {}
  525. decisions = state.get("rule_decisions") or []
  526. return {
  527. "run_id": state["run_id"],
  528. "policy_run_id": state["policy_run_id"],
  529. "run_role": "primary",
  530. "policy_bundle_id": state.get("policy_bundle_id"),
  531. "rule_pack_id": policy_bundle.get("rule_pack_id"),
  532. "strategy_id": policy_bundle.get("strategy_id"),
  533. "strategy_version": state.get("strategy_version"),
  534. "rule_pack_version": policy_bundle.get("rule_pack_version"),
  535. "walk_strategy_version": policy_bundle.get("walk_strategy_version"),
  536. "policy_bundle_hash": policy_bundle.get("policy_bundle_hash"),
  537. "strategy_source_ref": policy_bundle.get("strategy_source_ref"),
  538. "rule_pack_source_ref": policy_bundle.get("rule_pack_source_ref"),
  539. "evidence_bundle_schema_version": policy_bundle.get("evidence_bundle_schema_version"),
  540. "runtime_record_schema_version": policy_bundle.get("runtime_record_schema_version"),
  541. "status": state.get("status", "success"),
  542. "metrics": (state.get("final_output") or {}).get("summary", {}),
  543. "decision_summary": _decision_summary(decisions),
  544. "raw_payload": {
  545. "current_step": state.get("current_step"),
  546. "search_query_count": len(state.get("search_queries") or []),
  547. "discovered_content_count": len(state.get("discovered_content_items") or []),
  548. "decision_count": len(decisions),
  549. "final_output_ref": "final_output.json",
  550. "dispatch": policy_bundle.get("dispatch"),
  551. "runtime_status_contract": policy_bundle.get("runtime_status_contract", {}),
  552. "effect_status_mapping": policy_bundle.get("effect_status_mapping", []),
  553. "query_effect_aggregation": policy_bundle.get("query_effect_aggregation", []),
  554. "decision_reason_codes": policy_bundle.get("decision_reason_codes", []),
  555. "policy_bundle_hash": policy_bundle.get("policy_bundle_hash"),
  556. },
  557. }
  558. def _decision_summary(decisions: list[dict[str, Any]]) -> dict[str, Any]:
  559. return {
  560. "decision_action_counts": dict(Counter(item.get("decision_action") for item in decisions)),
  561. "decision_reason_code_counts": dict(
  562. Counter(item.get("decision_reason_code") for item in decisions)
  563. ),
  564. "effect_status_counts": dict(
  565. Counter(item.get("search_query_effect_status") for item in decisions)
  566. ),
  567. }
  568. def _query_failures_from_error(error: ContentAgentError) -> list[dict[str, Any]]:
  569. query_failures = error.detail.get("query_failures") if isinstance(error.detail, dict) else None
  570. if not isinstance(query_failures, list):
  571. return []
  572. return [failure for failure in query_failures if isinstance(failure, dict)]
  573. def _gemini_video_client_from_env(env: dict[str, str]) -> GeminiVideoClient:
  574. return RealGeminiVideoClient.from_env(env)
  575. def _gemini_calls_cap() -> int | None:
  576. try:
  577. return WalkGraphStore().load_policy()["global"]["gemini_calls_per_run_cap"]
  578. except Exception:
  579. return None
  580. class _DeterministicGeminiVideoClient:
  581. """mock/默认判定 client:固定返回适合 50+ 的高分结果,供本地/smoke 无网跑通。"""
  582. def analyze(
  583. self,
  584. content: dict[str, Any],
  585. media: dict[str, Any],
  586. source_context: dict[str, Any],
  587. ) -> dict[str, Any]:
  588. return {
  589. "fit_senior_50plus": True,
  590. "fit_confidence": 0.9,
  591. "relevance_score": 0.8,
  592. "reason": "deterministic stub judgment",
  593. }
  594. def _env_enabled(key: str) -> bool:
  595. value = _load_project_env().get(key)
  596. if os.environ.get(key) is not None:
  597. value = os.environ[key]
  598. return (value or "").lower() in {"1", "true", "yes", "on"}
  599. def _merged_project_env() -> dict[str, str]:
  600. return {
  601. **_load_project_env(),
  602. **dict(os.environ),
  603. }
  604. def _demand_content_id_from_source(source: str | dict[str, Any] | None) -> int | None:
  605. if not isinstance(source, dict):
  606. return None
  607. value = source.get("id") or source.get("demand_content_id")
  608. if value in (None, ""):
  609. return None
  610. try:
  611. return int(value)
  612. except (TypeError, ValueError):
  613. return None
  614. def _utc_now() -> str:
  615. return datetime.now(timezone.utc).isoformat()
  616. def _load_project_env(env_file: Path | str = ".env") -> dict[str, str]:
  617. path = Path(env_file)
  618. if not path.exists():
  619. return {}
  620. result: dict[str, str] = {}
  621. for line in path.read_text(encoding="utf-8").splitlines():
  622. stripped = line.strip()
  623. if not stripped or stripped.startswith("#") or "=" not in stripped:
  624. continue
  625. key, value = stripped.split("=", 1)
  626. result[key.strip()] = value.strip().strip('"').strip("'")
  627. return result