# video_prep.py

import concurrent.futures
import configparser
import os
import random
import threading
import time
from datetime import datetime

from common import Material, Feishu, Common, Oss
from common.ffmpeg import FFmpeg
from data_channel.douyin import DY
from data_channel.piaoquan import PQ
from common.sql_help import sqlCollect
from data_channel.shipinhao import SPH

# Module-wide settings, read once at import time from ``config.ini`` in the
# current working directory (the path is relative, so it depends on where the
# process is started).
# NOTE(review): ConfigParser.read() silently ignores files it cannot open, so a
# missing config.ini only shows up later as a KeyError on config['PATHS'].
config = configparser.ConfigParser()
config.read('./config.ini')
  16. class getVideo:
  17. """
  18. 数据处理
  19. """
  20. @classmethod
  21. def get_video(cls):
  22. pass
  23. """
  24. 根据标示+任务标示创建目录
  25. """
  26. @classmethod
  27. def create_folders(cls, mark, task_mark):
  28. video_path_url = config['PATHS']['VIDEO_PATH'] + mark + "/" + task_mark + "/"
  29. if not os.path.exists(video_path_url):
  30. os.makedirs(video_path_url)
  31. return video_path_url
  32. """
  33. 随机生成ID
  34. """
  35. @classmethod
  36. def random_id(cls):
  37. now = datetime.now()
  38. rand_num = random.randint(10000, 99999)
  39. oss_id = "{}{}".format(now.strftime("%Y%m%d%H%M%S"), rand_num)
  40. return oss_id
  41. """
  42. 删除文件
  43. """
  44. @classmethod
  45. def remove_files(cls, video_path_url):
  46. if os.path.exists(video_path_url) and os.path.isdir(video_path_url):
  47. for root, dirs, files in os.walk(video_path_url):
  48. for file in files:
  49. file_path = os.path.join(root, file)
  50. os.remove(file_path)
  51. for dir in dirs:
  52. dir_path = os.path.join(root, dir)
  53. os.rmdir(dir_path)
  54. """
  55. 飞书数据处理
  56. """
  57. @classmethod
  58. def video_task(cls, data):
  59. mark = data["mark"]
  60. name = data["name"]
  61. feishu_id = data["feishu_id"]
  62. feishu_sheet = data["feishu_sheet"]
  63. cookie_sheet = data["cookie_sheet"]
  64. pz_sheet = '500Oe0'
  65. pw_sheet = 'DgX7vC'
  66. task_data = Material.get_task_data(feishu_id, feishu_sheet)
  67. if len(task_data) == 0:
  68. Feishu.bot(mark, '机器自动改造消息通知', f'今日任务为空,请关注', name)
  69. return mark
  70. lock = threading.Lock()
  71. def process_task(task):
  72. task_mark = task["task_mark"] # 任务标示
  73. channel_id = str(task["channel_id"])
  74. channel_urls = str(task["channel_url"])
  75. piaoquan_id = str(task["piaoquan_id"])
  76. number = task["number"] # 指定条数
  77. title = task["title"]
  78. video_share = task["video_share"]
  79. video_ending = task["video_ending"]
  80. crop_total = task["crop_total"]
  81. gg_duration_total = task["gg_duration_total"]
  82. video_path_url = cls.create_folders(mark, str(task_mark)) # 创建目录
  83. if video_share and video_share != 'None':
  84. video_share_list = video_share.split('/')
  85. video_share_mark = video_share_list[0]
  86. video_share_name = video_share_list[1]
  87. zm = Material.get_pzsrt_data("summary", pz_sheet, video_share_name) # 获取srt
  88. if zm == '':
  89. Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下片中标示填写错误,请关注!!!!', name)
  90. if ',' in channel_urls:
  91. channel_url = channel_urls.split(',')
  92. else:
  93. channel_url = [channel_urls]
  94. for url in channel_url:
  95. Common.logger("log").info(f"{task_mark}下的用户:{channel_url}开始获取视频")
  96. if '/' in title:
  97. titles = title.split('/')
  98. else:
  99. titles = [title]
  100. if channel_id == "抖音":
  101. data_list = DY.get_dy_url(task_mark, url, number, mark, feishu_id, cookie_sheet, channel_id, name)
  102. elif channel_id == "票圈":
  103. data_list = PQ.get_pq_url(task_mark, url, number, mark)
  104. elif channel_id == "视频号":
  105. data_list = SPH.get_sph_url(task_mark, url, number, mark)
  106. # elif channel_id == "快手":
  107. # pass
  108. if len(data_list) == 0:
  109. Common.logger("log").info(f"{task_mark}下的视频ID{id} 已经改造过了")
  110. Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下的用户ID{id},没有已经改造的视频了', name)
  111. cls.remove_files(video_path_url)
  112. continue
  113. Common.logger("log").info(f"{task_mark}下的ID{id} 获取视频完成,共{len(data_list)}条")
  114. try:
  115. for video in data_list:
  116. v_id = video["video_id"]
  117. cover = video["cover"]
  118. video_url = video["video_url"]
  119. time.sleep(1)
  120. pw_random_id = cls.random_id()
  121. if channel_id == "票圈":
  122. new_video_path = PQ.download_video(video_url, video_path_url, v_id) # 下载视频地址
  123. else:
  124. new_video_path = Oss.download_video_oss(video_url, video_path_url, v_id) # 下载视频地址
  125. if not os.path.isfile(new_video_path):
  126. Common.logger("log").info(f"{task_mark}下的视频{url},{new_video_path}视频下载失败")
  127. cls.remove_files(video_path_url)
  128. continue
  129. if crop_total and crop_total != 'None': # 判断是否需要裁剪
  130. new_video_path = FFmpeg.video_crop(new_video_path, video_path_url, pw_random_id)
  131. if gg_duration_total and gg_duration_total != 'None': # 判断是否需要指定视频时长
  132. new_video_path = FFmpeg.video_ggduration(new_video_path, video_path_url, pw_random_id, gg_duration_total)
  133. if video_ending and video_ending != 'None':
  134. if ',' in video_ending:
  135. video_ending_list = video_ending.split(',')
  136. else:
  137. video_ending_list = [video_ending]
  138. ending = random.choice(video_ending_list)
  139. pw_list = Material.get_pwsrt_data("summary", pw_sheet, ending) # 获取srt
  140. if pw_list:
  141. pw_id = pw_list["pw_id"]
  142. pw_srt = pw_list["pw_srt"]
  143. pw_url = PQ.get_pw_url(pw_id)
  144. else:
  145. Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务下片尾标示错误,请关注!!!!', name)
  146. for attempt in range(3):
  147. jpg_path = FFmpeg.video_png(new_video_path, video_path_url, pw_random_id) # 生成视频最后一帧jpg
  148. if os.path.isfile(jpg_path):
  149. Common.logger("log").info(f"{task_mark}下的视频{url},生成视频最后一帧成功")
  150. break
  151. time.sleep(1)
  152. if not os.path.isfile(jpg_path):
  153. Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务用户{url}下的视频{v_id},获取视频最后一帧失败,请关注', name)
  154. cls.remove_files(video_path_url)
  155. continue
  156. for attempt in range(3):
  157. pw_mp3_path = FFmpeg.get_video_mp3(pw_url, video_path_url, pw_random_id)
  158. pw_path = FFmpeg.pw_video(jpg_path, video_path_url, pw_url, pw_srt, pw_random_id, pw_mp3_path) # 生成片尾视频
  159. if os.path.isfile(pw_path):
  160. Common.logger("log").info(f"{task_mark}下的视频{url},生成片尾视频成功")
  161. break
  162. time.sleep(1)
  163. if not os.path.isfile(pw_path):
  164. Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务用户{url}下的视频{v_id},生成片尾视频失败,请关注',
  165. name)
  166. cls.remove_files(video_path_url)
  167. continue
  168. pw_video_list = [new_video_path, pw_path]
  169. video_path = FFmpeg.concatenate_videos(pw_video_list, video_path_url) # 视频与片尾拼接到一起
  170. time.sleep(1)
  171. if video_share and video_share != 'None':
  172. new_video_path = FFmpeg.single_video(video_path, video_share_mark, video_path_url, zm)
  173. else:
  174. new_video_path = video_path
  175. else:
  176. if video_share and video_share != 'None':
  177. new_video_path = FFmpeg.single_video(new_video_path, video_share_mark, video_path_url, zm)
  178. time.sleep(1)
  179. oss_id = cls.random_id()
  180. oss_object_key = Oss.stitching_sync_upload_oss(new_video_path, oss_id) # 视频发送OSS
  181. status = oss_object_key.get("status")
  182. if status == 200:
  183. oss_object_key = oss_object_key.get("oss_object_key")
  184. time.sleep(1)
  185. new_title = random.choice(titles)
  186. code = PQ.insert_piaoquantv(oss_object_key, new_title, cover, piaoquan_id)
  187. if code:
  188. Common.logger("log").info(f"{task_mark}下的视频ID{v_id}发送成功")
  189. sqlCollect.insert_task(task_mark, v_id, mark, channel_id) # 插入数据库
  190. current_time = datetime.now()
  191. formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
  192. values = [[name, task_mark, v_id, piaoquan_id, new_title, str(code), formatted_time]]
  193. # 使用锁保护表格插入操作
  194. with lock:
  195. Feishu.insert_columns("ILb4sa0LahddRktnRipcu2vQnLb", "a74fc4", "ROWS", 1, 2)
  196. time.sleep(0.5)
  197. Feishu.update_values("ILb4sa0LahddRktnRipcu2vQnLb", "a74fc4", "A2:Z2", values)
  198. cls.remove_files(video_path_url)
  199. else:
  200. cls.remove_files(video_path_url)
  201. Common.logger("log").info(f"{task_mark}下的{url}视频{v_id} 视频发送OSS失败 ")
  202. Feishu.bot(mark, '机器自动改造消息通知', f'{task_mark}任务改造完成,请关注', name)
  203. except Exception as e:
  204. cls.remove_files(video_path_url)
  205. Common.logger("warning").warning(f"{name}的{task_mark}任务处理失败:{e}\n")
  206. batch_size = 3
  207. with concurrent.futures.ThreadPoolExecutor(max_workers=batch_size) as executor:
  208. index = 0
  209. while index < len(task_data):
  210. # 计算剩余的任务数量
  211. remaining_tasks = len(task_data) - index
  212. # 当前批次大小为剩余任务数量和批次大小中较小的一个
  213. current_batch_size = min(batch_size, remaining_tasks)
  214. # 获取当前批次的任务
  215. current_batch = task_data[index:index + batch_size]
  216. futures = {executor.submit(process_task, task): task for task in current_batch}
  217. for future in concurrent.futures.as_completed(futures):
  218. task = futures[future]
  219. try:
  220. future.result()
  221. print(f"Task {task['task_mark']} 完成")
  222. except Exception as exc:
  223. print(f"Task {task['task_mark']} 异常信息: {exc}")
  224. # 移动到下一批任务
  225. index += current_batch_size
  226. Feishu.bot(mark, '机器自动改造消息通知', f'你的任务全部完成,请关注!!!!!', name)
  227. return mark