container.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. import logging
  2. import os
  3. from hashlib import sha256
  4. from typing import List, Tuple
  5. import grpc
  6. from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
  7. from protos import container_pb2, container_pb2_grpc
  8. class Container(object):
  9. _channel: grpc.Channel = None
  10. _stub: container_pb2_grpc.ContainerServiceStub = None
  11. def __new__(cls, *args, **kwargs):
  12. if cls._channel is None:
  13. cls._channel = grpc.insecure_channel(
  14. target=f'{os.getenv("CONTAINER_GRPC_HOST")}:{os.getenv("CONTAINER_GRPC_PORT")}',
  15. options=[
  16. ('grpc.keepalive_time_ms', 10000),
  17. ('grpc.keepalive_timeout_ms', 5000),
  18. ('grpc.keepalive_permit_without_calls', True),
  19. ('grpc.http2.max_pings_without_data', 0),
  20. ],
  21. )
  22. cls._stub = container_pb2_grpc.ContainerServiceStub(channel=cls._channel)
  23. return super().__new__(cls, *args, **kwargs)
  24. def __init__(self):
  25. self.container_id = None
  26. # def close_channel(self):
  27. # """关闭通道"""
  28. # if self._channel is not None:
  29. # self._channel.close()
  30. def start_container(self):
  31. """启动一个容器,其最大运行时间为1小时,到期自动停止销毁"""
  32. response = self._stub.StartContainer(request=google_dot_protobuf_dot_empty__pb2.Empty())
  33. container_id = response.container_id
  34. if not container_id:
  35. raise RuntimeError('创建容器失败')
  36. self.container_id = container_id
  37. def stop_container(self) -> bool:
  38. """关闭一个容器"""
  39. if self.container_id:
  40. request = container_pb2.StopContainerRequest(container_id=self.container_id)
  41. response = self._stub.StopContainer(request=request)
  42. return response.success
  43. return True
  44. def run_command(self, command: List[str], show_log: bool = False) -> Tuple[int, str, List[str]]:
  45. """在容器内执行一条命令,可用的命令为: ffprobe | ffmpeg"""
  46. exit_code, msg = -999, ''
  47. request = container_pb2.RunCommandRequest(container_id=self.container_id, command=command)
  48. for response in self._stub.RunCommand(request=request):
  49. if show_log:
  50. logging.info(response.msg)
  51. msg += response.msg
  52. if response.exit_code != -999:
  53. exit_code = response.exit_code
  54. return exit_code, msg, command
  55. def file_exists(self, file_path: str) -> bool:
  56. """判断容器内指定路径的文件是否存在"""
  57. request = container_pb2.FileExistsRequest(container_id=self.container_id, path=file_path)
  58. response = self._stub.FileExists(request=request)
  59. return response.exists
  60. def get_file(self, container_file_path: str, host_file_path: str) -> bool:
  61. """从容器内获取文件"""
  62. hasher, tmp, sha256sum, length = sha256(), dict(), None, 0
  63. request = container_pb2.GetFileRequest(container_id=self.container_id, path=container_file_path)
  64. with open(host_file_path, 'wb') as f:
  65. for response in self._stub.GetFile(request=request):
  66. if response.sha256sum:
  67. sha256sum = response.sha256sum
  68. continue
  69. if response.payload:
  70. hasher.update(response.payload)
  71. f.seek(response.offset)
  72. f.write(response.payload)
  73. length += len(response.payload)
  74. return hasher.hexdigest() == sha256sum
  75. def put_file(self, host_file_path: str, container_file_path: str) -> bool:
  76. """将宿主机上的文件复制到容器内"""
  77. total_size = os.path.getsize(host_file_path)
  78. hasher, chunk_size, offset = sha256(), 1024 * 1024, 0
  79. with open(host_file_path, 'rb') as f:
  80. while offset < total_size:
  81. f.seek(offset)
  82. chunk = f.read(min(chunk_size, total_size - offset))
  83. if not chunk:
  84. break
  85. hasher.update(chunk)
  86. offset += len(chunk)
  87. sha256sum = hasher.hexdigest()
  88. def chunk_generator():
  89. yield container_pb2.ReusableChunk(container_id=self.container_id, path=container_file_path, sha256sum=sha256sum)
  90. _offset = 0
  91. with open(host_file_path, 'rb') as _f:
  92. while _offset < total_size:
  93. _f.seek(_offset)
  94. _chunk = _f.read(min(chunk_size, total_size - _offset))
  95. if not _chunk:
  96. break
  97. yield container_pb2.ReusableChunk(container_id=self.container_id, offset=_offset, payload=_chunk)
  98. _offset += len(_chunk)
  99. response = self._stub.PutFile(chunk_generator())
  100. return response.success
  101. def download_oss(self, bucket_name: str, object_key: str) -> str:
  102. """将OSS文件下载到容器"""
  103. request = container_pb2.DownloadOssRequest(container_id=self.container_id, bucket_name=bucket_name, object_key=object_key)
  104. response = self._stub.DownloadOss(request=request)
  105. return response.save_path
  106. def upload_oss(self, bucket_name: str, object_key: str, container_file_path: str, media_type: str):
  107. """将容器内文件上传到OSS"""
  108. request = container_pb2.UploadOssRequest(container_id=self.container_id,
  109. bucket_name=bucket_name,
  110. object_key=object_key,
  111. file_path=container_file_path,
  112. media_type=media_type)
  113. response = self._stub.UploadOss(request=request)
  114. return response.object_key