123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- import configparser
- import os
- import random
- import time
- from datetime import datetime
- from common import Material, Feishu, Common, Oss
- from common.ffmpeg import FFmpeg
- from common.piaoquan_utils import PQ
- from common.sql_help import sqlCollect
- config = configparser.ConfigParser()
- config.read('./config.ini')
- class getVideo:
- """
- 数据处理
- """
- @classmethod
- def get_video(cls):
- pass
- """
- 根据标示+任务标示创建目录
- """
- @classmethod
- def create_folders(cls, mark, task_mark):
- video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/" + task_mark + "/"
- if not os.path.exists(video_path_url):
- os.makedirs(video_path_url)
- return video_path_url
- """
- 随机生成ID
- """
- @classmethod
- def random_id(cls):
- now = datetime.now()
- rand_num = random.randint(10000, 99999)
- oss_id = "{}{}".format(now.strftime("%Y%m%d%H%M%S"), rand_num)
- return oss_id
- """
- 飞书数据处理
- """
- @classmethod
- def video_task(cls, data):
- mark = data["mark"]
- name = data["name"]
- feishu_id = data["feishu_id"]
- feishu_sheet = data["feishu_sheet"]
- pw_sheet = data["pw_sheet"]
- task_data = Material.get_task_data(feishu_id, feishu_sheet)
- if len(task_data) == 0:
- Feishu.bot(mark, '机器自动改造消息通知', f'今日任务为空,请关注', name)
- return mark
- for task in task_data[0]:
- task_mark = task["task_mark"] # 任务标示
- old_id = str(task["old_id"])
- video_id = str(task["video_id"])
- new_id = str(task["new_id"])
- number = task["number"] # 指定条数
- title = task["title"]
- video_share = task["video_share"]
- video_ending = task["video_ending"]
- try:
- video_path_url = cls.create_folders(mark, str(task_mark)) # 创建目录
- if ',' in new_id:
- n_id = new_id.split(',')
- else:
- n_id = [new_id]
- if old_id and old_id != 'None':
- task_id = old_id.split(',')
- else:
- task_id = video_id.split(',')
- count = 0 # 初始化计数器
- for id in task_id:
- time.sleep(1)
- if old_id and old_id != 'None':
- data_list = PQ.get_user_url(task_mark, id, number, title, mark)
- else:
- data_list = PQ.get_audio_url(task_mark, id, title, mark)
- if not data_list:
- Common.logger("log").info(f"{task_mark}下的视频ID{id} 已经改造过了")
- Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下的视频ID{id},已经改造过,请关注', name)
- continue
- for video in data_list:
- v_id = video["video_id"]
- new_title = video["title"]
- cover = video["cover"]
- video_url = video["video_url"]
- new_video_path = PQ.download_video(video_url, video_path_url, v_id) # 下载视频地址
- if not new_video_path:
- Common.logger("log").info(f"{task_mark}下的视频ID{id},{new_video_path}视频下载失败")
- continue
- pw_list, zm_list = Material.get_pwsrt_data(feishu_id, pw_sheet) # 获取srt
- pws = random.choice(pw_list) # 随机选择 片尾srt+音频
- zm = random.choice(zm_list) # 随机选择 视频中字幕
- if video_share and video_share != 'None':
- new_video_path = FFmpeg.single_video(new_video_path, video_share, video_path_url, zm)
- if video_ending and video_ending != 'None':
- pw_id = pws["pw_id"]
- pw_srt = pws["pw_srt"]
- pw_url = PQ.get_audio_url(task_mark, pw_id, title, mark)
- jpg_path = FFmpeg.video_png(new_video_path, video_path_url) # 生成视频最后一帧jpg
- pw_path = FFmpeg.pw_video(jpg_path, video_path_url, pw_url[0]["video_url"], pw_srt) # 生成片尾视频
- video_list = [new_video_path, pw_path]
- video_path = FFmpeg.concatenate_videos(video_list, video_path_url) # 视频与片尾拼接到一起
- if video_share and video_share != 'None':
- new_video_path = FFmpeg.single_video(video_path, video_share, video_path_url, zm)
- else:
- new_video_path = video_path
- else:
- if video_share and video_share != 'None':
- new_video_path = FFmpeg.single_video(new_video_path, video_share, video_path_url, zm)
- oss_id = cls.random_id()
- oss_object_key = Oss.stitching_sync_upload_oss(new_video_path, oss_id) # 视频发送OSS
- status = oss_object_key.get("status")
- if status == 200:
- oss_object_key = oss_object_key.get("oss_object_key")
- time.sleep(1)
- code = PQ.insert_piaoquantv(oss_object_key, new_title, cover, n_id[count])
- if code:
- Common.logger("log").info(f"{task_mark}下的视频ID{v_id}发送成功")
- sqlCollect.insert_task(task_mark, v_id, mark) # 插入数据库
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- values = [[name, task_mark, v_id, n_id[count], new_title, str(code), formatted_time]]
- Feishu.insert_columns("ILb4sa0LahddRktnRipcu2vQnLb", "a74fc4", "ROWS", 1, 2)
- time.sleep(0.5)
- Feishu.update_values("ILb4sa0LahddRktnRipcu2vQnLb", "a74fc4", "A2:Z2", values)
- if os.path.exists(video_path_url) and os.path.isdir(video_path_url):
- for root, dirs, files in os.walk(video_path_url):
- for file in files:
- file_path = os.path.join(root, file)
- os.remove(file_path)
- for dir in dirs:
- dir_path = os.path.join(root, dir)
- os.rmdir(dir_path)
- else:
- Common.logger("log").info(f"{task_mark}下的视频ID{id} 视频发送OSS失败 ")
- count += 1 # 每次迭代计数器加1
- if old_id and old_id != 'None':
- Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务改造完成,请关注', name)
- if video_id and video_id != 'None':
- Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务改造完成,请关注', name)
- except Exception as e:
- Common.logger("warning").warning(f"{name}的{task_mark}任务处理失败:{e}\n")
- return mark
|