Browse Source

agc脚本优化

zhangyong 11 months ago
parent
commit
5f8a29b3a0
2 changed files with 198 additions and 0 deletions
  1. 56 0
      agc_data.py
  2. 142 0
      agc_job.py

+ 56 - 0
agc_data.py

@@ -0,0 +1,56 @@
+from common import Material
+from extract_data.douyin.douyin_author import douyinAuthor
+from extract_data.kuaishou.kuaishou_author import kuaishouAuthor
+from extract_data.zhannei.zhannei_author import ZhanNeiAuthor
+
+import schedule
+import time
+import concurrent.futures
+
+
+def gs_start(platform, user_data):
+    print(f"执行{platform}数据抓取{user_data}")
+    if platform == "douyin":
+        douyinAuthor.get_videoList(user_data)
+    elif platform == "kuaishou":
+        kuaishouAuthor.get_kuaishou_videoList(user_data)
+    elif platform == "zhannei":
+        ZhanNeiAuthor.get_zhannei_videoList(user_data)
+
+
+def gs_task(platform):
+    data = Material.get_all_gs_user(platform)
+    valid_data = [user_data for user_data in data if user_data['sheet'] is not None]
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        futures = {executor.submit(gs_start, platform, user_data): user_data for user_data in valid_data}
+        for future in concurrent.futures.as_completed(futures):
+            result = future.result()
+            print("处理结果:", result)
+    print(f"{platform.capitalize()}数据抓取定时任务执行完成.")
+
+def cg_start(platform, user_data):
+    print(f"执行{platform}数据抓取{user_data}")
+    if platform == "douyin":
+        douyinAuthor.get_videoList(user_data)
+    elif platform == "kuaishou":
+        kuaishouAuthor.get_kuaishou_videoList(user_data)
+
+def cg_task(platform):
+    data = Material.get_all_user(platform)
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        futures = {executor.submit(cg_start, platform, user_data): user_data for user_data in data}
+        for future in concurrent.futures.as_completed(futures):
+            result = future.result()
+            print("处理结果:", result)
+    print(f"{platform.capitalize()}数据抓取定时任务执行完成.")
+
+
+
+schedule.every().day.at("19:20").do(gs_task, "kuaishou")
+schedule.every().day.at("19:30").do(gs_task, "douyin")
+schedule.every().day.at("18:00").do(gs_task, "zhannei")
+schedule.every(8).hours.do(cg_task, "douyin")
+schedule.every(8).hours.do(cg_task, "kuaishou")
+while True:
+    schedule.run_pending()
+    time.sleep(1)

+ 142 - 0
agc_job.py

