""" Sensitive Data Handling - 敏感数据占位符替换 支持: 1. key 占位符格式 2. 域名匹配(不同域名使用不同密钥) 3. TOTP 2FA(key_bu_2fa_code 自动生成验证码) 4. 递归处理嵌套结构 参考 Browser-Use 的实现。 """ import re import logging from typing import Any, Dict, Optional logger = logging.getLogger(__name__) # 尝试导入 pyotp(TOTP 支持) try: import pyotp HAS_PYOTP = True except ImportError: HAS_PYOTP = False logger.warning("pyotp not installed, TOTP 2FA support disabled") def match_domain(url: str, domain_pattern: str) -> bool: """ 检查 URL 是否匹配域名模式 Args: url: 完整 URL domain_pattern: 域名模式(支持通配符) Returns: 是否匹配 """ from agent.tools.url_matcher import match_url_with_pattern return match_url_with_pattern(url, domain_pattern) def get_applicable_secrets( sensitive_data: Dict[str, Any], current_url: Optional[str] ) -> Dict[str, Any]: """ 获取当前 URL 适用的敏感数据 Args: sensitive_data: 敏感数据字典,格式: - 旧格式:{key: value}(适用于所有域名) - 新格式:{domain_pattern: {key: value}}(域名特定) current_url: 当前 URL Returns: 适用的敏感数据字典 """ applicable = {} for domain_or_key, content in sensitive_data.items(): if isinstance(content, dict): # 新格式:{domain_pattern: {key: value}} if current_url: if match_domain(current_url, domain_or_key): applicable.update(content) else: # 旧格式:{key: value}(适用于所有域名) applicable[domain_or_key] = content # 过滤空值 return {k: v for k, v in applicable.items() if v} def replace_secret_in_string( value: str, applicable_secrets: Dict[str, Any], replaced_placeholders: set, missing_placeholders: set ) -> str: """ 替换字符串中的 key 占位符 Args: value: 原始字符串 applicable_secrets: 适用的敏感数据 replaced_placeholders: 已替换的占位符集合(输出参数) missing_placeholders: 缺失的占位符集合(输出参数) Returns: 替换后的字符串 """ secret_pattern = re.compile(r'(.*?)') matches = secret_pattern.findall(value) for placeholder in matches: if placeholder in applicable_secrets: secret_value = applicable_secrets[placeholder] # 检查是否是 TOTP 2FA if placeholder.endswith('_bu_2fa_code'): if HAS_PYOTP: try: totp = pyotp.TOTP(secret_value, digits=6) replacement = totp.now() logger.info(f"Generated TOTP code for {placeholder}") except Exception as e: logger.error(f"Failed to generate TOTP for {placeholder}: {e}") replacement = secret_value else: logger.warning(f"TOTP requested for {placeholder} but pyotp not installed") replacement = secret_value else: replacement = secret_value # 替换占位符 value = value.replace(f'{placeholder}', replacement) replaced_placeholders.add(placeholder) else: # 缺失的占位符 missing_placeholders.add(placeholder) return value def replace_secrets_recursively( value: Any, applicable_secrets: Dict[str, Any], replaced_placeholders: set, missing_placeholders: set ) -> Any: """ 递归替换嵌套结构中的敏感数据占位符 Args: value: 原始值(可能是字符串、字典、列表等) applicable_secrets: 适用的敏感数据 replaced_placeholders: 已替换的占位符集合 missing_placeholders: 缺失的占位符集合 Returns: 替换后的值 """ if isinstance(value, str): return replace_secret_in_string( value, applicable_secrets, replaced_placeholders, missing_placeholders ) elif isinstance(value, dict): return { k: replace_secrets_recursively( v, applicable_secrets, replaced_placeholders, missing_placeholders ) for k, v in value.items() } elif isinstance(value, list): return [ replace_secrets_recursively( item, applicable_secrets, replaced_placeholders, missing_placeholders ) for item in value ] else: return value def replace_sensitive_data( arguments: Dict[str, Any], sensitive_data: Dict[str, Any], current_url: Optional[str] = None ) -> Dict[str, Any]: """ 替换工具参数中的敏感数据占位符 Args: arguments: 工具参数字典 sensitive_data: 敏感数据字典 current_url: 当前 URL(用于域名匹配) Returns: 替换后的参数字典 Example: sensitive_data = { "*.github.com": { "github_token": "ghp_xxxxx", "github_password": "secret123", "github_2fa_bu_2fa_code": "JBSWY3DPEHPK3PXP" } } arguments = { "username": "user", "password": "github_password", "totp": "github_2fa_bu_2fa_code" } # 执行替换 replaced = replace_sensitive_data(arguments, sensitive_data, "https://github.com") # 结果: # { # "username": "user", # "password": "secret123", # "totp": "123456" # 自动生成的 TOTP 代码 # } """ # 获取适用的密钥 applicable_secrets = get_applicable_secrets(sensitive_data, current_url) if not applicable_secrets: logger.debug("No applicable secrets found for current URL") return arguments # 跟踪替换和缺失的占位符 replaced_placeholders = set() missing_placeholders = set() # 递归替换 replaced_arguments = replace_secrets_recursively( arguments, applicable_secrets, replaced_placeholders, missing_placeholders ) # 记录日志 if replaced_placeholders: logger.info( f"Replaced sensitive placeholders: {', '.join(sorted(replaced_placeholders))}" f"{' on ' + current_url if current_url else ''}" ) if missing_placeholders: logger.warning( f"Missing sensitive data keys: {', '.join(sorted(missing_placeholders))}" ) return replaced_arguments