浏览代码

download_video

sunxy 1 年之前
父节点
当前提交
e6cbaee1e0
共有 2 个文件被更改,包括 68 次插入5 次删除
  1. 24 0
      ODPSQueryUtil.py
  2. 44 5
      download_videos_task.py

+ 24 - 0
ODPSQueryUtil.py

@@ -0,0 +1,24 @@
+from odps import ODPS
+# 阿里云odps连接
+access_id = 'LTAIWYUujJAm7CbH'
+access_key = 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P'
+endpoint = 'http://service.cn.maxcompute.aliyun.com/api'
+project_name = 'loghubods'
+
+odps = ODPS(
+    access_id=access_id,
+    secret_access_key=access_key,
+    project=project_name,
+    endpoint=endpoint
+)
+
+
+def query_videos(start_idx, limit):
+    # 查询视频标题的表现(从阿里云odps中查询)
+    sql = f"SELECT DISTINCT a.videoid, a.title, transed_video_path AS video_path FROM loghubods.video_return_top_500 a LEFT JOIN videoods.dim_video b ON      a.videoid = b.videoid LEFT JOIN videoods.wx_video c ON      a.videoid = c.id WHERE a.dt >= 20230101 LIMIT {start_idx}, {limit};"
+    result = []
+    with odps.execute_sql(sql).open_reader() as reader:
+        for record in reader:
+            # 处理查询结果
+            result.append(record)
+    return result

+ 44 - 5
download_videos_task.py

@@ -7,6 +7,7 @@ from threading import Timer
 from utils import data_check, get_feature_data
 from config import set_config
 from log import Log
+import ODPSQueryUtil
 config_ = set_config()
 log_ = Log()
 features = ['videoid', 'title', 'video_path']
@@ -29,7 +30,8 @@ def download_video_from_oss(video_id, video_path, download_folder):
         auth = oss2.Auth(access_key_id=config_.OSS_CONFIG['accessKeyId'],
                          access_key_secret=config_.OSS_CONFIG['accessKeySecret'])
         # Endpoint以杭州为例,其它Region请按实际情况填写。
-        bucket = oss2.Bucket(auth, endpoint=config_.OSS_CONFIG['endpoint'], bucket_name='art-pubbucket')
+        bucket = oss2.Bucket(
+            auth, endpoint=config_.OSS_CONFIG['endpoint'], bucket_name='art-pubbucket')
         # 下载OSS文件到本地文件。
         # <yourObjectName>由包含文件后缀,不包含Bucket名称组成的Object完整路径,例如abc/efg/123.jpg。
         # <yourLocalFile>由本地文件路径加文件名包括后缀组成,例如/users/local/myfile.txt。
@@ -55,12 +57,14 @@ def download_video_from_oss(video_id, video_path, download_folder):
 
 def download_videos(project, table, dt):
     # 获取特征数据
-    feature_df = get_feature_data(project=project, table=table, dt=dt, features=features)
+    feature_df = get_feature_data(
+        project=project, table=table, dt=dt, features=features)
     download_folder = 'videos'
     video_id_list = feature_df['videoid'].to_list()
     pool = multiprocessing.Pool(processes=6)
     for video_id in video_id_list:
-        video_path = feature_df[feature_df['videoid'] == video_id]['video_path'].values[0].strip()
+        video_path = feature_df[feature_df['videoid']
+                                == video_id]['video_path'].values[0].strip()
         video_path = video_path.replace(' ', '')
         print(video_id, video_path)
         pool.apply_async(
@@ -77,7 +81,8 @@ def timer_check():
         table = config_.DAILY_VIDEO['table']
         now_date = datetime.datetime.today()
         print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
-        dt = datetime.datetime.strftime(now_date-datetime.timedelta(days=1), '%Y%m%d')
+        dt = datetime.datetime.strftime(
+            now_date-datetime.timedelta(days=1), '%Y%m%d')
         # 查看数据是否已准备好
         data_count = data_check(project=project, table=table, dt=dt)
         if data_count > 0:
@@ -94,4 +99,38 @@ def timer_check():
 
 
 if __name__ == '__main__':
-    timer_check()
+    # timer_check()
+    # size = 5
+    # for i in range(0, 10, size):
+    #     print(f"query_videos start i = {i} ...")
+    #     records = ODPSQueryUtil.query_videos(i, size)
+    #     if records is None or len(records) == 0:
+    #         continue
+    #     print(f"Got {len(records)} records")
+    #     pool = multiprocessing.Pool(processes=6)
+    #     for record in records:
+    #         try:
+    #             videoid = record.videoid
+    #             video_path = record.video_path
+    #             pool.apply_async(
+    #                 func=download_video_from_oss,
+    #                 args=(videoid, video_path, 'videos')
+    #             )
+    #         except Exception as e:
+    #             print(
+    #                 f"download video fail, exception: {e}, traceback: {traceback.format_exc()}")
+    #     print(f"query_videos end i = {i} ...")
+    #     pool.close()
+    #     pool.join()
+    # 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
+    # auth = oss2.Auth(access_key_id=config_.ODPS_CONFIG['ACCESSID'], access_key_secret=config_.ODPS_CONFIG['ACCESSKEY'])
+    auth = oss2.Auth(access_key_id=config_.OSS_CONFIG['accessKeyId'],
+                     access_key_secret=config_.OSS_CONFIG['accessKeySecret'])
+    # Endpoint以杭州为例,其它Region请按实际情况填写。
+    bucket = oss2.Bucket(
+        auth, endpoint=config_.OSS_CONFIG['endpoint'], bucket_name='art-pubbucket')
+    # 下载OSS文件到本地文件。
+    # <yourObjectName>由包含文件后缀,不包含Bucket名称组成的Object完整路径,例如abc/efg/123.jpg。
+    # <yourLocalFile>由本地文件路径加文件名包括后缀组成,例如/users/local/myfile.txt。
+    bucket.get_object_to_file(
+        'longvideo/transcode/video/vpc/20221221/17608628PUf8nomrsSCIhllyT3.mp4', 'videos/17608628PUf8nomrsSCIhllyT3.mp4')