The previous post, DETR代码学习笔记(一), covered DETR's backbone and the data path up to the encoder. This post walks through DETR's transformer. Overall it is essentially the same architecture as the one proposed in Attention Is All You Need; the differences lie mainly in the data that flows through it.

First, the encoder. Its overall flow can be read directly from the code. The initial q and k are the feature map produced by the backbone plus the positional encoding, and that positional encoding is generated from the mask the backbone outputs alongside the feature map (see the previous post if you have forgotten where the mask comes from). Continuing with the same example as before, assume the input is [2,768,768]; the feature map is then [2,256,24,24], and after flattening and permuting it becomes [576,2,256]. At this point the initial q and k are equal (feature map plus positional encoding), while v is the feature map alone, with no positional encoding added. q, k, and v all keep the shape [576,2,256].

Inside the self-attention layer, q, k, and v are projected through linear layers and then reshaped from [HWxNxC] to [N*num_heads, HxW, head_dim], i.e. [576,2,256] -> [16,576,32]. Attention(Q,K,V) = softmax(Q*K^T / sqrt(d_k)) * V is then computed per head and the heads are merged back, so the final output shape is unchanged, still [576,2,256].
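A minimal sketch of this shape bookkeeping, using the sizes from this walkthrough (it omits the input/output linear projections that nn.MultiheadAttention applies, so it is only meant to show where [16,576,32] comes from):

import torch
import torch.nn.functional as F

HW, N, C = 576, 2, 256             # flattened feature map: [HW, N, C]
num_heads, head_dim = 8, 256 // 8  # 8 heads of 32 channels each

feat = torch.rand(HW, N, C)        # flattened backbone feature map
pos = torch.rand(HW, N, C)         # positional encoding built from the mask

q = k = feat + pos                 # q and k carry the positional encoding
v = feat                           # v is the feature map alone

# reshape [HW, N, C] -> [N*num_heads, HW, head_dim], i.e. [576,2,256] -> [16,576,32]
def split_heads(x):
    return x.contiguous().view(HW, N * num_heads, head_dim).transpose(0, 1)

qh, kh, vh = split_heads(q), split_heads(k), split_heads(v)
# attention: softmax(q*k^T / sqrt(d_k)) * v, computed per head
attn = F.softmax(qh @ kh.transpose(1, 2) / head_dim ** 0.5, dim=-1)  # [16, 576, 576]
out = (attn @ vh).transpose(0, 1).reshape(HW, N, C)                  # back to [576, 2, 256]
print(out.shape)  # torch.Size([576, 2, 256])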

The encoder code:

class TransformerEncoderLayer(nn.Module):

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Implementation of Feedforward model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)
        self.normalize_before = normalize_before

    def with_pos_embed(self, tensor, pos: Optional[Tensor]):
        return tensor if pos is None else tensor + pos

    def forward_post(self,
                     src,
                     src_mask: Optional[Tensor] = None,
                     src_key_padding_mask: Optional[Tensor] = None,
                     pos: Optional[Tensor] = None):
        # q and k are the input src plus the positional encoding pos, with q = k; shape [576,2,256]
        # this positional encoding is generated from the mask output by the backbone
        q = k = self.with_pos_embed(src, pos)
        # self-attention: src2 = softmax(q*k^T/sqrt(d_k))*v, with d_k = 32
        # inside the attention layer q, k, v are reshaped from [HWxNxC] to [N*num_heads, HxW, head_dim], i.e. [576,2,256] -> [16,576,32]
        # the self-attention output is again [576,2,256]
        src2 = self.self_attn(q, k, value=src, attn_mask=src_mask,
                              key_padding_mask=src_key_padding_mask)[0]
        src = src + self.dropout1(src2)
        src = self.norm1(src)
        src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
        src = src + self.dropout2(src2)
        src = self.norm2(src)
        return src
class TransformerEncoder(nn.Module):

    def __init__(self, encoder_layer, num_layers, norm=None):
        super().__init__()
        self.layers = _get_clones(encoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm

    def forward(self, src,
                mask: Optional[Tensor] = None,
                src_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None):
        output = src

        for index,layer in enumerate(self.layers):
            # output = src: [576,2,256]
            # src_key_padding_mask: [2,576]
            # pos: [576,2,256]
            output = layer(output, src_mask=mask,
                           src_key_padding_mask=src_key_padding_mask, pos=pos)

        if self.norm is not None:
            output = self.norm(output)
        # output: [576,2,256]
        return output

The encoder architecture diagram can be read off the code fairly directly.

In the paper the encoder has 6 layers: the output of each encoder layer is the input to the next, and the output of the sixth layer is the memory that is fed into the decoder.
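As a quick sanity check of these shapes, here is a minimal sketch that stacks six layers and runs a dummy forward pass. It assumes the full TransformerEncoderLayer / TransformerEncoder classes from models/transformer.py are importable (the excerpt above omits the forward() method that dispatches to forward_post when normalize_before=False):

import torch

encoder_layer = TransformerEncoderLayer(d_model=256, nhead=8)
encoder = TransformerEncoder(encoder_layer, num_layers=6)

src = torch.rand(576, 2, 256)                         # flattened feature map
pos = torch.rand(576, 2, 256)                         # positional encoding
padding_mask = torch.zeros(2, 576, dtype=torch.bool)  # True marks padded pixels

memory = encoder(src, src_key_padding_mask=padding_mask, pos=pos)
print(memory.shape)  # torch.Size([576, 2, 256]) -- this memory is fed to the decoder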

Next comes the decoder, whose inputs differ from the encoder's. The input to the first self-attention layer is built from the learned query embeddings (the object queries): q and k are an all-zero [100,2,256] tensor plus the query embeddings, and v is the all-zero [100,2,256] tensor itself. From there the self-attention layer works exactly as in the encoder above.

The output of the self-attention layer (with the query embeddings added again) is used as q in the cross-attention (multi-head attention) layer, while k and v come from the encoder output; k additionally gets the positional encoding added. Here k and v have shape [576,2,256] and q has shape [100,2,256]. As before, the tensors are reshaped before the attention weights are computed, q: [100,2,256] -> [16,100,32] and k, v: [576,2,256] -> [16,576,32], the attention formula is applied, and the output of the cross-attention layer is again [100,2,256].
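A minimal sketch of the cross-attention shapes, using nn.MultiheadAttention directly with the sizes from this example (a shape check only, not the DETR layer itself):

import torch
import torch.nn as nn

cross_attn = nn.MultiheadAttention(embed_dim=256, num_heads=8)

tgt = torch.rand(100, 2, 256)        # decoder self-attention output
query_pos = torch.rand(100, 2, 256)  # learned query embeddings
memory = torch.rand(576, 2, 256)     # encoder output
pos = torch.rand(576, 2, 256)        # positional encoding of the feature map

out, attn_weights = cross_attn(query=tgt + query_pos,  # q: [100,2,256]
                               key=memory + pos,       # k: [576,2,256]
                               value=memory)           # v: [576,2,256]
print(out.shape)           # torch.Size([100, 2, 256])
print(attn_weights.shape)  # torch.Size([2, 100, 576]), averaged over the 8 heads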

The decoder code:

class TransformerDecoderLayer(nn.Module):

    def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        # Implementation of Feedforward model
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.dropout = nn.Dropout(dropout)
        self.linear2 = nn.Linear(dim_feedforward, d_model)

        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)

        self.activation = _get_activation_fn(activation)
        self.normalize_before = normalize_before

    def with_pos_embed(self, tensor, pos: Optional[Tensor]):
        return tensor if pos is None else tensor + pos

    def forward_post(self, tgt, memory,
                     tgt_mask: Optional[Tensor] = None,
                     memory_mask: Optional[Tensor] = None,
                     tgt_key_padding_mask: Optional[Tensor] = None,
                     memory_key_padding_mask: Optional[Tensor] = None,
                     pos: Optional[Tensor] = None,
                     query_pos: Optional[Tensor] = None):
        # q and k are the input tgt plus the query embedding query_pos, with q = k; shape [100,2,256]
        # tgt is an all-zero input of shape [100,2,256]
        q = k = self.with_pos_embed(tgt, query_pos)
        # self-attention: tgt2 = softmax(q*k^T/sqrt(d_k))*v, with d_k = 32
        # inside the attention layer q, k, v are reshaped from [HWxNxC] to [N*num_heads, HxW, head_dim], i.e. [100,2,256] -> [16,100,32]
        # the self-attention output is again [100,2,256]
        tgt2 = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask,
                              key_padding_mask=tgt_key_padding_mask)[0]
        tgt = tgt + self.dropout1(tgt2)
        tgt = self.norm1(tgt)
        # cross-attention (multi-head attention), computed the same way: tgt2 = softmax(q*k^T/sqrt(d_k))*v, with d_k = 32
        # but the input shapes change: memory is the encoder output with shape [576,2,256] and serves as k and v; k also gets the positional encoding added
        # the cross-attention layer likewise reshapes q, k, v from [HWxNxC] to [N*num_heads, HxW, head_dim]
        # i.e. q: [100,2,256] -> [16,100,32] and k, v: [576,2,256] -> [16,576,32]
        # the cross-attention output is again [100,2,256]
        tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt, query_pos),
                                   key=self.with_pos_embed(memory, pos),
                                   value=memory, attn_mask=memory_mask,
                                   key_padding_mask=memory_key_padding_mask)[0]
        tgt = tgt + self.dropout2(tgt2)
        tgt = self.norm2(tgt)
        tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
        tgt = tgt + self.dropout3(tgt2)
        tgt = self.norm3(tgt)
        return tgt
class TransformerDecoder(nn.Module):

    def __init__(self, decoder_layer, num_layers, norm=None, return_intermediate=False):
        super().__init__()
        self.layers = _get_clones(decoder_layer, num_layers)
        self.num_layers = num_layers
        self.norm = norm
        self.return_intermediate = return_intermediate

    def forward(self, tgt, memory,
                tgt_mask: Optional[Tensor] = None,
                memory_mask: Optional[Tensor] = None,
                tgt_key_padding_mask: Optional[Tensor] = None,
                memory_key_padding_mask: Optional[Tensor] = None,
                pos: Optional[Tensor] = None,
                query_pos: Optional[Tensor] = None):
        output = tgt

        intermediate = []

        for index,layer in enumerate(self.layers):
            output = layer(output, memory, tgt_mask=tgt_mask,
                           memory_mask=memory_mask,
                           tgt_key_padding_mask=tgt_key_padding_mask,
                           memory_key_padding_mask=memory_key_padding_mask,
                           pos=pos, query_pos=query_pos)
            if self.return_intermediate:
                intermediate.append(self.norm(output))

        if self.norm is not None:
            output = self.norm(output)
            if self.return_intermediate:
                intermediate.pop()
                intermediate.append(output)

        if self.return_intermediate:
            # since return_intermediate is True, every decoder layer's output is kept
            # in the intermediate list; each layer's output is [100,2,256], and the
            # stacked list is returned, so the final output is [6,100,2,256]
            return torch.stack(intermediate)

        return output.unsqueeze(0)

Note that because return_intermediate is set to True, the output of every decoder layer is stored in the intermediate list. Each decoder layer's output is [100,2,256], and what is returned is the stack of intermediate, so the final output is [6,100,2,256].
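As a shape check, a minimal sketch of running the decoder with return_intermediate=True; it assumes the full TransformerDecoderLayer / TransformerDecoder classes from models/transformer.py (including the forward() method not shown in the excerpt):

import torch
import torch.nn as nn

decoder_layer = TransformerDecoderLayer(d_model=256, nhead=8)
decoder = TransformerDecoder(decoder_layer, num_layers=6,
                             norm=nn.LayerNorm(256), return_intermediate=True)

query_embed = torch.rand(100, 2, 256)  # learned object queries
tgt = torch.zeros_like(query_embed)    # all-zero initial target
memory = torch.rand(576, 2, 256)       # encoder output
pos = torch.rand(576, 2, 256)          # positional encoding
padding_mask = torch.zeros(2, 576, dtype=torch.bool)

hs = decoder(tgt, memory, memory_key_padding_mask=padding_mask,
             pos=pos, query_pos=query_embed)
print(hs.shape)  # torch.Size([6, 100, 2, 256]) -- one slice per decoder layer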

The following architecture diagram can also be derived from the code without much trouble.

query=self.with_pos_embed(tgt, query_pos),
key=self.with_pos_embed(memory, pos),
value=memory

The arguments passed into the cross-attention (multi-head attention) layer are the ones quoted above: q is tgt plus query_pos, k is memory plus pos, and v is memory alone.

 

Like the encoder, the decoder has 6 layers: the output of each decoder layer is the input to the next, and the last layer's output is fed to the bounding-box and class prediction heads.

Combining the steps above gives the overall architecture diagram.

The Transformer code:

class Transformer(nn.Module):

    def __init__(self, d_model=512, nhead=8, num_encoder_layers=6,
                 num_decoder_layers=6, dim_feedforward=2048, dropout=0.1,
                 activation="relu", normalize_before=False,
                 return_intermediate_dec=False):
        super().__init__()

        encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward,
                                                dropout, activation, normalize_before)
        encoder_norm = nn.LayerNorm(d_model) if normalize_before else None
        self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)

        decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward,
                                                dropout, activation, normalize_before)
        decoder_norm = nn.LayerNorm(d_model)
        self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm,
                                          return_intermediate=return_intermediate_dec)

        self._reset_parameters()

        self.d_model = d_model
        self.nhead = nhead

    def _reset_parameters(self):
        for p in self.parameters():
            if p.dim() > 1:
                nn.init.xavier_uniform_(p)

    def forward(self, src, mask, query_embed, pos_embed):
        # flatten NxCxHxW to HWxNxC
        bs, c, h, w = src.shape
        # permute the projected src from [NxCxHxW] to [HWxNxC], i.e. [2,256,24,24] -> [576,2,256]
        src = src.flatten(2).permute(2, 0, 1)
        # permute the positional encoding from [NxCxHxW] to [HWxNxC], i.e. [2,256,24,24] -> [576,2,256]
        pos_embed = pos_embed.flatten(2).permute(2, 0, 1)
        # expand the query embeddings from [num_embeddings, embedding_dim] to [num_embeddings, N, embedding_dim]
        # i.e. [100,256] -> [100,2,256]
        query_embed = query_embed.unsqueeze(1).repeat(1, bs, 1)
        # flatten the mask from [2,24,24] to [2,576]
        mask = mask.flatten(1)

        tgt = torch.zeros_like(query_embed)
        # memory has the same shape as src: [576,2,256]
        memory = self.encoder(src, src_key_padding_mask=mask, pos=pos_embed)
        # hs is the decoder output, shape [6,100,2,256]
        hs = self.decoder(tgt, memory, memory_key_padding_mask=mask,
                          pos=pos_embed, query_pos=query_embed)
        # the returned hs has dims 1 and 2 swapped, shape [6,2,100,256]
        return hs.transpose(1, 2), memory.permute(1, 2, 0).view(bs, c, h, w)
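To tie the pieces together, a minimal sketch of calling this Transformer with the shapes used throughout this example (it assumes the class definitions above are importable):

import torch

transformer = Transformer(d_model=256, nhead=8, num_encoder_layers=6,
                          num_decoder_layers=6, return_intermediate_dec=True)

src = torch.rand(2, 256, 24, 24)                 # feature map after input_proj
mask = torch.zeros(2, 24, 24, dtype=torch.bool)  # padding mask from the backbone
query_embed = torch.rand(100, 256)               # nn.Embedding weight of the object queries
pos_embed = torch.rand(2, 256, 24, 24)           # positional encoding

hs, memory = transformer(src, mask, query_embed, pos_embed)
print(hs.shape)      # torch.Size([6, 2, 100, 256])
print(memory.shape)  # torch.Size([2, 256, 24, 24])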

Finally, here is the architecture diagram from the original paper.

The DETR code:

class DETR(nn.Module):
    """ This is the DETR module that performs object detection """
    def __init__(self, backbone, transformer, num_classes, num_queries, aux_loss=False):
        """ Initializes the model.
        Parameters:
            backbone: torch module of the backbone to be used. See backbone.py
            transformer: torch module of the transformer architecture. See transformer.py
            num_classes: number of object classes
            num_queries: number of object queries, ie detection slot. This is the maximal number of objects
                         DETR can detect in a single image. For COCO, we recommend 100 queries.
            aux_loss: True if auxiliary decoding losses (loss at each decoder layer) are to be used.
        """
        super().__init__()
        self.num_queries = num_queries
        self.transformer = transformer
        hidden_dim = transformer.d_model
        self.class_embed = nn.Linear(hidden_dim, num_classes + 1)
        self.bbox_embed = MLP(hidden_dim, hidden_dim, 4, 3)
        self.query_embed = nn.Embedding(num_queries, hidden_dim)
        self.input_proj = nn.Conv2d(backbone.num_channels, hidden_dim, kernel_size=1)
        self.backbone = backbone
        self.aux_loss = aux_loss

    def forward(self, samples: NestedTensor):
        """ The forward expects a NestedTensor, which consists of:
               - samples.tensor: batched images, of shape [batch_size x 3 x H x W]
               - samples.mask: a binary mask of shape [batch_size x H x W], containing 1 on padded pixels

            It returns a dict with the following elements:
               - "pred_logits": the classification logits (including no-object) for all queries.
                                Shape= [batch_size x num_queries x (num_classes + 1)]
               - "pred_boxes": The normalized boxes coordinates for all queries, represented as
                               (center_x, center_y, height, width). These values are normalized in [0, 1],
                               relative to the size of each individual image (disregarding possible padding).
                               See PostProcess for information on how to retrieve the unnormalized bounding box.
               - "aux_outputs": Optional, only returned when auxilary losses are activated. It is a list of
                                dictionnaries containing the two above keys for each decoder layer.
        """
        if isinstance(samples, (list, torch.Tensor)):
            samples = nested_tensor_from_tensor_list(samples)
        # features: {mask: [2,24,24], tensors: [2,2048,24,24]}, pos: [2,256,24,24]
        features, pos = self.backbone(samples)
        # src: [2,2048,24,24], mask: [2,24,24]
        src, mask = features[-1].decompose()
        assert mask is not None
        # feed the data into the transformer
        # self.input_proj() projects src down: [2,2048,24,24] -> [2,256,24,24]
        # query_embed is initialized by nn.Embedding, shape [100,256]
        # the final hs has shape [6,2,100,256]
        hs = self.transformer(self.input_proj(src), mask, self.query_embed.weight, pos[-1])[0]

        # outputs_class goes through the linear layer, output size [6,2,100,92]
        outputs_class = self.class_embed(hs)
        # outputs_coord goes through the 3-layer MLP, output size [6,2,100,4]
        outputs_coord = self.bbox_embed(hs).sigmoid()
        # pred_logits in the final out dict is the last element of outputs_class
        # pred_boxes in out is the last element of outputs_coord
        out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]}
        if self.aux_loss:
            out['aux_outputs'] = self._set_aux_loss(outputs_class, outputs_coord)
        return out

    @torch.jit.unused
    def _set_aux_loss(self, outputs_class, outputs_coord):
        # this is a workaround to make torchscript happy, as torchscript
        # doesn't support dictionary with non-homogeneous values, such
        # as a dict having both a Tensor and a list.
        return [{'pred_logits': a, 'pred_boxes': b}
                for a, b in zip(outputs_class[:-1], outputs_coord[:-1])]

The decoder output has shape [6,100,2,256]; dims 1 and 2 are swapped to give [6,2,100,256]. It then passes through a linear layer and an MLP: the linear layer outputs the class information with shape [6,2,100,92], and the MLP, i.e. MLP(hidden_dim, hidden_dim, 4, 3) with three linear layers, outputs the bounding-box information with shape [6,2,100,4].
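The MLP class itself is not included in the excerpt above; below is a minimal stand-in (here called SimpleMLP, a hypothetical name) with the same layout as the MLP(hidden_dim, hidden_dim, 4, 3) call, i.e. three linear layers with ReLU in between; the real class lives in models/detr.py:

import torch
import torch.nn as nn
import torch.nn.functional as F

class SimpleMLP(nn.Module):
    """Stand-in for DETR's bbox head: hidden_dim -> hidden_dim -> hidden_dim -> 4."""
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        dims = [input_dim] + [hidden_dim] * (num_layers - 1) + [output_dim]
        self.layers = nn.ModuleList(nn.Linear(a, b) for a, b in zip(dims[:-1], dims[1:]))

    def forward(self, x):
        for i, layer in enumerate(self.layers):
            x = layer(x)
            if i < len(self.layers) - 1:  # ReLU on all but the last layer
                x = F.relu(x)
        return x

bbox_head = SimpleMLP(256, 256, 4, 3)
hs = torch.rand(6, 2, 100, 256)       # decoder output after transpose
print(bbox_head(hs).sigmoid().shape)  # torch.Size([6, 2, 100, 4])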

That roughly covers the processing flow of the transformer part. Next up is building the loss function; see DETR代码学习笔记(三).
