@@ -24,10 +24,7 @@ import time
 import sys
 import paddle.distributed.fleet as fleet
 import paddle.distributed.fleet.base.role_maker as role_maker
-from hdfs import InsecureClient
 import paddle
-import paddle.distributed as dist
-from paddle.io import Dataset, DataLoader

 import warnings
 import logging
@@ -36,7 +33,6 @@ import numpy as np
 import struct
 from utils.utils_single import auc

-
 __dir__ = os.path.dirname(os.path.abspath(__file__))
 sys.path.append(os.path.abspath(os.path.join(__dir__, '..')))

@@ -47,25 +43,6 @@ logging.basicConfig(
     format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)
 logger = logging.getLogger(__name__)

-client = InsecureClient('http://192.168.141.208:50070', user='root')
-
-class HDFSDataset(Dataset):
-    def __init__(self, hdfs_path, batch_size=32):
-        self.hdfs_path = hdfs_path
-        self.batch_size = batch_size
-        self.file_names = [f for f in self.client.list(hdfs_path) if f.type == 'FILE']
-
-
-    def __getitem__(self, idx):
-        # logic for reading a single sample
-        file_name = self.file_names[idx]
-        with client.read('/path/to/file.txt') as reader:
-            data = reader.read()
-        return data
-
-    def __len__(self):
-        return len(self.file_names)
-

 def parse_args():
     parser = argparse.ArgumentParser("PaddleRec train script")
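For reference, the deleted `HDFSDataset` was never functional as written: `__init__` referenced `self.client` without ever assigning it, the `hdfs` package's `client.list()` returns plain file names rather than objects with a `.type` attribute, and `__getitem__` read a hard-coded path instead of the file selected by `idx`. A corrected sketch, assuming the `hdfs` package's documented `InsecureClient` API (the host URL and paths below are placeholders):

```python
from hdfs import InsecureClient
from paddle.io import Dataset

class HDFSDataset(Dataset):
    """Hypothetical fixed version of the removed helper, for reference only."""

    def __init__(self, client, hdfs_path):
        # Keep the client on the instance; the removed code used self.client
        # without ever assigning it.
        self.client = client
        self.hdfs_path = hdfs_path
        # list(..., status=True) yields (name, status) pairs; keep files only.
        self.file_names = [
            name for name, status in client.list(hdfs_path, status=True)
            if status['type'] == 'FILE'
        ]

    def __getitem__(self, idx):
        # Read the file selected by idx, not a hard-coded path.
        path = '{}/{}'.format(self.hdfs_path, self.file_names[idx])
        with self.client.read(path) as reader:
            return reader.read()

    def __len__(self):
        return len(self.file_names)

# client = InsecureClient('http://namenode:50070', user='root')
# dataset = HDFSDataset(client, '/path/to/data')
```

Dropping the class rather than fixing it is reasonable here, since this trainer feeds data through fleet datasets instead of `paddle.io.DataLoader`.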
@@ -123,9 +100,9 @@ class Main(object):
         self.pure_bf16 = self.config['pure_bf16']

     def run(self):
-        logger.info("Begin 11111111")
+        logger.info("Begin 11111111")
         self.init_fleet_with_gloo()
-        logger.info("Begin 22222222")
+        logger.info("Begin 22222222")
         self.network()
         if fleet.is_server():
             self.run_server()
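The `run()` dispatch above follows the standard fleet parameter-server flow. As a reminder of what typically sits behind `run_server()` and `run_worker()`, a minimal sketch of the stock fleet API (not code from this PR):

```python
import paddle.distributed.fleet as fleet

# Assumes fleet.init(role) has already run, as in init_fleet_with_gloo().
if fleet.is_server():
    fleet.init_server()   # initialize (or load) parameters on this server
    fleet.run_server()    # block and serve pull/push requests from workers
elif fleet.is_worker():
    fleet.init_worker()   # connect this worker to the servers
    # ... per-epoch training, e.g. exe.train_from_dataset(...) ...
    fleet.stop_worker()   # signal the servers that this worker is done
```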
@@ -138,12 +115,12 @@ class Main(object):
     def init_fleet_with_gloo(use_gloo=True):
         if use_gloo:
             os.environ["PADDLE_WITH_GLOO"] = "0"
-            logger.info("Begin 11111111222222")
+            logger.info("Begin 11111111222222")
             role = role_maker.PaddleCloudRoleMaker(
                 is_collective=False,
                 init_gloo=False
             )
-            logger.info("Begin 11111111333333")
+            logger.info("Begin 11111111333333")
             fleet.init(role)
             #logger.info("worker_index: %s", fleet.worker_index())
             #logger.info("is_first_worker: %s", fleet.is_first_worker())
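One thing worth flagging in this hunk: the `use_gloo=True` branch sets `PADDLE_WITH_GLOO` to `"0"` and passes `init_gloo=False`, which appears to disable Gloo rather than enable it. If cross-worker synchronization (e.g. `fleet.barrier_worker()`) is needed later, this likely needs `PADDLE_WITH_GLOO="1"` and `init_gloo=True` instead.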
@@ -203,10 +180,19 @@ class Main(object):
         else:
             opt_info['stat_var_names'] = []

+        if reader_type == "InmemoryDataset":
+            self.reader.load_into_memory()

         for epoch in range(epochs):
             epoch_start_time = time.time()
-            self.dataset_train_loop(epoch)
+
+            if sync_mode == "heter":
+                self.heter_train_loop(epoch)
+            elif reader_type == "QueueDataset":
+                self.dataset_train_loop(epoch)
+            elif reader_type == "InmemoryDataset":
+                self.dataset_train_loop(epoch)
+
             epoch_time = time.time() - epoch_start_time
             epoch_speed = self.example_nums / epoch_time
             if use_auc is True:
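The `load_into_memory()` call added before the epoch loop, together with the matching `release_memory()` in the next hunk, follows the usual `InMemoryDataset` lifecycle: load every shard once, train several epochs against the cached samples, then free them. A self-contained sketch of that lifecycle; the feed variables, pipe command, and file list below are placeholders, not values from this PR:

```python
import paddle

paddle.enable_static()

# Hypothetical feed layout; a real model takes these from its network definition.
slot = paddle.static.data(name="slot", shape=[None, 1], dtype="int64", lod_level=1)
label = paddle.static.data(name="label", shape=[None, 1], dtype="int64")

dataset = paddle.distributed.InMemoryDataset()
dataset.init(
    use_var=[slot, label],
    pipe_command="cat",             # placeholder per-line sample parser
    batch_size=32,
    thread_num=4)
dataset.set_filelist(["part-000"])  # placeholder shard list

dataset.load_into_memory()   # read all shards into memory once, up front
# ... exe.train_from_dataset(...) once per epoch ...
dataset.release_memory()     # drop the cached samples after the last epoch
```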
@@ -246,6 +232,9 @@ class Main(object):
                 [feed.name for feed in self.inference_feed_var],
                 [self.inference_target_var], self.exe)

+        if reader_type == "InmemoryDataset":
+            self.reader.release_memory()
+
     def init_reader(self):
         if fleet.is_server():
             return
@@ -266,17 +255,27 @@ class Main(object):

     def dataset_train_loop(self, epoch):
         logger.info("Epoch: {}, Running Dataset Begin.".format(epoch))
+
         fetch_info = [
             "Epoch {} Var {}".format(epoch, var_name)
             for var_name in self.metrics
         ]
+
         fetch_vars = [var for _, var in self.metrics.items()]
+
         print_step = int(config.get("runner.print_interval"))
-        print(paddle.static.default_main_program()._fleet_opt)
-        dataset = HDFSDataset(hdfs_path='/path/to/data')
-        # create the distributed sampler
-        sampler = DistributedSampler(dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank())
-        loader = DataLoader(dataset, batch_size=32, sampler=sampler)
+
+        debug = config.get("runner.dataset_debug", False)
+        if config.get("runner.need_dump"):
+            debug = True
+            dump_fields_path = "{}/{}".format(
+                config.get("runner.dump_fields_path"), epoch)
+            set_dump_config(paddle.static.default_main_program(), {
+                "dump_fields_path": dump_fields_path,
+                "dump_fields": config.get("runner.dump_fields")
+            })
+        logger.info(paddle.static.default_main_program()._fleet_opt)
+
         self.exe.train_from_dataset(
             program=paddle.static.default_main_program(),
             dataset=self.reader,
@@ -284,6 +283,7 @@ class Main(object):
             fetch_info=fetch_info,
             print_period=print_step,
             debug=debug)
+

     def heter_train_loop(self, epoch):
         logger.info(
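`set_dump_config` is not defined or imported anywhere in this diff, so presumably it comes from the PaddleRec static-PS utilities. Its assumed job, sketched hypothetically below, is to stash the dump settings on the program's `_fleet_opt` dict (the same dict the `logger.info` line above prints) so the runtime dumps those fields during `train_from_dataset`:

```python
def set_dump_config(program, dump_config):
    # Hypothetical sketch of the assumed helper; the real implementation
    # lives outside this diff and may differ.
    if program._fleet_opt is None:
        program._fleet_opt = {}
    for key in ("dump_fields_path", "dump_fields", "dump_param"):
        if dump_config.get(key) is not None:
            program._fleet_opt[key] = dump_config[key]
```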