123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- # 读取视频分析报告1_拆分钩子.xlsx
- # 提取视频链接,提取hook和time
- # 下载视频,
- # 使用ffmpeg将hook文案加入到视频内
- # 保存处理后的视频至 trigger_video 文件夹内
- import pandas as pd
- import os
- import requests
- import subprocess
- from datetime import datetime
- import time
- from pathlib import Path
- import shutil
- import oss2
- from oss2.credentials import EnvironmentVariableCredentialsProvider
- def download_video(url, save_path):
- """下载视频文件,支持断点续传"""
- try:
- # 创建临时文件
- temp_path = save_path + '.tmp'
-
- # 获取已下载的文件大小
- initial_pos = 0
- if os.path.exists(temp_path):
- initial_pos = os.path.getsize(temp_path)
-
- headers = {'Range': f'bytes={initial_pos}-'} if initial_pos > 0 else {}
-
- # 发送请求
- response = requests.get(url, headers=headers, stream=True)
- response.raise_for_status()
-
- # 获取文件总大小
- total_size = int(response.headers.get('content-length', 0)) + initial_pos
-
- # 写入文件
- mode = 'ab' if initial_pos > 0 else 'wb'
- with open(temp_path, mode) as f:
- for chunk in response.iter_content(chunk_size=8192):
- if chunk:
- f.write(chunk)
-
- # 下载完成后重命名
- shutil.move(temp_path, save_path)
- return True
-
- except Exception as e:
- print(f"下载视频失败: {str(e)}")
- if os.path.exists(temp_path):
- os.remove(temp_path)
- return False
- def add_text_to_video(input_video, output_video, text, start_time, end_time):
- """使用ffmpeg添加文字到视频中,在指定的时间段内显示
- 文字样式:
- - 红色背景
- - 白色18像素加粗字体
- - 自动换行(每行最多20个字符)
- - 位置在底部50像素处
- """
- try:
- # 处理文本换行(每行最多20个字符)
- wrapped_text = text.replace('\\n', '\n') # 保留原有的换行符
- if len(text) > 20 and '\\n' not in text:
- # 在合适的位置添加换行符
- words = text.split()
- lines = []
- current_line = []
- current_length = 0
-
- for word in words:
- if current_length + len(word) + 1 <= 20: # +1 for space
- current_line.append(word)
- current_length += len(word) + 1
- else:
- lines.append(' '.join(current_line))
- current_line = [word]
- current_length = len(word)
-
- if current_line:
- lines.append(' '.join(current_line))
-
- wrapped_text = '\\n'.join(lines)
-
- # 构建ffmpeg命令
- # 使用drawtext滤镜添加文字,设置字体、颜色、位置等
- cmd = [
- 'ffmpeg', '-y',
- '-i', input_video,
- '-vf', f"drawtext=text='{wrapped_text}'"
- f":fontsize=18" # 字体大小18像素
- f":fontcolor=white" # 白色字体
- f":fontfile=/System/Library/Fonts/PingFang.ttc" # 使用系统字体
- f":fontweight=bold" # 字体加粗
- f":box=1" # 启用背景框
- f":boxcolor=red@0.8" # 红色背景,透明度0.8
- f":boxborderw=5" # 背景框边框宽度
- f":x=(w-text_w)/2" # 水平居中
- f":y=h-th-50" # 距离底部50像素
- f":line_spacing=10" # 行间距
- f":enable='between(t,{start_time},{end_time})'", # 显示时间段
- '-c:a', 'copy', # 保持音频不变
- output_video
- ]
-
- # 执行命令
- process = subprocess.Popen(
- cmd,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE,
- universal_newlines=True
- )
-
- # 等待命令执行完成
- stdout, stderr = process.communicate()
-
- if process.returncode != 0:
- print(f"添加文字失败: {stderr}")
- return False
-
- return True
-
- except Exception as e:
- print(f"处理视频时出错: {str(e)}")
- return False
- def parse_time(time_str):
- """解析时间字符串,支持以下格式:
- 1. HH:MM:SS-HH:MM:SS (时间范围)
- 2. HH:MM:SS (单个时间点,显示5秒)
- 3. "视频结束" (在视频结束前5秒显示)
- """
- try:
- if time_str == "视频结束":
- return None, None # 特殊标记,需要后续处理
-
- if '-' in time_str:
- # 处理时间范围
- start_str, end_str = time_str.split('-')
- # 解析开始时间
- h1, m1, s1 = map(int, start_str.split(':'))
- start_time = h1 * 3600 + m1 * 60 + s1
- # 解析结束时间
- h2, m2, s2 = map(int, end_str.split(':'))
- end_time = h2 * 3600 + m2 * 60 + s2
- return start_time, end_time
- else:
- # 处理单个时间点
- h, m, s = map(int, time_str.split(':'))
- start_time = h * 3600 + m * 60 + s
- return start_time, start_time + 5 # 默认显示5秒
-
- except Exception as e:
- print(f"时间格式解析失败: {time_str}, 错误: {str(e)}")
- return None, None
- def process_videos():
- """处理所有视频数据"""
- # 创建输出目录
- output_dir = Path("trigger_video")
- output_dir.mkdir(exist_ok=True)
-
- # 创建临时目录
- temp_dir = Path("temp_videos")
- temp_dir.mkdir(exist_ok=True)
-
- try:
- # 读取Excel文件
- print("开始读取Excel文件...")
- df = pd.read_excel("视频分析报告1_拆分钩子.xlsx")
- total_rows = len(df)
- print(f"共读取到 {total_rows} 行数据")
-
- # 处理每一行
- for idx, row in df.iterrows():
- try:
- print(f"\n{'='*50}")
- print(f"开始处理第 {idx+1}/{total_rows} 行")
- print(f"{'='*50}")
-
- video_url = row.iloc[3] # 视频URL在第4列
- if pd.isna(video_url):
- print(f"第 {idx+1} 行没有视频URL,跳过")
- continue
-
- print(f"视频URL: {video_url}")
-
- # 获取hook信息
- hooks = row.iloc[11].split('\n') # hook在第12列
- times = row.iloc[9].split('\n') # time在第10列
-
- print(f"钩子数量: {len(hooks)}")
- print(f"时间点数量: {len(times)}")
-
- if not hooks or not times or len(hooks) != len(times):
- print(f"第 {idx+1} 行hook或time数据不完整,跳过")
- continue
-
- # 生成输出文件名
- video_id = f"video_{idx+1}"
- temp_video = temp_dir / f"{video_id}.mp4"
- output_video = output_dir / f"{video_id}_with_hooks.mp4"
-
- # 如果输出文件已存在,跳过处理
- if output_video.exists():
- print(f"视频 {output_video} 已存在,跳过处理")
- continue
-
- # 下载视频
- print(f"\n开始下载视频...")
- if not download_video(video_url, str(temp_video)):
- print(f"第 {idx+1} 行视频下载失败,跳过")
- continue
-
- # 获取视频总时长
- print("\n获取视频时长...")
- cmd = ['ffprobe', '-v', 'error', '-show_entries', 'format=duration',
- '-of', 'default=noprint_wrappers=1:nokey=1', str(temp_video)]
- video_duration = float(subprocess.check_output(cmd).decode().strip())
- print(f"视频总时长: {video_duration:.2f}秒")
-
- # 处理每个hook
- current_video = temp_video
- for i, (hook, time_str) in enumerate(zip(hooks, times)):
- if not hook.strip() or not time_str.strip():
- print(f"\n跳过空的hook或时间点")
- continue
-
- print(f"\n处理第 {i+1}/{len(hooks)} 个钩子:")
- print(f"钩子内容: {hook}")
- print(f"时间点: {time_str}")
-
- # 解析时间
- start_time, end_time = parse_time(time_str)
- if start_time is None:
- if time_str == "视频结束":
- # 在视频结束前5秒显示
- start_time = video_duration - 5
- end_time = video_duration
- print(f"设置为视频结束前5秒显示")
- else:
- print(f"无效的时间格式: {time_str},跳过")
- continue
-
- print(f"开始时间: {start_time:.2f}秒")
- print(f"结束时间: {end_time:.2f}秒")
-
- # 确保时间在视频范围内
- if start_time >= video_duration:
- print(f"开始时间超出视频时长,跳过")
- continue
- end_time = min(end_time, video_duration)
-
- # 添加文字到视频
- temp_output = temp_dir / f"{video_id}_temp_{i}.mp4"
- print(f"正在添加文字到视频...")
- if not add_text_to_video(str(current_video), str(temp_output), hook, start_time, end_time):
- print("添加文字失败,跳过")
- continue
-
- # 更新当前视频路径
- if current_video != temp_video:
- os.remove(current_video)
- current_video = temp_output
- print("文字添加成功")
-
- # 移动最终视频到输出目录
- print(f"\n处理完成,保存最终视频...")
- shutil.move(str(current_video), str(output_video))
- print(f"视频已保存到: {output_video}")
- oss_url = upload_to_oss(f"{video_id}_with_hooks.mp4")
- print(f"上传成功: {oss_url}")
- # 将oss_url写入excel 12列
- df.loc[idx, 12] = oss_url
- df.to_excel("视频分析报告1_拆分钩子_with_oss_url.xlsx", index=False)
-
- except Exception as e:
- print(f"处理第 {idx+1} 行时出错: {str(e)}")
- continue
-
- finally:
- # 清理临时文件
- if temp_dir.exists():
- print("\n清理临时文件...")
- shutil.rmtree(temp_dir)
-
- print("\n所有视频处理完成!")
- # 处理完成之后上传至阿里云oss
-
- def upload_to_oss(object_name):
- auth = oss2.ProviderAuthV4(EnvironmentVariableCredentialsProvider())
- # 填写Bucket所在地域对应的Endpoint。以华东1(杭州)为例,Endpoint填写为https://oss-cn-hangzhou.aliyuncs.com。
- endpoint = "https://oss-cn-hangzhou.aliyuncs.com"
- # 填写Endpoint对应的Region信息,例如cn-hangzhou。注意,v4签名下,必须填写该参数
- region = "cn-hangzhou"
- # 填写Bucket名称,例如examplebucket。
- bucketName = "art-weapp"
- # 创建Bucket实例,指定存储空间的名称和Region信息。
- bucket = oss2.Bucket(auth, endpoint, bucketName, region=region)
- # 本地文件的完整路径
- local_file_path = '/Users/jihuaqiang/piaoquan/video-comprehension/' + object_name
- # 填写Object完整路径,完整路径中不能包含Bucket名称。例如exampleobject.txt。
- objectName = 'ai-trigger-demo/' + object_name
- # 使用put_object_from_file方法将本地文件上传至OSS
- bucket.put_object_from_file(objectName, local_file_path)
- print(f"上传成功: https://art-weapp.oss-cn-hangzhou.aliyuncs.com/{objectName}")
- return f"https://art-weapp.oss-cn-hangzhou.aliyuncs.com/{objectName}"
- if __name__ == "__main__":
- # process_videos()
- #
- upload_to_oss("57463792VYj3UHnLFS6lAufeAy20250512191000803651096-1LD.mp4")
|