import configparser import os import time from common import Feishu, Common, Oss from common.ffmpeg import FFmpeg from common.piaoquan_utils import PQ from common.sql_help import sqlCollect config = configparser.ConfigParser() config.read('./config.ini') class getVideo: """ 根据标示+任务标示创建目录 """ @classmethod def create_folders(cls): video_path_url = config['PATHS']['VIDEO_PATH'] if not os.path.exists(video_path_url): os.makedirs(video_path_url) return video_path_url """ 删除文件 """ @classmethod def remove_files(cls, video_path_url): if os.path.exists(video_path_url) and os.path.isdir(video_path_url): for root, dirs, files in os.walk(video_path_url): for file in files: file_path = os.path.join(root, file) os.remove(file_path) for dir in dirs: dir_path = os.path.join(root, dir) os.rmdir(dir_path) """ 获取未改造的视频ID """ @classmethod def find_unique_id(cls, data, videos): video_ids = [item for item in data if item not in videos] data_video_ids = [item for item in video_ids if item is not None and item != ''] return data_video_ids """ 飞书数据处理 """ @classmethod def video_task(cls, data): video_list = sqlCollect.is_video_id() if video_list: videos = [item[0].replace("'", "").replace("(", "").replace(")", "") for item in video_list] else: videos = [] data_video_ids = cls.find_unique_id(data, videos) # 获取未处理的视频ID video_path_url = cls.create_folders() # 创建目录 if data_video_ids: for v_id in data_video_ids: time.sleep(4) try: video_url, new_title = PQ.get_pw_url(v_id) new_video_path = PQ.download_video(video_url, video_path_url, v_id) # 下载视频地址 if new_video_path == '': Common.logger("log").info(f"{video_url}视频下载失败") cls.remove_files(video_path_url) continue url = FFmpeg.video_duration(new_video_path, video_path_url, v_id) if not os.path.isfile(url): Common.logger("log").info(f"{v_id}下的视频处理失败") cls.remove_files(video_path_url) continue oss_object_key = Oss.stitching_sync_upload_oss(url, v_id) # 视频发送OSS time.sleep(1) status = oss_object_key.get("status") if status == 200: oss_object_key = oss_object_key.get("oss_object_key") time.sleep(1) code = PQ.insert_piaoquantv(oss_object_key, new_title) if code: Common.logger("log").info(f"{v_id}发送成功") sqlCollect.insert_task(v_id) # 插入数据库 time.sleep(1) values = [[v_id, str(code)]] Feishu.insert_columns("MTVXsuGKUhOI5btTjOEceyJ9nZb", "JmS9sJ", "ROWS", 1, 2) time.sleep(0.5) Feishu.update_values("MTVXsuGKUhOI5btTjOEceyJ9nZb", "JmS9sJ", "A2:Z2", values) cls.remove_files(video_path_url) else: cls.remove_files(video_path_url) Common.logger("log").info(f"视频ID{v_id} 视频发送OSS失败 ") time.sleep(10) except Exception as e: cls.remove_files(video_path_url) Common.logger("log").info(f"处理失败,错误信息{e}") continue