"""
@author: luojunhui
"""
import asyncio
import os
from hashlib import md5
from typing import Dict, Optional, Tuple, Union
from uuid import uuid4

import aiofiles
import aiohttp
import oss2
import requests
from fake_useragent import FakeUserAgent


async def is_empty(file_path: str) -> bool:
    """
    Report whether a downloaded file is too small to be a usable video.

    A file whose size is 10 KB or less is treated as empty.

    :param file_path: path of the file to inspect
    :return: True when the file is at most 10 KB, False otherwise
    :raises OSError: if the file does not exist or cannot be inspected
    """
    ten_kb = 1024 * 10
    return os.path.getsize(file_path) <= ten_kb


async def download_cover(file_path: str, platform: str, cover_url: str) -> Optional[str]:
    """
    Download a video cover image and write it to ``file_path``.

    :param file_path: local path the cover is written to
    :param platform: source platform key, used to pick the request headers
    :param cover_url: URL of the cover image
    :return: ``file_path`` on success; None when the response status is not
        200 or the response body is empty
    """
    headers = request_header(platform=platform, url=cover_url, download_type="cover")
    # NOTE(review): requests.get is a blocking call inside a coroutine and is
    # issued without a timeout; consider moving it off the event loop.
    response = requests.get(url=cover_url, headers=headers)
    # The earlier test `b"" in response.content` holds for every bytes value,
    # so no cover was ever saved. Reject only failed or empty responses.
    # NOTE(review): if a specific error marker was meant to be matched in the
    # body, re-add that check here.
    if response.status_code != 200 or not response.content:
        return None
    with open(file_path, "wb") as f:
        f.write(response.content)
    return file_path


def _xigua_video_headers(host: str) -> Dict[str, str]:
    """Build the Xigua video request headers for the given ``Host`` value."""
    return {
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Host": host,
        "User-Agent": FakeUserAgent().chrome,
        "Origin": "https://www.ixigua.com/",
        "Referer": "https://www.ixigua.com/",
    }


def request_header(platform: str, url: str, download_type: str = "video") -> Dict[str, str]:
    """
    Build the HTTP request headers for a download.

    :param platform: one of "xg_search", "baidu_search", "wx_search",
        "dy_search"; any other value yields an empty dict
    :param url: target URL; for "xg_search" the CDN host found in the URL
        selects the ``Host`` header
    :param download_type: "video" (default) or "cover"; only consulted for
        "xg_search" URLs that match neither known CDN host
    :return: a new dict of request headers
    """
    if platform == "xg_search":
        if "v9-xg-web-pc.ixigua.com" in url:
            return _xigua_video_headers("v9-xg-web-pc.ixigua.com")
        if "v3-xg-web-pc.ixigua.com" in url:
            return _xigua_video_headers("v3-xg-web-pc.ixigua.com")
        if download_type == "cover":
            return {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
                'Cache-Control': 'max-age=0',
                'Proxy-Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
            }
        # Unknown Xigua host: fall back to the v3 host headers.
        return _xigua_video_headers("v3-xg-web-pc.ixigua.com")

    if platform == "baidu_search":
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "User-Agent": FakeUserAgent().chrome,
        }

    if platform == "wx_search":
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "User-Agent": FakeUserAgent().chrome,
            "Origin": "https://mp.weixin.qq.com",
            "Referer": "https://mp.weixin.qq.com"
        }

    if platform == "dy_search":
        return {
            'accept': '*/*',
            'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
            'priority': 'i',
            'range': 'bytes=0-',
            'referer': 'https://v11-coldf.douyinvod.com/',
            'user-agent': FakeUserAgent().chrome
        }

    return {}


async def _write_body(response, file_path: str, mode: str) -> None:
    """Stream the response body into ``file_path`` in 1 MB chunks, opened with ``mode``."""
    async with aiofiles.open(file_path, mode) as f:
        async for chunk in response.content.iter_chunked(1024 * 1024):
            await f.write(chunk)


