kevin.yang пре 1 месец
родитељ
комит
d229438632

+ 12 - 0
.dockerignore

@@ -0,0 +1,12 @@
+.git/
+.idea/
+.vscode/
+__pycache__/
+docker/
+*.log
+*.jpg
+*.png
+*.gif
+*.webp
+*.mp4
+docker-compose.yml

+ 318 - 0
.gitignore

@@ -0,0 +1,318 @@
+# Created by https://www.toptal.com/developers/gitignore/api/python,node
+# Edit at https://www.toptal.com/developers/gitignore?templates=python,node
+
+### Node ###
+# Logs
+logs
+*.log
+npm-debug.log*
+yarn-debug.log*
+yarn-error.log*
+lerna-debug.log*
+.pnpm-debug.log*
+
+# Diagnostic reports (https://nodejs.org/api/report.html)
+report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
+
+# Runtime data
+pids
+*.pid
+*.seed
+*.pid.lock
+
+# Directory for instrumented libs generated by jscoverage/JSCover
+lib-cov
+
+# Coverage directory used by tools like istanbul
+coverage
+*.lcov
+
+# nyc test coverage
+.nyc_output
+
+# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
+.grunt
+
+# Bower dependency directory (https://bower.io/)
+bower_components
+
+# node-waf configuration
+.lock-wscript
+
+# Compiled binary addons (https://nodejs.org/api/addons.html)
+build/Release
+
+# Dependency directories
+node_modules/
+jspm_packages/
+
+# Snowpack dependency directory (https://snowpack.dev/)
+web_modules/
+
+# TypeScript cache
+*.tsbuildinfo
+
+# Optional npm cache directory
+.npm
+
+# Optional eslint cache
+.eslintcache
+
+# Optional stylelint cache
+.stylelintcache
+
+# Microbundle cache
+.rpt2_cache/
+.rts2_cache_cjs/
+.rts2_cache_es/
+.rts2_cache_umd/
+
+# Optional REPL history
+.node_repl_history
+
+# Output of 'npm pack'
+*.tgz
+
+# Yarn Integrity file
+.yarn-integrity
+
+# dotenv environment variable files
+.env
+.env.development.local
+.env.test.local
+.env.production.local
+.env.local
+
+# parcel-bundler cache (https://parceljs.org/)
+.cache
+.parcel-cache
+
+# Next.js build output
+.next
+out
+
+# Nuxt.js build / generate output
+.nuxt
+dist
+
+# Gatsby files
+.cache/
+# Comment in the public line in if your project uses Gatsby and not Next.js
+# https://nextjs.org/blog/next-9-1#public-directory-support
+# public
+
+# vuepress build output
+.vuepress/dist
+
+# vuepress v2.x temp and cache directory
+.temp
+
+# Docusaurus cache and generated files
+.docusaurus
+
+# Serverless directories
+.serverless/
+
+# FuseBox cache
+.fusebox/
+
+# DynamoDB Local files
+.dynamodb/
+
+# TernJS port file
+.tern-port
+
+# Stores VSCode versions used for testing VSCode extensions
+.vscode-test
+
+# yarn v2
+.yarn/cache
+.yarn/unplugged
+.yarn/build-state.yml
+.yarn/install-state.gz
+.pnp.*
+
+### Node Patch ###
+# Serverless Webpack directories
+.webpack/
+
+# Optional stylelint cache
+
+# SvelteKit build / generate output
+.svelte-kit
+
+### Python ###
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+#*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+#   For a library or package, you might want to ignore these files since the code is
+#   intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+#   However, in case of collaboration, if having platform-specific dependencies or dependencies
+#   having no cross-platform support, pipenv may install dependencies that don't work, or not
+#   install all needed dependencies.
+#Pipfile.lock
+
+# poetry
+#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
+#   This is especially recommended for binary packages to ensure reproducibility, and is more
+#   commonly ignored for libraries.
+#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
+#poetry.lock
+
+# pdm
+#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
+#pdm.lock
+#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
+#   in version control.
+#   https://pdm.fming.dev/#use-with-ide
+.pdm.toml
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# PyCharm
+#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
+#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
+#  and can be added to the global gitignore or merged into this file.  For a more nuclear
+#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
+.idea/
+
+# VScode
+.vscode/
+
+### Python Patch ###
+# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
+poetry.toml
+
+# ruff
+.ruff_cache/
+
+# LSP config files
+pyrightconfig.json
+
+# End of https://www.toptal.com/developers/gitignore/api/python,node
+
+.DS_Store

+ 2 - 2
product.env

@@ -1,5 +1,7 @@
 ENV=prod
 
+FFMPEG_HOST=101.37.24.17:5555
+
 FS_DATA_1=范军,4a768d,task:carry_data_redis_fj,AIzaSyBqqAhzew33-ZY6u0w0Th8z4FHjiieFK0s
 FS_DATA_2=鲁涛,EZef39,task:carry_data_redis_lt,AIzaSyBXU3VIilXxDbIg5riYTu5t6WEVEH8AQNk
 FS_DATA_3=余海涛,Frush6,task:carry_data_redis_yht,AIzaSyCaurhmnci1jooXSN8Q3dKWbYYndUllbzs
@@ -10,5 +12,3 @@ FS_DATA_7=周仙琴,2WIcBU,task:carry_data_redis_zxq,AIzaSyD46rphXd-Ie51sQiQ61lr
 FS_DATA_8=信欣,v0fFCb,task:carry_data_redis_xx,AIzaSyB16JcJHwoz-YcvEUO96Dm1n0zf89GOdms
 FS_DATA_9=邓锋,DEpi6V,task:carry_data_redis_df,AIzaSyCWilmMZyG4xW_pujpRGflaa7SIBjLQHiI
 FS_DATA_10=王知微,jrpuyW,task:carry_data_redis_wzw,AIzaSyCx3hy5ef8wOVPNjvK1MIAwyZZCdYuRh-U
-
-

+ 0 - 0
protos/__init__.py


+ 69 - 0
protos/task.proto

@@ -0,0 +1,69 @@
+syntax = "proto3";
+
+import "google/protobuf/empty.proto";
+
+service TaskRunner {
+  rpc StartContainer (google.protobuf.Empty) returns (StartContainerResponse);
+  rpc StopContainer (StopContainerRequest) returns (StopContainerResponse);
+  rpc RunCommand (RunCommandRequest) returns (stream RunCommandResponse);
+  rpc FileExists (FileExistsRequest) returns (FileExistsResponse);
+  rpc GetFile (GetFileRequest) returns (GetFileResponse);
+  rpc PutFile (PutFileRequest) returns (PutFileResponse);
+}
+
+message StartContainerResponse {
+  string id = 1;
+}
+
+message StopContainerRequest {
+  string id = 1;
+}
+
+message StopContainerResponse {
+  string id = 1;
+}
+
+message RunCommandRequest {
+  string id = 1;
+  repeated string command = 2;
+}
+
+message RunCommandResponse {
+  string id = 1;
+  optional string msg = 2;
+}
+
+message FileExistsRequest {
+  string id = 1;
+  string path = 2;
+}
+
+message FileExistsResponse {
+  string id = 1;
+  string path = 2;
+  optional bool exists = 3;
+}
+
+message GetFileRequest {
+  string id = 1;
+  string path = 2;
+}
+
+message GetFileResponse {
+  string id = 1;
+  string oss_object_key = 2;
+}
+
+message PutFileRequest {
+  string id = 1;
+  optional string oss_object_key = 2;
+  optional string local_path = 3;
+  optional bytes payload = 4;
+  string path = 5;
+}
+
+message PutFileResponse {
+  string id = 1;
+  string path = 2;
+  bool success = 3;
+}

+ 59 - 0
protos/task_pb2.py

