# -*- coding: utf-8 -*-
import traceback
import datetime
import sys
import threading
from queue import Queue
from threading import Timer

from odps import ODPS
from tqdm import tqdm

from my_utils import RedisHelper, get_data_from_odps, send_msg_to_feishu
from my_config import set_config
from log import Log

config_, _ = set_config()
log_ = Log()
region_name2code: dict = config_.REGION_CODE
# region_name2code["中国"] = "-1"
redis_helper = RedisHelper()


def worker(queue, executor):
    """Worker thread: consume rows from the queue until a None sentinel arrives."""
    while True:
        row = queue.get()
        if row is None:  # stop signal
            queue.task_done()
            break
        executor(row)
        queue.task_done()


def records_process_for_list(records, executor, max_size=50, num_workers=10):
    """Process records concurrently through a bounded, thread-safe queue."""
    # Bounded queue; tune max_size to control memory usage.
    queue = Queue(maxsize=max_size)
    # Start the worker threads.
    threads = []
    for _ in range(num_workers):
        t = threading.Thread(target=worker, args=(queue, executor))
        t.start()
        threads.append(t)
    # Read the records and feed them into the queue.
    for row in tqdm(records):
        queue.put(row)
    # Send one stop signal per worker.
    for _ in range(num_workers):
        queue.put(None)
    # Wait until every queued item has been processed.
    queue.join()
    # Wait for all worker threads to exit.
    for t in threads:
        t.join()


def check_data(project, table, partition) -> int:
    """Check whether the data is ready; return the partition's row count."""
    odps = ODPS(
        access_id=config_.ODPS_CONFIG['ACCESSID'],
        secret_access_key=config_.ODPS_CONFIG['ACCESSKEY'],
        project=project,
        endpoint=config_.ODPS_CONFIG['ENDPOINT'],
        connect_timeout=3000,
        read_timeout=500000,
        pool_maxsize=1000,
        pool_connections=1000
    )
    try:
        t = odps.get_table(name=table)
        log_.info(f"checking whether partition exists - [ dt={partition} ]")
        check_res = t.exist_partition(partition_spec=f'dt={partition}')
        if check_res:
            sql = f'select * from {project}.{table} where dt = {partition}'
            log_.info(sql)
            with odps.execute_sql(sql=sql).open_reader() as reader:
                data_count = reader.count
        else:
            log_.info("table {} partition {} does not exist".format(table, partition))
            data_count = 0
    except Exception as e:
        log_.error("table:{}, partition:{} no data, return data_count=0: {}".format(table, partition, e))
        data_count = 0
    return data_count


def get_table_data(project, table, partition):
    """Fetch all rows of the partition, keeping only the needed columns."""
    records = get_data_from_odps(date=partition, project=project, table=table)
    data = []
    for record in records:
        tmp = {}
        for col_name in ["region", "videoid_array_sum", "videoid_array_avg"]:
            tmp[col_name] = record[col_name]
        data.append(tmp)
    return data


def region_match(region_cn: str, region_name2code: dict):
    """Map a Chinese region name to its code, tolerating partial matches."""
    for r in region_name2code:
        if region_cn == r or region_cn in r or (
                region_cn.endswith("省") and region_cn.split("省")[0] in r
        ):
            return region_name2code[r]
    return None
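# A minimal illustration of the matching rules above (the mapping below is an
# assumed example, not the real config_.REGION_CODE): exact match, substring
# match, and the "省"-suffix fallback, in that order.
#
#   codes = {"广东省": "440000", "上海市": "310000"}
#   region_match("广东省", codes)  # exact match              -> "440000"
#   region_match("上海", codes)    # substring of "上海市"     -> "310000"
#   region_match("某某省", codes)  # no key contains "某某"    -> None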
def process_and_store(row):
    """Write one region's trend lists to Redis under the sum/avg keys."""
    region_code = row["region_code"]
    videoid_array_sum = row["videoid_array_sum"]
    videoid_array_avg = row["videoid_array_avg"]
    key1 = "alg_recsys_recall_4h_region_trend_sum_" + region_code
    key2 = "alg_recsys_recall_4h_region_trend_avg_" + region_code
    expire_time = 24 * 3600 * 4  # 4 days
    redis_helper.set_data_to_redis(key1, videoid_array_sum, expire_time)
    redis_helper.set_data_to_redis(key2, videoid_array_avg, expire_time)
    log_.info("trend-sum written, key={}, value={}".format(key1, videoid_array_sum))
    log_.info("trend-avg written, key={}, value={}".format(key2, videoid_array_avg))


"""
Source table:
https://dmc-cn-hangzhou.data.aliyun.com/dm/odps-table/odps.loghubods.alg_recsys_recall_strategy_trend/
"""


def h_timer_check():
    try:
        log_.info(f"start running: {datetime.datetime.strftime(datetime.datetime.today(), '%Y%m%d%H')}")
        try:
            date = sys.argv[1]
            hour = sys.argv[2]
        except Exception as e:
            # Fall back to system time; date must be %Y%m%d and hour %H so that
            # partition = date + hour matches the dt=%Y%m%d%H partition format.
            now_date = datetime.datetime.today()
            date = datetime.datetime.strftime(now_date, '%Y%m%d')
            hour = datetime.datetime.strftime(now_date, '%H')
            log_.info("no arguments given, falling back to system time, info: {}".format(e))
        partition = str(date) + str(hour)
        log_.info("partition={}".format(partition))
        # 1. Check whether the upstream table has finished producing.
        project = "loghubods"
        table = "alg_recsys_recall_strategy_trend"
        table_data_cnt = check_data(project, table, partition)
        if table_data_cnt == 0:
            log_.info("upstream table {} partition {} not ready, waiting...".format(table, partition))
            Timer(60, h_timer_check).start()
        else:
            log_.info("upstream data ready, count={}, reading the table".format(table_data_cnt))
            # 2. Read the table and filter rows by region.
            data = get_table_data(project, table, partition)
            data_new = []
            for one in data:
                region_code = region_match(one["region"], region_name2code)
                if region_code:
                    one["region_code"] = region_code
                    data_new.append(one)
                else:
                    log_.info("{} was filtered out".format(one["region"]))
            log_.info("data processed, count={}, regions: {}".format(
                len(data_new), ",".join([one["region"] for one in data_new])))
            log_.info("start processing and writing to redis")
            # 3. Write the results to Redis.
            records_process_for_list(data_new, process_and_store, max_size=10, num_workers=5)
    except Exception as e:
        log_.error(f"4-hour region trend statistics update failed, exception: {e}, traceback: {traceback.format_exc()}")
        send_msg_to_feishu(
            webhook=config_.FEISHU_ROBOT['server_robot'].get('webhook'),
            key_word=config_.FEISHU_ROBOT['server_robot'].get('key_word'),
            msg_text=f"rov-offline{config_.ENV_TEXT} - 4-hour region trend statistics update failed\n"
                     f"exception: {e}\n"
                     f"traceback: {traceback.format_exc()}"
        )


if __name__ == '__main__':
    log_.info("file alg_recsys_recall_4h_region_trend.py: '4-hour region trend statistics' start")
    h_timer_check()
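# Usage (assumed invocation; date and hour fall back to system time when omitted):
#   python alg_recsys_recall_4h_region_trend.py 20240101 08
# The script retries every 60s via Timer until the dt=<date><hour> partition of
# loghubods.alg_recsys_recall_strategy_trend reports a non-zero row count, then
# writes each matched region's sum/avg trend lists to Redis with a 4-day TTL.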