
Change the scheduling policy: transform and publish immediately, then open visibility when the scheduled time arrives

kevin.yang 2 months ago
parent
commit
32a041d3e8
7 changed files with 410 additions and 41 deletions
  1. .dockerignore (+10 -0)
  2. .gitignore (+318 -0)
  3. product.env (+0 -2)
  4. utils/feishu_form.py (+3 -3)
  5. utils/piaoquan.py (+43 -4)
  6. workers/consumption_work.py (+35 -31)
  7. workers/select_work.py (+1 -1)
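
The reveal half of this policy is not in the diff: the consumer publishes each video hidden and records it in the task:carry_data_redis_finish sorted set with dt as the score (see workers/consumption_work.py below). What follows is a minimal sketch, not the project's code, of a companion worker that could drain that set once dt comes due; the import paths and the YYYYMMDD-style dt are assumptions:

import time

import orjson

from utils.piaoquan import PQ  # helper extended in this commit
from utils.redis_helper import RedisHelper  # assumed module path


def reveal_due_videos():
    # Hypothetical reveal worker (not part of this commit): fetch entries whose
    # score (dt) has come due and flip their visibility back on.
    client = RedisHelper().get_client()
    today = int(time.strftime("%Y%m%d"))  # assumes dt is a YYYYMMDD integer
    for member in client.zrangebyscore("task:carry_data_redis_finish", "-inf", today):
        info = orjson.loads(member)  # written as {'pq_vid': ..., 'pq_uid': ...}
        PQ.change_recommend_status(pq_vid=info["pq_vid"], pq_uid=info["pq_uid"],
                                   can_search=True)  # re-open, per the flag's name
        client.zrem("task:carry_data_redis_finish", member)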

+ 10 - 0
.dockerignore

@@ -0,0 +1,10 @@
+.git/
+.idea/
+.vscode/
+__pycache__/
+cache/
+*.log
+*.jpg
+*.png
+*.gif
+*.webp

+ 318 - 0
.gitignore

@@ -0,0 +1,318 @@
+# Created by https://www.toptal.com/developers/gitignore/api/python,node
+# Edit at https://www.toptal.com/developers/gitignore?templates=python,node
+
+### Node ###
+# Logs
+logs
+*.log
+npm-debug.log*
+yarn-debug.log*
+yarn-error.log*
+lerna-debug.log*
+.pnpm-debug.log*
+
+# Diagnostic reports (https://nodejs.org/api/report.html)
+report.[0-9]*.[0-9]*.[0-9]*.[0-9]*.json
+
+# Runtime data
+pids
+*.pid
+*.seed
+*.pid.lock
+
+# Directory for instrumented libs generated by jscoverage/JSCover
+lib-cov
+
+# Coverage directory used by tools like istanbul
+coverage
+*.lcov
+
+# nyc test coverage
+.nyc_output
+
+# Grunt intermediate storage (https://gruntjs.com/creating-plugins#storing-task-files)
+.grunt
+
+# Bower dependency directory (https://bower.io/)
+bower_components
+
+# node-waf configuration
+.lock-wscript
+
+# Compiled binary addons (https://nodejs.org/api/addons.html)
+build/Release
+
+# Dependency directories
+node_modules/
+jspm_packages/
+
+# Snowpack dependency directory (https://snowpack.dev/)
+web_modules/
+
+# TypeScript cache
+*.tsbuildinfo
+
+# Optional npm cache directory
+.npm
+
+# Optional eslint cache
+.eslintcache
+
+# Optional stylelint cache
+.stylelintcache
+
+# Microbundle cache
+.rpt2_cache/
+.rts2_cache_cjs/
+.rts2_cache_es/
+.rts2_cache_umd/
+
+# Optional REPL history
+.node_repl_history
+
+# Output of 'npm pack'
+*.tgz
+
+# Yarn Integrity file
+.yarn-integrity
+
+# dotenv environment variable files
+.env
+.env.development.local
+.env.test.local
+.env.production.local
+.env.local
+
+# parcel-bundler cache (https://parceljs.org/)
+.cache
+.parcel-cache
+
+# Next.js build output
+.next
+out
+
+# Nuxt.js build / generate output
+.nuxt
+dist
+
+# Gatsby files
+.cache/
+# Comment in the public line in if your project uses Gatsby and not Next.js
+# https://nextjs.org/blog/next-9-1#public-directory-support
+# public
+
+# vuepress build output
+.vuepress/dist
+
+# vuepress v2.x temp and cache directory
+.temp
+
+# Docusaurus cache and generated files
+.docusaurus
+
+# Serverless directories
+.serverless/
+
+# FuseBox cache
+.fusebox/
+
+# DynamoDB Local files
+.dynamodb/
+
+# TernJS port file
+.tern-port
+
+# Stores VSCode versions used for testing VSCode extensions
+.vscode-test
+
+# yarn v2
+.yarn/cache
+.yarn/unplugged
+.yarn/build-state.yml
+.yarn/install-state.gz
+.pnp.*
+
+### Node Patch ###
+# Serverless Webpack directories
+.webpack/
+
+# Optional stylelint cache
+
+# SvelteKit build / generate output
+.svelte-kit
+
+### Python ###
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+#   For a library or package, you might want to ignore these files since the code is
+#   intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+#   However, in case of collaboration, if having platform-specific dependencies or dependencies
+#   having no cross-platform support, pipenv may install dependencies that don't work, or not
+#   install all needed dependencies.
+#Pipfile.lock
+
+# poetry
+#   Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
+#   This is especially recommended for binary packages to ensure reproducibility, and is more
+#   commonly ignored for libraries.
+#   https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
+#poetry.lock
+
+# pdm
+#   Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
+#pdm.lock
+#   pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
+#   in version control.
+#   https://pdm.fming.dev/#use-with-ide
+.pdm.toml
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+# PyCharm
+#  JetBrains specific template is maintained in a separate JetBrains.gitignore that can
+#  be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
+#  and can be added to the global gitignore or merged into this file.  For a more nuclear
+#  option (not recommended) you can uncomment the following to ignore the entire idea folder.
+.idea/
+
+# VScode
+.vscode/
+
+### Python Patch ###
+# Poetry local configuration file - https://python-poetry.org/docs/configuration/#local-configuration
+poetry.toml
+
+# ruff
+.ruff_cache/
+
+# LSP config files
+pyrightconfig.json
+
+# End of https://www.toptal.com/developers/gitignore/api/python,node
+
+.DS_Store

+ 0 - 2
product.env

@@ -10,5 +10,3 @@ FS_DATA_7=周仙琴,2WIcBU,task:carry_data_redis_zxq,AIzaSyD46rphXd-Ie51sQiQ61lr
 FS_DATA_8=信欣,v0fFCb,task:carry_data_redis_xx,AIzaSyB16JcJHwoz-YcvEUO96Dm1n0zf89GOdms
 FS_DATA_9=邓锋,DEpi6V,task:carry_data_redis_df,AIzaSyCWilmMZyG4xW_pujpRGflaa7SIBjLQHiI
 FS_DATA_10=王知微,jrpuyW,task:carry_data_redis_wzw,AIzaSyCx3hy5ef8wOVPNjvK1MIAwyZZCdYuRh-U
-
-

+ 3 - 3
utils/feishu_form.py

@@ -15,13 +15,13 @@ class Material():
     @classmethod
     def get_carry_data(cls, dt, FS_SHEET,NAME):
         data = Feishu.get_values_batch( "Wj0TsRKc0hZrHQtmtg4cZZIwn0c", FS_SHEET )
-        processed_list = []
+        processed_list = dict()
         try:
             for row in data[2:]:
                 activate_data = row[4]  # activation date
                 if not activate_data:
                     continue
-                if int(activate_data) != int(dt):
+                if int(activate_data) <= int(dt):
                     continue
                 channel_mark = row[0]
                 pq_ids = row[2]
@@ -65,7 +65,7 @@ class Material():
                     "trailer_share_bgm": str(trailer_share_bgm),
                     "dt":dt
                 }
-                processed_list.append(json.dumps(number_dict, ensure_ascii=False))
+                processed_list.update({json.dumps(number_dict, ensure_ascii=False): int(dt)})
             return  processed_list
         except:
             return processed_list
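
get_carry_data now returns a dict mapping each JSON-serialized task to its dt instead of a list, which is exactly the {member: score} mapping redis-py's zadd accepts; workers/select_work.py below passes it straight through as mapping=data. A sketch with placeholder arguments:

# Placeholder sheet id, name, and queue: dict keys become sorted-set members,
# the dt values become their scores.
data = Material.get_carry_data(20250601, "FS_SHEET_ID", "kevin")
# e.g. {'{"channel_mark": "...", "dt": 20250601, ...}': 20250601}
if data:
    RedisHelper().get_client().zadd("task:carry_data_redis_xx", mapping=data)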

+ 43 - 4
utils/piaoquan.py

@@ -12,7 +12,7 @@ class PQ:
     Upload the newly generated video to the corresponding account
     """
     @classmethod
-    def insert_piaoquantv(cls, new_video_path, new_title, n_id, cover):
+    def insert_piaoquantv(cls, new_video_path, new_title, n_id, cover, can_search: bool = True):
         url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send?muid=999"
         headers = {
             'User-Agent': 'PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0',
@@ -47,9 +47,11 @@ class PQ:
         data = response.json()
         code = data["code"]
         if code == 0:
-            new_video_id = data["data"]["id"]
-            return new_video_id
-        return None
+            new_video_id, new_user_id = data["data"]["id"], data["data"]["user"]["uid"]
+            if not can_search:
+                cls.change_recommend_status(pq_vid=str(new_video_id), pq_uid=str(new_user_id), can_search=can_search)
+            return new_video_id, new_user_id
+        return None, None
 
     @classmethod
     def get_pq_oss_path(cls, video_id):
@@ -107,6 +109,43 @@ class PQ:
                 continue
         return url_list
 
+    @classmethod
+    def change_recommend_status(cls, pq_vid: str, pq_uid: str, can_search: bool = False):
+        # mini-program recommendation status
+        url = "https://admin.piaoquantv.com/manager/video/updateVideoRecommendStatus"
+        headers = {
+            'accept': 'application/json, text/plain, */*',
+            'accept-language': 'zh-CN,zh;q=0.9',
+            'cache-control': 'no-cache',
+            'cookie': 'SESSION=M2FlYTM0MTctN2I2MS00NzFlLWEzZjItZjE4YzVhNmZjYTM3',
+            'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 '
+                          '(KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36',
+        }
+        query = {
+            'id': pq_vid,
+            'uid': pq_uid,
+            'recommendStatus': '-7' if can_search else '0',
+            'muid': '999',
+        }
+        requests.request("GET", url, headers=headers, params=query, timeout=30)
+
+        url = "https://admin.piaoquantv.com/manager/video/measure/delete"
+        query = {
+            'videoId': pq_vid,
+            'muid': '999',
+        }
+        requests.request("GET", url, headers=headers, params=query, timeout=30)
+
+        # App recommendation status
+        url = "https://admin.piaoquantv.com/manager/video/updateAppVideoRecommendStatus"
+        query = {
+            'id': pq_vid,
+            'uid': pq_uid,
+            'appRecommendStatus': '-7' if can_search else '0',
+            'muid': '999',
+        }
+        requests.request("GET", url, headers=headers, params=query, timeout=30)
+
 
 if __name__ == '__main__':
     rg_pw = "47969744,47969804,47969813,47969815,47969816"
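
insert_piaoquantv now returns a (video id, uploader uid) pair, and with can_search=False it immediately calls the new change_recommend_status so the video is published but starts out hidden. A short usage sketch with placeholder arguments; the later reveal step is an assumption, not part of this diff:

pq_vid, pq_uid = PQ.insert_piaoquantv("oss/path/video.mp4", "some title",
                                      "47969744", None, can_search=False)
if pq_vid:
    # later, when dt arrives, a companion job could flip the flag back
    PQ.change_recommend_status(pq_vid=str(pq_vid), pq_uid=str(pq_uid),
                               can_search=True)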

+ 35 - 31
workers/consumption_work.py

@@ -35,7 +35,7 @@ CACHE_DIR = '/app/cache/'
 # CACHE_DIR = '/Users/z/Downloads/'
 class ConsumptionRecommend(object):
     @classmethod
-    def insert_pq(cls, data, oss_object_key, title, tags, tag_transport_channel, channel_mark, task_mark):
+    def insert_pq(cls, data, oss_object_key, title, tags, tag_transport_channel, channel_mark, task_mark, dt):
         logger.info(f"[+] 开始写入票圈")
         n_ids = str(data["pq_ids"])
         if ',' in n_ids:
@@ -44,39 +44,43 @@ class ConsumptionRecommend(object):
             n_id_list = [n_ids]
         pq_list = []
         for n_id in n_id_list:
-            code = PQ.insert_piaoquantv(oss_object_key, title, n_id, None)
-            if not code:
+            pq_vid, pq_uid = PQ.insert_piaoquantv(oss_object_key, title, n_id, None, can_search=False)
+            if not pq_vid:
                 logger.error(f"[+] 写入票圈后台失败")
                 AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                      "改造失败,写入票圈后台失败", "3001", str(data))
                 text = (
                     f"**负责人**: {data['name']}\n"
                     f"**内容**: {data}\n"
-                    f"**失败信息**: 视频写入票圈后台失败,视频ID{code}\n"
+                    f"**失败信息**: 视频写入票圈后台失败,视频ID{pq_vid}\n"
                 )
                 Feishu.finish_bot(text,
                                   "https://open.feishu.cn/open-apis/bot/v2/hook/65bc5463-dee9-46d0-bc2d-ec6c49a8f3cd",
                                   "【 搬运&改造效率工具失败通知 】")
                 continue
 
-            pq_list.append(code)
-            logger.info(f"[+] 写入票圈成功,返回视频id{code}")
-            tag_status = Tag.video_tag(code, str(tags))
+            pq_list.append(pq_vid)
+            logger.info(f"[+] 写入票圈成功,返回视频id{pq_vid}")
+
+            key = orjson.dumps({'pq_vid': str(pq_vid), 'pq_uid': str(pq_uid)}).decode()
+            RedisHelper().get_client().zadd('task:carry_data_redis_finish', {key: dt})
+
+            tag_status = Tag.video_tag(pq_vid, str(tags))
             if tag_status == 0:
-                logger.info(f"[+] 写入标签成功,后台视频ID为{code}")
+                logger.info(f"[+] 写入标签成功,后台视频ID为{pq_vid}")
             try:
                 current_time = datetime.now()
                 formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
                 sqlCollect.insert_machine_making_data(data["name"], task_mark, tag_transport_channel,
                                                       data["video_url"], data["video_url"], data["pq_ids"],
                                                       data["title_category"],
-                                                      code,
+                                                      pq_vid,
                                                       formatted_time, data["title_category"], oss_object_key)
+                pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{pq_vid}/detail'  # in-site video link
+                pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{pq_vid}/detail'  # 站内视频链接
 
                 values = [
                     [
-                        str(code),
+                        str(pq_vid),
                         str(n_id),
                         formatted_time,
                         channel_mark,
@@ -127,10 +131,10 @@ class ConsumptionRecommend(object):
         return
 
     @classmethod
-    def data_handle(cls, data, file_path, redis_name,studio_key):
+    def data_handle(cls, data, dt, file_path, redis_name,studio_key):
         url, original_title, video_id, tag_transport_channel = Dy_KS.get_video_url(data, "效率工具")
         if url == "重新处理" or not url:
-            RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+            RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
             text = (
                 f"**负责人**: {data['name']}\n"
                 f"**内容**: {data}\n"
@@ -159,7 +163,7 @@ class ConsumptionRecommend(object):
         logger.info(f"[处理] {url}开始下载视频")
         video_path = DownLoad.download_video(url, file_path, tag_transport_channel, video_id)
         if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
-            RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+            RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
             logger.error(f"[处理] {url}下载失败")
             AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                  "视频下载失败等待重新处理", "3002", str(data))
@@ -202,7 +206,7 @@ class ConsumptionRecommend(object):
                 data['tag']
             ]))
             cls.insert_pq(data, oss_object_key, title, tags, tag_transport_channel, channel_mark,
-                          "搬运工具")
+                          "搬运工具", dt)
         if data["transform_rule"] == "仅改造" or data["transform_rule"] == "是":
             try:
                 width, height = FFmpeg.get_w_h_size(video_path)
@@ -211,7 +215,7 @@ class ConsumptionRecommend(object):
                 logger.info(f"[处理] 视频更改分辨率处理")
                 video_path = FFmpeg.video_640(video_path, file_path)
                 if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
-                    RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                    RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                     logger.error(f"[处理] 视频更改分辨率失败")
                     AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                          "改造失败,片尾拼接失败", "3001", str(data))
@@ -234,7 +238,7 @@ class ConsumptionRecommend(object):
                     if not video_text:
                         logger.error(f"[处理] 视频内容分析获取内容信息失败")
                         data["transform_rule"] = "仅改造"
-                        RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                        RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                         AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                              "改造失败,视频内容分析获取内容信息失败", "3001", str(data))
                         text = (
@@ -264,7 +268,7 @@ class ConsumptionRecommend(object):
                 if not pw_url:
                     logger.error(f"[处理] 数据片尾获取失败")
                     data["transform_rule"] = "仅改造"
-                    RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                    RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                     AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                          "改造失败,片尾获取失败", "3001", str(data))
                     text = (
@@ -280,7 +284,7 @@ class ConsumptionRecommend(object):
                 pw_srt = TTS.getSrt(pw_url)
                 if not pw_srt:
                     data["transform_rule"] = "仅改造"
-                    RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                    RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                     logger.error(f"[处理] 数据片尾音频srt获取失败")
                     AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                          "改造失败,片尾音频下载失败", "3001", str(data))
@@ -296,7 +300,7 @@ class ConsumptionRecommend(object):
                 pw_mp3_path = TTS.download_mp3(pw_url, file_path)
                 if not pw_mp3_path:
                     data["transform_rule"] = "仅改造"
-                    RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                    RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                     logger.error(f"[处理] 数据片尾音频下载失败")
                     AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                          "改造失败,片尾音频下载失败", "3001", str(data))
@@ -336,7 +340,7 @@ class ConsumptionRecommend(object):
                         jpg_path = FFmpeg.video_png(video_path, file_path)  # render the video's last frame as a jpg
                         if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0:
                             data["transform_rule"] = "仅改造"
-                            RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                            RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                             logger.error(f"[处理] 数据片尾获取最后一帧失败")
                             AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                                  "改造失败,获取最后一帧失败", "3001", str(data))
@@ -355,7 +359,7 @@ class ConsumptionRecommend(object):
                         rg_pw_list = FFmpeg.concatenate_videos(rg_pw_url, file_path)
                         if not os.path.exists(rg_pw_list) or os.path.getsize(rg_pw_list) == 0:
                             data["transform_rule"] = "仅改造"
-                            RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                            RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                             logger.error(f"[处理] 数据片尾拼接失败")
                             AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                                  "改造失败,片尾拼接失败", "3001", str(data))
@@ -375,7 +379,7 @@ class ConsumptionRecommend(object):
                     jpg_path = FFmpeg.video_png(video_path, file_path)  # render the video's last frame as a jpg
                     if not os.path.exists(jpg_path) or os.path.getsize(jpg_path) == 0:
                         data["transform_rule"] = "仅改造"
-                        RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                        RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                         logger.error(f"[处理] 数据片尾获取最后一帧失败")
                         AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                              "改造失败,获取最后一帧失败", "3001", str(data))
@@ -392,7 +396,7 @@ class ConsumptionRecommend(object):
                 pw_path = FFmpeg.pw_video(jpg_path, file_path, pw_mp3_path, pw_srt)  # 生成片尾视频
                 if not os.path.exists(pw_path) or os.path.getsize(pw_path) == 0:
                     data["transform_rule"] = "仅改造"
-                    RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                    RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                     logger.error(f"[处理] 数据片尾拼接失败")
                     AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                          "改造失败,片尾拼接失败", "3001", str(data))
@@ -423,7 +427,7 @@ class ConsumptionRecommend(object):
                 video_path = FFmpeg.single_video(video_path, file_path, data["video_share"])
                 if not os.path.exists(video_path) or os.path.getsize(video_path) == 0:
                     data["transform_rule"] = "仅改造"
-                    RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                    RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                     logger.error(f"[处理] 数据添加片中字幕失败")
                     AliyunLogger.logging(data["name"], "效率工具", tag_transport_channel, data["video_url"],
                                          "改造失败,添加片中字幕失败", "3001", str(data))
@@ -452,11 +456,11 @@ class ConsumptionRecommend(object):
                     data['tag']
                 ]))
                 cls.insert_pq(data, oss_object_key, title, tags, tag_transport_channel, channel_mark,
-                              "搬运改造")
+                              "搬运改造", dt)
                 return
             except Exception as e:
                 data["transform_rule"] = "仅改造"
-                RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+                RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
                 logger.error(f"[+] 视频改造失败{e}")
                 text = (
                     f"**负责人**: {data['name']}\n"
@@ -478,13 +482,13 @@ class ConsumptionRecommend(object):
         fs_data_list = fs_data.split(',')
         redis_name = fs_data_list[2]
         studio_key = fs_data_list[3]
-        data = RedisHelper().get_client().rpop(name=redis_name)
+        data = RedisHelper().get_client().zpopmin(name=redis_name, count=1)
         if not data:
             logger.info('[处理] 无待执行的扫描任务')
             return
-        data = orjson.loads(data)
+        data, dt = orjson.loads(data[0][0]), int(data[0][1])
         try:
-            cls.data_handle(data, file_path, redis_name,studio_key)
+            cls.data_handle(data, dt, file_path, redis_name,studio_key)
             for filename in os.listdir(CACHE_DIR):
                 # check whether the filename contains the keyword
                 if uid in filename:
@@ -497,7 +501,7 @@ class ConsumptionRecommend(object):
                         logger.error(f"删除文件时出错: {file_path}, 错误: {e}")
             return
         except Exception as e:
-            RedisHelper().get_client().rpush(redis_name, json.dumps(data))
+            RedisHelper().get_client().zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})
             for filename in os.listdir(CACHE_DIR):
                 # check whether the filename contains the keyword
                 if uid in filename:
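
The work queue itself migrates from a Redis list (rpush/rpop) to a sorted set: the consumer pops the lowest-score task with zpopmin, and every failure path requeues the task with its original dt so it keeps its place in the schedule. The recurring pattern, condensed, with RedisHelper assumed in scope and data_handle standing in for the real handler:

import json

import orjson

client = RedisHelper().get_client()
popped = client.zpopmin(name=redis_name, count=1)  # [(member, score)] or []
if popped:
    data, dt = orjson.loads(popped[0][0]), int(popped[0][1])
    try:
        data_handle(data, dt)
    except Exception:
        # requeue with the same dt so the schedule position is preserved
        client.zadd(redis_name, {json.dumps(data, ensure_ascii=False): dt})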

+ 1 - 1
workers/select_work.py

@@ -24,7 +24,7 @@ class StartGetRecommend(object):
         if not data:
             logger.info(f"[FS] {name},时区为{dt}没有获取到数据")
             return
-        RedisHelper().get_client().rpush(redis_name, *data)
+        RedisHelper().get_client().zadd(redis_name, mapping=data)
         logger.info(f"[FS] {name},时区为{dt}共获取{len(data)}条,写入成功")
 
 def run():