@@ -0,0 +1,59 @@
+# -*- coding: utf-8 -*-
+# Generated by the protocol buffer compiler.  DO NOT EDIT!
+# NO CHECKED-IN PROTOBUF GENCODE
+# source: task.proto
+# Protobuf Python Version: 5.29.0
+"""Generated protocol buffer code."""
+from google.protobuf import descriptor as _descriptor
+from google.protobuf import descriptor_pool as _descriptor_pool
+from google.protobuf import runtime_version as _runtime_version
+from google.protobuf import symbol_database as _symbol_database
+from google.protobuf.internal import builder as _builder
+_runtime_version.ValidateProtobufRuntimeVersion(
+    _runtime_version.Domain.PUBLIC,
+    5,
+    29,
+    0,
+    '',
+    'task.proto'
+)
+# @@protoc_insertion_point(imports)
+
+_sym_db = _symbol_database.Default()
+
+
+from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
+
+
+DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ntask.proto\x1a\x1bgoogle/protobuf/empty.proto\"$\n\x16StartContainerResponse\x12\n\n\x02id\x18\x01 \x01(\t\"\"\n\x14StopContainerRequest\x12\n\n\x02id\x18\x01 \x01(\t\"#\n\x15StopContainerResponse\x12\n\n\x02id\x18\x01 \x01(\t\"0\n\x11RunCommandRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07\x63ommand\x18\x02 \x03(\t\":\n\x12RunCommandResponse\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x03msg\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x06\n\x04_msg\"-\n\x11\x46ileExistsRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\"N\n\x12\x46ileExistsResponse\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x13\n\x06\x65xists\x18\x03 \x01(\x08H\x00\x88\x01\x01\x42\t\n\x07_exists\"*\n\x0eGetFileRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\"5\n\x0fGetFileResponse\x12\n\n\x02id\x18\x01 \x01(\t\x12\x16\n\x0eoss_object_key\x18\x02 \x01(\t\"\xa4\x01\n\x0ePutFileRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x1b\n\x0eoss_object_key\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x17\n\nlocal_path\x18\x03 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07payload\x18\x04 \x01(\x0cH\x02\x88\x01\x01\x12\x0c\n\x04path\x18\x05 \x01(\tB\x11\n\x0f_oss_object_keyB\r\n\x0b_local_pathB\n\n\x08_payload\"<\n\x0fPutFileResponse\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x0f\n\x07success\x18\x03 \x01(\x08\x32\xdb\x02\n\nTaskRunner\x12\x41\n\x0eStartContainer\x12\x16.google.protobuf.Empty\x1a\x17.StartContainerResponse\x12>\n\rStopContainer\x12\x15.StopContainerRequest\x1a\x16.StopContainerResponse\x12\x37\n\nRunCommand\x12\x12.RunCommandRequest\x1a\x13.RunCommandResponse0\x01\x12\x35\n\nFileExists\x12\x12.FileExistsRequest\x1a\x13.FileExistsResponse\x12,\n\x07GetFile\x12\x0f.GetFileRequest\x1a\x10.GetFileResponse\x12,\n\x07PutFile\x12\x0f.PutFileRequest\x1a\x10.PutFileResponseb\x06proto3')
+
+_globals = globals()
+_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
+_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'task_pb2', _globals)
+if not _descriptor._USE_C_DESCRIPTORS:
+  DESCRIPTOR._loaded_options = None
+  _globals['_STARTCONTAINERRESPONSE']._serialized_start=43
+  _globals['_STARTCONTAINERRESPONSE']._serialized_end=79
+  _globals['_STOPCONTAINERREQUEST']._serialized_start=81
+  _globals['_STOPCONTAINERREQUEST']._serialized_end=115
+  _globals['_STOPCONTAINERRESPONSE']._serialized_start=117
+  _globals['_STOPCONTAINERRESPONSE']._serialized_end=152
+  _globals['_RUNCOMMANDREQUEST']._serialized_start=154
+  _globals['_RUNCOMMANDREQUEST']._serialized_end=202
+  _globals['_RUNCOMMANDRESPONSE']._serialized_start=204
+  _globals['_RUNCOMMANDRESPONSE']._serialized_end=262
+  _globals['_FILEEXISTSREQUEST']._serialized_start=264
+  _globals['_FILEEXISTSREQUEST']._serialized_end=309
+  _globals['_FILEEXISTSRESPONSE']._serialized_start=311
+  _globals['_FILEEXISTSRESPONSE']._serialized_end=389
+  _globals['_GETFILEREQUEST']._serialized_start=391
+  _globals['_GETFILEREQUEST']._serialized_end=433
+  _globals['_GETFILERESPONSE']._serialized_start=435
+  _globals['_GETFILERESPONSE']._serialized_end=488
+  _globals['_PUTFILEREQUEST']._serialized_start=491
+  _globals['_PUTFILEREQUEST']._serialized_end=655
+  _globals['_PUTFILERESPONSE']._serialized_start=657
+  _globals['_PUTFILERESPONSE']._serialized_end=717
+  _globals['_TASKRUNNER']._serialized_start=720
+  _globals['_TASKRUNNER']._serialized_end=1067
+# @@protoc_insertion_point(module_scope)

+ 313 - 0
protos/task_pb2_grpc.py

