Browse Source

Merge branch '2024-11-17-download-video-fails' of Server/title_with_video into 2024-09-23newDbTasks

luojunhui 5 months ago
parent
commit
4f775dce0b

+ 59 - 20
applications/etl_function/__init__.py

@@ -3,6 +3,7 @@
 """
 import os
 import oss2
+import asyncio
 import aiohttp
 import aiofiles
 import requests
@@ -12,6 +13,17 @@ from uuid import uuid4
 from fake_useragent import FakeUserAgent
 
 
async def is_empty(file_path: str) -> bool:
    """
    Check whether a downloaded video file should be treated as empty.

    Files of 10 KB or less are assumed to be broken/failed downloads
    (e.g. an error page saved instead of real video data).

    :param file_path: path of the file to inspect (must exist;
        ``os.path.getsize`` raises ``OSError`` otherwise)
    :return: True when the file is 10 KB or smaller, False otherwise
    """
    # NOTE: declared async only to match the surrounding await-style call
    # sites; the size check itself is synchronous.
    TEN_KB = 10 * 1024
    return os.path.getsize(file_path) <= TEN_KB
+
+
 async def download_cover(file_path, platform, cover_url):
     """
     下载视频封面
@@ -111,28 +123,55 @@ async def download_video(file_path, platform, video_url, download_type="video"):
     :return:
     """
     headers = request_header(platform=platform, url=video_url, download_type=download_type)
-    if os.path.exists(file_path):
-        file_size = os.path.getsize(file_path)
-        headers["Range"] = f"bytes={file_size}-"
-    else:
-        file_size = 0
-    async with aiohttp.ClientSession() as session:
-        async with session.get(video_url, headers=headers) as response:
-            if response.status in [200, 206]:
-                if file_size > 0:
-                    async with aiofiles.open(file_path, "ab+") as f:
-                        # 以1MB为单位分块下载
-                        async for chunk in response.content.iter_chunked(1024 * 1024):
-                            await f.write(chunk)
-                else:
-                    async with aiofiles.open(file_path, "wb") as f:
-                        # 以1MB为单位分块下载
-                        async for chunk in response.content.iter_chunked(1024 * 1024):
-                            await f.write(chunk)
+    max_retries = 3  # 设置最大重试次数
+    retries = 0  # 初始化重试次数
+
+    tunnel = "l901.kdltps.com:15818"
+    username = "t11983523373311"
+    password = "mtuhdr2z"
+    proxy_auth = aiohttp.BasicAuth(username, password)
 
+    while retries < max_retries:
+        if os.path.exists(file_path):
+            file_size = os.path.getsize(file_path)
+            if file_size > 0:
+                headers["Range"] = f"bytes={file_size}-"
             else:
-                print(response.status)
-    return file_path
+                # 文件存在但大小为0,删除文件以便重新下载
+                os.remove(file_path)
+                file_size = 0
+        else:
+            file_size = 0
+
+        # start download
+        async with aiohttp.ClientSession() as session:
+            async with session.get(video_url, headers=headers, proxy_auth=proxy_auth, proxy='http://'+tunnel) as response:
+                if response.status in [200, 206]:
+                    if file_size > 0:
+                        async with aiofiles.open(file_path, "ab+") as f:
+                            # 以1MB为单位分块下载
+                            async for chunk in response.content.iter_chunked(1024 * 1024):
+                                await f.write(chunk)
+                    else:
+                        async with aiofiles.open(file_path, "wb") as f:
+                            # 以1MB为单位分块下载
+                            async for chunk in response.content.iter_chunked(1024 * 1024):
+                                await f.write(chunk)
+                    # 判断文件是否为空, 若为空则继续重试
+                    if await is_empty(file_path):
+                        await asyncio.sleep(3)
+                        retries += 1
+                        if retries >= max_retries:
+                            return False
+                    else:
+                        return file_path
+                else:
+                    # 下载失败,等待3秒后重试
+                    await asyncio.sleep(3)
+                    retries += 1
+                    if retries >= max_retries:
+                        print(f"下载失败,已达到最大重试次数:{max_retries}")
+                        return False
 
 
 def generate_video_path(platform, video_id):

+ 34 - 6
applications/search/dy_search.py

@@ -2,6 +2,7 @@
 @author: luojunhui
 """
 import json
