Pytorch并行计算(一): DataParallel
文章目录PyTorch并行计算1. 官网实例2. 使用方法3. 运行过程4. 源代码解读PyTorch并行计算nn.DataParallel1. 官网实例PyTorch官网的例子:DATA PARALLELISMPyTorch官网的手册:torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)2. 使用方法具
·
1. 官网实例
2. 运行过程
torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)
- module (Module) – module to be parallelized
- device_ids (list of python:int or torch.device) – CUDA devices (default: all devices)
- output_device (int or torch.device) – device location of output (default: device_ids[0])
首先把模型分别加载到各个GPU上,同时将数据平均分配到GPU上;然后每个GPU分别前向传播,将最后的结果汇总到第一块GPU上;最终计算Loss反向传播
这里有一个问题,当所有的batch进行汇总到第一个GPU时(默认的output_device=0
,即在第一块GPU上进行汇总),第一块GPU的内存会明显高于其他的,造成不均衡,同时计算Loss的时候可能会出现问题,而且如果分别计算Loss是不是会更快呢,这个问题之后写解决方案,这个问题在pytorch上有人提出,由于目前很少用DataParallel的方法了,大多用DistributedDataParallel,所以怎么优化DataParallel
就不展开了
3. 使用方法
具体使用也比较简单,如下所示,其余的不需要变化
# 当不限制GPUs的个数时,默认使用全部的GPU
model = Model()
model = nn.DataParallel(model)
model.cuda()
# 当限制GPUs的个数时
os.environ['CUDA_VISIBLE_DEVICES'] = '0, 1'
model = Model()
model = nn.DataParallel(model)
model.cuda()
4. 源代码解读
这里我将源代码copy了出来进行调试,加了注释,很方便理解
所在电脑有两块GPU,即device_ids = [0, 1]
class DataParallel(Module):
def __init__(self, module, device_ids=None, output_device=None, dim=0):
super(DataParallel, self).__init__()
'''1.
查看用GPU还是CPU,如果是CPU直接返回,如果是GPU继续
torch.cuda.is_available()
return "cuda"/None
'''
device_type = _get_available_device_type() # cuda()
if device_type is None:
self.module = module
self.device_ids = []
return
'''2.
如果没有指定device_ids,默认使用所有GPU
如果没有指定output_device,默认devices=0
torch._utils._get_all_device_indices() 返回GPUid,例如[0, 1]
'''
if device_ids is None:
device_ids = _get_all_device_indices()
if output_device is None:
output_device = device_ids[0]
'''3.
获取变量
torch._utils._get_all_device_indices() 返回GPUid,例如[0, 1]
torch._utils._get_device_index() 默认返回第一个GPU
device_ids = [0, 1]
output_device = 0 默认第一块GPU,即0
'''
self.dim = dim
self.module = module
self.device_ids = [_get_device_index(x, True) for x in device_ids]
self.output_device = _get_device_index(output_device, True)
self.src_device_obj = torch.device(device_type, self.device_ids[0])
'''4.
检查GPU之间的性能
_check_balance会检查各个GPU之间的性能差异,有两种情况会报错:
(1)剩余内存最小的GPU与最大的GPU比值小于0.75,不论是GPU本身的内存还是
被其他人占用后剩余的内存只要小于0.75就会报错
(2)线程最小的GPU与最大的GPU比值小于0.75
'''
_check_balance(self.device_ids)
'''5.
如果只有一块GPU,将module载入即可
'''
if len(self.device_ids) == 1:
self.module.to(self.src_device_obj)
def forward(self, *inputs, **kwargs):
with torch.autograd.profiler.record_function("DataParallel.forward"):
if not self.device_ids:
return self.module(*inputs, **kwargs)
'''6.
将module.parameters()以及module.buffers()进行迭代,看看是否在GPU上
没有的话继续,有的话返回错误信息cpu
'''
for t in chain(self.module.parameters(), self.module.buffers()):
if t.device != self.src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(self.src_device_obj, t.device))
'''7.
后面这部分在下面的data_parallel中看注释
'''
inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
# for forward function without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not kwargs:
inputs = ((),)
kwargs = ({},)
if len(self.device_ids) == 1:
return self.module(*inputs[0], **kwargs[0])
replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
outputs = self.parallel_apply(replicas, inputs, kwargs)
return self.gather(outputs, self.output_device)
def replicate(self, module, device_ids):
return replicate(module, device_ids, not torch.is_grad_enabled())
def scatter(self, inputs, kwargs, device_ids):
return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)
def parallel_apply(self, replicas, inputs, kwargs):
return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])
def gather(self, outputs, output_device):
return gather(outputs, output_device, dim=self.dim)
def data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None):
r"""Evaluates module(input) in parallel across the GPUs given in device_ids.
This is the functional version of the DataParallel module.
Args:
module (Module): the module to evaluate in parallel
inputs (Tensor): inputs to the module
device_ids (list of int or torch.device): GPU ids on which to replicate module
output_device (list of int or torch.device): GPU location of the output Use -1 to indicate the CPU.
(default: device_ids[0])
Returns:
a Tensor containing the result of module(input) located on
output_device
"""
'''1.
检查是否有输入,即我们的batchsize_input
如果有,不改变输入;如果没有变为空()
'''
if not isinstance(inputs, tuple):
inputs = (inputs,) if inputs is not None else ()
'''2.
检查当前设备类型:一般为cuda()
'''
device_type = _get_available_device_type()
'''3.
检查当前是否规定了输入和输出的GPU
默认:输入设备为所有GPU;输出为GPU:0,即第一块GPU(理解为所有输入GPU中的第一块)
'''
if device_ids is None:
device_ids = _get_all_device_indices()
if output_device is None:
output_device = device_ids[0]
'''4.
确定输入输出GPU,同时确定源GPU,可能是最终计算用来汇总的GPU:0
'''
device_ids = [_get_device_index(x, True) for x in device_ids]
output_device = _get_device_index(output_device, True)
src_device_obj = torch.device(device_type, device_ids[0])
'''5.
检查输入以及模型参数是否都在同一个设备上,比如GPU或者CPU
'''
for t in chain(module.parameters(), module.buffers()):
if t.device != src_device_obj:
raise RuntimeError("module must have its parameters and buffers "
"on device {} (device_ids[0]) but found one of "
"them on device: {}".format(src_device_obj, t.device))
'''6.
scatter_kwargs将输入分成m份,m=batch_size/GPUs
返回tuple(inputs),有几个GPU,inputs有几份
'''
inputs, module_kwargs = scatter_kwargs(inputs, module_kwargs, device_ids, dim)
# for module without any inputs, empty list and dict will be created
# so the module can be executed on one device which is the first one in device_ids
if not inputs and not module_kwargs:
inputs = ((),)
module_kwargs = ({},)
if len(device_ids) == 1:
return module(*inputs[0], **module_kwargs[0])
used_device_ids = device_ids[:len(inputs)]
'''7.
replicate将模型复制m份,m为GPUs数目,并加载到每个GPU上
outputs与inputs对应,为每个GPUs的输出结果
'''
replicas = replicate(module, used_device_ids)
outputs = parallel_apply(replicas, inputs, module_kwargs, used_device_ids)
'''8.
gather将outputs整合为一个,比如有2个GPU,outputs分别在每个GPU上各有一份
gather就将每个GPU上的outputs整合起来默认放到GPU:0上
'''
return gather(outputs, output_device, dim)
更多推荐
已为社区贡献14条内容
所有评论(0)