
add download videos task

liqian 1 year ago
parent
commit
95fad9592a
3 changed files with 212 additions and 0 deletions
  1. config.py (+21, -0)
  2. download_videos_task.py (+93, -0)
  3. utils.py (+98, -0)

+ 21 - 0
config.py

@@ -2,6 +2,27 @@ import os
 
 
 class BaseConfig(object):
+    # Daily AI-tagged video info table
+    DAILY_VIDEO = {
+        'project': 'loghubods',
+        'table': 'video_aigc_tag_yesterday_upload'
+    }
+
+    # ODPS service configuration
+    ODPS_CONFIG = {
+        'ENDPOINT': 'http://service.cn.maxcompute.aliyun.com/api',
+        'ACCESSID': 'LTAIWYUujJAm7CbH',
+        'ACCESSKEY': 'RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
+    }
+
+    # OSS configuration
+    OSS_CONFIG = {
+        'endpoint': 'http://oss-cn-hangzhou-internal.aliyuncs.com',
+        'accessKeyId': 'LTAIP6x1l3DXfSxm',
+        'accessKeySecret': 'KbTaM9ars4OX3PMS6Xm7rtxGr1FLon',
+        'bucket_name': 'art-pubbucket'
+    }
+
     # iFLYTEK ASR configuration
     XFASR_HOST = 'https://raasr.xfyun.cn/v2/api'
     XF_API = {

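set_config() itself is not shown in this diff; a minimal sketch of how the new entries are read elsewhere in this commit, assuming set_config() returns an object exposing the BaseConfig attributes above:

# Hypothetical usage sketch; set_config() is assumed to expose the BaseConfig attributes.
from config import set_config

config_ = set_config()
odps_endpoint = config_.ODPS_CONFIG['ENDPOINT']    # ODPS API endpoint
oss_bucket = config_.OSS_CONFIG['bucket_name']     # 'art-pubbucket'
daily_table = config_.DAILY_VIDEO['table']         # partitioned by dt=YYYYMMDD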
+ 93 - 0
download_videos_task.py

@@ -0,0 +1,93 @@
+import traceback
+import datetime
+import os
+import oss2
+import multiprocessing
+from threading import Timer
+from utils import data_check, get_feature_data
+from config import set_config
+from log import Log
+config_ = set_config()
+log_ = Log()
+features = ['videoid', 'title', 'video_path']
+
+
+def download_video_from_oss(video_id, video_path, download_folder):
+    """从oss下载视频"""
+    try:
+        print(f"{video_id} download start ...")
+        if not os.path.exists(download_folder):
+            os.makedirs(download_folder)
+        video_local_dir = os.path.join(download_folder, video_id)
+        os.makedirs(video_local_dir, exist_ok=True)
+        video_filename = video_path.split('/')[-1]
+        video_local_path = os.path.join(video_local_dir, video_filename)
+        # An Alibaba Cloud account AccessKey grants access to all APIs and is high-risk. It is strongly recommended to create and use a RAM user for API access and routine operations; log in to the RAM console to create one.
+        # auth = oss2.Auth(access_key_id=config_.ODPS_CONFIG['ACCESSID'], access_key_secret=config_.ODPS_CONFIG['ACCESSKEY'])
+        auth = oss2.Auth(access_key_id=config_.OSS_CONFIG['accessKeyId'],
+                         access_key_secret=config_.OSS_CONFIG['accessKeySecret'])
+        # The Hangzhou endpoint is used as an example; fill in the actual value for other regions.
+        bucket = oss2.Bucket(auth, endpoint=config_.OSS_CONFIG['endpoint'], bucket_name=config_.OSS_CONFIG['bucket_name'])
+        # Download the OSS object to a local file.
+        # <yourObjectName> is the full object path (including the file extension, excluding the bucket name), e.g. abc/efg/123.jpg.
+        # <yourLocalFile> is the local file path plus the file name and extension, e.g. /users/local/myfile.txt.
+        bucket.get_object_to_file(video_path, video_local_path)
+
+        # For m3u8 files, all ts segments must also be downloaded
+        if video_filename.split('.')[-1] == 'm3u8':
+            root_path = '/'.join(video_path.split('/')[:-1])
+            with open(video_local_path, 'r') as rf:
+                lines = rf.readlines()
+                for line in lines:
+                    line = line.strip()
+                    if line[-3:] == '.ts':
+                        ts_path = os.path.join(root_path, line)
+                        ts_local_path = os.path.join(video_local_dir, line)  # keep the segment next to the local m3u8
+                        bucket.get_object_to_file(ts_path, ts_local_path)
+        print(f"{video_id} download end!")
+    except Exception:
+        print(f"{video_id} download fail! {traceback.format_exc()}")
+
+
+def download_videos(project, table, dt):
+    # Fetch feature data
+    feature_df = get_feature_data(project=project, table=table, dt=dt, features=features)
+    download_folder = 'videos'
+    video_id_list = feature_df['videoid'].to_list()
+    pool = multiprocessing.Pool(processes=3)
+    for video_id in video_id_list[21:31]:  # only a fixed slice of videos is processed in this version
+        video_path = feature_df[feature_df['videoid'] == video_id]['video_path'].values[0].strip()
+        video_path = video_path.replace(' ', '')
+        print(video_id, video_path)
+        pool.apply_async(
+            func=download_video_from_oss,
+            args=(video_id, video_path, download_folder)
+        )
+    pool.close()
+    pool.join()
+
+
+def timer_check():
+    try:
+        project = config_.DAILY_VIDEO['project']
+        table = config_.DAILY_VIDEO['table']
+        now_date = datetime.datetime.today()
+        log_.info(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
+        dt = datetime.datetime.strftime(now_date-datetime.timedelta(days=1), '%Y%m%d')
+        # Check whether the data is ready
+        data_count = data_check(project=project, table=table, dt=dt)
+        if data_count > 0:
+            log_.info(f'videos count = {data_count}')
+            # Data is ready; download the videos
+            download_videos(project=project, table=table, dt=dt)
+            log_.info("videos download end!")
+
+        else:
+            # Data not ready; check again in 1 minute
+            Timer(60, timer_check).start()
+    except Exception as e:
+        log_.error(f"视频下载失败, exception: {e}, traceback: {traceback.format_exc()}")
+
+
+if __name__ == '__main__':
+    timer_check()

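The task flow added here: timer_check() reads the DAILY_VIDEO project/table, checks yesterday's dt partition via data_check(), and either runs download_videos() over a 3-process pool or re-arms itself after 60 seconds. The OSS helper can also be exercised on its own; a one-off sketch (the video id and object key below are illustrative placeholders, not values from the repo):

# Hypothetical manual test of the OSS download helper.
from download_videos_task import download_video_from_oss

download_video_from_oss(
    video_id='123456',                      # placeholder video id
    video_path='example/path/123456.mp4',   # placeholder OSS object key (no bucket name)
    download_folder='videos'
)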
+ 98 - 0
utils.py

@@ -1,13 +1,109 @@
+import oss2
 import requests
 import os
 import json
 import traceback
+import pandas as pd
+from odps import ODPS
 from log import Log
 from config import set_config
 log_ = Log()
 config_ = set_config()
 
 
+def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
+                       pool_maxsize=1000, pool_connections=1000):
+    """
+    Fetch data from ODPS
+    :param date: date, type-string '%Y%m%d'
+    :param project: type-string
+    :param table: table name, type-string
+    :param connect_timeout: connection timeout setting
+    :param read_timeout: read timeout setting
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: records
+    """
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=connect_timeout,
+        read_timeout=read_timeout,
+        pool_maxsize=pool_maxsize,
+        pool_connections=pool_connections
+    )
+    records = odps.read_table(name=table, partition='dt=%s' % date)
+    return records
+
+
+def get_feature_data(project, table, dt, features):
+    """获取特征数据"""
+    records = get_data_from_odps(date=dt, project=project, table=table)
+    feature_data = []
+    for record in records:
+        item = {}
+        for feature_name in features:
+            item[feature_name] = record[feature_name]
+        feature_data.append(item)
+    feature_df = pd.DataFrame(feature_data)
+    return feature_df
+
+
+def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
+                                pool_maxsize=1000, pool_connections=1000):
+    """
+    Check whether the given partition exists in the table
+    :param date: date, type-string '%Y%m%d'
+    :param project: type-string
+    :param table: table name, type-string
+    :param connect_timeout: connection timeout setting
+    :param read_timeout: read timeout setting
+    :param pool_maxsize:
+    :param pool_connections:
+    :return: True if the partition exists, otherwise False
+    """
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=connect_timeout,
+        read_timeout=read_timeout,
+        pool_maxsize=pool_maxsize,
+        pool_connections=pool_connections
+    )
+    t = odps.get_table(name=table)
+    return t.exist_partition(partition_spec=f'dt={date}')
+
+
+def data_check(project, table, dt):
+    """检查数据是否准备好"""
+    odps = ODPS(
+        access_id=config_.ODPS_CONFIG['ACCESSID'],
+        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
+        project=project,
+        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
+        connect_timeout=3000,
+        read_timeout=500000,
+        pool_maxsize=1000,
+        pool_connections=1000
+    )
+
+    try:
+        check_res = check_table_partition_exits(date=dt, project=project, table=table)
+        if check_res:
+            sql = f'select * from {project}.{table} where dt = {dt}'
+            with odps.execute_sql(sql=sql).open_reader() as reader:
+                data_count = reader.count
+        else:
+            data_count = 0
+    except Exception as e:
+        log_.error(f"data check failed, exception: {e}")
+        data_count = 0
+    return data_count
+
+
 def request_post(request_url, headers, request_data):
     """
     post 请求 HTTP接口
@@ -76,6 +172,8 @@ def asr_validity_discrimination(text):
 
 
 
+
+
 if __name__ == '__main__':
     text = """现场和电视机前的观众朋友,大家晚上好。
 这里是非常说明的访谈现场,