etl_task.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. """
  2. @author: luojunhui
  3. """
  4. from applications.etl_function import *
  5. from applications.const import new_content_id_task_const
  6. from applications.log import logging
  7. async def async_download_videos(trace_id, content_id, article_crawler_video_table, db_client):
  8. """
  9. 下载视频
  10. """
  11. select_sql = f"""
  12. SELECT id, out_video_id, platform, video_title, video_url, cover_url, user_id, trace_id
  13. FROM {article_crawler_video_table}
  14. WHERE content_id = '{content_id}'
  15. AND download_status != {new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS}
  16. AND is_illegal = {new_content_id_task_const.VIDEO_SAFE}
  17. ORDER BY score DESC;
  18. """
  19. videos_need_to_download_tuple = await db_client.async_select(select_sql)
  20. downloaded_count = 0
  21. for line in videos_need_to_download_tuple:
  22. params = {
  23. "id": line[0],
  24. "video_id": line[1],
  25. "platform": line[2],
  26. "video_title": line[3],
  27. "video_url": line[4],
  28. "cover_url": line[5],
  29. "user_id": line[6],
  30. "trace_id": line[7]
  31. }
  32. try:
  33. local_video_path, local_cover_path = generate_video_path(params['platform'], params['video_id'])
  34. # download videos
  35. file_path = await download_video(
  36. file_path=local_video_path,
  37. platform=params['platform'],
  38. video_url=params['video_url']
  39. )
  40. if not file_path:
  41. # 说明视频下载失败,无需上传该视频, 将该条记录设置为失败状态
  42. update_sql = f"""
  43. UPDATE {article_crawler_video_table}
  44. SET download_status = %s
  45. WHERE id = %s;
  46. """
  47. await db_client.async_insert(
  48. sql=update_sql,
  49. params=(new_content_id_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
  50. )
  51. logging(
  52. code="etl_1001",
  53. info="etl_下载视频失败",
  54. trace_id=trace_id,
  55. function="etl_task"
  56. )
  57. else:
  58. # download cover
  59. cover_path = await download_cover(
  60. file_path=local_cover_path,
  61. platform=params['platform'],
  62. cover_url=params['cover_url']
  63. )
  64. # upload video to oss
  65. oss_video = await upload_to_oss(
  66. local_video_path=file_path,
  67. download_type="video"
  68. )
  69. # upload cover to oss
  70. if cover_path:
  71. oss_cover = await upload_to_oss(
  72. local_video_path=cover_path,
  73. download_type="image"
  74. )
  75. else:
  76. oss_cover = None
  77. # change status to success
  78. update_sql = f"""
  79. UPDATE {article_crawler_video_table}
  80. SET video_oss_path = %s, cover_oss_path = %s, download_status = %s
  81. WHERE id = %s;
  82. """
  83. await db_client.async_insert(
  84. sql=update_sql,
  85. params=(
  86. oss_video,
  87. oss_cover,
  88. new_content_id_task_const.VIDEO_DOWNLOAD_SUCCESS_STATUS,
  89. params['id']
  90. )
  91. )
  92. downloaded_count += 1
  93. logging(
  94. code="etl_1002",
  95. info="etl_视频下载成功",
  96. trace_id=trace_id,
  97. function="etl_task"
  98. )
  99. # 如果下载的视频数已经大于3, 则直接退出循环,修改状态为ETL成功状态
  100. if downloaded_count > new_content_id_task_const.MIN_MATCH_VIDEO_NUM:
  101. return downloaded_count
  102. except Exception as e:
  103. update_sql = f"""
  104. UPDATE {article_crawler_video_table}
  105. SET download_status = %s
  106. WHERE id = %s;
  107. """
  108. await db_client.async_insert(
  109. sql=update_sql,
  110. params=(new_content_id_task_const.VIDEO_DOWNLOAD_FAIL_STATUS, params['id'])
  111. )
  112. logging(
  113. code="etl_1001",
  114. info="etl_下载视频失败",
  115. trace_id=trace_id,
  116. function="etl_task"
  117. )
  118. return downloaded_count