base.py 11 KB


  1. """
  2. 组件抽象层:Function 基础抽象类
  3. 定义纯函数组件的抽象基类,用于处理数据转换、计算等无副作用的操作
  4. """
  5. from abc import ABC, abstractmethod
  6. from typing import Any, Dict, Optional, TypeVar, Generic, Callable, List
  7. import inspect
  8. import functools
  9. # 输入输出类型变量
  10. InputType = TypeVar('InputType')
  11. OutputType = TypeVar('OutputType')
  12. class BaseFunction(ABC, Generic[InputType, OutputType]):
  13. """函数组件基类
  14. 定义纯函数组件的接口,用于处理数据转换、计算等操作
  15. """
  16. def __init__(self, name: str, description: str = ""):
  17. """初始化函数组件
  18. Args:
  19. name: 函数名称
  20. description: 函数描述
  21. """
  22. self.name = name
  23. self.description = description
  24. self._initialized = False
  25. @abstractmethod
  26. def execute(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
  27. """执行函数逻辑
  28. Args:
  29. input_data: 输入数据
  30. context: 上下文信息
  31. Returns:
  32. 处理后的输出数据
  33. """
  34. pass
  35. def initialize(self) -> None:
  36. """初始化函数组件"""
  37. if not self._initialized:
  38. self._setup()
  39. self._initialized = True
  40. def _setup(self) -> None:
  41. """设置函数组件,子类可以重写"""
  42. pass
  43. @property
  44. def is_initialized(self) -> bool:
  45. """检查是否已初始化"""
  46. return self._initialized
  47. def __call__(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
  48. """使函数组件可调用"""
  49. if not self._initialized:
  50. self.initialize()
  51. return self.execute(input_data, context)
  52. def __str__(self) -> str:
  53. return f"{self.__class__.__name__}(name='{self.name}')"
  54. def __repr__(self) -> str:
  55. return self.__str__()
  56. class SimpleFunction(BaseFunction[InputType, OutputType]):
  57. """简单函数组件
  58. 基于普通函数的简单包装
  59. """
  60. def __init__(
  61. self,
  62. name: str,
  63. description: str,
  64. func: Callable[[InputType, Optional[Dict[str, Any]]], OutputType]
  65. ):
  66. """初始化简单函数组件
  67. Args:
  68. name: 函数名称
  69. description: 函数描述
  70. func: 执行函数
  71. """
  72. super().__init__(name, description)
  73. self.func = func
  74. def execute(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
  75. """执行函数"""
  76. return self.func(input_data, context)
  77. class ConfigurableFunction(BaseFunction[InputType, OutputType]):
  78. """可配置函数组件
  79. 支持配置参数的函数组件
  80. """
  81. def __init__(
  82. self,
  83. name: str,
  84. description: str,
  85. func: Callable,
  86. config: Optional[Dict[str, Any]] = None
  87. ):
  88. """初始化可配置函数组件
  89. Args:
  90. name: 函数名称
  91. description: 函数描述
  92. func: 执行函数
  93. config: 配置参数
  94. """
  95. super().__init__(name, description)
  96. self.func = func
  97. self.config = config or {}
  98. def execute(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
  99. """执行函数"""
  100. # 合并上下文和配置
  101. merged_context = {**(context or {}), **self.config}
  102. return self.func(input_data, merged_context)
  103. def update_config(self, new_config: Dict[str, Any]) -> None:
  104. """更新配置"""
  105. self.config.update(new_config)
  106. class PipelineFunction(BaseFunction[InputType, OutputType]):
  107. """管道函数组件
  108. 将多个函数组件串联成管道
  109. """
  110. def __init__(
  111. self,
  112. name: str,
  113. description: str,
  114. functions: List[BaseFunction]
  115. ):
  116. """初始化管道函数组件
  117. Args:
  118. name: 函数名称
  119. description: 函数描述
  120. functions: 函数组件列表
  121. """
  122. super().__init__(name, description)
  123. self.functions = functions
  124. def execute(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
  125. """执行管道"""
  126. result = input_data
  127. for func in self.functions:
  128. result = func(result, context)
  129. return result
  130. def add_function(self, func: BaseFunction) -> None:
  131. """添加函数到管道"""
  132. self.functions.append(func)
  133. def initialize(self) -> None:
  134. """初始化管道中的所有函数"""
  135. super().initialize()
  136. for func in self.functions:
  137. if not func.is_initialized:
  138. func.initialize()
  139. class ConditionalFunction(BaseFunction[InputType, OutputType]):
  140. """条件函数组件
  141. 根据条件选择不同的函数执行
  142. """
  143. def __init__(
  144. self,
  145. name: str,
  146. description: str,
  147. condition_func: Callable[[InputType, Optional[Dict[str, Any]]], bool],
  148. true_func: BaseFunction,
  149. false_func: BaseFunction
  150. ):
  151. """初始化条件函数组件
  152. Args:
  153. name: 函数名称
  154. description: 函数描述
  155. condition_func: 条件判断函数
  156. true_func: 条件为真时执行的函数
  157. false_func: 条件为假时执行的函数
  158. """
  159. super().__init__(name, description)
  160. self.condition_func = condition_func
  161. self.true_func = true_func
  162. self.false_func = false_func
  163. def execute(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
  164. """执行条件函数"""
  165. if self.condition_func(input_data, context):
  166. return self.true_func(input_data, context)
  167. else:
  168. return self.false_func(input_data, context)
  169. def initialize(self) -> None:
  170. """初始化条件函数中的所有函数"""
  171. super().initialize()
  172. if not self.true_func.is_initialized:
  173. self.true_func.initialize()
  174. if not self.false_func.is_initialized:
  175. self.false_func.initialize()
  176. class FunctionRegistry:
  177. """函数注册表
  178. 管理和组织函数组件
  179. """
  180. def __init__(self):
  181. self._functions: Dict[str, BaseFunction] = {}
  182. self._categories: Dict[str, List[str]] = {}
  183. def register(self, function: BaseFunction, category: Optional[str] = None) -> None:
  184. """注册函数
  185. Args:
  186. function: 函数实例
  187. category: 函数分类
  188. """
  189. if function.name in self._functions:
  190. raise ValueError(f"Function '{function.name}' already registered")
  191. self._functions[function.name] = function
  192. if category:
  193. if category not in self._categories:
  194. self._categories[category] = []
  195. self._categories[category].append(function.name)
  196. def get_function(self, name: str) -> Optional[BaseFunction]:
  197. """获取函数
  198. Args:
  199. name: 函数名称
  200. Returns:
  201. 函数实例或 None
  202. """
  203. return self._functions.get(name)
  204. def get_functions_by_category(self, category: str) -> List[BaseFunction]:
  205. """根据分类获取函数
  206. Args:
  207. category: 分类名称
  208. Returns:
  209. 函数列表
  210. """
  211. function_names = self._categories.get(category, [])
  212. return [self._functions[name] for name in function_names if name in self._functions]
  213. def list_functions(self) -> Dict[str, str]:
  214. """列出所有函数
  215. Returns:
  216. 函数名称到描述的映射
  217. """
  218. return {name: func.description for name, func in self._functions.items()}
  219. def execute_function(self, name: str, input_data: Any, context: Optional[Dict[str, Any]] = None) -> Any:
  220. """执行函数
  221. Args:
  222. name: 函数名称
  223. input_data: 输入数据
  224. context: 上下文
  225. Returns:
  226. 函数执行结果
  227. """
  228. function = self._functions.get(name)
  229. if not function:
  230. raise ValueError(f"Function '{name}' not found")
  231. return function(input_data, context)
  232. # 全局函数注册表实例
  233. function_registry = FunctionRegistry()
  234. # 便捷装饰器
  235. def component_function(
  236. name: Optional[str] = None,
  237. description: Optional[str] = None,
  238. category: Optional[str] = None,
  239. auto_register: bool = True
  240. ):
  241. """组件函数装饰器
  242. 将普通函数转换为函数组件并可选择自动注册
  243. Args:
  244. name: 函数名称,默认使用函数名
  245. description: 函数描述,默认使用函数文档字符串
  246. category: 函数分类
  247. auto_register: 是否自动注册到全局注册表
  248. """
  249. def decorator(func):
  250. func_name = name or func.__name__
  251. func_description = description or (func.__doc__ or "").strip()
  252. # 创建函数组件
  253. function_component = SimpleFunction(
  254. name=func_name,
  255. description=func_description,
  256. func=func
  257. )
  258. # 自动注册
  259. if auto_register:
  260. function_registry.register(function_component, category)
  261. # 返回包装后的函数,保持原有接口
  262. @functools.wraps(func)
  263. def wrapper(*args, **kwargs):
  264. return func(*args, **kwargs)
  265. # 将函数组件添加为函数属性
  266. wrapper._function_component = function_component
  267. return wrapper
  268. return decorator
  269. # 便捷函数创建器
  270. def create_simple_function(name: str, description: str, func: Callable) -> SimpleFunction:
  271. """创建简单函数组件
  272. Args:
  273. name: 函数名称
  274. description: 函数描述
  275. func: 执行函数
  276. Returns:
  277. 简单函数组件实例
  278. """
  279. return SimpleFunction(name, description, func)
  280. def create_configurable_function(
  281. name: str,
  282. description: str,
  283. func: Callable,
  284. config: Dict[str, Any]
  285. ) -> ConfigurableFunction:
  286. """创建可配置函数组件
  287. Args:
  288. name: 函数名称
  289. description: 函数描述
  290. func: 执行函数
  291. config: 配置参数
  292. Returns:
  293. 可配置函数组件实例
  294. """
  295. return ConfigurableFunction(name, description, func, config)
  296. def create_pipeline(name: str, description: str, functions: List[BaseFunction]) -> PipelineFunction:
  297. """创建管道函数组件
  298. Args:
  299. name: 函数名称
  300. description: 函数描述
  301. functions: 函数组件列表
  302. Returns:
  303. 管道函数组件实例
  304. """
  305. return PipelineFunction(name, description, functions)