123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- import configparser
- import os
- import random
- import threading
- import time
- from datetime import datetime
- import concurrent.futures
- from common import Material, Feishu, Common, Oss
- from common.ffmpeg import FFmpeg
- from data_channel.douyin import DY
- from data_channel.piaoquan import PQ
- from common.sql_help import sqlCollect
- from data_channel.shipinhao import SPH
# Module-wide settings parsed from config.ini, resolved relative to the current
# working directory. getVideo.create_folders reads config['PATHS']['VIDEO_PATH'].
config = configparser.ConfigParser()
config.read('./config.ini')
class getVideo:
    """Video rewriting pipeline driven by task rows fetched through Material."""

    @classmethod
    def get_video(cls):
        """Placeholder entry point; currently does nothing."""
        pass

    @staticmethod
    def _is_set(value):
        """Return True when a task field is truthy and not the literal string 'None'."""
        return bool(value) and value != 'None'

    @classmethod
    def create_folders(cls, mark, task_mark):
        """Create the working directory for a mark/task pair and return its path.

        The path is config['PATHS']['VIDEO_PATH'] + mark + "/" + task_mark + "/".
        ``exist_ok=True`` is used because this method is called from worker
        threads; a check-then-create sequence could race.
        """
        video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/" + task_mark + "/"
        os.makedirs(video_path_url, exist_ok=True)
        return video_path_url

    @classmethod
    def random_id(cls):
        """Return a numeric string: current time (YYYYmmddHHMMSS) + 5 random digits."""
        now = datetime.now()
        rand_num = random.randint(10000, 99999)
        return "{}{}".format(now.strftime("%Y%m%d%H%M%S"), rand_num)

    @classmethod
    def remove_files(cls, video_path_url):
        """Delete everything inside ``video_path_url`` while keeping the directory itself.

        Does nothing when the path is not an existing directory.
        """
        if not os.path.isdir(video_path_url):
            return
        # Walk bottom-up so each sub-directory is already empty when it is removed;
        # a top-down walk would call rmdir on directories that still hold files.
        for root, dirs, files in os.walk(video_path_url, topdown=False):
            for file_name in files:
                os.remove(os.path.join(root, file_name))
            for dir_name in dirs:
                dir_path = os.path.join(root, dir_name)
                if os.path.islink(dir_path):
                    # os.walk lists symlinked directories under dirs; rmdir rejects them.
                    os.remove(dir_path)
                else:
                    os.rmdir(dir_path)

    @classmethod
    def video_task(cls, data):
        """Run every task of one user and return the user's ``mark``.

        ``data`` must provide the keys "mark", "name", "feishu_id",
        "feishu_sheet" and "cookie_sheet". Tasks are loaded with
        Material.get_task_data and processed in batches of three worker threads.
        For each task and each source URL the videos are downloaded, optionally
        cropped / shortened / extended with an ending clip / overlaid with the
        share material, uploaded through Oss and published through PQ; every
        successful publication is appended to a Feishu sheet.
        """
        mark = data["mark"]
        name = data["name"]
        feishu_id = data["feishu_id"]
        feishu_sheet = data["feishu_sheet"]
        cookie_sheet = data["cookie_sheet"]
        pz_sheet = '500Oe0'  # sheet id passed to Material.get_pzsrt_data
        pw_sheet = 'DgX7vC'  # sheet id passed to Material.get_pwsrt_data
        task_data = Material.get_task_data(feishu_id, feishu_sheet)
        if not task_data:
            Feishu.bot(mark, '机器自动改造消息通知', '今日任务为空,请关注', name)
            return mark
        # Serialises the insert-row + write-row pair on the shared result sheet.
        lock = threading.Lock()

        def process_task(task):
            """Process a single task row; runs inside a worker thread."""
            task_mark = task["task_mark"]  # task identifier
            channel_id = str(task["channel_id"])
            channel_urls = str(task["channel_url"])
            piaoquan_id = str(task["piaoquan_id"])
            number = task["number"]  # requested number of videos
            title = task["title"]
            video_share = task["video_share"]
            video_ending = task["video_ending"]
            crop_total = task["crop_total"]
            gg_duration_total = task["gg_duration_total"]
            video_path_url = cls.create_folders(mark, str(task_mark))

            use_share = cls._is_set(video_share)
            if use_share:
                video_share_list = video_share.split('/')
                video_share_mark = video_share_list[0]
                video_share_name = video_share_list[1]
                zm = Material.get_pzsrt_data("summary", pz_sheet, video_share_name)
                if zm == '':
                    Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下片中标示填写错误,请关注!!!!', name)

            # str.split already yields a one-element list when the separator is absent.
            channel_url = channel_urls.split(',')
            titles = title.split('/')

            for url in channel_url:
                Common.logger("log").info(f"{task_mark}下的用户:{url}开始获取视频")
                if channel_id == "抖音":
                    data_list = DY.get_dy_url(task_mark, url, number, mark, feishu_id, cookie_sheet, channel_id, name)
                elif channel_id == "票圈":
                    data_list = PQ.get_pq_url(task_mark, url, number, mark)
                elif channel_id == "视频号":
                    data_list = SPH.get_sph_url(task_mark, url, number, mark)
                else:
                    # No fetcher exists for this channel; without this branch
                    # data_list would be unbound below.
                    Common.logger("warning").warning(f"{task_mark}任务的渠道{channel_id}暂不支持,已跳过")
                    return
                if not data_list:
                    Common.logger("log").info(f"{task_mark}下的视频ID{url} 已经改造过了")
                    Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下的用户ID{url},没有已经改造的视频了', name)
                    cls.remove_files(video_path_url)
                    continue
                Common.logger("log").info(f"{task_mark}下的ID{url} 获取视频完成,共{len(data_list)}条")
                try:
                    for video in data_list:
                        v_id = video["video_id"]
                        cover = video["cover"]
                        video_url = video["video_url"]
                        time.sleep(1)
                        pw_random_id = cls.random_id()
                        if channel_id == "票圈":
                            new_video_path = PQ.download_video(video_url, video_path_url, v_id)
                        else:
                            new_video_path = Oss.download_video_oss(video_url, video_path_url, v_id)
                        if not os.path.isfile(new_video_path):
                            Common.logger("log").info(f"{task_mark}下的视频{url},{new_video_path}视频下载失败")
                            cls.remove_files(video_path_url)
                            continue
                        if cls._is_set(crop_total):  # cropping requested
                            new_video_path = FFmpeg.video_crop(new_video_path, video_path_url, pw_random_id)
                        if cls._is_set(gg_duration_total):  # fixed duration requested
                            new_video_path = FFmpeg.video_ggduration(new_video_path, video_path_url, pw_random_id, gg_duration_total)
                        if cls._is_set(video_ending):
                            ending = random.choice(video_ending.split(','))
                            pw_list = Material.get_pwsrt_data("summary", pw_sheet, ending)
                            if not pw_list:
                                Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下片尾标示错误,请关注!!!!', name)
                                # Without ending material pw_url / pw_srt would be
                                # undefined (or stale from a previous video): skip.
                                cls.remove_files(video_path_url)
                                continue
                            pw_id = pw_list["pw_id"]
                            pw_srt = pw_list["pw_srt"]
                            pw_url = PQ.get_pw_url(pw_id)
                            # Up to three attempts to grab the last frame of the video.
                            for _ in range(3):
                                jpg_path = FFmpeg.video_png(new_video_path, video_path_url, pw_random_id)
                                if os.path.isfile(jpg_path):
                                    Common.logger("log").info(f"{task_mark}下的视频{url},生成视频最后一帧成功")
                                    break
                                time.sleep(1)
                            if not os.path.isfile(jpg_path):
                                Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务用户{url}下的视频{v_id},获取视频最后一帧失败,请关注', name)
                                cls.remove_files(video_path_url)
                                continue
                            # Up to three attempts to render the ending clip.
                            for _ in range(3):
                                pw_mp3_path = FFmpeg.get_video_mp3(pw_url, video_path_url, pw_random_id)
                                pw_path = FFmpeg.pw_video(jpg_path, video_path_url, pw_url, pw_srt, pw_random_id, pw_mp3_path)
                                if os.path.isfile(pw_path):
                                    Common.logger("log").info(f"{task_mark}下的视频{url},生成片尾视频成功")
                                    break
                                time.sleep(1)
                            if not os.path.isfile(pw_path):
                                Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务用户{url}下的视频{v_id},生成片尾视频失败,请关注',
                                           name)
                                cls.remove_files(video_path_url)
                                continue
                            # Append the ending clip to the main video.
                            video_path = FFmpeg.concatenate_videos([new_video_path, pw_path], video_path_url)
                            time.sleep(1)
                            if use_share:
                                new_video_path = FFmpeg.single_video(video_path, video_share_mark, video_path_url, zm)
                            else:
                                new_video_path = video_path
                        elif use_share:
                            new_video_path = FFmpeg.single_video(new_video_path, video_share_mark, video_path_url, zm)
                        time.sleep(1)
                        oss_id = cls.random_id()
                        upload_result = Oss.stitching_sync_upload_oss(new_video_path, oss_id)
                        if upload_result.get("status") == 200:
                            oss_object_key = upload_result.get("oss_object_key")
                            time.sleep(1)
                            new_title = random.choice(titles)
                            code = PQ.insert_piaoquantv(oss_object_key, new_title, cover, piaoquan_id)
                            if code:
                                Common.logger("log").info(f"{task_mark}下的视频ID{v_id}发送成功")
                                sqlCollect.insert_task(task_mark, v_id, mark, channel_id)
                                formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                                values = [[name, task_mark, v_id, piaoquan_id, new_title, str(code), formatted_time]]
                                # Insert a row and fill it as one step so rows written by
                                # concurrent workers do not interleave.
                                with lock:
                                    Feishu.insert_columns("ILb4sa0LahddRktnRipcu2vQnLb", "a74fc4", "ROWS", 1, 2)
                                    time.sleep(0.5)
                                    Feishu.update_values("ILb4sa0LahddRktnRipcu2vQnLb", "a74fc4", "A2:Z2", values)
                            cls.remove_files(video_path_url)
                        else:
                            cls.remove_files(video_path_url)
                            Common.logger("log").info(f"{task_mark}下的{url}视频{v_id} 视频发送OSS失败 ")
                    Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务改造完成,请关注', name)
                except Exception as e:
                    # Worker boundary: one failing URL must not abort the remaining URLs.
                    cls.remove_files(video_path_url)
                    Common.logger("warning").warning(f"{name}的{task_mark}任务处理失败:{e}\n")

        batch_size = 3
        with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
            # Submit tasks batch by batch; a batch must finish before the next one starts.
            for start in range(0, len(task_data), batch_size):
                current_batch = task_data[start:start + batch_size]
                futures = {executor.submit(process_task, task): task for task in current_batch}
                for future in concurrent.futures.as_completed(futures):
                    task = futures[future]
                    try:
                        future.result()
                        print(f"Task {task['task_mark']} 完成")
                    except Exception as exc:
                        print(f"Task {task['task_mark']} 异常信息: {exc}")
        Feishu.bot(mark, '机器自动改造消息通知', '你的任务全部完成,请关注!!!!!', name)
        return mark
|