1. 官网实例

PyTorch官网示例

PyTorch官网函数手册

2. 运行过程

torch.nn.DataParallel(module, device_ids=None, output_device=None, dim=0)

  • module (Module) – module to be parallelized
  • device_ids (list of python:int or torch.device) – CUDA devices (default: all devices)
  • output_device (int or torch.device) – device location of output (default: device_ids[0])

首先把模型分别加载到各个GPU上,同时将数据平均分配到GPU上;然后每个GPU分别前向传播,将最后的结果汇总到第一块GPU上;最终计算Loss反向传播

这里有一个问题,当所有的batch进行汇总到第一个GPU时(默认的output_device=0,即在第一块GPU上进行汇总),第一块GPU的内存会明显高于其他的,造成不均衡,同时计算Loss的时候可能会出现问题,而且如果分别计算Loss是不是会更快呢,这个问题之后写解决方案,这个问题在pytorch上有人提出,由于目前很少用DataParallel的方法了,大多用DistributedDataParallel,所以怎么优化DataParallel就不展开了

3. 使用方法

具体使用也比较简单,如下所示,其余的不需要变化

# 当不限制GPUs的个数时,默认使用全部的GPU
model = Model()
model = nn.DataParallel(model)
model.cuda()

# 当限制GPUs的个数时
os.environ['CUDA_VISIBLE_DEVICES'] = '0, 1'
model = Model()
model = nn.DataParallel(model)
model.cuda()

4. 源代码解读

这里我将源代码copy了出来进行调试,加了注释,很方便理解

所在电脑有两块GPU,即device_ids = [0, 1]

class DataParallel(Module):
    def __init__(self, module, device_ids=None, output_device=None, dim=0):
        super(DataParallel, self).__init__()

       	'''1.
       	查看用GPU还是CPU,如果是CPU直接返回,如果是GPU继续
       	torch.cuda.is_available()
       	return "cuda"/None
       	'''
        device_type = _get_available_device_type()  # cuda()
        if device_type is None:
            self.module = module
            self.device_ids = []
            return

        
        '''2.
        如果没有指定device_ids,默认使用所有GPU
        如果没有指定output_device,默认devices=0
        torch._utils._get_all_device_indices() 返回GPUid,例如[0, 1]
       	'''
        if device_ids is None:
            device_ids = _get_all_device_indices()

        if output_device is None:
            output_device = device_ids[0]

            
        '''3.
        获取变量
        torch._utils._get_all_device_indices() 返回GPUid,例如[0, 1]
        torch._utils._get_device_index() 默认返回第一个GPU
        device_ids = [0, 1]
        output_device = 0 默认第一块GPU,即0
       	'''
        self.dim = dim
        self.module = module
        self.device_ids = [_get_device_index(x, True) for x in device_ids]
        self.output_device = _get_device_index(output_device, True)
        self.src_device_obj = torch.device(device_type, self.device_ids[0])

        
        '''4.
        检查GPU之间的性能
		_check_balance会检查各个GPU之间的性能差异,有两种情况会报错:
		(1)剩余内存最小的GPU与最大的GPU比值小于0.75,不论是GPU本身的内存还是
		   被其他人占用后剩余的内存只要小于0.75就会报错
		(2)线程最小的GPU与最大的GPU比值小于0.75
		'''
        _check_balance(self.device_ids)
        
        
        '''5.
       	如果只有一块GPU,将module载入即可
        '''
        if len(self.device_ids) == 1:
            self.module.to(self.src_device_obj)

    def forward(self, *inputs, **kwargs):
        with torch.autograd.profiler.record_function("DataParallel.forward"):
            if not self.device_ids:
                return self.module(*inputs, **kwargs)

            
            '''6.
       	    将module.parameters()以及module.buffers()进行迭代,看看是否在GPU上
       	    没有的话继续,有的话返回错误信息cpu
        	'''
            for t in chain(self.module.parameters(), self.module.buffers()):
                if t.device != self.src_device_obj:
                    raise RuntimeError("module must have its parameters and buffers "
                                       "on device {} (device_ids[0]) but found one of "
                                       "them on device: {}".format(self.src_device_obj, t.device))

                    
            '''7.
            后面这部分在下面的data_parallel中看注释
            '''
            inputs, kwargs = self.scatter(inputs, kwargs, self.device_ids)
            # for forward function without any inputs, empty list and dict will be created
            # so the module can be executed on one device which is the first one in device_ids
            if not inputs and not kwargs:
                inputs = ((),)
                kwargs = ({},)

            if len(self.device_ids) == 1:
                return self.module(*inputs[0], **kwargs[0])
            replicas = self.replicate(self.module, self.device_ids[:len(inputs)])
            outputs = self.parallel_apply(replicas, inputs, kwargs)
            return self.gather(outputs, self.output_device)

    def replicate(self, module, device_ids):
        return replicate(module, device_ids, not torch.is_grad_enabled())

    def scatter(self, inputs, kwargs, device_ids):
        return scatter_kwargs(inputs, kwargs, device_ids, dim=self.dim)

    def parallel_apply(self, replicas, inputs, kwargs):
        return parallel_apply(replicas, inputs, kwargs, self.device_ids[:len(replicas)])

    def gather(self, outputs, output_device):
        return gather(outputs, output_device, dim=self.dim)



