2 Commits 4cfb5a5452 ... c1083bb078

Autor SHA1 Mensagem Data
  kevin.yang 4cfb5a5452 update há 2 meses atrás
  kevin.yang d229438632 全面接入4090 há 2 meses atrás

+ 0 - 12
.dockerignore

@@ -1,12 +0,0 @@
-.git/
-.idea/
-.vscode/
-__pycache__/
-docker/
-*.log
-*.jpg
-*.png
-*.gif
-*.webp
-*.mp4
-docker-compose.yml

+ 0 - 318
.gitignore

@@ -1,318 +0,0 @@
-# Created by https://www.toptal.com/developers/gitignore/api/python,node
-# Edit at https://www.toptal.com/developers/gitignore?templates=python,node
-
-### Node ###
-# Logs
-logs
-*.log
-npm-debug.log*
-yarn-debug.log*
-yarn-error.log*
-lerna-debug.log*
-.pnpm-debug.log*
-
-# Diagnostic reports (https://nodejs.org/api/report.html)
-report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
-
-# Runtime data
-pids
-*.pid
-*.seed
-*.pid.lock
-
-# Directory for instrumented libs generated by jscoverage/JSCover
-lib-cov
-
-# Coverage directory used by tools like istanbul
-coverage
-*.lcov
-
-# nyc test coverage
-.nyc_output
-
-# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
-.grunt
-
-# Bower dependency directory (https://bower.io/)
-bower_components
-
-# node-waf configuration
-.lock-wscript
-
-# Compiled binary addons (https://nodejs.org/api/addons.html)
-build/Release
-
-# Dependency directories
-node_modules/
-jspm_packages/
-
-# Snowpack dependency directory (https://snowpack.dev/)
-web_modules/
-
-# TypeScript cache
-*.tsbuildinfo
-
-# Optional npm cache directory
-.npm
-
-# Optional eslint cache
-.eslintcache
-
-# Optional stylelint cache
-.stylelintcache
-
-# Microbundle cache
-.rpt2_cache/
-.rts2_cache_cjs/
-.rts2_cache_es/
-.rts2_cache_umd/
-
-# Optional REPL history
-.node_repl_history
-
-# Output of 'npm pack'
-*.tgz
-
-# Yarn Integrity file
-.yarn-integrity
-
-# dotenv environment variable files
-.env
-.env.development.local
-.env.test.local
-.env.production.local
-.env.local
-
-# parcel-bundler cache (https://parceljs.org/)
-.cache
-.parcel-cache
-
-# Next.js build output
-.next
-out
-
-# Nuxt.js build / generate output
-.nuxt
-dist
-
-# Gatsby files
-.cache/
-# Comment in the public line in if your project uses Gatsby and not Next.js
-# https://nextjs.org/blog/next-9-1#public-directory-support
-# public
-
-# vuepress build output
-.vuepress/dist
-
-# vuepress v2.x temp and cache directory
-.temp
-
-# Docusaurus cache and generated files
-.docusaurus
-
-# Serverless directories
-.serverless/
-
-# FuseBox cache
-.fusebox/
-
-# DynamoDB Local files
-.dynamodb/
-
-# TernJS port file
-.tern-port
-
-# Stores VSCode versions used for testing VSCode extensions
-.vscode-test
-
-# yarn v2
-.yarn/cache
-.yarn/unplugged
-.yarn/build-state.yml
-.yarn/install-state.gz
-.pnp.*
-
-### Node Patch ###
-# Serverless Webpack directories
-.webpack/
-
-# Optional stylelint cache
-
-# SvelteKit build / generate output
-.svelte-kit
-
-### Python ###
-# Byte-compiled / optimized / DLL files
-__pycache__/
-*.py[cod]
-*$py.class
-
-# C extensions
-#*.so
-
-# Distribution / packaging
-.Python
-build/
-develop-eggs/
-dist/
-downloads/
-eggs/
-.eggs/
-lib/
-lib64/
-parts/
-sdist/
-var/
-wheels/
-share/python-wheels/
-*.egg-info/
-.installed.cfg
-*.egg
-MANIFEST
-
-# PyInstaller
-#  Usually these files are written by a python script from a template
-#  before PyInstaller builds the exe, so as to inject date/other infos into it.
-*.manifest
-*.spec
-
-# Installer logs
-pip-log.txt
-pip-delete-this-directory.txt
-
-# Unit test / coverage reports
-htmlcov/
-.tox/
-.nox/
-.coverage
-.coverage.*
-nosetests.xml
-coverage.xml
-*.cover
-*.py,cover
-.hypothesis/
-.pytest_cache/
-cover/
-
-# Translations
-*.mo
-*.pot
-
-# Django stuff:
-local_settings.py
-db.sqlite3
-db.sqlite3-journal
-
-# Flask stuff:
-instance/
-.webassets-cache
-
-# Scrapy stuff:
-.scrapy
-
-# Sphinx documentation
-docs/_build/
-
-# PyBuilder
-.pybuilder/
-target/
-
-# Jupyter Notebook
-.ipynb_checkpoints
-
-# IPython
-profile_default/
-ipython_config.py
-
-# pyenv
-#   For a library or package, you might want to ignore these files since the code is
-#   intended to run in multiple environments; otherwise, check them in:
-# .python-version
-
-# pipenv
-#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
-#   However, in case of collaboration, if having platform-specific dependencies or dependencies
-#   having no cross-platform support, pipenv may install dependencies that don't work, or not
-#   install all needed dependencies.
-#Pipfile.lock
-
-# poetry
-#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
-#   This is especially recommended for binary packages to ensure reproducibility, and is more
-#   commonly ignored for libraries.
-#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
-#poetry.lock
-
-# pdm
-#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
-#pdm.lock
-#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
-#   in version control.
-#   https://pdm.fming.dev/#use-with-ide
-.pdm.toml
-
-# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
-__pypackages__/
-
-# Celery stuff
-celerybeat-schedule
-celerybeat.pid
-
-# SageMath parsed files
-*.sage.py
-
-# Environments
-.venv
-env/
-venv/
-ENV/
-env.bak/
-venv.bak/
-
-# Spyder project settings
-.spyderproject
-.spyproject
-
-# Rope project settings
-.ropeproject
-
-# mkdocs documentation
-/site
-
-# mypy
-.mypy_cache/
-.dmypy.json
-dmypy.json
-
-# Pyre type checker
-.pyre/
-
-# pytype static type analyzer
-.pytype/
-
-# Cython debug symbols
-cython_debug/
-
-# PyCharm
-#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
-#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
-#  and can be added to the global gitignore or merged into this file.  For a more nuclear
-#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
-.idea/
-
-# VScode
-.vscode/
-
-### Python Patch ###
-# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
-poetry.toml
-
-# ruff
-.ruff_cache/
-
-# LSP config files
-pyrightconfig.json
-
-# End of https://www.toptal.com/developers/gitignore/api/python,node
-
-.DS_Store

+ 2 - 2
product.env

@@ -1,7 +1,5 @@
 ENV=prod
 
-FFMPEG_HOST=101.37.24.17:5555
-
 FS_DATA_1=范军,4a768d,task:carry_data_redis_fj,AIzaSyBqqAhzew33-ZY6u0w0Th8z4FHjiieFK0s
 FS_DATA_2=鲁涛,EZef39,task:carry_data_redis_lt,AIzaSyBXU3VIilXxDbIg5riYTu5t6WEVEH8AQNk
 FS_DATA_3=余海涛,Frush6,task:carry_data_redis_yht,AIzaSyCaurhmnci1jooXSN8Q3dKWbYYndUllbzs
@@ -12,3 +10,5 @@ FS_DATA_7=周仙琴,2WIcBU,task:carry_data_redis_zxq,AIzaSyD46rphXd-Ie51sQiQ61lr
 FS_DATA_8=信欣,v0fFCb,task:carry_data_redis_xx,AIzaSyB16JcJHwoz-YcvEUO96Dm1n0zf89GOdms
 FS_DATA_9=邓锋,DEpi6V,task:carry_data_redis_df,AIzaSyCWilmMZyG4xW_pujpRGflaa7SIBjLQHiI
 FS_DATA_10=王知微,jrpuyW,task:carry_data_redis_wzw,AIzaSyCx3hy5ef8wOVPNjvK1MIAwyZZCdYuRh-U
+
+

+ 0 - 0
protos/__init__.py


+ 0 - 69
protos/task.proto

@@ -1,69 +0,0 @@
-syntax = "proto3";
-
-import "google/protobuf/empty.proto";
-
-service TaskRunner {
-  rpc StartContainer (google.protobuf.Empty) returns (StartContainerResponse);
-  rpc StopContainer (StopContainerRequest) returns (StopContainerResponse);
-  rpc RunCommand (RunCommandRequest) returns (stream RunCommandResponse);
-  rpc FileExists (FileExistsRequest) returns (FileExistsResponse);
-  rpc GetFile (GetFileRequest) returns (GetFileResponse);
-  rpc PutFile (PutFileRequest) returns (PutFileResponse);
-}
-
-message StartContainerResponse {
-  string id = 1;
-}
-
-message StopContainerRequest {
-  string id = 1;
-}
-
-message StopContainerResponse {
-  string id = 1;
-}
-
-message RunCommandRequest {
-  string id = 1;
-  repeated string command = 2;
-}
-
-message RunCommandResponse {
-  string id = 1;
-  optional string msg = 2;
-}
-
-message FileExistsRequest {
-  string id = 1;
-  string path = 2;
-}
-
-message FileExistsResponse {
-  string id = 1;
-  string path = 2;
-  optional bool exists = 3;
-}
-
-message GetFileRequest {
-  string id = 1;
-  string path = 2;
-}
-
-message GetFileResponse {
-  string id = 1;
-  string oss_object_key = 2;
-}
-
-message PutFileRequest {
-  string id = 1;
-  optional string oss_object_key = 2;
-  optional string local_path = 3;
-  optional bytes payload = 4;
-  string path = 5;
-}
-
-message PutFileResponse {
-  string id = 1;
-  string path = 2;
-  bool success = 3;
-}

