# -*- coding: utf-8 -*-
import traceback
import datetime
from odps import ODPS
from threading import Timer
from utils import RedisHelper, get_data_from_odps, send_msg_to_feishu
from config import set_config
from log import Log
from records_process import records_process

config_, _ = set_config()
log_ = Log()
region_name2code: dict = config_.REGION_CODE
redis_helper = RedisHelper()


def check_data(project, table, partition) -> int:
    """Check whether the partition is ready and return its row count."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        t = odps.get_table(name=table)
        check_res = t.exist_partition(partition_spec=f'dt={partition}')
        if check_res:
            # dt is a string partition column, so the value must be quoted
            sql = f"select * from {project}.{table} where dt = '{partition}'"
            log_.info(sql)
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            data_count = 0
    except Exception as e:
        log_.error("table:{}, partition:{} check failed, return data_count=0, exception: {}".format(
            table, partition, e))
        data_count = 0
    return data_count


def get_table_data(project, table, partition):
    """Read all rows of the partition, keeping only the columns we need."""
    records = get_data_from_odps(date=partition, project=project, table=table)
    data = []
    for record in records:
        tmp = {}
        for col_name in ["region", "videoid_array_sum", "videoid_array_avg"]:
            tmp[col_name] = record[col_name]
        data.append(tmp)
    log_.info("fetched {} rows from {}.{}/dt={}".format(len(data), project, table, partition))
    return data


def process_and_store(row):
    """Write one region's trend arrays to redis."""
    region = row["region"]
    if region not in region_name2code:
        return
    videoid_array_sum = row["videoid_array_sum"]
    videoid_array_avg = row["videoid_array_avg"]
    region_code = region_name2code[region]
    key1 = "alg_recsys_recall_4h_region_trend_sum_" + region_code
    key2 = "alg_recsys_recall_4h_region_trend_avg_" + region_code
    expire_time = 24 * 3600
    redis_helper.set_data_to_redis(key1, videoid_array_sum, expire_time)
    redis_helper.set_data_to_redis(key2, videoid_array_avg, expire_time)
    log_.info("trend-sum written, key={}, value={}".format(key1, videoid_array_sum))
    log_.info("trend-avg written, key={}, value={}".format(key2, videoid_array_avg))


# Data table:
# https://dmc-cn-hangzhou.data.aliyun.com/dm/odps-table/odps.loghubods.alg_recsys_recall_strategy_trend/


def h_timer_check():
    try:
        log_.info(f"start running: {datetime.datetime.strftime(datetime.datetime.today(), '%Y%m%d%H')}")
        # 1. check whether the upstream table has finished producing
        project = "loghubods"
        table = "alg_recsys_recall_strategy_trend"
        # NOTE: the partition is hardcoded; pass the bare value only, since
        # check_data adds the 'dt=' prefix itself.
        partition = "2023122019"
        table_data_cnt = check_data(project, table, partition)
        if table_data_cnt == 0:
            log_.info("upstream table {} partition {} not ready, waiting...".format(table, partition))
            Timer(60, h_timer_check).start()
        else:
            log_.info("upstream data ready, count={}, reading the table".format(table_data_cnt))
            # 2. read the table
            data = get_table_data(project, table, partition)
            log_.info("data fetched, start processing and writing to redis.")
            # 3. write to redis
            records_process(data, process_and_store, max_size=10, num_workers=5)
    except Exception as e:
        log_.error(f"4-hour region trend data update failed, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 4-hour region trend data update failed\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    log_.info("file alg_recsys_recall_4h_region_trend.py: '4-hour region trend' start")
    h_timer_check()
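

# ---------------------------------------------------------------------------
# Minimal sketch, under assumptions, of the contract this script relies on
# from the imported records_process helper: call `func` once per row, using a
# pool of `num_workers` threads and handing the pool at most `max_size` rows
# at a time. The real helper may differ; the name `_records_process_sketch`
# is hypothetical and for illustration only.
def _records_process_sketch(records, func, max_size=10, num_workers=5):
    from concurrent.futures import ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        for i in range(0, len(records), max_size):
            # consume the iterator so any worker exception surfaces here
            list(pool.map(func, records[i:i + max_size]))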