"""
Asynchronous video ETL: download a video and its cover, upload both to OSS,
then publish them.

@author: luojunhui
"""
import asyncio
import functools
import json
import os
import time
from hashlib import md5
from uuid import uuid4

import aiofiles
import aiohttp
import oss2
import requests
from fake_useragent import FakeUserAgent
async def upload_to_oss(local_video_path):
    """
    Upload a local file to the OSS bucket under a freshly generated key.

    The object key is a random UUID4 string, so every call creates a new
    object even when the same local file is uploaded twice.

    :param local_video_path: path of the local file to upload (despite the
        name, callers in this file also pass cover images).
    :return: the object key the file was stored under.
    """
    oss_video_key = str(uuid4())
    # NOTE(review): credentials are hard-coded in source; they should be moved
    # to configuration / environment and rotated. Left as-is here so existing
    # deployments keep working.
    access_key_id = "LTAIP6x1l3DXfSxm"
    access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
    endpoint = "oss-cn-hangzhou.aliyuncs.com"
    bucket_name = "art-pubbucket"
    bucket = oss2.Bucket(
        oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
    )
    # put_object_from_file is a blocking call; run it in the default executor
    # so the event loop keeps serving other coroutines during the upload.
    upload = functools.partial(
        bucket.put_object_from_file,
        key=oss_video_key,
        filename=local_video_path,
    )
    await asyncio.get_running_loop().run_in_executor(None, upload)
    return oss_video_key
