"""
Sensitive Data Handling - 敏感数据占位符替换
支持:
1. key 占位符格式
2. 域名匹配(不同域名使用不同密钥)
3. TOTP 2FA(key_bu_2fa_code 自动生成验证码)
4. 递归处理嵌套结构
参考 Browser-Use 的实现。
"""
import re
import logging
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# 尝试导入 pyotp(TOTP 支持)
try:
import pyotp
HAS_PYOTP = True
except ImportError:
HAS_PYOTP = False
logger.warning("pyotp not installed, TOTP 2FA support disabled")
def match_domain(url: str, domain_pattern: str) -> bool:
    """
    Check whether a URL matches a domain pattern.

    The decision is delegated entirely to
    ``agent.tools.url_matcher.match_url_with_pattern``; the exact pattern
    syntax (the original notes mention wildcard support) is defined there.

    Args:
        url: Full URL to test.
        domain_pattern: Domain pattern to test the URL against.

    Returns:
        Whatever the underlying matcher returns for this URL/pattern pair.
    """
    # Imported lazily, at call time rather than at module import time.
    from agent.tools.url_matcher import match_url_with_pattern

    is_match = match_url_with_pattern(url, domain_pattern)
    return is_match
def get_applicable_secrets(
    sensitive_data: Dict[str, Any],
    current_url: Optional[str]
) -> Dict[str, Any]:
    """
    Collect the secrets that apply to the current URL.

    Two entry shapes are accepted in ``sensitive_data``:

    - Legacy: ``{key: value}`` -- the value is not a dict; the entry applies
      to every domain.
    - Domain-scoped: ``{domain_pattern: {key: value}}`` -- the nested dict is
      merged in only when ``current_url`` is set and matches the pattern.

    Entries are processed in iteration order, so a later entry overwrites an
    earlier one that uses the same key.

    Args:
        sensitive_data: Mapping holding legacy and/or domain-scoped entries.
        current_url: URL currently being visited, or ``None``.

    Returns:
        A flat ``{key: value}`` dict with every falsy value removed.
    """
    collected: Dict[str, Any] = {}

    for name, entry in sensitive_data.items():
        if not isinstance(entry, dict):
            # Legacy entry: valid regardless of the current domain.
            collected[name] = entry
            continue

        # Domain-scoped entry: without a URL there is nothing to match against.
        if current_url and match_domain(current_url, name):
            collected.update(entry)

    # Drop empty (falsy) values.
    return {key: val for key, val in collected.items() if val}
def replace_secret_in_string(
    value: str,
    applicable_secrets: Dict[str, Any],
    replaced_placeholders: set,
    missing_placeholders: set
) -> str:
    """
    Replace ``<secret>key</secret>`` placeholders in a string.

    Each placeholder whose key is present in ``applicable_secrets`` is
    replaced by the corresponding secret. Keys ending in ``_bu_2fa_code`` are
    treated as TOTP seeds: when ``pyotp`` is available, a fresh 6-digit code
    is generated and inserted instead of the seed itself. Placeholders with
    an unknown key are left in the string untouched.

    The string is rewritten in a single pass, so text inserted from a secret
    value is never re-scanned for further placeholders.

    Args:
        value: Original string.
        applicable_secrets: Secrets applicable to the current context.
        replaced_placeholders: Output set; receives every key that was
            substituted.
        missing_placeholders: Output set; receives every key that was
            referenced but not found in ``applicable_secrets``.

    Returns:
        The string with all resolvable placeholders substituted.
    """
    # Placeholders must be explicitly delimited; an undelimited pattern would
    # match the empty string at every position and never hit a real key.
    secret_pattern = re.compile(r'<secret>(.*?)</secret>')

    # One resolved value per key, so repeated placeholders in the same string
    # all receive the same text (in particular the same TOTP code).
    resolved: Dict[str, str] = {}

    def _resolve(placeholder: str) -> str:
        """Return the replacement text for a known placeholder key."""
        secret_value = applicable_secrets[placeholder]
        replacement = secret_value

        if placeholder.endswith('_bu_2fa_code'):
            if HAS_PYOTP:
                try:
                    replacement = pyotp.TOTP(secret_value, digits=6).now()
                    logger.info("Generated TOTP code for %s", placeholder)
                except Exception as e:
                    # NOTE(review): this best-effort fallback inserts the raw
                    # TOTP seed into the output. Kept for backward
                    # compatibility -- consider treating it as missing.
                    logger.error(
                        "Failed to generate TOTP for %s: %s", placeholder, e
                    )
                    replacement = secret_value
            else:
                logger.warning(
                    "TOTP requested for %s but pyotp not installed",
                    placeholder,
                )

        # Secrets are typed ``Any``; coerce so non-string values (e.g. an
        # integer PIN) do not break the substitution.
        return str(replacement)

    def _substitute(match: "re.Match") -> str:
        placeholder = match.group(1)
        if placeholder not in applicable_secrets:
            missing_placeholders.add(placeholder)
            # Leave unknown placeholders exactly as written.
            return match.group(0)

        if placeholder not in resolved:
            resolved[placeholder] = _resolve(placeholder)
        replaced_placeholders.add(placeholder)
        return resolved[placeholder]

    return secret_pattern.sub(_substitute, value)
