|
@@ -1,303 +0,0 @@
|
|
|
-"""
|
|
|
-@author: luojunhui
|
|
|
-"""
|
|
|
-import os
|
|
|
-
|
|
|
-import oss2
|
|
|
-import aiohttp
|
|
|
-import aiofiles
|
|
|
-import asyncio
|
|
|
-import requests
|
|
|
-
|
|
|
-from datetime import datetime
|
|
|
-from hashlib import md5
|
|
|
-from uuid import uuid4
|
|
|
-
|
|
|
-from fake_useragent import FakeUserAgent
|
|
|
-from applications.config import Config
|
|
|
-from applications.log import logging
|
|
|
-
|
|
|
-
|
|
|
-async def download_cover(file_path, platform, cover_url):
|
|
|
- """
|
|
|
- 下载视频封面
|
|
|
- :param platform:
|
|
|
- :param cover_url:
|
|
|
- :param file_path:
|
|
|
- :return:
|
|
|
- """
|
|
|
- headers = request_header(platform=platform, url=cover_url, download_type="cover")
|
|
|
- response = requests.get(url=cover_url, headers=headers)
|
|
|
- if b"<html>" in response.content:
|
|
|
- return None
|
|
|
- elif response.status_code != 200:
|
|
|
- return None
|
|
|
- else:
|
|
|
- with open(file_path, "wb") as f:
|
|
|
- f.write(response.content)
|
|
|
- return file_path
|
|
|
-
|
|
|
-
|
|
|
-def request_header(platform, url, download_type="video"):
|
|
|
- """
|
|
|
- 请求头
|
|
|
- :return:
|
|
|
- """
|
|
|
- if platform == "xg_search":
|
|
|
- if "v9-xg-web-pc.ixigua.com" in url:
|
|
|
- headers = {
|
|
|
- "Accept": "*/*",
|
|
|
- "Accept-Language": "zh-CN,zh;q=0.9",
|
|
|
- "Host": "v9-xg-web-pc.ixigua.com",
|
|
|
- "User-Agent": FakeUserAgent().chrome,
|
|
|
- "Origin": "https://www.ixigua.com/",
|
|
|
- "Referer": "https://www.ixigua.com/"
|
|
|
- }
|
|
|
- elif "v3-xg-web-pc.ixigua.com" in url:
|
|
|
- headers = {
|
|
|
- "Accept": "*/*",
|
|
|
- "Accept-Language": "zh-CN,zh;q=0.9",
|
|
|
- "Host": "v3-xg-web-pc.ixigua.com",
|
|
|
- "User-Agent": FakeUserAgent().chrome,
|
|
|
- "Origin": "https://www.ixigua.com/",
|
|
|
- "Referer": "https://www.ixigua.com/"
|
|
|
- }
|
|
|
- elif download_type == "cover":
|
|
|
- headers = {
|
|
|
- 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
|
|
|
- 'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
|
|
|
- 'Cache-Control': 'max-age=0',
|
|
|
- 'Proxy-Connection': 'keep-alive',
|
|
|
- 'Upgrade-Insecure-Requests': '1',
|
|
|
- 'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
|
|
|
- }
|
|
|
- else:
|
|
|
- headers = {
|
|
|
- "Accept": "*/*",
|
|
|
- "Accept-Language": "zh-CN,zh;q=0.9",
|
|
|
- "Host": "v3-xg-web-pc.ixigua.com",
|
|
|
- "User-Agent": FakeUserAgent().chrome,
|
|
|
- "Origin": "https://www.ixigua.com/",
|
|
|
- "Referer": "https://www.ixigua.com/"
|
|
|
- }
|
|
|
- elif platform == "baidu_search":
|
|
|
- headers = {
|
|
|
- "Accept": "*/*",
|
|
|
- "Accept-Language": "zh-CN,zh;q=0.9",
|
|
|
- "User-Agent": FakeUserAgent().chrome,
|
|
|
- }
|
|
|
- elif platform == "wx_search":
|
|
|
- headers = {
|
|
|
- "Accept": "*/*",
|
|
|
- "Accept-Language": "zh-CN,zh;q=0.9",
|
|
|
- "User-Agent": FakeUserAgent().chrome,
|
|
|
- "Origin": "https://mp.weixin.qq.com",
|
|
|
- "Referer": "https://mp.weixin.qq.com"
|
|
|
- }
|
|
|
- elif platform == "dy_search":
|
|
|
- headers = {
|
|
|
- 'accept': '*/*',
|
|
|
- 'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
|
|
|
- 'priority': 'i',
|
|
|
- 'range': 'bytes=0-',
|
|
|
- 'referer': 'https://v11-coldf.douyinvod.com/',
|
|
|
- 'user-agent': FakeUserAgent().chrome
|
|
|
- }
|
|
|
- else:
|
|
|
- headers = {}
|
|
|
- return headers
|
|
|
-
|
|
|
-
|
|
|
-async def download_video(file_path, platform, video_url, download_type="video"):
|
|
|
- """
|
|
|
- :param download_type:
|
|
|
- :param video_url:
|
|
|
- :param platform:
|
|
|
- :param file_path:
|
|
|
- :return:
|
|
|
- """
|
|
|
- headers = request_header(platform=platform, url=video_url, download_type=download_type)
|
|
|
- if os.path.exists(file_path):
|
|
|
- file_size = os.path.getsize(file_path)
|
|
|
- headers["Range"] = f"bytes={file_size}-"
|
|
|
- else:
|
|
|
- file_size = 0
|
|
|
- async with aiohttp.ClientSession() as session:
|
|
|
- async with session.get(video_url, headers=headers) as response:
|
|
|
- if response.status in [200, 206]:
|
|
|
- if file_size > 0:
|
|
|
- async with aiofiles.open(file_path, "ab+") as f:
|
|
|
- # 以1MB为单位分块下载
|
|
|
- async for chunk in response.content.iter_chunked(1024 * 1024):
|
|
|
- await f.write(chunk)
|
|
|
- else:
|
|
|
- async with aiofiles.open(file_path, "wb") as f:
|
|
|
- # 以1MB为单位分块下载
|
|
|
- async for chunk in response.content.iter_chunked(1024 * 1024):
|
|
|
- await f.write(chunk)
|
|
|
-
|
|
|
- else:
|
|
|
- print(response.status)
|
|
|
- return file_path
|
|
|
-
|
|
|
-
|
|
|
-def generate_video_path(platform, video_id):
|
|
|
- """
|
|
|
- 通过视频信息生成唯一视频地址
|
|
|
- :return:
|
|
|
- """
|
|
|
- index = "{}-{}-{}".format(platform, video_id, uuid4())
|
|
|
- index = md5(index.encode()).hexdigest()
|
|
|
- file_name = "{}.mp4".format(index)
|
|
|
- cover_name = "{}.png".format(index)
|
|
|
- file_path = os.path.join(os.getcwd(), "static", file_name)
|
|
|
- cover_path = os.path.join(os.getcwd(), "static", cover_name)
|
|
|
- return file_path, cover_path
|
|
|
-
|
|
|
-
|
|
|
-async def upload_to_oss(local_video_path, download_type):
|
|
|
- """
|
|
|
- 把视频上传到 oss
|
|
|
- :return:
|
|
|
- """
|
|
|
- oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
|
|
|
- access_key_id = "LTAIP6x1l3DXfSxm"
|
|
|
- access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
|
|
|
- endpoint = "oss-cn-hangzhou.aliyuncs.com"
|
|
|
- bucket_name = "art-pubbucket"
|
|
|
- bucket = oss2.Bucket(
|
|
|
- oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
|
|
|
- )
|
|
|
- bucket.put_object_from_file(key=oss_video_key, filename=local_video_path)
|
|
|
- return oss_video_key
|
|
|
-
|
|
|
-
|
|
|
-class AsyncETL(object):
|
|
|
- """
|
|
|
- 视频下载功能
|
|
|
- """
|
|
|
-
|
|
|
- def __init__(self, mysql_client):
|
|
|
- # self.proxy = {
|
|
|
- # "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
|
|
|
- # "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
|
|
|
- # }
|
|
|
- self.max_retry = 5
|
|
|
- self.mysql_client = mysql_client
|
|
|
- self.config = Config()
|
|
|
- self.article_crawler_video_table = self.config.article_crawler_video_table
|
|
|
- self.article_match_video_table = self.config.article_match_video_table
|
|
|
-
|
|
|
- async def get_tasks(self):
|
|
|
- """
|
|
|
- 获取视频 id
|
|
|
- :return:
|
|
|
- """
|
|
|
- select_sql = f"""
|
|
|
- SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
|
|
|
- FROM {self.article_crawler_video_table}
|
|
|
- WHERE download_status = 0
|
|
|
- ORDER BY id
|
|
|
- LIMIT 10;
|
|
|
- """
|
|
|
- result = await self.mysql_client.async_select(select_sql)
|
|
|
- if result:
|
|
|
- tasks = [
|
|
|
- {
|
|
|
- "id": line[0],
|
|
|
- "video_id": line[1],
|
|
|
- "platform": line[2],
|
|
|
- "video_title": line[3],
|
|
|
- "video_url": line[4],
|
|
|
- "cover_url": line[5],
|
|
|
- "user_id": line[6],
|
|
|
- "trace_id": line[7]
|
|
|
- }
|
|
|
- for line in result
|
|
|
- ]
|
|
|
- return tasks
|
|
|
- else:
|
|
|
- return []
|
|
|
-
|
|
|
- async def process_task(self, params):
|
|
|
- """
|
|
|
- 处理 task
|
|
|
- :return:
|
|
|
- """
|
|
|
- downloading_status = 1
|
|
|
- downloaded_status = 2
|
|
|
- download_failed_status = 3
|
|
|
- update_sql_0 = f"""
|
|
|
- UPDATE {self.article_crawler_video_table}
|
|
|
- SET download_status = %s
|
|
|
- WHERE id = %s;
|
|
|
- """
|
|
|
- await self.mysql_client.async_insert(
|
|
|
- sql=update_sql_0,
|
|
|
- params=(downloading_status, params['id'])
|
|
|
- )
|
|
|
- try:
|
|
|
- local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
|
|
|
- # download videos
|
|
|
- file_path = await download_video(
|
|
|
- file_path=local_video_path,
|
|
|
- platform=params['platform'],
|
|
|
- video_url=params['video_url']
|
|
|
- )
|
|
|
- # download cover
|
|
|
- cover_path = await download_cover(
|
|
|
- file_path=local_cover_path,
|
|
|
- platform=params['platform'],
|
|
|
- cover_url=params['cover_url']
|
|
|
- )
|
|
|
- oss_video = await upload_to_oss(
|
|
|
- local_video_path=file_path,
|
|
|
- download_type="video"
|
|
|
- )
|
|
|
- if cover_path:
|
|
|
- oss_cover = await upload_to_oss(
|
|
|
- local_video_path=cover_path,
|
|
|
- download_type="image"
|
|
|
- )
|
|
|
- else:
|
|
|
- oss_cover = None
|
|
|
- update_sql = f"""
|
|
|
- UPDATE {self.article_crawler_video_table}
|
|
|
- SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
|
|
|
- WHERE id = %s;
|
|
|
- """
|
|
|
- await self.mysql_client.async_insert(
|
|
|
- sql=update_sql,
|
|
|
- params=(
|
|
|
- oss_video,
|
|
|
- oss_cover,
|
|
|
- downloaded_status,
|
|
|
- params['id']
|
|
|
- )
|
|
|
- )
|
|
|
- except Exception as e:
|
|
|
- update_sql = f"""
|
|
|
- UPDATE {self.article_crawler_video_table}
|
|
|
- SET download_status = %s
|
|
|
- WHERE id = %s;
|
|
|
- """
|
|
|
- await self.mysql_client.async_insert(
|
|
|
- sql=update_sql,
|
|
|
- params=(download_failed_status, params['id'])
|
|
|
- )
|
|
|
- print("抓取 failed--{}".format(e))
|
|
|
-
|
|
|
- async def deal(self):
|
|
|
- """
|
|
|
- ETL Deal Task
|
|
|
- :return:
|
|
|
- """
|
|
|
- task_list = await self.get_tasks()
|
|
|
- logging(
|
|
|
- code="5001",
|
|
|
- info="ETL Task Got {} this time".format(len(task_list)),
|
|
|
- function="ETL"
|
|
|
- )
|
|
|
- if task_list:
|
|
|
- tasks = [self.process_task(params) for params in task_list]
|
|
|
- await asyncio.gather(*tasks)
|