supeng 1 month ago
commit
93a08fde9c
6 changed files with 305 additions and 0 deletions
  1. 36 0
      bitrate.py
  2. 16 0
      clear_redis.py
  3. 73 0
      clear_redis_concurrent.py
  4. 55 0
      loop_redis.py
  5. 49 0
      renew_redis.py
  6. 76 0
      renew_redis_concurrent.py

+ 36 - 0
bitrate.py

@@ -0,0 +1,36 @@
+import subprocess
+import json
+
+
def get_video_info(video_path):
    """Probe a video (local path or URL) with ffprobe and return its format info.

    Args:
        video_path: Filesystem path or URL of the video to inspect.

    Returns:
        The ffprobe 'format' section as a dict (string values for 'size' and
        'bit_rate'), or None when probing fails for any reason.
    """
    # Ask ffprobe for container-level size/bit_rate, emitted as JSON.
    command = [
        'ffprobe',
        '-v', 'error',
        '-select_streams', 'v:0',
        '-show_entries', 'format=size,bit_rate',
        '-of', 'json',
        video_path
    ]
    try:
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except FileNotFoundError:
        # ffprobe binary not installed / not on PATH — fail soft like other errors.
        print("Error: ffprobe executable not found")
        return None

    if result.returncode != 0:
        print(f"Error: {result.stderr}")
        return None

    # Parse the JSON output; guard against empty or malformed ffprobe output.
    try:
        video_info = json.loads(result.stdout)
        return video_info['format']
    except (json.JSONDecodeError, KeyError) as e:
        print(f"Error: unexpected ffprobe output: {e}")
        return None
+
+
if __name__ == "__main__":
    # Probe a sample remote video; a local path works just as well.
    # video_path = 'path/to/your/video.mp4'
    video_path = 'http://rescdn.yishihui.com/longvideo/multitranscodeh265/crawler_local/video/prod/20250301/088c1f984789aa3ef65aa7e41018237f7446740420250301194002586558054-2SD.mp4'
    info = get_video_info(video_path)

    if info:
        # ffprobe reports size in bytes and bit_rate in bits/second (as strings).
        size_mb = int(info['size']) / (1024 * 1024)
        kbps = int(info['bit_rate']) / 1000
        print(f"File Size: {size_mb:.2f} MB")
        print(f"Bit Rate: {kbps:.2f} kbps")

+ 16 - 0
clear_redis.py

@@ -0,0 +1,16 @@
import redis
import time

# Connect to Redis.
# NOTE(review): credentials are hardcoded in source — move to env vars/config.
#r = redis.Redis(host='r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019')
r = redis.Redis(host='r-bp1oyhyx4mxgs6klyt561.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019')

# Walk the whole keyspace incrementally with SCAN (non-blocking, unlike KEYS).
# NOTE(review): the scanned keys are never deleted or otherwise used, so this
# script only iterates the keyspace — confirm whether deletion was intended.
cursor = '0'
scanning = True
while scanning:
    cursor, keys = r.scan(cursor=cursor, match='*', count=1000)
    if cursor == 0:
        # A zero cursor from SCAN means the full iteration is complete.
        print("finish")
        scanning = False
    else:
        # Throttle between batches so the scan doesn't hammer the server.
        time.sleep(0.1)

+ 73 - 0
clear_redis_concurrent.py