+ 0 - 59
protos/task_pb2.py

@@ -1,59 +0,0 @@
-# -*- coding: utf-8 -*-
-# Generated by the protocol buffer compiler.  DO NOT EDIT!
-# NO CHECKED-IN PROTOBUF GENCODE
-# source: task.proto
-# Protobuf Python Version: 5.29.0
-"""Generated protocol buffer code."""
-from google.protobuf import descriptor as _descriptor
-from google.protobuf import descriptor_pool as _descriptor_pool
-from google.protobuf import runtime_version as _runtime_version
-from google.protobuf import symbol_database as _symbol_database
-from google.protobuf.internal import builder as _builder
-_runtime_version.ValidateProtobufRuntimeVersion(
-    _runtime_version.Domain.PUBLIC,
-    5,
-    29,
-    0,
-    '',
-    'task.proto'
-)
-# @@protoc_insertion_point(imports)
-
-_sym_db = _symbol_database.Default()
-
-
-from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
-
-
-DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\ntask.proto\x1a\x1bgoogle/protobuf/empty.proto\"$\n\x16StartContainerResponse\x12\n\n\x02id\x18\x01 \x01(\t\"\"\n\x14StopContainerRequest\x12\n\n\x02id\x18\x01 \x01(\t\"#\n\x15StopContainerResponse\x12\n\n\x02id\x18\x01 \x01(\t\"0\n\x11RunCommandRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0f\n\x07\x63ommand\x18\x02 \x03(\t\":\n\x12RunCommandResponse\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x03msg\x18\x02 \x01(\tH\x00\x88\x01\x01\x42\x06\n\x04_msg\"-\n\x11\x46ileExistsRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\"N\n\x12\x46ileExistsResponse\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x13\n\x06\x65xists\x18\x03 \x01(\x08H\x00\x88\x01\x01\x42\t\n\x07_exists\"*\n\x0eGetFileRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\"5\n\x0fGetFileResponse\x12\n\n\x02id\x18\x01 \x01(\t\x12\x16\n\x0eoss_object_key\x18\x02 \x01(\t\"\xa4\x01\n\x0ePutFileRequest\x12\n\n\x02id\x18\x01 \x01(\t\x12\x1b\n\x0eoss_object_key\x18\x02 \x01(\tH\x00\x88\x01\x01\x12\x17\n\nlocal_path\x18\x03 \x01(\tH\x01\x88\x01\x01\x12\x14\n\x07payload\x18\x04 \x01(\x0cH\x02\x88\x01\x01\x12\x0c\n\x04path\x18\x05 \x01(\tB\x11\n\x0f_oss_object_keyB\r\n\x0b_local_pathB\n\n\x08_payload\"<\n\x0fPutFileResponse\x12\n\n\x02id\x18\x01 \x01(\t\x12\x0c\n\x04path\x18\x02 \x01(\t\x12\x0f\n\x07success\x18\x03 \x01(\x08\x32\xdb\x02\n\nTaskRunner\x12\x41\n\x0eStartContainer\x12\x16.google.protobuf.Empty\x1a\x17.StartContainerResponse\x12>\n\rStopContainer\x12\x15.StopContainerRequest\x1a\x16.StopContainerResponse\x12\x37\n\nRunCommand\x12\x12.RunCommandRequest\x1a\x13.RunCommandResponse0\x01\x12\x35\n\nFileExists\x12\x12.FileExistsRequest\x1a\x13.FileExistsResponse\x12,\n\x07GetFile\x12\x0f.GetFileRequest\x1a\x10.GetFileResponse\x12,\n\x07PutFile\x12\x0f.PutFileRequest\x1a\x10.PutFileResponseb\x06proto3')
-
-_globals = globals()
-_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals)
-_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'task_pb2', _globals)
-if not _descriptor._USE_C_DESCRIPTORS:
-  DESCRIPTOR._loaded_options = None
-  _globals['_STARTCONTAINERRESPONSE']._serialized_start=43
-  _globals['_STARTCONTAINERRESPONSE']._serialized_end=79
-  _globals['_STOPCONTAINERREQUEST']._serialized_start=81
-  _globals['_STOPCONTAINERREQUEST']._serialized_end=115
-  _globals['_STOPCONTAINERRESPONSE']._serialized_start=117
-  _globals['_STOPCONTAINERRESPONSE']._serialized_end=152
-  _globals['_RUNCOMMANDREQUEST']._serialized_start=154
-  _globals['_RUNCOMMANDREQUEST']._serialized_end=202
-  _globals['_RUNCOMMANDRESPONSE']._serialized_start=204
-  _globals['_RUNCOMMANDRESPONSE']._serialized_end=262
-  _globals['_FILEEXISTSREQUEST']._serialized_start=264
-  _globals['_FILEEXISTSREQUEST']._serialized_end=309
-  _globals['_FILEEXISTSRESPONSE']._serialized_start=311
-  _globals['_FILEEXISTSRESPONSE']._serialized_end=389
-  _globals['_GETFILEREQUEST']._serialized_start=391
-  _globals['_GETFILEREQUEST']._serialized_end=433
-  _globals['_GETFILERESPONSE']._serialized_start=435
-  _globals['_GETFILERESPONSE']._serialized_end=488
-  _globals['_PUTFILEREQUEST']._serialized_start=491
-  _globals['_PUTFILEREQUEST']._serialized_end=655
-  _globals['_PUTFILERESPONSE']._serialized_start=657
-  _globals['_PUTFILERESPONSE']._serialized_end=717
-  _globals['_TASKRUNNER']._serialized_start=720
-  _globals['_TASKRUNNER']._serialized_end=1067
-# @@protoc_insertion_point(module_scope)

+ 0 - 313
protos/task_pb2_grpc.py

