"""
@author: luojunhui

Asynchronous ETL helpers: download crawled videos and their covers to the
local ``static`` directory, upload them to Aliyun OSS and record the result
in MySQL.
"""
import os
import oss2
import aiohttp
import aiofiles
import asyncio
import requests

from datetime import datetime
from hashlib import md5
from uuid import uuid4

from fake_useragent import FakeUserAgent
from applications.config import Config
from applications.log import logging

# Upper bound (seconds) for the blocking cover request, so that a stalled
# server cannot hold an executor thread forever.
_COVER_REQUEST_TIMEOUT = 30

# Videos are streamed to disk in 1 MB chunks.
_DOWNLOAD_CHUNK_SIZE = 1024 * 1024

# Values written to the ``download_status`` column by AsyncETL.processTask.
# Rows are selected for processing while the column is still 0.
_STATUS_PROCESSING = 1
_STATUS_SUCCESS = 2
_STATUS_FAILED = 3

# Keys of a task dict, in the column order selected by AsyncETL.getTasks.
_TASK_FIELDS = (
    "id",
    "video_id",
    "platform",
    "video_title",
    "video_url",
    "cover_url",
    "user_id",
)


def _fetch_cover(file_path, cover_url, headers):
    """
    Blocking helper for downloadCover: fetch the cover and write it to disk.

    :param file_path: local path the cover is written to
    :param cover_url: URL of the cover image
    :param headers: request headers to send
    :return: ``file_path`` when a cover was saved, otherwise ``None``
    """
    response = requests.get(
        url=cover_url, headers=headers, timeout=_COVER_REQUEST_TIMEOUT
    )
    if response.status_code != 200:
        return None

    body = response.content
    # The previous check was ``b"" in response.content``, which is true for
    # every bytes value, so no cover was ever saved. Reject only bodies that
    # cannot be an image.
    if not body:
        return None
    # NOTE(review): presumably the lost marker was meant to detect an HTML
    # error page served instead of the image - confirm against the crawler.
    if "text/html" in response.headers.get("Content-Type", "").lower():
        return None

    with open(file_path, "wb") as f:
        f.write(body)
    return file_path


async def downloadCover(file_path, platform, cover_url):
    """
    Download a video cover.

    The HTTP request and the file write are blocking calls, so they run in
    the default executor instead of on the event loop.

    :param file_path: local path the cover is written to
    :param platform: platform identifier used to pick the request headers
    :param cover_url: URL of the cover image
    :return: ``file_path`` when a cover was saved, otherwise ``None``
    """
    headers = requestHeader(platform=platform, url=cover_url, download_type="cover")
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, _fetch_cover, file_path, cover_url, headers
    )


def _xigua_video_headers(host):
    """Build the header set used for a Xigua video CDN host."""
    return {
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Host": host,
        "User-Agent": FakeUserAgent().chrome,
        "Origin": "https://www.ixigua.com/",
        "Referer": "https://www.ixigua.com/"
    }


def requestHeader(platform, url, download_type="video"):
    """
    Build the request headers for a download.

    :param platform: one of ``xg_search``, ``baidu_search``, ``wx_search``,
        ``dy_search``; any other value yields an empty dict
    :param url: target URL; for ``xg_search`` the CDN host found in the URL
        selects the ``Host`` header
    :param download_type: ``"cover"`` selects browser-like headers for
        ``xg_search`` URLs that are not on a known video host
    :return: a new dict of headers
    """
    if platform == "xg_search":
        # The host checks take precedence over the download type.
        if "v9-xg-web-pc.ixigua.com" in url:
            return _xigua_video_headers("v9-xg-web-pc.ixigua.com")
        if "v3-xg-web-pc.ixigua.com" in url:
            return _xigua_video_headers("v3-xg-web-pc.ixigua.com")
        if download_type == "cover":
            return {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
                'Cache-Control': 'max-age=0',
                'Proxy-Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
            }
        # Unknown host: fall back to the v3 header set.
        return _xigua_video_headers("v3-xg-web-pc.ixigua.com")

    if platform == "baidu_search":
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "User-Agent": FakeUserAgent().chrome,
        }

    if platform == "wx_search":
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "User-Agent": FakeUserAgent().chrome,
            "Origin": "https://mp.weixin.qq.com",
            "Referer": "https://mp.weixin.qq.com"
        }

    if platform == "dy_search":
        return {
            'accept': '*/*',
            'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
            'priority': 'i',
            'range': 'bytes=0-',
            'referer': 'https://v11-coldf.douyinvod.com/',
            'user-agent': FakeUserAgent().chrome
        }

    return {}


async def downloadVideo(file_path, platform, video_url, download_type="video"):
    """
    Stream a video to ``file_path``, resuming a partial file when possible.

    If ``file_path`` already exists, a ``Range`` header starting at its
    current size is sent. The body is appended only when the server answers
    206; a 200 answer carries the whole file, so the file is rewritten from
    scratch.

    :param file_path: local path of the video file
    :param platform: platform identifier used to pick the request headers
    :param video_url: URL of the video
    :param download_type: forwarded to requestHeader
    :return: ``file_path``, also when the server answered with a status
        other than 200/206 (the status is printed and nothing is written)
    """
    headers = requestHeader(platform=platform, url=video_url, download_type=download_type)
    if os.path.exists(file_path):
        file_size = os.path.getsize(file_path)
        # Header names are case-insensitive: drop any template range header
        # (e.g. the lower-case 'range' of dy_search) so only one is sent.
        for key in [k for k in headers if k.lower() == "range"]:
            del headers[key]
        headers["Range"] = f"bytes={file_size}-"
    else:
        file_size = 0

    async with aiohttp.ClientSession() as session:
        async with session.get(video_url, headers=headers) as response:
            if response.status not in (200, 206):
                print(response.status)
                return file_path

            # Append only if the server really honoured the range request.
            resuming = file_size > 0 and response.status == 206
            mode = "ab+" if resuming else "wb"
            async with aiofiles.open(file_path, mode) as f:
                async for chunk in response.content.iter_chunked(_DOWNLOAD_CHUNK_SIZE):
                    await f.write(chunk)
    return file_path