+
 import requests
 
 from applications.functions.common import sensitive_flag
@@ -32,6 +33,11 @@ def douyin_search(keyword, sensitive_words, trace_id):
     response = requests.request("POST", url, headers=headers, data=payload)
     try:
         dt_list = response.json()['data']['data']
+        logging(
+            code="4002",
+            info="抖音搜索成功",
+            trace_id=trace_id
+        )
         L = []
         for obj in dt_list:
             try:
@@ -45,6 +51,7 @@ def douyin_search(keyword, sensitive_words, trace_id):
                 else:
                     continue
             except Exception as e:
+                # print(traceback.format_exc())
                 continue
         logging(
             code="8001",
@@ -60,8 +67,9 @@ def douyin_search(keyword, sensitive_words, trace_id):
     except Exception as e:
         logging(
             code="4003",
-            info="抖音搜索失败-搜索词:{} 原因:-{}".format(keyword, e),
-            trace_id=trace_id
+            info="抖音搜索失败",
+            trace_id=trace_id,
+            data={"error": str(e)}
         )
         return []
     # logging(
@@ -86,8 +94,28 @@ def douyin_detail(video_id):
         'Content-Type': 'application/json'
     }
     response = requests.request("POST", url, headers=headers, data=payload).json()
-    video_info = response['data']['data']
-    if video_info['content_type'] == "note":
-        return None
+    logging(
+        code="4005",
+        info="抖音请求详情",
+        data=response
+    )
+    if response['code'] != 0:
+        logging(
+            code="4006",
+            info="抖音请求详情失败",
+            data={"error": response['msg']}
+        )
     else:
-        return video_info
+        try:
+            video_info = response['data']['data']
+            if video_info['content_type'] == "note":
+                return None
+            else:
+                return video_info
+        except Exception as e:
+            logging(
+                code="4006",
+                info="抖音请求详情失败",
+                data={"error": str(e)}
+            )
+            return None

+ 8 - 2
applications/search/hksp_search.py

@@ -107,6 +107,11 @@ def hksp_search(key, sensitive_words, trace_id):
         ).json()
         data_list = response['data']['list']
         L = []
+        logging(
+            code="4002",
+            info="百度搜索成功",
+            trace_id=trace_id
+        )
         for data in data_list:
             try:
                 video_id = data['vid']
@@ -134,7 +139,8 @@ def hksp_search(key, sensitive_words, trace_id):
     except Exception as e:
         logging(
             code="4003",
-            info="百度搜索失败-搜索词:{} 原因:-{}".format(key, e),
-            trace_id=trace_id
+            info="百度搜索失败",
+            trace_id=trace_id,
+            data={"error": str(e)}
         )
         return []

+ 82 - 31
tasks/newContentIdTask.py

@@ -4,8 +4,6 @@
 import json
 import time
 
-import asyncio
-
 from applications.config import Config
 from applications.log import logging
 from applications.functions.pqFunctions import publish_to_pq, get_pq_video_detail
@@ -412,6 +410,12 @@ class NewContentIdTask(object):
             )
             return False
         try:
