
Merge branch 'feature/sunxy/20240430/syncTop2000Videos' into feature/sunxy/20240426/missionCronTask

sunxy, 11 months ago
Parent commit: b9dca4be2b
10 files changed, 90 additions and 103 deletions
  1. ODPSQueryUtil.py (+3 -0)
  2. ReadXlsxFile.py (+18 -0)
  3. ai_tag_task.py (+2 -0)
  4. ai_tag_task.sh (+1 -1)
  5. asr_task.py (+28 -1)
  6. asr_task.sh (+0 -3)
  7. download_videos_task.py (+37 -15)
  8. past_videos.xlsx (BIN)
  9. requirements.txt (+1 -0)
  10. result_convertor.py (+0 -83)

+ 3 - 0
ODPSQueryUtil.py

@@ -22,3 +22,6 @@ def query_videos(start_idx, limit):
             # Process the query results
             result.append(record)
     return result
+
+
+

+ 18 - 0
ReadXlsxFile.py

@@ -0,0 +1,18 @@
+import pandas as pd
+
+# Read the Excel file
+
+
+def getVideoInfoInXlxs(xlsx_file):
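+    """Read the videoid, title and video_path columns of the given xlsx into a DataFrame."""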
+    df = pd.read_excel(xlsx_file)
+
+    feature_data = []
+    for index, row in df.iterrows():
+        item = {}
+        item['videoid'] = row['videoid']
+        item['title'] = row['title']
+        item['video_path'] = row['video_path']
+        feature_data.append(item)
+
+    feature_df = pd.DataFrame(feature_data)
+    return feature_df

+ 2 - 0
ai_tag_task.py

@@ -13,6 +13,8 @@ from whisper_asr import get_whisper_asr
 from gpt_tag import request_gpt
 from config import set_config
 from log import Log
+
+from result_save import insert_content
 from result_save import insert_content
 config_ = set_config()
 log_ = Log()

+ 1 - 1
ai_tag_task.sh

@@ -1,6 +1,6 @@
 ps -ef | grep ai_tag_task.py | grep -v grep | awk '{print $2}' | xargs kill -9
 
-# cd /data/aigc-test
+cd /sunxy/aigc-test 
 
 # source activate srt
 

+ 28 - 1
asr_task.py

@@ -142,4 +142,31 @@ def timer_check():
 
 
 if __name__ == '__main__':
-    timer_check()
+   #  timer_check()
+    cuda_id = sys.argv[1]
+    download_folder = 'videos'
+    download_folder = f'{download_folder}_{cuda_id}'
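+    # The ASR input folder is suffixed with the CUDA device id passed on the command line, e.g. videos_0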
+    if not os.path.exists(download_folder):
+        print(f"download_folder: {download_folder} not exists!")
+        exit(0)
+    # Iterate over all subfolder names under download_folder, i.e. the list of video_ids
+    video_folder_list = os.listdir(download_folder)
+    if len(video_folder_list) < 1:
+        print(f"video_folder_list is empty!")
+        exit(0)
+
+    asr_folder = 'asr_res'
+    if not os.path.exists(asr_folder):
+        os.makedirs(asr_folder)
+
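+    # Transcribe each video folder in parallel with 2 worker processes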
+    pool = multiprocessing.Pool(processes=2)
+    for video_id in video_folder_list:
+        pool.apply_async(
+            func=get_asr,
+            args=(video_id, download_folder, asr_folder)
+        )
+    pool.close()
+    pool.join()
+
+    print(f"videos asr finished!")
+

+ 0 - 3
asr_task.sh

@@ -2,10 +2,7 @@ ps -ef | grep asr_task.py | grep -v grep | awk '{print $2}' | xargs kill -9
 
 rm -rf asr_res/
 
-# source activate srt
-
 nohup env CUDA_VISIBLE_DEVICES=0 python asr_task.py 0 > logs/asr_task_0.log 2>&1 &
 
 nohup env CUDA_VISIBLE_DEVICES=1 python asr_task.py 1 > logs/asr_task_1.log 2>&1 &
 
-# conda deactivate

+ 37 - 15
download_videos_task.py

@@ -8,10 +8,23 @@ from utils import data_check, get_feature_data
 from config import set_config
 from log import Log
 import ODPSQueryUtil
+from ReadXlsxFile import getVideoInfoInXlxs
+import requests
 config_ = set_config()
 log_ = Log()
 features = ['videoid', 'title', 'video_path']
 
+cdn_domain = 'http://rescdn.yishihui.com/'
+
+
+def download_file(url, local_path):
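+    """Download the file at url via HTTP GET and write it to local_path."""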
+    response = requests.get(url)
+    if response.status_code == 200:
+        with open(local_path, 'wb') as f:
+            f.write(response.content)
+    else:
+        print(f"Failed to download {url}")
+
 
 def download_video_from_oss(video_id, video_path, download_folder):
     """从oss下载视频"""
@@ -21,21 +34,14 @@ def download_video_from_oss(video_id, video_path, download_folder):
         download_folder = f"{download_folder}_{pid}"
         if not os.path.exists(download_folder):
             os.makedirs(download_folder)
-        video_local_dir = os.path.join(download_folder, video_id)
+        video_local_dir = os.path.join(download_folder, str(video_id))
         os.makedirs(video_local_dir)
         video_filename = video_path.split('/')[-1]
         video_local_path = os.path.join(video_local_dir, video_filename)
-        # An Alibaba Cloud account AccessKey grants access to all APIs and is high risk. It is strongly recommended to create and use a RAM user for API access and routine operations; log in to the RAM console to create one.
-        # auth = oss2.Auth(access_key_id=config_.ODPS_CONFIG['ACCESSID'], access_key_secret=config_.ODPS_CONFIG['ACCESSKEY'])
-        auth = oss2.Auth(access_key_id=config_.OSS_CONFIG['accessKeyId'],
-                         access_key_secret=config_.OSS_CONFIG['accessKeySecret'])
-        # The endpoint uses Hangzhou as an example; fill in the actual value for other regions.
-        bucket = oss2.Bucket(
-            auth, endpoint=config_.OSS_CONFIG['endpoint'], bucket_name='art-pubbucket')
-        # Download the OSS object to a local file.
-        # <yourObjectName> is the full object path including the file extension but excluding the bucket name, e.g. abc/efg/123.jpg.
-        # <yourLocalFile> is the local file path plus the file name and extension, e.g. /users/local/myfile.txt.
-        bucket.get_object_to_file(video_path, video_local_path)
+
+        # Download the video file to local storage
+        video_url = cdn_domain + video_path
+        download_file(video_url, video_local_path)
 
         # For m3u8 files, all ts segments must also be downloaded
         if video_filename.split('.')[-1] == 'm3u8':
@@ -46,9 +52,9 @@ def download_video_from_oss(video_id, video_path, download_folder):
                     line = line.strip()
                     print(line)
                     if line[-3:] == '.ts':
-                        ts_path = os.path.join(root_path, line)
+                        ts_url = cdn_domain + os.path.join(root_path, line)
                         ts_local_path = os.path.join(video_local_dir, line)
-                        bucket.get_object_to_file(ts_path, ts_local_path)
+                        download_file(ts_url, ts_local_path)
         print(f"{video_id} download end!")
     except:
         print(f"{video_id} download fail!")
@@ -79,6 +85,7 @@ def timer_check():
     try:
         project = config_.DAILY_VIDEO['project']
         table = config_.DAILY_VIDEO['table']
+        # Yesterday's date
         now_date = datetime.datetime.today()
         print(f"now_date: {datetime.datetime.strftime(now_date, '%Y%m%d')}")
         dt = datetime.datetime.strftime(
@@ -99,4 +106,19 @@ def timer_check():
 
 
 if __name__ == '__main__':
-    timer_check()
+    # timer_check()
+    feature_df = getVideoInfoInXlxs('past_videos.xlsx')
+    download_folder = 'videos'
+    video_id_list = feature_df['videoid'].to_list()
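+    # Download every video listed in the xlsx in parallel with 6 worker processes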
+    pool = multiprocessing.Pool(processes=6)
+    for video_id in video_id_list:
+        video_path = feature_df[feature_df['videoid']
+                                == video_id]['video_path'].values[0].strip()
+        video_path = video_path.replace(' ', '')
+        print(video_id, video_path)
+        pool.apply_async(
+            func=download_video_from_oss,
+            args=(video_id, video_path, download_folder)
+        )
+    pool.close()
+    pool.join()

BIN
past_videos.xlsx


+ 1 - 0
requirements.txt

@@ -9,3 +9,4 @@ aliyun-log-python-sdk
 odps
 whisper
 mysql-connector-python
+openpyxl

+ 0 - 83
result_convertor.py

@@ -1,83 +0,0 @@
-import mysql.connector
-import json
-
-# Database connection parameters
-db_config = {
-    'host': 'rm-bp1k5853td1r25g3n690.mysql.rds.aliyuncs.com',
-    'database': 'incentive',
-    'port': 3306,
-    'user': 'wx2016_longvideo',
-    'password': 'wx2016_longvideoP@assword1234',
-}
-
-# Connect to the MySQL database
-cnx = mysql.connector.connect(**db_config)
-cursor = cnx.cursor()
-
-# Define the JSON field names
-all_field_names = ['key_words', 'search_keys', 'extra_keys', 'category_list', 'tone', 'target_audience',
-                   'target_age', 'target_gender', 'address', 'theme']
-
-json_field_names = ['key_words', 'search_keys', 'extra_keys', 'category_list']
-
-normal_field_names = ['tone', 'target_audience',
-                      'target_age', 'target_gender', 'address', 'theme']
-
-# Parameter list for batch inserts
-insert_batch = []
-
-
-# Read and parse the JSON data from the video_content table
-select_sql = "SELECT * FROM video_content;"
-cursor.execute(select_sql)
-rows = cursor.fetchall()  # Use fetchall() to make sure all rows are read
-print("Reading data from video_content table...")
-print("row count: ", len(rows))
-
-for row in rows:
-    video_id = row[1]
-    # Iterate over all JSON fields
-    for field_name in json_field_names:
-        # Get the corresponding JSON string
-        json_data = row[all_field_names.index(field_name) + 2]
-        # Check whether it is a JSON array string
-        if not json_data:
-            continue
-        if json_data[0] != '[':
-            continue
-        # Parse the JSON string
-        tags = json.loads(json_data) if json_data else []
-        # Build the parameters for the batch insert
-        for tag in tags:
-            insert_batch.append((video_id, tag, field_name))
-
-    for field_name in normal_field_names:
-        # Get the corresponding field value
-        value = row[all_field_names.index(field_name) + 2]
-        # Build the parameters for the batch insert
-        insert_batch.append((video_id, value, field_name))
-
-    # Execute a batch insert every 1000 records
-    if len(insert_batch) >= 1000:
-        cursor.executemany("""
-            INSERT INTO video_content_mapping (video_id, tag, tag_type)
-            VALUES (%s, %s, %s)
-        """, insert_batch)
-        # Clear the list for the next batch insert
-        print(f"Inserting records {len(insert_batch)} rows...")
-        insert_batch.clear()
-
-# Insert the remaining records (if any)
-if insert_batch:
-    cursor.executemany("""
-        INSERT INTO video_content_mapping (video_id, tag, tag_type)
-        VALUES (%s, %s, %s)
-    """, insert_batch)
-    print(f"Inserting records {len(insert_batch)} rows...")
-
-# Commit the transaction
-cnx.commit()
-
-# Close the cursor and connection
-cursor.close()
-cnx.close()