@@ -1,313 +0,0 @@
-# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT!
-"""Client and server classes corresponding to protobuf-defined services."""
-import grpc
-import warnings
-
-from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2
-import protos.task_pb2 as task__pb2
-
-GRPC_GENERATED_VERSION = '1.71.0'
-GRPC_VERSION = grpc.__version__
-_version_not_supported = False
-
-try:
-    from grpc._utilities import first_version_is_lower
-    _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION)
-except ImportError:
-    _version_not_supported = True
-
-if _version_not_supported:
-    raise RuntimeError(
-        f'The grpc package installed is at version {GRPC_VERSION},'
-        + f' but the generated code in task_pb2_grpc.py depends on'
-        + f' grpcio>={GRPC_GENERATED_VERSION}.'
-        + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}'
-        + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.'
-    )
-
-
-class TaskRunnerStub(object):
-    """Missing associated documentation comment in .proto file."""
-
-    def __init__(self, channel):
-        """Constructor.
-
-        Args:
-            channel: A grpc.Channel.
-        """
-        self.StartContainer = channel.unary_unary(
-                '/TaskRunner/StartContainer',
-                request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-                response_deserializer=task__pb2.StartContainerResponse.FromString,
-                _registered_method=True)
-        self.StopContainer = channel.unary_unary(
-                '/TaskRunner/StopContainer',
-                request_serializer=task__pb2.StopContainerRequest.SerializeToString,
-                response_deserializer=task__pb2.StopContainerResponse.FromString,
-                _registered_method=True)
-        self.RunCommand = channel.unary_stream(
-                '/TaskRunner/RunCommand',
-                request_serializer=task__pb2.RunCommandRequest.SerializeToString,
-                response_deserializer=task__pb2.RunCommandResponse.FromString,
-                _registered_method=True)
-        self.FileExists = channel.unary_unary(
-                '/TaskRunner/FileExists',
-                request_serializer=task__pb2.FileExistsRequest.SerializeToString,
-                response_deserializer=task__pb2.FileExistsResponse.FromString,
-                _registered_method=True)
-        self.GetFile = channel.unary_unary(
-                '/TaskRunner/GetFile',
-                request_serializer=task__pb2.GetFileRequest.SerializeToString,
-                response_deserializer=task__pb2.GetFileResponse.FromString,
-                _registered_method=True)
-        self.PutFile = channel.unary_unary(
-                '/TaskRunner/PutFile',
-                request_serializer=task__pb2.PutFileRequest.SerializeToString,
-                response_deserializer=task__pb2.PutFileResponse.FromString,
-                _registered_method=True)
-
-
-class TaskRunnerServicer(object):
-    """Missing associated documentation comment in .proto file."""
-
-    def StartContainer(self, request, context):
-        """Missing associated documentation comment in .proto file."""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details('Method not implemented!')
-        raise NotImplementedError('Method not implemented!')
-
-    def StopContainer(self, request, context):
-        """Missing associated documentation comment in .proto file."""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details('Method not implemented!')
-        raise NotImplementedError('Method not implemented!')
-
-    def RunCommand(self, request, context):
-        """Missing associated documentation comment in .proto file."""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details('Method not implemented!')
-        raise NotImplementedError('Method not implemented!')
-
-    def FileExists(self, request, context):
-        """Missing associated documentation comment in .proto file."""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details('Method not implemented!')
-        raise NotImplementedError('Method not implemented!')
-
-    def GetFile(self, request, context):
-        """Missing associated documentation comment in .proto file."""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details('Method not implemented!')
-        raise NotImplementedError('Method not implemented!')
-
-    def PutFile(self, request, context):
-        """Missing associated documentation comment in .proto file."""
-        context.set_code(grpc.StatusCode.UNIMPLEMENTED)
-        context.set_details('Method not implemented!')
-        raise NotImplementedError('Method not implemented!')
-
-
-def add_TaskRunnerServicer_to_server(servicer, server):
-    rpc_method_handlers = {
-            'StartContainer': grpc.unary_unary_rpc_method_handler(
-                    servicer.StartContainer,
-                    request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-                    response_serializer=task__pb2.StartContainerResponse.SerializeToString,
-            ),
-            'StopContainer': grpc.unary_unary_rpc_method_handler(
-                    servicer.StopContainer,
-                    request_deserializer=task__pb2.StopContainerRequest.FromString,
-                    response_serializer=task__pb2.StopContainerResponse.SerializeToString,
-            ),
-            'RunCommand': grpc.unary_stream_rpc_method_handler(
-                    servicer.RunCommand,
-                    request_deserializer=task__pb2.RunCommandRequest.FromString,
-                    response_serializer=task__pb2.RunCommandResponse.SerializeToString,
-            ),
-            'FileExists': grpc.unary_unary_rpc_method_handler(
-                    servicer.FileExists,
-                    request_deserializer=task__pb2.FileExistsRequest.FromString,
-                    response_serializer=task__pb2.FileExistsResponse.SerializeToString,
-            ),
-            'GetFile': grpc.unary_unary_rpc_method_handler(
-                    servicer.GetFile,
-                    request_deserializer=task__pb2.GetFileRequest.FromString,
-                    response_serializer=task__pb2.GetFileResponse.SerializeToString,
-            ),
-            'PutFile': grpc.unary_unary_rpc_method_handler(
-                    servicer.PutFile,
-                    request_deserializer=task__pb2.PutFileRequest.FromString,
-                    response_serializer=task__pb2.PutFileResponse.SerializeToString,
-            ),
-    }
-    generic_handler = grpc.method_handlers_generic_handler(
-            'TaskRunner', rpc_method_handlers)
-    server.add_generic_rpc_handlers((generic_handler,))
-    server.add_registered_method_handlers('TaskRunner', rpc_method_handlers)
-
-
- # This class is part of an EXPERIMENTAL API.
-class TaskRunner(object):
-    """Missing associated documentation comment in .proto file."""
-
-    @staticmethod
-    def StartContainer(request,
-            target,
-            options=(),
-            channel_credentials=None,
-            call_credentials=None,
-            insecure=False,
-            compression=None,
-            wait_for_ready=None,
-            timeout=None,
-            metadata=None):
-        return grpc.experimental.unary_unary(
-            request,
-            target,
-            '/TaskRunner/StartContainer',
-            google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-            task__pb2.StartContainerResponse.FromString,
-            options,
-            channel_credentials,
-            insecure,
-            call_credentials,
-            compression,
-            wait_for_ready,
-            timeout,
-            metadata,
-            _registered_method=True)
-
-    @staticmethod
-    def StopContainer(request,
-            target,
-            options=(),
-            channel_credentials=None,
-            call_credentials=None,
-            insecure=False,
-            compression=None,
-            wait_for_ready=None,
-            timeout=None,
-            metadata=None):
-        return grpc.experimental.unary_unary(
-            request,
-            target,
-            '/TaskRunner/StopContainer',
-            task__pb2.StopContainerRequest.SerializeToString,
-            task__pb2.StopContainerResponse.FromString,
-            options,
-            channel_credentials,
-            insecure,
-            call_credentials,
-            compression,
-            wait_for_ready,
-            timeout,
-            metadata,
-            _registered_method=True)
-
-    @staticmethod
-    def RunCommand(request,
-            target,
-            options=(),
-            channel_credentials=None,
-            call_credentials=None,
-            insecure=False,
-            compression=None,
-            wait_for_ready=None,
-            timeout=None,
-            metadata=None):
-        return grpc.experimental.unary_stream(
-            request,
-            target,
-            '/TaskRunner/RunCommand',
-            task__pb2.RunCommandRequest.SerializeToString,
-            task__pb2.RunCommandResponse.FromString,
-            options,
-            channel_credentials,
-            insecure,
-            call_credentials,
-            compression,
-            wait_for_ready,
-            timeout,
-            metadata,
-            _registered_method=True)
-
-    @staticmethod
-    def FileExists(request,
-            target,
-            options=(),
-            channel_credentials=None,
-            call_credentials=None,
-            insecure=False,
-            compression=None,
-            wait_for_ready=None,
-            timeout=None,
-            metadata=None):
-        return grpc.experimental.unary_unary(
-            request,
-            target,
-            '/TaskRunner/FileExists',
-            task__pb2.FileExistsRequest.SerializeToString,
-            task__pb2.FileExistsResponse.FromString,
-            options,
-            channel_credentials,
-            insecure,
-            call_credentials,
-            compression,
-            wait_for_ready,
-            timeout,
-            metadata,
-            _registered_method=True)
-
-    @staticmethod
-    def GetFile(request,
-            target,
-            options=(),
-            channel_credentials=None,
-            call_credentials=None,
-            insecure=False,
-            compression=None,
-            wait_for_ready=None,
-            timeout=None,
-            metadata=None):
-        return grpc.experimental.unary_unary(
-            request,
-            target,
-            '/TaskRunner/GetFile',
-            task__pb2.GetFileRequest.SerializeToString,
-            task__pb2.GetFileResponse.FromString,
-            options,
-            channel_credentials,
-            insecure,
-            call_credentials,
-            compression,
-            wait_for_ready,
-            timeout,
-            metadata,
-            _registered_method=True)
-
-    @staticmethod
-    def PutFile(request,
-            target,
-            options=(),
-            channel_credentials=None,
-            call_credentials=None,
-            insecure=False,
-            compression=None,
-            wait_for_ready=None,
-            timeout=None,
-            metadata=None):
-        return grpc.experimental.unary_unary(
-            request,
-            target,
-            '/TaskRunner/PutFile',
-            task__pb2.PutFileRequest.SerializeToString,
-            task__pb2.PutFileResponse.FromString,
-            options,
-            channel_credentials,
-            insecure,
-            call_credentials,
-            compression,
-            wait_for_ready,
-            timeout,
-            metadata,
-            _registered_method=True)

+ 0 - 3
requirements.txt

@@ -1,8 +1,5 @@
 aliyun-log-python-sdk==0.9.12
 google-generativeai==0.8.3
-grpcio==1.71.0
-grpcio-reflection==1.71.0
-grpcio-tools==1.71.0
 loguru==0.7.2
 mutagen==1.47.0
 odps==3.5.1

+ 3 - 28
utils/aliyun_oss.py

@@ -11,7 +11,7 @@ OSS_ACCESS_KEY_SECRET = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
 # OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou-internal.aliyuncs.com"# 内网地址
 OSS_BUCKET_ENDPOINT = "oss-cn-hangzhou.aliyuncs.com" # 外网地址
 OSS_BUCKET_NAME = "art-crawler"
-class Oss:
+class Oss():
 
     @classmethod
     def channel_upload_oss(cls, src_url: str,
@@ -42,25 +42,6 @@ class Oss:
                 'oss_object_key': oss_object_key}
         raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
 
-    """
-        视频发送到art-pubbucket
-        """
-
-    @classmethod
-    def stitching_sync_upload_bytes_to_oss(cls, data: str, video_id: str) -> Dict[str, Any]:
-        oss_object_key = f'carry/video/{video_id}'
-        auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
-        bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, "art-pubbucket")
-        response = bucket.put_object(oss_object_key, data)
-
-        if 'Content-Length' in response.headers:
-            return {
-                'status': response.status,
-                'oss_object_key': oss_object_key,
-                'save_oss_timestamp': int(datetime.now().timestamp() * 1000),
-            }
-        raise AssertionError(f'OSS上传失败,请求ID: \n{response.headers["x-oss-request-id"]}')
-
     """
     视频发送到art-pubbucket
     """
@@ -120,8 +101,8 @@ class Oss:
 
 
     @classmethod
-    def download_video_oss(cls, url, file_path, add_suffix: bool = True):
-        video_path = file_path + 'video.mp4' if add_suffix else file_path
+    def download_video_oss(cls, url, file_path):
+        video_path = file_path + 'video.mp4'
         oss_object_key = cls.channel_upload_oss(url, str(uuid.uuid4()))
         time.sleep(2)
         oss_object = oss_object_key.get("oss_object_key")
@@ -148,12 +129,6 @@ class Oss:
         time.sleep(5)
         return video_path
 
-    @classmethod
-    def generate_url(cls, oss_object_key: str) -> str:
-        auth = oss2.Auth(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET)
-        bucket = oss2.Bucket(auth, OSS_BUCKET_ENDPOINT, 'art-pubbucket')
-        return bucket.sign_url(method='GET', key=oss_object_key, expires=3600, slash_safe=True)
-
 
 if __name__ == '__main__':
     Oss.download_sph_ls('channel/video/sph/14374775553517295881.jpg','asa','1')

+ 11 - 11
utils/download_video.py

@@ -27,12 +27,12 @@ class DownLoad:
                     if response.status_code == 206:
                         # 以二进制写入模式打开文件
 
-                        # with open(f"{video}", "wb") as file:
-                        #     # 将响应内容写入文件
-                        #     file.write(response.content)
-                        return response.content, video
+                        with open(f"{video}", "wb") as file:
+                            # 将响应内容写入文件
+                            file.write(response.content)
+                        return video
                 except Exception:
-                    return None, video
+                    return video
         else:
             try:
                 for i in range(3):
@@ -42,13 +42,13 @@ class DownLoad:
                     if response.status_code == 200:
                         # 以二进制写入模式打开文件
 
-                        # with open(f"{video}", "wb") as file:
-                        #     # 将响应内容写入文件
-                        #     file.write(response.content)
-                        return response.content, video
-                return None, video
+                        with open(f"{video}", "wb") as file:
+                            # 将响应内容写入文件
+                            file.write(response.content)
+                        return video
+                return video
             except Exception:
-                return None, video
+                return video
 
     @classmethod
     def download_m3u8_video(cls ,url, file_path):

+ 1 - 1
utils/dy_ks_get_url.py

@@ -95,7 +95,7 @@ class Dy_KS:
             headers = {
                 'Content-Type': 'application/json'
             }
-            # time.sleep(random.uniform(10, 50))
+            time.sleep(random.uniform(10, 50))
             response = requests.request("POST", url, headers=headers, data=payload, timeout= 30)
             response = response.json()
             code = response["code"]

+ 281 - 415
utils/ffmpeg.py

@@ -1,376 +1,309 @@
+import asyncio
 import json
+import os
 import time
+from typing import List
 
