import configparser
import json
import os
import random
import re
import shutil
import time
from datetime import datetime

from common.gpt4o_mini_help import GPT4oMini
from common.redis import get_data, get_first_value_with_prefix, increment_key
from common.tag_video import Tag
from common.tts_help import TTS
from common import Material, Feishu, Common, Oss, AliyunLogger
from common.ffmpeg import FFmpeg
from data_channel.douyin import DY
from data_channel.dy_keyword import DyKeyword
from data_channel.dy_ls import DYLS
from data_channel.ks_feed import KSFeed
from data_channel.ks_keyword import KsKeyword
from data_channel.ks_ls import KSLS
from data_channel.ks_xcx import KSXCX
from data_channel.ks_xcx_keyword import KsXCXKeyword
from data_channel.kuaishou import KS
from data_channel.kuaishouchuangzuozhe import KsFeedVideo
from data_channel.piaoquan import PQ
from common.sql_help import sqlCollect
from data_channel.shipinhao import SPH
from data_channel.shipinhaodandian import SPHDD
from data_channel.sph_feed import SPHFeed
from data_channel.sph_keyword import SphKeyword
from data_channel.sph_ls import SPHLS

# Read the configuration file (paths are relative to the working directory).
config = configparser.ConfigParser()
config.read('./config.ini')


