rec_model_update.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. """
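Daily update of the recommendation model.
1. Check whether the model folder in OSS is ready
   yes: go to the next step
   no: wait 5 minutes, then check again
2. Get all machines behind the SLB
3. Update the machines one at a time
   a. Download the model folder from OSS to /data/offline_dir on the server
   b. Check the model file
      ok: go to the next step
      fail: send an alert to Feishu
   c. Take the machine out of traffic
   d. Restart the service
   e. Health check
      ok: go to the next step
      fail: send an alert to Feishu
   f. Put the machine back into traffic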
  18. """
  19. import datetime
  20. import logging
  21. import time
  22. import docker
  23. import oss2
  24. import os
  25. import requests
  26. import rec_model_config
  27. import utils
  28. from threading import Timer
  29. logging.basicConfig(level=logging.INFO,
  30. format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
  31. datefmt='%a, %d %b %Y %H:%M:%S')
  32. health_instances=[]
  33. def rec_server_health_check(client, instance_id, ip_address):
  34. """
  35. 服务健康检查
  36. :param client: 客户端连接
  37. :param instance_id: instanceId
  38. :return:
  39. """
  40. #health_instances = []
  41. #ip_address = utils.get_ip_address(client=client, instance_id=instance_id)
  42. while True:
  43. health_check_url = f"http://{ip_address}:8501/v1/models/deepfm"
  44. try:
  45. http_code = requests.get(health_check_url).status_code
  46. print(http_code)
  47. except:
  48. logging.info("images is downloading")
  49. http_code = 0
  50. if http_code == 200:
  51. health_instances.append((instance_id, ip_address))
  52. break
  53. else:
  54. time.sleep(10)
  55. def model_oss_check(model_file, local_path):
  56. """判断oss是否有该文件并下载"""
  57. auth = oss2.Auth(access_key_id=rec_model_config.OSS_CONFIG['accessKeyId'],
  58. access_key_secret=rec_model_config.OSS_CONFIG['accessKeySecret'])
  59. bucket = oss2.Bucket(auth, endpoint=rec_model_config.OSS_CONFIG['endpoint'],
  60. bucket_name=rec_model_config.OSS_CONFIG['bucket_name'])
  61. exist = bucket.object_exists(model_file)
  62. # 返回值为true表示文件存在,false表示文件不存在。
  63. if exist:
  64. # 下载文件到本地服务器
  65. if not os.path.exists(local_path):
  66. os.makedirs(local_path)
  67. local_file = os.path.join(local_path, model_file)
  68. bucket.get_object_to_file(model_file, local_file)
  69. return True
  70. else:
  71. # 模型文件不存在,5分钟后重新检查
  72. Timer(60, model_oss_check, args=[model_file]).start()
  73. def server_restart(slb_client, instance_id, ip_address, ins_name, dt):
  74. try:
  75. # 1. 检查模型文件
  76. #ip_address =
  77. # 2. 摘流量
  78. utils.set_instance_weight_process(client=slb_client,
  79. slb_id=rec_model_config.slb_id,
  80. instance_id_list=[instance_id],
  81. weight_list=[(0, 60)])
  82. logging.info(f"set weight with 0 finished.")
  83. #3.scp start.sh
  84. instance_id_list=[]
  85. instance_id_list.append(instance_id)
  86. #print(rec_model_config)
  87. utils.send_file_to_ecs(client=slb_client, instance_id_list=instance_id_list, **rec_model_config.start_sh)
  88. #utils.send_file_to_ecs(client=slb_client, instance_id_list=instance_id_list, **rec_model_config.check_sh)
  89. #server_check_sh = os.path.join(rec_model_config.check_sh['target_dir'], rec_model_config.check_sh['name'])
  90. #server_check_commend = f"sh {server_check_sh} {dt}"
  91. #print(server_check_commend)
  92. #print(instance_id)
  93. #command_status=utils.run_per_command(client=slb_client, instance=instance_id, command=server_check_commend)
  94. #print("command_status:",command_status)
  95. # 4. 重启容器
  96. docker_client = docker.DockerClient(base_url=f'tcp://{ip_address}:2375', timeout=60)
  97. logging.info(docker_client)
  98. #print(docker_client.containers.list)
  99. image_id = 0
  100. try:
  101. image_id = docker_client.containers.get(ins_name)
  102. #print(image_id)
  103. image_id.stop()
  104. image_id.remove()
  105. except:
  106. image_id = 0
  107. #image_id.sop()
  108. #print("image_id",image_id)
  109. server_start_sh = os.path.join(rec_model_config.start_sh['target_dir'], rec_model_config.start_sh['name'])
  110. server_start_commend = f"sh {server_start_sh} {dt}"
  111. utils.run_command(client=slb_client, instance_ids=instance_id_list, command=server_start_commend)
  112. logging.info(f"docker restart finished.")
  113. time.sleep(5)
  114. # 4. 探活
  115. rec_server_health_check(slb_client, instance_id, ip_address)
  116. time.sleep(30)
  117. logging.info(f"health check finished.")
  118. # 5. 挂流量
  119. add_weight_list = [(10, 30), (20, 20), (40, 10), (60, 10), (80, 10), (100, 10)]
  120. utils.set_instance_weight_process(client=slb_client,
  121. slb_id=rec_model_config.slb_id,
  122. instance_id_list=[instance_id],
  123. weight_list=add_weight_list)
  124. logging.info(f"server restart finished, instance: {instance_id}/{ip_address}")
  125. except Exception as e:
  126. print(e)
  127. logging.error(f"server restart fail, instance: {instance_id}")
  128. logging.error(e)
  129. def main():
  130. try:
  131. # 1. 检查oss中模型文件夹是否准备好并下载
  132. now_date = datetime.datetime.today()- datetime.timedelta(days=1)
  133. print("model update date",now_date)
  134. dt = datetime.datetime.strftime(now_date, '%Y%m%d')
  135. model_file = f"{dt}.tar"
  136. local_path = '/data/rec_model'
  137. #print(rec_model_config.start_sh)
  138. check_res = model_oss_check(model_file=model_file, local_path=local_path)
  139. check_res = True
  140. if check_res is True:
  141. # 2. 获取slb下所有机器
  142. slb_client = utils.connect_client(access_key_id=rec_model_config.slb_client_params['access_key_id'],
  143. access_key_secret=rec_model_config.slb_client_params['access_key_secret'],
  144. region_id=rec_model_config.slb_client_params['region_id'])
  145. online_instance_ids = utils.get_instance_ids(client=slb_client, slb_id=rec_model_config.slb_id)
  146. online_instance_count = len(online_instance_ids)
  147. #online_instance_count=1
  148. logging.info(f"online instance count: {online_instance_count}.")
  149. logging.info(f"online instance ids: {online_instance_ids}")
  150. #online_instance_ids=['i-bp13mx85rzardscc89z3']
  151. # 3. 逐台更新
  152. #image_name = 'tensorflow/serving'
  153. docker_ins_name = 'deepfm'
  154. for i, instance_id in enumerate(online_instance_ids):
  155. logging.info(f"instance:{instance_id}")
  156. # 3.1. 获取ip
  157. ip_address = utils.get_ip_address(client=slb_client, instance_id=instance_id)
  158. logging.info(f"ip address:{instance_id}/{ip_address}")
  159. # 3.2. scp 模型文件到服务器
  160. ecs_model_path = '/data/offline_dir'
  161. scp_command = f"scp {local_path}/{model_file} {ip_address}:{ecs_model_path}"
  162. os.system(command=scp_command)
  163. # 3.3 服务重启
  164. server_restart(slb_client=slb_client, instance_id=instance_id,
  165. ip_address=ip_address, ins_name=docker_ins_name, dt=dt)
  166. logging.info(f"重启进度: {i+1}/{online_instance_count}")
  167. #break
  168. logging.info(f"server restart finished!")
  169. except Exception as e:
  170. logging.error(e)
  171. if __name__ == '__main__':
  172. main()