||
- """
- 组件抽象层:Function 基础抽象类
- 定义纯函数组件的抽象基类,用于处理数据转换、计算等无副作用的操作
- """
- from abc import ABC, abstractmethod
- from typing import Any, Dict, Optional, TypeVar, Generic, Callable, List
- import inspect
- import functools
- # 输入输出类型变量
- InputType = TypeVar('InputType')
- OutputType = TypeVar('OutputType')
- class BaseFunction(ABC, Generic[InputType, OutputType]):
- """函数组件基类
-
- 定义纯函数组件的接口,用于处理数据转换、计算等操作
- """
-
- def __init__(self, name: str, description: str = ""):
- """初始化函数组件
-
- Args:
- name: 函数名称
- description: 函数描述
- """
- self.name = name
- self.description = description
- self._initialized = False
-
- @abstractmethod
- def execute(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
- """执行函数逻辑
-
- Args:
- input_data: 输入数据
- context: 上下文信息
-
- Returns:
- 处理后的输出数据
- """
- pass
-
- def initialize(self) -> None:
- """初始化函数组件"""
- if not self._initialized:
- self._setup()
- self._initialized = True
-
- def _setup(self) -> None:
- """设置函数组件,子类可以重写"""
- pass
-
- @property
- def is_initialized(self) -> bool:
- """检查是否已初始化"""
- return self._initialized
-
- def __call__(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
- """使函数组件可调用"""
- if not self._initialized:
- self.initialize()
- return self.execute(input_data, context)
-
- def __str__(self) -> str:
- return f"{self.__class__.__name__}(name='{self.name}')"
-
- def __repr__(self) -> str:
- return self.__str__()
- class SimpleFunction(BaseFunction[InputType, OutputType]):
- """简单函数组件
-
- 基于普通函数的简单包装
- """
-
- def __init__(
- self,
- name: str,
- description: str,
- func: Callable[[InputType, Optional[Dict[str, Any]]], OutputType]
- ):
- """初始化简单函数组件
-
- Args:
- name: 函数名称
- description: 函数描述
- func: 执行函数
- """
- super().__init__(name, description)
- self.func = func
-
- def execute(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
- """执行函数"""
- return self.func(input_data, context)
- class ConfigurableFunction(BaseFunction[InputType, OutputType]):
- """可配置函数组件
-
- 支持配置参数的函数组件
- """
-
- def __init__(
- self,
- name: str,
- description: str,
- func: Callable,
- config: Optional[Dict[str, Any]] = None
- ):
- """初始化可配置函数组件
-
- Args:
- name: 函数名称
- description: 函数描述
- func: 执行函数
- config: 配置参数
- """
- super().__init__(name, description)
- self.func = func
- self.config = config or {}
-
- def execute(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
- """执行函数"""
- # 合并上下文和配置
- merged_context = {**(context or {}), **self.config}
- return self.func(input_data, merged_context)
-
- def update_config(self, new_config: Dict[str, Any]) -> None:
- """更新配置"""
- self.config.update(new_config)
- class PipelineFunction(BaseFunction[InputType, OutputType]):
- """管道函数组件
-
- 将多个函数组件串联成管道
- """
-
- def __init__(
- self,
- name: str,
- description: str,
- functions: List[BaseFunction]
- ):
- """初始化管道函数组件
-
- Args:
- name: 函数名称
- description: 函数描述
- functions: 函数组件列表
- """
- super().__init__(name, description)
- self.functions = functions
-
- def execute(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
- """执行管道"""
- result = input_data
- for func in self.functions:
- result = func(result, context)
- return result
-
- def add_function(self, func: BaseFunction) -> None:
- """添加函数到管道"""
- self.functions.append(func)
-
- def initialize(self) -> None:
- """初始化管道中的所有函数"""
- super().initialize()
- for func in self.functions:
- if not func.is_initialized:
- func.initialize()
- class ConditionalFunction(BaseFunction[InputType, OutputType]):
- """条件函数组件
-
- 根据条件选择不同的函数执行
- """
-
- def __init__(
- self,
- name: str,
- description: str,
- condition_func: Callable[[InputType, Optional[Dict[str, Any]]], bool],
- true_func: BaseFunction,
- false_func: BaseFunction
- ):
- """初始化条件函数组件
-
- Args:
- name: 函数名称
- description: 函数描述
- condition_func: 条件判断函数
- true_func: 条件为真时执行的函数
- false_func: 条件为假时执行的函数
- """
- super().__init__(name, description)
- self.condition_func = condition_func
- self.true_func = true_func
- self.false_func = false_func
-
- def execute(self, input_data: InputType, context: Optional[Dict[str, Any]] = None) -> OutputType:
- """执行条件函数"""
- if self.condition_func(input_data, context):
- return self.true_func(input_data, context)
- else:
- return self.false_func(input_data, context)
-
- def initialize(self) -> None:
- """初始化条件函数中的所有函数"""
- super().initialize()
- if not self.true_func.is_initialized:
- self.true_func.initialize()
- if not self.false_func.is_initialized:
- self.false_func.initialize()
- class FunctionRegistry:
- """函数注册表
-
- 管理和组织函数组件
- """
-
- def __init__(self):
- self._functions: Dict[str, BaseFunction] = {}
- self._categories: Dict[str, List[str]] = {}
-
- def register(self, function: BaseFunction, category: Optional[str] = None) -> None:
- """注册函数
-
- Args:
- function: 函数实例
- category: 函数分类
- """
- if function.name in self._functions:
- raise ValueError(f"Function '{function.name}' already registered")
-
- self._functions[function.name] = function
-
- if category:
- if category not in self._categories:
- self._categories[category] = []
- self._categories[category].append(function.name)
-
- def get_function(self, name: str) -> Optional[BaseFunction]:
- """获取函数
-
- Args:
- name: 函数名称
-
- Returns:
- 函数实例或 None
- """
- return self._functions.get(name)
-
- def get_functions_by_category(self, category: str) -> List[BaseFunction]:
- """根据分类获取函数
-
- Args:
- category: 分类名称
-
- Returns:
- 函数列表
- """
- function_names = self._categories.get(category, [])
- return [self._functions[name] for name in function_names if name in self._functions]
-
- def list_functions(self) -> Dict[str, str]:
- """列出所有函数
-
- Returns:
- 函数名称到描述的映射
- """
- return {name: func.description for name, func in self._functions.items()}
-
- def execute_function(self, name: str, input_data: Any, context: Optional[Dict[str, Any]] = None) -> Any:
- """执行函数
-
- Args:
- name: 函数名称
- input_data: 输入数据
- context: 上下文
-
- Returns:
- 函数执行结果
- """
- function = self._functions.get(name)
- if not function:
- raise ValueError(f"Function '{name}' not found")
-
- return function(input_data, context)
- # 全局函数注册表实例
- function_registry = FunctionRegistry()
- # 便捷装饰器
- def component_function(
- name: Optional[str] = None,
- description: Optional[str] = None,
- category: Optional[str] = None,
- auto_register: bool = True
- ):
- """组件函数装饰器
-
- 将普通函数转换为函数组件并可选择自动注册
-
- Args:
- name: 函数名称,默认使用函数名
- description: 函数描述,默认使用函数文档字符串
- category: 函数分类
- auto_register: 是否自动注册到全局注册表
- """
- def decorator(func):
- func_name = name or func.__name__
- func_description = description or (func.__doc__ or "").strip()
-
- # 创建函数组件
- function_component = SimpleFunction(
- name=func_name,
- description=func_description,
- func=func
- )
-
- # 自动注册
- if auto_register:
- function_registry.register(function_component, category)
-
- # 返回包装后的函数,保持原有接口
- @functools.wraps(func)
- def wrapper(*args, **kwargs):
- return func(*args, **kwargs)
-
- # 将函数组件添加为函数属性
- wrapper._function_component = function_component
-
- return wrapper
-
- return decorator
- # 便捷函数创建器
- def create_simple_function(name: str, description: str, func: Callable) -> SimpleFunction:
- """创建简单函数组件
-
- Args:
- name: 函数名称
- description: 函数描述
- func: 执行函数
-
- Returns:
- 简单函数组件实例
- """
- return SimpleFunction(name, description, func)
- def create_configurable_function(
- name: str,
- description: str,
- func: Callable,
- config: Dict[str, Any]
- ) -> ConfigurableFunction:
- """创建可配置函数组件
-
- Args:
- name: 函数名称
- description: 函数描述
- func: 执行函数
- config: 配置参数
-
- Returns:
- 可配置函数组件实例
- """
- return ConfigurableFunction(name, description, func, config)
- def create_pipeline(name: str, description: str, functions: List[BaseFunction]) -> PipelineFunction:
- """创建管道函数组件
-
- Args:
- name: 函数名称
- description: 函数描述
- functions: 函数组件列表
-
- Returns:
- 管道函数组件实例
- """
- return PipelineFunction(name, description, functions)
|