-import orjson
+import cv2
 import requests
 from loguru import logger
+from mutagen.mp3 import MP3
 
-from protos import task_pb2, task_pb2_grpc
 
 
-class FFmpeg(object):
-    
-    def __init__(self, container_id: str, stub: task_pb2_grpc.TaskRunnerStub):
-        self.container_id = container_id
-        self.stub = stub
+class FFmpeg():
 
-    def seconds_to_srt_time(self, seconds):
-        """
-        时间转换
-        """
+    """
+    时间转换
+    """
+    @classmethod
+    def seconds_to_srt_time(cls, seconds):
         hours = int(seconds // 3600)
         minutes = int((seconds % 3600) // 60)
         seconds = seconds % 60
         milliseconds = int((seconds - int(seconds)) * 1000)
         return f"{hours:02d}:{minutes:02d}:{int(seconds):02d},{milliseconds:03d}"
 
-    # def get_video_duration(self, video_url):
-    #     """
-    #     获取单个视频时长
-    #     """
-    #     cap = cv2.VideoCapture(video_url)
-    #     if cap.isOpened():
-    #         rate = cap.get(5)
-    #         frame_num = cap.get(7)
-    #         duration = int(frame_num / rate)
-    #         return duration
-    #     return 0
+    """
+    获取单个视频时长
+    """
+    @classmethod
+    def get_video_duration(cls, video_url):
+        cap = cv2.VideoCapture(video_url)
+        if cap.isOpened():
+            rate = cap.get(5)
+            frame_num = cap.get(7)
+            duration = int(frame_num / rate)
+            return duration
+        return 0
+
 
     # """
     # 获取视频文件的时长(秒)
     # """
-    # def get_videos_duration(self, video_file):
-    #     result = self.asyncio_run_subprocess(["ffprobe", "-v", "error", "-show_entries", "format=duration",
+    # @classmethod
+    # def get_videos_duration(cls, video_file):
+    #     result = cls.asyncio_run_subprocess(["ffprobe", "-v", "error", "-show_entries", "format=duration",
     #          "-of", "default=noprint_wrappers=1:nokey=1", video_file], timeout=10)
     #     return float(result)
 
-    def get_w_h_size(self, new_video_path):
-        """
-        获取视频宽高
-        """
+    """
+    获取视频宽高
+    """
+    @classmethod
+    def get_w_h_size(cls, new_video_path):
         try:
             # 获取视频的原始宽高信息
-            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffprobe", "-v" ,"error" ,"-select_streams" ,"v:0" ,"-show_entries", "stream=width,height" ,"-of" ,"csv=p=0" ,new_video_path])
-            # ffprobe_cmd = self.asyncio_run_subprocess(["ffprobe", "-v" ,"error" ,"-select_streams" ,"v:0" ,"-show_entries", "stream=width,height" ,"-of" ,"csv=p=0" ,new_video_path],timeout=10)
-            # output_decoded = ffprobe_cmd.strip()
-            output_decoded = ''
-            for response in self.stub.RunCommand(request=request):
-                output_decoded += response.msg
+            ffprobe_cmd = cls.asyncio_run_subprocess(["ffprobe", "-v" ,"error" ,"-select_streams" ,"v:0" ,"-show_entries", "stream=width,height" ,"-of" ,"csv=p=0" ,new_video_path],timeout=10)
+            output_decoded = ffprobe_cmd.strip()
             split_output = [value for value in output_decoded.split(',') if value.strip()]
             height, width = map(int, split_output)
             return width, height
         except ValueError as e:
             return 1920, 1080
 
-    def video_crop(self, video_path, file_path):
-        """
-        视频裁剪
-        """
+
+    """
+    视频裁剪
+    """
+    @classmethod
+    def video_crop(cls, video_path, file_path):
         crop_url = file_path + 'crop.mp4'
         try:
-            # # 获取视频的原始宽高信息
-            # ffprobe_cmd = cls.asyncio_run_subprocess(
-            #     ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
-            #      "csv=p=0", video_path], timeout=10)
-            # width, height = map(int, ffprobe_cmd.strip().split(','))
-            # # 计算裁剪后的高度
-            # new_height = int(height * 0.8)
+            # 获取视频的原始宽高信息
+            ffprobe_cmd = cls.asyncio_run_subprocess(
+                ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height", "-of",
+                 "csv=p=0", video_path], timeout=10)
+            width, height = map(int, ffprobe_cmd.strip().split(','))
+            # 计算裁剪后的高度
+            new_height = int(height * 0.8)
 
             # 构建 FFmpeg 命令,裁剪视频高度为原始高度的80%
-            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg", "-i", video_path, "-vf", f"\"crop=in_w:in_h*0.8\"", "-c:v", "h264_nvenc", "-c:a", "aac", "-y", "-cq", "28", "-preset", "slow", crop_url])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess(
-            #     [
-            #     "ffmpeg",
-            #     "-i", video_path,
-            #     "-vf", f"crop={width}:{new_height}",
-            #     "-c:v", "libx264",
-            #     "-c:a", "aac",
-            #     "-y",
-            #     crop_url
-            #     ],timeout=240)
+            cls.asyncio_run_subprocess(
+                [
+                "ffmpeg",
+                "-i", video_path,
+                "-vf", f"crop={width}:{new_height}",
+                "-c:v", "libx264",
+                "-c:a", "aac",
+                "-y",
+                crop_url
+                ],timeout=240)
             return crop_url
         except Exception as e:
             return crop_url
 
-    def video_ggduration(self, video_path, file_path, gg_duration_total):
-        """
-        视频截断
-        """
+    """
+    视频截断
+    """
+    @classmethod
+    def video_ggduration(cls, video_path, file_path, gg_duration_total):
         gg_duration_url = file_path + 'gg_duration.mp4'
         # 获取视频时长
         try:
-            total_duration = self.get_duration(video_path)
+            total_duration = cls.get_video_duration(video_path)
             if total_duration == 0:
                 return gg_duration_url
             duration = int(total_duration) - int(gg_duration_total)
             if int(total_duration) < int(gg_duration_total):
                 return gg_duration_url
-            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg", "-i", video_path, "-c:v", "h264_nvenc", "-c:a", "aac", "-t", str(duration), "-y", "-cq", "28", "-preset", "slow", gg_duration_url])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess([
-            #     "ffmpeg",
-            #     "-i", video_path,
-            #     "-c:v", "libx264",
-            #     "-c:a", "aac",
-            #     "-t", str(duration),
-            #     "-y",
-            #     gg_duration_url
-            # ], timeout= 360)
+            cls.asyncio_run_subprocess([
+                "ffmpeg",
+                "-i", video_path,
+                "-c:v", "libx264",
+                "-c:a", "aac",
+                "-t", str(duration),
+                "-y",
+                gg_duration_url
+            ], timeout= 360)
             return gg_duration_url
         except Exception as e:
             return gg_duration_url
 
-    def video_png(self, video_path, file_path):
-        """
-         截取原视频最后一帧
-        """
+    """
+     截取原视频最后一帧
+    """
+    @classmethod
+    def video_png(cls, video_path, file_path):
         # 获取视频的原始宽高信息
         jpg_url = file_path + 'png.jpg'
         try:
-            # self.asyncio_run_subprocess(
-            #     ["ffmpeg", "-sseof", "-1", '-i', video_path, '-frames:v', '1',  "-y", jpg_url], timeout=120)
-            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg", "-sseof", "-1", '-i', video_path, '-frames:v', '1',  "-y", jpg_url])
-            for _ in self.stub.RunCommand(request=request):
-                pass
+            cls.asyncio_run_subprocess(
+                ["ffmpeg", "-sseof", "-1", '-i', video_path, '-frames:v', '1',  "-y", jpg_url], timeout=120)
             return jpg_url
         except Exception as e:
             return jpg_url
 
-    def get_video_mp3(self, video_file, video_path_url, pw_random_id):
-        """
-        获取视频音频
-        """
+    """
+    获取视频音频
+    """
+    @classmethod
+    def get_video_mp3(cls, video_file, video_path_url, pw_random_id):
         pw_mp3_path = video_path_url + str(pw_random_id) +'pw_video.mp3'
         try:
-            request = task_pb2.RunCommandRequest(id=self.container_id, command=['ffmpeg', '-i', video_file, '-q:a', '0', '-map', 'a', pw_mp3_path])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess([
-            #     'ffmpeg',
-            #     '-i', video_file,
-            #     '-q:a', '0',
-            #     '-map', 'a',
-            #     pw_mp3_path
-            # ], timeout=120)
+            cls.asyncio_run_subprocess([
+                'ffmpeg',
+                '-i', video_file,
+                '-q:a', '0',
+                '-map', 'a',
+                pw_mp3_path
+            ], timeout=120)
             time.sleep(1)
             return pw_mp3_path
         except Exception as e:
             return pw_mp3_path
 
-    def get_pw_video_mp3(self, video_file, video_path_url):
+    @classmethod
+    def get_pw_video_mp3(cls, video_file, video_path_url):
         bgm_pw_path_mp3 = video_file + 'bgm_pw.mp3'
         try:
-            request = task_pb2.RunCommandRequest(id=self.container_id, command=['ffmpeg', '-i', video_path_url, '-q:a', '0', '-map', 'a', bgm_pw_path_mp3])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess([
-            #     'ffmpeg',
-            #     '-i', video_path_url,      # 输入的视频文件路径
-            #     '-q:a', '0',           # 设置音频质量为最佳
-            #     '-map', 'a',
-            #     '-y',
-            #     bgm_pw_path_mp3
-            # ], timeout=120)
-            # time.sleep(1)
+            cls.asyncio_run_subprocess([
+                'ffmpeg',
+                '-i', video_path_url,      # 输入的视频文件路径
+                '-q:a', '0',           # 设置音频质量为最佳
+                '-map', 'a',
+                '-y',
+                bgm_pw_path_mp3
+            ], timeout=120)
+            time.sleep(1)
             return bgm_pw_path_mp3
         except Exception as e:
             return bgm_pw_path_mp3
 
-    def video_add_bgm(self, video_file, bgm_path, video_path_url):
-        """
-        片尾增加bgm
-        """
+
+    """
+    片尾增加bgm
+    """
+    @classmethod
+    def video_add_bgm(cls, video_file, bgm_path, video_path_url):
         bgm_pw_path = video_path_url + 'bgm_pw.mp4'
-        pw_duration = self.get_duration(video_file)
+        pw_duration = cls.get_video_duration(video_file)
 
         try:
             pw_path_txt = video_path_url + 'bgm_pw_video.txt'
-            # with open(pw_path_txt, 'w') as f:
-            #     f.write(f"file '{video_file}'\n")
-            payload = f"file '{video_file}'\n".encode()
-            request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=pw_path_txt)
-            self.stub.PutFile(request=request)
-            request = task_pb2.RunCommandRequest(id=self.container_id,
-                                                 command=[
-                                                     "ffmpeg",
-                                                     "-f", "concat",
-                                                     "-safe", "0",
-                                                     "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
-                                                     "-i", bgm_path,  # 音频文件
-                                                     "-c:v", "h264_nvenc",  # 视频编码格式
-                                                     '-c:a', 'aac',           # 音频编码格式
-                                                     "-t", str(pw_duration),  # 输出视频的持续时间
-                                                     '-b:v', '260k',          # 视频比特率
-                                                     '-b:a', '96k',           # 音频比特率
-                                                     '-threads', '2',         # 线程数
-                                                     # '-vf', f'{background_cmd},{subtitle_cmd}',  # 视频过滤器(背景和字幕)
-                                                     "-filter_complex", "\"[1:a]volume=0.6[a1];[0:a][a1]amerge=inputs=2[aout]\"",  # 混合音频流
-                                                     "-map", "\"0:v:0\"",  # 映射视频流来自第一个输入文件(视频)
-                                                     "-map", "\"[aout]\"",  # 映射混合后的音频流
-                                                     '-y',                    # 强制覆盖输出文件
-                                                     "-cq", "28",
-                                                     "-preset", "slow",
-                                                     bgm_pw_path              # 输出文件路径
-                                                 ])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess([
-            #     "ffmpeg",
-            #     "-f", "concat",
-            #     "-safe", "0",
-            #     "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
-            #     "-i", bgm_path,  # 音频文件
-            #     "-c:v", "libx264",  # 视频编码格式
-            #     "-t", str(pw_duration),  # 输出视频的持续时间
-            #     '-c:a', 'aac',           # 音频编码格式
-            #     '-b:v', '260k',          # 视频比特率
-            #     '-b:a', '96k',           # 音频比特率
-            #     '-threads', '2',         # 线程数
-            #     # '-vf', f'{background_cmd},{subtitle_cmd}',  # 视频过滤器(背景和字幕)
-            #     "-filter_complex", "[1:a]volume=0.6[a1];[0:a][a1]amerge=inputs=2[aout]",  # 混合音频流
-            #     "-map", "0:v:0",  # 映射视频流来自第一个输入文件(视频)
-            #     "-map", "[aout]",  # 映射混合后的音频流
-            #     '-y',                    # 强制覆盖输出文件
-            #     bgm_pw_path              # 输出文件路径
-            # ], timeout=500)
-            # time.sleep(1)
+            with open(pw_path_txt, 'w') as f:
+                f.write(f"file '{video_file}'\n")
+            cls.asyncio_run_subprocess([
+                "ffmpeg",
+                "-f", "concat",
+                "-safe", "0",
+                "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
+                "-i", bgm_path,  # 音频文件
+                "-c:v", "libx264",  # 视频编码格式
+                "-t", str(pw_duration),  # 输出视频的持续时间
+                '-c:a', 'aac',           # 音频编码格式
+                '-b:v', '260k',          # 视频比特率
+                '-b:a', '96k',           # 音频比特率
+                '-threads', '2',         # 线程数
+                # '-vf', f'{background_cmd},{subtitle_cmd}',  # 视频过滤器(背景和字幕)
+                "-filter_complex", "[1:a]volume=0.6[a1];[0:a][a1]amerge=inputs=2[aout]",  # 混合音频流
+                "-map", "0:v:0",  # 映射视频流来自第一个输入文件(视频)
+                "-map", "[aout]",  # 映射混合后的音频流
+                '-y',                    # 强制覆盖输出文件
+                bgm_pw_path              # 输出文件路径
+            ], timeout=500)
+            time.sleep(1)
             return bgm_pw_path
         except Exception as e:
             return bgm_pw_path
 
-    def update_video_h_w(self, video_path, file_path):
-        """横屏视频改为竖屏"""
+    """横屏视频改为竖屏"""
+    @classmethod
+    def update_video_h_w(cls, video_path, file_path):
         video_h_w_path = file_path +'video_h_w_video.mp4'
         try:
-            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg" ,"-i" ,video_path ,"-vf" ,"\"scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2\"" ,video_h_w_path])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2" ,video_h_w_path],timeout=420)
+            cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=640:ih*640/iw,pad=iw:iw*16/9:(ow-iw)/2:(oh-ih)/2" ,video_h_w_path],timeout=420)
             return video_h_w_path
         except Exception as e:
             return video_h_w_path
 
-    def video_640(self, video_path, file_path):
-        """视频转为640像素"""
+    """视频转为640像素"""
+    @classmethod
+    def video_640(cls, video_path, file_path):
         video_url = file_path + 'pixelvideo.mp4'
         try:
-            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg" ,"-i" ,video_path ,"-vf" ,"\"scale=360:640\"" ,video_url])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=360:640" ,video_url],timeout=420)
+            cls.asyncio_run_subprocess(["ffmpeg" ,"-i" ,video_path ,"-vf" ,"scale=360:640" ,video_url],timeout=420)
             return video_url
         except Exception as e:
             return video_url
 
-    def concatenate_videos(self, videos_paths, file_path):
+    @classmethod
+    def concatenate_videos(cls, videos_paths, file_path):
         video_url = file_path + 'rg_pw.mp4'
         list_filename = file_path + 'rg_pw.txt'
-        # with open(list_filename, "w") as f:
-        #     for video_path in videos_paths:
-        #         f.write(f"file '{video_path}'\n")
-        payload = b''
-        for video_path in videos_paths:
-            payload += f"file '{video_path}'\n".encode()
-        request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=list_filename)
-        self.stub.PutFile(request=request)
+        with open(list_filename, "w") as f:
+            for video_path in videos_paths:
+                f.write(f"file '{video_path}'\n")
         try:
-            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", video_url])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess(
-            #     ["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", video_url], timeout=420)
+            cls.asyncio_run_subprocess(
+                ["ffmpeg", "-f", "concat", "-safe", "0", "-i", list_filename, "-c", "copy", video_url], timeout=420)
             logger.info(f"[+] 视频转为640像素成功")
             return video_url
         except Exception as e:
             return video_url
 
-    def h_b_video(self, video_path, pw_path, file_path):
-        """视频拼接到一起"""
+    """视频拼接到一起"""
+    @classmethod
+    def h_b_video(cls, video_path, pw_path, file_path):
         video_url = file_path + 'hbvideo.mp4'
         try:
-            request = task_pb2.RunCommandRequest(id=self.container_id, command=["ffmpeg","-i", video_path, "-i", pw_path, "-filter_complex" ,"\"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]\"" ,"-map" ,"[outv]" ,"-map" ,"[outa]" ,video_url])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess(["ffmpeg","-i", video_path, "-i", pw_path, "-filter_complex" ,"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]" ,"-map" ,"[outv]" ,"-map" ,"[outa]" ,video_url],timeout=500)
+            cls.asyncio_run_subprocess(["ffmpeg","-i", video_path, "-i", pw_path, "-filter_complex" ,"[0:v]scale=360:640[v1]; [1:v]scale=360:640[v2]; [v1][0:a][v2][1:a]concat=n=2:v=1:a=1[outv][outa]" ,"-map" ,"[outv]" ,"-map" ,"[outa]" ,video_url],timeout=500)
             return video_url
         except Exception as e:
             return video_url
 
-    def add_video_zm(self, new_video_path, video_path_url, pw_random_id, new_text):
-        """横屏视频顶部增加字幕"""
+    """横屏视频顶部增加字幕"""
+    @classmethod
+    def add_video_zm(cls, new_video_path, video_path_url, pw_random_id, new_text):
         single_video_srt = video_path_url + str(pw_random_id) +'video_zm.srt'
         single_video_txt = video_path_url + str(pw_random_id) +'video_zm.txt'
         single_video = video_path_url + str(pw_random_id) +'video_zm.mp4'
         try:
-            duration = self.get_duration(new_video_path)
+            duration = cls.get_video_duration(new_video_path)
             if duration == 0:
                 return new_video_path
-            start_time = self.seconds_to_srt_time(0)
-            end_time = self.seconds_to_srt_time(duration)
+            start_time = cls.seconds_to_srt_time(0)
+            end_time = cls.seconds_to_srt_time(duration)
             # zm = '致敬伟大的教员,为整个民族\n感谢老人家历史向一代伟人'
-            payload = f"file '{new_video_path}'\n".encode()
-            request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=single_video_txt)
-            self.stub.PutFile(request)
-            # with open(single_video_txt, 'w') as f:
-            #     f.write(f"file '{new_video_path}'\n")
-            payload = f"1\n{start_time} --> {end_time}\n{new_text}\n\n".encode()
-            request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=single_video_srt)
-            self.stub.PutFile(request)
-            # with open(single_video_srt, 'w') as f:
-            #     f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
+            with open(single_video_txt, 'w') as f:
+                f.write(f"file '{new_video_path}'\n")
+            with open(single_video_srt, 'w') as f:
+                f.write(f"1\n{start_time} --> {end_time}\n{new_text}\n\n")
             subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=12,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=225'"