+            logging(
+                code="spider_1001",
+                info="开始执行搜索任务",
+                trace_id=trace_id,
+                data=kimi_result
+            )
             search_videos_count = await search_videos_from_web(
                 info={
                     "ori_title": kimi_result['ori_title'],
@@ -427,6 +431,12 @@ class NewContentIdTask(object):
             )
             if search_videos_count >= 3:
                 # 表示爬虫任务执行成功, 将状态从 101  改为 2
+                logging(
+                    code="spider_1002",
+                    info="搜索成功",
+                    trace_id=trace_id,
+                    data=kimi_result
+                )
                 await self.update_content_status(
                     new_content_status=self.TASK_SPIDER_FINISHED_STATUS,
                     trace_id=trace_id,
@@ -434,6 +444,12 @@ class NewContentIdTask(object):
                 )
                 return True
             else:
+                logging(
+                    code="spider_1003",
+                    info="搜索失败",
+                    trace_id=trace_id,
+                    data=kimi_result
+                )
                 await self.roll_back_content_status_when_fails(
                     process_times=process_times + 1,
                     trace_id=trace_id
@@ -520,38 +536,67 @@ class NewContentIdTask(object):
                         platform=params['platform'],
                         video_url=params['video_url']
                     )
-                    # download cover
-                    cover_path = await download_cover(
-                        file_path=local_cover_path,
-                        platform=params['platform'],
-                        cover_url=params['cover_url']
-                    )
-                    oss_video = await upload_to_oss(
-                        local_video_path=file_path,
-                        download_type="video"
-                    )
-                    if cover_path:
-                        oss_cover = await upload_to_oss(
-                            local_video_path=cover_path,
-                            download_type="image"
-                        )
-                    else:
-                        oss_cover = None
-                    update_sql = f"""
+                    if not file_path:
+                        # 说明视频下载失败,无需上传该视频, 将该条记录设置为失败状态
+                        update_sql = f"""
                                     UPDATE {self.article_crawler_video_table}
-                                    SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
+                                    SET download_status = %s
                                     WHERE id = %s;
-                    """
-                    await self.mysql_client.async_insert(
-                        sql=update_sql,
-                        params=(
-                            oss_video,
-                            oss_cover,
-                            VIDEO_DOWNLOAD_SUCCESS_STATUS,
-                            params['id']
+                        """
+                        await self.mysql_client.async_insert(
+                            sql=update_sql,
+                            params=(VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
                         )
-                    )
-                    downloaded_count += 1
+                        logging(
+                            code="etl_1001",
+                            info="etl_下载视频失败",
+                            trace_id=trace_id,
+                            function="etl_task"
+                        )
+                    else:
+                        # download cover
+                        cover_path = await download_cover(
+                            file_path=local_cover_path,
+                            platform=params['platform'],
+                            cover_url=params['cover_url']
+                        )
+                        # upload video to oss
+                        oss_video = await upload_to_oss(
+                            local_video_path=file_path,
+                            download_type="video"
+                        )
+                        # upload cover to oss
+                        if cover_path:
+                            oss_cover = await upload_to_oss(
+                                local_video_path=cover_path,
+                                download_type="image"
+                            )
+                        else:
+                            oss_cover = None
+
+                        # change status to success
+                        update_sql = f"""
+                                        UPDATE {self.article_crawler_video_table}
+                                        SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
+                                        WHERE id = %s;
+                        """
+                        await self.mysql_client.async_insert(
+                            sql=update_sql,
+                            params=(
+                                oss_video,
+                                oss_cover,
+                                VIDEO_DOWNLOAD_SUCCESS_STATUS,
+                                params['id']
+                            )
+                        )
+                        downloaded_count += 1
+                        logging(
+                            code="etl_1002",
+                            info="etl_视频下载成功",
+                            trace_id=trace_id,
+                            function="etl_task"
+                        )
+                    # 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
                     if downloaded_count > 3:
                         await self.update_content_status(
                             ori_content_status=self.TASK_PROCESSING_STATUS,
@@ -569,6 +614,12 @@ class NewContentIdTask(object):
                         sql=update_sql,
                         params=(VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
                     )
+                    logging(
+                        code="etl_1001",
+                        info="etl_下载视频失败",
+                        trace_id=trace_id,
+                        function="etl_task"
+                    )
             if downloaded_count >= 3:
                 await self.update_content_status(
                     ori_content_status=self.TASK_PROCESSING_STATUS,