export_hour_vid.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253
# coding: utf-8
  2. from odps import ODPS
  3. from config import set_config
  4. import datetime
  5. import pandas as pd
  6. from collections import defaultdict
  7. import sys
  8. config_ = set_config()
  9. odps = ODPS(
  10. access_id=config_.ODPS_CONFIG['ACCESSID'],
  11. secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
  12. project="loghubods",
  13. endpoint=config_.ODPS_CONFIG['ENDPOINT'])
  14. def get_data_from_odps(date, project, table, connect_timeout=3000, read_timeout=500000,
  15. pool_maxsize=1000, pool_connections=1000):
  16. """
  17. 从odps获取数据
  18. :param date: 日期 type-string '%Y%m%d'
  19. :param project: type-string
  20. :param table: 表名 type-string
  21. :param connect_timeout: 连接超时设置
  22. :param read_timeout: 读取超时设置
  23. :param pool_maxsize:
  24. :param pool_connections:
  25. :return: records
  26. """
  27. records = odps.read_table(name=table, partition='dt=%s' % date)
  28. return records
  29. def exe_sql(sql):
  30. data = []
  31. with odps.execute_sql(sql).open_reader() as reader:
  32. d = defaultdict(list) # collection默认一个dict
  33. for record in reader:
  34. for res in record:
  35. d[res[0]].append(res[1]) # 解析record中的每一个元组,存储方式为(k,v),以k作为key,存储每一列的内容;
  36. data = pd.DataFrame.from_dict(d, orient='index').T # 转换为数据框,并转置,不转置的话是横条数据
  37. return data
  38. if __name__=="__main__":
  39. project = 'loghubods'
  40. now_date=sys.argv[1]
  41. print("now date:", now_date)
  42. table = 'video_data_each_hour_dataset_24h_total_apptype'
  43. sql = "select apptype, videoid, lastonehour_view, lastonehour_view_total, lastonehour_play, lastonehour_play_total,lastonehour_share, lastonehour_share_total, lastonehour_return from loghubods.video_each_hour_update_province_apptype where dt="+now_date
  44. print(sql)
  45. data = exe_sql(sql)
  46. data.to_csv("./data/hour_video_data_"+now_date, sep='\t', index=None)