"""
@author: luojunhui
"""
import asyncio
import functools
import os
import time
import oss2
import json
import aiohttp
import aiofiles

from hashlib import md5
from uuid import uuid4

import requests
from fake_useragent import FakeUserAgent


async def _run_blocking(func, *args, **kwargs):
    """
    Run a blocking callable in the event loop's default executor.

    The synchronous ``requests`` and ``oss2`` calls used in this module would
    otherwise stall the event loop for the whole duration of the network
    round trip.

    :param func: blocking callable to execute
    :param args: positional arguments forwarded to ``func``
    :param kwargs: keyword arguments forwarded to ``func``
    :return: whatever ``func`` returns
    """
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(
        None, functools.partial(func, *args, **kwargs)
    )


async def upload_to_oss(local_video_path):
    """
    Upload a local file to the OSS bucket under a freshly generated UUID key.

    :param local_video_path: path of the local file to upload
    :return: the object key (a UUID4 string) the file was stored under
    """
    oss_video_key = str(uuid4())
    # NOTE(security): credentials are hard-coded in source; they should be
    # loaded from configuration / a secret store and rotated.
    access_key_id = "LTAIP6x1l3DXfSxm"
    access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
    endpoint = "oss-cn-hangzhou.aliyuncs.com"
    bucket_name = "art-pubbucket"
    bucket = oss2.Bucket(
        oss2.Auth(access_key_id, access_key_secret),
        endpoint,
        bucket_name
    )
    # put_object_from_file is synchronous; keep it off the event loop thread.
    await _run_blocking(
        bucket.put_object_from_file,
        key=oss_video_key,
        filename=local_video_path
    )
    return oss_video_key


