models.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. from typing import List, Dict, Optional, Set
  2. import json
  3. from dataclasses import dataclass, field
  4. import hashlib
  5. class FNV:
  6. INIT64 = int("cbf29ce484222325", 16)
  7. PRIME64 = int("100000001b3", 16)
  8. MOD64 = 2**64
  9. @staticmethod
  10. def fnv1_64(data: bytes) -> int:
  11. hash_value = FNV.INIT64
  12. for byte in data:
  13. hash_value = (hash_value * FNV.PRIME64) % FNV.MOD64
  14. hash_value = hash_value ^ byte
  15. return hash_value
  16. class DiversionBucket:
  17. def match(self, experiment_context):
  18. raise NotImplementedError("Subclasses must implement this method")
  19. class UidDiversionBucket(DiversionBucket):
  20. def __init__(self, total_buckets: int, buckets: str):
  21. self.total_buckets = total_buckets
  22. if buckets:
  23. self.buckets = set(map(int, buckets.split(",")))
  24. else:
  25. self.buckets = set()
  26. def match(self, experiment_context):
  27. uid_hash = int(experiment_context.uid)
  28. bucket = uid_hash % self.total_buckets
  29. # print(f"Matching UID {experiment_context.uid} with hash {uid_hash} to bucket {bucket} in {self.buckets}")
  30. return bucket in self.buckets
  31. class FilterDiversionBucket(DiversionBucket):
  32. def __init__(self, filter_condition: str):
  33. self.filter_condition = filter_condition
  34. def match(self, experiment_context):
  35. raise NotImplementedError("not implemented")
  36. class Feature:
  37. def __init__(self, params=None):
  38. self.params = params
  39. def init(self):
  40. # Initialize feature-specific logic
  41. pass
  42. class ExperimentContext:
  43. def __init__(self, uid=None, filter_params=None):
  44. self.uid = uid
  45. self.filter_params = filter_params or {}
  46. def __str__(self):
  47. return f"ExperimentContext(uid={self.uid}, filter_params={self.filter_params})"
  48. class Domain:
  49. def __init__(self, domain_id, name, flow: int, buckets: str, bucket_type: str, debug_crowd_ids=None, is_default_domain=False, exp_layer_id=None,
  50. debug_users=""):
  51. self.id = int(domain_id)
  52. self.name = name
  53. self.debug_crowd_ids = debug_crowd_ids
  54. self.is_default_domain = is_default_domain
  55. self.exp_layer_id = int(exp_layer_id) if exp_layer_id is not None else None
  56. self.features = []
  57. self.layers = []
  58. self.debug_users = debug_users
  59. self.flow = flow
  60. self.buckets = buckets
  61. self.diversion_bucket = None
  62. self.bucket_type = bucket_type
  63. self.debug_user_set = set()
  64. def add_debug_users(self, users: List[str]):
  65. self.debug_user_set.update(users)
  66. def match_debug_users(self, experiment_context):
  67. return experiment_context.uid in self.debug_user_set
  68. def add_feature(self, feature: Feature):
  69. self.features.append(feature)
  70. def add_layer(self, layer):
  71. self.layers.append(layer)
  72. def init(self):
  73. self.debug_user_set.update(self.debug_users.split(","))
  74. self.diversion_bucket = UidDiversionBucket(100, self.buckets)
  75. def match(self, experiment_context):
  76. if self.flow == 0:
  77. return False
  78. elif self.flow == 100:
  79. return True
  80. if self.diversion_bucket:
  81. return self.diversion_bucket.match(experiment_context)
  82. return False
  83. @dataclass
  84. class Layer:
  85. id: int
  86. name: str
  87. experiments: List['Experiment'] = field(default_factory=list)
  88. domains: List[Domain] = field(default_factory=list)
  89. def add_experiment(self, experiment):
  90. self.experiments.append(experiment)
  91. def add_domain(self, domain):
  92. self.domains.append(domain)
  93. @dataclass
  94. class Experiment:
  95. id: int
  96. flow: int
  97. crowd_ids: List[str]
  98. debug_users: str
  99. buckets: str
  100. filter_condition: str
  101. bucket_type: str = "Random"
  102. debug_user_set: Set[str] = field(default_factory=set)
  103. diversion_bucket: Optional[DiversionBucket] = None
  104. experiment_versions: List['ExperimentVersion'] = field(default_factory=list)
  105. def add_debug_users(self, users: List[str]):
  106. self.debug_user_set.update(users)
  107. def match_debug_users(self, experiment_context):
  108. return experiment_context.uid in self.debug_user_set
  109. def add_experiment_version(self, version):
  110. self.experiment_versions.append(version)
  111. def match(self, experiment_context: ExperimentContext) -> bool:
  112. if self.bucket_type == "Random":
  113. if self.flow == 0:
  114. return False
  115. elif self.flow == 100:
  116. return True
  117. if self.diversion_bucket:
  118. return self.diversion_bucket.match(experiment_context)
  119. return False
  120. def init(self):
  121. # 初始化 debug_user_map
  122. if self.debug_users:
  123. self.debug_user_set.update(self.debug_users.split(","))
  124. # 初始化 diversion_bucket
  125. if self.bucket_type == "Random": # ExpBucketTypeRand
  126. self.diversion_bucket = UidDiversionBucket(100, self.buckets)
  127. elif self.bucket_type == "Condition" and self.filter_condition: # ExpBucketTypeCond
  128. self.diversion_bucket = FilterDiversionBucket(self.filter_condition)
  129. class ExperimentVersion:
  130. def __init__(self, exp_version_id, flow, buckets: str, exp_id: int, exp_version_name=None,
  131. debug_users: str = '', config=None, debug_crowd_ids=None):
  132. self.id = int(exp_version_id)
  133. self.exp_version_name = exp_version_name
  134. self.exp_id = int(exp_id)
  135. self.config = config
  136. self.debug_crowd_ids = debug_crowd_ids
  137. self.debug_users = debug_users
  138. self.params = {}
  139. self.flow = flow
  140. self.buckets = buckets
  141. self.debug_user_set = set()
  142. self.diversion_bucket = None
  143. def add_debug_users(self, users: List[str]):
  144. self.debug_user_set.update(users)
  145. def match_debug_users(self, experiment_context):
  146. return experiment_context.uid in self.debug_user_set
  147. def match(self, experiment_context: ExperimentContext):
  148. if self.flow == 0:
  149. return False
  150. elif self.flow == 100:
  151. return True
  152. if self.diversion_bucket:
  153. return self.diversion_bucket.match(experiment_context)
  154. return False
  155. def init(self):
  156. self.debug_user_set.update(self.debug_users.split(","))
  157. self.diversion_bucket = UidDiversionBucket(100, self.buckets)
  158. params = json.loads(self.config)
  159. for kv in params:
  160. self.params[kv['key']] = kv['value']
  161. class Project:
  162. def __init__(self, name=None, project_id=None):
  163. self.name = name
  164. self.id = int(project_id)
  165. self.domains = []
  166. self.layers = []
  167. self.default_domain : Optional[Domain] = None
  168. self.layer_map = {}
  169. self.domain_map = {}
  170. def add_domain(self, domain):
  171. self.domains.append(domain)
  172. self.domain_map[domain.id] = domain
  173. def add_layer(self, layer):
  174. self.layers.append(layer)
  175. self.layer_map[layer.id] = layer
  176. def set_default_domain(self, domain: Domain):
  177. self.default_domain = domain
  178. class ExperimentResult:
  179. def __init__(self, project=None, experiment_context=None):
  180. self.project = project
  181. if project:
  182. self.project_name = project.name
  183. else:
  184. self.project_name = None
  185. self.experiment_context = experiment_context
  186. self.params = {}
  187. self.experiment_versions: List[ExperimentVersion] = []
  188. self.exp_id = ""
  189. def add_params(self, params: Dict[str, str]):
  190. self.params.update(params)
  191. def add_experiment_version(self, version):
  192. self.experiment_versions.append(version)
  193. def init(self):
  194. buf = []
  195. if self.project:
  196. buf.append(f"ER{self.project.id}")
  197. if self.experiment_versions:
  198. for experiment_version in self.experiment_versions:
  199. buf.append(f"_E{experiment_version.exp_id}")
  200. buf.append(f"#EV{experiment_version.id}")
  201. self.exp_id = "".join(buf)
  202. def __str__(self):
  203. return f"ExperimentResult(project={self.project_name}, params={self.params}, experiment_context={self.experiment_context}, experiment_versions={self.experiment_versions})"