@@ -0,0 +1,313 @@
+# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
+"""Client and server classes corresponding to protobuf-defined services."""
+import grpc
+import warnings
+
+from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
+import protos.task_pb2 as task__pb2
+
+GRPC_GENERATED_VERSION = '1.71.0'
+GRPC_VERSION = grpc.__version__
+_version_not_supported = False
+
+try:
+    from grpc._utilities import first_version_is_lower
+    _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
+except ImportError:
+    _version_not_supported = True
+
+if _version_not_supported:
+    raise RuntimeError(
+        f'The grpc package installed is at version {GRPC_VERSION},'
+        + f' but the generated code in task_pb2_grpc.py depends on'
+        + f' grpcio>={GRPC_GENERATED_VERSION}.'
+        + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
+        + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
+    )
+
+
+class TaskRunnerStub(object):
+    """Missing associated documentation comment in .proto file."""
+
+    def __init__(self, channel):
+        """Constructor.
+
+        Args:
+            channel: A grpc.Channel.
+        """
+        self.StartContainer = channel.unary_unary(
+                '/TaskRunner/StartContainer',
+                request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+                response_deserializer=task__pb2.StartContainerResponse.FromString,
+                _registered_method=True)
+        self.StopContainer = channel.unary_unary(
+                '/TaskRunner/StopContainer',
+                request_serializer=task__pb2.StopContainerRequest.SerializeToString,
+                response_deserializer=task__pb2.StopContainerResponse.FromString,
+                _registered_method=True)
+        self.RunCommand = channel.unary_stream(
+                '/TaskRunner/RunCommand',
+                request_serializer=task__pb2.RunCommandRequest.SerializeToString,
+                response_deserializer=task__pb2.RunCommandResponse.FromString,
+                _registered_method=True)
+        self.FileExists = channel.unary_unary(
+                '/TaskRunner/FileExists',
+                request_serializer=task__pb2.FileExistsRequest.SerializeToString,
+                response_deserializer=task__pb2.FileExistsResponse.FromString,
+                _registered_method=True)
+        self.GetFile = channel.unary_unary(
+                '/TaskRunner/GetFile',
+                request_serializer=task__pb2.GetFileRequest.SerializeToString,
+                response_deserializer=task__pb2.GetFileResponse.FromString,
+                _registered_method=True)
+        self.PutFile = channel.unary_unary(
+                '/TaskRunner/PutFile',
+                request_serializer=task__pb2.PutFileRequest.SerializeToString,
+                response_deserializer=task__pb2.PutFileResponse.FromString,
+                _registered_method=True)
+
+
+class TaskRunnerServicer(object):
+    """Missing associated documentation comment in .proto file."""
+
+    def StartContainer(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+    def StopContainer(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+    def RunCommand(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+    def FileExists(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+    def GetFile(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+    def PutFile(self, request, context):
+        """Missing associated documentation comment in .proto file."""
+        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
+        context.set_details('Method not implemented!')
+        raise NotImplementedError('Method not implemented!')
+
+
+def add_TaskRunnerServicer_to_server(servicer, server):
+    rpc_method_handlers = {
+            'StartContainer': grpc.unary_unary_rpc_method_handler(
+                    servicer.StartContainer,
+                    request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
+                    response_serializer=task__pb2.StartContainerResponse.SerializeToString,
+            ),
+            'StopContainer': grpc.unary_unary_rpc_method_handler(
+                    servicer.StopContainer,
+                    request_deserializer=task__pb2.StopContainerRequest.FromString,
+                    response_serializer=task__pb2.StopContainerResponse.SerializeToString,
+            ),
+            'RunCommand': grpc.unary_stream_rpc_method_handler(
+                    servicer.RunCommand,
+                    request_deserializer=task__pb2.RunCommandRequest.FromString,
+                    response_serializer=task__pb2.RunCommandResponse.SerializeToString,
+            ),
+            'FileExists': grpc.unary_unary_rpc_method_handler(
+                    servicer.FileExists,
+                    request_deserializer=task__pb2.FileExistsRequest.FromString,
+                    response_serializer=task__pb2.FileExistsResponse.SerializeToString,
+            ),
+            'GetFile': grpc.unary_unary_rpc_method_handler(
+                    servicer.GetFile,
+                    request_deserializer=task__pb2.GetFileRequest.FromString,
+                    response_serializer=task__pb2.GetFileResponse.SerializeToString,
+            ),
+            'PutFile': grpc.unary_unary_rpc_method_handler(
+                    servicer.PutFile,
+                    request_deserializer=task__pb2.PutFileRequest.FromString,
+                    response_serializer=task__pb2.PutFileResponse.SerializeToString,
+            ),
+    }
+    generic_handler = grpc.method_handlers_generic_handler(
+            'TaskRunner', rpc_method_handlers)
+    server.add_generic_rpc_handlers((generic_handler,))
+    server.add_registered_method_handlers('TaskRunner', rpc_method_handlers)
+
+
+ # This class is part of an EXPERIMENTAL API.
+class TaskRunner(object):
+    """Missing associated documentation comment in .proto file."""
+
+    @staticmethod
+    def StartContainer(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(
+            request,
+            target,
+            '/TaskRunner/StartContainer',
+            google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
+            task__pb2.StartContainerResponse.FromString,
+            options,
+            channel_credentials,
+            insecure,
+            call_credentials,
+            compression,
+            wait_for_ready,
+            timeout,
+            metadata,
+            _registered_method=True)
+
+    @staticmethod
+    def StopContainer(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(
+            request,
+            target,
+            '/TaskRunner/StopContainer',
+            task__pb2.StopContainerRequest.SerializeToString,
+            task__pb2.StopContainerResponse.FromString,
+            options,
+            channel_credentials,
+            insecure,
+            call_credentials,
+            compression,
+            wait_for_ready,
+            timeout,
+            metadata,
+            _registered_method=True)
+
+    @staticmethod
+    def RunCommand(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_stream(
+            request,
+            target,
+            '/TaskRunner/RunCommand',
+            task__pb2.RunCommandRequest.SerializeToString,
+            task__pb2.RunCommandResponse.FromString,
+            options,
+            channel_credentials,
+            insecure,
+            call_credentials,
+            compression,
+            wait_for_ready,
+            timeout,
+            metadata,
+            _registered_method=True)
+
+    @staticmethod
+    def FileExists(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(
+            request,
+            target,
+            '/TaskRunner/FileExists',
+            task__pb2.FileExistsRequest.SerializeToString,
+            task__pb2.FileExistsResponse.FromString,
+            options,
+            channel_credentials,
+            insecure,
+            call_credentials,
+            compression,
+            wait_for_ready,
+            timeout,
+            metadata,
+            _registered_method=True)
+
+    @staticmethod
+    def GetFile(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(
+            request,
+            target,
+            '/TaskRunner/GetFile',
+            task__pb2.GetFileRequest.SerializeToString,
+            task__pb2.GetFileResponse.FromString,
+            options,
+            channel_credentials,
+            insecure,
+            call_credentials,
+            compression,
+            wait_for_ready,
+            timeout,
+            metadata,
+            _registered_method=True)
+
+    @staticmethod
+    def PutFile(request,
+            target,
+            options=(),
+            channel_credentials=None,
+            call_credentials=None,
+            insecure=False,
+            compression=None,
+            wait_for_ready=None,
+            timeout=None,
+            metadata=None):
+        return grpc.experimental.unary_unary(
+            request,
+            target,
+            '/TaskRunner/PutFile',
+            task__pb2.PutFileRequest.SerializeToString,
+            task__pb2.PutFileResponse.FromString,
+            options,
+            channel_credentials,
+            insecure,
+            call_credentials,
+            compression,
+            wait_for_ready,
+            timeout,
+            metadata,
+            _registered_method=True)

+ 3 - 0
requirements.txt

@@ -1,5 +1,8 @@
 aliyun-log-python-sdk==0.9.12
 google-generativeai==0.8.3
+grpcio==1.71.0
+grpcio-reflection==1.71.0
+grpcio-tools==1.71.0
 loguru==0.7.2
 mutagen==1.47.0
 odps==3.5.1

+ 28 - 3
utils/aliyun_oss.py

@@ -11,7 +11,7 @@ OSS_ACCESS_KEY_SECRET = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
 # OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com"# 内网地址
 OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com" # 外网地址
 OSS_BUCKET_NAME = "art-crawler"
-class Oss():
+class Oss:
 
     @classmethod
     def channel_upload_oss(cls, src_url: str,
@@ -42,6 +42,25 @@ class Oss():
                 'oss_object_key': oss_object_key}
         raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
 
+    """
+        视频发送到art-pubbucket
+        """
+
+    @classmethod
+    def stitching_sync_upload_bytes_to_oss(cls, data: str, video_id: str) -> Dict[str, Any]:
+        oss_object_key = f'carry/video/{video_id}'
+        auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
+        bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-pubbucket")
+        response = bucket.put_object(oss_object_key, data)
+
+        if 'Content-Length' in response.headers:
+            return {
+                'status': response.status,
+                'oss_object_key': oss_object_key,
+                'save_oss_timestamp': int(datetime.now().timestamp() * 1000),
+            }
+        raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
+
     """
     视频发送到art-pubbucket
     """
@@ -101,8 +120,8 @@ class Oss():
 
 
     @classmethod
-    def download_video_oss(cls, url, file_path):
-        video_path = file_path + 'video.mp4'
+    def download_video_oss(cls, url, file_path, add_suffix: bool = True):
+        video_path = file_path + 'video.mp4' if add_suffix else file_path
         oss_object_key = cls.channel_upload_oss(url, str(uuid.uuid4()))
         time.sleep(2)
         oss_object = oss_object_key.get("oss_object_key")
@@ -129,6 +148,12 @@ class Oss():
         time.sleep(5)
         return video_path
 
+    @classmethod
+    def generate_url(cls, oss_object_key: str) -> str:
+        auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
+        bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, 'art-pubbucket')
+        return bucket.sign_url(method='GET', key=oss_object_key, expires=3600, slash_safe=True)
+
 
 if __name__ == '__main__':
     Oss.download_sph_ls('channel/video/sph/14374775553517295881.jpg','asa','1')

+ 11 - 11
utils/download_video.py

@@ -27,12 +27,12 @@ class DownLoad:
                     if response.status_code == 206:
                         # 以二进制写入模式打开文件
 
-                        with open(f"{video}", "wb") as file:
-                            # 将响应内容写入文件
-                            file.write(response.content)
-                        return video
+                        # with open(f"{video}", "wb") as file:
+                        #     # 将响应内容写入文件
+                        #     file.write(response.content)
+                        return response.content, video
                 except Exception:
-                    return video
+                    return None, video
         else:
             try:
                 for i in range(3):
@@ -42,13 +42,13 @@ class DownLoad:
                     if response.status_code == 200:
                         # 以二进制写入模式打开文件
 
-                        with open(f"{video}", "wb") as file:
-                            # 将响应内容写入文件
-                            file.write(response.content)
-                        return video
-                return video
+                        # with open(f"{video}", "wb") as file:
+                        #     # 将响应内容写入文件
+                        #     file.write(response.content)
+                        return response.content, video
+                return None, video
             except Exception:
-                return video
+                return None, video
 
     @classmethod
     def download_m3u8_video(cls ,url, file_path):

+ 1 - 1
utils/dy_ks_get_url.py

@@ -95,7 +95,7 @@ class Dy_KS:
             headers = {
                 'Content-Type': 'application/json'
             }
-            time.sleep(random.uniform(10, 50))
+            # time.sleep(random.uniform(10, 50))
             response = requests.request("POST", url, headers=headers, data=payload, timeout= 30)
             response = response.json()
             code = response["code"]

+ 415 - 281
utils/ffmpeg.py

@@ -1,309 +1,376 @@
-import asyncio
 import json
-import os
 import time
-from typing import List
 
-import cv2
+import orjson
 import requests
 from loguru import logger
-from mutagen.mp3 import MP3
 
+from protos import task_pb2, task_pb2_grpc
 
 
-class FFmpeg():
+class FFmpeg(object):
+    
+    def __init__(self, container_id: str, stub: task_pb2_grpc.TaskRunnerStub):
+        self.container_id = container_id
+        self.stub = stub
 
-    """
-    时间转换
-    """
-    @classmethod
-    def seconds_to_srt_time(cls, seconds):
+    def seconds_to_srt_time(self, seconds):
+        """
+        时间转换
+        """
         hours = int(seconds // 3600)
         minutes = int((seconds % 3600) // 60)
         seconds = seconds % 60
         milliseconds = int((seconds - int(seconds)) * 1000)
         return f"{hours:02d}:{minutes:02d}:{int(seconds):02d},{milliseconds:03d}"
 
-    """
-    获取单个视频时长
-    """
-    @classmethod
-    def get_video_duration(cls, video_url):
-        cap = cv2.VideoCapture(video_url)
-        if cap.isOpened():
-            rate = cap.get(5)
-            frame_num = cap.get(7)
-            duration = int(frame_num / rate)
-            return duration
-        return 0
-
+    # def get_video_duration(self, video_url):
+    #     """
+    #     获取单个视频时长
+    #     """
+    #     cap = cv2.VideoCapture(video_url)
+    #     if cap.isOpened():
+    #         rate = cap.get(5)
+    #         frame_num = cap.get(7)
+    #         duration = int(frame_num / rate)
+    #         return duration
+    #     return 0
 
     # """
     # 获取视频文件的时长(秒)
     # """
-    # @classmethod
-    # def get_videos_duration(cls, video_file):
-    #     result = cls.asyncio_run_subprocess(["ffprobe", "-v", "error", "-show_entries", "format=duration",
+    # def get_videos_duration(self, video_file):
+    #     result = self.asyncio_run_subprocess(["ffprobe", "-v", "error", "-show_entries", "format=duration",
     #          "-of", "default=noprint_wrappers=1:nokey=1", video_file], timeout=10)
     #     return float(result)
 
-    """
-    获取视频宽高
-    """
-    @classmethod
-    def get_w_h_size(cls, new_video_path):
+    def get_w_h_size(self, new_video_path):
+        """
+        获取视频宽高
+        """
         try:
             # 获取视频的原始宽高信息
-            ffprobe_cmd = cls.asyncio_run_subprocess(["ffprobe", "-v" ,"error" ,"-select_streams" ,"v:0" ,"-show_entries", "stream=width,height" ,"-of" ,"csv=p=0" ,new_video_path],timeout=10)
-            output_decoded = ffprobe_cmd.strip()
+            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffprobe", "-v" ,"error" ,"-select_streams" ,"v:0" ,"-show_entries", "stream=width,height" ,"-of" ,"csv=p=0" ,new_video_path])
+            # ffprobe_cmd = self.asyncio_run_subprocess(["ffprobe", "-v" ,"error" ,"-select_streams" ,"v:0" ,"-show_entries", "stream=width,height" ,"-of" ,"csv=p=0" ,new_video_path],timeout=10)
+            # output_decoded = ffprobe_cmd.strip()
+            output_decoded = ''
+            for response in self.stub.RunCommand(request=request):
+                output_decoded += response.msg
             split_output = [value for value in output_decoded.split(',') if value.strip()]
             height, width = map(int, split_output)
             return width, height
         except ValueError as e:
             return 1920, 1080
 
-
-    """
-    视频裁剪
-    """
-    @classmethod
-    def video_crop(cls, video_path, file_path):
+    def video_crop(self, video_path, file_path):
+        """
+        视频裁剪
+        """
         crop_url = file_path + 'crop.mp4'
         try:
-            # 获取视频的原始宽高信息
-            ffprobe_cmd = cls.asyncio_run_subprocess(
-                ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
-                 "csv=p=0", video_path], timeout=10)
-            width, height = map(int, ffprobe_cmd.strip().split(','))
-            # 计算裁剪后的高度
-            new_height = int(height * 0.8)
+            # # 获取视频的原始宽高信息
+            # ffprobe_cmd = cls.asyncio_run_subprocess(
+            #     ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
+            #      "csv=p=0", video_path], timeout=10)
+            # width, height = map(int, ffprobe_cmd.strip().split(','))
+            # # 计算裁剪后的高度
+            # new_height = int(height * 0.8)
 
             # 构建 FFmpeg 命令,裁剪视频高度为原始高度的80%
-            cls.asyncio_run_subprocess(
-                [
-                "ffmpeg",
-                "-i", video_path,
-                "-vf", f"crop={width}:{new_height}",
-                "-c:v", "libx264",
-                "-c:a", "aac",
-                "-y",
-                crop_url
-                ],timeout=240)
+            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg", "-i", video_path, "-vf", f"\"crop=in_w:in_h*0.8\"", "-c:v", "h264_nvenc", "-c:a", "aac", "-y", "-cq", "28", "-preset", "slow", crop_url])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess(
+            #     [
+            #     "ffmpeg",
+            #     "-i", video_path,
+            #     "-vf", f"crop={width}:{new_height}",
+            #     "-c:v", "libx264",
+            #     "-c:a", "aac",
+            #     "-y",
+            #     crop_url
+            #     ],timeout=240)
             return crop_url
         except Exception as e:
             return crop_url
 
-    """
-    视频截断
-    """
-    @classmethod
-    def video_ggduration(cls, video_path, file_path, gg_duration_total):
+    def video_ggduration(self, video_path, file_path, gg_duration_total):
+        """
+        视频截断
+        """
         gg_duration_url = file_path + 'gg_duration.mp4'
         # 获取视频时长
         try:
-            total_duration = cls.get_video_duration(video_path)
+            total_duration = self.get_duration(video_path)
             if total_duration == 0:
                 return gg_duration_url
             duration = int(total_duration) - int(gg_duration_total)
             if int(total_duration) < int(gg_duration_total):
                 return gg_duration_url
-            cls.asyncio_run_subprocess([
-                "ffmpeg",
-                "-i", video_path,
-                "-c:v", "libx264",
-                "-c:a", "aac",
-                "-t", str(duration),
-                "-y",
-                gg_duration_url
-            ], timeout= 360)
+            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg", "-i", video_path, "-c:v", "h264_nvenc", "-c:a", "aac", "-t", str(duration), "-y", "-cq", "28", "-preset", "slow", gg_duration_url])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess([
+            #     "ffmpeg",
+            #     "-i", video_path,
+            #     "-c:v", "libx264",
+            #     "-c:a", "aac",
+            #     "-t", str(duration),
+            #     "-y",
+            #     gg_duration_url
+            # ], timeout= 360)
             return gg_duration_url
         except Exception as e:
             return gg_duration_url
 
-    """
-     截取原视频最后一帧
-    """
-    @classmethod
-    def video_png(cls, video_path, file_path):
+    def video_png(self, video_path, file_path):
+        """
+         截取原视频最后一帧
+        """
         # 获取视频的原始宽高信息
         jpg_url = file_path + 'png.jpg'
         try:
-            cls.asyncio_run_subprocess(
-                ["ffmpeg", "-sseof", "-1", '-i', video_path, '-frames:v', '1',  "-y", jpg_url], timeout=120)
+            # self.asyncio_run_subprocess(
+            #     ["ffmpeg", "-sseof", "-1", '-i', video_path, '-frames:v', '1',  "-y", jpg_url], timeout=120)
+            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg", "-sseof", "-1", '-i', video_path, '-frames:v', '1',  "-y", jpg_url])
+            for _ in self.stub.RunCommand(request=request):
+                pass
             return jpg_url
         except Exception as e:
             return jpg_url
 
-    """
-    获取视频音频
-    """
-    @classmethod
-    def get_video_mp3(cls, video_file, video_path_url, pw_random_id):
+    def get_video_mp3(self, video_file, video_path_url, pw_random_id):
+        """
+        获取视频音频
+        """
         pw_mp3_path = video_path_url + str(pw_random_id) +'pw_video.mp3'
         try:
-            cls.asyncio_run_subprocess([
-                'ffmpeg',
-                '-i', video_file,
-                '-q:a', '0',
-                '-map', 'a',
-                pw_mp3_path
-            ], timeout=120)
+            request = task_pb2.RunCommandRequest(id=self.container_id, command=['ffmpeg', '-i', video_file, '-q:a', '0', '-map', 'a', pw_mp3_path])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess([
+            #     'ffmpeg',
+            #     '-i', video_file,
+            #     '-q:a', '0',
+            #     '-map', 'a',
+            #     pw_mp3_path
+            # ], timeout=120)
             time.sleep(1)
             return pw_mp3_path
         except Exception as e:
             return pw_mp3_path
 
-    @classmethod
-    def get_pw_video_mp3(cls, video_file, video_path_url):
+    def get_pw_video_mp3(self, video_file, video_path_url):
         bgm_pw_path_mp3 = video_file + 'bgm_pw.mp3'
         try:
-            cls.asyncio_run_subprocess([
-                'ffmpeg',
-                '-i', video_path_url,      # 输入的视频文件路径
-                '-q:a', '0',           # 设置音频质量为最佳
-                '-map', 'a',
-                '-y',
-                bgm_pw_path_mp3
-            ], timeout=120)
-            time.sleep(1)
+            request = task_pb2.RunCommandRequest(id=self.container_id, command=['ffmpeg', '-i', video_path_url, '-q:a', '0', '-map', 'a', bgm_pw_path_mp3])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess([
+            #     'ffmpeg',
+            #     '-i', video_path_url,      # 输入的视频文件路径
+            #     '-q:a', '0',           # 设置音频质量为最佳
+            #     '-map', 'a',
+            #     '-y',
+            #     bgm_pw_path_mp3
+            # ], timeout=120)
+            # time.sleep(1)
             return bgm_pw_path_mp3
         except Exception as e:
             return bgm_pw_path_mp3
 
-
-    """
-    片尾增加bgm
-    """
-    @classmethod
-    def video_add_bgm(cls, video_file, bgm_path, video_path_url):
+    def video_add_bgm(self, video_file, bgm_path, video_path_url):
+        """
+        片尾增加bgm
+        """
         bgm_pw_path = video_path_url + 'bgm_pw.mp4'
-        pw_duration = cls.get_video_duration(video_file)
+        pw_duration = self.get_duration(video_file)
 
         try:
             pw_path_txt = video_path_url + 'bgm_pw_video.txt'
-            with open(pw_path_txt, 'w') as f:
-                f.write(f"file '{video_file}'\n")
-            cls.asyncio_run_subprocess([
-                "ffmpeg",
-                "-f", "concat",
-                "-safe", "0",
-                "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
-                "-i", bgm_path,  # 音频文件
-                "-c:v", "libx264",  # 视频编码格式
-                "-t", str(pw_duration),  # 输出视频的持续时间
-                '-c:a', 'aac',           # 音频编码格式
-                '-b:v', '260k',          # 视频比特率
-                '-b:a', '96k',           # 音频比特率
-                '-threads', '2',         # 线程数
-                # '-vf', f'{background_cmd},{subtitle_cmd}',  # 视频过滤器(背景和字幕)
-                "-filter_complex", "[1:a]volume=0.6[a1];[0:a][a1]amerge=inputs=2[aout]",  # 混合音频流
-                "-map", "0:v:0",  # 映射视频流来自第一个输入文件(视频)
-                "-map", "[aout]",  # 映射混合后的音频流
-                '-y',                    # 强制覆盖输出文件
-                bgm_pw_path              # 输出文件路径
-            ], timeout=500)
-            time.sleep(1)
+            # with open(pw_path_txt, 'w') as f:
+            #     f.write(f"file '{video_file}'\n")
+            payload = f"file '{video_file}'\n".encode()
+            request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=pw_path_txt)
+            self.stub.PutFile(request=request)
+            request = task_pb2.RunCommandRequest(id=self.container_id,
+                                                 command=[
+                                                     "ffmpeg",
+                                                     "-f", "concat",
+                                                     "-safe", "0",
+                                                     "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
+                                                     "-i", bgm_path,  # 音频文件
+                                                     "-c:v", "h264_nvenc",  # 视频编码格式
+                                                     '-c:a', 'aac',           # 音频编码格式
+                                                     "-t", str(pw_duration),  # 输出视频的持续时间
+                                                     '-b:v', '260k',          # 视频比特率
+                                                     '-b:a', '96k',           # 音频比特率
+                                                     '-threads', '2',         # 线程数
+                                                     # '-vf', f'{background_cmd},{subtitle_cmd}',  # 视频过滤器(背景和字幕)
+                                                     "-filter_complex", "\"[1:a]volume=0.6[a1];[0:a][a1]amerge=inputs=2[aout]\"",  # 混合音频流
+                                                     "-map", "\"0:v:0\"",  # 映射视频流来自第一个输入文件(视频)
+                                                     "-map", "\"[aout]\"",  # 映射混合后的音频流
+                                                     '-y',                    # 强制覆盖输出文件
+                                                     "-cq", "28",
+                                                     "-preset", "slow",
+                                                     bgm_pw_path              # 输出文件路径
+                                                 ])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess([
+            #     "ffmpeg",
+            #     "-f", "concat",
+            #     "-safe", "0",
+            #     "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
+            #     "-i", bgm_path,  # 音频文件
+            #     "-c:v", "libx264",  # 视频编码格式
+            #     "-t", str(pw_duration),  # 输出视频的持续时间
+            #     '-c:a', 'aac',           # 音频编码格式
+            #     '-b:v', '260k',          # 视频比特率
+            #     '-b:a', '96k',           # 音频比特率
+            #     '-threads', '2',         # 线程数
+            #     # '-vf', f'{background_cmd},{subtitle_cmd}',  # 视频过滤器(背景和字幕)
+            #     "-filter_complex", "[1:a]volume=0.6[a1];[0:a][a1]amerge=inputs=2[aout]",  # 混合音频流
+            #     "-map", "0:v:0",  # 映射视频流来自第一个输入文件(视频)
+            #     "-map", "[aout]",  # 映射混合后的音频流
+            #     '-y',                    # 强制覆盖输出文件
+            #     bgm_pw_path              # 输出文件路径
+            # ], timeout=500)
+            # time.sleep(1)
             return bgm_pw_path
         except Exception as e:
             return bgm_pw_path
 
-    """横屏视频改为竖屏"""
-    @classmethod
-    def update_video_h_w(cls, video_path, file_path):
+    def update_video_h_w(self, video_path, file_path):
+        """横屏视频改为竖屏"""
         video_h_w_path = file_path +'video_h_w_video.mp4'
         try:
-            cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2" ,video_h_w_path],timeout=420)
+            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg" ,"-i" ,video_path ,"-vf" ,"\"scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2\"" ,video_h_w_path])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2" ,video_h_w_path],timeout=420)
             return video_h_w_path
         except Exception as e:
             return video_h_w_path
 
-    """视频转为640像素"""
-    @classmethod
-    def video_640(cls, video_path, file_path):
+    def video_640(self, video_path, file_path):
+        """视频转为640像素"""
         video_url = file_path + 'pixelvideo.mp4'
         try:
-            cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=360:640" ,video_url],timeout=420)
+            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg" ,"-i" ,video_path ,"-vf" ,"\"scale=360:640\"" ,video_url])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=360:640" ,video_url],timeout=420)
             return video_url
         except Exception as e:
             return video_url
 
-    @classmethod
-    def concatenate_videos(cls, videos_paths, file_path):
+    def concatenate_videos(self, videos_paths, file_path):
         video_url = file_path + 'rg_pw.mp4'
         list_filename = file_path + 'rg_pw.txt'
-        with open(list_filename, "w") as f:
-            for video_path in videos_paths:
-                f.write(f"file '{video_path}'\n")
+        # with open(list_filename, "w") as f:
+        #     for video_path in videos_paths:
+        #         f.write(f"file '{video_path}'\n")
+        payload = b''
+        for video_path in videos_paths:
+            payload += f"file '{video_path}'\n".encode()
+        request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=list_filename)
+        self.stub.PutFile(request=request)
         try:
-            cls.asyncio_run_subprocess(
-                ["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", video_url], timeout=420)
+            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", video_url])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess(
+            #     ["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", video_url], timeout=420)
             logger.info(f"[+] 视频转为640像素成功")
             return video_url
         except Exception as e:
             return video_url
 
-    """视频拼接到一起"""
-    @classmethod
-    def h_b_video(cls, video_path, pw_path, file_path):
+    def h_b_video(self, video_path, pw_path, file_path):
+        """视频拼接到一起"""
         video_url = file_path + 'hbvideo.mp4'
         try:
-            cls.asyncio_run_subprocess(["ffmpeg","-i", video_path, "-i", pw_path, "-filter_complex" ,"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]" ,"-map" ,"[outv]" ,"-map" ,"[outa]" ,video_url],timeout=500)
+            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg","-i", video_path, "-i", pw_path, "-filter_complex" ,"\"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]\"" ,"-map" ,"[outv]" ,"-map" ,"[outa]" ,video_url])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess(["ffmpeg","-i", video_path, "-i", pw_path, "-filter_complex" ,"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]" ,"-map" ,"[outv]" ,"-map" ,"[outa]" ,video_url],timeout=500)
             return video_url
         except Exception as e:
             return video_url
 
-    """横屏视频顶部增加字幕"""
-    @classmethod
-    def add_video_zm(cls, new_video_path, video_path_url, pw_random_id, new_text):
+    def add_video_zm(self, new_video_path, video_path_url, pw_random_id, new_text):
+        """横屏视频顶部增加字幕"""
         single_video_srt = video_path_url + str(pw_random_id) +'video_zm.srt'
         single_video_txt = video_path_url + str(pw_random_id) +'video_zm.txt'
         single_video = video_path_url + str(pw_random_id) +'video_zm.mp4'
         try:
-            duration = cls.get_video_duration(new_video_path)
+            duration = self.get_duration(new_video_path)
             if duration == 0:
                 return new_video_path
-            start_time = cls.seconds_to_srt_time(0)
-            end_time = cls.seconds_to_srt_time(duration)
+            start_time = self.seconds_to_srt_time(0)
+            end_time = self.seconds_to_srt_time(duration)
             # zm = '致敬伟大的教员,为整个民族\n感谢老人家历史向一代伟人'
-            with open(single_video_txt, 'w') as f:
-                f.write(f"file '{new_video_path}'\n")
-            with open(single_video_srt, 'w') as f:
-                f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
+            payload = f"file '{new_video_path}'\n".encode()
+            request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=single_video_txt)
+            self.stub.PutFile(request)
+            # with open(single_video_txt, 'w') as f:
+            #     f.write(f"file '{new_video_path}'\n")
+            payload = f"1\n{start_time} --> {end_time}\n{new_text}\n\n".encode()
+            request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=single_video_srt)
+            self.stub.PutFile(request)
+            # with open(single_video_srt, 'w') as f:
+            #     f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
             subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=12,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=225'"
-            draw = f"{subtitle_cmd}"
-            cls.asyncio_run_subprocess([
-                "ffmpeg",
-                "-f", "concat",
-                "-safe", "0",
-                "-i", single_video_txt,
-                "-c:v", "libx264",
-                "-c:a", "aac",
-                "-vf", draw,
-                "-y",
-                single_video
-            ],timeout=500)
+            request = task_pb2.RunCommandRequest(id=self.container_id,
+                                                 command=[
+                                                     "ffmpeg",
+                                                     "-f", "concat",
+                                                     "-safe", "0",
+                                                     "-i", single_video_txt,
+                                                     "-c:v", "h264_nvenc",
+                                                     "-c:a", "aac",
+                                                     "-vf", f"\"{subtitle_cmd}\"",
+                                                     "-y",
+                                                     "-cq", "28",
+                                                     "-preset", "slow",
+                                                     single_video
+                                                 ])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess([
+            #     "ffmpeg",
+            #     "-f", "concat",
+            #     "-safe", "0",
+            #     "-i", single_video_txt,
+            #     "-c:v", "libx264",
+            #     "-c:a", "aac",
+            #     "-vf", draw,
+            #     "-y",
+            #     single_video
+            # ],timeout=500)
             # subprocess.run(ffmpeg_cmd)
             return single_video
         except Exception as e:
             return single_video
 
-    """获取mp3时长"""
-    @classmethod
-    def get_mp3_duration(cls, file_path):
-        audio = MP3(file_path)
-        duration = audio.info.length
-        if duration:
-            return int(duration)
-        return 0
-
-
-    """
-     生成片尾视频
-    """
-    @classmethod
-    def pw_video(cls, jpg_path, file_path, pw_mp3_path, pw_srt):
-        # 添加音频到图片
+    def get_duration(self, file_path):
+        """获取mp3时长"""
+        # audio = MP3(file_path)
+        # duration = audio.info.length
+        # if duration:
+        #     return int(duration)
+        request = task_pb2.RunCommandRequest(id=self.container_id, command=['ffprobe', '-show_format', '-show_streams', '-of', 'json', '-v', 'quiet', '-hide_banner', file_path])
+        msg = ''
+        for response in self.stub.RunCommand(request):
+            msg += response.msg
+        obj = orjson.loads(msg)
+        return int(float(obj['format']['duration']))
+
+    def pw_video(self, jpg_path, file_path, pw_mp3_path, pw_srt):
         """
+        生成片尾视频
+
         jpg_url 图片地址
         pw_video 提供的片尾视频
         pw_duration  提供的片尾视频时长
@@ -312,12 +379,16 @@ class FFmpeg():
         pw_url 生成视频地址
         :return:
         """
+        # 添加音频到图片
         pw_srt_path = file_path +'pw_video.srt'
-        with open(pw_srt_path, 'w') as f:
-            f.write(pw_srt)
+        payload = pw_srt.encode()
+        request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=pw_srt_path)
+        self.stub.PutFile(request=request)
+        # with open(pw_srt_path, 'w') as f:
+        #     f.write(pw_srt)
         pw_url_path = file_path + 'pw_video.mp4'
         try:
-            pw_duration = cls.get_mp3_duration(pw_mp3_path)
+            pw_duration = self.get_duration(pw_mp3_path)
             if pw_duration == 0:
                 return pw_url_path
             time.sleep(2)
@@ -329,117 +400,182 @@ class FFmpeg():
             background_cmd = f"drawbox=y=(ih-{int(360)}/2-{bg_position_offset}):color=yellow@1.0:width=iw:height={int(360)}/4:t=fill"
             if "mp4" in jpg_path:
                 pw_path_txt = file_path + 'pw_path_video.txt'
-                with open(pw_path_txt, 'w') as f:
-                    f.write(f"file '{jpg_path}'\n")
-                cls.asyncio_run_subprocess([
-                    "ffmpeg",
-                    "-f", "concat",
-                    "-safe", "0",
-                    "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
-                    "-i", pw_mp3_path,  # 音频文件
-                    "-c:v", "libx264",  # 视频编码格式
-                    "-t", str(pw_duration),  # 输出视频的持续时间
-                    "-c:a", "aac",  # 音频编码格式
-                    "-b:v", "260k",  # 视频比特率
-                    "-b:a", "96k",  # 音频比特率
-                    "-threads", "2",  # 线程数
-                    "-vf", f"{background_cmd},{subtitle_cmd}",  # 视频过滤器(背景和字幕)
-                    "-map", "0:v:0",  # 映射视频流来自第一个输入文件(视频)
-                    "-map", "1:a:0",  # 映射音频流来自第二个输入文件(音频)
-                    "-y",  # 强制覆盖输出文件
-                    pw_url_path  # 输出文件路径
-                ], timeout=500)
+                payload = f"file '{jpg_path}'\n".encode()
+                request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=pw_path_txt)
+                self.stub.PutFile(request=request)
+                # with open(pw_path_txt, 'w') as f:
+                #     f.write(f"file '{jpg_path}'\n")
+                request = task_pb2.RunCommandRequest(id=self.container_id,
+                                                     command=[
+                                                         "ffmpeg",
+                                                         "-f", "concat",
+                                                         "-safe", "0",
+                                                         "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
+                                                         "-i", pw_mp3_path,  # 音频文件
+                                                         "-c:v", "h264_nvenc",  # 视频编码格式
+                                                         "-t", str(pw_duration),  # 输出视频的持续时间
+                                                         "-c:a", "aac",  # 音频编码格式
+                                                         "-b:v", "260k",  # 视频比特率
+                                                         "-b:a", "96k",  # 音频比特率
+                                                         "-threads", "2",  # 线程数
+                                                         "-vf", f"\"{background_cmd},{subtitle_cmd}\"",  # 视频过滤器(背景和字幕)
+                                                         "-map", "0:v:0",  # 映射视频流来自第一个输入文件(视频)
+                                                         "-map", "1:a:0",  # 映射音频流来自第二个输入文件(音频)
+                                                         "-y",  # 强制覆盖输出文件
+                                                         "-cq", "28",
+                                                         "-preset", "slow",
+                                                         pw_url_path  # 输出文件路径
+                                                     ])
+                for _ in self.stub.RunCommand(request=request):
+                    pass
+                # self.asyncio_run_subprocess([
+                #     "ffmpeg",
+                #     "-f", "concat",
+                #     "-safe", "0",
+                #     "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
+                #     "-i", pw_mp3_path,  # 音频文件
+                #     "-c:v", "libx264",  # 视频编码格式
+                #     "-t", str(pw_duration),  # 输出视频的持续时间
+                #     "-c:a", "aac",  # 音频编码格式
+                #     "-b:v", "260k",  # 视频比特率
+                #     "-b:a", "96k",  # 音频比特率
+                #     "-threads", "2",  # 线程数
+                #     "-vf", f"{background_cmd},{subtitle_cmd}",  # 视频过滤器(背景和字幕)
+                #     "-map", "0:v:0",  # 映射视频流来自第一个输入文件(视频)
+                #     "-map", "1:a:0",  # 映射音频流来自第二个输入文件(音频)
+                #     "-y",  # 强制覆盖输出文件
+                #     pw_url_path  # 输出文件路径
+                # ], timeout=500)
             else:
-                cls.asyncio_run_subprocess([
-                    'ffmpeg',
-                    '-loop', '1',
-                    '-i', jpg_path,  # 输入的图片文件
-                    '-i', pw_mp3_path,  # 输入的音频文件
-                    '-c:v', 'libx264',  # 视频编码格式
-                    '-t', str(pw_duration),  # 输出视频的持续时间,与音频持续时间相同
-                    '-pix_fmt', 'yuv420p',  # 像素格式
-                    '-c:a', 'aac',  # 音频编码格式
-                    '-strict', 'experimental',  # 使用实验性编码器
-                    '-shortest',  # 确保输出视频的长度与音频一致
-                    '-vf', f"{background_cmd},{subtitle_cmd}",  # 视频过滤器,设置分辨率和其他过滤器
-                    pw_url_path  # 输出的视频文件路径
-                ], timeout=500)
-            if os.path.exists(pw_srt_path):
-                os.remove(pw_srt_path)
+                request = task_pb2.RunCommandRequest(id=self.container_id,
+                                                     command=[
+                                                         'ffmpeg',
+                                                         '-loop', '1',
+                                                         '-i', jpg_path,  # 输入的图片文件
+                                                         '-i', pw_mp3_path,  # 输入的音频文件
+                                                         '-c:v', 'h264_nvenc',  # 视频编码格式
+                                                         '-t', str(pw_duration),  # 输出视频的持续时间,与音频持续时间相同
+                                                         '-pix_fmt', 'yuv420p',  # 像素格式
+                                                         '-c:a', 'aac',  # 音频编码格式
+                                                         '-strict', 'experimental',  # 使用实验性编码器
+                                                         '-shortest',  # 确保输出视频的长度与音频一致
+                                                         '-vf', f"\"{background_cmd},{subtitle_cmd}\"",  # 视频过滤器,设置分辨率和其他过滤器
+                                                         "-cq", "28",
+                                                         "-preset", "slow",
+                                                         pw_url_path  # 输出的视频文件路径
+                                                     ])
+                for _ in self.stub.RunCommand(request=request):
+                    pass
+                # self.asyncio_run_subprocess([
+                #     'ffmpeg',
+                #     '-loop', '1',
+                #     '-i', jpg_path,  # 输入的图片文件
+                #     '-i', pw_mp3_path,  # 输入的音频文件
+                #     '-c:v', 'libx264',  # 视频编码格式
+                #     '-t', str(pw_duration),  # 输出视频的持续时间,与音频持续时间相同
+                #     '-pix_fmt', 'yuv420p',  # 像素格式
+                #     '-c:a', 'aac',  # 音频编码格式
+                #     '-strict', 'experimental',  # 使用实验性编码器
+                #     '-shortest',  # 确保输出视频的长度与音频一致
+                #     '-vf', f"{background_cmd},{subtitle_cmd}",  # 视频过滤器,设置分辨率和其他过滤器
+                #     pw_url_path  # 输出的视频文件路径
+                # ], timeout=500)
+            # if os.path.exists(pw_srt_path):
+            #     os.remove(pw_srt_path)
             return pw_url_path
         except Exception as e:
             return pw_url_path
 
-
-    """
-    单个视频拼接
-    """
-    @classmethod
-    def single_video(cls, video_path, file_path, zm):
+    def single_video(self, video_path, file_path, zm):
+        """
+        单个视频拼接
+        """
         single_video_url = file_path + 'single_video.mp4'
         single_video_srt = file_path + 'single_video.srt'
         # 获取时长
         try:
-            duration = cls.get_video_duration(video_path)
+            duration = self.get_duration(video_path)
             if duration == 0:
                 return single_video_url
-            start_time = cls.seconds_to_srt_time(2)
-            end_time = cls.seconds_to_srt_time(duration)
+            start_time = self.seconds_to_srt_time(2)
+            end_time = self.seconds_to_srt_time(duration)
             single_video_txt = file_path + 'single_video.txt'
-            with open(single_video_txt, 'w') as f:
-                f.write(f"file '{video_path}'\n")
+            # with open(single_video_txt, 'w') as f:
+            #     f.write(f"file '{video_path}'\n")
+            payload = f"file '{video_path}'\n".encode()
+            request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=single_video_txt)
+            self.stub.PutFile(request=request)
             if zm:
-                with open(single_video_srt, 'w') as f:
-                    f.write(f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n")
+                payload = f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n".encode()
+                request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=single_video_srt)
+                self.stub.PutFile(request=request)
+                # with open(single_video_srt, 'w') as f:
+                #     f.write(f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n")
                 subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
             else:
                 subtitle_cmd = f"force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
             # 多线程数
             num_threads = 5
             # 构建 FFmpeg 命令,生成视频
-            cls.asyncio_run_subprocess([
-                    "ffmpeg",
-                    "-f", "concat",
-                    "-safe", "0",
-                    "-i",  f"{single_video_txt}",
-                    "-c:v", "libx264",
-                    "-c:a", "aac",
-                    '-b:v', '260k',
-                    "-b:a", "96k",
-                    "-threads", str(num_threads),
-                    "-vf", subtitle_cmd,
-                    "-y",
-                     single_video_url
-            ], timeout=400)
-            if os.path.exists(single_video_srt):
-                os.remove(single_video_srt)
+            request = task_pb2.RunCommandRequest(id=self.container_id,
+                                                 command=[
+                                                     "ffmpeg",
+                                                     "-f", "concat",
+                                                     "-safe", "0",
+                                                     "-i",  f"{single_video_txt}",
+                                                     "-c:v", "h264_nvenc",
+                                                     "-c:a", "aac",
+                                                     '-b:v', '260k',
+                                                     "-b:a", "96k",
+                                                     "-threads", str(num_threads),
+                                                     "-vf", f"\"{subtitle_cmd}\"",
+                                                     "-y",
+                                                     "-cq", "28",
+                                                     "-preset", "slow",
+                                                      single_video_url
+                                                 ])
+            for _ in self.stub.RunCommand(request=request):
+                pass
+            # self.asyncio_run_subprocess([
+            #         "ffmpeg",
+            #         "-f", "concat",
+            #         "-safe", "0",
+            #         "-i",  f"{single_video_txt}",
+            #         "-c:v", "libx264",
+            #         "-c:a", "aac",
+            #         '-b:v', '260k',
+            #         "-b:a", "96k",
+            #         "-threads", str(num_threads),
+            #         "-vf", subtitle_cmd,
+            #         "-y",
+            #          single_video_url
+            # ], timeout=400)
+            # if os.path.exists(single_video_srt):
+            #     os.remove(single_video_srt)
             return single_video_url
         except Exception as e:
             return single_video_url
 
-    @classmethod
-    def asyncio_run_subprocess(cls, params: List[str], timeout: int = 30) -> str:
-        async def run_subprocess():
-            process = await asyncio.create_subprocess_exec(
-                params[0],
-                *params[1:],
-                stdout=asyncio.subprocess.PIPE,
-                stderr=asyncio.subprocess.PIPE,
-            )
-            try:
-                out, err = await asyncio.wait_for(process.communicate(), timeout=timeout)
-                if process.returncode != 0:
-                    raise IOError(err)
-                return out.decode()
-            except asyncio.TimeoutError:
-                process.kill()
-                out, err = await process.communicate()
-                raise IOError(err)
-        return asyncio.run(run_subprocess())
-
-
-    @classmethod
-    def get_http_duration(cls, videos_path):
+    # def asyncio_run_subprocess(self, params: List[str], timeout: int = 30) -> str:
+    #     async def run_subprocess():
+    #         process = await asyncio.create_subprocess_exec(
+    #             params[0],
+    #             *params[1:],
+    #             stdout=asyncio.subprocess.PIPE,
+    #             stderr=asyncio.subprocess.PIPE,
+    #         )
+    #         try:
+    #             out, err = await asyncio.wait_for(process.communicate(), timeout=timeout)
+    #             if process.returncode != 0:
+    #                 raise IOError(err)
+    #             return out.decode()
+    #         except asyncio.TimeoutError:
+    #             process.kill()
+    #             out, err = await process.communicate()
+    #             raise IOError(err)
+    #     return asyncio.run(run_subprocess())
+
+    def get_http_duration(self, videos_path):
         total_duration = 0
         for video_path in videos_path:
             url = "http://101.37.24.17:5555/api/v1/ffmpeg/get_meta"
@@ -546,5 +682,3 @@ if __name__ == '__main__':
 00:00:32,659 --> 00:00:32,860
 吧"""
     FFmpeg.pw_video(jpg_path, file_path, mp3_path, pw_srt)
-
-

+ 34 - 0
utils/general.py

@@ -0,0 +1,34 @@
+import os
+import re
+from pathlib import Path
+
+
+def is_prod() -> bool:
+    """判断当前环境是否为生产环境"""
+    return os.getenv('ENV', 'test') == 'prod'
+
+
+def get_root_dir() -> str:
+    """获取项目根目录的绝对路径"""
+    current_path = Path(__file__).resolve()
+    root_path = current_path.parent if not current_path.is_dir() else current_path
+    while True:
+        if 'requirements.txt' in os.listdir(root_path):
+            return str(root_path.absolute())
+        root_path = root_path.parent
+
+
+def get_abs_path(relative_path: str) -> str:
+    """获取目标文件或目录的相对路径在系统中的绝对路径"""
+    return os.path.join(get_root_dir(), relative_path)
+
+
+def pascal_to_snake(pascal_str: str) -> str:
+    """将Pascal字符串转为蛇形字符串"""
+    snake_str = re.sub(r'([a-z])([A-Z])', r'\1_\2', pascal_str)
+    return snake_str.lower()
+
+
+def snake_to_pascal(snake_str: str) -> str:
+    """将蛇形字符串转为Pascal字符串"""
+    return ''.join([item.title() for item in snake_str.split('_')])

+ 1 - 1
utils/google_ai_studio.py

@@ -10,7 +10,7 @@ from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
 from loguru import logger
 
 
-CACHE_DIR = '/app/cache/'
+CACHE_DIR = '/tmp/'
 # CACHE_DIR = '/Users/z/Downloads/'
 # PROXY_ADDR = 'http://localhost:1081'
 # os.environ['http_proxy'] = PROXY_ADDR

+ 5 - 5
utils/tts_help.py

@@ -55,11 +55,11 @@ class TTS:
             response = requests.request("GET", pw_url, headers=headers, data=payload, timeout= 30)
             if response.status_code == 200:
                 # 以二进制写入模式打开文件
-                with open(f"{pw_mp3_path}", "wb") as file:
-                    # 将响应内容写入文件
-                    file.write(response.content)
-                return pw_mp3_path
-        return None
+                # with open(f"{pw_mp3_path}", "wb") as file:
+                #     # 将响应内容写入文件
+                #     file.write(response.content)
+                return response.content, pw_mp3_path
+        return None, pw_mp3_path
 
     @classmethod
     def get_srt_format(cls, pw_srt_text, pw_url_sec):

+ 68 - 33
workers/consumption_work.py

@@ -6,11 +6,17 @@ import sys
 import time
 import uuid
 from datetime import datetime
+from typing import Optional
+
+import grpc
 import orjson
 from apscheduler.schedulers.blocking import BlockingScheduler
 from apscheduler.triggers.interval import IntervalTrigger
+from google.protobuf.empty_pb2 import Empty
 from loguru import logger
+
 sys.path.append('/app')
+from protos import task_pb2, task_pb2_grpc
 from utils.redis import RedisHelper
 from utils.aliyun_log import AliyunLogger
 from utils.aliyun_oss import Oss
@@ -19,6 +25,7 @@ from utils.dy_ks_get_url import Dy_KS
 from utils.feishu_form import Material
 from utils.feishu_utils import Feishu
 from utils.ffmpeg import FFmpeg
+from utils.general import get_abs_path
 from utils.gpt4o_mini_help import GPT4oMini
 from utils.piaoquan import PQ
 from utils.sql_help import sqlCollect
@@ -26,13 +33,11 @@ from utils.tag_video import Tag
 from utils.tts_help import TTS
 from utils.google_ai_studio import GoogleAI
 
+channel = grpc.insecure_channel(os.getenv('FFMPEG_HOST'))
+stub = task_pb2_grpc.TaskRunnerStub(channel=channel)
+CACHE_DIR = '/tmp/'
 
 
-
-
-
-CACHE_DIR = '/app/cache/'
-# CACHE_DIR = '/Users/z/Downloads/'
 class ConsumptionRecommend(object):
     @classmethod
     def insert_pq(cls, data, oss_object_key, title, tags, tag_transport_channel, channel_mark, task_mark):
@@ -127,7 +132,9 @@ class ConsumptionRecommend(object):
         return
 
     @classmethod
-    def data_handle(cls, data, file_path, redis_name,studio_key):
+    def data_handle(cls, data, file_path, redis_name,studio_key, container_id):
+        ffmpeg = FFmpeg(container_id=container_id, stub=stub)
+        
         url, original_title, video_id, tag_transport_channel = Dy_KS.get_video_url(data, "效率工具")
         if url == "重新处理" or not url:
             RedisHelper().get_client().rpush(redis_name, json.dumps(data))
@@ -157,8 +164,11 @@ class ConsumptionRecommend(object):
         AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "符合规则等待改造",
                              "2004", str(data))
         logger.info(f"[处理] {url}开始下载视频")
-        video_path = DownLoad.download_video(url, file_path, tag_transport_channel, video_id)
-        if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
+        video_content, video_path = DownLoad.download_video(url, file_path, tag_transport_channel, video_id)
+        oss_info = Oss.stitching_sync_upload_bytes_to_oss(video_content, video_id)
+        request = task_pb2.PutFileRequest(id=container_id, oss_object_key=oss_info['oss_object_key'], path=video_path)
+        response = stub.PutFile(request=request)
+        if len(video_content) == 0 or not response.success:
             RedisHelper().get_client().rpush(redis_name, json.dumps(data))
             logger.error(f"[处理] {url}下载失败")
             AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
@@ -205,12 +215,14 @@ class ConsumptionRecommend(object):
                           "搬运工具")
         if data["transform_rule"] == "仅改造" or data["transform_rule"] == "是":
             try:
-                width, height = FFmpeg.get_w_h_size(video_path)
+                width, height = ffmpeg.get_w_h_size(video_path)
                 if width < height:  # 判断是否需要修改为竖屏
-                    video_path = FFmpeg.update_video_h_w(video_path, file_path)
+                    video_path = ffmpeg.update_video_h_w(video_path, file_path)
                 logger.info(f"[处理] 视频更改分辨率处理")
-                video_path = FFmpeg.video_640(video_path, file_path)
-                if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
+                video_path = ffmpeg.video_640(video_path, file_path)
+                request = task_pb2.FileExistsRequest(id=container_id, path=video_path)
+                response = stub.FileExists(request=request)
+                if not response.exists:
                     RedisHelper().get_client().rpush(redis_name, json.dumps(data))
                     logger.error(f"[处理] 视频更改分辨率失败")
                     AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
@@ -226,11 +238,15 @@ class ConsumptionRecommend(object):
                     return
                 logger.info(f"[处理] 视频更改分辨率处理成功")
                 if data["video_clipping"]:  # 判断是否需要裁剪
-                    video_path = FFmpeg.video_crop(video_path, file_path)
+                    video_path = ffmpeg.video_crop(video_path, file_path)
                 if data["video_clipping_time"]:  # 判断是否需要指定视频时长
-                    video_path = FFmpeg.video_ggduration(video_path, file_path, data["video_clipping_time"])
+                    video_path = ffmpeg.video_ggduration(video_path, file_path, data["video_clipping_time"])
                 if data['trailer_share'] == "内容分析":
+                    request = task_pb2.GetFileRequest(id=container_id, path=video_path)
+                    response = stub.GetFile(request)
+                    Oss.download_video_oss(Oss.generate_url(response.oss_object_key), video_path, add_suffix=False)
                     video_text = GoogleAI.run(studio_key, video_path)
+                    os.remove(video_path)
                     if not video_text:
                         logger.error(f"[处理] 视频内容分析获取内容信息失败")
                         data["transform_rule"] = "仅改造"
@@ -293,7 +309,7 @@ class ConsumptionRecommend(object):
                                       "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
                                       "【 搬运&改造效率工具失败通知 】")
                     return
-                pw_mp3_path = TTS.download_mp3(pw_url, file_path)
+                pw_mp3_content, pw_mp3_path = TTS.download_mp3(pw_url, file_path)
                 if not pw_mp3_path:
                     data["transform_rule"] = "仅改造"
                     RedisHelper().get_client().rpush(redis_name, json.dumps(data))
@@ -309,6 +325,8 @@ class ConsumptionRecommend(object):
                                       "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
                                       "【 搬运&改造效率工具失败通知 】")
                     return
+                request = task_pb2.PutFileRequest(id=container_id, payload=pw_mp3_content, path=pw_mp3_path)
+                stub.PutFile(request=request)
                 logger.info(f"[处理] 数据片尾音频下载成功")
 
                 if str(data['trailer_share_video']) and str(data['trailer_share_video'] ) != "None":
@@ -330,10 +348,10 @@ class ConsumptionRecommend(object):
                                           "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
                                           "【 搬运&改造效率工具失败通知 】")
                         return
-                    pw_url_duration = FFmpeg.get_http_duration([pw_url])
-                    pw_videos_duration = FFmpeg.get_http_duration(rg_pw_url_list)
+                    pw_url_duration = ffmpeg.get_http_duration([pw_url])
+                    pw_videos_duration = ffmpeg.get_http_duration(rg_pw_url_list)
                     if pw_videos_duration < pw_url_duration:
-                        jpg_path = FFmpeg.video_png(video_path, file_path)  # 生成视频最后一帧jpg
+                        jpg_path = ffmpeg.video_png(video_path, file_path)  # 生成视频最后一帧jpg
                         if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0:
                             data["transform_rule"] = "仅改造"
                             RedisHelper().get_client().rpush(redis_name, json.dumps(data))
@@ -352,7 +370,7 @@ class ConsumptionRecommend(object):
                         logger.info(f"[处理] 数据片尾获取最后一帧成功")
                     else:
                         rg_pw_url = DownLoad.download_pq_video(file_path, rg_pw_url_list)
-                        rg_pw_list = FFmpeg.concatenate_videos(rg_pw_url, file_path)
+                        rg_pw_list = ffmpeg.concatenate_videos(rg_pw_url, file_path)
                         if not os.path.exists(rg_pw_list) or os.path.getsize(rg_pw_list) == 0:
                             data["transform_rule"] = "仅改造"
                             RedisHelper().get_client().rpush(redis_name, json.dumps(data))
@@ -368,12 +386,14 @@ class ConsumptionRecommend(object):
                                               "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
                                               "【 搬运&改造效率工具失败通知 】")
                             return
-                        jpg_path = FFmpeg.video_640(rg_pw_list, file_path)
+                        jpg_path = ffmpeg.video_640(rg_pw_list, file_path)
                         logger.info(f"[处理] 生成人工片尾成功")
 
                 else:
-                    jpg_path = FFmpeg.video_png(video_path, file_path)  # 生成视频最后一帧jpg
-                    if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0:
+                    jpg_path = ffmpeg.video_png(video_path, file_path)  # 生成视频最后一帧jpg
+                    request = task_pb2.FileExistsRequest(id=container_id, path=jpg_path)
+                    response = stub.FileExists(request=request)
+                    if not response.exists:
                         data["transform_rule"] = "仅改造"
                         RedisHelper().get_client().rpush(redis_name, json.dumps(data))
                         logger.error(f"[处理] 数据片尾获取最后一帧失败")
@@ -389,8 +409,10 @@ class ConsumptionRecommend(object):
                                           "【 搬运&改造效率工具失败通知 】")
                         return
                     logger.info(f"[处理] 数据片尾获取最后一帧成功")
-                pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt)  # 生成片尾视频
-                if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
+                pw_path = ffmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt)  # 生成片尾视频
+                request = task_pb2.FileExistsRequest(id=container_id, path=pw_path)
+                response = stub.FileExists(request=request)
+                if not response.exists:
                     data["transform_rule"] = "仅改造"
                     RedisHelper().get_client().rpush(redis_name, json.dumps(data))
                     logger.error(f"[处理] 数据片尾拼接失败")
@@ -414,14 +436,16 @@ class ConsumptionRecommend(object):
                         logger.info(f"[处理] 获取bgm")
                         rg_bgm_list = PQ.get_pq_oss([trailer_share_bgm])
                         rg_bgm_url = DownLoad.download_pq_video(file_path, rg_bgm_list)
-                        bgm_mp3_path = FFmpeg.get_pw_video_mp3(file_path, rg_bgm_url[0])
-                        pw_path = FFmpeg.video_add_bgm(pw_path, bgm_mp3_path, file_path)
+                        bgm_mp3_path = ffmpeg.get_pw_video_mp3(file_path, rg_bgm_url[0])
+                        pw_path = ffmpeg.video_add_bgm(pw_path, bgm_mp3_path, file_path)
                         logger.info(f"[处理] 片尾bgm添加成功")
                     except Exception as e:
                         logger.error(f"[处理] 片尾bgm添加失败")
-                video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
-                video_path = FFmpeg.single_video(video_path, file_path, data["video_share"])
-                if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
+                video_path = ffmpeg.h_b_video(video_path, pw_path, file_path)
+                video_path = ffmpeg.single_video(video_path, file_path, data["video_share"])
+                request = task_pb2.FileExistsRequest(id=container_id, path=video_path)
+                response = stub.FileExists(request=request)
+                if not response.exists:
                     data["transform_rule"] = "仅改造"
                     RedisHelper().get_client().rpush(redis_name, json.dumps(data))
                     logger.error(f"[处理] 数据添加片中字幕失败")
@@ -438,9 +462,10 @@ class ConsumptionRecommend(object):
                     return
                 logger.info(f"[处理] 数据添加片中字幕成功")
                 logger.info(f"[处理] 数据开始发送oss")
-                oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4()))  # 视频发送OSS
+                request = task_pb2.GetFileRequest(id=container_id, path=video_path)
+                response = stub.GetFile(request=request)
+                oss_object_key = response.oss_object_key
                 logger.info(f"[处理] 数据发送oss成功")
-                oss_object_key = oss_object_key.get("oss_object_key")
 
                 tags = ','.join(filter(None, [
                     data['pq_label'],
@@ -483,8 +508,15 @@ class ConsumptionRecommend(object):
             logger.info('[处理] 无待执行的扫描任务')
             return
         data = orjson.loads(data)
+        container_id: Optional[str] = None
         try:
-            cls.data_handle(data, file_path, redis_name,studio_key)
+            response = stub.StartContainer(request=Empty())
+            container_id = response.id
+            if not container_id:
+                logger.error('启动容器失败, 程序退出')
+                return
+
+            cls.data_handle(data, file_path, redis_name,studio_key, container_id)
             for filename in os.listdir(CACHE_DIR):
                 # 检查文件名是否包含关键字
                 if uid in filename:
@@ -509,7 +541,10 @@ class ConsumptionRecommend(object):
                     except Exception as e:
                         logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
             return
-
+        finally:
+            if container_id:
+                request = task_pb2.StopContainerRequest(id=container_id)
+                stub.StopContainer(request)
 
 
 def run():

+ 1 - 1
workers/consumption_work_studio.py

@@ -27,7 +27,7 @@ from utils.tts_help import TTS
 
 
 
-CACHE_DIR = '/app/cache/'
+CACHE_DIR = '/tmp/'
 # CACHE_DIR = '/Users/z/Downloads/'
 
 class ConsumptionRecommend(object):