123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- """
- @author: luojunhui
- """
- import os
- import oss2
- import asyncio
- import aiohttp
- import aiofiles
- import requests
- from hashlib import md5
- from uuid import uuid4
- from fake_useragent import FakeUserAgent
async def is_empty(file_path: str) -> bool:
    """Report whether the file at ``file_path`` is too small to be a real video.

    A file of 10 KB or less is considered empty. A path that does not
    exist is also reported as empty instead of raising, so callers can use
    the result directly as a "needs (re)download" flag.

    :param file_path: path of the downloaded file to inspect.
    :return: ``True`` when the file is missing or at most 10 KB,
        ``False`` when it is larger than 10 KB.
    """
    # Anything at or below this size is treated as an empty video file.
    min_valid_size = 1024 * 10
    try:
        return os.path.getsize(file_path) <= min_valid_size
    except FileNotFoundError:
        # Nothing was written at all: that is as empty as it gets.
        return True
async def download_cover(file_path, platform, cover_url):
    """Download a video cover image and store it at ``file_path``.

    The HTTP request is made with ``requests`` in the default executor so
    the blocking call does not stall the event loop, and it carries a
    timeout so an unresponsive server cannot hang the coroutine forever.

    :param file_path: destination path for the cover image.
    :param platform: platform identifier forwarded to ``request_header``.
    :param cover_url: URL of the cover image.
    :return: ``file_path`` on success; ``None`` when the response status
        is not 200 or the body contains an ``<html>`` tag (an error page
        rather than an image).
    :raises requests.RequestException: on network failure or timeout.
    """
    headers = request_header(platform=platform, url=cover_url, download_type="cover")
    # Seconds allowed for connecting and for each read; covers are small files.
    timeout_seconds = 30

    loop = asyncio.get_running_loop()
    response = await loop.run_in_executor(
        None,
        lambda: requests.get(url=cover_url, headers=headers, timeout=timeout_seconds),
    )

    if response.status_code != 200 or b"<html>" in response.content:
        return None

    async with aiofiles.open(file_path, "wb") as f:
        await f.write(response.content)
    return file_path
def _xigua_video_headers(host):
    """Build the ixigua video request headers for the given ``Host`` value."""
    return {
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Host": host,
        "User-Agent": FakeUserAgent().chrome,
        "Origin": "https://www.ixigua.com/",
        "Referer": "https://www.ixigua.com/"
    }


def request_header(platform, url, download_type="video"):
    """Build the HTTP request headers for a download on ``platform``.

    A fresh dict is returned on every call, so callers may mutate it.

    :param platform: one of ``"xg_search"``, ``"baidu_search"``,
        ``"wx_search"``, ``"dy_search"``; any other value yields ``{}``.
    :param url: the URL about to be requested. Only inspected for
        ``"xg_search"``, to pick the matching ``Host`` header. ``None`` is
        treated as an empty string.
    :param download_type: ``"video"`` (default) or ``"cover"``. Only
        affects ``"xg_search"`` URLs that match neither known video host.
    :return: dict of header names to values.
    """
    # Tolerate a missing URL instead of raising TypeError on the `in` test.
    target = url or ""

    if platform == "xg_search":
        # A URL on a known video host always gets that host's headers,
        # even when download_type is "cover".
        for host in ("v9-xg-web-pc.ixigua.com", "v3-xg-web-pc.ixigua.com"):
            if host in target:
                return _xigua_video_headers(host)
        if download_type == "cover":
            return {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
                'Cache-Control': 'max-age=0',
                'Proxy-Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
            }
        # Any other video URL falls back to the v3 host headers.
        return _xigua_video_headers("v3-xg-web-pc.ixigua.com")

    if platform == "baidu_search":
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "User-Agent": FakeUserAgent().chrome,
        }

    if platform == "wx_search":
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "User-Agent": FakeUserAgent().chrome,
            "Origin": "https://mp.weixin.qq.com",
            "Referer": "https://mp.weixin.qq.com"
        }

    if platform == "dy_search":
        return {
            'accept': '*/*',
            'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
            'priority': 'i',
            'range': 'bytes=0-',
            'referer': 'https://v11-coldf.douyinvod.com/',
            'user-agent': FakeUserAgent().chrome
        }

    # Unknown platform: no special headers.
    return {}
