DETR代码学习笔记(二)
上一篇DETR代码学习笔记(一)记录完了DETR的backbone以及数据进入encoder之前的部分,这篇开始介绍DETR的transformer部分,大体上DETR的transformer和Attention is All You Need中提出框架基本是一致的,只是数据上有些不同。首先是encoder部分,整个encoder部分,从代码上可以很直观的看出他的整体流程,其中的最初的q,k是由b
上一篇DETR代码学习笔记(一) 记录完了DETR的backbone以及数据进入encoder之前的部分,这篇开始介绍DETR的transformer部分,大体上DETR的transformer和Attention is All You Need 中提出框架基本是一致的,只是数据上有些不同。
首先是encoder部分,整个encoder部分,从代码上可以很直观的看出他的整体流程,其中的最初的q,k是由backbone中得到的Feature Map加上位置编码得到,这个位置编码是由backbone中与Feature Map一同输出的mask生成的,如果忘了这个是怎么来的可以回去看上一篇加深印象。同样接上一篇,还是假设输入为[2,768,768],得到的特征图为[2,256,24,24],reshape并转换维度之后得到[576,2,256],此时的初始的q,k相等(Feature Map加上位置编码),而v则是Feature Map,他并没有没有加上位置编码,q,k,v的维度保持不变还是[576,2,256]
进入自注意力层后q,k,v通过线性层进行初始化,之后reshape,由输入的[HWxNxC]->[NXnum_heads,HxW,head_dim],即[576,2,256]->[16,576,32],通过如下的公式保持最后的输出维度不变,还是[576,2,256]。
encoder的代码:
class TransformerEncoderLayer(nn.Module):
def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
activation="relu", normalize_before=False):
super().__init__()
self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
# Implementation of Feedforward model
self.linear1 = nn.Linear(d_model, dim_feedforward)
self.dropout = nn.Dropout(dropout)
self.linear2 = nn.Linear(dim_feedforward, d_model)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
self.activation = _get_activation_fn(activation)
self.normalize_before = normalize_before
def with_pos_embed(self, tensor, pos: Optional[Tensor]):
return tensor if pos is None else tensor + pos
def forward_post(self,
src,
src_mask: Optional[Tensor] = None,
src_key_padding_mask: Optional[Tensor] = None,
pos: Optional[Tensor] = None):
# q,k由最初输入的src加上pos的位置编码构成,且q=k,shape为[576,2,256]
# 该位置编码是由backbone中输出的mask生成
q = k = self.with_pos_embed(src, pos)
# 自注意力层,src2 = softmax(q*kt/sqrt(dk))*v,其中dk=32
# 进入自注意力层后会对q,k,v进行reshape,由输入的[HWxNxC]->[NXnum_heads,HxW,head_dim],即[576,2,256]->[16,576,32]
# 自注意力层的输出依旧是[576,2,256]
src2 = self.self_attn(q, k, value=src, attn_mask=src_mask,
key_padding_mask=src_key_padding_mask)[0]
src = src + self.dropout1(src2)
src = self.norm1(src)
src2 = self.linear2(self.dropout(self.activation(self.linear1(src))))
src = src + self.dropout2(src2)
src = self.norm2(src)
return src
class TransformerEncoder(nn.Module):
def __init__(self, encoder_layer, num_layers, norm=None):
super().__init__()
self.layers = _get_clones(encoder_layer, num_layers)
self.num_layers = num_layers
self.norm = norm
def forward(self, src,
mask: Optional[Tensor] = None,
src_key_padding_mask: Optional[Tensor] = None,
pos: Optional[Tensor] = None):
output = src
for index,layer in enumerate(self.layers):
# output=src[576,2,256]
# src_key_padding_mask[2,576]
# pos[576,2,256]
output = layer(output, src_mask=mask,
src_key_padding_mask=src_key_padding_mask, pos=pos)
if self.norm is not None:
output = self.norm(output)
# output[576,2,256]
return output
从代码上能比较直观的得到encoder的框架图。
原文中encoder有6层,也就是第一层encoder的输出作为下一层encoder的输入,直到第六层最后输出的memory,这个memory将作为decoder的输入。
接下来就是decoder部分,decoder与encoder的输入上存在差异,最初自注意力层的输入是词嵌入向量,其中q和k是一个[100,2,256]全零张量加上词嵌入向量,v则是[100,2,256]全零张量,之后进入自注意力层,这里的流程和上面encoder中是一样的。
自注意力层的输出将作为多头注意力层中的q,而k和v来自encoder的输出,其中k还要加上位置编码。此时k和v维度为[576,2,256],q的维度为[100,2,256],同样的在计算权重时,会将各个张量进行reshape,即q:[100,2,256]->[16,100,32],k,v:[576,2,256]->[16,576,32],再利用公式计算,最后多头注意力层的输出依旧是[100,2,256]。
decoder代码:
class TransformerDecoderLayer(nn.Module):
def __init__(self, d_model, nhead, dim_feedforward=2048, dropout=0.1,
activation="relu", normalize_before=False):
super().__init__()
self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
# Implementation of Feedforward model
self.linear1 = nn.Linear(d_model, dim_feedforward)
self.dropout = nn.Dropout(dropout)
self.linear2 = nn.Linear(dim_feedforward, d_model)
self.norm1 = nn.LayerNorm(d_model)
self.norm2 = nn.LayerNorm(d_model)
self.norm3 = nn.LayerNorm(d_model)
self.dropout1 = nn.Dropout(dropout)
self.dropout2 = nn.Dropout(dropout)
self.dropout3 = nn.Dropout(dropout)
self.activation = _get_activation_fn(activation)
self.normalize_before = normalize_before
def with_pos_embed(self, tensor, pos: Optional[Tensor]):
return tensor if pos is None else tensor + pos
def forward_post(self, tgt, memory,
tgt_mask: Optional[Tensor] = None,
memory_mask: Optional[Tensor] = None,
tgt_key_padding_mask: Optional[Tensor] = None,
memory_key_padding_mask: Optional[Tensor] = None,
pos: Optional[Tensor] = None,
query_pos: Optional[Tensor] = None):
# q,k由最初输入的tgt加上query_pos的词嵌入向量构成,且q=k,shape为[100,2,256]
# 其中tgt是shape为[100,2,256]的全零输入
q = k = self.with_pos_embed(tgt, query_pos)
# 自注意力层,tgt2 = softmax(q*kt/sqrt(dk))*v,其中dk=32
# 进入自注意力层后会对q,k,v进行reshape,由输入的[HWxNxC]->[NXnum_heads,HxW,head_dim],即[100,2,256]->[16,100,32]
# 自注意力层的输出依旧是[100,2,256]
tgt2 = self.self_attn(q, k, value=tgt, attn_mask=tgt_mask,
key_padding_mask=tgt_key_padding_mask)[0]
tgt = tgt + self.dropout1(tgt2)
tgt = self.norm1(tgt)
# 多头注意力层,计算方式相同tgt2 = softmax(q*kt/sqrt(dk))*v,其中dk=32
# 但是出入的shape发生变化,memory是encoder的输出,shape为[576,2,256],用它作为k,v,k还要加上位置编码
# 多头自注意力层同样对q,k,v进行reshape,由输入的[HWxNxC]->[NXnum_heads,HxW,head_dim]
# 即q:[100,2,256]->[16,100,32]与k,v:[576,2,256]->[16,576,32]
# 多头注意力层的输出依旧是[100,2,256]
tgt2 = self.multihead_attn(query=self.with_pos_embed(tgt, query_pos),
key=self.with_pos_embed(memory, pos),
value=memory, attn_mask=memory_mask,
key_padding_mask=memory_key_padding_mask)[0]
tgt = tgt + self.dropout2(tgt2)
tgt = self.norm2(tgt)
tgt2 = self.linear2(self.dropout(self.activation(self.linear1(tgt))))
tgt = tgt + self.dropout3(tgt2)
tgt = self.norm3(tgt)
return tgt
class TransformerDecoder(nn.Module):
def __init__(self, decoder_layer, num_layers, norm=None, return_intermediate=False):
super().__init__()
self.layers = _get_clones(decoder_layer, num_layers)
self.num_layers = num_layers
self.norm = norm
self.return_intermediate = return_intermediate
def forward(self, tgt, memory,
tgt_mask: Optional[Tensor] = None,
memory_mask: Optional[Tensor] = None,
tgt_key_padding_mask: Optional[Tensor] = None,
memory_key_padding_mask: Optional[Tensor] = None,
pos: Optional[Tensor] = None,
query_pos: Optional[Tensor] = None):
output = tgt
intermediate = []
for index,layer in enumerate(self.layers):
output = layer(output, memory, tgt_mask=tgt_mask,
memory_mask=memory_mask,
tgt_key_padding_mask=tgt_key_padding_mask,
memory_key_padding_mask=memory_key_padding_mask,
pos=pos, query_pos=query_pos)
if self.return_intermediate:
intermediate.append(self.norm(output))
if self.norm is not None:
output = self.norm(output)
if self.return_intermediate:
intermediate.pop()
intermediate.append(output)
if self.return_intermediate:
# 由于return_intermediate设置为True,所以decoder每一层的输出都被保
# 存在intermediate列表中,decoder每一层的输出为[100,2,256],最终
# 返回的是将intermediate进行stack的结果,所以最后的输出为[6,100,2,256]
return torch.stack(intermediate)
return output.unsqueeze(0)
可以注意到,由于return_intermediate设置为True,所以decoder每一层的输出都被保存在intermediate列表中,decoder每一层的输出为[100,2,256],最终返回的是将intermediate进行stack的结果,所以最后的输出为[6,100,2,256]。
根据代码也比较好得到如下的框架图
query=self.with_pos_embed(tgt, query_pos),
key=self.with_pos_embed(memory, pos),
value=memory
传入多头注意力层的参数应该是引用中的部分
与encoder相同,decoder也有6层,第一层decoder的输出作为下一层decoder的输入,最后一层的输出会边界框和类别分类。
将以上的流程组合后得到整体的框架图。
transformer的代码:
class Transformer(nn.Module):
def __init__(self, d_model=512, nhead=8, num_encoder_layers=6,
num_decoder_layers=6, dim_feedforward=2048, dropout=0.1,
activation="relu", normalize_before=False,
return_intermediate_dec=False):
super().__init__()
encoder_layer = TransformerEncoderLayer(d_model, nhead, dim_feedforward,
dropout, activation, normalize_before)
encoder_norm = nn.LayerNorm(d_model) if normalize_before else None
self.encoder = TransformerEncoder(encoder_layer, num_encoder_layers, encoder_norm)
decoder_layer = TransformerDecoderLayer(d_model, nhead, dim_feedforward,
dropout, activation, normalize_before)
decoder_norm = nn.LayerNorm(d_model)
self.decoder = TransformerDecoder(decoder_layer, num_decoder_layers, decoder_norm,
return_intermediate=return_intermediate_dec)
self._reset_parameters()
self.d_model = d_model
self.nhead = nhead
def _reset_parameters(self):
for p in self.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
def forward(self, src, mask, query_embed, pos_embed):
# flatten NxCxHxW to HWxNxC
bs, c, h, w = src.shape
# 将降维后的src转换维度[NxCxHxW]->[HWxNxC],即[2,256,24,24]->[576,2,256]
src = src.flatten(2).permute(2, 0, 1)
# 将位置编码转换维度[NxCxHxW]->[HWxNxC],即[2,256,24,24]->[576,2,256]
pos_embed = pos_embed.flatten(2).permute(2, 0, 1)
# 词嵌入向量由[num_embeddings, embedding_dim]->[num_embeddings, N, embedding_dim]
# 即[100,256]->[100,2,256]
query_embed = query_embed.unsqueeze(1).repeat(1, bs, 1)
# 将mask[2,24,24]->[2,576]
mask = mask.flatten(1)
tgt = torch.zeros_like(query_embed)
# memory shape与src相同,为[576,2,256]
memory = self.encoder(src, src_key_padding_mask=mask, pos=pos_embed)
# hs 为decoder的输出,shape为[6,100,2,256]
hs = self.decoder(tgt, memory, memory_key_padding_mask=mask,
pos=pos_embed, query_pos=query_embed)
# 最后返回的hs交换了第二和第三维,shape为[6,2,100,256]
return hs.transpose(1, 2), memory.permute(1, 2, 0).view(bs, c, h, w)
最后附上论文原文中的框架图
DETR代码:
class DETR(nn.Module):
""" This is the DETR module that performs object detection """
def __init__(self, backbone, transformer, num_classes, num_queries, aux_loss=False):
""" Initializes the model.
Parameters:
backbone: torch module of the backbone to be used. See backbone.py
transformer: torch module of the transformer architecture. See transformer.py
num_classes: number of object classes
num_queries: number of object queries, ie detection slot. This is the maximal number of objects
DETR can detect in a single image. For COCO, we recommend 100 queries.
aux_loss: True if auxiliary decoding losses (loss at each decoder layer) are to be used.
"""
super().__init__()
self.num_queries = num_queries
self.transformer = transformer
hidden_dim = transformer.d_model
self.class_embed = nn.Linear(hidden_dim, num_classes + 1)
self.bbox_embed = MLP(hidden_dim, hidden_dim, 4, 3)
self.query_embed = nn.Embedding(num_queries, hidden_dim)
self.input_proj = nn.Conv2d(backbone.num_channels, hidden_dim, kernel_size=1)
self.backbone = backbone
self.aux_loss = aux_loss
def forward(self, samples: NestedTensor):
""" The forward expects a NestedTensor, which consists of:
- samples.tensor: batched images, of shape [batch_size x 3 x H x W]
- samples.mask: a binary mask of shape [batch_size x H x W], containing 1 on padded pixels
It returns a dict with the following elements:
- "pred_logits": the classification logits (including no-object) for all queries.
Shape= [batch_size x num_queries x (num_classes + 1)]
- "pred_boxes": The normalized boxes coordinates for all queries, represented as
(center_x, center_y, height, width). These values are normalized in [0, 1],
relative to the size of each individual image (disregarding possible padding).
See PostProcess for information on how to retrieve the unnormalized bounding box.
- "aux_outputs": Optional, only returned when auxilary losses are activated. It is a list of
dictionnaries containing the two above keys for each decoder layer.
"""
if isinstance(samples, (list, torch.Tensor)):
samples = nested_tensor_from_tensor_list(samples)
# features:{mask,[2,24,24],tensor_list,[2,2048,24,24]},pos:[2,256,24,24]
features, pos = self.backbone(samples)
# src:[2,2048,24,24], mask:[2,24,24]
src, mask = features[-1].decompose()
assert mask is not None
# 将数据送入transformer
# self.input_proj() 将src降维:[2,2048,24,24] -> [2,256,24,24]
# query_embed由nn.Embedding初始化,shape[100,256]
# 最后输出的hs的shape为[6,2,100,256]
hs = self.transformer(self.input_proj(src), mask, self.query_embed.weight, pos[-1])[0]
# outputs_class经过linear层输出大小为[6,2,100,92]
outputs_class = self.class_embed(hs)
# outputs_coord经过两层FFN输出为[6,2,100,4]
outputs_coord = self.bbox_embed(hs).sigmoid()
# 最终out中的pred_logits是outputs_class最后一个元素
# out中的pred_boxes是outputs_coord最后一个元素
out = {'pred_logits': outputs_class[-1], 'pred_boxes': outputs_coord[-1]}
if self.aux_loss:
out['aux_outputs'] = self._set_aux_loss(outputs_class, outputs_coord)
return out
@torch.jit.unused
def _set_aux_loss(self, outputs_class, outputs_coord):
# this is a workaround to make torchscript happy, as torchscript
# doesn't support dictionary with non-homogeneous values, such
# as a dict having both a Tensor and a list.
return [{'pred_logits': a, 'pred_boxes': b}
for a, b in zip(outputs_class[:-1], outputs_coord[:-1])]
decoder输出的shape为[6,100,2,256],转换第二和第三维,shape为[6,2,100,256],之后分别经过一个线性层和一个MLP层,线性层输出的是类别信息,shape为[6,2,100,92],MLP包括两层FFN,输出的是边界框信息shape为[6,2,100,4] 。
到这里大致把transformer部分的处理过程理清,接下来是构建损失函数。可以参看:DETR代码学习笔记(三)
更多推荐
所有评论(0)