Bladeren bron

增加单独启动脚本

zhangyong 10 maanden geleden
bovenliggende
commit
dc72fbab09
3 gewijzigde bestanden met toevoegingen van 149 en 177 verwijderingen
  1. 48 51
      agc_bk_main.py
  2. 53 65
      agc_cg_main.py
  3. 48 61
      agc_gs_main.py

+ 48 - 51
agc_bk_main.py

@@ -1,70 +1,67 @@
-import re
-
-from common import Material, Common, Feishu
-from video_agc.agc_video_method import AgcVidoe
+import os
 import concurrent.futures
+
 import schedule
 import time
+import threading
+from common import Material
+from video_agc.agc_video import AGC
 
+# 控制读写速度的参数
+MAX_BPS = 1 * 1024 * 1024  # 120MB/s
+MAX_WORKERS = os.cpu_count() * 5  # 线程池最大工作线程数量
+READ_WRITE_CHUNK_SIZE = 512 * 1024  # 每次读写的块大小 (1MB)
+SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS  # 控制每次读写的延迟时间
 
-# 记录今天已经返回的用户名
-returned_usernames_today = []
-def video_start(user_data):
-    global returned_usernames_today
-    user_data_mark = user_data["mark"]
-    mark_name = user_data['mark_name']
+# 全局锁,用于同步读写操作
+lock = threading.Lock()
 
-    # 开始准备执行生成视频脚本
-    if user_data_mark is not None and user_data_mark in returned_usernames_today:
-        Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。今天已经返回的用户名:{returned_usernames_today}")
-        print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
-        return  # 如果返回了某个用户名,并且今天已经返回过,则不启动线程
-    else:
-        print(f"视频脚本参数{user_data}")
-        mark = AgcVidoe.video_bk_stitching(user_data)
-        print(f"返回用户名{mark}")
-        if mark:
-            returned_usernames_today.append(mark)
-            Common.logger("video").info(f"返回用户名{mark}")
 
 
-# gs_name_list = Material.feishu_bk_list()
-# video_start(gs_name_list[0])
+def controlled_io_operation(platform, data):
+    with lock:
+        start_time = time.time()
+        time.sleep(SLEEP_INTERVAL)
+        end_time = time.time()
+        elapsed_time = end_time - start_time
+        if elapsed_time < SLEEP_INTERVAL:
+            time.sleep(SLEEP_INTERVAL - elapsed_time)
+    if platform == "gs":
+        AGC.video(data, "跟随")
+    elif platform == "cg":
+        AGC.video(data, "常规")
+    elif platform == "bk":
+        AGC.video(data, "爆款")
 
-def clear_returned_usernames():
-    returned_usernames_today.clear()
-    print("returned_usernames_today 已清空")
-
-# 定义定时任务
-def video_task():
-    print("开始执行生成视频脚.")
-    data = Material.feishu_bk_list()
-    # 创建一个线程池
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        futures = {executor.submit(video_start, user_data): user_data for user_data in data}
-        # 设置超时时间为360分钟
-        timeout = 360 * 60
-        # 等待所有任务执行完成或超时
+def video_start(platform):
+    print("开始执行生成视频脚本.")
+    if platform == "cg":
+        data = Material.feishu_list()
+    elif platform == "gs":
+        data = Material.feishu_gs_list()
+    elif platform == "bk":
+        data = Material.feishu_bk_list()
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        futures = {executor.submit(controlled_io_operation, platform, user_data): user_data for user_data in data}
         for future in concurrent.futures.as_completed(futures):
             try:
-                # 获取每个任务的执行结果
-                result = future.result()
-                print("处理结果:", result)
+                future.result()
+                print("处理结果: 成功")
             except concurrent.futures.TimeoutError:
-                # 如果任务超时,则取消任务
                 print("任务超时,已取消.")
             except Exception as e:
                 print("处理任务时出现异常:", e)
-    print("执行生成视频脚结束")
-
-# 每天12点30清空集合
-schedule.every().day.at("00:10").do(clear_returned_usernames)
+    print("执行生成视频脚本结束.")
 
 
-# 每天00:10执行视频任务
-schedule.every().day.at("00:20").do(video_task)
+schedule.every().day.at("00:20").do(video_start, "bk")
 
 
-while True:
-    schedule.run_pending()
-    time.sleep(1)
+if __name__ == "__main__":
+    # video_start("cg")
+    while True:
+        try:
+            schedule.run_pending()
+        except Exception as e:
+            print("执行调度任务时出现异常:", e)
+        time.sleep(1)

+ 53 - 65
agc_cg_main.py