-            request = task_pb2.RunCommandRequest(id=self.container_id,
-                                                 command=[
-                                                     "ffmpeg",
-                                                     "-f", "concat",
-                                                     "-safe", "0",
-                                                     "-i", single_video_txt,
-                                                     "-c:v", "h264_nvenc",
-                                                     "-c:a", "aac",
-                                                     "-vf", f"\"{subtitle_cmd}\"",
-                                                     "-y",
-                                                     "-cq", "28",
-                                                     "-preset", "slow",
-                                                     single_video
-                                                 ])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess([
-            #     "ffmpeg",
-            #     "-f", "concat",
-            #     "-safe", "0",
-            #     "-i", single_video_txt,
-            #     "-c:v", "libx264",
-            #     "-c:a", "aac",
-            #     "-vf", draw,
-            #     "-y",
-            #     single_video
-            # ],timeout=500)
+            draw = f"{subtitle_cmd}"
+            cls.asyncio_run_subprocess([
+                "ffmpeg",
+                "-f", "concat",
+                "-safe", "0",
+                "-i", single_video_txt,
+                "-c:v", "libx264",
+                "-c:a", "aac",
+                "-vf", draw,
+                "-y",
+                single_video
+            ],timeout=500)
             # subprocess.run(ffmpeg_cmd)
             return single_video
         except Exception as e:
             return single_video
 
-    def get_duration(self, file_path):
-        """获取mp3时长"""
-        # audio = MP3(file_path)
-        # duration = audio.info.length
-        # if duration:
-        #     return int(duration)
-        request = task_pb2.RunCommandRequest(id=self.container_id, command=['ffprobe', '-show_format', '-show_streams', '-of', 'json', '-v', 'quiet', '-hide_banner', file_path])
-        msg = ''
-        for response in self.stub.RunCommand(request):
-            msg += response.msg
-        obj = orjson.loads(msg)
-        return int(float(obj['format']['duration']))
-
-    def pw_video(self, jpg_path, file_path, pw_mp3_path, pw_srt):
+    """获取mp3时长"""
+    @classmethod
+    def get_mp3_duration(cls, file_path):
+        audio = MP3(file_path)
+        duration = audio.info.length
+        if duration:
+            return int(duration)
+        return 0
+
+
+    """
+     生成片尾视频
+    """
+    @classmethod
+    def pw_video(cls, jpg_path, file_path, pw_mp3_path, pw_srt):
+        # 添加音频到图片
         """
-        生成片尾视频
-
         jpg_url 图片地址
         pw_video 提供的片尾视频
         pw_duration  提供的片尾视频时长
@@ -379,16 +312,12 @@ class FFmpeg(object):
         pw_url 生成视频地址
         :return:
         """
-        # 添加音频到图片
         pw_srt_path = file_path +'pw_video.srt'
-        payload = pw_srt.encode()
-        request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=pw_srt_path)
-        self.stub.PutFile(request=request)
-        # with open(pw_srt_path, 'w') as f:
-        #     f.write(pw_srt)
+        with open(pw_srt_path, 'w') as f:
+            f.write(pw_srt)
         pw_url_path = file_path + 'pw_video.mp4'
         try:
-            pw_duration = self.get_duration(pw_mp3_path)
+            pw_duration = cls.get_mp3_duration(pw_mp3_path)
             if pw_duration == 0:
                 return pw_url_path
             time.sleep(2)
@@ -400,182 +329,117 @@ class FFmpeg(object):
             background_cmd = f"drawbox=y=(ih-{int(360)}/2-{bg_position_offset}):color=yellow@1.0:width=iw:height={int(360)}/4:t=fill"
             if "mp4" in jpg_path:
                 pw_path_txt = file_path + 'pw_path_video.txt'
-                payload = f"file '{jpg_path}'\n".encode()
-                request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=pw_path_txt)
-                self.stub.PutFile(request=request)
-                # with open(pw_path_txt, 'w') as f:
-                #     f.write(f"file '{jpg_path}'\n")
-                request = task_pb2.RunCommandRequest(id=self.container_id,
-                                                     command=[
-                                                         "ffmpeg",
-                                                         "-f", "concat",
-                                                         "-safe", "0",
-                                                         "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
-                                                         "-i", pw_mp3_path,  # 音频文件
-                                                         "-c:v", "h264_nvenc",  # 视频编码格式
-                                                         "-t", str(pw_duration),  # 输出视频的持续时间
-                                                         "-c:a", "aac",  # 音频编码格式
-                                                         "-b:v", "260k",  # 视频比特率
-                                                         "-b:a", "96k",  # 音频比特率
-                                                         "-threads", "2",  # 线程数
-                                                         "-vf", f"\"{background_cmd},{subtitle_cmd}\"",  # 视频过滤器(背景和字幕)
-                                                         "-map", "0:v:0",  # 映射视频流来自第一个输入文件(视频)
-                                                         "-map", "1:a:0",  # 映射音频流来自第二个输入文件(音频)
-                                                         "-y",  # 强制覆盖输出文件
-                                                         "-cq", "28",
-                                                         "-preset", "slow",
-                                                         pw_url_path  # 输出文件路径
-                                                     ])
-                for _ in self.stub.RunCommand(request=request):
-                    pass
-                # self.asyncio_run_subprocess([
-                #     "ffmpeg",
-                #     "-f", "concat",
-                #     "-safe", "0",
-                #     "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
-                #     "-i", pw_mp3_path,  # 音频文件
-                #     "-c:v", "libx264",  # 视频编码格式
-                #     "-t", str(pw_duration),  # 输出视频的持续时间
-                #     "-c:a", "aac",  # 音频编码格式
-                #     "-b:v", "260k",  # 视频比特率
-                #     "-b:a", "96k",  # 音频比特率
-                #     "-threads", "2",  # 线程数
-                #     "-vf", f"{background_cmd},{subtitle_cmd}",  # 视频过滤器(背景和字幕)
-                #     "-map", "0:v:0",  # 映射视频流来自第一个输入文件(视频)
-                #     "-map", "1:a:0",  # 映射音频流来自第二个输入文件(音频)
-                #     "-y",  # 强制覆盖输出文件
-                #     pw_url_path  # 输出文件路径
-                # ], timeout=500)
+                with open(pw_path_txt, 'w') as f:
+                    f.write(f"file '{jpg_path}'\n")
+                cls.asyncio_run_subprocess([
+                    "ffmpeg",
+                    "-f", "concat",
+                    "-safe", "0",
+                    "-i", f"{pw_path_txt}",  # 视频序列输入的文本文件
+                    "-i", pw_mp3_path,  # 音频文件
+                    "-c:v", "libx264",  # 视频编码格式
+                    "-t", str(pw_duration),  # 输出视频的持续时间
+                    "-c:a", "aac",  # 音频编码格式
+                    "-b:v", "260k",  # 视频比特率
+                    "-b:a", "96k",  # 音频比特率
+                    "-threads", "2",  # 线程数
+                    "-vf", f"{background_cmd},{subtitle_cmd}",  # 视频过滤器(背景和字幕)
+                    "-map", "0:v:0",  # 映射视频流来自第一个输入文件(视频)
+                    "-map", "1:a:0",  # 映射音频流来自第二个输入文件(音频)
+                    "-y",  # 强制覆盖输出文件
+                    pw_url_path  # 输出文件路径
+                ], timeout=500)
             else:
