Browse Source

download fails

luojunhui 5 months ago
parent
commit
fca5db31a2
2 changed files with 96 additions and 52 deletions
  1. 59 21
      applications/etl_function/__init__.py
  2. 37 31
      tasks/newContentIdTask.py

+ 59 - 21
applications/etl_function/__init__.py

@@ -3,6 +3,7 @@
 """
 import os
 import oss2
+import asyncio
 import aiohttp
 import aiofiles
 import requests
@@ -12,6 +13,17 @@ from uuid import uuid4
 from fake_useragent import FakeUserAgent
 
 
+async def is_empty(file_path: str) -> bool:
+    """
+    Return True when the file at file_path is no larger than 10 KB.
+    """
+    # Treat the video file as empty unless it is larger than 10 KB.
+    TEN_KB = 1024 * 10
+    if os.path.getsize(file_path) > TEN_KB:
+        return False
+    return True
+
+
 async def download_cover(file_path, platform, cover_url):
     """
     下载视频封面
@@ -111,29 +123,55 @@ async def download_video(file_path, platform, video_url, download_type="video"):
     :return:
     """
     headers = request_header(platform=platform, url=video_url, download_type=download_type)
-    if os.path.exists(file_path):
-        file_size = os.path.getsize(file_path)
-        headers["Range"] = f"bytes={file_size}-"
-    else:
-        file_size = 0
-    async with aiohttp.ClientSession() as session:
-        async with session.get(video_url, headers=headers) as response:
-            if response.status in [200, 206]:
-                if file_size > 0:
-                    async with aiofiles.open(file_path, "ab+") as f:
-                        # 以1MB为单位分块下载
-                        async for chunk in response.content.iter_chunked(1024 * 1024):
-                            await f.write(chunk)
-                else:
-                    async with aiofiles.open(file_path, "wb") as f:
-                        # 以1MB为单位分块下载
-                        async for chunk in response.content.iter_chunked(1024 * 1024):
-                            await f.write(chunk)
+    max_retries = 3  # 设置最大重试次数
+    retries = 0  # 初始化重试次数
+
+    tunnel = "l901.kdltps.com:15818"
+    username = "t11983523373311"
+    password = "mtuhdr2z"
+    proxy_auth = aiohttp.BasicAuth(username, password)
 
+    while retries < max_retries:
+        if os.path.exists(file_path):
+            file_size = os.path.getsize(file_path)
+            if file_size > 0:
+                headers["Range"] = f"bytes={file_size}-"
             else:
-                print("下载失败")
-                return False
-    return file_path
+                # 文件存在但大小为0,删除文件以便重新下载
+                os.remove(file_path)
+                file_size = 0
+        else:
+            file_size = 0
+
+        # start download
+        async with aiohttp.ClientSession() as session:
+            async with session.get(video_url, headers=headers, proxy_auth=proxy_auth, proxy='http://'+tunnel) as response:
+                if response.status in [200, 206]:
+                    if file_size > 0:
+                        async with aiofiles.open(file_path, "ab+") as f:
+                            # 以1MB为单位分块下载
+                            async for chunk in response.content.iter_chunked(1024 * 1024):
+                                await f.write(chunk)
+                    else:
+                        async with aiofiles.open(file_path, "wb") as f:
+                            # 以1MB为单位分块下载
+                            async for chunk in response.content.iter_chunked(1024 * 1024):
+                                await f.write(chunk)
+                    # If the downloaded file is still empty, wait 3 seconds and retry.
+                    if await is_empty(file_path):
+                        await asyncio.sleep(3)
+                        retries += 1
+                        if retries >= max_retries:
+                            return False
+                    else:
+                        return file_path
+                else:
+                    # 下载失败,等待3秒后重试
+                    await asyncio.sleep(3)
+                    retries += 1
+                    if retries >= max_retries:
+                        print(f"下载失败,已达到最大重试次数:{max_retries}")
+                        return False
 
 
 def generate_video_path(platform, video_id):

+ 37 - 31
tasks/newContentIdTask.py

@@ -521,6 +521,7 @@ class NewContentIdTask(object):
                         video_url=params['video_url']
                     )
                     if not file_path:
+                        # 说明视频下载失败,无需上传该视频, 将该条记录设置为失败状态
                         update_sql = f"""
                                             UPDATE {self.article_crawler_video_table}
                                             SET download_status = %s
@@ -530,39 +531,44 @@ class NewContentIdTask(object):
                             sql=update_sql,
                             params=(VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
                         )
-                        return False
-                    # download cover
-                    cover_path = await download_cover(
-                        file_path=local_cover_path,
-                        platform=params['platform'],
-                        cover_url=params['cover_url']
-                    )
-                    oss_video = await upload_to_oss(
-                        local_video_path=file_path,
-                        download_type="video"
-                    )
-                    if cover_path:
-                        oss_cover = await upload_to_oss(
-                            local_video_path=cover_path,
-                            download_type="image"
-                        )
                     else:
-                        oss_cover = None
-                    update_sql = f"""
-                                    UPDATE {self.article_crawler_video_table}
-                                    SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
-                                    WHERE id = %s;
-                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql,
-                        params=(
-                            oss_video,
-                            oss_cover,
-                            VIDEO_DOWNLOAD_SUCCESS_STATUS,
-                            params['id']
+                        # download cover
+                        cover_path = await download_cover(
+                            file_path=local_cover_path,
+                            platform=params['platform'],
+                            cover_url=params['cover_url']
                         )
-                    )
-                    downloaded_count += 1
+                        # upload video to oss
+                        oss_video = await upload_to_oss(
+                            local_video_path=file_path,
+                            download_type="video"
+                        )
+                        # upload cover to oss
+                        if cover_path:
+                            oss_cover = await upload_to_oss(
+                                local_video_path=cover_path,
+                                download_type="image"
+                            )
+                        else:
+                            oss_cover = None
+
+                        # change status to success
+                        update_sql = f"""
+                                        UPDATE {self.article_crawler_video_table}
+                                        SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
+                                        WHERE id = %s;
+                        """
+                        await self.mysql_client.async_insert(
+                            sql=update_sql,
+                            params=(
+                                oss_video,
+                                oss_cover,
+                                VIDEO_DOWNLOAD_SUCCESS_STATUS,
+                                params['id']
+                            )
+                        )
+                        downloaded_count += 1
+                    # 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
                     if downloaded_count > 3:
                         await self.update_content_status(
                             ori_content_status=self.TASK_PROCESSING_STATUS,