"""@author: luojunhui"""
import asyncio
import os
from datetime import datetime
from hashlib import md5
from uuid import uuid4

import aiofiles
import aiohttp
import oss2
import requests
from fake_useragent import FakeUserAgent

from applications.config import Config
from applications.log import logging

# Per-socket timeout (seconds) for the cover request, so an unresponsive host
# cannot hang a task forever.
COVER_REQUEST_TIMEOUT = 30

# Videos are streamed to disk in chunks of this many bytes (1 MB).
DOWNLOAD_CHUNK_SIZE = 1024 * 1024


def _fetchCover(file_path, cover_url, headers):
    """
    Blocking part of the cover download; meant to run in a worker thread.

    :param file_path: local path the image bytes are written to
    :param cover_url: URL of the cover image
    :param headers: request headers to send
    :return: ``file_path`` on success; ``None`` when the response is not a
        200 or its body contains ``<html>`` (an HTML page instead of an image)
    """
    response = requests.get(
        url=cover_url,
        headers=headers,
        timeout=COVER_REQUEST_TIMEOUT,
    )
    if response.status_code != 200 or b"<html>" in response.content:
        return None
    with open(file_path, "wb") as f:
        f.write(response.content)
    return file_path


async def downloadCover(file_path, platform, cover_url):
    """
    Download a video cover image to ``file_path``.

    ``requests`` is a blocking client, so the request runs in the default
    executor; calling it directly inside this coroutine would stall every
    other task scheduled on the event loop.

    :param file_path: local path to write the cover to
    :param platform: platform key passed to ``requestHeader``
    :param cover_url: URL of the cover image
    :return: ``file_path`` on success, ``None`` if the response is unusable
    """
    headers = requestHeader(platform=platform, url=cover_url, download_type="cover")
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _fetchCover, file_path, cover_url, headers)


def _xiguaVideoHeaders(host):
    """
    Build a fresh header dict for the Xigua video CDN with the given ``Host``.
    """
    return {
        "Accept": "*/*",
        "Accept-Language": "zh-CN,zh;q=0.9",
        "Host": host,
        "User-Agent": FakeUserAgent().chrome,
        "Origin": "https://www.ixigua.com/",
        "Referer": "https://www.ixigua.com/"
    }


def requestHeader(platform, url, download_type="video"):
    """
    Build the request headers for a download.

    :param platform: one of ``xg_search``, ``baidu_search``, ``wx_search``,
        ``dy_search``; any other value yields an empty dict
    :param url: target URL; for ``xg_search`` it selects the ``Host`` header
    :param download_type: ``"video"`` (default) or ``"cover"``
    :return: a new dict of headers (callers may mutate it)
    """
    if platform == "xg_search":
        if "v9-xg-web-pc.ixigua.com" in url:
            return _xiguaVideoHeaders("v9-xg-web-pc.ixigua.com")
        if "v3-xg-web-pc.ixigua.com" in url:
            return _xiguaVideoHeaders("v3-xg-web-pc.ixigua.com")
        if download_type == "cover":
            return {
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                'Accept-Language': 'en,zh;q=0.9,zh-CN;q=0.8',
                'Cache-Control': 'max-age=0',
                'Proxy-Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0.0.0 Safari/537.36'
            }
        # Unknown Xigua host: fall back to the v3 host headers.
        return _xiguaVideoHeaders("v3-xg-web-pc.ixigua.com")

    if platform == "baidu_search":
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "User-Agent": FakeUserAgent().chrome,
        }

    if platform == "wx_search":
        return {
            "Accept": "*/*",
            "Accept-Language": "zh-CN,zh;q=0.9",
            "User-Agent": FakeUserAgent().chrome,
            "Origin": "https://mp.weixin.qq.com",
            "Referer": "https://mp.weixin.qq.com"
        }

    if platform == "dy_search":
        return {
            'accept': '*/*',
            'accept-language': 'en,zh;q=0.9,zh-CN;q=0.8',
            'priority': 'i',
            'range': 'bytes=0-',
            'referer': 'https://v11-coldf.douyinvod.com/',
            'user-agent': FakeUserAgent().chrome
        }

    return {}


async def downloadVideo(file_path, platform, video_url, download_type="video"):
    """
    Stream a video to ``file_path``, resuming if the file already exists.

    :param file_path: local path to write the video to
    :param platform: platform key passed to ``requestHeader``
    :param video_url: URL of the video
    :param download_type: forwarded to ``requestHeader``
    :return: ``file_path`` (also returned when the server answers with a
        status other than 200/206; in that case the status is printed and
        nothing is written)
    """
    headers = requestHeader(platform=platform, url=video_url, download_type=download_type)
    if os.path.exists(file_path):
        file_size = os.path.getsize(file_path)
        # Some platform headers already carry a lowercase 'range' entry; drop
        # any case variant so only one Range header is sent.
        headers = {
            name: value for name, value in headers.items()
            if name.lower() != "range"
        }
        headers["Range"] = f"bytes={file_size}-"
    else:
        file_size = 0

    async with aiohttp.ClientSession() as session:
        async with session.get(video_url, headers=headers) as response:
            if response.status in (200, 206):
                # Append only when the server honoured the range request (206).
                # A 200 carries the whole file, so appending it to the partial
                # file would corrupt it; start over instead.
                resuming = file_size > 0 and response.status == 206
                mode = "ab+" if resuming else "wb"
                async with aiofiles.open(file_path, mode) as f:
                    # download in 1 MB chunks
                    async for chunk in response.content.iter_chunked(DOWNLOAD_CHUNK_SIZE):
                        await f.write(chunk)
            else:
                print(response.status)
    return file_path


