class InferenceFetchHandler(FetchHandler):
    """Fetch handler that streams inference results to a JSON-lines file.

    ``handler`` puts one result dict per call on an internal queue. A daemon
    thread drains that queue and appends the results to ``output_file``, one
    JSON object per line, in batches of up to ``batch_size`` results.
    Call ``flush`` to block until everything queued so far has been written.
    """

    # Queue marker that tells the writer thread to write its partial batch.
    _FLUSH_MARKER = object()

    def __init__(self, var_dict, output_file, batch_size=1000):
        """Prepare the output file and start the background writer thread.

        Args:
            var_dict: Forwarded to ``FetchHandler.__init__`` as ``var_dict``.
            output_file: Path of the file results are appended to. It is
                created (or truncated if it already exists).
            batch_size: Number of results buffered before they are written.
        """
        super().__init__(var_dict=var_dict, period_secs=1)
        self.output_file = output_file
        self.batch_size = batch_size
        self.result_queue = queue.Queue()

        # Create the output directory if needed. dirname() is '' for a bare
        # file name, and os.makedirs('') would raise, so skip it in that case.
        output_dir = os.path.dirname(output_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        # Create or truncate the output file *before* the writer starts, so
        # the truncation can never discard lines the writer already appended.
        with open(self.output_file, 'w') as f:
            f.write('')

        self.writer_thread = threading.Thread(target=self._writer)
        self.writer_thread.daemon = True  # do not keep the process alive
        self.writer_thread.start()

    def handler(self, fetch_vars):
        """Queue the results of one fetch for the writer thread.

        For every key, a ``numpy.ndarray`` value is reduced to its first
        element (``value[0]``); any other value is kept unchanged.
        """
        result_dict = {}
        for key in fetch_vars:
            value = fetch_vars[key]
            if isinstance(value, np.ndarray):
                value = value[0]
            result_dict[key] = value
        self.result_queue.put(result_dict)

    def _writer(self):
        """Background loop: collect queued results and write them in batches."""
        batch = []
        while True:
            try:
                item = self.result_queue.get(timeout=1)
            except queue.Empty:
                # Idle for a second: write what we have instead of holding a
                # partial batch in memory indefinitely.
                self._commit(batch)
                continue

            if item is self._FLUSH_MARKER:
                self._commit(batch)
                self.result_queue.task_done()  # accounts for the marker
                continue

            logger.info("write vector %s", item)
            batch.append(item)
            if len(batch) >= self.batch_size:
                logger.info("write vector")
                self._commit(batch)

    def _commit(self, batch):
        """Write ``batch``, mark its items as done on the queue, and empty it.

        ``task_done`` is only called once the write was attempted, so that
        ``result_queue.join()`` means "written", not merely "dequeued".
        Write errors are logged rather than raised: an exception escaping
        here would kill the writer thread and make ``flush`` hang.
        """
        if not batch:
            return
        try:
            self._write_batch(batch)
        except Exception:
            logger.exception("failed to write %d inference results to %s",
                             len(batch), self.output_file)
        finally:
            for _ in batch:
                self.result_queue.task_done()
            batch.clear()

    @staticmethod
    def _json_default(value):
        """Convert numpy values, which ``json`` cannot serialize natively."""
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, np.generic):
            return value.item()
        raise TypeError("Object of type {} is not JSON serializable".format(
            type(value).__name__))

    def _write_batch(self, batch):
        """Append every result in ``batch`` to the output file as a JSON line."""
        # Serialize first so a bad record cannot leave a half-written batch.
        lines = [
            json.dumps(result, default=self._json_default) + '\n'
            for result in batch
        ]
        with open(self.output_file, 'a') as f:
            f.writelines(lines)

    def flush(self):
        """Block until every result queued so far has been written to the file."""
        # The marker makes the writer commit its partial batch immediately;
        # join() returns once all queued items (and the marker) are done.
        self.result_queue.put(self._FLUSH_MARKER)
        self.result_queue.join()
def _coerce_override(current, raw):
    """Convert the string ``raw`` to the type of the existing value ``current``.

    Only ``bool``, ``int`` and ``float`` are converted; for any other type
    (including a key that is not in the config yet) ``raw`` is returned as is.
    """
    # bool must be tested before int because bool is a subclass of int.
    if isinstance(current, bool):
        return raw.lower() == "true"
    if isinstance(current, int):
        return int(raw)
    if isinstance(current, float):
        return float(raw)
    return raw


def parse_args():
    """Parse the command line, load the YAML config and apply ``-o`` overrides.

    Command-line options:
        -m / --config_yaml: path of the YAML config file (required).
        -o / --opt: zero or more ``key=value`` overrides for config entries.
        -bf16 / --pure_bf16: Python literal, defaults to ``False``.

    Returns:
        The config object returned by ``YamlHelper.load_yaml``, with the
        overrides applied and the keys ``yaml_path``, ``config_abs_dir`` and
        ``pure_bf16`` set from the command line.

    Raises:
        ValueError: if an override is not of the form ``key=value`` or its
            value cannot be converted to the type of the existing entry.
    """
    parser = argparse.ArgumentParser("PaddleRec train script")
    parser.add_argument("-o", "--opt", nargs='*', type=str)
    parser.add_argument(
        '-m',
        '--config_yaml',
        type=str,
        required=True,
        help='config file path')
    parser.add_argument(
        '-bf16',
        '--pure_bf16',
        type=ast.literal_eval,
        default=False,
        help="whether use bf16")
    args = parser.parse_args()
    args.abs_dir = os.path.dirname(os.path.abspath(args.config_yaml))

    yaml_helper = YamlHelper()
    config = yaml_helper.load_yaml(args.config_yaml)

    # Apply overrides given on the command line.
    for parameter in args.opt or ():
        # Split on the first '=' only, so values may themselves contain '='.
        key, separator, value = parameter.strip().partition("=")
        if not separator:
            raise ValueError(
                "invalid override {!r}: expected key=value".format(parameter))
        config[key] = _coerce_override(config.get(key), value)

    config["yaml_path"] = args.config_yaml
    config["config_abs_dir"] = args.abs_dir
    config["pure_bf16"] = args.pure_bf16
    yaml_helper.print_yaml(config)
    return config