123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- from typing import List, Dict, Optional, Set
- import json
- from dataclasses import dataclass, field
- import hashlib
class FNV:
    """Fowler-Noll-Vo (FNV-1) hashing helpers for 64-bit digests."""

    # 64-bit offset basis, 64-bit prime, and the modulus that wraps to 64 bits.
    INIT64 = 0xCBF29CE484222325
    PRIME64 = 0x100000001B3
    MOD64 = 1 << 64

    @staticmethod
    def fnv1_64(data: bytes) -> int:
        """Return the 64-bit FNV-1 hash of *data*.

        For every byte the running digest is first multiplied by the prime
        (wrapping at 64 bits) and then XOR-ed with that byte.
        """
        digest = FNV.INIT64
        for octet in data:
            digest = ((digest * FNV.PRIME64) % FNV.MOD64) ^ octet
        return digest
class DiversionBucket:
    """Base type for diversion strategies; subclasses provide ``match``."""

    def match(self, experiment_context):
        """Decide whether *experiment_context* belongs to this bucket.

        Raises:
            NotImplementedError: always, in this base class.
        """
        message = "Subclasses must implement this method"
        raise NotImplementedError(message)
class UidDiversionBucket(DiversionBucket):
    """Diversion bucket that selects users by ``int(uid) % total_buckets``."""

    def __init__(self, total_buckets: int, buckets: str):
        """Parse the comma-separated list of selected bucket numbers.

        Args:
            total_buckets: Modulus applied to the numeric uid in ``match``.
            buckets: Comma-separated bucket numbers such as ``"0,1,2"``. An
                empty (or otherwise falsy) value selects no buckets. Blank
                tokens, e.g. from a trailing or doubled comma, are ignored.

        Raises:
            ValueError: If a non-blank token is not an integer.
        """
        self.total_buckets = total_buckets
        tokens = buckets.split(",") if buckets else []
        # Skip blank tokens so inputs like "1,2," do not crash inside int().
        self.buckets = {int(token) for token in tokens if token.strip()}

    def match(self, experiment_context):
        """Return True when the context's uid maps to a selected bucket.

        The uid is converted with ``int()`` directly (it is not hashed), so it
        must be an integer or a numeric string.
        """
        bucket = int(experiment_context.uid) % self.total_buckets
        return bucket in self.buckets
class FilterDiversionBucket(DiversionBucket):
    """Diversion bucket described by a filter condition; matching is a stub."""

    def __init__(self, filter_condition: str):
        """Keep the filter condition string exactly as supplied."""
        self.filter_condition = filter_condition

    def match(self, experiment_context):
        """Condition-based matching is not available yet.

        Raises:
            NotImplementedError: always.
        """
        reason = "not implemented"
        raise NotImplementedError(reason)
class Feature:
    """A feature together with the parameters attached to it."""

    def __init__(self, params=None):
        """Store *params* as given (``None`` when omitted)."""
        self.params = params

    def init(self):
        """Placeholder hook for feature-specific setup; currently a no-op."""
        return None
class ExperimentContext:
    """Per-request input to experiment matching: a uid plus filter parameters."""

    def __init__(self, uid=None, filter_params=None):
        """Store the uid and the filter parameters.

        A falsy *filter_params* (``None`` or an empty mapping) is replaced by
        a fresh empty dict.
        """
        self.uid = uid
        self.filter_params = filter_params if filter_params else {}

    def __str__(self):
        """Render the uid and filter parameters for logging."""
        return "ExperimentContext(uid={}, filter_params={})".format(
            self.uid, self.filter_params
        )
