""" @author: luojunhui """ from applications.etl_function import * from applications.const import new_content_id_task_const from applications.log import logging async def async_download_videos(trace_id, content_id, article_crawler_video_table, db_client): """ 下载视频 """ select_sql = f""" SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id FROM {article_crawler_video_table} WHERE content_id = '{content_id}' AND download_status != {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS} AND is_illegal = {new_content_id_task_const.VIDEO_SAFE} ORDER BY score DESC; """ videos_need_to_download_tuple = await db_client.async_select(select_sql) downloaded_count = 0 for line in videos_need_to_download_tuple: params = { "id": line[0], "video_id": line[1], "platform": line[2], "video_title": line[3], "video_url": line[4], "cover_url": line[5], "user_id": line[6], "trace_id": line[7] } try: local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id']) # download videos file_path = await download_video( file_path=local_video_path, platform=params['platform'], video_url=params['video_url'] ) if not file_path: # 说明视频下载失败,无需上传该视频, 将该条记录设置为失败状态 update_sql = f""" UPDATE {article_crawler_video_table} SET download_status = %s WHERE id = %s; """ await db_client.async_insert( sql=update_sql, params=(new_content_id_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id']) ) logging( code="etl_1001", info="etl_下载视频失败", trace_id=trace_id, function="etl_task" ) else: # download cover cover_path = await download_cover( file_path=local_cover_path, platform=params['platform'], cover_url=params['cover_url'] ) # upload video to oss oss_video = await upload_to_oss( local_video_path=file_path, download_type="video" ) # upload cover to oss if cover_path: oss_cover = await upload_to_oss( local_video_path=cover_path, download_type="image" ) else: oss_cover = None # change status to success update_sql = f""" UPDATE {article_crawler_video_table} SET video_oss_path = %s, cover_oss_path = %s, download_status = %s WHERE id = %s; """ await db_client.async_insert( sql=update_sql, params=( oss_video, oss_cover, new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS, params['id'] ) ) downloaded_count += 1 logging( code="etl_1002", info="etl_视频下载成功", trace_id=trace_id, function="etl_task" ) # 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态 if downloaded_count > new_content_id_task_const.MIN_MATCH_VIDEO_NUM: return downloaded_count except Exception as e: update_sql = f""" UPDATE {article_crawler_video_table} SET download_status = %s WHERE id = %s; """ await db_client.async_insert( sql=update_sql, params=(new_content_id_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id']) ) logging( code="etl_1001", info="etl_下载视频失败", trace_id=trace_id, function="etl_task" ) return downloaded_count