def generateVideoPath(platform, video_id):
    """
    Generate unique local paths for a video and its cover.

    The file stem is the MD5 hex digest of ``"<platform>-<video_id>-<uuid4>"``,
    so every call yields a new pair of paths under ``<cwd>/static``.

    :param platform: platform key of the video
    :param video_id: platform-side id of the video
    :return: tuple ``(video_path, cover_path)`` ending in ``.mp4`` / ``.png``
    """
    seed = "{}-{}-{}".format(platform, video_id, uuid4())
    stem = md5(seed.encode()).hexdigest()
    static_dir = os.path.join(os.getcwd(), "static")
    file_path = os.path.join(static_dir, "{}.mp4".format(stem))
    cover_path = os.path.join(static_dir, "{}.png".format(stem))
    return file_path, cover_path


async def uploadToOss(local_video_path, download_type):
    """
    Upload a local file to OSS under ``long_articles/<download_type>/<uuid4>``.

    The oss2 client is blocking, so the upload runs in the default executor
    to keep the event loop responsive.

    :param local_video_path: path of the local file to upload
    :param download_type: sub-folder of the object key (e.g. ``"video"``,
        ``"image"``)
    :return: the OSS object key the file was stored under
    """
    oss_video_key = "long_articles/{}/".format(download_type) + str(uuid4())
    # NOTE(review): credentials are hard-coded in source. They should be moved
    # to configuration / a secret store and rotated.
    access_key_id = "LTAIP6x1l3DXfSxm"
    access_key_secret = "KbTaM9ars4OX3PMS6Xm7rtxGr1FLon"
    endpoint = "oss-cn-hangzhou.aliyuncs.com"
    bucket_name = "art-pubbucket"
    bucket = oss2.Bucket(
        oss2.Auth(access_key_id, access_key_secret), endpoint, bucket_name
    )
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(
        None,
        lambda: bucket.put_object_from_file(key=oss_video_key, filename=local_video_path),
    )
    return oss_video_key


class AsyncETL(object):
    """
    Video download pipeline: fetch pending rows, download the video and cover,
    upload them to OSS and record the result.
    """

    def __init__(self, mysql_client):
        # self.proxy = {
        #     "http://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
        #     "https://": "http://t17772369458618:5zqcjkmy@q796.kdltps.com:15818/",
        # }
        self.max_retry = 5
        self.mysql_client = mysql_client
        # Name of the table holding the crawled videos, taken from Config.
        self.article_crawler_videos = Config().articleCrawlerVideos

    async def getTasks(self):
        """
        Fetch up to 10 rows with ``download_status = 0``, ordered by id.

        :return: list of task dicts with keys ``id``, ``video_id``,
            ``platform``, ``video_title``, ``video_url``, ``cover_url``,
            ``user_id``; an empty list when nothing is pending
        """
        select_sql = f"""
        SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id
        FROM {self.article_crawler_videos}
        WHERE download_status = 0
        ORDER BY id
        LIMIT 10;
        """
        result = await self.mysql_client.asyncSelect(select_sql)
        if not result:
            return []
        return [
            {
                "id": line[0],
                "video_id": line[1],
                "platform": line[2],
                "video_title": line[3],
                "video_url": line[4],
                "cover_url": line[5],
                "user_id": line[6]
            }
            for line in result
        ]

    async def processTask(self, params):
        """
        Process one task: download video and cover, upload both, update the row.

        The row's ``download_status`` is set to 1 before any work starts, to 2
        (together with the OSS paths) on success, and to 3 if any step raises.

        :param params: task dict as produced by ``getTasks``
        :return: None
        """
        update_sql_0 = f"""
                    UPDATE {self.article_crawler_videos}
                    SET download_status = %s
                    WHERE id = %s;
                    """
        await self.mysql_client.asyncInsert(
            sql=update_sql_0,
            params=(1, params['id'])
        )
        try:
            local_video_path, local_cover_path = generateVideoPath(params['platform'], params['video_id'])
            # download video
            file_path = await downloadVideo(
                file_path=local_video_path,
                platform=params['platform'],
                video_url=params['video_url']
            )
            # download cover
            cover_path = await downloadCover(
                file_path=local_cover_path,
                platform=params['platform'],
                cover_url=params['cover_url']
            )
            oss_video = await uploadToOss(
                local_video_path=file_path,
                download_type="video"
            )
            # The cover is optional: a task still succeeds without one.
            if cover_path:
                oss_cover = await uploadToOss(
                    local_video_path=cover_path,
                    download_type="image"
                )
            else:
                oss_cover = None
            update_sql = f"""
            UPDATE {self.article_crawler_videos}
            SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
            WHERE id = %s;
            """
            await self.mysql_client.asyncInsert(
                sql=update_sql,
                params=(
                    oss_video,
                    oss_cover,
                    2,
                    params['id']
                )
            )
        except Exception as e:
            # Task boundary: record the failure on the row instead of letting
            # one bad task abort the whole batch.
            print("failed", e)
            update_sql = f"""
            UPDATE {self.article_crawler_videos}
            SET download_status = %s
            WHERE id = %s;
            """
            await self.mysql_client.asyncInsert(
                sql=update_sql,
                params=(3, params['id'])
            )

    async def deal(self):
        """
        ETL entry point: fetch the pending tasks and process them concurrently.

        :return: None
        """
        task_list = await self.getTasks()
        logging(
            code="5001",
            info="ETL Task Got {} this time".format(len(task_list)),
            function="ETL"
        )
        if task_list:
            tasks = [self.processTask(params) for params in task_list]
            await asyncio.gather(*tasks)