import traceback import datetime import pandas as pd from odps import ODPS from collections import defaultdict from config import set_config from log import Log config_ = set_config() log_ = Log() now_date = datetime.datetime.today() log_.info(f"now: {datetime.datetime.strftime(now_date, '%Y%m%d')}") dt = datetime.datetime.strftime(now_date - datetime.timedelta(days=1), '%Y%m%d') project = 'loghubods' odps = ODPS( access_id=config_.ODPS_CONFIG['ACCESSID'], secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'], project=project, endpoint=config_.ODPS_CONFIG['ENDPOINT'], ) sql = f""" SELECT pqtid -- 串联每次广告从请求到点击整个链路唯一标识 ,apptype ,machinecode -- mid ,networktype -- 网络状态 ,brand -- 手机品牌 ,platform -- 操作系统 ,GET_JSON_OBJECT(machineinfo,'$.weChatVersion') AS weChatVersion -- 微信版本号 ,GET_JSON_OBJECT(machineinfo,'$.sdkVersion') AS sdkVersion -- 微信小程序基础库版本号 ,softversion -- 票圈版本号 ,ownadpositionid -- 自营广告位id ,planid -- 广告计划id ,ownaddetailid -- 广告id ,clienttimestamp ,clientip ,ANALYSISIP(clientip,"region") AS province ,ANALYSISIP(clientip,"city") AS city ,headvideoid ,businesstype FROM loghubods.ad_action_log_own WHERE dt = {dt} AND ownadsystemtype = 'own' AND ( businesstype = 'adView' OR businesstype = 'adClick' ) ; """ with odps.execute_sql(sql=sql).open_reader() as reader: d = defaultdict(list) # collection默认一个dict for record in reader: for res in record: # print(res) d[res[0]].append(res[1]) # 解析record中的每一个元组,存储方式为(k,v),以k作为key,存储每一列的内容; data = pd.DataFrame.from_dict(d, orient='index').T # 转换为数据框,并转置,不转置的话是横条数据 print(data) data.to_csv(f"{dt}.csv", index=False)