-                request = task_pb2.RunCommandRequest(id=self.container_id,
-                                                     command=[
-                                                         'ffmpeg',
-                                                         '-loop', '1',
-                                                         '-i', jpg_path,  # 输入的图片文件
-                                                         '-i', pw_mp3_path,  # 输入的音频文件
-                                                         '-c:v', 'h264_nvenc',  # 视频编码格式
-                                                         '-t', str(pw_duration),  # 输出视频的持续时间,与音频持续时间相同
-                                                         '-pix_fmt', 'yuv420p',  # 像素格式
-                                                         '-c:a', 'aac',  # 音频编码格式
-                                                         '-strict', 'experimental',  # 使用实验性编码器
-                                                         '-shortest',  # 确保输出视频的长度与音频一致
-                                                         '-vf', f"\"{background_cmd},{subtitle_cmd}\"",  # 视频过滤器,设置分辨率和其他过滤器
-                                                         "-cq", "28",
-                                                         "-preset", "slow",
-                                                         pw_url_path  # 输出的视频文件路径
-                                                     ])
-                for _ in self.stub.RunCommand(request=request):
-                    pass
-                # self.asyncio_run_subprocess([
-                #     'ffmpeg',
-                #     '-loop', '1',
-                #     '-i', jpg_path,  # 输入的图片文件
-                #     '-i', pw_mp3_path,  # 输入的音频文件
-                #     '-c:v', 'libx264',  # 视频编码格式
-                #     '-t', str(pw_duration),  # 输出视频的持续时间,与音频持续时间相同
-                #     '-pix_fmt', 'yuv420p',  # 像素格式
-                #     '-c:a', 'aac',  # 音频编码格式
-                #     '-strict', 'experimental',  # 使用实验性编码器
-                #     '-shortest',  # 确保输出视频的长度与音频一致
-                #     '-vf', f"{background_cmd},{subtitle_cmd}",  # 视频过滤器,设置分辨率和其他过滤器
-                #     pw_url_path  # 输出的视频文件路径
-                # ], timeout=500)
-            # if os.path.exists(pw_srt_path):
-            #     os.remove(pw_srt_path)
+                cls.asyncio_run_subprocess([
+                    'ffmpeg',
+                    '-loop', '1',
+                    '-i', jpg_path,  # 输入的图片文件
+                    '-i', pw_mp3_path,  # 输入的音频文件
+                    '-c:v', 'libx264',  # 视频编码格式
+                    '-t', str(pw_duration),  # 输出视频的持续时间,与音频持续时间相同
+                    '-pix_fmt', 'yuv420p',  # 像素格式
+                    '-c:a', 'aac',  # 音频编码格式
+                    '-strict', 'experimental',  # 使用实验性编码器
+                    '-shortest',  # 确保输出视频的长度与音频一致
+                    '-vf', f"{background_cmd},{subtitle_cmd}",  # 视频过滤器,设置分辨率和其他过滤器
+                    pw_url_path  # 输出的视频文件路径
+                ], timeout=500)
+            if os.path.exists(pw_srt_path):
+                os.remove(pw_srt_path)
             return pw_url_path
         except Exception as e:
             return pw_url_path
 
