sensitive.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. """
  2. Sensitive Data Handling - 敏感数据占位符替换
  3. 支持:
  4. 1. <secret>key</secret> 占位符格式
  5. 2. 域名匹配(不同域名使用不同密钥)
  6. 3. TOTP 2FA(key_bu_2fa_code 自动生成验证码)
  7. 4. 递归处理嵌套结构
  8. 参考 Browser-Use 的实现。
  9. """
  10. import re
  11. import logging
  12. from typing import Any, Dict, Optional
  13. logger = logging.getLogger(__name__)
# Try to import pyotp (TOTP support).
try:
    import pyotp
    HAS_PYOTP = True
except ImportError:
    # pyotp is optional: record its absence in HAS_PYOTP and emit a warning
    # when this module is loaded, instead of failing the import.
    HAS_PYOTP = False
    logger.warning("pyotp not installed, TOTP 2FA support disabled")
  21. def match_domain(url: str, domain_pattern: str) -> bool:
  22. """
  23. 检查 URL 是否匹配域名模式
  24. Args:
  25. url: 完整 URL
  26. domain_pattern: 域名模式(支持通配符)
  27. Returns:
  28. 是否匹配
  29. """
  30. from agent.tools.url_matcher import match_url_with_pattern
  31. return match_url_with_pattern(url, domain_pattern)
  32. def get_applicable_secrets(
  33. sensitive_data: Dict[str, Any],
  34. current_url: Optional[str]
  35. ) -> Dict[str, Any]:
  36. """
  37. 获取当前 URL 适用的敏感数据
  38. Args:
  39. sensitive_data: 敏感数据字典,格式:
  40. - 旧格式:{key: value}(适用于所有域名)
  41. - 新格式:{domain_pattern: {key: value}}(域名特定)
  42. current_url: 当前 URL
  43. Returns:
  44. 适用的敏感数据字典
  45. """
  46. applicable = {}
  47. for domain_or_key, content in sensitive_data.items():
  48. if isinstance(content, dict):
  49. # 新格式:{domain_pattern: {key: value}}
  50. if current_url:
  51. if match_domain(current_url, domain_or_key):
  52. applicable.update(content)
  53. else:
  54. # 旧格式:{key: value}(适用于所有域名)
  55. applicable[domain_or_key] = content
  56. # 过滤空值
  57. return {k: v for k, v in applicable.items() if v}
  58. def replace_secret_in_string(
  59. value: str,
  60. applicable_secrets: Dict[str, Any],
  61. replaced_placeholders: set,
  62. missing_placeholders: set
  63. ) -> str:
  64. """
  65. 替换字符串中的 <secret>key</secret> 占位符
  66. Args:
  67. value: 原始字符串
  68. applicable_secrets: 适用的敏感数据
  69. replaced_placeholders: 已替换的占位符集合(输出参数)
  70. missing_placeholders: 缺失的占位符集合(输出参数)
  71. Returns:
  72. 替换后的字符串
  73. """
  74. secret_pattern = re.compile(r'<secret>(.*?)</secret>')
  75. matches = secret_pattern.findall(value)
  76. for placeholder in matches:
  77. if placeholder in applicable_secrets:
  78. secret_value = applicable_secrets[placeholder]
  79. # 检查是否是 TOTP 2FA
  80. if placeholder.endswith('_bu_2fa_code'):
  81. if HAS_PYOTP:
  82. try:
  83. totp = pyotp.TOTP(secret_value, digits=6)
  84. replacement = totp.now()
  85. logger.info(f"Generated TOTP code for {placeholder}")
  86. except Exception as e:
  87. logger.error(f"Failed to generate TOTP for {placeholder}: {e}")
  88. replacement = secret_value
  89. else:
  90. logger.warning(f"TOTP requested for {placeholder} but pyotp not installed")
  91. replacement = secret_value
  92. else:
  93. replacement = secret_value
  94. # 替换占位符
  95. value = value.replace(f'<secret>{placeholder}</secret>', replacement)
  96. replaced_placeholders.add(placeholder)
  97. else:
  98. # 缺失的占位符
  99. missing_placeholders.add(placeholder)
  100. return value
  101. def replace_secrets_recursively(
  102. value: Any,
  103. applicable_secrets: Dict[str, Any],
  104. replaced_placeholders: set,
  105. missing_placeholders: set
  106. ) -> Any:
  107. """
  108. 递归替换嵌套结构中的敏感数据占位符
  109. Args:
  110. value: 原始值(可能是字符串、字典、列表等)
  111. applicable_secrets: 适用的敏感数据
  112. replaced_placeholders: 已替换的占位符集合
  113. missing_placeholders: 缺失的占位符集合
  114. Returns:
  115. 替换后的值
  116. """
  117. if isinstance(value, str):
  118. return replace_secret_in_string(
  119. value,
  120. applicable_secrets,
  121. replaced_placeholders,
  122. missing_placeholders
  123. )
  124. elif isinstance(value, dict):
  125. return {
  126. k: replace_secrets_recursively(
  127. v,
  128. applicable_secrets,
  129. replaced_placeholders,
  130. missing_placeholders
  131. )
  132. for k, v in value.items()
  133. }
  134. elif isinstance(value, list):
  135. return [
  136. replace_secrets_recursively(
  137. item,
  138. applicable_secrets,
  139. replaced_placeholders,
  140. missing_placeholders
  141. )
  142. for item in value
  143. ]
  144. else:
  145. return value
  146. def replace_sensitive_data(
  147. arguments: Dict[str, Any],
  148. sensitive_data: Dict[str, Any],
  149. current_url: Optional[str] = None
  150. ) -> Dict[str, Any]:
  151. """
  152. 替换工具参数中的敏感数据占位符
  153. Args:
  154. arguments: 工具参数字典
  155. sensitive_data: 敏感数据字典
  156. current_url: 当前 URL(用于域名匹配)
  157. Returns:
  158. 替换后的参数字典
  159. Example:
  160. sensitive_data = {
  161. "*.github.com": {
  162. "github_token": "ghp_xxxxx",
  163. "github_password": "secret123",
  164. "github_2fa_bu_2fa_code": "JBSWY3DPEHPK3PXP"
  165. }
  166. }
  167. arguments = {
  168. "username": "user",
  169. "password": "<secret>github_password</secret>",
  170. "totp": "<secret>github_2fa_bu_2fa_code</secret>"
  171. }
  172. # 执行替换
  173. replaced = replace_sensitive_data(arguments, sensitive_data, "https://github.com")
  174. # 结果:
  175. # {
  176. # "username": "user",
  177. # "password": "secret123",
  178. # "totp": "123456" # 自动生成的 TOTP 代码
  179. # }
  180. """
  181. # 获取适用的密钥
  182. applicable_secrets = get_applicable_secrets(sensitive_data, current_url)
  183. if not applicable_secrets:
  184. logger.debug("No applicable secrets found for current URL")
  185. return arguments
  186. # 跟踪替换和缺失的占位符
  187. replaced_placeholders = set()
  188. missing_placeholders = set()
  189. # 递归替换
  190. replaced_arguments = replace_secrets_recursively(
  191. arguments,
  192. applicable_secrets,
  193. replaced_placeholders,
  194. missing_placeholders
  195. )
  196. # 记录日志
  197. if replaced_placeholders:
  198. logger.info(
  199. f"Replaced sensitive placeholders: {', '.join(sorted(replaced_placeholders))}"
  200. f"{' on ' + current_url if current_url else ''}"
  201. )
  202. if missing_placeholders:
  203. logger.warning(
  204. f"Missing sensitive data keys: {', '.join(sorted(missing_placeholders))}"
  205. )
  206. return replaced_arguments