| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- """
- Sensitive Data Handling - 敏感数据占位符替换
- 支持:
- 1. <secret>key</secret> 占位符格式
- 2. 域名匹配(不同域名使用不同密钥)
- 3. TOTP 2FA(key_bu_2fa_code 自动生成验证码)
- 4. 递归处理嵌套结构
- 参考 Browser-Use 的实现。
- """
- import re
- import logging
- from typing import Any, Dict, Optional
- logger = logging.getLogger(__name__)
- # 尝试导入 pyotp(TOTP 支持)
- try:
- import pyotp
- HAS_PYOTP = True
- except ImportError:
- HAS_PYOTP = False
- logger.warning("pyotp not installed, TOTP 2FA support disabled")
def match_domain(url: str, domain_pattern: str) -> bool:
    """
    Check whether a URL matches a domain pattern.

    This is a thin wrapper: the actual matching rules are whatever
    ``agent.tools.url_matcher.match_url_with_pattern`` implements.

    Args:
        url: The full URL to test.
        domain_pattern: The domain pattern to test against (the original
            documentation states that wildcards are supported).

    Returns:
        The result of ``match_url_with_pattern(url, domain_pattern)``.
    """
    # Imported at call time rather than at module load
    # (presumably to avoid an import cycle - TODO confirm).
    from agent.tools.url_matcher import match_url_with_pattern as _pattern_matches

    is_match = _pattern_matches(url, domain_pattern)
    return is_match
def get_applicable_secrets(
    sensitive_data: Dict[str, Any],
    current_url: Optional[str]
) -> Dict[str, Any]:
    """
    Collect the secrets that apply to the current URL.

    Args:
        sensitive_data: Secret definitions. Two entry shapes are accepted
            and may be mixed:
            - flat: ``{key: value}`` - applies regardless of the URL
            - scoped: ``{domain_pattern: {key: value}}`` - applies only when
              ``current_url`` is truthy and matches ``domain_pattern``
        current_url: The URL currently being acted on, or ``None``.

    Returns:
        A flat ``{key: value}`` dict of applicable secrets. Entries are
        merged in iteration order (later entries overwrite earlier ones
        with the same key) and falsy values are dropped.
    """
    collected: Dict[str, Any] = {}

    for name, entry in sensitive_data.items():
        if not isinstance(entry, dict):
            # Flat entry: valid for every URL.
            collected[name] = entry
            continue

        # Scoped entry: `name` is a domain pattern; without a URL there is
        # nothing to match against, so the whole group is skipped.
        if current_url and match_domain(current_url, name):
            collected.update(entry)

    # Drop empty / falsy secrets.
    return {key: secret for key, secret in collected.items() if secret}
# Compiled once; the lazy group keeps adjacent placeholders separate.
_SECRET_PLACEHOLDER_RE = re.compile(r'<secret>(.*?)</secret>')

# Keys ending with this suffix hold a TOTP seed rather than a literal value.
_TOTP_KEY_SUFFIX = '_bu_2fa_code'


def _resolve_secret_value(placeholder: str, secret_value: Any) -> str:
    """Return the text that should be substituted for ``placeholder``."""
    if not placeholder.endswith(_TOTP_KEY_SUFFIX):
        # str() so that non-string secrets (e.g. an int PIN) do not crash.
        return str(secret_value)

    # NOTE(review): both fallbacks below substitute the raw stored value,
    # i.e. the TOTP seed itself ends up in the output. Behavior is kept from
    # the original implementation - confirm this is really intended.
    if not HAS_PYOTP:
        logger.warning(f"TOTP requested for {placeholder} but pyotp not installed")
        return str(secret_value)

    try:
        code = pyotp.TOTP(secret_value, digits=6).now()
    except Exception as e:  # best-effort: any pyotp failure falls back
        logger.error(f"Failed to generate TOTP for {placeholder}: {e}")
        return str(secret_value)

    logger.info(f"Generated TOTP code for {placeholder}")
    return code


