distributed.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. # *****************************************************************************
  2. # Copyright (c) 2018, NVIDIA CORPORATION. All rights reserved.
  3. #
  4. # Redistribution and use in source and binary forms, with or without
  5. # modification, are permitted provided that the following conditions are met:
  6. # * Redistributions of source code must retain the above copyright
  7. # notice, this list of conditions and the following disclaimer.
  8. # * Redistributions in binary form must reproduce the above copyright
  9. # notice, this list of conditions and the following disclaimer in the
  10. # documentation and/or other materials provided with the distribution.
  11. # * Neither the name of the NVIDIA CORPORATION nor the
  12. # names of its contributors may be used to endorse or promote products
  13. # derived from this software without specific prior written permission.
  14. #
  15. # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
  16. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
  17. # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
  18. # DISCLAIMED. IN NO EVENT SHALL NVIDIA CORPORATION BE LIABLE FOR ANY
  19. # DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
  20. # (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
  21. # LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
  22. # ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  23. # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
  24. # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  25. #
  26. # *****************************************************************************
  27. import os
  28. import sys
  29. import time
  30. import subprocess
  31. import argparse
  32. import torch
  33. import torch.distributed as dist
  34. from torch.autograd import Variable
  35. def reduce_tensor(tensor, num_gpus):
  36. rt = tensor.clone()
  37. dist.all_reduce(rt, op=dist.reduce_op.SUM)
  38. rt /= num_gpus
  39. return rt
  40. def init_distributed(rank, num_gpus, group_name, dist_backend, dist_url):
  41. assert torch.cuda.is_available(), "Distributed mode requires CUDA."
  42. print("Initializing Distributed")
  43. # Set cuda device so everything is done on the right GPU.
  44. torch.cuda.set_device(rank % torch.cuda.device_count())
  45. # Initialize distributed communication
  46. dist.init_process_group(dist_backend, init_method=dist_url,
  47. world_size=num_gpus, rank=rank,
  48. group_name=group_name)
  49. def _flatten_dense_tensors(tensors):
  50. """Flatten dense tensors into a contiguous 1D buffer. Assume tensors are of
  51. same dense type.
  52. Since inputs are dense, the resulting tensor will be a concatenated 1D
  53. buffer. Element-wise operation on this buffer will be equivalent to
  54. operating individually.
  55. Arguments:
  56. tensors (Iterable[Tensor]): dense tensors to flatten.
  57. Returns:
  58. A contiguous 1D buffer containing input tensors.
  59. """
  60. if len(tensors) == 1:
  61. return tensors[0].contiguous().view(-1)
  62. flat = torch.cat([t.contiguous().view(-1) for t in tensors], dim=0)
  63. return flat
  64. def _unflatten_dense_tensors(flat, tensors):
  65. """View a flat buffer using the sizes of tensors. Assume that tensors are of
  66. same dense type, and that flat is given by _flatten_dense_tensors.
  67. Arguments:
  68. flat (Tensor): flattened dense tensors to unflatten.
  69. tensors (Iterable[Tensor]): dense tensors whose sizes will be used to
  70. unflatten flat.
  71. Returns:
  72. Unflattened dense tensors with sizes same as tensors and values from
  73. flat.
  74. """
  75. outputs = []
  76. offset = 0
  77. for tensor in tensors:
  78. numel = tensor.numel()
  79. outputs.append(flat.narrow(0, offset, numel).view_as(tensor))
  80. offset += numel
  81. return tuple(outputs)
  82. def apply_gradient_allreduce(module):
  83. """
  84. Modifies existing model to do gradient allreduce, but doesn't change class
  85. so you don't need "module"
  86. """
  87. if not hasattr(dist, '_backend'):
  88. module.warn_on_half = True
  89. else:
  90. module.warn_on_half = True if dist._backend == dist.dist_backend.GLOO else False
  91. for p in module.state_dict().values():
  92. if not torch.is_tensor(p):
  93. continue
  94. dist.broadcast(p, 0)
  95. def allreduce_params():
  96. if(module.needs_reduction):
  97. module.needs_reduction = False
  98. buckets = {}
  99. for param in module.parameters():
  100. if param.requires_grad and param.grad is not None:
  101. tp = type(param.data)
  102. if tp not in buckets:
  103. buckets[tp] = []
  104. buckets[tp].append(param)
  105. if module.warn_on_half:
  106. if torch.cuda.HalfTensor in buckets:
  107. print("WARNING: gloo dist backend for half parameters may be extremely slow." +
  108. " It is recommended to use the NCCL backend in this case. This currently requires" +
  109. "PyTorch built from top of tree master.")
  110. module.warn_on_half = False
  111. for tp in buckets:
  112. bucket = buckets[tp]
  113. grads = [param.grad.data for param in bucket]
  114. coalesced = _flatten_dense_tensors(grads)
  115. dist.all_reduce(coalesced)
  116. coalesced /= dist.get_world_size()
  117. for buf, synced in zip(grads, _unflatten_dense_tensors(coalesced, grads)):
  118. buf.copy_(synced)
  119. for param in list(module.parameters()):
  120. def allreduce_hook(*unused):
  121. Variable._execution_engine.queue_callback(allreduce_params)
  122. if param.requires_grad:
  123. param.register_hook(allreduce_hook)
  124. dir(param)
  125. def set_needs_reduction(self, input, output):
  126. self.needs_reduction = True
  127. module.register_forward_hook(set_needs_reduction)
  128. return module
  129. def main(config, stdout_dir, args_str):
  130. args_list = ['train.py']
  131. args_list += args_str.split(' ') if len(args_str) > 0 else []
  132. args_list.append('--config={}'.format(config))
  133. num_gpus = torch.cuda.device_count()
  134. args_list.append('--num_gpus={}'.format(num_gpus))
  135. args_list.append("--group_name=group_{}".format(time.strftime("%Y_%m_%d-%H%M%S")))
  136. if not os.path.isdir(stdout_dir):
  137. os.makedirs(stdout_dir)
  138. os.chmod(stdout_dir, 0o775)
  139. workers = []
  140. for i in range(num_gpus):
  141. args_list[-2] = '--rank={}'.format(i)
  142. stdout = None if i == 0 else open(
  143. os.path.join(stdout_dir, "GPU_{}.log".format(i)), "w")
  144. print(args_list)
  145. p = subprocess.Popen([str(sys.executable)]+args_list, stdout=stdout)
  146. workers.append(p)
  147. for p in workers:
  148. p.wait()
  149. if __name__ == '__main__':
  150. parser = argparse.ArgumentParser()
  151. parser.add_argument('-c', '--config', type=str, required=True,
  152. help='JSON file for configuration')
  153. parser.add_argument('-s', '--stdout_dir', type=str, default=".",
  154. help='directory to save stoud logs')
  155. parser.add_argument(
  156. '-a', '--args_str', type=str, default='',
  157. help='double quoted string with space separated key value pairs')
  158. args = parser.parse_args()
  159. main(args.config, args.stdout_dir, args.args_str)