@@ -0,0 +1,73 @@
+import datetime
+import sys
+
+import redis
+import concurrent.futures
+
def process_key(r, key, threshold_second=60, extend_second=300):
    """Extend the TTL of a single key when it is close to expiring.

    Bug fix: the original body read module globals ``threshold_second`` and
    ``extend_second`` that are never defined anywhere in this file, so every
    call raised a NameError that the broad ``except`` silently swallowed and
    counted as 0 renewals. They are now keyword parameters with defaults, so
    the existing ``process_key(r, key)`` call sites keep working.

    Args:
        r: Redis client.
        key: Key (bytes) whose TTL is inspected.
        threshold_second: Only keys with 0 < TTL < this value are renewed.
        extend_second: Seconds added on top of the key's current TTL.

    Returns:
        1 when the key's TTL was extended, 0 otherwise (no expiry set,
        TTL above threshold, or an error occurred).
    """
    try:
        ttl = r.ttl(key)
        # redis TTL is -2 (missing key), -1 (no expiry), or remaining seconds;
        # only a positive TTL under the threshold qualifies for renewal.
        if ttl is not None and 0 < ttl < threshold_second:
            r.expire(key, ttl + extend_second)
            return 1  # one key successfully renewed
        return 0
    except redis.exceptions.RedisError as e:
        print(f"处理 Key: {key.decode()} 时发生 Redis 错误: {e}")
        return 0
    except Exception as e:
        print(f"处理 Key: {key.decode()} 时发生其他错误: {e}")
        return 0
+
+
def clear_redis_key(host='localhost', port=6379, db=0, password=None, num_workers=10):
    """Scan keys under the message-cache prefix and hand each one to
    process_key() on a thread pool, which extends soon-to-expire TTLs.

    NOTE(review): despite the "clear" name, nothing is deleted here — the
    logic renews TTLs, mirroring renew_redis_concurrent.py; confirm intent.

    Args:
        host: Redis host address.
        port: Redis port.
        db: Redis database number.
        password: Redis password (None when not required).
        num_workers: Thread pool size for concurrent key processing.
    """
    try:
        print(f'time = {datetime.datetime.now()}')
        sys.stdout.flush()
        sys.stderr.flush()
        client = redis.Redis(host=host, port=port, db=db, password=password)
        submitted = 0
        cleared = 0

        with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as pool:
            pending = []
            # scan_iter walks the keyspace incrementally without blocking the server.
            for key in client.scan_iter(match='com.weiqu.longvideo.video.message.cache.*', count=1000):
                pending.append(pool.submit(process_key, client, key))
                submitted += 1
                # Progress heartbeat every million submissions.
                if submitted % 1000000 == 0:
                    print(f"submit count: {submitted} time = {datetime.datetime.now()}")
                    sys.stdout.flush()
                    sys.stderr.flush()

            for fut in concurrent.futures.as_completed(pending):
                try:
                    cleared += fut.result()
                except Exception as e:
                    print(f"future error: {e}")
                    sys.stdout.flush()
                    sys.stderr.flush()

        print(f"scan finish count: {submitted} clear_count: {cleared} time = {datetime.datetime.now()}")
        sys.stdout.flush()
        sys.stderr.flush()
    except redis.exceptions.ConnectionError as e:
        print(f"connect Redis error: {e}")
        sys.stdout.flush()
        sys.stderr.flush()
    except Exception as e:
        print(f"error: {e}")
        sys.stdout.flush()
        sys.stderr.flush()
+
if __name__ == "__main__":
    pool_size = 20  # tune to CPU core count and network conditions
    # NOTE(review): pool_size is never passed to the call below, so the
    # default num_workers=10 is what actually runs — confirm this is intended.
    # test
    clear_redis_key(host='r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019')

    # prod
    # clear_redis_key(host='r-bp1oyhyx4mxgs6klyt561.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019')

+ 55 - 0
loop_redis.py

@@ -0,0 +1,55 @@
+import redis
+from odps import ODPS
+
+# Redis 连接配置
# --- Redis connection settings ---
# NOTE(review): credentials are hardcoded in source — move to env vars/config.
redis_host = 'r-bp1j1vsznx8h813ddk.redis.rds.aliyuncs.com'  # Redis server address
redis_port = 6379  # Redis server port
redis_password = 'Wqsd@2019'  # Redis password (original comment wrongly said "port")
redis_db = 0  # Redis database number

# --- DataWorks (MaxCompute) connection settings ---
access_id = 'LTAI9EBa0bd5PrDa'  # Aliyun Access ID
access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5'  # Aliyun Access Key
project_name = 'loghubods'  # MaxCompute project name
endpoint = 'http://service.odps.aliyun.com/api'  # MaxCompute endpoint

