| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """
- TopicBuild Agent 执行上下文管理器
- 用于在 TopicBuildAgent 执行过程中共享 execution 上下文信息。
- 使用 contextvars.ContextVar 实现并发隔离,支持多 Agent 并发执行。
- 为什么用 contextvars 而非 threading.local:
- - Agent 在线程 A 设置 context,Tool 通过 asyncio.to_thread() 在线程池线程 B 执行
- - threading.local 按线程隔离,线程 B 看不到线程 A 的数据
- - contextvars 按 asyncio Task 隔离,且 asyncio.to_thread() 会自动将当前
- context 拷贝到目标线程,Tool 可以正确读取 Agent 设置的值
- """
- import contextvars
- from typing import Optional, Dict, Any
- _execution_id_var: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar('execution_id', default=None)
- _topic_build_id_var: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar('topic_build_id', default=None)
- _demand_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('demand', default=None)
- _demand_constraints_var: contextvars.ContextVar[Optional[Dict]] = contextvars.ContextVar('demand_constraints', default=None)
- _metadata_var: contextvars.ContextVar[Dict[str, Any]] = contextvars.ContextVar('metadata', default=None)
- _on_demand_strategies_var: contextvars.ContextVar[list] = contextvars.ContextVar('on_demand_strategies', default=[])
- class TopicBuildAgentContext:
- """
- 选题构建 Agent 执行上下文
- 使用 contextvars.ContextVar 存储当前 asyncio Task 的 Agent 执行上下文信息,
- 每个 Agent 执行拥有独立的上下文,跨线程(asyncio.to_thread)自动传播。
- """
- @classmethod
- def set_execution_id(cls, execution_id: int):
- _execution_id_var.set(execution_id)
- from log_capture import log
- log(f"[TopicBuildAgentContext] 设置 execution_id = {execution_id}")
- @classmethod
- def get_execution_id(cls) -> Optional[int]:
- return _execution_id_var.get()
- @classmethod
- def set_topic_build_id(cls, topic_build_id: int):
- _topic_build_id_var.set(topic_build_id)
- from log_capture import log
- log(f"[TopicBuildAgentContext] 设置 topic_build_id = {topic_build_id}")
- @classmethod
- def get_topic_build_id(cls) -> Optional[int]:
- return _topic_build_id_var.get()
- @classmethod
- def set_demand(cls, demand: str, constraints: Dict = None):
- _demand_var.set(demand)
- _demand_constraints_var.set(constraints)
- @classmethod
- def get_demand(cls) -> Optional[str]:
- return _demand_var.get()
- @classmethod
- def get_demand_constraints(cls) -> Optional[Dict]:
- return _demand_constraints_var.get()
- @classmethod
- def set_metadata(cls, key: str, value: Any):
- metadata = _metadata_var.get()
- if metadata is None:
- metadata = {}
- _metadata_var.set(metadata)
- metadata[key] = value
- @classmethod
- def get_metadata(cls, key: str, default: Any = None) -> Any:
- metadata = _metadata_var.get()
- if metadata is None:
- return default
- return metadata.get(key, default)
- @classmethod
- def set_on_demand_strategies(cls, names: list[str]):
- _on_demand_strategies_var.set(names or [])
- @classmethod
- def get_on_demand_strategies(cls) -> list[str]:
- return _on_demand_strategies_var.get()
- @classmethod
- def clear(cls):
- from log_capture import log
- log(f"[TopicBuildAgentContext] 清除上下文")
- _execution_id_var.set(None)
- _topic_build_id_var.set(None)
- _demand_var.set(None)
- _demand_constraints_var.set(None)
- _metadata_var.set(None)
- _on_demand_strategies_var.set([])
- class TopicBuildAgentContextManager:
- """上下文管理器:使用 with 语句管理 TopicBuildAgent 上下文"""
- def __init__(self, execution_id: int, topic_build_id: int = None,
- demand: str = None, demand_constraints: dict = None):
- self.execution_id = execution_id
- self.topic_build_id = topic_build_id
- self.demand = demand
- self.demand_constraints = demand_constraints
- def __enter__(self):
- TopicBuildAgentContext.set_execution_id(self.execution_id)
- if self.topic_build_id:
- TopicBuildAgentContext.set_topic_build_id(self.topic_build_id)
- if self.demand:
- TopicBuildAgentContext.set_demand(self.demand, self.demand_constraints)
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- TopicBuildAgentContext.clear()
- return False
|