def data_parallel(module, inputs, device_ids=None, output_device=None, dim=0, module_kwargs=None):
    r"""Evaluates module(input) in parallel across the GPUs given in device_ids.

    This is the functional version of the DataParallel module.

    Args:
        module (Module): the module to evaluate in parallel
        inputs (Tensor): inputs to the module
        device_ids (list of int or torch.device): GPU ids on which to replicate module
        output_device (list of int or torch.device): GPU location of the output  Use -1 to indicate the CPU.
            (default: device_ids[0])
    Returns:
        a Tensor containing the result of module(input) located on
        output_device
    """
    
    
    '''1.
	检查是否有输入,即我们的batchsize_input
	如果有,不改变输入;如果没有变为空()
    '''
    if not isinstance(inputs, tuple):
        inputs = (inputs,) if inputs is not None else ()

        
    '''2.
    检查当前设备类型:一般为cuda()
    '''
    device_type = _get_available_device_type()

    
    '''3.
    检查当前是否规定了输入和输出的GPU
    默认:输入设备为所有GPU;输出为GPU:0,即第一块GPU(理解为所有输入GPU中的第一块)
    '''
    if device_ids is None:
        device_ids = _get_all_device_indices()

    if output_device is None:
        output_device = device_ids[0]

        
    '''4.
    确定输入输出GPU,同时确定源GPU,可能是最终计算用来汇总的GPU:0
    '''
    device_ids = [_get_device_index(x, True) for x in device_ids]
    output_device = _get_device_index(output_device, True)
    src_device_obj = torch.device(device_type, device_ids[0])

    
    '''5.
    检查输入以及模型参数是否都在同一个设备上,比如GPU或者CPU
    '''
    for t in chain(module.parameters(), module.buffers()):
        if t.device != src_device_obj:
            raise RuntimeError("module must have its parameters and buffers "
                               "on device {} (device_ids[0]) but found one of "
                               "them on device: {}".format(src_device_obj, t.device))

            
    '''6.
    scatter_kwargs将输入分成m份,m=batch_size/GPUs
    返回tuple(inputs),有几个GPU,inputs有几份
    '''
    inputs, module_kwargs = scatter_kwargs(inputs, module_kwargs, device_ids, dim)
    # for module without any inputs, empty list and dict will be created
    # so the module can be executed on one device which is the first one in device_ids
    if not inputs and not module_kwargs:
        inputs = ((),)
        module_kwargs = ({},)

    if len(device_ids) == 1:
        return module(*inputs[0], **module_kwargs[0])
    used_device_ids = device_ids[:len(inputs)]
    
    
    '''7.
    replicate将模型复制m份,m为GPUs数目,并加载到每个GPU上
    outputs与inputs对应,为每个GPUs的输出结果
    '''
    replicas = replicate(module, used_device_ids)
    outputs = parallel_apply(replicas, inputs, module_kwargs, used_device_ids)
    
    
    '''8.
    gather将outputs整合为一个,比如有2个GPU,outputs分别在每个GPU上各有一份
    gather就将每个GPU上的outputs整合起来默认放到GPU:0上
    '''
    return gather(outputs, output_device, dim)
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