# --- File paths ---
input_file = 'input.txt'  # input file path (NOTE(review): unused in this script)
output_file = 'output.txt'  # output file path
+
+
def process_data():
    """Read mids from MaxCompute (DataWorks), fetch each mid's value from
    Redis, and write "<mid> <value>" lines to output_file ("<mid> -" when
    the key is missing from Redis).
    """
    try:
        # MaxCompute (DataWorks) and Redis connections.
        odps_client = ODPS(access_id, access_key, project=project_name, endpoint=endpoint)
        cache = redis.Redis(host=redis_host, port=redis_port, password=redis_password, db=redis_db)

        # Adjust the table/column names below to your actual schema.
        sql = "SELECT * FROM loghubods.mid_generate_date_18;"

        # tunnel=True streams rows efficiently for large result sets.
        with odps_client.execute_sql(sql).open_reader(tunnel=True) as reader:
            with open(output_file, 'w') as outfile:
                for record in reader:
                    mid = record['mid']
                    # Redis keys are strings, so stringify the mid first.
                    cache_key = "mid:generate:timestamp:" + str(mid)
                    cached = cache.get(cache_key)

                    if cached:
                        # Redis returns bytes; decode before writing.
                        outfile.write(f"{mid} {cached.decode('utf-8')}\n")
                    else:
                        outfile.write(f"{mid} -\n")
                        print(f"Key '{mid}' not found in Redis.")

    except Exception as e:
        print(f"An error occurred: {e}")


if __name__ == "__main__":
    process_data()

+ 49 - 0
renew_redis.py

@@ -0,0 +1,49 @@
+import datetime
+
+import redis
+
def extend_redis_key_expiry(host='localhost', port=6379, db=0, password=None, threshold_second=60, extend_second=300):
    """Scan Redis keys and, for any key whose remaining TTL is below
    threshold_second, extend the TTL by extend_second.

    Fixes: the docstring previously described the thresholds in days while
    the parameters are seconds; a leftover per-key debug print (one line for
    EVERY scanned key) flooded the log and dominated runtime on multi-million
    key scans, so it has been removed.

    Args:
        host: Redis host address.
        port: Redis port.
        db: Redis database number.
        password: Redis password (None when not required).
        threshold_second: TTL threshold in seconds; keys below it are renewed.
        extend_second: Seconds added on top of the key's current TTL.
    """
    try:
        print(f'time = {datetime.datetime.now()}')
        r = redis.Redis(host=host, port=port, db=db, password=password)
        count = 0
        renew_count = 0
        # scan_iter iterates the keyspace incrementally without blocking the
        # server (never use KEYS * in production).
        for key in r.scan_iter(match='mid:generate:timestamp*', count=1000):
        # for key in r.scan_iter(match='renew_1*', count=1000):
            ttl = r.ttl(key)
            # TTL is -2 (key gone), -1 (no expiry), or the remaining seconds;
            # only keys that actually expire are eligible for renewal.
            if ttl is not None and ttl > 0:
                if ttl < threshold_second:
                    r.expire(key, ttl + extend_second)  # extend from current TTL
                    print(f"Key: {key.decode()}, 原 TTL: {ttl} 秒, 延长 {extend_second} 秒")
                    renew_count += 1
            count += 1
            # Progress heartbeat every million keys.
            if count % 1000000 == 0:
                print(f"count: {count}")
        print(f"扫描完成 count: {count} renew_count: {renew_count} time = {datetime.datetime.now()}")
    except redis.exceptions.ConnectionError as e:
        print(f"连接 Redis 失败: {e}")
    except Exception as e:
        print(f"发生错误: {e}")
+
if __name__ == "__main__":
    # Renew any key expiring within 7 days by another 20 days.
    threshold_second = 7 * 24 * 60 * 60
    extend_second = 20 * 24 * 60 * 60
    # test
    # extend_redis_key_expiry(host='r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019', threshold_second=threshold_second, extend_second=extend_second)
    # prod
    extend_redis_key_expiry(host='r-bp1j1vsznx8h813ddk.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019', threshold_second=threshold_second, extend_second=extend_second)