class AsyncETL(object):
    """
    Download a video and its cover, upload both to OSS and publish them.
    """

    def __init__(self, video_obj):
        """
        :param video_obj: dict providing the keys ``platform``, ``video_id``,
            ``video_url``, ``user_id``, ``video_title`` and ``cover_url``
        """
        self.platform = video_obj["platform"]
        self.video_id = video_obj["video_id"]
        self.video_url = video_obj["video_url"]
        self.uid = video_obj["user_id"]
        self.title = video_obj["video_title"]
        self.cover_url = video_obj["cover_url"]
        # NOTE(review): proxy and max_retry are not referenced by any method
        # in this file; kept because external code may read them. The proxy
        # URLs embed credentials and should come from configuration.
        self.proxy = {
            "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
            "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
        }
        self.max_retry = 5

    def request_header(self):
        """
        Build the HTTP request headers for the current platform.

        :return: dict of headers; empty for platforms other than
            ``xg_search``, ``baidu_search`` and ``wx_search``
        """
        if self.platform == "xg_search":
            # Only the v9 CDN host needs its own Host header; every other
            # URL falls back to the v3 host.
            if "v9-xg-web-pc.ixigua.com" in self.video_url:
                host = "v9-xg-web-pc.ixigua.com"
            else:
                host = "v3-xg-web-pc.ixigua.com"
            return {
                "Accept": "*/*",
                "Accept-Language": "zh-CN,zh;q=0.9",
                "Host": host,
                "User-Agent": FakeUserAgent().chrome,
                "Origin": "https://www.ixigua.com/",
                "Referer": "https://www.ixigua.com/"
            }
        if self.platform == "baidu_search":
            return {
                "Accept": "*/*",
                "Accept-Language": "zh-CN,zh;q=0.9",
                "User-Agent": FakeUserAgent().chrome,
            }
        if self.platform == "wx_search":
            return {
                "Accept": "*/*",
                "Accept-Language": "zh-CN,zh;q=0.9",
                "User-Agent": FakeUserAgent().chrome,
                "Origin": "https://mp.weixin.qq.com",
                "Referer": "https://mp.weixin.qq.com"
            }
        return {}

    def generate_video_path(self):
        """
        Derive local file paths for the video and its cover.

        The file stem is the MD5 hex digest of ``"<platform>-<video_id>"``;
        both files are placed in the ``videos`` directory under the current
        working directory.

        :return: tuple ``(video_path, cover_path)``
        """
        index = "{}-{}".format(self.platform, self.video_id)
        index = md5(index.encode()).hexdigest()
        video_dir = os.path.join(os.getcwd(), "videos")
        file_path = os.path.join(video_dir, "{}.mp4".format(index))
        cover_path = os.path.join(video_dir, "{}.png".format(index))
        return file_path, cover_path

    async def publish_by__request(self, video_path, cover):
        """
        Publish the uploaded video through the crawler "send" endpoint.

        :param video_path: value sent as ``videoPath`` (the OSS key of the video)
        :param cover: value sent as ``coverImgPath`` (the OSS key of the cover)
        :return: the decoded JSON body of the response
        """
        url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
        # NOTE(security): session cookie and tokens are hard-coded in source.
        headers = {
            "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
            "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
            "referer": "http://appspeed.piaoquantv.com",
            "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
            "accept-language": "zh-CN,zh-Hans;q=0.9",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        payload = {
            "coverImgPath": cover,
            "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
            "fileExtensions": "MP4",
            "loginUid": self.uid,
            "networkType": "Wi-Fi",
            "platform": "iOS",
            "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
            "sessionId": "362290597725ce1fa870d7be4f46dcc2",
            "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
            "title": self.title,
            "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
            "uid": self.uid,
            "versionCode": "486",
            "versionName": "3.4.12",
            "videoFromScene": "1",
            "videoPath": video_path,
            "viewStatus": "1",
        }
        # requests is synchronous; run it in a worker thread so other
        # coroutines keep making progress.
        response = await _run_blocking(
            requests.post,
            url=url,
            headers=headers,
            data=payload,
        )
        return response.json()

    async def download(self, file_path):
        """
        Download the video to ``file_path``, resuming a partial file if present.

        When ``file_path`` already exists a ``Range`` request is sent for the
        remaining bytes. Any status other than 200/206 is printed and nothing
        is written.

        :param file_path: local destination path
        :return: ``file_path``
        """
        headers = self.request_header()
        if os.path.exists(file_path):
            file_size = os.path.getsize(file_path)
            headers["Range"] = f"bytes={file_size}-"
        else:
            file_size = 0
        async with aiohttp.ClientSession() as session:
            async with session.get(self.video_url, headers=headers) as response:
                if response.status in [200, 206]:
                    # Append only when the server honoured the Range request
                    # (206). A 200 carries the whole body, so appending it to
                    # the partial file would corrupt the video.
                    resumed = file_size > 0 and response.status == 206
                    mode = "ab" if resumed else "wb"
                    # Context manager closes the file even if a read/write
                    # fails; chunked streaming avoids holding the whole video
                    # in memory.
                    async with aiofiles.open(file_path, mode) as f:
                        async for chunk in response.content.iter_chunked(64 * 1024):
                            await f.write(chunk)
                else:
                    print(response.status)
        return file_path

    async def download_cover(self, file_path):
        """
        Download the video cover image to ``file_path``.

        :param file_path: local destination path
        :return: ``file_path``
        """
        headers = self.request_header()
        # requests is synchronous; fetch in a worker thread.
        response = await _run_blocking(
            requests.get, url=self.cover_url, headers=headers
        )
        with open(file_path, "wb") as f:
            f.write(response.content)
        return file_path

    async def etl_deal(self):
        """
        Run the full task: download, upload to OSS, publish, clean up.

        :return: ``result["data"]["id"]`` from the publish response
        """
        local_video_path, local_cover_path = self.generate_video_path()
        # download videos
        file_path = await self.download(local_video_path)
        # download cover
        cover_path = await self.download_cover(local_cover_path)
        # upload to oss
        oss_video = await upload_to_oss(
            local_video_path=file_path,
        )
        oss_cover = await upload_to_oss(
            local_video_path=cover_path
        )
        # publish to pq
        result = await self.publish_by__request(
            video_path=oss_video,
            cover=oss_cover
        )
        print(json.dumps(result, ensure_ascii=False, indent=4))
        # remove the local copies and report how long the cleanup took
        started = time.time()
        os.remove(file_path)
        os.remove(cover_path)
        finished = time.time()
        print(finished - started)
        return result["data"]["id"]