async def download_video(
    file_path: str, platform: str, video_url: str, download_type: str = "video"
) -> Union[str, bool]:
    """
    Download a video through the proxy tunnel, resuming a partial file if one exists.

    Up to three attempts are made, with a 3 second pause after each failed
    attempt. An attempt fails when the response status is neither 200 nor 206,
    or when the resulting file is 10 KB or smaller.

    :param file_path: local path the video is written to
    :param platform: source platform key, used to pick the request headers
    :param video_url: URL of the video
    :param download_type: forwarded to ``request_header``
    :return: ``file_path`` on success, False once all attempts have failed
    """
    base_headers = request_header(platform=platform, url=video_url, download_type=download_type)
    max_retries = 3  # maximum number of attempts
    retries = 0  # attempts that have failed so far
    # NOTE(review): proxy credentials are committed in source; consider
    # loading them from configuration instead.
    tunnel = "l901.kdltps.com:15818"
    username = "t11983523373311"
    password = "mtuhdr2z"
    proxy_auth = aiohttp.BasicAuth(username, password)

    while retries < max_retries:
        if os.path.exists(file_path):
            file_size = os.path.getsize(file_path)
            if file_size == 0:
                # A zero-byte leftover is removed so the download starts over.
                os.remove(file_path)
        else:
            file_size = 0

        # Work on a per-attempt copy so a Range value from an earlier attempt
        # can never leak into a fresh download.
        headers = dict(base_headers)
        if file_size > 0:
            # Drop any differently-cased preset (dy_search ships 'range') so
            # exactly one Range header is sent.
            for key in [k for k in headers if k.lower() == "range"]:
                del headers[key]
            headers["Range"] = f"bytes={file_size}-"

        # start download
        async with aiohttp.ClientSession() as session:
            async with session.get(
                video_url,
                headers=headers,
                proxy_auth=proxy_auth,
                proxy="http://" + tunnel,
            ) as response:
                if response.status in (200, 206):
                    # Append only when the server honoured the Range request
                    # (206). A 200 carries the whole body, so the partial file
                    # must be overwritten rather than extended.
                    resuming = file_size > 0 and response.status == 206
                    await _write_body(response, file_path, "ab" if resuming else "wb")

                    # A file that is still "empty" counts as a failed attempt.
                    if not await is_empty(file_path):
                        return file_path
                    await asyncio.sleep(3)
                    retries += 1
                    if retries >= max_retries:
                        return False
                else:
                    # Request failed: wait 3 seconds, then retry.
                    await asyncio.sleep(3)
                    retries += 1
                    if retries >= max_retries:
                        print(f"下载失败,已达到最大重试次数:{max_retries}")
                        return False
    return False


def generate_video_path(platform: str, video_id) -> Tuple[str, str]:
    """
    Generate unique local paths for a video and its cover.

    The file stem is the MD5 hex digest of "<platform>-<video_id>-<uuid4>",
    so every call yields a new pair of paths.

    :param platform: source platform key
    :param video_id: identifier of the video on that platform
    :return: ``(video_path, cover_path)``, both under ``<cwd>/static``, with
        ``.mp4`` and ``.png`` extensions respectively
    """
    seed = f"{platform}-{video_id}-{uuid4()}"
    stem = md5(seed.encode()).hexdigest()
    static_dir = os.path.join(os.getcwd(), "static")
    file_path = os.path.join(static_dir, f"{stem}.mp4")
    cover_path = os.path.join(static_dir, f"{stem}.png")
    return file_path, cover_path


async def upload_to_oss(local_video_path: str, download_type: str) -> str:
    """
    Upload a local file to the OSS bucket.

    :param local_video_path: path of the local file to upload
    :param download_type: used as the sub-folder under ``long_articles/``
    :return: the object key the file was stored under
    """
    oss_video_key = f"long_articles/{download_type}/{uuid4()}"
    # NOTE(review): OSS credentials are committed in source; consider loading
    # them from configuration instead.
    access_key_id = "LTAIP6x1l3DXfSxm"
    access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
    endpoint = "oss-cn-hangzhou.aliyuncs.com"
    bucket_name = "art-pubbucket"
    bucket = oss2.Bucket(
        oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
    )
    # NOTE(review): this upload call is synchronous inside a coroutine.
    bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
    return oss_video_key