class VideoProcessor:
    """
    Video rebuilding pipeline.

    Groups the helpers that create/clean working folders, fetch candidate
    videos for a task, download and transform them with FFmpeg, append an
    ending clip, upload the result and report the outcome.
    """

    # Title used whenever the source video has no usable title.
    _FALLBACK_TITLE = '这个视频,分享给我的老友,祝愿您能幸福安康'

    # Feishu bot webhooks used for the different notification kinds.
    _NOTICE_HOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/e7697dc6-5254-4411-8b59-3cd0742bf703"
    _SKIP_HOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/493b3d4c-5fae-4a9d-980b-1dd86636524e"
    _SUCCESS_HOOK = "https://open.feishu.cn/open-apis/bot/v2/hook/d2f751a8-5b0a-49ca-a306-1fda142707a9"
    _NOTICE_TITLE = "【 机器改造通知 】"
    _SKIP_TITLE = "【 视频下载失败,跳过该视频 】"

    # Ordered (old, new) substitutions applied to a source title.
    # NOTE(review): the original chain listed "?" and " " twice; the repeats
    # were no-ops and are omitted. Possibly full-width variants were intended
    # - confirm against the upstream file.
    _TITLE_REPLACEMENTS = (
        ("\n", ""), ("/", ""), ("\\", ""), ("\r", ""),
        (":", ""), ("*", ""), ("?", ""), ('"', ""), ("<", ""),
        (">", ""), ("|", ""), (" ", ""),
        ("&NBSP", ""), (".", "。"),
        ("'", ""), ("#", ""), ("Merge", ""),
    )

    # Channel -> "source" tag for keyword-search tasks.
    _KEYWORD_TAG_CHANNELS = {
        "抖音搜索": "来源_抖音关键词",
        "快手搜索": "来源_快手关键词",
        "视频号搜索": "来源_视频号关键词",
    }

    # Channel -> "source" tag for category-account tasks.
    _ACCOUNT_TAG_CHANNELS = {
        "抖音": "来源_抖音品类账号",
        "抖音历史": "来源_抖音品类账号",
        "快手": "来源_快手品类账号",
        "快手历史": "来源_快手品类账号",
        "视频号": "来源_视频号品类账号",
        "视频号历史": "来源_视频号品类账号",
    }

    # Owner name -> Feishu sheet id that receives the result row.
    _RESULT_SHEETS = {
        "王雪珂": "vfhHwj",
        "鲁涛": "FhewlS",
        "范军": "B6dCfS",
        "余海涛": "mfBrNT",
        "罗情": "2J3PwN",
        "王玉婷": "bBHFwC",
        "刘诗雨": "fBdxIQ",
        "信欣": "lPe1eT",
        "快手创作者版品类推荐流": "k7l7nQ",
        "抖音品类账号": "ZixHmf",
        "抖音品类账号-1": "61kvW7",
        "视频号品类账号": "b0uLWw",
        "单点视频": "ptgCXW",
        "快手品类账号": "ibjoMx",
        "品类关键词搜索": "rBAJT8",
        "快手推荐流": "9Ii8lw",
        "视频号推荐流": "hMBv7T",
        "快手小程序": "GeDT6Q",
    }

    # Per-run quota rules checked by main():
    # (mark, required channel_id or None, counter key, limit,
    #  seconds to sleep once the limit is hit, value returned to the caller)
    _QUOTA_RULES = (
        ('dy-pl-gjc', '抖音搜索', 'dyss-count', 300, 0, "抖音搜索上限"),
        ('ks-pl-gjc', None, 'ksss-count', 300, 0, "快手搜索上限"),
        ('sph-pl-gjc', None, 'ss-sph-count', 300, 10, "视频号搜索上限"),
        ('sph-plzh', '视频号', 'sph-count', 400, 10, "视频号获取用户主页视频上限"),
    )

    @staticmethod
    def _is_usable_file(path):
        """Return True when *path* names an existing, non-empty regular file."""
        return bool(path) and os.path.isfile(path) and os.path.getsize(path) > 0

    @staticmethod
    def _notice_text(notice_type, name, channel_id, url, *video_ids):
        """Build the markdown body of a bot notice; one id line per *video_ids*."""
        lines = [
            f"**通知类型**: {notice_type}\n",
            f"**负责人**: {name}\n",
            f"**渠道**: {channel_id}\n",
            f"**视频主页ID**: {url}\n",
        ]
        lines.extend(f"**视频Video_id**: {video_id}\n" for video_id in video_ids)
        return "".join(lines)

    @staticmethod
    def _source_text(video, pq_url=None):
        """Build the markdown body describing where a single-point video came from."""
        head = f"**站内视频链接**: {pq_url}\n" if pq_url is not None else ""
        return (
            f"{head}"
            f"**渠道**: {video['source']}\n"
            f"**来源用户**: {video['from_user_name']}\n"
            f"**来源群组**: {video['from_group_name']}\n"
            f"**原视频链接**: {video['video_url']}\n"
            f"**原视频封面**: {video['cover']}\n"
            f"**原视频标题**: {video['old_title']}\n"
        )

    @classmethod
    def create_folders(cls, mark):
        """
        Create a fresh working directory for *mark* and return its path.

        The path is VIDEO_PATH + mark + "/" + <random id> + "/".
        """
        folder_id = cls.random_id()
        video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/" + str(folder_id) + "/"
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(video_path_url, exist_ok=True)
        return video_path_url

    @classmethod
    def random_id(cls):
        """
        Return a timestamp (YYYYmmddHHMMSS) followed by five random digits.
        """
        now = datetime.now()
        rand_num = random.randint(10000, 99999)
        return f"{now.strftime('%Y%m%d%H%M%S')}{rand_num}"

    @classmethod
    def remove_files(cls, mark):
        """
        Delete every file and sub-directory under VIDEO_PATH + mark + "/".

        Deletion is best-effort: a failure on one entry is printed and the
        remaining entries are still processed.
        """
        path = config['PATHS']['VIDEO_PATH'] + mark + "/"
        if not os.path.exists(path):
            return
        for filename in os.listdir(path):
            file_path = os.path.join(path, filename)
            try:
                if os.path.isfile(file_path) or os.path.islink(file_path):
                    os.unlink(file_path)  # file or symlink
                elif os.path.isdir(file_path):
                    shutil.rmtree(file_path)  # sub-directory and its content
            except Exception as e:
                print(f'Failed to delete {file_path}. Reason: {e}')

    @classmethod
    def process_task(cls, task, mark, name, feishu_id, cookie_sheet):
        """
        Process one task: fetch its videos, rebuild each and report the result.

        :param task: task dict (reads task_mark, channel_id, channel_url,
            piaoquan_id, number, title, video_share, video_ending, crop_total,
            gg_duration_total, voice, tags and, for category tasks,
            first_category / secondary_category / keyword_name)
        :param mark: worker mark, used for logging and the working directory
        :param name: owner name; also selects name-specific branches
        :param feishu_id: forwarded to the channel fetchers
        :param cookie_sheet: forwarded to the channel fetchers
        :return: None. Any exception is logged and swallowed here; an
            exception raised while handling one video stops the remaining
            videos of the task.
        """
        # Pre-bind everything the exception handler reports so that a failure
        # early in the method cannot turn into a NameError inside the handler.
        task_mark = channel_id = url = ""
        video = None
        log_data = ""
        try:
            task_mark = task["task_mark"]
            channel_id = str(task["channel_id"])
            url = str(task["channel_url"])
            piaoquan_id = str(task["piaoquan_id"])
            number = task["number"]
            title = task["title"]
            video_share = task["video_share"]
            video_ending = task["video_ending"]
            crop_total = task["crop_total"]
            gg_duration_total = task["gg_duration_total"]
            voice = task['voice']
            tags = task['tags']
            # A comma separated voice list means "pick one at random".
            voice = random.choice(voice.split(',')) if voice else "zhifeng_emo"
            zm = Material.get_pzsrt_data("summary", "500Oe0", video_share)
            Common.logger(mark).info(f"{name}的{task_mark}下{channel_id}的用户:{url}开始获取视频")
            new_count = None
            # if name in ['快手品类账号', '抖音品类账号', '抖音品类账号-1', '视频号品类账号']:
            #     new_count = OdpsDataCount.main(channel_id, name, url)
            data_list = cls.get_data_list(
                channel_id, task_mark, url, number, mark, feishu_id, cookie_sheet, name, task
            )
            if not data_list:
                AliyunLogger.logging(channel_id, name, url, "", "无改造视频", "4000")
                Common.logger(mark).info(f"{name}的{task_mark}下{channel_id}的视频ID{url} 无改造视频")
                text = cls._notice_text("没有改造的视频", name, channel_id, url)
                Feishu.finish_bot(text, cls._NOTICE_HOOK, cls._NOTICE_TITLE)
                return
            # Only reachable once the new_count lookup above is re-enabled.
            if new_count:
                sqlCollect.insert_spider_supply_targetcnt(channel_id, name, url, number, new_count,
                                                          str(len(data_list)))
                formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                values = [
                    [
                        name,
                        channel_id,
                        url,
                        str(number),
                        str(new_count),
                        str(len(data_list)),
                        formatted_time
                    ]
                ]
                Feishu.insert_columns("Z5xLsdyyxh3abntTTvUc9zw8nYd", "099da8", "ROWS", 1, 2)
                time.sleep(0.5)
                Feishu.update_values("Z5xLsdyyxh3abntTTvUc9zw8nYd", "099da8", "A2:Z2", values)
            Common.logger(mark).info(f"{name}的{task_mark}下的ID{url} 获取视频完成,共{len(data_list)}条")

            # The per-task daily limit is enforced by main() before this runs.
            for video in data_list:
                log_data = ""  # never report the previous video's data
                # Channel reported for this video only. The task-level
                # channel_id must stay untouched because it selects the
                # downloader for every following video.
                report_channel = channel_id
                cls.remove_files(mark)
                video_path_url = cls.create_folders(mark)
                new_title = cls.generate_title(video, title)
                v_id = video["video_id"]
                cover = video["cover"]
                video_url = video["video_url"]
                old_title = video['old_title']
                rule = video['rule']
                if not old_title:
                    old_title = cls._FALLBACK_TITLE
                    text = cls._notice_text("标题为空,使用兜底标题生成片尾", name, channel_id, url, v_id)
                    Feishu.finish_bot(text, cls._NOTICE_HOOK, cls._NOTICE_TITLE)
                    Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},标题为空,使用兜底标题生成片尾")
                time.sleep(1)
                pw_random_id = cls.random_id()
                Common.logger(mark).info(f"{name}的{task_mark}下的ID{url} 开始下载视频")
                new_video_path = cls.download_and_process_video(channel_id, video_url, video_path_url, v_id,
                                                                crop_total, gg_duration_total, pw_random_id,
                                                                new_title, mark, video)
                # download_and_process_video returns None on failure, which
                # os.path.isfile() would reject with a TypeError.
                if not cls._is_usable_file(new_video_path):
                    AliyunLogger.logging(channel_id, name, url, v_id, "视频下载失败", "3002",
                                         f"video_url:{video_url}")
                    text = cls._notice_text("视频下载失败", name, channel_id, url, v_id)
                    Feishu.finish_bot(text, cls._NOTICE_HOOK, cls._NOTICE_TITLE)
                    continue

                if video_ending and video_ending != 'None':
                    new_video_path = cls.handle_video_ending(new_video_path, video_ending, old_title,
                                                             pw_random_id, video_path_url, mark, task_mark,
                                                             url, name, video_share, zm, voice)
                    if new_video_path is None:
                        if name == "单点视频":
                            sqlCollect.update_shp_dd_vid_4(v_id)
                            text = cls._source_text(video)
                            AliyunLogger.logging(channel_id, name, url, v_id, "视频下载失败", "3002")
                            Feishu.finish_bot(text, cls._SKIP_HOOK, cls._SKIP_TITLE)
                        if name == "快手推荐流" or name == "视频号推荐流":
                            sqlCollect.update_feed_vid_2(v_id)
                            # Build a dedicated notice: no text has been
                            # prepared for feed videos at this point.
                            text = cls._notice_text("视频下载失败", name, channel_id, url, v_id)
                            Feishu.finish_bot(text, cls._SKIP_HOOK, cls._SKIP_TITLE)
                        continue
                elif video_share and video_share != 'None':
                    new_video_path = FFmpeg.single_video(new_video_path, video_path_url, zm)

                if not cls._is_usable_file(new_video_path):
                    AliyunLogger.logging(channel_id, name, url, v_id, "视频改造失败", "3001")
                    text = cls._notice_text("视频改造失败", name, channel_id, url, v_id)
                    Feishu.finish_bot(text, cls._NOTICE_HOOK, cls._NOTICE_TITLE)
                    continue

                # Upload video and cover, and update the database.
                code = cls.upload_video_and_thumbnail(new_video_path, cover, v_id, new_title, task_mark, name,
                                                      piaoquan_id, video_path_url, mark, channel_id, url,
                                                      old_title, title, rule, video)
                if not code:
                    # The upload helper already logged the reason; do not
                    # report a success with a ".../None/detail" link.
                    Common.logger(mark).error(f"{name}的{task_mark}下的视频ID{v_id}上传失败,跳过该视频")
                    continue

                # Mark the source video as used.
                pq_url = f'https://admin.piaoquantv.com/cms/post-detail/{code}/detail'  # in-site video link
                if name == "单点视频":
                    sphdd_status = sqlCollect.update_shp_dd_vid(v_id)
                    # NOTE(review): the success notice is assumed to be nested
                    # under the status check; indentation was lost in the
                    # source - confirm.
                    if sphdd_status == 1:
                        Common.logger(mark).info(f"{name}的{task_mark}下的ID{url} 视频修改已使用,状态已修改")
                        report_channel = video['source']
                        text = cls._source_text(video, pq_url)
                        Feishu.finish_bot(text, cls._SUCCESS_HOOK, "【 有一条新的内容改造成功 】")
                if name == "快手推荐流" or name == "视频号推荐流":
                    feed_status = sqlCollect.update_feed_vid(v_id)
                    if feed_status == 1:
                        Common.logger(mark).info(f"{name}的{task_mark}下的ID{url} 视频修改已使用,状态已修改")

                if report_channel in ("快手历史", "抖音历史", "视频号历史"):
                    explain = "历史爆款"
                else:
                    explain = "新供给"
                formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

                # Columns shared by every result row; branch-specific columns
                # are appended below.
                row = [
                    name,
                    task_mark,
                    report_channel,
                    url,
                    str(v_id),
                    piaoquan_id,
                    old_title,
                    title if title in ["原标题", "AI标题"] else "",
                    new_title,
                    str(code),
                    formatted_time,
                    str(rule),
                    explain,
                    voice,
                ]
                log_data = (f"user:{url},,video_id:{v_id},,video_url:{video_url},,"
                            f"ai_title:{new_title},,voice:{voice}")

                if name == "品类关键词搜索":
                    first_category = task["first_category"]
                    keyword_principal = task["keyword_name"]
                    # An unmapped channel simply contributes no source tag.
                    tag_parts = (
                        f"一级品类_{first_category}",
                        f"关键词_{url}",
                        cls._KEYWORD_TAG_CHANNELS.get(report_channel),
                    )
                    tag = ",".join(part for part in tag_parts if part)
                    tag_status = Tag.video_tag(code, tag)
                    if tag_status == 0:
                        Common.logger(mark).info(f"{name}的{task_mark}下的ID{url}下的票圈视频{code},写入标签成功")
                    secondary_category = task["secondary_category"]
                    log_data += (f",,first_category:{first_category},,"
                                 f"secondary_category:{secondary_category},,"
                                 f"keyword_principal:{keyword_principal},,tag:{tag}")
                    row += [first_category, secondary_category, keyword_principal, pq_url]
                elif name in ("抖音品类账号-1", "抖音品类账号", "视频号品类账号", "快手品类账号"):
                    first_category = task["first_category"]
                    tag_parts = (
                        f"一级品类_{first_category}",
                        cls._ACCOUNT_TAG_CHANNELS.get(report_channel),
                    )
                    tag = ",".join(part for part in tag_parts if part)
                    tag_status = Tag.video_tag(code, tag)
                    if tag_status == 0:
                        Common.logger(mark).info(f"{name}的{task_mark}下的ID{url}下的票圈视频{code},写入标签成功")
                    log_data += f",,tag:{tag}"
                    row += [first_category, pq_url]
                values = [row]

                AliyunLogger.logging(report_channel, name, url, v_id, "视频改造成功", "1000", log_data, str(code))
                text = (
                    f"**通知类型**: 视频改造成功\n"
                    f"**站内视频链接**: {pq_url}\n"
                    f"**负责人**: {name}\n"
                    f"**渠道**: {report_channel}\n"
                    f"**视频主页ID**: {url}\n"
                    f"**视频Video_id**: {v_id}\n"
                    f"**使用音频音色**: {voice}\n"
                )
                Feishu.finish_bot(text, cls._NOTICE_HOOK, cls._NOTICE_TITLE)
                if tags:
                    Tag.video_tag(code, tags)

                sheet = cls._RESULT_SHEETS.get(name)
                if sheet is None:
                    # Unknown owner: there is no sheet to write the row to.
                    Common.logger(mark).error(f"{name}的{task_mark}未配置结果表格,跳过写入飞书表格")
                else:
                    Feishu.insert_columns("ILb4sa0LahddRktnRipcu2vQnLb", sheet, "ROWS", 1, 2)
                    time.sleep(0.5)
                    Feishu.update_values("ILb4sa0LahddRktnRipcu2vQnLb", sheet, "A2:Z2", values)
        except Exception as e:
            failed_video_id = video.get("video_id", "") if isinstance(video, dict) else ""
            AliyunLogger.logging(channel_id, name, url, failed_video_id, f"改造失败{e}", "3001", log_data)
            Common.logger(mark).error(f"{name}的{task_mark}任务处理失败:{e}")

    @classmethod
    def get_data_list(cls, channel_id, task_mark, url, number, mark, feishu_id, cookie_sheet, name, task):
        """
        Fetch the candidate videos for *channel_id* from the matching fetcher.

        :return: whatever the channel fetcher returns, or None when
            *channel_id* is not one of the handled channels
        """
        if channel_id == "抖音":
            return DY.get_dy_url(task_mark, url, number, mark, feishu_id, cookie_sheet, channel_id, name)
        elif channel_id == "票圈":
            return PQ.get_pq_url(task_mark, url, number, mark, channel_id, name)
        elif channel_id == "视频号":
            return SPH.get_sph_url(task_mark, url, number, mark, channel_id, name)
        elif channel_id == "快手":
            return KS.get_ks_url(task_mark, url, number, mark, feishu_id, cookie_sheet, channel_id, name)
        elif channel_id == "快手创作者版":
            return KsFeedVideo.get_data(channel_id, name)
        elif channel_id == "单点视频":
            return SPHDD.get_sphdd_data(url, channel_id, name)
        elif channel_id == "抖音历史":
            return DYLS.get_dy_zr_list(task_mark, url, number, mark, channel_id, name)
        elif channel_id == "快手历史":
            return KSLS.get_ksls_list(task_mark, url, number, mark, channel_id, name)
        elif channel_id == "视频号历史":
            return SPHLS.get_sphls_data(task_mark, url, number, mark, channel_id, name)
        elif channel_id == '抖音搜索':
            return DyKeyword.get_key_word(url, task_mark, mark, channel_id, name, task)
        elif channel_id == '快手搜索':
            return KsXCXKeyword.get_key_word(url, task_mark, mark, channel_id, name, task)
        elif channel_id == '视频号搜索':
            return SphKeyword.get_key_word(url, task_mark, mark, channel_id, name)
        elif channel_id == '快手推荐流':
            return KSFeed.get_feed_date()
        elif channel_id == '视频号推荐流':
            return SPHFeed.get_feed_date()
        elif channel_id == '快手小程序':
            return KSXCX.get_xcx_date()
        return None

    @classmethod
    def generate_title(cls, video, title):
        """
        Produce the title of the rebuilt video.

        :param video: video dict; only ``old_title`` is read
        :param title: "原标题" keeps the cleaned source title, "AI标题" asks
            GPT4oMini to rewrite it, anything else is a "/" separated list of
            custom titles from which one is chosen at random
        :return: the new title; the fallback title when the source title is
            empty (regardless of *title*) or cleans down to nothing
        """
        old_title = video['old_title']
        if not old_title:
            return cls._FALLBACK_TITLE
        new_title = old_title.strip()
        # Applied in order: later substitutions see the result of earlier ones.
        for old, new in cls._TITLE_REPLACEMENTS:
            new_title = new_title.replace(old, new)
        if title == "原标题":
            return new_title or cls._FALLBACK_TITLE
        if title == "AI标题":
            if not new_title:
                return cls._FALLBACK_TITLE
            # NOTE(review): assumes get_ai_mini_title may return an empty
            # value on failure; fall back to the cleaned title then - confirm.
            return GPT4oMini.get_ai_mini_title(new_title) or new_title
        return random.choice(title.split('/'))

    @classmethod
    def download_and_process_video(cls, channel_id, video_url, video_path_url, v_id, crop_total,
                                   gg_duration_total, pw_random_id, new_title, mark, video):
        """
        Download a video with the channel's downloader and apply FFmpeg steps.

        Steps after the download: optional crop, optional duration limit,
        a resize when width < height, and burning the title into the video.

        :return: path of the processed video, or None when the download did
            not produce a non-empty file
        """
        if channel_id == "单点视频":
            new_video_path = PQ.dd_sph_download_video(video_url, video_path_url, v_id, video, channel_id)
        elif channel_id == "视频号":
            new_video_path = PQ.sph_download_video(video_url, video_path_url, v_id, video)
            if new_video_path is None:
                return None
            Common.logger(mark).info(f"{channel_id}视频下载成功: {new_video_path}")
        elif channel_id in ("票圈", "快手创作者版", '视频号搜索', "快手推荐流"):
            new_video_path = PQ.download_video(video_url, video_path_url, v_id)
            if new_video_path is None:
                return None
            Common.logger(mark).info(f"{channel_id}视频下载成功: {new_video_path}")
        elif channel_id in ("抖音", "抖音历史", "抖音搜索"):
            new_video_path = PQ.download_dy_video(video_url, video_path_url, v_id)
            if new_video_path is None:
                return None
            Common.logger(mark).info(f"{channel_id}视频下载成功: {new_video_path}")
        elif channel_id == "视频号历史":
            new_video_path = Oss.download_sph_ls(video_url, video_path_url, v_id)
        else:
            Common.logger(mark).info("视频准备下载")
            new_video_path = Oss.download_video_oss(video_url, video_path_url, v_id)
            if not cls._is_usable_file(new_video_path):
                return None
            Common.logger(mark).info(f"视频下载成功: {new_video_path}")

        # Shared guard: never hand a missing or empty file to FFmpeg, whichever
        # downloader was used.
        if not cls._is_usable_file(new_video_path):
            return None

        if crop_total and crop_total != 'None':  # crop requested
            new_video_path = FFmpeg.video_crop(new_video_path, video_path_url, pw_random_id)
        if gg_duration_total and gg_duration_total != 'None':  # duration limit requested
            new_video_path = FFmpeg.video_ggduration(new_video_path, video_path_url, pw_random_id,
                                                     gg_duration_total)
        width, height = FFmpeg.get_w_h_size(new_video_path)
        if width < height:  # taller than wide: adjust height/width
            new_video_path = FFmpeg.update_video_h_w(new_video_path, video_path_url, pw_random_id)
        new_title_re = re.sub(r'[^\w\s\u4e00-\u9fff,。!?]', '', new_title)
        if len(new_title_re) > 12:
            # Wrap the overlay text every 12 characters.
            new_title_re = '\n'.join(
                new_title_re[i:i + 12] for i in range(0, len(new_title_re), 12))
        new_video_path = FFmpeg.add_video_zm(new_video_path, video_path_url, pw_random_id, new_title_re)
        return new_video_path

    @classmethod
    def handle_video_ending(cls, new_video_path, video_ending, old_title, pw_random_id, video_path_url, mark,
                            task_mark, url, name, video_share, zm, voice):
        """
        Build an ending clip and append it to the video.

        For "AI片尾引导" the ending text comes from GPT4oMini and is turned
        into audio with TTS; otherwise *video_ending* is a comma separated
        list of ending marks, one of which is looked up in the material table.

        :return: path of the final video; the input path when concatenation
            produced no usable file; None when the ending audio/subtitle
            could not be obtained
        """
        if video_ending == "AI片尾引导":
            pw_srt_text = GPT4oMini.get_ai_mini_pw(old_title)
            pw_url = TTS.get_pw_zm(pw_srt_text, voice) if pw_srt_text else None
            if not pw_url:
                Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},获取AI片尾失败")
                return None
            pw_mp3_path = TTS.download_mp3(pw_url, video_path_url, pw_random_id)
            pw_srt = TTS.getSrt(pw_url)
            Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},获取AI片尾srt成功")
        else:
            ending = random.choice(video_ending.split(','))
            pw_list = Material.get_pwsrt_data("summary", "DgX7vC", ending)  # ending subtitle/material
            if not pw_list:
                Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下片尾标示错误,请关注!!!!', name)
                # Without material there is no audio or subtitle to build the
                # ending from, so stop here instead of failing on unbound names.
                return None
            pw_id = pw_list["pw_id"]
            pw_srt = pw_list["pw_srt"]
            pw_url = PQ.get_pw_url(pw_id)
            pw_mp3_path = FFmpeg.get_video_mp3(pw_url, video_path_url, pw_random_id)

        for _ in range(3):
            # Last frame of the video, used as the ending's still image.
            jpg_path = FFmpeg.video_png(new_video_path, video_path_url, pw_random_id)
            if jpg_path and os.path.isfile(jpg_path):
                Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},生成视频最后一帧成功")
                break
            time.sleep(1)
        for _ in range(3):
            Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},获取mp3成功")
            pw_path = FFmpeg.pw_video(jpg_path, video_path_url, pw_mp3_path, pw_srt, pw_random_id,
                                      pw_mp3_path)  # render the ending clip
            if pw_path and os.path.isfile(pw_path):
                Common.logger(mark).info(f"{task_mark}下的视频{url},生成片尾视频成功")
                break
            time.sleep(1)

        pw_video_list = [new_video_path, pw_path]
        Common.logger(mark).info(f"{task_mark}下的视频{url},视频与片尾开始拼接")
        video_path = FFmpeg.concatenate_videos(pw_video_list, video_path_url)  # video + ending
        # Both conditions must hold; getsize() raises on a missing file.
        if not cls._is_usable_file(video_path):
            return new_video_path
        Common.logger(mark).info(f"{name}的{task_mark}下的视频{url},视频与片尾拼接成功")
        time.sleep(1)
        if video_share and video_share != 'None':
            return FFmpeg.single_video(video_path, video_path_url, zm)
        return video_path

    @classmethod
    def upload_video_and_thumbnail(cls, new_video_path: str, cover: str, v_id, new_title: str, task_mark: str,
                                   name: str, piaoquan_id, video_path_url: str, mark: str, channel_id: str,
                                   url: str, old_title: str, title, rule: str, video):
        """
        Upload the video (and, for some channels, its cover) and record it.

        A cover is only downloaded for "视频号历史" and for "单点视频" whose
        source is not "快手"; every other channel publishes without a cover.

        :return: the code returned by PQ.insert_piaoquantv, or None when the
            OSS upload did not report status 200 or any step raised
        """
        try:
            oss_id = cls.random_id()
            Common.logger(mark).info(f"{name}的{task_mark},开始发送oss")
            upload_result = Oss.stitching_sync_upload_oss(new_video_path, oss_id)  # video -> OSS
            Common.logger(mark).info(f"{name}的{task_mark},发送oss成功{upload_result}")
            status = upload_result.get("status")
            # NOTE(review): everything below is assumed to run only for
            # status 200; indentation was lost in the source - confirm.
            if status != 200:
                Common.logger(mark).error(f"{name}的{task_mark},发送oss失败,status:{status}")
                AliyunLogger.logging(channel_id, name, url, video["video_id"], "改造失败-上传视频和封面到OSS", "3001")
                return None
            oss_object_key = upload_result.get("oss_object_key")
            time.sleep(1)

            jpg_path = None
            if channel_id == "视频号历史":
                jpg_path = Oss.download_sph_ls(cover, video_path_url, v_id)
            elif channel_id == '单点视频' and video['source'] != "快手":
                jpg_path = PQ.download_video_jpg(cover, video_path_url, v_id)  # download the cover
            # NOTE(review): the cover upload is assumed to sit after the
            # channel chain rather than inside its last branch - confirm.
            if jpg_path and os.path.isfile(jpg_path):
                oss_jpg_key = Oss.stitching_fm_upload_oss(jpg_path, oss_id)  # cover -> OSS
                jpg = oss_jpg_key.get("oss_object_key")
            else:
                jpg = None

            code = PQ.insert_piaoquantv(oss_object_key, new_title, jpg, piaoquan_id)
            Common.logger(mark).info(f"{name}的{task_mark}下的视频ID{v_id}发送成功")
            sqlCollect.insert_task(task_mark, v_id, mark, channel_id)  # record the task
            formatted_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            if name == "单点视频":
                url = str(rule)
            sqlCollect.insert_machine_making_data(name, task_mark, channel_id, url, v_id, piaoquan_id,
                                                  new_title, code, formatted_time, old_title, oss_object_key)
            return code
        except Exception as e:
            Common.logger(mark).error(f"{name}的{task_mark}上传视频和封面到OSS,并更新数据库失败:{e}\n")
            AliyunLogger.logging(channel_id, name, url, video["video_id"], "改造失败-上传视频和封面到OSS", "3001")
            return None

    @classmethod
    def main(cls, data):
        """
        Entry point: load the next task for a worker and process it.

        :param data: dict with mark, name, feishu_id, feishu_sheet and
            cookie_sheet
        :return: None when no task is available or the task's daily limit is
            reached; a quota message string when a channel counter hit its
            limit; otherwise *mark* (also when processing raised)
        """
        mark = data["mark"]
        name = data["name"]
        feishu_id = data["feishu_id"]
        feishu_sheet = data["feishu_sheet"]
        cookie_sheet = data["cookie_sheet"]
        raw_task = get_data(mark, feishu_id, feishu_sheet)
        if not raw_task:
            Common.logger("redis").error(f"{mark}任务开始新的一轮\n")
            return None
        task = json.loads(raw_task)
        try:
            limit_number = task["limit_number"]
            if limit_number:
                task_mark = task["task_mark"]
                mark_count = sqlCollect.get_mark_count(task_mark)
                produced = int(mark_count[0][0])
                if int(limit_number) <= produced:
                    AliyunLogger.logging((task["channel_id"]), name, task["channel_url"], '',
                                         f"{task_mark}标识任务每日指定条数已足够,指定条数{limit_number},实际生成条数{produced}",
                                         "1111")
                    return None

            for rule_mark, rule_channel, counter_key, limit, cooldown, message in cls._QUOTA_RULES:
                if mark != rule_mark:
                    continue
                if rule_channel is not None and task['channel_id'] != rule_channel:
                    continue
                # The counter is read before it is incremented, and it is
                # incremented even when the limit has already been reached.
                count = get_first_value_with_prefix(counter_key)
                increment_key(counter_key)
                if int(count) >= limit:
                    if cooldown:
                        time.sleep(cooldown)
                    return message

            cls.process_task(task, mark, name, feishu_id, cookie_sheet)
            return mark
        except Exception as e:
            AliyunLogger.logging(task.get("channel_id", ""), name, task.get("channel_url", ""), '',
                                 f"用户抓取异常:{e}", "3001")
            return mark

# if __name__ == "__main__":
#     main()