Explorar el Código

Update my_utils: add some support functions

StrayWarrior hace 7 meses
padre
commit
e067fa7c14
Se ha modificado 1 fichero con 91 adiciones y 0 borrados
  1. 91 0
      my_utils.py

+ 91 - 0
my_utils.py

@@ -7,6 +7,7 @@ import traceback
 import pandas as pd
 
 from odps import ODPS
+from odps.df import DataFrame
 from my_config import set_config
 from db_helper import HologresHelper, MysqlHelper, RedisHelper
 from log import Log
@@ -16,6 +17,16 @@ config_, env = set_config()
 log_ = Log()
 
 
def get_odps_instance(project):
    """Build an ODPS client for the given project using the global config.

    :param project: ODPS project name, type-string
    :return: odps.ODPS client instance
    """
    return ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
    )
+
+
 def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=500000,
                        pool_maxsize=1000, pool_connections=1000):
     odps = ODPS(
@@ -72,6 +83,65 @@ def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=
     return records
 
 
def get_dataframe_from_odps(project, table, partition_spec_dict=None):
    """Load an ODPS table as a DataFrame, optionally limited to partitions.

    :param project: project name, type-string
    :param table: table name, type-string
    :param partition_spec_dict: partition spec, type-dict
                                (e.g. {'dt': '20230101'}); None selects the
                                whole table
    :return: odps.DataFrame
    """
    odps = get_odps_instance(project)
    df = DataFrame(odps.get_table(name=table))
    if not partition_spec_dict:
        return df
    spec = ','.join('{}={}'.format(k, v)
                    for k, v in partition_spec_dict.items())
    return df.filter_parts(spec)
+
+
def get_odps_df_of_max_partition(project, table, rb_spec=None):
    """Return a DataFrame over the newest partition of an ODPS table.

    :param project: project name, type-string
    :param table: table name, type-string
    :param rb_spec: exclusive right bound for partition values, type-dict
                    (e.g. {'dt': '20230101'} picks the newest partition
                    with dt < 20230101); None means the overall max partition
    :return: odps.DataFrame restricted to the selected partition, or None
             when no partition satisfies rb_spec
    """
    odps = get_odps_instance(project)
    t = odps.get_table(table)
    # Reuse the table object already fetched above instead of issuing a
    # second get_table() call for the same table.
    df = DataFrame(t)
    if rb_spec is None:
        return df.filter_parts(t.get_max_partition().partition_spec)
    spec = ','.join('{}<{}'.format(k, v) for k, v in rb_spec.items())
    # reverse=True iterates partitions newest-first, so the first match is
    # the maximum one under the bound.
    part_iter = t.iterate_partitions(spec=spec, reverse=True)
    try:
        return df.filter_parts(next(part_iter))
    except StopIteration:
        # No partition satisfies the right bound.
        return None
+
def get_odps_df_of_recent_partitions(project, table, n=1, rb_spec=None):
    """Return a DataFrame over the n most recent partitions of a table.

    :param project: project name, type-string
    :param table: table name, type-string
    :param n: number of most-recent partitions to select, type-int
    :param rb_spec: exclusive right bound for partition values, type-dict
                    (e.g. {'dt': '20230101'} only considers partitions
                    with dt < 20230101); None considers all partitions
    :return: odps.DataFrame restricted to the selected partitions (fewer
             than n when the table does not have that many)
    """
    odps = get_odps_instance(project)
    t = odps.get_table(table)
    # Reuse the table object already fetched above instead of issuing a
    # second get_table() call for the same table.
    df = DataFrame(t)
    spec = None
    if rb_spec:
        spec = ','.join('{}<{}'.format(k, v) for k, v in rb_spec.items())
    # reverse=True iterates partitions newest-first, so the first n hits
    # are the most recent ones.
    part_iter = t.iterate_partitions(spec=spec, reverse=True)
    selected_parts = []
    try:
        for _ in range(n):
            partition = next(part_iter)
            selected_parts.append(partition)
            log_.info(f"table: {table}, selected part: {partition.name}")
    except StopIteration:
        # Fewer than n partitions exist; keep whatever was collected.
        log_.info(f"table: {table}, no more parts to iterate")
    return df.filter_parts(selected_parts)
+
+
 def check_table_partition_exits(date, project, table, connect_timeout=3000, read_timeout=500000,
                                 pool_maxsize=1000, pool_connections=1000):
     """
@@ -99,6 +169,27 @@ def check_table_partition_exits(date, project, table, connect_timeout=3000, read
     return t.exist_partition(partition_spec=f'dt={date}')
 
 
def check_table_partition_exits_v2(project, table, partition_spec_dict):
    """Check whether a partition exists and report its record count.

    Note: newer ODPS versions removed the timeout-style parameters, hence
    this v2 variant.

    :param project: project name, type-string
    :param table: table name, type-string
    :param partition_spec_dict: partition spec, type-dict
    :return: (if_exist, num_records) tuple
    """
    odps = get_odps_instance(project)
    t = odps.get_table(name=table)
    spec = ','.join('{}={}'.format(k, v)
                    for k, v in partition_spec_dict.items())
    if not t.exist_partition(partition_spec=spec):
        return False, 0
    with t.open_reader(partition=spec) as reader:
        count = reader.count
    return True, count
+
+
 def write_to_pickle(data, filename, filepath=config_.DATA_DIR_PATH):
     """
     将数据写入pickle文件中