12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- import traceback
- import datetime
- import json
- import time
- from odps import ODPS
- from db_helper import RedisHelper
- from utils import send_msg_to_feishu
- from config import set_config
- from log import Log
# Module-wide singletons used by everything below.
# Project configuration object returned by set_config(); read via attributes
# such as ODPS_CONFIG, FEISHU_ROBOT, ENV_TEXT and CREATIVE_CVR_KEY_PREFIX.
config_ = set_config()
# Project logger; this script uses its info() and error() methods.
log_ = Log()
def _notify_failure(detail):
    """Send the "Thompson-cvr update failed" alert to the Feishu server robot.

    :param detail: text appended after the fixed alert headline.
    """
    send_msg_to_feishu(
        webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
        key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
        msg_text=f"rov-offline{config_.ENV_TEXT} - 广告Thompson-cvr数据更新失败\n"
                 f"{detail}"
    )


def _collect_creative_cvr(records):
    """Keep the records that have a creative id and a usable, non-zero cvr.

    :param records: iterable of rows supporting ``row['column']`` access for
        ``creative_id``, ``click_pv``, ``conversion_pv`` and ``cvr``.
    :return: list of dicts with those four keys; ``cvr`` is converted to float.
    """
    collected = []
    for item in records:
        creative_id = item['creative_id']
        click_pv = item['click_pv']
        conversion_pv = item['conversion_pv']
        cvr = item['cvr']
        if creative_id is None or creative_id == '':
            continue
        try:
            cvr = float(cvr)
        except (TypeError, ValueError, OverflowError):
            # None, non-numeric text or an out-of-range number: skip the row.
            continue
        if cvr == 0:
            continue
        collected.append(
            {'creative_id': creative_id, 'click_pv': click_pv, 'conversion_pv': conversion_pv, 'cvr': cvr}
        )
    return collected


def _write_cvr_to_redis(ad_creative_data):
    """Write each creative's cvr to redis and return how many writes succeeded.

    A failed write is logged and skipped so one bad key does not abort the
    rest of the batch.

    :param ad_creative_data: list of dicts as built by ``_collect_creative_cvr``.
    :return: number of keys written successfully.
    """
    redis_helper = RedisHelper()
    # Looked up once; a missing prefix is a configuration error and is left
    # to propagate to the caller's failure handling.
    key_prefix = config_.CREATIVE_CVR_KEY_PREFIX
    written = 0
    for ad_creative in ad_creative_data:
        key_name = f"{key_prefix}{ad_creative['creative_id']}"
        value = json.dumps(ad_creative['cvr'])
        try:
            # expire_time is 24 * 3600 -- presumably seconds (one day); confirm in RedisHelper.
            redis_helper.set_data_to_redis(key_name=key_name, value=value, expire_time=24 * 3600)
        except Exception as e:
            log_.error(f"set creative cvr to redis failed, key: {key_name}, exception: {e}")
            continue
        written += 1
    return written


def main(project, table, dt):
    """Load one ``dt`` partition of creative cvr data from ODPS into redis.

    If the partition does not exist, or if any step raises, the failure is
    logged and a Feishu alert is sent; the exception is not re-raised.

    :param project: ODPS project name.
    :param table: ODPS table name to read.
    :param dt: value of the ``dt`` partition to load.
    :return: None
    """
    try:
        # Fetch click count, conversion count and cvr for each ad creative.
        odps = ODPS(
            access_id=config_.ODPS_CONFIG['ACCESSID'],
            secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
            project=project,
            endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        )
        t = odps.get_table(name=table)
        if not t.exist_partition(partition_spec=f'dt={dt}'):
            log_.info(f"广告Thompson-cvr数据更新失败, 分区dt={dt}数据未准备好")
            _notify_failure(f"分区dt={dt}数据未准备好")
            return
        records = odps.read_table(name=table, partition=f'dt={dt}')
        ad_creative_data = _collect_creative_cvr(records)
        log_.info(f"ad_creative_data count: {len(ad_creative_data)}")
        log_.info(f"ad_creative_data: {ad_creative_data}")
        # Update redis.
        written = _write_cvr_to_redis(ad_creative_data)
        log_.info(f"to redis count: {written}")
    except Exception as e:
        # Capture the traceback once so the log and the alert show the same text.
        tb_text = traceback.format_exc()
        log_.error(f"广告Thompson-cvr数据更新失败, exception: {e}, traceback: {tb_text}")
        _notify_failure(
            f"exception: {e}\n"
            f"traceback: {tb_text}"
        )
if __name__ == '__main__':
    # Script entry point: load the partition for the hour before "now"
    # and log how long the run took.
    current = datetime.datetime.today()
    log_.info(f"now: {current.strftime('%Y%m%d %H:%M:%S')}")
    # Partition value is the previous hour, formatted as YYYYMMDDHH.
    previous_hour = current - datetime.timedelta(hours=1)
    partition_dt = previous_hour.strftime('%Y%m%d%H')
    began_at = time.time()
    main(project='loghubods', table='ad_own_creative_cvr', dt=partition_dt)
    elapsed_ms = (time.time() - began_at) * 1000
    log_.info(f"excuteTime: {elapsed_ms}ms")
|