import logging
import os
from hashlib import sha256
from typing import Iterator, List, Optional, Tuple

import grpc
from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2

from protos import container_pb2, container_pb2_grpc


class Container(object):
    """Client wrapper around the container gRPC service.

    All instances share one lazily created insecure channel and stub, stored
    on the class. Each instance tracks the id of the container it started in
    ``container_id`` (``None`` until ``start_container`` succeeds).
    """

    # Shared by every instance; created on first instantiation in __new__.
    _channel: Optional[grpc.Channel] = None
    _stub: Optional[container_pb2_grpc.ContainerServiceStub] = None

    # Size of each chunk read from disk and streamed by put_file.
    _CHUNK_SIZE = 1024 * 1024
    # Value of ``exit_code`` on streamed responses that carry no exit code yet.
    _EXIT_CODE_UNSET = -999

    def __new__(cls, *args, **kwargs):
        """Create the shared channel/stub on first use, then allocate the instance.

        The target address is built from the ``CONTAINER_GRPC_HOST`` and
        ``CONTAINER_GRPC_PORT`` environment variables.
        NOTE(review): if either variable is unset the target contains the
        literal text ``None``; no validation is performed here.
        """
        if cls._channel is None:
            cls._channel = grpc.insecure_channel(
                target=f'{os.getenv("CONTAINER_GRPC_HOST")}:{os.getenv("CONTAINER_GRPC_PORT")}',
                options=[
                    ('grpc.keepalive_time_ms', 10000),
                    ('grpc.keepalive_timeout_ms', 5000),
                    ('grpc.keepalive_permit_without_calls', True),
                    ('grpc.http2.max_pings_without_data', 0),
                ],
            )
            cls._stub = container_pb2_grpc.ContainerServiceStub(channel=cls._channel)
        # object.__new__ rejects extra arguments when __new__ is overridden, so
        # constructor arguments (e.g. from a subclass __init__) must not be
        # forwarded to it.
        return super().__new__(cls)

    def __init__(self):
        """Initialise the instance with no container attached."""
        self.container_id = None

    # NOTE(review): a ``close_channel`` helper used to be sketched here as
    # commented-out code; the shared channel is currently never closed
    # explicitly by this class.

    def start_container(self):
        """Start a container and remember its id in ``self.container_id``.

        According to the original documentation of this method, the container
        runs for at most one hour and is then stopped and destroyed
        automatically.

        Raises:
            RuntimeError: if the service returns an empty container id.
        """
        response = self._stub.StartContainer(request=google_dot_protobuf_dot_empty__pb2.Empty())
        container_id = response.container_id
        if not container_id:
            raise RuntimeError('创建容器失败')
        self.container_id = container_id

    def stop_container(self) -> bool:
        """Stop the container attached to this instance.

        Returns:
            The ``success`` flag reported by the service, or ``True`` when no
            container is attached (nothing to stop).
        """
        if not self.container_id:
            return True
        request = container_pb2.StopContainerRequest(container_id=self.container_id)
        response = self._stub.StopContainer(request=request)
        return response.success

    def run_command(self, command: List[str], show_log: bool = False) -> Tuple[int, str, List[str]]:
        """Run a command inside the container (available commands: ffprobe | ffmpeg).

        Args:
            command: The command and its arguments.
            show_log: When true, log every streamed message via ``logging.info``.

        Returns:
            A tuple ``(exit_code, output, command)``. ``exit_code`` stays at
            -999 if no streamed response carried a different exit code;
            ``output`` is the concatenation of all streamed messages.
        """
        exit_code = self._EXIT_CODE_UNSET
        # Collect the pieces and join once instead of repeated string +=.
        parts: List[str] = []
        request = container_pb2.RunCommandRequest(container_id=self.container_id, command=command)
        for response in self._stub.RunCommand(request=request):
            if show_log:
                logging.info(response.msg)
            parts.append(response.msg)
            if response.exit_code != self._EXIT_CODE_UNSET:
                exit_code = response.exit_code
        return exit_code, ''.join(parts), command

    def file_exists(self, file_path: str) -> bool:
        """Return whether a file exists at ``file_path`` inside the container."""
        request = container_pb2.FileExistsRequest(container_id=self.container_id, path=file_path)
        response = self._stub.FileExists(request=request)
        return response.exists

    def get_file(self, container_file_path: str, host_file_path: str) -> bool:
        """Fetch a file from the container and write it to ``host_file_path``.

        Args:
            container_file_path: Path of the file inside the container.
            host_file_path: Destination path on the host; opened with ``'wb'``.

        Returns:
            ``True`` when the SHA-256 of the received payloads equals the
            checksum sent by the service, ``False`` otherwise (including when
            no checksum was received).
        """
        hasher = sha256()
        expected_sha256 = None
        request = container_pb2.GetFileRequest(container_id=self.container_id, path=container_file_path)
        with open(host_file_path, 'wb') as f:
            for response in self._stub.GetFile(request=request):
                if response.sha256sum:
                    # A response carrying the checksum is treated as metadata
                    # only; any payload on it is not processed.
                    expected_sha256 = response.sha256sum
                    continue
                if response.payload:
                    # The digest is fed in arrival order while the data is
                    # written at its declared offset.
                    # NOTE(review): presumably chunks arrive in file order;
                    # otherwise the digest would not match - confirm.
                    hasher.update(response.payload)
                    f.seek(response.offset)
                    f.write(response.payload)
        return hasher.hexdigest() == expected_sha256

    @staticmethod
    def _iter_file_chunks(path: str, limit: int, chunk_size: int) -> Iterator[Tuple[int, bytes]]:
        """Yield ``(offset, chunk)`` pairs for the first ``limit`` bytes of ``path``."""
        offset = 0
        with open(path, 'rb') as f:
            while offset < limit:
                # Sequential reads already advance the file position, so no
                # explicit seek is needed.
                chunk = f.read(min(chunk_size, limit - offset))
                if not chunk:
                    break
                yield offset, chunk
                offset += len(chunk)

    def put_file(self, host_file_path: str, container_file_path: str) -> bool:
        """Copy a file from the host into the container.

        The file is read twice: once to compute its SHA-256, once to stream
        it. Both passes are bounded by the size measured at the start so they
        cover the same byte range.

        Args:
            host_file_path: Path of the source file on the host.
            container_file_path: Destination path inside the container.

        Returns:
            The ``success`` flag reported by the service.
        """
        total_size = os.path.getsize(host_file_path)

        hasher = sha256()
        for _, chunk in self._iter_file_chunks(host_file_path, total_size, self._CHUNK_SIZE):
            hasher.update(chunk)
        sha256sum = hasher.hexdigest()

        def chunk_generator():
            # The first message carries the destination path and checksum only.
            yield container_pb2.ReusableChunk(container_id=self.container_id, path=container_file_path, sha256sum=sha256sum)
            for offset, chunk in self._iter_file_chunks(host_file_path, total_size, self._CHUNK_SIZE):
                yield container_pb2.ReusableChunk(container_id=self.container_id, offset=offset, payload=chunk)

        response = self._stub.PutFile(chunk_generator())
        return response.success

    def download_oss(self, bucket_name: str, object_key: str) -> str:
        """Download an OSS object into the container.

        Returns:
            The ``save_path`` reported by the service.
        """
        request = container_pb2.DownloadOssRequest(
            container_id=self.container_id,
            bucket_name=bucket_name,
            object_key=object_key,
        )
        response = self._stub.DownloadOss(request=request)
        return response.save_path

    def upload_oss(self, bucket_name: str, object_key: str, container_file_path: str, media_type: str):
        """Upload a file from inside the container to OSS.

        Returns:
            The ``object_key`` reported by the service.
        """
        request = container_pb2.UploadOssRequest(
            container_id=self.container_id,
            bucket_name=bucket_name,
            object_key=object_key,
            file_path=container_file_path,
            media_type=media_type,
        )
        response = self._stub.UploadOss(request=request)
        return response.object_key