# 读取视频分析报告1_拆分钩子.xlsx # 提取视频链接,提取hook和time # 下载视频, # 使用ffmpeg将hook文案加入到视频内 # 保存处理后的视频至 trigger_video 文件夹内 import pandas as pd import os import requests import subprocess from datetime import datetime import time from pathlib import Path import shutil import oss2 from oss2.credentials import EnvironmentVariableCredentialsProvider def download_video(url, save_path): """下载视频文件,支持断点续传""" try: # 创建临时文件 temp_path = save_path + '.tmp' # 获取已下载的文件大小 initial_pos = 0 if os.path.exists(temp_path): initial_pos = os.path.getsize(temp_path) headers = {'Range': f'bytes={initial_pos}-'} if initial_pos > 0 else {} # 发送请求 response = requests.get(url, headers=headers, stream=True) response.raise_for_status() # 获取文件总大小 total_size = int(response.headers.get('content-length', 0)) + initial_pos # 写入文件 mode = 'ab' if initial_pos > 0 else 'wb' with open(temp_path, mode) as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) # 下载完成后重命名 shutil.move(temp_path, save_path) return True except Exception as e: print(f"下载视频失败: {str(e)}") if os.path.exists(temp_path): os.remove(temp_path) return False def add_text_to_video(input_video, output_video, text, start_time, end_time): """使用ffmpeg添加文字到视频中,在指定的时间段内显示 文字样式: - 红色背景 - 白色18像素加粗字体 - 自动换行(每行最多20个字符) - 位置在底部50像素处 """ try: # 处理文本换行(每行最多20个字符) wrapped_text = text.replace('\\n', '\n') # 保留原有的换行符 if len(text) > 20 and '\\n' not in text: # 在合适的位置添加换行符 words = text.split() lines = [] current_line = [] current_length = 0 for word in words: if current_length + len(word) + 1 <= 20: # +1 for space current_line.append(word) current_length += len(word) + 1 else: lines.append(' '.join(current_line)) current_line = [word] current_length = len(word) if current_line: lines.append(' '.join(current_line)) wrapped_text = '\\n'.join(lines) # 构建ffmpeg命令 # 使用drawtext滤镜添加文字,设置字体、颜色、位置等 cmd = [ 'ffmpeg', '-y', '-i', input_video, '-vf', f"drawtext=text='{wrapped_text}'" f":fontsize=18" # 字体大小18像素 f":fontcolor=white" # 白色字体 f":fontfile=/System/Library/Fonts/PingFang.ttc" # 使用系统字体 f":fontweight=bold" # 字体加粗 f":box=1" # 启用背景框 f":boxcolor=red@0.8" # 红色背景,透明度0.8 f":boxborderw=5" # 背景框边框宽度 f":x=(w-text_w)/2" # 水平居中 f":y=h-th-50" # 距离底部50像素 f":line_spacing=10" # 行间距 f":enable='between(t,{start_time},{end_time})'", # 显示时间段 '-c:a', 'copy', # 保持音频不变 output_video ] # 执行命令 process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True ) # 等待命令执行完成 stdout, stderr = process.communicate() if process.returncode != 0: print(f"添加文字失败: {stderr}") return False return True except Exception as e: print(f"处理视频时出错: {str(e)}") return False def parse_time(time_str): """解析时间字符串,支持以下格式: 1. HH:MM:SS-HH:MM:SS (时间范围) 2. HH:MM:SS (单个时间点,显示5秒) 3. "视频结束" (在视频结束前5秒显示) """ try: if time_str == "视频结束": return None, None # 特殊标记,需要后续处理 if '-' in time_str: # 处理时间范围 start_str, end_str = time_str.split('-') # 解析开始时间 h1, m1, s1 = map(int, start_str.split(':')) start_time = h1 * 3600 + m1 * 60 + s1 # 解析结束时间 h2, m2, s2 = map(int, end_str.split(':')) end_time = h2 * 3600 + m2 * 60 + s2 return start_time, end_time else: # 处理单个时间点 h, m, s = map(int, time_str.split(':')) start_time = h * 3600 + m * 60 + s return start_time, start_time + 5 # 默认显示5秒 except Exception as e: print(f"时间格式解析失败: {time_str}, 错误: {str(e)}") return None, None def process_videos(): """处理所有视频数据""" # 创建输出目录 output_dir = Path("trigger_video") output_dir.mkdir(exist_ok=True) # 创建临时目录 temp_dir = Path("temp_videos") temp_dir.mkdir(exist_ok=True) try: # 读取Excel文件 print("开始读取Excel文件...") df = pd.read_excel("视频分析报告1_拆分钩子.xlsx") total_rows = len(df) print(f"共读取到 {total_rows} 行数据") # 处理每一行 for idx, row in df.iterrows(): try: print(f"\n{'='*50}") print(f"开始处理第 {idx+1}/{total_rows} 行") print(f"{'='*50}") video_url = row.iloc[3] # 视频URL在第4列 if pd.isna(video_url): print(f"第 {idx+1} 行没有视频URL,跳过") continue print(f"视频URL: {video_url}") # 获取hook信息 hooks = row.iloc[11].split('\n') # hook在第12列 times = row.iloc[9].split('\n') # time在第10列 print(f"钩子数量: {len(hooks)}") print(f"时间点数量: {len(times)}") if not hooks or not times or len(hooks) != len(times): print(f"第 {idx+1} 行hook或time数据不完整,跳过") continue # 生成输出文件名 video_id = f"video_{idx+1}" temp_video = temp_dir / f"{video_id}.mp4" output_video = output_dir / f"{video_id}_with_hooks.mp4" # 如果输出文件已存在,跳过处理 if output_video.exists(): print(f"视频 {output_video} 已存在,跳过处理") continue # 下载视频 print(f"\n开始下载视频...") if not download_video(video_url, str(temp_video)): print(f"第 {idx+1} 行视频下载失败,跳过") continue # 获取视频总时长 print("\n获取视频时长...") cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration', '-of', 'default=noprint_wrappers=1:nokey=1', str(temp_video)] video_duration = float(subprocess.check_output(cmd).decode().strip()) print(f"视频总时长: {video_duration:.2f}秒") # 处理每个hook current_video = temp_video for i, (hook, time_str) in enumerate(zip(hooks, times)): if not hook.strip() or not time_str.strip(): print(f"\n跳过空的hook或时间点") continue print(f"\n处理第 {i+1}/{len(hooks)} 个钩子:") print(f"钩子内容: {hook}") print(f"时间点: {time_str}") # 解析时间 start_time, end_time = parse_time(time_str) if start_time is None: if time_str == "视频结束": # 在视频结束前5秒显示 start_time = video_duration - 5 end_time = video_duration print(f"设置为视频结束前5秒显示") else: print(f"无效的时间格式: {time_str},跳过") continue print(f"开始时间: {start_time:.2f}秒") print(f"结束时间: {end_time:.2f}秒") # 确保时间在视频范围内 if start_time >= video_duration: print(f"开始时间超出视频时长,跳过") continue end_time = min(end_time, video_duration) # 添加文字到视频 temp_output = temp_dir / f"{video_id}_temp_{i}.mp4" print(f"正在添加文字到视频...") if not add_text_to_video(str(current_video), str(temp_output), hook, start_time, end_time): print("添加文字失败,跳过") continue # 更新当前视频路径 if current_video != temp_video: os.remove(current_video) current_video = temp_output print("文字添加成功") # 移动最终视频到输出目录 print(f"\n处理完成,保存最终视频...") shutil.move(str(current_video), str(output_video)) print(f"视频已保存到: {output_video}") oss_url = upload_to_oss(f"{video_id}_with_hooks.mp4") print(f"上传成功: {oss_url}") # 将oss_url写入excel 12列 df.loc[idx, 12] = oss_url df.to_excel("视频分析报告1_拆分钩子_with_oss_url.xlsx", index=False) except Exception as e: print(f"处理第 {idx+1} 行时出错: {str(e)}") continue finally: # 清理临时文件 if temp_dir.exists(): print("\n清理临时文件...") shutil.rmtree(temp_dir) print("\n所有视频处理完成!") # 处理完成之后上传至阿里云oss def upload_to_oss(object_name): auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider()) # 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。 endpoint = "https://oss-cn-hangzhou.aliyuncs.com" # 填写Endpoint对应的Region信息,例如cn-hangzhou。注意,v4签名下,必须填写该参数 region = "cn-hangzhou" # 填写Bucket名称,例如examplebucket。 bucketName = "art-weapp" # 创建Bucket实例,指定存储空间的名称和Region信息。 bucket = oss2.Bucket(auth, endpoint, bucketName, region=region) # 本地文件的完整路径 local_file_path = '/Users/jihuaqiang/piaoquan/video-comprehension/' + object_name # 填写Object完整路径,完整路径中不能包含Bucket名称。例如exampleobject.txt。 objectName = 'ai-trigger-demo/' + object_name # 使用put_object_from_file方法将本地文件上传至OSS bucket.put_object_from_file(objectName, local_file_path) print(f"上传成功: https://art-weapp.oss-cn-hangzhou.aliyuncs.com/{objectName}") return f"https://art-weapp.oss-cn-hangzhou.aliyuncs.com/{objectName}" if __name__ == "__main__": # process_videos() # upload_to_oss("57463792VYj3UHnLFS6lAufeAy20250512191000803651096-1LD.mp4")