123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471 |
- import configparser
- import json
- import os
- import random
- import re
- import sys
- import threading
- import time
- from datetime import datetime
- import concurrent.futures
- from common.redis import get_data
- from common.tts_help import TTS
- from common import Material, Feishu, Common, Oss
- from common.ffmpeg import FFmpeg
- from common.gpt4o_help import GPT4o
- from data_channel.douyin import DY
- from data_channel.dy_ls import DYLS
- from data_channel.ks_ls import KSLS
- from data_channel.kuaishou import KS
- from data_channel.kuaishouchuangzuozhe import KsFeedVideo
- from data_channel.piaoquan import PQ
- from common.sql_help import sqlCollect
- from data_channel.shipinhao import SPH
- # 读取配置文件
- from data_channel.shipinhaodandian import SPHDD
- config = configparser.ConfigParser()
- config.read('./config.ini')
- class VideoProcessor:
- """
- 视频处理类,包含创建文件夹、生成随机ID、删除文件和处理视频任务等方法。
- """
- @classmethod
- def create_folders(cls, mark, task_mark):
- """
- 根据标示和任务标示创建目录
- """
- video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/" + task_mark + "/"
- if not os.path.exists(video_path_url):
- os.makedirs(video_path_url)
- return video_path_url
- @classmethod
- def random_id(cls):
- """
- 随机生成ID
- """
- now = datetime.now()
- rand_num = random.randint(10000, 99999)
- return f"{now.strftime('%Y%m%d%H%M%S')}{rand_num}"
- @classmethod
- def remove_files(cls, video_path_url):
- """
- 删除指定目录下的所有文件和子目录
- """
- if os.path.exists(video_path_url) and os.path.isdir(video_path_url):
- for root, dirs, files in os.walk(video_path_url):
- for file in files:
- file_path = os.path.join(root, file)
- os.remove(file_path)
- for dir in dirs:
- dir_path = os.path.join(root, dir)
- os.rmdir(dir_path)
- @classmethod
- def process_task(cls, task, mark, name, feishu_id, cookie_sheet):
- """
- 处理单个任务
- """
- task_mark = task["task_mark"]
- channel_id = str(task["channel_id"])
- url = str(task["channel_url"])
- piaoquan_id = str(task["piaoquan_id"])
- number = task["number"]
- title = task["title"]
- video_share = task["video_share"]
- video_ending = task["video_ending"]
- crop_total = task["crop_total"]
- gg_duration_total = task["gg_duration_total"]
- video_path_url = cls.create_folders(mark, str(task_mark))
- zm = Material.get_pzsrt_data("summary", "500Oe0", video_share)
- Common.logger(mark).info(f"{name}的{task_mark}下{channel_id}的用户:{url}开始获取视频")
- data_list = cls.get_data_list(channel_id, task_mark, url, number, mark, feishu_id, cookie_sheet, name)
- if not data_list:
- Common.logger(mark).info(f"{name}的{task_mark}下{channel_id}的视频ID{url} 已经改造过了")
- text = (
- f"**通知类型**: 没有改造的视频\n"
- f"**负责人**: {name}\n"
- f"**渠道**: {channel_id}\n"
- f"**视频主页ID**: {url}\n"
- f"**任务标示**: {task_mark}\n"
- )
- Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- cls.remove_files(video_path_url)
- return
- Common.logger(mark).info(f"{name}的{task_mark}下的ID{url} 获取视频完成,共{len(data_list)}条")
- for video in data_list:
- try:
- new_title = cls.generate_title(video, title)
- v_id = video["video_id"]
- cover = video["cover"]
- video_url = video["video_url"]
- old_title = video['old_title']
- rule = video['rule']
- if not old_title:
- old_title = '⭕分享给大家一个视频!值得细品!'
- text = (
- f"**通知类型**: 标题为空,使用兜底标题生成片尾\n"
- f"**负责人**: {name}\n"
- f"**渠道**: {channel_id}\n"
- f"**视频主页ID**: {url}\n"
- f"**视频Video_id**: {v_id}\n"
- f"**任务标示**: {task_mark}\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},标题为空,使用兜底标题生成片尾")
- time.sleep(1)
- pw_random_id = cls.random_id()
- new_video_path = cls.download_and_process_video(channel_id, video_url, video_path_url, v_id,
- crop_total, gg_duration_total, pw_random_id, new_title, mark)
- if not os.path.isfile(new_video_path):
- text = (
- f"**通知类型**: 视频下载失败\n"
- f"**负责人**: {name}\n"
- f"**渠道**: {channel_id}\n"
- f"**视频主页ID**: {url}\n"
- f"**视频Video_id**: {v_id}\n"
- f"**任务标示**: {task_mark}\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- cls.remove_files(video_path_url)
- continue
- if new_video_path:
- if video_ending and video_ending != 'None':
- new_video_path = cls.handle_video_ending(new_video_path, video_ending, old_title, pw_random_id, video_path_url, mark, task_mark, url, name, video_share, zm)
- else:
- if video_share and video_share != 'None':
- new_video_path = FFmpeg.single_video(new_video_path, video_path_url, zm)
- # new_video_path = FFmpeg.single_video(new_video_path, video_path_url, zm)
- if not os.path.isfile(new_video_path):
- text = (
- f"**通知类型**: 视频改造失败\n"
- f"**负责人**: {name}\n"
- f"**渠道**: {channel_id}\n"
- f"**视频主页ID**: {url}\n"
- f"**视频Video_id**: {v_id}\n"
- f"**任务标示**: {task_mark}\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- cls.remove_files(video_path_url)
- continue
- # 上传视频和封面,并更新数据库
- values, code = cls.upload_video_and_thumbnail(new_video_path, cover, v_id, new_title, task_mark, name, piaoquan_id,
- video_path_url, mark, channel_id, url, old_title, title, rule)
- # 更新已使用的视频号状态
- pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail' # 站内视频链接
- if name == "视频号单视频":
- sphdd_status = sqlCollect.update_shp_dd_vid(v_id)
- if sphdd_status == 1:
- Common.logger(mark).info(f"{name}的{task_mark}下的ID{url} 视频修改已使用,状态已修改")
- from_user_name = video['from_user_name'] # 来源用户
- from_group_name = video['from_group_name'] # 来源群组
- text = (
- f"**站内视频链接**: {pq_url}\n"
- f"**来源用户**: {from_user_name}\n"
- f"**来源群组**: {from_group_name}\n"
- f"**原视频链接**: {video['video_url']}\n"
- f"**原视频封面**: {video['cover']}\n"
- f"**原视频标题**: {video['old_title']}\n"
- )
- Feishu.finish_bot(text, "https://open.feishu.cn/open-apis/bot/v2/hook/0553124e-7fc1-4f9b-8976-9db4218d25e2", "【 有一条新的视频号内容改造成功 】")
- text = (
- f"**通知类型**: 视频改造成功\n"
- f"**站内视频链接**: {pq_url}\n"
- f"**负责人**: {name}\n"
- f"**渠道**: {channel_id}\n"
- f"**视频主页ID**: {url}\n"
- f"**视频Video_id**: {v_id}\n"
- f"**任务标示**: {task_mark}\n"
- )
- Feishu.finish_bot(text,
- "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703",
- "【 机器改造通知 】")
- if values:
- if name == "王雪珂":
- sheet = "vfhHwj"
- elif name == "抖音品类账号-1":
- sheet = "61kvW7"
- elif name == "鲁涛":
- sheet = "FhewlS"
- elif name == "范军":
- sheet = "B6dCfS"
- elif name == "余海涛":
- sheet = "mfBrNT"
- elif name == "罗情":
- sheet = "2J3PwN"
- elif name == "王玉婷":
- sheet = "bBHFwC"
- elif name == "刘诗雨":
- sheet = "fBdxIQ"
- elif name == "信欣":
- sheet = "lPe1eT"
- elif name == "快手创作者版品类推荐流":
- sheet = "k7l7nQ"
- elif name == "抖音品类账号":
- sheet = "Bsg5UR"
- elif name == "视频号品类账号":
- sheet = "b0uLWw"
- elif name == "视频号单视频":
- sheet = "ptgCXW"
- elif name == "快手品类账号":
- sheet = "ibjoMx"
- Feishu.insert_columns("ILb4sa0LahddRktnRipcu2vQnLb", sheet, "ROWS", 1, 2)
- time.sleep(0.5)
- Feishu.update_values("ILb4sa0LahddRktnRipcu2vQnLb", sheet, "A2:Z2", values)
- except Exception as e:
- Common.logger(mark).warning(f"{name}的{task_mark}任务处理失败:{e}")
- cls.remove_files(video_path_url)
- return
- @classmethod
- def get_data_list(cls, channel_id, task_mark, url, number, mark, feishu_id, cookie_sheet, name):
- """
- 根据渠道ID获取数据列表
- """
- if channel_id == "抖音":
- return DY.get_dy_url(task_mark, url, number, mark, feishu_id, cookie_sheet, channel_id, name)
- elif channel_id == "票圈":
- return PQ.get_pq_url(task_mark, url, number, mark)
- elif channel_id == "视频号":
- return SPH.get_sph_url(task_mark, url, number, mark)
- elif channel_id == "快手":
- return KS.get_ks_url(task_mark, url, number, mark, feishu_id, cookie_sheet, channel_id, name)
- elif channel_id == "快手创作者版":
- return KsFeedVideo.get_data()
- elif channel_id == "视频号单视频":
- return SPHDD.get_sphdd_data(url)
- elif channel_id == "抖音历史":
- return DYLS.get_dyls_list(task_mark, url, number, mark)
- elif channel_id == "快手历史":
- return KSLS.get_ksls_list(task_mark, url, number, mark)
- @classmethod
- def generate_title(cls, video, title):
- """
- 生成新标题
- """
- if video['old_title']:
- new_title = video['old_title'].strip().replace("\n", "") \
- .replace("/", "").replace("\\", "").replace("\r", "") \
- .replace(":", "").replace("*", "").replace("?", "") \
- .replace("?", "").replace('"', "").replace("<", "") \
- .replace(">", "").replace("|", "").replace(" ", "") \
- .replace("&NBSP", "").replace(".", "。").replace(" ", "") \
- .replace("'", "").replace("#", "").replace("Merge", "")
- else:
- return '⭕分享给大家一个视频!值得细品!'
- if title == "原标题":
- if not new_title:
- new_title = '⭕分享给大家一个视频!值得细品!'
- elif title == "AI标题":
- if not new_title:
- new_title = '⭕分享给大家一个视频!值得细品!'
- else:
- new_title = GPT4o.get_ai_title(new_title)
- else:
- titles = title.split('/') if '/' in title else [title]
- new_title = random.choice(titles)
- return new_title
- @classmethod
- def download_and_process_video(cls, channel_id, video_url, video_path_url, v_id, crop_total, gg_duration_total,
- pw_random_id, new_title, mark):
- """
- 下载并处理视频
- """
- if channel_id in ["票圈", "快手创作者版"]:
- new_video_path = PQ.download_video(video_url, video_path_url, v_id)
- Common.logger(mark).info(f"{channel_id}视频下载成功: {new_video_path}")
- else:
- Common.logger(mark).info(f"视频准备下载")
- new_video_path = Oss.download_video_oss(video_url, video_path_url, v_id)
- Common.logger(mark).info(f"视频下载成功: {new_video_path}")
- if os.path.isfile(new_video_path):
- if crop_total and crop_total != 'None': # 判断是否需要裁剪
- new_video_path = FFmpeg.video_crop(new_video_path, video_path_url, pw_random_id)
- if gg_duration_total and gg_duration_total != 'None': # 判断是否需要指定视频时长
- new_video_path = FFmpeg.video_ggduration(new_video_path, video_path_url, pw_random_id,
- gg_duration_total)
- width, height = FFmpeg.get_w_h_size(new_video_path)
- if width < height: # 判断是否需要修改为竖屏
- new_video_path = FFmpeg.update_video_h_w(new_video_path, video_path_url, pw_random_id)
- new_title_re = re.sub(r'[^\w\s\u4e00-\u9fff,。!?]', '', new_title)
- if len(new_title_re) > 12:
- new_title_re = '\n'.join(
- [new_title_re[i:i + 12] for i in range(0, len(new_title_re), 12)])
- new_video_path = FFmpeg.add_video_zm(new_video_path, video_path_url, pw_random_id, new_title_re)
- return new_video_path
- else:
- Common.logger(mark).info(f"视频下载失败: {new_video_path}")
- cls.remove_files(video_path_url)
- return new_video_path
- @classmethod
- def handle_video_ending(cls, new_video_path, video_ending, old_title, pw_random_id, video_path_url, mark, task_mark, url, name, video_share, zm):
- """
- 处理视频片尾
- """
- if video_ending == "AI片尾引导":
- pw_srt_text = GPT4o.get_ai_pw(old_title)
- if pw_srt_text:
- pw_url = TTS.get_pw_zm(pw_srt_text)
- if pw_url:
- pw_mp3_path = TTS.download_mp3(pw_url, video_path_url, pw_random_id)
- # pw_url_sec = FFmpeg.get_video_duration(pw_mp3_path)
- pw_srt = TTS.getSrt(pw_url)
- Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},获取AI片尾srt成功")
- else:
- Feishu.bot(mark, 'TTS获取失败提示', f'无法获取到片尾音频,及时更换token', "张勇")
- Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},获取AI片尾失败")
- return None
- else:
- Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},获取AI片尾失败")
- return None
- else:
- if ',' in video_ending:
- video_ending_list = video_ending.split(',')
- else:
- video_ending_list = [video_ending]
- ending = random.choice(video_ending_list)
- pw_list = Material.get_pwsrt_data("summary", "DgX7vC", ending) # 获取srt
- if pw_list:
- pw_id = pw_list["pw_id"]
- pw_srt = pw_list["pw_srt"]
- pw_url = PQ.get_pw_url(pw_id)
- pw_mp3_path = FFmpeg.get_video_mp3(pw_url, video_path_url, pw_random_id)
- else:
- Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下片尾标示错误,请关注!!!!', name)
- for attempt in range(3):
- jpg_path = FFmpeg.video_png(new_video_path, video_path_url, pw_random_id) # 生成视频最后一帧jpg
- if os.path.isfile(jpg_path):
- Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},生成视频最后一帧成功")
- break
- time.sleep(1)
- for attempt in range(3):
- Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},获取mp3成功")
- pw_path = FFmpeg.pw_video(jpg_path, video_path_url, pw_mp3_path, pw_srt, pw_random_id,
- pw_mp3_path) # 生成片尾视频
- if os.path.isfile(pw_path):
- Common.logger(mark).info(f"{task_mark}下的视频{url},生成片尾视频成功")
- break
- time.sleep(1)
- pw_video_list = [new_video_path, pw_path]
- Common.logger(mark).info(f"{task_mark}下的视频{url},视频与片尾开始拼接")
- video_path = FFmpeg.concatenate_videos(pw_video_list, video_path_url) # 视频与片尾拼接到一起
- Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},视频与片尾拼接成功")
- time.sleep(1)
- if video_share and video_share != 'None':
- new_video_path = FFmpeg.single_video(video_path, video_path_url, zm)
- else:
- new_video_path = video_path
- return new_video_path
- @classmethod
- def upload_video_and_thumbnail(cls, new_video_path, cover, v_id, new_title, task_mark, name, piaoquan_id,
- video_path_url, mark, channel_id, url, old_title, title, rule):
- """
- 上传视频和封面到OSS,并更新数据库
- """
- try:
- oss_id = cls.random_id()
- Common.logger(mark).info(f"{name}的{task_mark},开始发送oss")
- oss_object_key = Oss.stitching_sync_upload_oss(new_video_path, oss_id) # 视频发送OSS
- Common.logger(mark).info(f"{name}的{task_mark},发送oss成功{oss_object_key}")
- status = oss_object_key.get("status")
- if status == 200:
- oss_object_key = oss_object_key.get("oss_object_key")
- time.sleep(1)
- jpg_path = PQ.download_video_jpg(cover, video_path_url, v_id) # 下载视频封面
- if os.path.isfile(jpg_path):
- oss_jpg_key = Oss.stitching_fm_upload_oss(jpg_path, oss_id) # 封面发送OSS
- status = oss_jpg_key.get("status")
- if status == 200:
- jpg = oss_jpg_key.get("oss_object_key")
- else:
- jpg = None
- else:
- jpg = None
- code = PQ.insert_piaoquantv(oss_object_key, new_title, jpg, piaoquan_id)
- Common.logger(mark).info(f"{name}的{task_mark}下的视频ID{v_id}发送成功")
- sqlCollect.insert_task(task_mark, v_id, mark, channel_id) # 插入数据库
- current_time = datetime.now()
- formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
- sqlCollect.insert_machine_making_data(name, task_mark, channel_id, url, v_id, piaoquan_id, new_title, code,
- formatted_time, old_title, oss_object_key)
- if channel_id == "快手历史" or channel_id == "抖音历史":
- explain = "历史爆款"
- else:
- explain = "新供给"
- values = [
- [
- name,
- task_mark,
- channel_id,
- url,
- str(v_id),
- piaoquan_id,
- old_title,
- title if title in ["原标题", "AI标题"] else "",
- new_title,
- str(code),
- formatted_time,
- str(rule),
- explain
- ]
- ]
- return values, code
- except Exception as e:
- cls.remove_files(video_path_url)
- Common.logger("error").warning(f"{name}的{task_mark}上传视频和封面到OSS,并更新数据库失败:{e}\n")
- return
- @classmethod
- def main(cls, data):
- """
- 主函数,初始化任务并使用线程池处理任务。
- """
- mark = data["mark"]
- name = data["name"]
- feishu_id = data["feishu_id"]
- feishu_sheet = data["feishu_sheet"]
- cookie_sheet = data["cookie_sheet"]
- task_data = Material.get_task_data(feishu_id, feishu_sheet)
- data = get_data(mark, task_data)
- if not data:
- return
- task = json.loads(data)
- try:
- VideoProcessor.process_task(task, name, feishu_id, cookie_sheet)
- return mark
- except Exception as e:
- Common.logger(mark).error(f"任务处理失败: {e}")
- return mark
- # if __name__ == "__main__":
- # main()
|