123456789101112131415161718192021222324252627282930313233343536373839404142434445464748 |
- from common import Feishu, Material
- from common.sql_help import sqlHelp
- from video_agc.agc_video_method import AgcVidoe
- import schedule
- import threading
- import time
def video_start(user_data) -> None:
    """Log one task's parameters and run video stitching for it.

    Args:
        user_data: Parameters for a single video job; passed unchanged to
            ``AgcVidoe.video_stitching``. (Presumably one row of the table
            returned by ``Material.feishu_list()`` -- confirm against caller.)
    """
    print(f"视频脚本参数{user_data}")
    # Hand the parameters over to the video generation routine.
    AgcVidoe.video_stitching(user_data)
# Scheduled job: generate videos for every configured entry.
def video_task() -> None:
    """Run ``video_start`` for each row of ``Material.feishu_list()`` in parallel.

    One thread is started per row yielded by ``.iterrows()``; the function
    blocks until every thread has finished, then prints a completion message.
    """
    print("开始执行生成视频脚.")
    rows = Material.feishu_list()  # presumably a pandas DataFrame (uses .iterrows()) -- confirm
    workers = []
    for _, user_data in rows.iterrows():
        worker = threading.Thread(target=video_start, args=(user_data,))
        worker.start()
        workers.append(worker)
    # Wait for all workers so the job only reports completion when every row is done.
    for worker in workers:
        worker.join()
    print("执行生成视频脚结束")
# Register the video generation job to run every 10 minutes.
schedule.every(10).minutes.do(video_task)
def job_feishu_bot() -> None:
    """Send the per-name count summary to the Feishu bot.

    Fetches the name list from ``Material.feishu_name()``, looks up the
    matching counts via ``sqlHelp.get_count_list``, concatenates them with no
    separator, and posts the result through ``Feishu.bot``.
    """
    names = Material.feishu_name()
    counts = sqlHelp.get_count_list(names)
    # The entries are joined as-is, so they are assumed to be strings -- confirm.
    summary = f'{"".join(counts)}'
    Feishu.bot('recommend', 'AGC视频', summary, ' ')
    print("机器人通知完成")
# Send the Feishu bot summary once a day at 15:00.
# NOTE(review): the previous comment claimed 1:30 PM, but the registered time
# is "15:00" -- confirm which one is intended.
schedule.every().day.at("15:00").do(job_feishu_bot)

# Poll the scheduler once per second, forever, running any job that is due.
while True:
    schedule.run_pending()
    time.sleep(1)
- # list = Material.feishu_list()
- # AgcVidoe.video_stitching(list)
- # print(list)
|