import redis from odps import ODPS # Redis 连接配置 redis_host = 'r-bp1j1vsznx8h813ddk.redis.rds.aliyuncs.com' # Redis 服务器地址 redis_port = 6379 # Redis 服务器端口 redis_password = 'Wqsd@2019' # Redis 服务器端口 redis_db = 0 # Redis 数据库编号 # DataWorks 连接配置 access_id = 'LTAI9EBa0bd5PrDa' # 阿里云 Access ID access_key = 'vAalxds7YxhfOA2yVv8GziCg3Y87v5' # 阿里云 Access Key project_name = 'loghubods' # MaxCompute 项目名称 endpoint = 'http://service.odps.aliyun.com/api' # MaxCompute Endpoint # 文件路径 input_file = 'input.txt' # 输入文件路径 output_file = 'output.txt' # 输出文件路径 def process_data(): """ 从阿里云 DataWorks (MaxCompute) 读取数据,从 Redis 获取 value,并写入新文件。 """ try: # 连接 DataWorks (MaxCompute) o = ODPS(access_id, access_key, project=project_name, endpoint=endpoint) # 连接 Redis r = redis.Redis(host=redis_host, port=redis_port, password=redis_password, db=redis_db) # 构建 SQL 查询语句 (根据你的实际表结构修改) sql = "SELECT * FROM loghubods.mid_generate_date_18;" # 替换为你的表名和列名 with o.execute_sql(sql).open_reader(tunnel=True) as reader: # 使用 Tunnel 模式,提高读取效率 with open(output_file, 'w') as outfile: for record in reader: key = record['mid'] redis_key = "mid:generate:timestamp:" + str(key) # 将 key 转换为字符串,因为 Redis 的 key 是字符串类型 value = r.get(redis_key) # 从 Redis 获取 value if value: # 将 key-value 写入新文件 (注意解码) outfile.write(f"{key} {value.decode('utf-8')}\n") else: outfile.write(f"{key} -\n") print(f"Key '{key}' not found in Redis.") except Exception as e: print(f"An error occurred: {e}") if __name__ == "__main__": process_data()