@@ -1,79 +1,67 @@
-from common import Material, Common
-from video_agc.agc_video_method import AgcVidoe
+import os
 import concurrent.futures
 
 import schedule
 import time
-
-
-# 记录今天已经返回的用户名
-returned_usernames_today = []
-def video_start(user_data):
-    global returned_usernames_today
-    user_data_mark = user_data["mark"]
-    # 开始准备执行生成视频脚本
-    if user_data_mark is not None and user_data_mark in returned_usernames_today:
-        Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。今天已经返回的用户名:{returned_usernames_today}")
-        print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
-        return  # 如果返回了某个用户名,并且今天已经返回过,则不启动线程
-    else:
-        print(f"视频脚本参数{user_data}")
-        mark = AgcVidoe.video_stitching(user_data)
-        print(f"返回用户名{mark}")
-        if mark:
-            Common.logger("video").info(f"返回用户名{mark}")
-            returned_usernames_today.append(user_data_mark)
-
-# name_list = Material.feishu_list()
-# video_start(name_list[1])
-
-
-
-def clear_returned_usernames():
-    returned_usernames_today.clear()
-    print("returned_usernames_today 已清空")
-
-# 定义定时任务
-def video_task():
-    print("开始执行生成视频脚.")
-    data = Material.feishu_list()
-    # 创建一个线程池
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        futures = {executor.submit(video_start, user_data): user_data for user_data in data}
-        # 等待所有任务执行完成或超时
+import threading
+from common import Material
+from video_agc.agc_video import AGC
+
+# 控制读写速度的参数
+MAX_BPS = 1 * 1024 * 1024  # 120MB/s
+MAX_WORKERS = os.cpu_count() * 5  # 线程池最大工作线程数量
+READ_WRITE_CHUNK_SIZE = 512 * 1024  # 每次读写的块大小 (1MB)
+SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS  # 控制每次读写的延迟时间
+
+# 全局锁,用于同步读写操作
+lock = threading.Lock()
+
+
+
+def controlled_io_operation(platform, data):
+    with lock:
+        start_time = time.time()
+        time.sleep(SLEEP_INTERVAL)
+        end_time = time.time()
+        elapsed_time = end_time - start_time
+        if elapsed_time < SLEEP_INTERVAL:
+            time.sleep(SLEEP_INTERVAL - elapsed_time)
+    if platform == "gs":
+        AGC.video(data, "跟随")
+    elif platform == "cg":
+        AGC.video(data, "常规")
+    elif platform == "bk":
+        AGC.video(data, "爆款")
+
+def video_start(platform):
+    print("开始执行生成视频脚本.")
+    if platform == "cg":
+        data = Material.feishu_list()
+    elif platform == "gs":
+        data = Material.feishu_gs_list()
+    elif platform == "bk":
+        data = Material.feishu_bk_list()
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        futures = {executor.submit(controlled_io_operation, platform, user_data): user_data for user_data in data}
         for future in concurrent.futures.as_completed(futures):
             try:
-                # 获取每个任务的执行结果
-                result = future.result()
-                print("处理结果:", result)
+                future.result()
+                print("处理结果: 成功")
             except concurrent.futures.TimeoutError:
-                # 如果任务超时,则取消任务
                 print("任务超时,已取消.")
             except Exception as e:
                 print("处理任务时出现异常:", e)
-    print("执行生成视频脚结束")
-
-
-# 每天0点10清空集合
-schedule.every().day.at("04:10").do(clear_returned_usernames)
-
-#每10分钟执行次脚本
-schedule.every(10).minutes.do(video_task)
-
-
-
-
-# 每天下午1:30执行任务
+    print("执行生成视频脚本结束.")
 
-while True:
-    try:
-        schedule.run_pending()
-    except Exception as e:
-        print("执行调度任务时出现异常:", e)
-    time.sleep(1)
 
-# list = Material.feishu_list()
-# AgcVidoe.video_stitching(list)
-# print(list)
+schedule.every().day.at("04:10").do(video_start, "cg")
 
 
+if __name__ == "__main__":
+    video_start("cg")
+    while True:
+        try:
+            schedule.run_pending()
+        except Exception as e:
+            print("执行调度任务时出现异常:", e)
+        time.sleep(1)

+ 48 - 61
agc_gs_main.py

@@ -1,80 +1,67 @@
-import re
-
-from common import Material, Common, Feishu
-from video_agc.agc_video_method import AgcVidoe
+import os
 import concurrent.futures
+
 import schedule
 import time
+import threading
+from common import Material
+from video_agc.agc_video import AGC
 