async def download_video(file_path, platform, video_url, download_type="video"):
    """Download ``video_url`` to ``file_path`` through the tunnel proxy, with retries.

    Up to three attempts are made, three seconds apart. If a non-empty
    partial file already exists, the attempt asks the server to resume
    with a ``Range`` header. The data is appended only when the server
    confirms the range with ``206``; a ``200`` reply means the full body
    was sent, so the file is rewritten from the start. A download that
    completes but is still considered empty by ``is_empty`` is redone
    from scratch on the next attempt, not resumed. Network errors count
    as a failed attempt instead of aborting the retry loop.

    :param file_path: destination path of the video file.
    :param platform: platform identifier forwarded to ``request_header``.
    :param video_url: URL of the video to download.
    :param download_type: forwarded to ``request_header``.
    :return: ``file_path`` on success, ``False`` once all attempts failed.
    """
    base_headers = request_header(platform=platform, url=video_url, download_type=download_type)
    # Some header sets already carry a (lowercase) range entry; remember them
    # so a resume request does not send two conflicting Range headers.
    preset_range_keys = [key for key in base_headers if key.lower() == "range"]

    max_retries = 3  # maximum number of attempts
    retry_delay = 3  # seconds to wait between attempts
    # NOTE(review): proxy credentials are hard-coded in source; consider
    # moving them to configuration or environment variables.
    tunnel = "l901.kdltps.com:15818"
    username = "t11983523373311"
    password = "mtuhdr2z"
    proxy_auth = aiohttp.BasicAuth(username, password)

    retries = 0
    # Set after a completed-but-undersized download: what is on disk is not
    # a valid prefix, so the next attempt must overwrite instead of resume.
    restart_from_scratch = False
    while retries < max_retries:
        resume_from = 0
        if os.path.exists(file_path):
            existing_size = os.path.getsize(file_path)
            if existing_size == 0:
                # Zero-byte leftover: remove it so the download starts clean.
                os.remove(file_path)
            elif not restart_from_scratch:
                resume_from = existing_size

        # Build the headers per attempt so a stale Range never leaks over.
        headers = dict(base_headers)
        if resume_from > 0:
            for key in preset_range_keys:
                del headers[key]
            headers["Range"] = f"bytes={resume_from}-"

        completed = False
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    video_url,
                    headers=headers,
                    proxy_auth=proxy_auth,
                    proxy="http://" + tunnel,
                ) as response:
                    if response.status in (200, 206):
                        # Append only when the server honoured the resume
                        # request; a 200 carries the whole file again.
                        resuming = resume_from > 0 and response.status == 206
                        async with aiofiles.open(file_path, "ab" if resuming else "wb") as f:
                            if not resuming:
                                # Fresh content is on disk from here on, so
                                # a later attempt may resume from it.
                                restart_from_scratch = False
                            # Stream the body in 1 MB chunks.
                            async for chunk in response.content.iter_chunked(1024 * 1024):
                                await f.write(chunk)
                        completed = True
        except (aiohttp.ClientError, asyncio.TimeoutError):
            # Connection problems are retried like any other failed attempt;
            # whatever was written so far stays on disk for resuming.
            completed = False

        if completed:
            if not await is_empty(file_path):
                return file_path
            # Finished but too small to be a video: redo it from scratch.
            restart_from_scratch = True

        await asyncio.sleep(retry_delay)
        retries += 1

    print(f"下载失败,已达到最大重试次数:{max_retries}")
    return False
def generate_video_path(platform, video_id):
    """Create a unique pair of local paths for a video and its cover.

    The file stem is the MD5 hex digest of ``platform``, ``video_id`` and a
    random UUID joined by dashes, so repeated calls with the same arguments
    yield different paths. Both files live in ``<cwd>/static``.

    :param platform: platform identifier mixed into the hashed seed.
    :param video_id: video identifier mixed into the hashed seed.
    :return: tuple ``(video_path, cover_path)``; the video path ends in
        ``.mp4`` and the cover path in ``.png``, sharing the same stem.
    """
    seed = f"{platform}-{video_id}-{uuid4()}"
    stem = md5(seed.encode()).hexdigest()
    video_path = os.path.join(os.getcwd(), "static", f"{stem}.mp4")
    cover_path = os.path.join(os.getcwd(), "static", f"{stem}.png")
    return video_path, cover_path
async def upload_to_oss(local_video_path, download_type):
    """Upload a local file to the OSS bucket and return its object key.

    The ``oss2`` upload call is blocking, so it is run in the default
    executor to keep the event loop responsive while the file transfers.

    :param local_video_path: path of the local file to upload.
    :param download_type: used as the folder name under ``long_articles/``
        in the object key.
    :return: the generated object key,
        ``long_articles/<download_type>/<uuid4>``.
    """
    oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
    # NOTE(review): access keys are hard-coded in source; consider loading
    # them from configuration or environment variables and rotating them.
    access_key_id = "LTAIP6x1l3DXfSxm"
    access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
    endpoint = "oss-cn-hangzhou.aliyuncs.com"
    bucket_name = "art-pubbucket"
    bucket = oss2.Bucket(
        oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
    )

    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        lambda: bucket.put_object_from_file(key=oss_video_key, filename=local_video_path),
    )
    return oss_video_key
|