123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125 |
- """
- @author: luojunhui
- """
- from applications.etl_function import *
- from applications.const import new_content_id_task_const
- from applications.log import logging
async def _flag_video_download_failed(db_client, article_crawler_video_table, record_id, trace_id):
    """
    Set one crawler-video row to the download-failed status and emit the
    ``etl_1001`` failure log entry.
    """
    failure_sql = f"""
        UPDATE {article_crawler_video_table}
        SET download_status = %s
        WHERE id = %s;
    """
    await db_client.async_insert(
        sql=failure_sql,
        params=(new_content_id_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, record_id)
    )
    logging(
        code="etl_1001",
        info="etl_下载视频失败",
        trace_id=trace_id,
        function="etl_task"
    )


async def async_download_videos(trace_id, content_id, article_crawler_video_table, db_client, illegal_videos):
    """
    Download the candidate videos of one content id and push them to OSS.

    Rows of ``article_crawler_video_table`` that belong to ``content_id``, are
    not yet in the download-success status and are flagged as safe are fetched
    in descending ``score`` order. For every row whose ``"<platform>_<video_id>"``
    key is not in ``illegal_videos``, the video (and its cover, when the cover
    download yields a path) is downloaded and uploaded, and the row is updated
    with the OSS paths and the success status. A failed download, or any
    exception raised while handling a row, marks that row as failed and the
    loop moves on to the next row.

    :param trace_id: trace id attached to every log entry written here
    :param content_id: content id whose videos are processed
    :param article_crawler_video_table: name of the crawler video table
    :param db_client: async DB client exposing ``async_select`` / ``async_insert``
    :param illegal_videos: container of ``"<platform>_<video_id>"`` keys to skip
    :return: number of videos downloaded and uploaded by this call; the function
        returns early once that number exceeds ``MIN_MATCH_VIDEO_NUM``
    """
    # NOTE(review): content_id is interpolated straight into the SQL text.
    # If async_select supports bound parameters, prefer them — confirm its signature.
    query = f"""
        SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
        FROM {article_crawler_video_table}
        WHERE content_id = '{content_id}'
        AND download_status != {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
        AND is_illegal = {new_content_id_task_const.VIDEO_SAFE}
        ORDER BY score DESC;
    """
    candidate_rows = await db_client.async_select(query)

    success_total = 0
    for row in candidate_rows:
        # Column order mirrors the SELECT list above; title, user id and the
        # row-level trace id are fetched but not needed by the download flow.
        record_id, video_id, platform, _title, video_url, cover_url, _user_id, _row_trace_id = row

        if f"{platform}_{video_id}" in illegal_videos:
            continue

        try:
            video_target, cover_target = generate_video_path(platform, video_id)

            saved_video = await download_video(
                file_path=video_target,
                platform=platform,
                video_url=video_url
            )
            if not saved_video:
                # The video could not be downloaded, so there is nothing to
                # upload: mark the row as failed and go on with the next one.
                await _flag_video_download_failed(
                    db_client, article_crawler_video_table, record_id, trace_id
                )
                continue

            saved_cover = await download_cover(
                file_path=cover_target,
                platform=platform,
                cover_url=cover_url
            )

            oss_video = await upload_to_oss(
                local_video_path=saved_video,
                download_type="video"
            )
            # The cover is optional: upload it only when its download produced a path.
            oss_cover = None
            if saved_cover:
                oss_cover = await upload_to_oss(
                    local_video_path=saved_cover,
                    download_type="image"
                )

            success_sql = f"""
                UPDATE {article_crawler_video_table}
                SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
                WHERE id = %s;
            """
            await db_client.async_insert(
                sql=success_sql,
                params=(
                    oss_video,
                    oss_cover,
                    new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
                    record_id
                )
            )
            success_total += 1
            logging(
                code="etl_1002",
                info="etl_视频下载成功",
                trace_id=trace_id,
                function="etl_task"
            )

            # Enough videos have been collected: stop processing further rows.
            if success_total > new_content_id_task_const.MIN_MATCH_VIDEO_NUM:
                return success_total
        except Exception:
            # Any error while handling this row is treated as a download failure.
            await _flag_video_download_failed(
                db_client, article_crawler_video_table, record_id, trace_id
            )

    return success_total
|