+# 控制读写速度的参数
+MAX_BPS = 1 * 1024 * 1024  # 120MB/s
+MAX_WORKERS = os.cpu_count() * 5  # 线程池最大工作线程数量
+READ_WRITE_CHUNK_SIZE = 512 * 1024  # 每次读写的块大小 (1MB)
+SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS  # 控制每次读写的延迟时间
 
-# 记录今天已经返回的用户名
-returned_usernames_today = []
-def video_start(user_data):
-    global returned_usernames_today
-    user_data_mark = user_data["mark"]
-    video_call = user_data["video_call"]
-    mark_name = user_data['mark_name']
-
-    # 开始准备执行生成视频脚本
-    if user_data_mark is not None and user_data_mark in returned_usernames_today:
-        Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。今天已经返回的用户名:{returned_usernames_today}")
-        print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
-        return  # 如果返回了某个用户名,并且今天已经返回过,则不启动线程
-    if video_call is not None and video_call in returned_usernames_today:
-        print(f"视频脚本参数中的脚本{user_data_mark} 今天已经返回过,不再启动线程。")
-        return  # 如果返回了某个用户名,并且今天已经返回过,则不启动线程
-    else:
-        print(f"视频脚本参数{user_data}")
-        mark = AgcVidoe.video_gs_stitching(user_data)
-        print(f"返回用户名{mark}")
-        if mark:
-            Common.logger("video").info(f"返回用户名{mark}")
-            returned_usernames_today.append(mark)
-            zd_count = user_data["zd_count"]  # 生成总条数
-            # 总条数
-            result = re.match(r'([^0-9]+)', user_data_mark).group()
-            all_count = AgcVidoe.get_link_gs_count(result)
-            if all_count >= int(zd_count):
-                Feishu.bot('recommend', 'AGC完成通知', '今日脚本跟随视频拼接任务完成啦~', user_data_mark.split("-")[0], mark_name)
-
+# 全局锁,用于同步读写操作
+lock = threading.Lock()
 
-# gs_name_list = Material.feishu_gs_list()
-# video_start(gs_name_list[0])
 
 
-def clear_returned_usernames():
-    returned_usernames_today.clear()
-    print("returned_usernames_today 已清空")
+def controlled_io_operation(platform, data):
+    with lock:
+        start_time = time.time()
+        time.sleep(SLEEP_INTERVAL)
+        end_time = time.time()
+        elapsed_time = end_time - start_time
+        if elapsed_time < SLEEP_INTERVAL:
+            time.sleep(SLEEP_INTERVAL - elapsed_time)
+    if platform == "gs":
+        AGC.video(data, "跟随")
+    elif platform == "cg":
+        AGC.video(data, "常规")
+    elif platform == "bk":
+        AGC.video(data, "爆款")
 
-# 定义定时任务
-def video_task():
-    print("开始执行生成视频脚.")
-    data = Material.feishu_gs_list()
-    # 创建一个线程池
-    with concurrent.futures.ThreadPoolExecutor() as executor:
-        futures = {executor.submit(video_start, user_data): user_data for user_data in data}
-        # 设置超时时间为20分钟
-        timeout = 25 * 60
-        # 等待所有任务执行完成或超时
+def video_start(platform):
+    print("开始执行生成视频脚本.")
+    if platform == "cg":
+        data = Material.feishu_list()
+    elif platform == "gs":
+        data = Material.feishu_gs_list()
+    elif platform == "bk":
+        data = Material.feishu_bk_list()
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        futures = {executor.submit(controlled_io_operation, platform, user_data): user_data for user_data in data}
         for future in concurrent.futures.as_completed(futures):
             try:
-                # 获取每个任务的执行结果
-                result = future.result()
-                print("处理结果:", result)
+                future.result()
+                print("处理结果: 成功")
             except concurrent.futures.TimeoutError:
-                # 如果任务超时,则取消任务
                 print("任务超时,已取消.")
             except Exception as e:
                 print("处理任务时出现异常:", e)
-    print("执行生成视频脚结束")
+    print("执行生成视频脚结束.")
 
-# 每天0点10清空集合
-schedule.every().day.at("21:05").do(clear_returned_usernames)
 
-#每10分钟执行次脚本
-schedule.every(10).minutes.do(video_task)
+schedule.every().day.at("19:40").do(video_start, "gs")
 
 
-while True:
-    schedule.run_pending()
-    time.sleep(1)
+if __name__ == "__main__":
+    # video_start("cg")
+    while True:
+        try:
+            schedule.run_pending()
+        except Exception as e:
+            print("执行调度任务时出现异常:", e)
+        time.sleep(1)