client.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. # Python: experiment_client.py
  2. import threading
  3. from typing import List, Dict
  4. from alibabacloud_paiabtest20240119.client import Client
  5. from pqai_agent.abtest.models import Project, Domain, Layer, Experiment, ExperimentVersion, \
  6. ExperimentContext, ExperimentResult
  7. from alibabacloud_paiabtest20240119.models import ListProjectsRequest, ListProjectsResponseBodyProjects, \
  8. ListDomainsRequest, ListFeaturesRequest, ListLayersRequest, ListExperimentsRequest, ListExperimentVersionsRequest
  9. from pqai_agent.logging_service import logger
  10. class ExperimentClient:
  11. def __init__(self, client: Client):
  12. self.client = client
  13. self.project_map = {}
  14. self.running = False
  15. self.worker_thread = None
  16. def start(self):
  17. self.running = True
  18. self.worker_thread = threading.Thread(target=self._worker_loop)
  19. self.worker_thread.start()
  20. def shutdown(self, blocking=False):
  21. self.running = False
  22. if self.worker_thread:
  23. if blocking:
  24. self.worker_thread.join()
  25. else:
  26. self.worker_thread = None
  27. def _worker_loop(self):
  28. while self.running:
  29. # Sleep or wait for a condition to avoid busy waiting
  30. threading.Event().wait(60)
  31. try:
  32. self.load_experiment_data()
  33. logger.debug("Experiment data loaded successfully.")
  34. except Exception as e:
  35. logger.error(f"Error loading experiment data: {e}")
  36. logger.info("ExperimentClient worker thread exit.")
  37. def load_experiment_data(self):
  38. project_map = {}
  39. # 获取所有项目
  40. list_project_req = ListProjectsRequest()
  41. list_project_req.all = True
  42. projects_response = self.client.list_projects(list_project_req)
  43. projects: List[ListProjectsResponseBodyProjects] = projects_response.body.projects
  44. for project_data in projects:
  45. project = Project(project_name=project_data.name, project_id=project_data.project_id)
  46. logger.debug(f"[Project] {project_data}")
  47. # 获取项目的域
  48. list_domain_req = ListDomainsRequest()
  49. list_domain_req.project_id = project.id
  50. domains_response = self.client.list_domains(list_domain_req)
  51. for domain_data in domains_response.body.domains:
  52. domain = Domain(domain_id=domain_data.domain_id,
  53. name=domain_data.name,
  54. flow=domain_data.flow,
  55. buckets=domain_data.buckets,
  56. bucket_type=domain_data.bucket_type,
  57. is_default_domain=domain_data.is_default_domain,
  58. exp_layer_id=domain_data.layer_id,
  59. debug_users=domain_data.debug_users)
  60. logger.debug(f"[Domain] {domain_data}")
  61. if domain.is_default_domain:
  62. project.set_default_domain(domain)
  63. domain.init()
  64. project.add_domain(domain)
  65. # 获取域的特性(暂无实际用处)
  66. list_feature_req = ListFeaturesRequest()
  67. list_feature_req.domain_id = domain.id
  68. features_response = self.client.list_features(list_feature_req)
  69. for feature_data in features_response.body.features:
  70. domain.add_feature(feature_data)
  71. # 获取域的层
  72. list_layer_req = ListLayersRequest()
  73. list_layer_req.domain_id = domain.id
  74. layers_response = self.client.list_layers(list_layer_req)
  75. for layer_data in layers_response.body.layers:
  76. logger.debug(f'[Layer] {layer_data}')
  77. layer = Layer(layer_id=layer_data.layer_id, name=layer_data.name)
  78. project.add_layer(layer)
  79. # 获取层的实验
  80. list_experiment_req = ListExperimentsRequest()
  81. list_experiment_req.layer_id = layer.id
  82. # FIXME: magic code
  83. list_experiment_req.status = 'Running'
  84. experiments_response = self.client.list_experiments(list_experiment_req)
  85. for experiment_data in experiments_response.body.experiments:
  86. logger.debug(f'[Experiment] {experiment_data}')
  87. # FIXME: Java SDK中有特殊处理
  88. crowd_ids = experiment_data.crowd_ids if experiment_data.crowd_ids else ""
  89. experiment = Experiment(id=int(experiment_data.experiment_id), bucket_type=experiment_data.bucket_type,
  90. flow=experiment_data.flow, buckets=experiment_data.buckets,
  91. crowd_ids=crowd_ids.split(","),
  92. debug_users=experiment_data.debug_users,
  93. filter_condition=experiment_data.condition
  94. )
  95. experiment.init()
  96. # 获取实验的版本
  97. list_exp_ver_req = ListExperimentVersionsRequest()
  98. list_exp_ver_req.experiment_id = experiment.id
  99. versions_response = self.client.list_experiment_versions(list_exp_ver_req)
  100. print(versions_response)
  101. for version_data in versions_response.body.experiment_versions:
  102. logger.debug(f'[ExperimentVersion] {version_data}')
  103. version = ExperimentVersion(exp_version_id=version_data.experiment_version_id,
  104. flow=int(version_data.flow),
  105. buckets=version_data.buckets,
  106. debug_users=version_data.debug_users,
  107. exp_version_name=version_data.name,
  108. config=version_data.config)
  109. version.init()
  110. experiment.add_experiment_version(version)
  111. layer.add_experiment(experiment)
  112. domain.add_layer(layer)
  113. # 建立layer-domain的反向映射,从而形成嵌套结构
  114. for domain in project.domains:
  115. if domain.is_default_domain:
  116. continue
  117. layer: Layer = project.layer_map.get(domain.exp_layer_id, None)
  118. if not layer:
  119. continue
  120. layer.add_domain(domain)
  121. project_map[project.project_name] = project
  122. self.project_map = project_map
  123. def match_experiment(self, project_name, experiment_context) -> ExperimentResult:
  124. if project_name not in self.project_map:
  125. return ExperimentResult(project_name=project_name, experiment_context=experiment_context)
  126. project = self.project_map[project_name]
  127. experiment_result = ExperimentResult(project=project, experiment_context=experiment_context)
  128. self._match_domain(project.default_domain, experiment_result)
  129. experiment_result.init()
  130. return experiment_result
  131. def _match_domain(self, domain: Domain, experiment_result: ExperimentResult):
  132. if not domain:
  133. return
  134. for feature in domain.features:
  135. if feature.match(experiment_result.experiment_context):
  136. experiment_result.add_params(feature.params)
  137. for layer in domain.layers:
  138. self._match_layer(layer, experiment_result)
  139. def _match_layer(self, layer, experiment_result):
  140. if not layer:
  141. return
  142. for experiment in layer.experiments:
  143. if experiment.match_debug_users(experiment_result.experiment_context):
  144. logger.debug(f"Matched debug user for experiment: {experiment.id}")
  145. self._match_experiment(experiment, experiment_result)
  146. return
  147. for domain in layer.domains:
  148. if domain.match_debug_users(experiment_result.experiment_context):
  149. logger.debug(f"Matched debug user for domain: {domain.id}")
  150. self._match_domain(domain, experiment_result)
  151. hash_key = f"{experiment_result.experiment_context.uid}_LAYER{layer.id}"
  152. hash_value = self._hash_value(hash_key)
  153. exp_context = ExperimentContext(uid=hash_value,
  154. filter_params=experiment_result.experiment_context.filter_params)
  155. matched_experiments = [exp for exp in layer.experiments if exp.match(exp_context)]
  156. if len(matched_experiments) == 1:
  157. self._match_experiment(matched_experiments[0], experiment_result)
  158. elif len(matched_experiments) > 1:
  159. for experiment in matched_experiments:
  160. if experiment.bucket_type == "Condition":
  161. self._match_experiment(experiment, experiment_result)
  162. return
  163. logger.warning(f"Warning: Multiple experiments matched under layer {layer.id}.")
  164. self._match_experiment(matched_experiments[0], experiment_result)
  165. matched_domains = []
  166. for domain in layer.domains:
  167. if domain.match(exp_context):
  168. logger.debug(f"Matched domain {domain.id} for uid {experiment_result.experiment_context.uid}.")
  169. matched_domains.append(domain)
  170. if len(matched_domains) == 1:
  171. self._match_domain(matched_domains[0], experiment_result)
  172. return
  173. elif len(matched_domains) > 1:
  174. for domain in matched_domains:
  175. if domain.bucket_type == "Condition":
  176. self._match_domain(domain, experiment_result)
  177. return
  178. logger.warning(f"Warning: Multiple domains matched under layer {layer.id}, using the first one.")
  179. self._match_domain(matched_domains[0], experiment_result)
  180. return
  181. def _match_experiment(self, experiment: Experiment, experiment_result: ExperimentResult):
  182. if not experiment:
  183. return
  184. for version in experiment.experiment_versions:
  185. if version.match_debug_users(experiment_result.experiment_context):
  186. logger.debug(f"Matched debug user for experiment version: {version.id}")
  187. experiment_result.add_params(version.params)
  188. experiment_result.add_experiment_version(version)
  189. return
  190. hash_key = f"{experiment_result.experiment_context.uid}_EXPERIMENT{experiment.id}"
  191. hash_value = self._hash_value(hash_key)
  192. exp_context = ExperimentContext(uid=hash_value,
  193. filter_params=experiment_result.experiment_context.filter_params)
  194. for version in experiment.experiment_versions:
  195. if version.match(exp_context):
  196. experiment_result.add_params(version.params)
  197. experiment_result.add_experiment_version(version)
  198. return
  199. def _hash_value(self, hash_key):
  200. import hashlib
  201. md5_hash = hashlib.md5(hash_key.encode()).hexdigest()
  202. return md5_hash
  203. def __del__(self):
  204. if self.running and self.worker_thread:
  205. self.shutdown()
  206. g_client = None
  207. def get_client():
  208. global g_client
  209. if not g_client:
  210. ak_id = 'LTAI5tFGqgC8f3mh1fRCrAEy'
  211. ak_secret = 'XhOjK9XmTYRhVAtf6yii4s4kZwWzvV'
  212. region = 'cn-hangzhou'
  213. from alibabacloud_tea_openapi.models import Config
  214. endpoint = f"paiabtest.{region}.aliyuncs.com"
  215. conf = Config(access_key_id=ak_id, access_key_secret=ak_secret, region_id=region,
  216. endpoint=endpoint, type="access_key")
  217. api_client = Client(conf)
  218. g_client = ExperimentClient(api_client)
  219. g_client.load_experiment_data()
  220. g_client.start()
  221. return g_client
  222. if __name__ == '__main__':
  223. from pqai_agent.logging_service import setup_root_logger
  224. setup_root_logger(level='DEBUG')
  225. experiment_client = get_client()
  226. for project_name, project in experiment_client.project_map.items():
  227. print(f"Project: {project_name}, ID: {project.id}")
  228. for domain in project.domains:
  229. print(f" Domain: {domain.id}, Default: {domain.is_default_domain}")
  230. for layer in domain.layers:
  231. print(f" Layer: {layer.id}")
  232. for experiment in layer.experiments:
  233. print(f" Experiment: {experiment.id}")
  234. for version in experiment.experiment_versions:
  235. print(f" Version: {version.id}, Config: {version.config}")
  236. exp_context = ExperimentContext(uid='x')
  237. result = experiment_client.match_experiment('PQAgent', exp_context)
  238. print(result)