-    def single_video(self, video_path, file_path, zm):
-        """
-        单个视频拼接
-        """
+
+    """
+    单个视频拼接
+    """
+    @classmethod
+    def single_video(cls, video_path, file_path, zm):
         single_video_url = file_path + 'single_video.mp4'
         single_video_srt = file_path + 'single_video.srt'
         # 获取时长
         try:
-            duration = self.get_duration(video_path)
+            duration = cls.get_video_duration(video_path)
             if duration == 0:
                 return single_video_url
-            start_time = self.seconds_to_srt_time(2)
-            end_time = self.seconds_to_srt_time(duration)
+            start_time = cls.seconds_to_srt_time(2)
+            end_time = cls.seconds_to_srt_time(duration)
             single_video_txt = file_path + 'single_video.txt'
-            # with open(single_video_txt, 'w') as f:
-            #     f.write(f"file '{video_path}'\n")
-            payload = f"file '{video_path}'\n".encode()
-            request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=single_video_txt)
-            self.stub.PutFile(request=request)
+            with open(single_video_txt, 'w') as f:
+                f.write(f"file '{video_path}'\n")
             if zm:
-                payload = f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n".encode()
-                request = task_pb2.PutFileRequest(id=self.container_id, payload=payload, path=single_video_srt)
-                self.stub.PutFile(request=request)
-                # with open(single_video_srt, 'w') as f:
-                #     f.write(f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n")
+                with open(single_video_srt, 'w') as f:
+                    f.write(f"1\n{start_time} --> {end_time}\n<font color=\"red\">\u2764\uFE0F</font>{zm}\n\n")
                 subtitle_cmd = f"subtitles={single_video_srt}:force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
             else:
                 subtitle_cmd = f"force_style='Fontsize=14,Fontname=wqy-zenhei,Outline=2,PrimaryColour=&H00FFFF,SecondaryColour=&H000000,Bold=1,MarginV=20'"
             # 多线程数
             num_threads = 5
             # 构建 FFmpeg 命令,生成视频
-            request = task_pb2.RunCommandRequest(id=self.container_id,
-                                                 command=[
-                                                     "ffmpeg",
-                                                     "-f", "concat",
-                                                     "-safe", "0",
-                                                     "-i",  f"{single_video_txt}",
-                                                     "-c:v", "h264_nvenc",
-                                                     "-c:a", "aac",
-                                                     '-b:v', '260k',
-                                                     "-b:a", "96k",
-                                                     "-threads", str(num_threads),
-                                                     "-vf", f"\"{subtitle_cmd}\"",
-                                                     "-y",
-                                                     "-cq", "28",
-                                                     "-preset", "slow",
-                                                      single_video_url
-                                                 ])
-            for _ in self.stub.RunCommand(request=request):
-                pass
-            # self.asyncio_run_subprocess([
-            #         "ffmpeg",
-            #         "-f", "concat",
-            #         "-safe", "0",
-            #         "-i",  f"{single_video_txt}",
-            #         "-c:v", "libx264",
-            #         "-c:a", "aac",
-            #         '-b:v', '260k',
-            #         "-b:a", "96k",
-            #         "-threads", str(num_threads),
-            #         "-vf", subtitle_cmd,
-            #         "-y",
-            #          single_video_url
-            # ], timeout=400)
-            # if os.path.exists(single_video_srt):
-            #     os.remove(single_video_srt)
+            cls.asyncio_run_subprocess([
+                    "ffmpeg",
+                    "-f", "concat",
+                    "-safe", "0",
+                    "-i",  f"{single_video_txt}",
+                    "-c:v", "libx264",
+                    "-c:a", "aac",
+                    '-b:v', '260k',
+                    "-b:a", "96k",
+                    "-threads", str(num_threads),
+                    "-vf", subtitle_cmd,
+                    "-y",
+                     single_video_url
+            ], timeout=400)
+            if os.path.exists(single_video_srt):
+                os.remove(single_video_srt)
             return single_video_url
         except Exception as e:
             return single_video_url
 
-    # def asyncio_run_subprocess(self, params: List[str], timeout: int = 30) -> str:
-    #     async def run_subprocess():
-    #         process = await asyncio.create_subprocess_exec(
-    #             params[0],
-    #             *params[1:],
-    #             stdout=asyncio.subprocess.PIPE,
-    #             stderr=asyncio.subprocess.PIPE,
-    #         )
-    #         try:
-    #             out, err = await asyncio.wait_for(process.communicate(), timeout=timeout)
-    #             if process.returncode != 0:
-    #                 raise IOError(err)
-    #             return out.decode()
-    #         except asyncio.TimeoutError:
-    #             process.kill()
-    #             out, err = await process.communicate()
-    #             raise IOError(err)
-    #     return asyncio.run(run_subprocess())
-
-    def get_http_duration(self, videos_path):
+    @classmethod
+    def asyncio_run_subprocess(cls, params: List[str], timeout: int = 30) -> str:
+        async def run_subprocess():
+            process = await asyncio.create_subprocess_exec(
+                params[0],
+                *params[1:],
+                stdout=asyncio.subprocess.PIPE,
+                stderr=asyncio.subprocess.PIPE,
+            )
+            try:
+                out, err = await asyncio.wait_for(process.communicate(), timeout=timeout)
+                if process.returncode != 0:
+                    raise IOError(err)
+                return out.decode()
+            except asyncio.TimeoutError:
+                process.kill()
+                out, err = await process.communicate()
+                raise IOError(err)
+        return asyncio.run(run_subprocess())
+
+
+    @classmethod
+    def get_http_duration(cls, videos_path):
         total_duration = 0
         for video_path in videos_path:
             url = "http://101.37.24.17:5555/api/v1/ffmpeg/get_meta"
