123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- """
- @author: luojunhui
- """
- import os
- import oss2
- import aiohttp
- import aiofiles
- import asyncio
- import requests
- from datetime import datetime
- from hashlib import md5
- from uuid import uuid4
- from fake_useragent import FakeUserAgent
- from applications.config import Config
- from applications.log import logging
- async def downloadCover(file_path, platform, cover_url):
- """
- 下载视频封面
- :param platform:
- :param cover_url:
- :param file_path:
- :return:
- """
- headers = requestHeader(platform=platform, url=cover_url, download_type="cover")
- response = requests.get(url=cover_url, headers=headers)
- if b"<html>" in response.content:
- return None
- elif response.status_code != 200:
- return None
- else:
- with open(file_path, "wb") as f:
- f.write(response.content)
- return file_path
- def requestHeader(platform, url, download_type="video"):
- """
- 请求头
- :return:
- """
- if platform == "xg_search":
- if "v9-xg-web-pc.ixigua.com" in url:
- headers = {
- "Accept": "*/*",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "Host": "v9-xg-web-pc.ixigua.com",
- "User-Agent": FakeUserAgent().chrome,
- "Origin": "https://www.ixigua.com/",
- "Referer": "https://www.ixigua.com/"
- }
- elif "v3-xg-web-pc.ixigua.com" in url:
- headers = {
- "Accept": "*/*",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "Host": "v3-xg-web-pc.ixigua.com",
- "User-Agent": FakeUserAgent().chrome,
- "Origin": "https://www.ixigua.com/",
- "Referer": "https://www.ixigua.com/"
- }
- elif download_type == "cover":
- headers = {
- 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
- 'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
- 'Cache-Control': 'max-age=0',
- 'Proxy-Connection': 'keep-alive',
- 'Upgrade-Insecure-Requests': '1',
- 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
- }
- else:
- headers = {
- "Accept": "*/*",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "Host": "v3-xg-web-pc.ixigua.com",
- "User-Agent": FakeUserAgent().chrome,
- "Origin": "https://www.ixigua.com/",
- "Referer": "https://www.ixigua.com/"
- }
- elif platform == "baidu_search":
- headers = {
- "Accept": "*/*",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "User-Agent": FakeUserAgent().chrome,
- }
- elif platform == "wx_search":
- headers = {
- "Accept": "*/*",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "User-Agent": FakeUserAgent().chrome,
- "Origin": "https://mp.weixin.qq.com",
- "Referer": "https://mp.weixin.qq.com"
- }
- elif platform == "dy_search":
- headers = {
- 'accept': '*/*',
- 'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
- 'priority': 'i',
- 'range': 'bytes=0-',
- 'referer': 'https://v11-coldf.douyinvod.com/',
- 'user-agent': FakeUserAgent().chrome
- }
- else:
- headers = {}
- return headers
- async def downloadVideo(file_path, platform, video_url, download_type="video"):
- """
- :param download_type:
- :param video_url:
- :param platform:
- :param file_path:
- :return:
- """
- headers = requestHeader(platform=platform, url=video_url, download_type=download_type)
- if os.path.exists(file_path):
- file_size = os.path.getsize(file_path)
- headers["Range"] = f"bytes={file_size}-"
- else:
- file_size = 0
- async with aiohttp.ClientSession() as session:
- async with session.get(video_url, headers=headers) as response:
- if response.status in [200, 206]:
- if file_size > 0:
- async with aiofiles.open(file_path, "ab+") as f:
- # 以1MB为单位分块下载
- async for chunk in response.content.iter_chunked(1024 * 1024):
- await f.write(chunk)
- else:
- async with aiofiles.open(file_path, "wb") as f:
- # 以1MB为单位分块下载
- async for chunk in response.content.iter_chunked(1024 * 1024):
- await f.write(chunk)
- else:
- print(response.status)
- return file_path
- def generateVideoPath(platform, video_id):
- """
- 通过视频信息生成唯一视频地址
- :return:
- """
- index = "{}-{}-{}".format(platform, video_id, uuid4())
- index = md5(index.encode()).hexdigest()
- file_name = "{}.mp4".format(index)
- cover_name = "{}.png".format(index)
- file_path = os.path.join(os.getcwd(), "static", file_name)
- cover_path = os.path.join(os.getcwd(), "static", cover_name)
- return file_path, cover_path
- async def uploadToOss(local_video_path, download_type):
- """
- 把视频上传到 oss
- :return:
- """
- oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
- access_key_id = "LTAIP6x1l3DXfSxm"
- access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
- endpoint = "oss-cn-hangzhou.aliyuncs.com"
- bucket_name = "art-pubbucket"
- bucket = oss2.Bucket(
- oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
- )
- bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
- return oss_video_key
- class AsyncETL(object):
- """
- 视频下载功能
- """
- def __init__(self, mysql_client):
- # self.proxy = {
- # "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
- # "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
- # }
- self.max_retry = 5
- self.mysql_client = mysql_client
- self.article_crawler_videos = Config().articleCrawlerVideos
- async def getTasks(self):
- """
- 获取视频 id
- :return:
- """
- select_sql = f"""
- SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id
- FROM {self.article_crawler_videos}
- WHERE download_status = 0
- ORDER BY id
- LIMIT 10;
- """
- result = await self.mysql_client.asyncSelect(select_sql)
- if result:
- tasks = [
- {
- "id": line[0],
- "video_id": line[1],
- "platform": line[2],
- "video_title": line[3],
- "video_url": line[4],
- "cover_url": line[5],
- "user_id": line[6]
- }
- for line in result
- ]
- return tasks
- else:
- return []
- async def processTask(self, params):
- """
- 处理 task
- :return:
- {
- "id": line[0],
- "video_id": line[1],
- "platform": line[2],
- "video_title": line[3],
- "video_url": line[4],
- "cover_url": line[5],
- "user_id": line[6]
- }
- """
- update_sql_0 = f"""
- UPDATE {self.article_crawler_videos}
- SET download_status = %s
- WHERE id = %s;
- """
- await self.mysql_client.asyncInsert(
- sql=update_sql_0,
- params=(1, params['id'])
- )
- try:
- local_video_path, local_cover_path = generateVideoPath(params['platform'], params['video_id'])
- # download videos
- file_path = await downloadVideo(
- file_path=local_video_path,
- platform=params['platform'],
- video_url=params['video_url']
- )
- # download cover
- cover_path = await downloadCover(
- file_path=local_cover_path,
- platform=params['platform'],
- cover_url=params['cover_url']
- )
- oss_video = await uploadToOss(
- local_video_path=file_path,
- download_type="video"
- )
- if cover_path:
- oss_cover = await uploadToOss(
- local_video_path=cover_path,
- download_type="image"
- )
- else:
- oss_cover = None
- update_sql = f"""
- UPDATE {self.article_crawler_videos}
- SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
- WHERE id = %s;
- """
- await self.mysql_client.asyncInsert(
- sql=update_sql,
- params=(
- oss_video,
- oss_cover,
- 2,
- params['id']
- )
- )
- except Exception as e:
- print("failed", e)
- update_sql = f"""
- UPDATE {self.article_crawler_videos}
- SET download_status = %s
- WHERE id = %s;
- """
- await self.mysql_client.asyncInsert(
- sql=update_sql,
- params=(3, params['id'])
- )
- async def deal(self):
- """
- ETL Deal Task
- :return:
- """
- task_list = await self.getTasks()
- logging(
- code="5001",
- info="ETL Task Got {} this time".format(len(task_list)),
- function="ETL"
- )
- if task_list:
- tasks = [self.processTask(params) for params in task_list]
- await asyncio.gather(*tasks)
|