def replace_secret_in_string(
    value: str,
    applicable_secrets: Dict[str, Any],
    replaced_placeholders: set,
    missing_placeholders: set
) -> str:
    """
    Replace ``<secret>key</secret>`` placeholders in a string.

    The string is scanned exactly once, so text inserted from a secret value
    is never re-scanned for further placeholders. A key ending in
    ``_bu_2fa_code`` is treated as a TOTP seed and replaced by a freshly
    generated 6-digit code when ``pyotp`` is available; otherwise (or if
    generation fails) the stored value is used. Every occurrence of the same
    placeholder within one call receives the same replacement.

    Args:
        value: The original string.
        applicable_secrets: Secrets available for substitution, keyed by
            placeholder name. Non-string values are converted with ``str()``.
        replaced_placeholders: Output set; names that were substituted are
            added to it.
        missing_placeholders: Output set; names found in ``value`` but absent
            from ``applicable_secrets`` are added to it.

    Returns:
        The string with all known placeholders substituted. Unknown
        placeholders are left in place unchanged.
    """
    # Cache per call so a repeated placeholder is resolved (and, for TOTP,
    # generated) only once.
    resolved: Dict[str, str] = {}

    def _substitute(match):
        placeholder = match.group(1)
        if placeholder not in applicable_secrets:
            missing_placeholders.add(placeholder)
            return match.group(0)  # leave the unknown placeholder untouched
        if placeholder not in resolved:
            resolved[placeholder] = _resolve_secret_value(
                placeholder, applicable_secrets[placeholder]
            )
            replaced_placeholders.add(placeholder)
        return resolved[placeholder]

    # With a callable replacement, re.sub inserts the returned text literally
    # (no backslash/group-reference processing).
    return _SECRET_PLACEHOLDER_RE.sub(_substitute, value)
def replace_secrets_recursively(
    value: Any,
    applicable_secrets: Dict[str, Any],
    replaced_placeholders: set,
    missing_placeholders: set
) -> Any:
    """
    Substitute secret placeholders throughout a nested structure.

    Strings are passed to ``replace_secret_in_string``; dicts and lists are
    rebuilt with each value / item processed recursively (dict keys are left
    as they are). Any other type is returned unchanged.

    Args:
        value: The value to process (string, dict, list, or anything else).
        applicable_secrets: Secrets available for substitution.
        replaced_placeholders: Set collecting substituted placeholder names.
        missing_placeholders: Set collecting unknown placeholder names.

    Returns:
        The processed value. Dicts and lists come back as new containers;
        other non-string values are returned as the same object.
    """
    if isinstance(value, str):
        return replace_secret_in_string(
            value, applicable_secrets, replaced_placeholders, missing_placeholders
        )

    def _descend(child: Any) -> Any:
        # Recurse with the same secrets and the same tracking sets.
        return replace_secrets_recursively(
            child, applicable_secrets, replaced_placeholders, missing_placeholders
        )

    if isinstance(value, dict):
        return {key: _descend(child) for key, child in value.items()}

    if isinstance(value, list):
        return [_descend(child) for child in value]

    return value
def replace_sensitive_data(
    arguments: Dict[str, Any],
    sensitive_data: Dict[str, Any],
    current_url: Optional[str] = None
) -> Dict[str, Any]:
    """
    Substitute secret placeholders inside a tool-argument dict.

    Args:
        arguments: The tool arguments, possibly containing
            ``<secret>key</secret>`` placeholders at any nesting level.
        sensitive_data: Secret definitions (flat and/or domain-scoped, as
            accepted by ``get_applicable_secrets``).
        current_url: The current URL, used for domain matching.

    Returns:
        The arguments with placeholders substituted. When no secret applies
        to ``current_url``, the original ``arguments`` object is returned
        as-is.

    Example:
        sensitive_data = {
            "*.github.com": {
                "github_token": "ghp_xxxxx",
                "github_password": "secret123",
                "github_2fa_bu_2fa_code": "JBSWY3DPEHPK3PXP"
            }
        }
        arguments = {
            "username": "user",
            "password": "<secret>github_password</secret>",
            "totp": "<secret>github_2fa_bu_2fa_code</secret>"
        }
        replaced = replace_sensitive_data(arguments, sensitive_data, "https://github.com")
        # Result:
        # {
        #     "username": "user",
        #     "password": "secret123",
        #     "totp": "123456"  # generated TOTP code
        # }
    """
    secrets_for_url = get_applicable_secrets(sensitive_data, current_url)
    if not secrets_for_url:
        logger.debug("No applicable secrets found for current URL")
        return arguments

    # Filled in by the recursive pass; used only for the log lines below.
    substituted: set = set()
    unresolved: set = set()

    result = replace_secrets_recursively(
        arguments, secrets_for_url, substituted, unresolved
    )

    # Only placeholder names are logged, never the secret values.
    if substituted:
        names = ', '.join(sorted(substituted))
        location = ' on ' + current_url if current_url else ''
        logger.info(f"Replaced sensitive placeholders: {names}{location}")

    if unresolved:
        logger.warning(
            f"Missing sensitive data keys: {', '.join(sorted(unresolved))}"
        )

    return result
|