etl_task.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. """
  2. @author: luojunhui
  3. """
  4. from applications.etl_function import *
  5. from applications.const import new_content_id_task_const
  6. from applications.log import logging
  7. async def async_download_videos(trace_id, content_id, article_crawler_video_table, db_client, illegal_videos):
  8. """
  9. 下载视频
  10. """
  11. select_sql = f"""
  12. SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
  13. FROM {article_crawler_video_table}
  14. WHERE content_id = '{content_id}'
  15. AND download_status != {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  16. AND is_illegal = {new_content_id_task_const.VIDEO_SAFE}
  17. ORDER BY score DESC;
  18. """
  19. videos_need_to_download_tuple = await db_client.async_select(select_sql)
  20. downloaded_count = 0
  21. for line in videos_need_to_download_tuple:
  22. params = {
  23. "id": line[0],
  24. "video_id": line[1],
  25. "platform": line[2],
  26. "video_title": line[3],
  27. "video_url": line[4],
  28. "cover_url": line[5],
  29. "user_id": line[6],
  30. "trace_id": line[7]
  31. }
  32. out_key = "{}_{}".format(params['platform'], params['video_id'])
  33. if out_key in illegal_videos:
  34. continue
  35. try:
  36. local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
  37. # download videos
  38. file_path = await download_video(
  39. file_path=local_video_path,
  40. platform=params['platform'],
  41. video_url=params['video_url']
  42. )
  43. if not file_path:
  44. # 说明视频下载失败,无需上传该视频, 将该条记录设置为失败状态
  45. update_sql = f"""
  46. UPDATE {article_crawler_video_table}
  47. SET download_status = %s
  48. WHERE id = %s;
  49. """
  50. await db_client.async_insert(
  51. sql=update_sql,
  52. params=(new_content_id_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
  53. )
  54. logging(
  55. code="etl_1001",
  56. info="etl_下载视频失败",
  57. trace_id=trace_id,
  58. function="etl_task"
  59. )
  60. else:
  61. # download cover
  62. cover_path = await download_cover(
  63. file_path=local_cover_path,
  64. platform=params['platform'],
  65. cover_url=params['cover_url']
  66. )
  67. # upload video to oss
  68. oss_video = await upload_to_oss(
  69. local_video_path=file_path,
  70. download_type="video"
  71. )
  72. # upload cover to oss
  73. if cover_path:
  74. oss_cover = await upload_to_oss(
  75. local_video_path=cover_path,
  76. download_type="image"
  77. )
  78. else:
  79. oss_cover = None
  80. # change status to success
  81. update_sql = f"""
  82. UPDATE {article_crawler_video_table}
  83. SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
  84. WHERE id = %s;
  85. """
  86. await db_client.async_insert(
  87. sql=update_sql,
  88. params=(
  89. oss_video,
  90. oss_cover,
  91. new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
  92. params['id']
  93. )
  94. )
  95. downloaded_count += 1
  96. logging(
  97. code="etl_1002",
  98. info="etl_视频下载成功",
  99. trace_id=trace_id,
  100. function="etl_task"
  101. )
  102. # 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
  103. if downloaded_count > new_content_id_task_const.MIN_MATCH_VIDEO_NUM:
  104. return downloaded_count
  105. except Exception as e:
  106. update_sql = f"""
  107. UPDATE {article_crawler_video_table}
  108. SET download_status = %s
  109. WHERE id = %s;
  110. """
  111. await db_client.async_insert(
  112. sql=update_sql,
  113. params=(new_content_id_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
  114. )
  115. logging(
  116. code="etl_1001",
  117. info="etl_下载视频失败",
  118. trace_id=trace_id,
  119. function="etl_task"
  120. )
  121. return downloaded_count