# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import print_function import os os.environ['FLAGS_enable_pir_api'] = '0' from utils.static_ps.reader_helper_hdfs import get_infer_reader from utils.static_ps.program_helper import get_model, get_strategy, set_dump_config from utils.static_ps.metric_helper import set_zero, get_global_auc from utils.static_ps.common_ps import YamlHelper, is_distributed_env import argparse import time import sys import paddle.distributed.fleet as fleet import paddle.distributed.fleet.base.role_maker as role_maker import paddle from paddle.base.executor import FetchHandler import threading import warnings import logging import ast import numpy as np import struct from utils.utils_single import auc from utils.oss_client import HangZhouOSSClient import utils.compress as compress __dir__ = os.path.dirname(os.path.abspath(__file__)) sys.path.append(os.path.abspath(os.path.join(__dir__, '..'))) root_loger = logging.getLogger() for handler in root_loger.handlers[:]: root_loger.removeHandler(handler) logging.basicConfig( format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) import json class InferenceFetchHandler(FetchHandler): def __init__(self, var_dict, output_file, batch_size=1000): super().__init__(var_dict=var_dict, period_secs=1) self.output_file = output_file self.batch_size = batch_size self.current_batch = [] self.total_samples = 0 # 创建输出目录(如果不存在) output_dir = os.path.dirname(output_file) if not os.path.exists(output_dir): os.makedirs(output_dir) # 创建或清空输出文件 with open(self.output_file, 'w') as f: f.write('') def handler(self, fetch_vars): """处理每批次的推理结果""" result_dict = {} logger.info("InferenceFetchHandler fetch_vars {}".format(fetch_vars)) for var_name, var_value in fetch_vars.items(): # 转换数据类型 if isinstance(var_value, np.ndarray): result = var_value.tolist() else: result = var_value result_dict[var_name] = result self.current_batch.append(result_dict) self.total_samples += len(result_dict.get(list(result_dict.keys())[0], [])) # 当累积足够的结果时,写入文件 if len(self.current_batch) >= self.batch_size: self._write_batch() logger.info(f"Saved {self.total_samples} samples to {self.output_file}") def _write_batch(self): """将批次结果写入文件""" with open(self.output_file, 'a') as f: for result in self.current_batch: f.write(json.dumps(result) + '\n') self.current_batch = [] def finish(self): logger.info("InferenceFetchHandler finish") """确保所有剩余结果都被保存""" if self.current_batch: self._write_batch() logger.info(f"Final save: total {self.total_samples} samples saved to {self.output_file}") self.done_event.set() def parse_args(): parser = argparse.ArgumentParser("PaddleRec train script") parser.add_argument("-o", "--opt", nargs='*', type=str) parser.add_argument( '-m', '--config_yaml', type=str, required=True, help='config file path') parser.add_argument( '-bf16', '--pure_bf16', type=ast.literal_eval, default=False, help="whether use bf16") args = parser.parse_args() args.abs_dir = os.path.dirname(os.path.abspath(args.config_yaml)) yaml_helper = YamlHelper() config = yaml_helper.load_yaml(args.config_yaml) # modify config from command if args.opt: for parameter in args.opt: parameter = parameter.strip() key, value = parameter.split("=") if type(config.get(key)) is int: value = int(value) if type(config.get(key)) is float: value = float(value) if type(config.get(key)) is bool: value = (True if value.lower() == "true" else False) config[key] = value config["yaml_path"] = args.config_yaml config["config_abs_dir"] = args.abs_dir config["pure_bf16"] = args.pure_bf16 yaml_helper.print_yaml(config) return config def bf16_to_fp32(val): return np.float32(struct.unpack('