class Domain:
    """A traffic domain with its flow setting, buckets, features and layers.

    Call ``init()`` after construction to build the debug-user set and the
    uid diversion bucket; until then ``match`` relies on ``flow`` alone.
    """

    def __init__(self, domain_id, name, flow: int, buckets: str, bucket_type: str, debug_crowd_ids=None,
                 is_default_domain=False, exp_layer_id=None, debug_users=""):
        """Store the domain configuration.

        Args:
            domain_id: Domain identifier; converted with ``int()``.
            name: Domain name.
            flow: Traffic setting; ``match`` short-circuits on 0 and 100.
            buckets: Comma-separated bucket numbers handed to the uid
                diversion bucket in ``init``.
            bucket_type: Stored as given; not consulted by ``init`` or
                ``match`` in this class.
            debug_crowd_ids: Stored as given.
            is_default_domain: Stored as given.
            exp_layer_id: Optional layer id; converted with ``int()`` unless
                it is ``None``.
            debug_users: Comma-separated uids parsed by ``init``.
        """
        self.id = int(domain_id)
        self.name = name
        self.debug_crowd_ids = debug_crowd_ids
        self.is_default_domain = is_default_domain
        self.exp_layer_id = int(exp_layer_id) if exp_layer_id is not None else None
        self.features = []
        self.layers = []
        self.debug_users = debug_users
        self.flow = flow
        self.buckets = buckets
        self.diversion_bucket = None
        self.bucket_type = bucket_type
        self.debug_user_set = set()

    def add_debug_users(self, users: List[str]):
        """Add every uid in *users* to the debug-user set."""
        self.debug_user_set.update(users)

    def match_debug_users(self, experiment_context):
        """Return True when the context's uid is a registered debug user."""
        return experiment_context.uid in self.debug_user_set

    def add_feature(self, feature: "Feature"):
        """Append *feature* to this domain's feature list."""
        self.features.append(feature)

    def add_layer(self, layer):
        """Append *layer* to this domain's layer list."""
        self.layers.append(layer)

    def init(self):
        """Build the debug-user set and the uid diversion bucket."""
        # "".split(",") yields [""], which would register the empty string as
        # a debug user; skip parsing when no debug users are configured.
        if self.debug_users:
            self.debug_user_set.update(self.debug_users.split(","))
        self.diversion_bucket = UidDiversionBucket(100, self.buckets)

    def match(self, experiment_context):
        """Return True when *experiment_context* falls inside this domain.

        A flow of 0 never matches and a flow of 100 always matches; any other
        value defers to the diversion bucket, or fails when none is set.
        """
        if self.flow == 0:
            return False
        if self.flow == 100:
            return True
        if self.diversion_bucket:
            return self.diversion_bucket.match(experiment_context)
        return False
@dataclass
class Layer:
    """An experiment layer and the experiments and domains registered on it."""

    id: int
    name: str
    # Each instance gets its own list via default_factory.
    experiments: List['Experiment'] = field(default_factory=list)
    domains: List['Domain'] = field(default_factory=list)

    def add_experiment(self, experiment):
        """Register *experiment* at the end of this layer's experiments."""
        registered = self.experiments
        registered.append(experiment)

    def add_domain(self, domain):
        """Register *domain* at the end of this layer's domains."""
        registered = self.domains
        registered.append(domain)
@dataclass
class Experiment:
    """An experiment: flow setting, bucket configuration and its versions.

    ``init()`` derives ``debug_user_set`` and ``diversion_bucket`` from the
    raw string fields.
    """

    id: int
    flow: int
    crowd_ids: List[str]
    debug_users: str
    buckets: str
    filter_condition: str
    bucket_type: str = "Random"
    debug_user_set: Set[str] = field(default_factory=set)
    diversion_bucket: Optional['DiversionBucket'] = None
    experiment_versions: List['ExperimentVersion'] = field(default_factory=list)

    def add_debug_users(self, users: List[str]):
        """Merge every uid in *users* into the debug-user set."""
        self.debug_user_set.update(users)

    def match_debug_users(self, experiment_context):
        """Tell whether the context's uid is one of the debug users."""
        known_debug_users = self.debug_user_set
        return experiment_context.uid in known_debug_users

    def add_experiment_version(self, version):
        """Attach *version* to the end of the version list."""
        self.experiment_versions.append(version)

    def match(self, experiment_context: 'ExperimentContext') -> bool:
        """Tell whether *experiment_context* is diverted into this experiment.

        Only "Random" experiments honour the flow shortcuts (0 never matches,
        100 always matches). Everything else is decided by the diversion
        bucket; without one the result is False.
        """
        if self.bucket_type == "Random":
            if self.flow == 0:
                return False
            if self.flow == 100:
                return True
        bucket = self.diversion_bucket
        if not bucket:
            return False
        return bucket.match(experiment_context)

    def init(self):
        """Derive the debug-user set and the diversion bucket from the raw fields."""
        # Initialize the debug user set.
        raw_users = self.debug_users
        if raw_users:
            self.debug_user_set.update(raw_users.split(","))
        # Initialize the diversion bucket.
        kind = self.bucket_type
        if kind == "Random":  # ExpBucketTypeRand
            self.diversion_bucket = UidDiversionBucket(100, self.buckets)
        elif kind == "Condition" and self.filter_condition:  # ExpBucketTypeCond
            self.diversion_bucket = FilterDiversionBucket(self.filter_condition)