+ 76 - 0
renew_redis_concurrent.py

@@ -0,0 +1,76 @@
+import datetime
+import sys
+
+import redis
+import concurrent.futures
+
def process_key(r, key, threshold_second, extend_second):
    """Renew a single key: when its remaining TTL is positive and below
    threshold_second, push the expiry out by extend_second more seconds.

    Returns 1 when the key was renewed, 0 otherwise (no expiry, above
    threshold, or an error occurred).
    """
    try:
        remaining = r.ttl(key)
        if remaining is not None and 0 < remaining < threshold_second:
            # Extend relative to the current TTL, not from "now".
            r.expire(key, remaining + extend_second)
            return 1
        return 0
    except redis.exceptions.RedisError as e:
        print(f"处理 Key: {key.decode()} 时发生 Redis 错误: {e}")
        return 0
    except Exception as e:
        print(f"处理 Key: {key.decode()} 时发生其他错误: {e}")
        return 0
+
+
def extend_redis_key_expiry(host='localhost', port=6379, db=0, password=None, threshold_second=60, extend_second=300, num_workers=10):
    """Scan Redis keys and renew soon-to-expire TTLs using a thread pool.

    Each matching key is submitted to process_key(), which extends its TTL
    by extend_second seconds when the remaining TTL is below threshold_second.

    Args:
        host: Redis host address.
        port: Redis port.
        db: Redis database number.
        password: Redis password (None when not required).
        threshold_second: TTL threshold in seconds; keys below it are renewed.
        extend_second: Seconds added on top of the key's current TTL.
        num_workers: Thread pool size for concurrent key processing.
    """
    try:
        print(f'time = {datetime.datetime.now()}')
        sys.stdout.flush()
        sys.stderr.flush()
        client = redis.Redis(host=host, port=port, db=db, password=password)
        scanned = 0
        renewed = 0

        with concurrent.futures.ThreadPoolExecutor(max_workers=num_workers) as pool:
            pending = []
            # scan_iter walks the keyspace incrementally without blocking the server.
            # for key in client.scan_iter(match='mid:generate:timestamp*', count=1000):
            for key in client.scan_iter(match='renew_1*', count=1000):
                pending.append(pool.submit(process_key, client, key, threshold_second, extend_second))
                scanned += 1
                # Progress heartbeat every million submissions.
                if scanned % 1000000 == 0:
                    print(f"submit count: {scanned} time = {datetime.datetime.now()}")
                    sys.stdout.flush()
                    sys.stderr.flush()

            for fut in concurrent.futures.as_completed(pending):
                try:
                    renewed += fut.result()
                except Exception as e:
                    print(f"future error: {e}")
                    sys.stdout.flush()
                    sys.stderr.flush()

        print(f"scan finish count: {scanned} renew_count: {renewed} time = {datetime.datetime.now()}")
        sys.stdout.flush()
        sys.stderr.flush()
    except redis.exceptions.ConnectionError as e:
        print(f"connect Redis error: {e}")
        sys.stdout.flush()
        sys.stderr.flush()
    except Exception as e:
        print(f"error: {e}")
        sys.stdout.flush()
        sys.stderr.flush()
+
if __name__ == "__main__":
    # Renew keys expiring within 7 days by another 20 days.
    threshold_second = 7 * 24 * 60 * 60
    extend_second = 20 * 24 * 60 * 60
    num_workers = 50  # tune to CPU core count and network conditions
    # NOTE(review): the active (test) call below does not pass num_workers,
    # so the default of 10 workers is used — confirm this is intended.
    # test
    extend_redis_key_expiry(host='r-bp1ps6my7lzg8rdhwx682.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019', threshold_second=threshold_second, extend_second=extend_second)

    # prod
    # extend_redis_key_expiry(host='r-bp1j1vsznx8h813ddk.redis.rds.aliyuncs.com', port=6379, db=0, password='Wqsd@2019', threshold_second=threshold_second, extend_second=extend_second, num_workers=num_workers)