@@ -0,0 +1,142 @@
+import os
+import concurrent.futures
+import re
+
+import schedule
+import time
+import threading
+from common import Material, Common, Feishu
+from video_agc.agc_video_method import AgcVidoe
+# 控制读写速度的参数
+MAX_BPS = 120 * 1024 * 1024  # 120MB/s
+MAX_WORKERS = os.cpu_count() * 2  # 线程池最大工作线程数量
+READ_WRITE_CHUNK_SIZE = 1024 * 1024  # 每次读写的块大小 (1MB)
+SLEEP_INTERVAL = READ_WRITE_CHUNK_SIZE / MAX_BPS  # 控制每次读写的延迟时间
+# 全局锁,用于同步读写操作
+lock = threading.Lock()
+# 记录今天已经返回的用户名
+gs_today = []
+cg_today = []
+bk_today = []
+
+def gs_video_start(user_data):
+    global gs_today
+    user_data_mark = user_data["mark"]
+    video_call = user_data["video_call"]
+    mark_name = user_data['mark_name']
+
+    if user_data_mark is not None and user_data_mark in gs_today:
+        Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。今天已经返回的用户名:{returned_usernames_today}")
+        print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
+        return  # 如果返回了某个用户名,并且今天已经返回过,则不启动线程
+    if video_call is not None and video_call in gs_today:
+        print(f"视频脚本参数中的脚本{user_data_mark} 今天已经返回过,不再启动线程。")
+        return  # 如果返回了某个用户名,并且今天已经返回过,则不启动线程
+    else:
+        print(f"视频脚本参数{user_data}")
+        mark = AgcVidoe.video_gs_stitching(user_data)
+        print(f"返回用户名{mark}")
+        if mark:
+            Common.logger("video").info(f"返回用户名{mark}")
+            gs_today.append(mark)
+            zd_count = user_data["zd_count"]  # 生成总条数
+            # 总条数
+            result = re.match(r'([^0-9]+)', user_data_mark).group()
+            all_count = AgcVidoe.get_link_gs_count(result)
+            if all_count >= int(zd_count):
+                Feishu.bot('recommend', 'AGC完成通知', '今日脚本跟随视频拼接任务完成啦~', user_data_mark.split("-")[0], mark_name)
+
+
+def cg_video_start(user_data):
+    global cg_today
+    user_data_mark = user_data["mark"]
+    if user_data_mark and user_data_mark in cg_today:
+        Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
+        print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
+        return
+    print(f"视频脚本参数 {user_data}")
+    mark = AgcVidoe.video_stitching(user_data)
+    print(f"返回用户名 {mark}")
+    if mark:
+        Common.logger("video").info(f"返回用户名 {mark}")
+        cg_today.append(user_data_mark)
+
+def bk_video_start(user_data):
+    global bk_today
+    user_data_mark = user_data["mark"]
+    # 开始准备执行生成视频脚本
+    if user_data_mark is not None and user_data_mark in bk_today:
+        Common.logger("video").info(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。今天已经返回的用户名:{returned_usernames_today}")
+        print(f"视频脚本参数中的用户名 {user_data_mark} 今天已经返回过,不再启动线程。")
+        return
+    mark = AgcVidoe.video_bk_stitching(user_data)
+    print(f"返回用户名{mark}")
+    if mark:
+        bk_today.append(mark)
+        Common.logger("video").info(f"返回用户名{mark}")
+
+def controlled_io_operation(platform, data):
+    with lock:
+        start_time = time.time()
+        time.sleep(SLEEP_INTERVAL)
+        end_time = time.time()
+        elapsed_time = end_time - start_time
+        if elapsed_time < SLEEP_INTERVAL:
+            time.sleep(SLEEP_INTERVAL - elapsed_time)
+    if platform == "gs":
+        gs_video_start(data)
+    elif platform == "cg":
+        cg_video_start(data)
+    elif platform == "bk":
+        bk_video_start(data)
+
+
+
+def video_start(platform):
+    print("开始执行生成视频脚本.")
+    if platform == "cg":
+        data = Material.feishu_list()
+    elif platform == "gs":
+        data = Material.feishu_gs_list()
+    elif platform == "bk":
+        data = Material.feishu_bk_list()
+    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
+        futures = {executor.submit(controlled_io_operation, platform, user_data): user_data for user_data in data}
+        for future in concurrent.futures.as_completed(futures):
+            try:
+                future.result()
+                print("处理结果: 成功")
+            except concurrent.futures.TimeoutError:
+                print("任务超时,已取消.")
+            except Exception as e:
+                print("处理任务时出现异常:", e)
+    print("执行生成视频脚本结束.")
+
+def gs_usernames_today():
+    gs_today.clear()
+    print("gs_usernames_today 已清空")
+def cg_usernames_today():
+    cg_today.clear()
+    print("cg_usernames_today 已清空")
+def bk_usernames_today():
+    bk_today.clear()
+    print("bk_usernames_today 已清空")
+
+# 定时任务设置
+schedule.every().day.at("00:10").do(gs_usernames_today)
+
+schedule.every().day.at("04:10").do(cg_usernames_today)
+
+schedule.every().day.at("19:10").do(bk_usernames_today)
+
+schedule.every(10).minutes.do(video_start, "cg")
+schedule.every(10).minutes.do(video_start, "gs")
+schedule.every(10).minutes.do(video_start, "bk")
+
+
+
+
+if __name__ == "__main__":
+    while True:
+        schedule.run_pending()
+        time.sleep(1)