def generateVideoPath(platform, video_id):
    """
    Generate unique local paths for a video and its cover.

    The file stem is the MD5 hex digest of ``platform-video_id-uuid4``, so
    every call yields a new pair of paths.

    :param platform: platform identifier
    :param video_id: video identifier on that platform
    :return: ``(file_path, cover_path)`` under ``<cwd>/static`` with the
        extensions ``.mp4`` and ``.png``
    """
    seed = "{}-{}-{}".format(platform, video_id, uuid4())
    stem = md5(seed.encode()).hexdigest()
    static_dir = os.path.join(os.getcwd(), "static")
    file_path = os.path.join(static_dir, "{}.mp4".format(stem))
    cover_path = os.path.join(static_dir, "{}.png".format(stem))
    return file_path, cover_path


async def uploadToOss(local_video_path, download_type):
    """
    Upload a local file to OSS.

    The upload is a blocking call, so it runs in the default executor
    instead of on the event loop.

    :param local_video_path: path of the local file to upload
    :param download_type: sub-directory of ``long_articles/`` in the object key
    :return: the object key the file was stored under
    """
    oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
    # SECURITY: credentials must not live in source control. They can be
    # supplied through the environment; the literals remain only as a
    # fallback so existing deployments keep working.
    # TODO: rotate this key pair and delete the fallback values.
    access_key_id = os.environ.get("OSS_ACCESS_KEY_ID", "LTAIP6x1l3DXfSxm")
    access_key_secret = os.environ.get(
        "OSS_ACCESS_KEY_SECRET", "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
    )
    endpoint = "oss-cn-hangzhou.aliyuncs.com"
    bucket_name = "art-pubbucket"
    bucket = oss2.Bucket(
        oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
    )
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        lambda: bucket.put_object_from_file(
            key=oss_video_key, filename=local_video_path
        ),
    )
    return oss_video_key


class AsyncETL(object):
    """
    Video download pipeline: fetch pending rows, download video and cover,
    upload both to OSS and write the outcome back to the table.
    """

    def __init__(self, mysql_client):
        """
        :param mysql_client: client exposing the coroutines ``asyncSelect``
            and ``asyncInsert``
        """
        self.max_retry = 5
        self.mysql_client = mysql_client
        self.article_crawler_videos = Config().articleCrawlerVideos

    async def getTasks(self):
        """
        Fetch up to 10 rows whose ``download_status`` is 0, ordered by id.

        :return: list of task dicts with the keys ``id``, ``video_id``,
            ``platform``, ``video_title``, ``video_url``, ``cover_url`` and
            ``user_id``; an empty list when nothing is pending
        """
        select_sql = f"""
            SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id
            FROM {self.article_crawler_videos}
            WHERE download_status = 0
            ORDER BY id
            LIMIT 10;
        """
        result = await self.mysql_client.asyncSelect(select_sql)
        if not result:
            return []
        return [dict(zip(_TASK_FIELDS, line)) for line in result]

    async def _updateStatus(self, task_id, status):
        """Set ``download_status`` of the row ``task_id`` to ``status``."""
        update_sql = f"""
            UPDATE {self.article_crawler_videos}
            SET download_status = %s
            WHERE id = %s;
        """
        await self.mysql_client.asyncInsert(
            sql=update_sql,
            params=(status, task_id)
        )

    async def processTask(self, params):
        """
        Process one task.

        The row is first set to status 1. When download and upload succeed,
        the OSS keys are stored and the status becomes 2; any exception is
        printed and the status becomes 3.

        :param params: task dict as returned by getTasks
        :return: None
        """
        await self._updateStatus(params['id'], _STATUS_PROCESSING)
        try:
            local_video_path, local_cover_path = generateVideoPath(
                params['platform'], params['video_id']
            )
            # download video
            file_path = await downloadVideo(
                file_path=local_video_path,
                platform=params['platform'],
                video_url=params['video_url']
            )
            # download cover
            cover_path = await downloadCover(
                file_path=local_cover_path,
                platform=params['platform'],
                cover_url=params['cover_url']
            )
            oss_video = await uploadToOss(
                local_video_path=file_path,
                download_type="video"
            )
            # The cover is optional: the row is stored with a NULL cover key
            # when no cover could be saved.
            if cover_path:
                oss_cover = await uploadToOss(
                    local_video_path=cover_path,
                    download_type="image"
                )
            else:
                oss_cover = None
            update_sql = f"""
                UPDATE {self.article_crawler_videos}
                SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
                WHERE id = %s;
            """
            await self.mysql_client.asyncInsert(
                sql=update_sql,
                params=(
                    oss_video,
                    oss_cover,
                    _STATUS_SUCCESS,
                    params['id']
                )
            )
        except Exception as e:
            # Boundary of a single task: record the failure on the row
            # instead of letting it propagate to the caller.
            print("failed", e)
            await self._updateStatus(params['id'], _STATUS_FAILED)

    async def deal(self):
        """
        Run one ETL round: fetch the pending tasks and process them
        concurrently.

        :return: None
        """
        task_list = await self.getTasks()
        logging(
            code="5001",
            info="ETL Task Got {} this time".format(len(task_list)),
            function="ETL"
        )
        if task_list:
            tasks = [self.processTask(params) for params in task_list]
            await asyncio.gather(*tasks)