def replace_secrets_recursively(
    value: Any,
    applicable_secrets: Dict[str, Any],
    replaced_placeholders: set,
    missing_placeholders: set
) -> Any:
    """
    Walk a nested structure and substitute secret placeholders in its strings.

    Strings are handed to ``replace_secret_in_string``. Dicts and lists are
    rebuilt as new containers with every value/item processed recursively
    (dict keys are kept as they are). Any other type is returned unchanged.

    Args:
        value: Original value (string, dict, list, or anything else).
        applicable_secrets: Secrets applicable to the current context.
        replaced_placeholders: Set passed through to the string replacer to
            record substituted placeholders.
        missing_placeholders: Set passed through to the string replacer to
            record placeholders that had no matching secret.

    Returns:
        The processed value.
    """
    if isinstance(value, str):
        return replace_secret_in_string(
            value,
            applicable_secrets,
            replaced_placeholders,
            missing_placeholders
        )

    def descend(node: Any) -> Any:
        # Recurse with the same secrets and the same tracking sets.
        return replace_secrets_recursively(
            node,
            applicable_secrets,
            replaced_placeholders,
            missing_placeholders
        )

    if isinstance(value, dict):
        return {key: descend(child) for key, child in value.items()}

    if isinstance(value, list):
        return [descend(element) for element in value]

    # Numbers, None, tuples, etc. pass through as-is.
    return value
def replace_sensitive_data(
    arguments: Dict[str, Any],
    sensitive_data: Dict[str, Any],
    current_url: Optional[str] = None
) -> Dict[str, Any]:
    """
    Substitute sensitive-data placeholders in tool arguments.

    The secrets applicable to ``current_url`` are selected with
    ``get_applicable_secrets``. When none apply, ``arguments`` is returned
    as-is (the same object). Otherwise the arguments are processed with
    ``replace_secrets_recursively``, and the substituted and unresolved
    placeholder keys are logged (keys only, never the secret values).

    Args:
        arguments: Tool argument dict; may contain nested dicts and lists.
        sensitive_data: Secrets, either ``{key: value}`` (all domains) or
            ``{domain_pattern: {key: value}}`` (domain-specific).
        current_url: Current URL, used for domain matching.

    Returns:
        The argument dict with placeholders substituted.

    Example:
        sensitive_data = {
            "*.github.com": {
                "github_password": "secret123",
                "github_2fa_bu_2fa_code": "JBSWY3DPEHPK3PXP"
            }
        }
        replaced = replace_sensitive_data(arguments, sensitive_data, current_url)
        # String values in ``arguments`` that reference "github_password" via
        # a placeholder receive "secret123" when ``current_url`` matches the
        # pattern; the placeholder syntax is defined by
        # ``replace_secret_in_string``.
    """
    secrets_for_url = get_applicable_secrets(sensitive_data, current_url)
    if not secrets_for_url:
        logger.debug("No applicable secrets found for current URL")
        return arguments

    # Filled in by the recursive replacer as it walks the arguments.
    substituted: set = set()
    unresolved: set = set()

    result = replace_secrets_recursively(
        arguments,
        secrets_for_url,
        substituted,
        unresolved
    )

    if substituted:
        names = ', '.join(sorted(substituted))
        location = ' on ' + current_url if current_url else ''
        logger.info(f"Replaced sensitive placeholders: {names}{location}")

    if unresolved:
        logger.warning(
            f"Missing sensitive data keys: {', '.join(sorted(unresolved))}"
        )

    return result