# hive_test.py
import os

import pandas as pd
from odps import ODPS
  3. def get_odps_instance(project):
  4. odps = ODPS(
  5. access_id='LTAIWYUujJAm7CbH',
  6. secret_access_key='RfSjdiWwED1sGFlsjXv0DlfTnZTG1P',
  7. project=project,
  8. endpoint='http://service.cn.maxcompute.aliyun.com/api',
  9. )
  10. return odps
  11. if __name__ == '__main__':
  12. project = 'pai_algo'
  13. odps = get_odps_instance(project)
  14. table = 'pai_temp_flow_1p2ujuopb682ktq1h6_node_uxam9b0n7k2v11i9rz_outputTable'
  15. num = 10
  16. try:
  17. # 要查询的 SQL 语句
  18. sql = f'select * from {table} limit {num}'
  19. # 执行 SQL 查询
  20. with odps.execute_sql(sql).open_reader() as reader:
  21. # 查询数量小于目标数量时 返回空
  22. if reader.count < num:
  23. print("error")
  24. # 获取字段名称
  25. column_names = [col.name for col in reader._schema.columns]
  26. # 获取查询结果数据
  27. data = []
  28. for record in reader:
  29. record_list = list(record)
  30. numbers = []
  31. for item in record_list:
  32. numbers.append(item[1])
  33. data.append(numbers)
  34. # 将数据和字段名称组合成 DataFrame
  35. df = pd.DataFrame(data, columns=column_names)
  36. print(df)
  37. except Exception as e:
  38. print(f"发生错误: {e}")