123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- import logging
- import os
- from hashlib import sha256
- from typing import List, Tuple
- import grpc
- from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
- from protos import container_pb2, container_pb2_grpc
- class Container(object):
- _channel: grpc.Channel = None
- _stub: container_pb2_grpc.ContainerServiceStub = None
- def __new__(cls, *args, **kwargs):
- if cls._channel is None:
- cls._channel = grpc.insecure_channel(
- target=f'{os.getenv("CONTAINER_GRPC_HOST")}:{os.getenv("CONTAINER_GRPC_PORT")}',
- options=[
- ('grpc.keepalive_time_ms', 10000),
- ('grpc.keepalive_timeout_ms', 5000),
- ('grpc.keepalive_permit_without_calls', True),
- ('grpc.http2.max_pings_without_data', 0),
- ],
- )
- cls._stub = container_pb2_grpc.ContainerServiceStub(channel=cls._channel)
- return super().__new__(cls, *args, **kwargs)
- def __init__(self):
- self.container_id = None
- # def close_channel(self):
- # """关闭通道"""
- # if self._channel is not None:
- # self._channel.close()
- def start_container(self):
- """启动一个容器,其最大运行时间为1小时,到期自动停止销毁"""
- response = self._stub.StartContainer(request=google_dot_protobuf_dot_empty__pb2.Empty())
- container_id = response.container_id
- if not container_id:
- raise RuntimeError('创建容器失败')
- self.container_id = container_id
- def stop_container(self) -> bool:
- """关闭一个容器"""
- if self.container_id:
- request = container_pb2.StopContainerRequest(container_id=self.container_id)
- response = self._stub.StopContainer(request=request)
- return response.success
- return True
- def run_command(self, command: List[str], show_log: bool = False) -> Tuple[int, str, List[str]]:
- """在容器内执行一条命令,可用的命令为: ffprobe | ffmpeg"""
- exit_code, msg = -999, ''
- request = container_pb2.RunCommandRequest(container_id=self.container_id, command=command)
- for response in self._stub.RunCommand(request=request):
- if show_log:
- logging.info(response.msg)
- msg += response.msg
- if response.exit_code != -999:
- exit_code = response.exit_code
- return exit_code, msg, command
- def file_exists(self, file_path: str) -> bool:
- """判断容器内指定路径的文件是否存在"""
- request = container_pb2.FileExistsRequest(container_id=self.container_id, path=file_path)
- response = self._stub.FileExists(request=request)
- return response.exists
- def get_file(self, container_file_path: str, host_file_path: str) -> bool:
- """从容器内获取文件"""
- hasher, tmp, sha256sum, length = sha256(), dict(), None, 0
- request = container_pb2.GetFileRequest(container_id=self.container_id, path=container_file_path)
- with open(host_file_path, 'wb') as f:
- for response in self._stub.GetFile(request=request):
- if response.sha256sum:
- sha256sum = response.sha256sum
- continue
- if response.payload:
- hasher.update(response.payload)
- f.seek(response.offset)
- f.write(response.payload)
- length += len(response.payload)
- return hasher.hexdigest() == sha256sum
- def put_file(self, host_file_path: str, container_file_path: str) -> bool:
- """将宿主机上的文件复制到容器内"""
- total_size = os.path.getsize(host_file_path)
- hasher, chunk_size, offset = sha256(), 1024 * 1024, 0
- with open(host_file_path, 'rb') as f:
- while offset < total_size:
- f.seek(offset)
- chunk = f.read(min(chunk_size, total_size - offset))
- if not chunk:
- break
- hasher.update(chunk)
- offset += len(chunk)
- sha256sum = hasher.hexdigest()
- def chunk_generator():
- yield container_pb2.ReusableChunk(container_id=self.container_id, path=container_file_path, sha256sum=sha256sum)
- _offset = 0
- with open(host_file_path, 'rb') as _f:
- while _offset < total_size:
- _f.seek(_offset)
- _chunk = _f.read(min(chunk_size, total_size - _offset))
- if not _chunk:
- break
- yield container_pb2.ReusableChunk(container_id=self.container_id, offset=_offset, payload=_chunk)
- _offset += len(_chunk)
- response = self._stub.PutFile(chunk_generator())
- return response.success
- def download_oss(self, bucket_name: str, object_key: str) -> str:
- """将OSS文件下载到容器"""
- request = container_pb2.DownloadOssRequest(container_id=self.container_id, bucket_name=bucket_name, object_key=object_key)
- response = self._stub.DownloadOss(request=request)
- return response.save_path
- def upload_oss(self, bucket_name: str, object_key: str, container_file_path: str, media_type: str):
- """将容器内文件上传到OSS"""
- request = container_pb2.UploadOssRequest(container_id=self.container_id,
- bucket_name=bucket_name,
- object_key=object_key,
- file_path=container_file_path,
- media_type=media_type)
- response = self._stub.UploadOss(request=request)
- return response.object_key
|