topic_build_agent_context.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. """
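TopicBuild Agent execution context manager.

Shares execution-context information while a TopicBuildAgent is running.
Uses contextvars.ContextVar for concurrency isolation, so multiple Agents can
run concurrently.

Why contextvars rather than threading.local:
- The Agent sets the context in thread A, while a Tool runs in thread-pool
  thread B via asyncio.to_thread().
- threading.local is isolated per thread, so thread B cannot see the data set
  in thread A.
- contextvars is isolated per asyncio Task, and asyncio.to_thread()
  automatically copies the current context to the target thread, so the Tool
  can correctly read the values set by the Agent.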
  12. """
  13. import contextvars
  14. from typing import Optional, Dict, Any
  15. _execution_id_var: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar('execution_id', default=None)
  16. _topic_build_id_var: contextvars.ContextVar[Optional[int]] = contextvars.ContextVar('topic_build_id', default=None)
  17. _demand_var: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar('demand', default=None)
  18. _demand_constraints_var: contextvars.ContextVar[Optional[Dict]] = contextvars.ContextVar('demand_constraints', default=None)
  19. _metadata_var: contextvars.ContextVar[Dict[str, Any]] = contextvars.ContextVar('metadata', default=None)
  20. _on_demand_strategies_var: contextvars.ContextVar[list] = contextvars.ContextVar('on_demand_strategies', default=[])
  21. class TopicBuildAgentContext:
  22. """
  23. 选题构建 Agent 执行上下文
  24. 使用 contextvars.ContextVar 存储当前 asyncio Task 的 Agent 执行上下文信息,
  25. 每个 Agent 执行拥有独立的上下文,跨线程(asyncio.to_thread)自动传播。
  26. """
  27. @classmethod
  28. def set_execution_id(cls, execution_id: int):
  29. _execution_id_var.set(execution_id)
  30. from log_capture import log
  31. log(f"[TopicBuildAgentContext] 设置 execution_id = {execution_id}")
  32. @classmethod
  33. def get_execution_id(cls) -> Optional[int]:
  34. return _execution_id_var.get()
  35. @classmethod
  36. def set_topic_build_id(cls, topic_build_id: int):
  37. _topic_build_id_var.set(topic_build_id)
  38. from log_capture import log
  39. log(f"[TopicBuildAgentContext] 设置 topic_build_id = {topic_build_id}")
  40. @classmethod
  41. def get_topic_build_id(cls) -> Optional[int]:
  42. return _topic_build_id_var.get()
  43. @classmethod
  44. def set_demand(cls, demand: str, constraints: Dict = None):
  45. _demand_var.set(demand)
  46. _demand_constraints_var.set(constraints)
  47. @classmethod
  48. def get_demand(cls) -> Optional[str]:
  49. return _demand_var.get()
  50. @classmethod
  51. def get_demand_constraints(cls) -> Optional[Dict]:
  52. return _demand_constraints_var.get()
  53. @classmethod
  54. def set_metadata(cls, key: str, value: Any):
  55. metadata = _metadata_var.get()
  56. if metadata is None:
  57. metadata = {}
  58. _metadata_var.set(metadata)
  59. metadata[key] = value
  60. @classmethod
  61. def get_metadata(cls, key: str, default: Any = None) -> Any:
  62. metadata = _metadata_var.get()
  63. if metadata is None:
  64. return default
  65. return metadata.get(key, default)
  66. @classmethod
  67. def set_on_demand_strategies(cls, names: list[str]):
  68. _on_demand_strategies_var.set(names or [])
  69. @classmethod
  70. def get_on_demand_strategies(cls) -> list[str]:
  71. return _on_demand_strategies_var.get()
  72. @classmethod
  73. def clear(cls):
  74. from log_capture import log
  75. log(f"[TopicBuildAgentContext] 清除上下文")
  76. _execution_id_var.set(None)
  77. _topic_build_id_var.set(None)
  78. _demand_var.set(None)
  79. _demand_constraints_var.set(None)
  80. _metadata_var.set(None)
  81. _on_demand_strategies_var.set([])
  82. class TopicBuildAgentContextManager:
  83. """上下文管理器:使用 with 语句管理 TopicBuildAgent 上下文"""
  84. def __init__(self, execution_id: int, topic_build_id: int = None,
  85. demand: str = None, demand_constraints: dict = None):
  86. self.execution_id = execution_id
  87. self.topic_build_id = topic_build_id
  88. self.demand = demand
  89. self.demand_constraints = demand_constraints
  90. def __enter__(self):
  91. TopicBuildAgentContext.set_execution_id(self.execution_id)
  92. if self.topic_build_id:
  93. TopicBuildAgentContext.set_topic_build_id(self.topic_build_id)
  94. if self.demand:
  95. TopicBuildAgentContext.set_demand(self.demand, self.demand_constraints)
  96. return self
  97. def __exit__(self, exc_type, exc_val, exc_tb):
  98. TopicBuildAgentContext.clear()
  99. return False