| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 | 
							- """
 
- @author: luojunhui
 
- """
 
- from applications.etl_function import *
 
- from applications.const import new_content_id_task_const
 
- from applications.log import logging
 
async def _mark_video_download_failed(article_crawler_video_table, db_client, record_id, trace_id):
    """Set one crawler-video row to the download-failed status and log ``etl_1001``.

    :param article_crawler_video_table: name of the table holding the row.
    :param db_client: client exposing an awaitable ``async_insert(sql=..., params=...)``.
    :param record_id: primary key (``id`` column) of the row to update.
    :param trace_id: trace id attached to the log entry.
    """
    update_sql = f"""
                        UPDATE {article_crawler_video_table}
                        SET download_status = %s
                        WHERE id = %s;
                """
    await db_client.async_insert(
        sql=update_sql,
        params=(new_content_id_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, record_id)
    )
    logging(
        code="etl_1001",
        info="etl_下载视频失败",
        trace_id=trace_id,
        function="etl_task"
    )


async def async_download_videos(trace_id, content_id, article_crawler_video_table, db_client, illegal_videos):
    """
    Download the not-yet-downloaded videos of one content id and upload them to OSS.

    Rows are read from ``article_crawler_video_table`` in descending ``score``
    order. For each row the video is downloaded, then its cover, both are
    uploaded, and the row is updated with the OSS paths and the success status.
    A row whose download yields no file, or whose processing raises, is set to
    the failed status instead.

    :param trace_id: trace id written to every log entry emitted here.
    :param content_id: content id whose candidate videos are processed.
    :param article_crawler_video_table: name of the crawler video table.
    :param db_client: client exposing awaitable ``async_select(sql)`` and
        ``async_insert(sql=..., params=...)``.
    :param illegal_videos: container of ``"<platform>_<out_video_id>"`` keys;
        rows whose key is in it are skipped without any status change.
    :return: number of videos downloaded and uploaded successfully.
    """
    # NOTE(review): content_id is interpolated straight into the SQL text. If it
    # can ever carry caller-controlled data this should become a bound parameter;
    # left as-is because async_select's support for params is not visible here.
    select_sql = f"""
                    SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
                    FROM {article_crawler_video_table}
                    WHERE content_id = '{content_id}' 
                        AND download_status != {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
                        AND is_illegal = {new_content_id_task_const.VIDEO_SAFE}
                    ORDER BY score DESC;
                """
    pending_rows = await db_client.async_select(select_sql)
    downloaded_count = 0
    for row in pending_rows:
        # Column order follows the SELECT list; video_title, user_id and the
        # row-level trace_id are fetched but not needed below.
        record_id, video_id, platform, _, video_url, cover_url, *_ = row

        if f"{platform}_{video_id}" in illegal_videos:
            continue

        try:
            local_video_path, local_cover_path = generate_video_path(platform, video_id)
            # download the video
            file_path = await download_video(
                file_path=local_video_path,
                platform=platform,
                video_url=video_url
            )
            if not file_path:
                # No file came back: nothing to upload, mark this row as failed.
                await _mark_video_download_failed(
                    article_crawler_video_table, db_client, record_id, trace_id
                )
            else:
                # download the cover
                cover_path = await download_cover(
                    file_path=local_cover_path,
                    platform=platform,
                    cover_url=cover_url
                )
                # upload the video to OSS
                oss_video = await upload_to_oss(
                    local_video_path=file_path,
                    download_type="video"
                )
                # upload the cover to OSS; a missing cover is stored as NULL
                if cover_path:
                    oss_cover = await upload_to_oss(
                        local_video_path=cover_path,
                        download_type="image"
                    )
                else:
                    oss_cover = None
                # record the OSS paths and flip the row to the success status
                update_sql = f"""
                                            UPDATE {article_crawler_video_table}
                                            SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
                                            WHERE id = %s;
                            """
                await db_client.async_insert(
                    sql=update_sql,
                    params=(
                        oss_video,
                        oss_cover,
                        new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
                        record_id
                    )
                )
                downloaded_count += 1
                logging(
                    code="etl_1002",
                    info="etl_视频下载成功",
                    trace_id=trace_id,
                    function="etl_task"
                )
            # Stop early once strictly more than MIN_MATCH_VIDEO_NUM videos are
            # done; remaining rows are left untouched.
            if downloaded_count > new_content_id_task_const.MIN_MATCH_VIDEO_NUM:
                return downloaded_count
        except Exception:
            # Best-effort per row: any error while handling one video marks that
            # row as failed and the loop moves on to the next candidate.
            await _mark_video_download_failed(
                article_crawler_video_table, db_client, record_id, trace_id
            )
    return downloaded_count
 
 
  |