@@ -682,3 +546,5 @@ if __name__ == '__main__':
 00:00:32,659 --> 00:00:32,860
 吧"""
     FFmpeg.pw_video(jpg_path, file_path, mp3_path, pw_srt)
+
+

+ 1 - 1
utils/google_ai_studio.py

@@ -10,7 +10,7 @@ from google.generativeai.types import (HarmBlockThreshold, HarmCategory)
 from loguru import logger
 
 
-CACHE_DIR = '/tmp/'
+CACHE_DIR = '/app/cache/'
 # CACHE_DIR = '/Users/z/Downloads/'
 # PROXY_ADDR = 'http://localhost:1081'
 # os.environ['http_proxy'] = PROXY_ADDR

+ 5 - 5
utils/tts_help.py

@@ -55,11 +55,11 @@ class TTS:
             response = requests.request("GET", pw_url, headers=headers, data=payload, timeout= 30)
             if response.status_code == 200:
                 # 以二进制写入模式打开文件
-                # with open(f"{pw_mp3_path}", "wb") as file:
-                #     # 将响应内容写入文件
-                #     file.write(response.content)
-                return response.content, pw_mp3_path
-        return None, pw_mp3_path
+                with open(f"{pw_mp3_path}", "wb") as file:
+                    # 将响应内容写入文件
+                    file.write(response.content)
+                return pw_mp3_path
+        return None
 
     @classmethod
     def get_srt_format(cls, pw_srt_text, pw_url_sec):

+ 33 - 67
workers/consumption_work.py

@@ -6,17 +6,11 @@ import sys
 import time
 import uuid
 from datetime import datetime
-from typing import Optional
-
-import grpc
 import orjson
 from apscheduler.schedulers.blocking import BlockingScheduler
 from apscheduler.triggers.interval import IntervalTrigger
-from google.protobuf.empty_pb2 import Empty
 from loguru import logger
-
 sys.path.append('/app')
-from protos import task_pb2, task_pb2_grpc
 from utils.redis import RedisHelper
 from utils.aliyun_log import AliyunLogger
 from utils.aliyun_oss import Oss
@@ -32,11 +26,13 @@ from utils.tag_video import Tag
 from utils.tts_help import TTS
 from utils.google_ai_studio import GoogleAI
 
-channel = grpc.insecure_channel(os.getenv('FFMPEG_HOST'))
-stub = task_pb2_grpc.TaskRunnerStub(channel=channel)
-CACHE_DIR = '/tmp/'
 
 
+
+
+
+CACHE_DIR = '/app/cache/'
+# CACHE_DIR = '/Users/z/Downloads/'
 class ConsumptionRecommend(object):
     @classmethod
     def insert_pq(cls, data, oss_object_key, title, tags, tag_transport_channel, channel_mark, task_mark):
@@ -131,9 +127,7 @@ class ConsumptionRecommend(object):
         return
 
     @classmethod
-    def data_handle(cls, data, file_path, redis_name,studio_key, container_id):
-        ffmpeg = FFmpeg(container_id=container_id, stub=stub)
-        
+    def data_handle(cls, data, file_path, redis_name,studio_key):
         url, original_title, video_id, tag_transport_channel = Dy_KS.get_video_url(data, "效率工具")
         if url == "重新处理" or not url:
             RedisHelper().get_client().rpush(redis_name, json.dumps(data))
@@ -163,11 +157,8 @@ class ConsumptionRecommend(object):
         AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"], "符合规则等待改造",
                              "2004", str(data))
         logger.info(f"[处理] {url}开始下载视频")
-        video_content, video_path = DownLoad.download_video(url, file_path, tag_transport_channel, video_id)
-        oss_info = Oss.stitching_sync_upload_bytes_to_oss(video_content, video_id)
-        request = task_pb2.PutFileRequest(id=container_id, oss_object_key=oss_info['oss_object_key'], path=video_path)
-        response = stub.PutFile(request=request)
-        if len(video_content) == 0 or not response.success:
+        video_path = DownLoad.download_video(url, file_path, tag_transport_channel, video_id)
+        if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
             RedisHelper().get_client().rpush(redis_name, json.dumps(data))
             logger.error(f"[处理] {url}下载失败")
             AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
@@ -214,14 +205,12 @@ class ConsumptionRecommend(object):
                           "搬运工具")
         if data["transform_rule"] == "仅改造" or data["transform_rule"] == "是":
             try:
-                width, height = ffmpeg.get_w_h_size(video_path)
+                width, height = FFmpeg.get_w_h_size(video_path)
                 if width < height:  # 判断是否需要修改为竖屏
-                    video_path = ffmpeg.update_video_h_w(video_path, file_path)
+                    video_path = FFmpeg.update_video_h_w(video_path, file_path)
                 logger.info(f"[处理] 视频更改分辨率处理")
-                video_path = ffmpeg.video_640(video_path, file_path)
-                request = task_pb2.FileExistsRequest(id=container_id, path=video_path)
-                response = stub.FileExists(request=request)
-                if not response.exists:
+                video_path = FFmpeg.video_640(video_path, file_path)
+                if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
                     RedisHelper().get_client().rpush(redis_name, json.dumps(data))
                     logger.error(f"[处理] 视频更改分辨率失败")
                     AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
@@ -237,15 +226,11 @@ class ConsumptionRecommend(object):
                     return
                 logger.info(f"[处理] 视频更改分辨率处理成功")
                 if data["video_clipping"]:  # 判断是否需要裁剪
-                    video_path = ffmpeg.video_crop(video_path, file_path)
+                    video_path = FFmpeg.video_crop(video_path, file_path)
                 if data["video_clipping_time"]:  # 判断是否需要指定视频时长
-                    video_path = ffmpeg.video_ggduration(video_path, file_path, data["video_clipping_time"])
+                    video_path = FFmpeg.video_ggduration(video_path, file_path, data["video_clipping_time"])
                 if data['trailer_share'] == "内容分析":
-                    request = task_pb2.GetFileRequest(id=container_id, path=video_path)
-                    response = stub.GetFile(request)
-                    Oss.download_video_oss(Oss.generate_url(response.oss_object_key), video_path, add_suffix=False)
                     video_text = GoogleAI.run(studio_key, video_path)
-                    os.remove(video_path)
                     if not video_text:
                         logger.error(f"[处理] 视频内容分析获取内容信息失败")
                         data["transform_rule"] = "仅改造"
@@ -308,7 +293,7 @@ class ConsumptionRecommend(object):
                                       "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
                                       "【 搬运&改造效率工具失败通知 】")
                     return
-                pw_mp3_content, pw_mp3_path = TTS.download_mp3(pw_url, file_path)
+                pw_mp3_path = TTS.download_mp3(pw_url, file_path)
                 if not pw_mp3_path:
                     data["transform_rule"] = "仅改造"
                     RedisHelper().get_client().rpush(redis_name, json.dumps(data))
@@ -324,8 +309,6 @@ class ConsumptionRecommend(object):
                                       "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
                                       "【 搬运&改造效率工具失败通知 】")
                     return
-                request = task_pb2.PutFileRequest(id=container_id, payload=pw_mp3_content, path=pw_mp3_path)
-                stub.PutFile(request=request)
                 logger.info(f"[处理] 数据片尾音频下载成功")
 
                 if str(data['trailer_share_video']) and str(data['trailer_share_video'] ) != "None":
@@ -347,10 +330,10 @@ class ConsumptionRecommend(object):
                                           "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
                                           "【 搬运&改造效率工具失败通知 】")
                         return
-                    pw_url_duration = ffmpeg.get_http_duration([pw_url])
-                    pw_videos_duration = ffmpeg.get_http_duration(rg_pw_url_list)
+                    pw_url_duration = FFmpeg.get_http_duration([pw_url])
+                    pw_videos_duration = FFmpeg.get_http_duration(rg_pw_url_list)
                     if pw_videos_duration < pw_url_duration:
-                        jpg_path = ffmpeg.video_png(video_path, file_path)  # 生成视频最后一帧jpg
+                        jpg_path = FFmpeg.video_png(video_path, file_path)  # 生成视频最后一帧jpg
                         if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0:
                             data["transform_rule"] = "仅改造"
                             RedisHelper().get_client().rpush(redis_name, json.dumps(data))
@@ -369,7 +352,7 @@ class ConsumptionRecommend(object):
                         logger.info(f"[处理] 数据片尾获取最后一帧成功")
                     else:
                         rg_pw_url = DownLoad.download_pq_video(file_path, rg_pw_url_list)
-                        rg_pw_list = ffmpeg.concatenate_videos(rg_pw_url, file_path)
+                        rg_pw_list = FFmpeg.concatenate_videos(rg_pw_url, file_path)
                         if not os.path.exists(rg_pw_list) or os.path.getsize(rg_pw_list) == 0:
                             data["transform_rule"] = "仅改造"
                             RedisHelper().get_client().rpush(redis_name, json.dumps(data))
@@ -385,14 +368,12 @@ class ConsumptionRecommend(object):
                                               "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
                                               "【 搬运&改造效率工具失败通知 】")
                             return
-                        jpg_path = ffmpeg.video_640(rg_pw_list, file_path)
+                        jpg_path = FFmpeg.video_640(rg_pw_list, file_path)
                         logger.info(f"[处理] 生成人工片尾成功")
 
                 else:
-                    jpg_path = ffmpeg.video_png(video_path, file_path)  # 生成视频最后一帧jpg
-                    request = task_pb2.FileExistsRequest(id=container_id, path=jpg_path)
-                    response = stub.FileExists(request=request)
-                    if not response.exists:
+                    jpg_path = FFmpeg.video_png(video_path, file_path)  # 生成视频最后一帧jpg
+                    if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0:
                         data["transform_rule"] = "仅改造"
                         RedisHelper().get_client().rpush(redis_name, json.dumps(data))
                         logger.error(f"[处理] 数据片尾获取最后一帧失败")
@@ -408,10 +389,8 @@ class ConsumptionRecommend(object):
                                           "【 搬运&改造效率工具失败通知 】")
                         return
                     logger.info(f"[处理] 数据片尾获取最后一帧成功")
-                pw_path = ffmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt)  # 生成片尾视频
-                request = task_pb2.FileExistsRequest(id=container_id, path=pw_path)
-                response = stub.FileExists(request=request)
-                if not response.exists:
+                pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt)  # 生成片尾视频
+                if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
                     data["transform_rule"] = "仅改造"
                     RedisHelper().get_client().rpush(redis_name, json.dumps(data))
                     logger.error(f"[处理] 数据片尾拼接失败")
@@ -435,16 +414,14 @@ class ConsumptionRecommend(object):
                         logger.info(f"[处理] 获取bgm")
                         rg_bgm_list = PQ.get_pq_oss([trailer_share_bgm])
                         rg_bgm_url = DownLoad.download_pq_video(file_path, rg_bgm_list)
-                        bgm_mp3_path = ffmpeg.get_pw_video_mp3(file_path, rg_bgm_url[0])
-                        pw_path = ffmpeg.video_add_bgm(pw_path, bgm_mp3_path, file_path)
+                        bgm_mp3_path = FFmpeg.get_pw_video_mp3(file_path, rg_bgm_url[0])
+                        pw_path = FFmpeg.video_add_bgm(pw_path, bgm_mp3_path, file_path)
                         logger.info(f"[处理] 片尾bgm添加成功")
                     except Exception as e:
                         logger.error(f"[处理] 片尾bgm添加失败")
-                video_path = ffmpeg.h_b_video(video_path, pw_path, file_path)
-                video_path = ffmpeg.single_video(video_path, file_path, data["video_share"])
-                request = task_pb2.FileExistsRequest(id=container_id, path=video_path)
-                response = stub.FileExists(request=request)
-                if not response.exists:
+                video_path = FFmpeg.h_b_video(video_path, pw_path, file_path)
+                video_path = FFmpeg.single_video(video_path, file_path, data["video_share"])
+                if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
                     data["transform_rule"] = "仅改造"
                     RedisHelper().get_client().rpush(redis_name, json.dumps(data))
                     logger.error(f"[处理] 数据添加片中字幕失败")
@@ -461,10 +438,9 @@ class ConsumptionRecommend(object):
                     return
                 logger.info(f"[处理] 数据添加片中字幕成功")
                 logger.info(f"[处理] 数据开始发送oss")
-                request = task_pb2.GetFileRequest(id=container_id, path=video_path)
-                response = stub.GetFile(request=request)
-                oss_object_key = response.oss_object_key
+                oss_object_key = Oss.stitching_sync_upload_oss(video_path, str(uuid.uuid4()))  # 视频发送OSS
                 logger.info(f"[处理] 数据发送oss成功")
+                oss_object_key = oss_object_key.get("oss_object_key")
 
                 tags = ','.join(filter(None, [
                     data['pq_label'],
@@ -507,15 +483,8 @@ class ConsumptionRecommend(object):
             logger.info('[处理] 无待执行的扫描任务')
             return
         data = orjson.loads(data)
-        container_id: Optional[str] = None
         try:
-            response = stub.StartContainer(request=Empty())
-            container_id = response.id
-            if not container_id:
-                logger.error('启动容器失败, 程序退出')
-                return
-
-            cls.data_handle(data, file_path, redis_name,studio_key, container_id)
+            cls.data_handle(data, file_path, redis_name,studio_key)
             for filename in os.listdir(CACHE_DIR):
                 # 检查文件名是否包含关键字
                 if uid in filename:
@@ -540,10 +509,7 @@ class ConsumptionRecommend(object):
                     except Exception as e:
                         logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
             return
-        finally:
-            if container_id:
-                request = task_pb2.StopContainerRequest(id=container_id)
-                stub.StopContainer(request)
+
 
 
 def run():

+ 1 - 1
workers/consumption_work_studio.py

@@ -27,7 +27,7 @@ from utils.tts_help import TTS
 
 
 
-CACHE_DIR = '/tmp/'
+CACHE_DIR = '/app/cache/'
 # CACHE_DIR = '/Users/z/Downloads/'
 
 class ConsumptionRecommend(object):