from typing import List, Dict, Optional, Set import json from dataclasses import dataclass, field import hashlib class FNV: INIT64 = int("cbf29ce484222325", 16) PRIME64 = int("100000001b3", 16) MOD64 = 2**64 @staticmethod def fnv1_64(data: bytes) -> int: hash_value = FNV.INIT64 for byte in data: hash_value = (hash_value * FNV.PRIME64) % FNV.MOD64 hash_value = hash_value ^ byte return hash_value class DiversionBucket: def match(self, experiment_context): raise NotImplementedError("Subclasses must implement this method") class UidDiversionBucket(DiversionBucket): def __init__(self, total_buckets: int, buckets: str): self.total_buckets = total_buckets if buckets: self.buckets = set(map(int, buckets.split(","))) else: self.buckets = set() def match(self, experiment_context): uid_hash = int(experiment_context.uid) bucket = uid_hash % self.total_buckets # print(f"Matching UID {experiment_context.uid} with hash {uid_hash} to bucket {bucket} in {self.buckets}") return bucket in self.buckets class FilterDiversionBucket(DiversionBucket): def __init__(self, filter_condition: str): self.filter_condition = filter_condition def match(self, experiment_context): raise NotImplementedError("not implemented") class Feature: def __init__(self, params=None): self.params = params def init(self): # Initialize feature-specific logic pass class ExperimentContext: def __init__(self, uid=None, filter_params=None): self.uid = uid self.filter_params = filter_params or {} def __str__(self): return f"ExperimentContext(uid={self.uid}, filter_params={self.filter_params})" class Domain: def __init__(self, domain_id, name, flow: int, buckets: str, bucket_type: str, debug_crowd_ids=None, is_default_domain=False, exp_layer_id=None, debug_users=""): self.id = int(domain_id) self.name = name self.debug_crowd_ids = debug_crowd_ids self.is_default_domain = is_default_domain self.exp_layer_id = int(exp_layer_id) if exp_layer_id is not None else None self.features = [] self.layers = [] self.debug_users = debug_users self.flow = flow self.buckets = buckets self.diversion_bucket = None self.bucket_type = bucket_type self.debug_user_set = set() def add_debug_users(self, users: List[str]): self.debug_user_set.update(users) def match_debug_users(self, experiment_context): return experiment_context.uid in self.debug_user_set def add_feature(self, feature: Feature): self.features.append(feature) def add_layer(self, layer): self.layers.append(layer) def init(self): self.debug_user_set.update(self.debug_users.split(",")) self.diversion_bucket = UidDiversionBucket(100, self.buckets) def match(self, experiment_context): if self.flow == 0: return False elif self.flow == 100: return True if self.diversion_bucket: return self.diversion_bucket.match(experiment_context) return False @dataclass class Layer: id: int name: str experiments: List['Experiment'] = field(default_factory=list) domains: List[Domain] = field(default_factory=list) def add_experiment(self, experiment): self.experiments.append(experiment) def add_domain(self, domain): self.domains.append(domain) @dataclass class Experiment: id: int flow: int crowd_ids: List[str] debug_users: str buckets: str filter_condition: str bucket_type: str = "Random" debug_user_set: Set[str] = field(default_factory=set) diversion_bucket: Optional[DiversionBucket] = None experiment_versions: List['ExperimentVersion'] = field(default_factory=list) def add_debug_users(self, users: List[str]): self.debug_user_set.update(users) def match_debug_users(self, experiment_context): return experiment_context.uid in self.debug_user_set def add_experiment_version(self, version): self.experiment_versions.append(version) def match(self, experiment_context: ExperimentContext) -> bool: if self.bucket_type == "Random": if self.flow == 0: return False elif self.flow == 100: return True if self.diversion_bucket: return self.diversion_bucket.match(experiment_context) return False def init(self): # 初始化 debug_user_map if self.debug_users: self.debug_user_set.update(self.debug_users.split(",")) # 初始化 diversion_bucket if self.bucket_type == "Random": # ExpBucketTypeRand self.diversion_bucket = UidDiversionBucket(100, self.buckets) elif self.bucket_type == "Condition" and self.filter_condition: # ExpBucketTypeCond self.diversion_bucket = FilterDiversionBucket(self.filter_condition) class ExperimentVersion: def __init__(self, exp_version_id, flow, buckets: str, exp_version_name=None, debug_users: str = '', config=None, debug_crowd_ids=None): self.id = int(exp_version_id) self.exp_version_name = exp_version_name self.config = config self.debug_crowd_ids = debug_crowd_ids self.debug_users = debug_users self.params = {} self.flow = flow self.buckets = buckets self.debug_user_set = set() self.diversion_bucket = None def add_debug_users(self, users: List[str]): self.debug_user_set.update(users) def match_debug_users(self, experiment_context): return experiment_context.uid in self.debug_user_set def match(self, experiment_context: ExperimentContext): if self.flow == 0: return False elif self.flow == 100: return True if self.diversion_bucket: return self.diversion_bucket.match(experiment_context) return False def init(self): self.debug_user_set.update(self.debug_users.split(",")) self.diversion_bucket = UidDiversionBucket(100, self.buckets) params = json.loads(self.config) for kv in params: self.params[kv['key']] = kv['value'] class Project: def __init__(self, project_name=None, project_id=None): self.project_name = project_name self.id = int(project_id) self.domains = [] self.layers = [] self.default_domain : Optional[Domain] = None self.layer_map = {} self.domain_map = {} def add_domain(self, domain): self.domains.append(domain) self.domain_map[domain.id] = domain def add_layer(self, layer): self.layers.append(layer) self.layer_map[layer.id] = layer def set_default_domain(self, domain: Domain): self.default_domain = domain class ExperimentResult: def __init__(self, project=None, experiment_context=None, project_name=None): self.project = project self.experiment_context = experiment_context self.project_name = project_name self.params = {} self.experiment_versions = [] def add_params(self, params: Dict[str, str]): self.params.update(params) def add_experiment_version(self, version): self.experiment_versions.append(version) def init(self): # Initialize result-specific logic pass def __str__(self): return f"ExperimentResult(project={self.project_name}, params={self.params}, experiment_context={self.experiment_context}, experiment_versions={self.experiment_versions})"