# loop_redis.py (2.1 KB)
  import os

  import redis
  from odps import ODPS
  # Redis connection settings.
  redis_host = 'r-bp1j1vsznx8h813ddk.redis.rds.aliyuncs.com'  # Redis server address
  redis_port = 6379  # Redis server port
  # Secrets must not live in source control: the Redis password is read from
  # the environment (None when REDIS_PASSWORD is unset).
  # NOTE(review): the password previously committed here should be rotated.
  redis_password = os.environ.get('REDIS_PASSWORD')  # Redis server password
  redis_db = 0  # Redis database index

  # DataWorks (MaxCompute) connection settings.
  # The AccessKey pair is read from the environment variable names used in the
  # PyODPS documentation (None when unset).
  # NOTE(review): the AccessKey previously committed here should be rotated.
  access_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID')  # Aliyun AccessKey ID
  access_key = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET')  # Aliyun AccessKey secret
  project_name = 'loghubods'  # MaxCompute project name
  endpoint = 'http://service.odps.aliyun.com/api'  # MaxCompute endpoint

  # File paths.
  input_file = 'input.txt'  # input file path
  output_file = 'output.txt'  # output file path
  def process_data():
      """Export the Redis value of every ``mid`` in the MaxCompute source table.

      Runs ``SELECT * FROM loghubods.mid_generate_date_18`` through ODPS, looks
      up the Redis key ``mid:generate:timestamp:<mid>`` for each record, and
      writes one ``"<mid> <value>"`` line per record to ``output_file``.  When
      the key does not exist in Redis, ``"<mid> -"`` is written instead and a
      notice is printed.

      Connection settings and the output path come from the module-level
      configuration.  Any ``Exception`` raised along the way is caught and
      reported on stdout, so the function itself returns ``None`` either way.
      """
      try:
          # Connect to DataWorks (MaxCompute).
          o = ODPS(access_id, access_key, project=project_name, endpoint=endpoint)
          # Connect to Redis.
          r = redis.Redis(host=redis_host, port=redis_port, password=redis_password, db=redis_db)

          # Query to run; adjust the table/columns to the real schema.
          sql = "SELECT * FROM loghubods.mid_generate_date_18;"

          # Tunnel mode is requested for more efficient reads.  The reader is
          # opened before the output file, so a failed query does not truncate
          # an existing output file.
          with o.execute_sql(sql).open_reader(tunnel=True) as reader:
              # Values are decoded as UTF-8 below, so write the file as UTF-8
              # too instead of relying on the platform default encoding.
              with open(output_file, 'w', encoding='utf-8') as outfile:
                  for record in reader:
                      key = record['mid']
                      # Redis keys are strings, so stringify the mid.
                      redis_key = "mid:generate:timestamp:" + str(key)
                      value = r.get(redis_key)

                      # r.get() returns None for a missing key; an existing but
                      # empty value is falsy yet still "found", so compare
                      # against None rather than testing truthiness.
                      if value is not None:
                          outfile.write(f"{key} {value.decode('utf-8')}\n")
                      else:
                          outfile.write(f"{key} -\n")
                          print(f"Key '{key}' not found in Redis.")
      except Exception as e:
          # Top-level boundary of the script: report the failure and return.
          print(f"An error occurred: {e}")
  # Run the export only when this file is executed as a script, not on import.
  if __name__ == "__main__":
      process_data()