class ExperimentVersion:
    """One version (variant) of an experiment and its key/value parameters.

    Call ``init()`` after construction to parse ``debug_users`` and ``config``
    and to build the uid diversion bucket.
    """

    def __init__(self, exp_version_id, flow, buckets: str, exp_version_name=None, debug_users: str = '',
                 config=None, debug_crowd_ids=None):
        """Store the version configuration.

        Args:
            exp_version_id: Version identifier; converted with ``int()``.
            flow: Traffic setting; ``match`` short-circuits on 0 and 100.
            buckets: Comma-separated bucket numbers handed to the uid
                diversion bucket in ``init``.
            exp_version_name: Stored as given.
            debug_users: Comma-separated uids parsed by ``init``.
            config: JSON text holding a list of ``{"key": ..., "value": ...}``
                objects, parsed into ``params`` by ``init``; may be ``None``.
            debug_crowd_ids: Stored as given.
        """
        self.id = int(exp_version_id)
        self.exp_version_name = exp_version_name
        self.config = config
        self.debug_crowd_ids = debug_crowd_ids
        self.debug_users = debug_users
        self.params = {}
        self.flow = flow
        self.buckets = buckets
        self.debug_user_set = set()
        self.diversion_bucket = None

    def add_debug_users(self, users: List[str]):
        """Add every uid in *users* to the debug-user set."""
        self.debug_user_set.update(users)

    def match_debug_users(self, experiment_context):
        """Return True when the context's uid is a registered debug user."""
        return experiment_context.uid in self.debug_user_set

    def match(self, experiment_context: "ExperimentContext"):
        """Return True when *experiment_context* is diverted into this version.

        A flow of 0 never matches and a flow of 100 always matches; any other
        value defers to the diversion bucket, or fails when none is set.
        """
        if self.flow == 0:
            return False
        if self.flow == 100:
            return True
        if self.diversion_bucket:
            return self.diversion_bucket.match(experiment_context)
        return False

    def init(self):
        """Parse debug users and config, then build the uid diversion bucket.

        Raises:
            json.JSONDecodeError: If a non-empty ``config`` is not valid JSON.
            KeyError: If a config entry lacks ``"key"`` or ``"value"``.
        """
        # "".split(",") yields [""], which would register the empty string as
        # a debug user; skip parsing when no debug users are configured.
        if self.debug_users:
            self.debug_user_set.update(self.debug_users.split(","))
        # config defaults to None, and json.loads(None) raises TypeError, so a
        # version without config simply keeps its params empty.
        if self.config:
            for entry in json.loads(self.config):
                self.params[entry['key']] = entry['value']
        self.diversion_bucket = UidDiversionBucket(100, self.buckets)
class Project:
    """A project: its domains and layers, indexed by id, plus a default domain."""

    def __init__(self, project_name=None, project_id=None):
        """Create an empty project.

        Args:
            project_name: Stored as given.
            project_id: Optional identifier; converted with ``int()`` unless
                it is ``None``.
        """
        self.project_name = project_name
        # project_id defaults to None and int(None) raises TypeError, so the
        # conversion is applied only when an id is actually supplied.
        self.id = int(project_id) if project_id is not None else None
        self.domains = []
        self.layers = []
        self.default_domain: Optional["Domain"] = None
        self.layer_map = {}
        self.domain_map = {}

    def add_domain(self, domain):
        """Append *domain* and index it by ``domain.id``."""
        self.domains.append(domain)
        self.domain_map[domain.id] = domain

    def add_layer(self, layer):
        """Append *layer* and index it by ``layer.id``."""
        self.layers.append(layer)
        self.layer_map[layer.id] = layer

    def set_default_domain(self, domain: "Domain"):
        """Record *domain* as the project's default domain."""
        self.default_domain = domain
class ExperimentResult:
    """Outcome of evaluating experiments: merged params and matched versions."""

    def __init__(self, project=None, experiment_context=None, project_name=None):
        """Start an empty result bound to the given project and context."""
        self.project = project
        self.project_name = project_name
        self.experiment_context = experiment_context
        self.experiment_versions = []
        self.params = {}

    def add_params(self, params: Dict[str, str]):
        """Merge *params* into the result; later keys overwrite earlier ones."""
        merged = self.params
        merged.update(params)

    def add_experiment_version(self, version):
        """Record *version* at the end of the matched-version list."""
        matched = self.experiment_versions
        matched.append(version)

    def init(self):
        """Placeholder hook for result-specific setup; currently a no-op."""
        return None

    def __str__(self):
        """Render the project name, params, context and versions for logging."""
        return (
            "ExperimentResult("
            f"project={self.project_name}, "
            f"params={self.params}, "
            f"experiment_context={self.experiment_context}, "
            f"experiment_versions={self.experiment_versions})"
        )
|