class AsyncETL:
    """
    Download one video and its cover, upload both to OSS and publish them.

    The constructor takes a dict with the keys ``platform``, ``video_id``,
    ``video_url``, ``user_id``, ``video_title`` and ``cover_url``.
    """

    # Size of each piece read from the HTTP response while streaming to disk.
    _CHUNK_SIZE = 64 * 1024

    def __init__(self, video_obj):
        self.platform = video_obj["platform"]
        self.video_id = video_obj["video_id"]
        self.video_url = video_obj["video_url"]
        self.uid = video_obj["user_id"]
        self.title = video_obj["video_title"]
        self.cover_url = video_obj["cover_url"]
        # NOTE(review): proxy credentials are hard-coded, and neither `proxy`
        # nor `max_retry` is referenced by any method of this class.
        self.proxy = {
            "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
            "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
        }
        self.max_retry = 5

    @staticmethod
    async def _run_blocking(func, *args, **kwargs):
        """Run a blocking callable in the default executor and return its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, functools.partial(func, *args, **kwargs)
        )

    @staticmethod
    def _write_mode(status, existing_size):
        """
        Pick the file mode for writing a download response.

        Append only when the server honoured the Range request (206) and
        there are bytes on disk to extend. A 200 response carries the whole
        body, so the file must be overwritten rather than appended to.
        """
        if status == 206 and existing_size > 0:
            return "ab"
        return "wb"

    def request_header(self):
        """
        Build the request headers for the current platform.

        :return: a new header dict; empty for platforms other than
            ``xg_search``, ``baidu_search`` and ``wx_search``.
        """
        common = {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
        }
        if self.platform == "xg_search":
            # Any URL that is not on the v9 host uses the v3 host header.
            v9_host = "v9-xg-web-pc.ixigua.com"
            host = v9_host if v9_host in self.video_url else "v3-xg-web-pc.ixigua.com"
            return {
                **common,
                "Host": host,
                "User-Agent": FakeUserAgent().chrome,
                "Origin": "https://www.ixigua.com/",
                "Referer": "https://www.ixigua.com/",
            }
        if self.platform == "baidu_search":
            return {
                **common,
                "User-Agent": FakeUserAgent().chrome,
            }
        if self.platform == "wx_search":
            return {
                **common,
                "User-Agent": FakeUserAgent().chrome,
                "Origin": "https://mp.weixin.qq.com",
                "Referer": "https://mp.weixin.qq.com",
            }
        return {}

    def generate_video_path(self):
        """
        Derive the local video and cover paths from the video identity.

        The file stem is the MD5 hex digest of ``"<platform>-<video_id>"``;
        both files live in ``<cwd>/videos``.

        :return: tuple ``(video_path, cover_path)``.
        """
        index = md5(f"{self.platform}-{self.video_id}".encode()).hexdigest()
        folder = os.path.join(os.getcwd(), "videos")
        file_path = os.path.join(folder, f"{index}.mp4")
        cover_path = os.path.join(folder, f"{index}.png")
        return file_path, cover_path

    async def publish_by__request(self, video_path, cover):
        """
        Publish the uploaded video through the crawler send endpoint.

        :param video_path: value sent as ``videoPath`` (the OSS key of the video).
        :param cover: value sent as ``coverImgPath`` (the OSS key of the cover).
        :return: the decoded JSON body of the response.
        """
        url = "https://vlogapi.piaoquantv.com/longvideoapi/crawler/video/send"
        # NOTE(review): session cookie and tokens are hard-coded below.
        headers = {
            "User-Agent": "PQSpeed/486 CFNetwork/1410.1 Darwin/22.6.0",
            "cookie": "JSESSIONID=4DEA2B5173BB9A9E82DB772C0ACDBC9F; JSESSIONID=D02C334150025222A0B824A98B539B78",
            "referer": "http://appspeed.piaoquantv.com",
            "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
            "accept-language": "zh-CN,zh-Hans;q=0.9",
            "Content-Type": "application/x-www-form-urlencoded",
        }
        payload = {
            "coverImgPath": cover,
            "deviceToken": "9ef064f2f7869b3fd67d6141f8a899175dddc91240971172f1f2a662ef891408",
            "fileExtensions": "MP4",
            "loginUid": self.uid,
            "networkType": "Wi-Fi",
            "platform": "iOS",
            "requestId": "fb972cbd4f390afcfd3da1869cd7d001",
            "sessionId": "362290597725ce1fa870d7be4f46dcc2",
            "subSessionId": "362290597725ce1fa870d7be4f46dcc2",
            "title": self.title,
            "token": "524a8bc871dbb0f4d4717895083172ab37c02d2f",
            "uid": self.uid,
            "versionCode": "486",
            "versionName": "3.4.12",
            "videoFromScene": "1",
            "videoPath": video_path,
            "viewStatus": "1",
        }
        # requests is synchronous; keep it off the event loop thread.
        response = await self._run_blocking(
            requests.post,
            url=url,
            headers=headers,
            data=payload,
        )
        return response.json()

    async def download(self, file_path):
        """
        Download the video to ``file_path``, resuming a partial file if present.

        When the file already exists a ``Range`` header asks for the remaining
        bytes. Any status other than 200/206 is printed and nothing is written.

        :param file_path: destination path on disk.
        :return: ``file_path`` (returned whether or not anything was written).
        """
        headers = self.request_header()
        if os.path.exists(file_path):
            file_size = os.path.getsize(file_path)
            headers["Range"] = f"bytes={file_size}-"
        else:
            file_size = 0
        async with aiohttp.ClientSession() as session:
            async with session.get(self.video_url, headers=headers) as response:
                if response.status not in (200, 206):
                    print(response.status)
                    return file_path
                mode = self._write_mode(response.status, file_size)
                # Stream to disk instead of holding the whole video in memory;
                # the context manager closes the file even if a write fails.
                async with aiofiles.open(file_path, mode) as f:
                    async for chunk in response.content.iter_chunked(self._CHUNK_SIZE):
                        await f.write(chunk)
        return file_path

    async def download_cover(self, file_path):
        """
        Download the video cover image to ``file_path``.

        The response body is written as-is; the status code is not checked.

        :param file_path: destination path on disk.
        :return: ``file_path``.
        """
        headers = self.request_header()
        # requests is synchronous; keep it off the event loop thread.
        response = await self._run_blocking(
            requests.get, url=self.cover_url, headers=headers
        )
        async with aiofiles.open(file_path, "wb") as f:
            await f.write(response.content)
        return file_path

    async def etl_deal(self):
        """
        Run the full pipeline: download, upload to OSS, publish, clean up.

        :return: ``result["data"]["id"]`` from the publish response.
        :raises KeyError: if the publish response lacks ``data`` / ``id``.
        """
        local_video_path, local_cover_path = self.generate_video_path()
        # Both files share one folder; create it so a first run does not fail.
        os.makedirs(os.path.dirname(local_video_path), exist_ok=True)
        # download video
        file_path = await self.download(local_video_path)
        # download cover
        cover_path = await self.download_cover(local_cover_path)
        # upload to oss
        oss_video = await upload_to_oss(local_video_path=file_path)
        oss_cover = await upload_to_oss(local_video_path=cover_path)
        # publish to pq
        result = await self.publish_by__request(
            video_path=oss_video,
            cover=oss_cover,
        )
        print(json.dumps(result, ensure_ascii=False, indent=4))
        # Remove the local copies and print how long the removal took.
        started = time.time()
        os.remove(file_path)
        os.remove(cover_path)
        print(time.time() - started)
        return result["data"]["id"]
|