#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ TopicBuild Agent 执行上下文管理器 用于在 TopicBuildAgent 执行过程中共享 execution 上下文信息。 使用 contextvars.ContextVar 实现并发隔离,支持多 Agent 并发执行。 为什么用 contextvars 而非 threading.local: - Agent 在线程 A 设置 context,Tool 通过 asyncio.to_thread() 在线程池线程 B 执行 - threading.local 按线程隔离,线程 B 看不到线程 A 的数据 - contextvars 按 asyncio Task 隔离,且 asyncio.to_thread() 会自动将当前 context 拷贝到目标线程,Tool 可以正确读取 Agent 设置的值 """ import contextvars from typing import Optional, Dict, Any _execution_id_var: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar('execution_id', default=None) _topic_build_id_var: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar('topic_build_id', default=None) _demand_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('demand', default=None) _demand_constraints_var: contextvars.ContextVar[Optional[Dict]] = contextvars.ContextVar('demand_constraints', default=None) _metadata_var: contextvars.ContextVar[Dict[str, Any]] = contextvars.ContextVar('metadata', default=None) _on_demand_strategies_var: contextvars.ContextVar[list] = contextvars.ContextVar('on_demand_strategies', default=[]) class TopicBuildAgentContext: """ 选题构建 Agent 执行上下文 使用 contextvars.ContextVar 存储当前 asyncio Task 的 Agent 执行上下文信息, 每个 Agent 执行拥有独立的上下文,跨线程(asyncio.to_thread)自动传播。 """ @classmethod def set_execution_id(cls, execution_id: int): _execution_id_var.set(execution_id) from log_capture import log log(f"[TopicBuildAgentContext] 设置 execution_id = {execution_id}") @classmethod def get_execution_id(cls) -> Optional[int]: return _execution_id_var.get() @classmethod def set_topic_build_id(cls, topic_build_id: int): _topic_build_id_var.set(topic_build_id) from log_capture import log log(f"[TopicBuildAgentContext] 设置 topic_build_id = {topic_build_id}") @classmethod def get_topic_build_id(cls) -> Optional[int]: return _topic_build_id_var.get() @classmethod def set_demand(cls, demand: str, constraints: Dict = None): _demand_var.set(demand) _demand_constraints_var.set(constraints) @classmethod def get_demand(cls) -> Optional[str]: return _demand_var.get() @classmethod def get_demand_constraints(cls) -> Optional[Dict]: return _demand_constraints_var.get() @classmethod def set_metadata(cls, key: str, value: Any): metadata = _metadata_var.get() if metadata is None: metadata = {} _metadata_var.set(metadata) metadata[key] = value @classmethod def get_metadata(cls, key: str, default: Any = None) -> Any: metadata = _metadata_var.get() if metadata is None: return default return metadata.get(key, default) @classmethod def set_on_demand_strategies(cls, names: list[str]): _on_demand_strategies_var.set(names or []) @classmethod def get_on_demand_strategies(cls) -> list[str]: return _on_demand_strategies_var.get() @classmethod def clear(cls): from log_capture import log log(f"[TopicBuildAgentContext] 清除上下文") _execution_id_var.set(None) _topic_build_id_var.set(None) _demand_var.set(None) _demand_constraints_var.set(None) _metadata_var.set(None) _on_demand_strategies_var.set([]) class TopicBuildAgentContextManager: """上下文管理器:使用 with 语句管理 TopicBuildAgent 上下文""" def __init__(self, execution_id: int, topic_build_id: int = None, demand: str = None, demand_constraints: dict = None): self.execution_id = execution_id self.topic_build_id = topic_build_id self.demand = demand self.demand_constraints = demand_constraints def __enter__(self): TopicBuildAgentContext.set_execution_id(self.execution_id) if self.topic_build_id: TopicBuildAgentContext.set_topic_build_id(self.topic_build_id) if self.demand: TopicBuildAgentContext.set_demand(self.demand, self.demand_constraints) return self def __exit__(self, exc_type, exc_val, exc_tb): TopicBuildAgentContext.clear() return False