|
@@ -10,6 +10,7 @@ from odps import ODPS
|
|
|
from config import set_config
|
|
|
from db_helper import HologresHelper, MysqlHelper, RedisHelper
|
|
|
from log import Log
|
|
|
+from collections import defaultdict
|
|
|
|
|
|
config_, env = set_config()
|
|
|
log_ = Log()
|
|
@@ -29,7 +30,25 @@ def execute_sql_from_odps(project, sql, connect_timeout=3000, read_timeout=50000
|
|
|
)
|
|
|
records = odps.execute_sql(sql=sql)
|
|
|
return records
|
|
|
-
|
|
|
def exe_sql(project, sql, connect_timeout=3000, read_timeout=500000,
            pool_maxsize=1000, pool_connections=1000):
    """Run a SQL statement on ODPS and return the result as a DataFrame.

    An ODPS client is created from the credentials and endpoint stored in
    ``config_.ODPS_CONFIG``; the statement is executed and its reader is
    consumed record by record, grouping the values column-wise.

    :param project: ODPS project passed to the client.
    :param sql: SQL statement to execute.
    :param connect_timeout: forwarded to the ODPS client as ``connect_timeout``.
    :param read_timeout: forwarded to the ODPS client as ``read_timeout``.
    :param pool_maxsize: forwarded to the ODPS client as ``pool_maxsize``.
    :param pool_connections: forwarded to the ODPS client as ``pool_connections``.
    :return: ``pandas.DataFrame`` built with ``dtype=str``, one column per
        distinct key seen in the records.
    """
    odps_settings = config_.ODPS_CONFIG
    client = ODPS(
        access_id=odps_settings['ACCESSID'],
        secret_access_key=odps_settings['ACCESSKEY'],
        project=project,
        endpoint=odps_settings['ENDPOINT'],
        connect_timeout=connect_timeout,
        read_timeout=read_timeout,
        pool_maxsize=pool_maxsize,
        pool_connections=pool_connections,
    )
    instance = client.execute_sql(sql)
    with instance.open_reader() as reader:
        values_by_column = defaultdict(list)
        for row in reader:
            # Each item of a row is indexed as [0] -> key, [1] -> value
            # (presumably (column name, value) pairs -- confirm against the
            # PyODPS Record iteration contract).
            for field in row:
                values_by_column[field[0]].append(field[1])
        # Keys become DataFrame columns; the collected lists become the rows.
        return pd.DataFrame.from_dict(
            values_by_column, orient='columns', dtype=str
        )
|
|
|
|
|
|
def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
|
|
|
pool_maxsize=1000, pool_connections=1000):
|