123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
# -*- coding: utf-8 -*-
- from odps import ODPS
- from config import set_config
- import datetime
- import pandas as pd
- from collections import defaultdict
- import sys
# Load the project settings once and build the module-wide ODPS client
# that the helper functions below share.
config_ = set_config()
_odps_settings = config_.ODPS_CONFIG
odps = ODPS(
    access_id=_odps_settings['ACCESSID'],
    secret_access_key=_odps_settings['ACCESSKEY'],
    project="loghubods",
    endpoint=_odps_settings['ENDPOINT'],
)
def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
                       pool_maxsize=1000, pool_connections=1000):
    """
    Read one daily partition of an ODPS table.

    :param date: partition date, type-string '%Y%m%d'; used as ``dt=<date>``
    :param project: ODPS project that owns the table, type-string
    :param table: table name, type-string
    :param connect_timeout: connection timeout setting (accepted for
        interface compatibility; not applied by this function)
    :param read_timeout: read timeout setting (accepted for interface
        compatibility; not applied by this function)
    :param pool_maxsize: accepted for interface compatibility; not applied
    :param pool_connections: accepted for interface compatibility; not applied
    :return: records, exactly as returned by ``odps.read_table``
    """
    # Forward ``project`` so the caller's choice is honoured; previously the
    # argument was ignored and the client's default project was always read.
    records = odps.read_table(
        name=table,
        project=project,
        partition='dt=%s' % date,
    )
    return records
def exe_sql(sql):
    """
    Run a SQL statement on ODPS and return the result as a DataFrame.

    Each record is iterated as a sequence of (column, value) pairs; the
    values are gathered per column and the columns become the DataFrame
    columns.

    :param sql: SQL text to execute, type-string
    :return: pandas.DataFrame with one column per result column
    """
    columns = defaultdict(list)  # column key -> values in row order
    instance = odps.execute_sql(sql)
    with instance.open_reader() as reader:
        for row in reader:
            for item in row:
                key, value = item[0], item[1]
                columns[key].append(value)
    # orient='index' builds one row per column key, so transpose to get
    # one column per key instead.
    by_key = pd.DataFrame.from_dict(columns, orient='index')
    return by_key.T
if __name__ == "__main__":
    # Export (machinecode, shareobjectid) share events for a dt range to a
    # tab-separated file under ./data.
    import os

    # Both date bounds are required; exit with a usage hint rather than an
    # IndexError when they are missing.
    if len(sys.argv) < 3:
        sys.exit("usage: %s <start_dt> <end_dt>" % sys.argv[0])
    last7day = sys.argv[1]
    now_date = sys.argv[2]
    print("now date:", now_date)

    # Create the output directory before the query so a missing directory
    # cannot fail the job after the query has already run.
    os.makedirs("./data", exist_ok=True)

    # NOTE(review): the dates are spliced into the SQL text unescaped; they
    # are assumed to come from a trusted scheduler/operator -- confirm.
    sql = (
        "select machinecode, shareobjectid from loghubods.user_share_log"
        " where dt between '" + last7day + "' and '" + now_date + "'"
        " and topic='share';"
    )
    print(sql)
    data = exe_sql(sql)
    data.to_csv("./data/user_item_share_" + now_date, sep='\t')
|