import os

import redis
from odps import ODPS

# SECURITY: earlier revisions of this script hard-coded a Redis password and an
# Alibaba Cloud AccessKey pair. Those values must be treated as leaked and
# rotated. Secrets are now read only from the environment and have no defaults.

# Redis connection settings (non-secret values may be overridden via env vars).
redis_host = os.environ.get(
    'REDIS_HOST', 'r-bp1j1vsznx8h813ddk.redis.rds.aliyuncs.com')  # Redis server address
redis_port = int(os.environ.get('REDIS_PORT', '6379'))  # Redis server port
redis_password = os.environ.get('REDIS_PASSWORD')  # Redis password; None means no AUTH
redis_db = int(os.environ.get('REDIS_DB', '0'))  # Redis database index

# DataWorks (MaxCompute) connection settings.
access_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', '')  # Alibaba Cloud AccessKey ID
access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', '')  # Alibaba Cloud AccessKey secret
project_name = os.environ.get('ODPS_PROJECT', 'loghubods')  # MaxCompute project name
endpoint = os.environ.get(
    'ODPS_ENDPOINT', 'http://service.odps.aliyun.com/api')  # MaxCompute endpoint

# File paths.
input_file = 'input.txt'  # input file path (not referenced by process_data)
output_file = 'output.txt'  # output file path
def _batched(iterable, size):
    """Yield consecutive lists of at most ``size`` items taken from ``iterable``."""
    from itertools import islice

    iterator = iter(iterable)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


def _format_line(key, value):
    """Return the output line for one mid.

    ``value`` is what Redis returned for the key: bytes when the key exists
    (possibly empty), or None when it does not.
    """
    # Compare with None explicitly: an existing key holding an empty string
    # is a real value and must not be reported as missing.
    if value is None:
        return f"{key} -\n"
    return f"{key} {value.decode('utf-8')}\n"


def process_data(batch_size=1000):
    """Export Redis timestamps for every mid in a MaxCompute table.

    Each ``mid`` is read from ``loghubods.mid_generate_date_18``. The script
    looks up the Redis key ``mid:generate:timestamp:<mid>`` and writes one line
    per row to ``output_file``:
    ``"<mid> <value>"`` when the key exists, or ``"<mid> -"`` when it does not.
    Missing keys are also reported on stdout.

    Args:
        batch_size: number of Redis keys fetched per MGET round trip.
            Defaults to 1000.

    Raises:
        ValueError: if ``batch_size`` is less than 1.

    Connection, query and I/O errors are not raised. They are reported on
    stderr together with a traceback, which keeps the original best-effort
    behaviour.
    """
    import sys
    import traceback

    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")

    try:
        # Connect to DataWorks (MaxCompute).
        o = ODPS(access_id, access_key, project=project_name, endpoint=endpoint)
        # Connect to Redis.
        r = redis.Redis(host=redis_host, port=redis_port,
                        password=redis_password, db=redis_db)
        # Only the mid column is used, so don't transfer the others.
        sql = "SELECT mid FROM loghubods.mid_generate_date_18"
        # Tunnel mode streams the result set efficiently.
        with o.execute_sql(sql).open_reader(tunnel=True) as reader:
            with open(output_file, 'w', encoding='utf-8') as outfile:
                mids = (record['mid'] for record in reader)
                # Batch lookups with MGET instead of one GET round trip per row.
                for chunk in _batched(mids, batch_size):
                    redis_keys = ["mid:generate:timestamp:" + str(mid) for mid in chunk]
                    values = r.mget(redis_keys)  # same order as redis_keys; None if missing
                    for mid, value in zip(chunk, values):
                        outfile.write(_format_line(mid, value))
                        if value is None:
                            print(f"Key '{mid}' not found in Redis.")
    except Exception as e:
        print(f"An error occurred: {e}", file=sys.stderr)
        traceback.print_exc()
def _main():
    """Script entry point: run the MaxCompute -> Redis -> file export once."""
    process_data()


if __name__ == "__main__":
    _main()