# -*- coding: utf-8 -*-
"""Export hourly per-apptype video statistics from ODPS to a TSV file.

Usage: pass the partition value (``dt``) as the first command-line
argument; the result is written to ``./data/hour_video_data_<dt>``.
"""
import datetime
import os
import re
import sys
from collections import defaultdict

import pandas as pd
from odps import ODPS

from config import set_config

config_ = set_config()
odps = ODPS(
    access_id=config_.ODPS_CONFIG['ACCESSID'],
    secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
    project="loghubods",
    endpoint=config_.ODPS_CONFIG['ENDPOINT'],
)


def get_data_from_odps(date, project, table, connect_timeout=3000,
                       read_timeout=500000, pool_maxsize=1000,
                       pool_connections=1000):
    """Read one ``dt`` partition of an ODPS table.

    The read goes through the module-level ``odps`` client.

    :param date: partition date, string formatted as '%Y%m%d'
    :param project: project name (string); accepted for interface
        compatibility but not used by this implementation
    :param table: table name (string)
    :param connect_timeout: connection timeout setting; currently unused
    :param read_timeout: read timeout setting; currently unused
    :param pool_maxsize: currently unused
    :param pool_connections: currently unused
    :return: records, as returned by ``odps.read_table``
    """
    records = odps.read_table(name=table, partition='dt=%s' % date)
    return records


def exe_sql(sql):
    """Run ``sql`` on ODPS and return the result as a pandas DataFrame.

    :param sql: SQL statement to execute (string)
    :return: DataFrame with one column per result column; empty when the
        query yields no records
    """
    columns = defaultdict(list)
    with odps.execute_sql(sql).open_reader() as reader:
        for record in reader:
            # Iterating a record yields (column_name, value) pairs; collect
            # the values of each column under its name.
            for res in record:
                columns[res[0]].append(res[1])
    # orient='index' builds one row per column name; transpose so that the
    # column names become the DataFrame columns.
    return pd.DataFrame.from_dict(columns, orient='index').T


def main(argv):
    """Script entry point: query one ``dt`` partition and dump it as TSV.

    :param argv: full argument vector (``sys.argv``); ``argv[1]`` is the
        partition value and must consist of ASCII digits only
    """
    if len(argv) < 2:
        sys.exit("usage: %s <dt>" % argv[0])
    now_date = argv[1]
    # The value is spliced into the SQL text below, so reject anything that
    # is not a plain number instead of sending arbitrary text to ODPS.
    if re.fullmatch(r"[0-9]+", now_date) is None:
        sys.exit("invalid dt value (digits only): %r" % now_date)
    print("now date:", now_date)
    sql = (
        "select apptype, videoid, lastonehour_view, lastonehour_view_total, "
        "lastonehour_play, lastonehour_play_total,lastonehour_share, "
        "lastonehour_share_total, "
        "CASE WHEN lastonehour_return is NULL then 0 else lastonehour_return end "
        "as lastonehour_return "
        "from loghubods.video_each_hour_update_province_apptype "
        "where dt=" + now_date
    )
    print(sql)
    data = exe_sql(sql)
    # Make sure the output directory exists before writing into it.
    os.makedirs("./data", exist_ok=True)
    data.to_csv("./data/hour_video_data_" + now_date, sep='\t', index=False)


if __name__ == "__main__":
    main(sys.argv)