多通道并行卷积神经网络实现
本文介绍多通道并行卷积神经网络的结构,并给出基于 PyTorch 的双通道实现,包括网络模型、数据集(Dataset)、数据加载(Dataloader)、训练测试以及数据读取与划分的完整代码。
一、网络结构
多通道并行卷积神经网络主要由多个卷积池化层和全连接层组合而成,其网络结构图如下所示:
二、基于pytorch的实现如下(双通道):
1.网络模型
class C_lenet(nn.Module):
    """Two-branch (dual-channel) LeNet-style CNN.

    Each branch independently applies two conv+pool stages to its own
    3x32x32 RGB input; the flattened branch features are concatenated
    and classified by a three-layer fully connected head (10 classes).
    """

    def __init__(self):
        super(C_lenet, self).__init__()
        # Branch 1: 3x32x32 -> 16x28x28 -> pool 16x14x14 -> 32x10x10 -> pool 32x5x5
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        # Branch 2: identical architecture, independent weights.
        self.conv2 = nn.Sequential(
            nn.Conv2d(3, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        # Classifier head over the concatenated branch features.
        self.fc1 = nn.Linear(2 * 32 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x1, x2):
        """Run both branches on x1 / x2 and return class logits of shape (B, 10)."""
        x1 = self.conv1(x1)
        x2 = self.conv2(x2)
        x1 = torch.flatten(x1, 1)   # (B, 32*5*5)
        x2 = torch.flatten(x2, 1)   # (B, 32*5*5)
        x = torch.cat((x1, x2), 1)  # (B, 2*32*5*5)
        # BUG FIX: the original chained fc1 -> fc2 -> fc3 with no activation,
        # which collapses to one linear map; add ReLU between the FC layers.
        x = torch.relu(self.fc1(x))  # output(120)
        x = torch.relu(self.fc2(x))  # output(84)
        x = self.fc3(x)  # raw logits: CrossEntropyLoss applies softmax itself
        return x
这里做了一个双通道的卷积神经网络,两个通道分别传入 x1、x2 两个张量进行运算,输入张量尺寸为 3×32×32,张量在经过两次卷积池化再拉直后尺寸变为 32×5×5,两个张量拼接后进入 Linear 层,尺寸变为 2×(32×5×5)。
2.Dataset
class MyDataSet(Dataset):
    """Paired-image dataset: each sample yields (image1, image2, label)."""

    def __init__(self, images_path1: list, images_path2: list, images_class: list, transform=None):
        self.images_path1 = images_path1
        self.images_path2 = images_path2
        self.images_class = images_class
        self.transform = transform

    def __len__(self):
        # One entry in images_path1 per sample pair.
        return len(self.images_path1)

    def __getitem__(self, item):
        # Load the two images that make up one sample.
        img1 = Image.open(self.images_path1[item])
        img2 = Image.open(self.images_path2[item])
        # 'RGB' would be color, 'L' grayscale -- conversion left disabled.
        # img1 = img1.convert('L')
        # img2 = img2.convert('L')
        label = self.images_class[item]
        # Apply the same transform pipeline to both images, if any.
        if self.transform is not None:
            img1, img2 = self.transform(img1), self.transform(img2)
        return img1, img2, label
这里一个标签对应了两张图片,输入是图片1、图片2的地址以及图片1、2对应的标签,返回两张图片和一个标签。
3.Dataloader
# Build batch loaders: shuffle the training set each epoch; keep the test
# set in a fixed order so evaluation is reproducible.
train_loader = torch.utils.data.DataLoader(train_data_set,
batch_size=batch_size,
shuffle=True,
num_workers=nw,
)
test_loader = torch.utils.data.DataLoader(test_date_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=nw,
)
加载训练和测试集数据,这里就不多做解释了。
4.training and testing
# Build model, optimizer and loss; then alternate train / eval each epoch.
cnn = C_lenet().to(device)
optimizer = torch.optim.Adam(cnn.parameters(), lr=LR)
# CrossEntropyLoss consumes raw logits, so the model must not apply softmax.
loss_function = nn.CrossEntropyLoss().to(device)
for epoch in range(EPOCH):
    # ---- training ----
    cnn.train()
    for step1, data in enumerate(train_loader):
        images1, images2, labels = data
        optimizer.zero_grad()
        output = cnn(images1.to(device), images2.to(device))
        loss = loss_function(output, labels.to(device))
        loss.backward()
        optimizer.step()
    # ---- evaluation ----
    cnn.eval()
    acc = 0.0
    # FIX: run validation under no_grad -- the original built autograd graphs
    # during evaluation and tried to compensate with repeated
    # torch.cuda.empty_cache() calls, which only masked the memory growth.
    with torch.no_grad():
        for val_images1, val_images2, val_labels in test_loader:
            outputs = cnn(val_images1.to(device), val_images2.to(device))
            predict_y = torch.max(outputs, dim=1)[1]  # argmax over classes
            acc += torch.eq(predict_y, val_labels.to(device)).sum().item()
    val_accurate = acc / val_num
    print('[epoch %d] val_accuracy: %.3f' %
          (epoch + 1, val_accurate))
这里训练和测试每次读入的都是一组数据(两张图一个标签)。
5.read and split data
这一部分本来应该是在代码最前端的,但是和双通道卷积神经网络没多大关系,只是读数据要用到而且比较方便,所以我放到了最后。这一段部分代码也是借鉴别人的。
def read_split_data(root: str, val_rate: float = 0.2):
    """Scan a class-per-folder dataset of paired images and split train/val.

    Every sample is a file pair "<name><ext>" and "<name>(1)<ext>" in the same
    class folder.  Returns six lists: train paths (image 1), train paths
    (image 2), train labels, then the same three for the validation split.
    """
    assert os.path.exists(root), "dataset root: {} does not exist.".format(root)
    # One sub-directory per class; sort for a deterministic label mapping.
    item_class = [cla for cla in os.listdir(root) if os.path.isdir(os.path.join(root, cla))]
    item_class.sort()
    class_indices = dict((k, v) for v, k in enumerate(item_class))  # class name -> 0,1,2,...
    train1_images_path = []
    train2_images_path = []
    train_images_label = []
    test1_images_path = []
    test2_images_path = []
    test_images_label = []
    every_class_num = []
    for cla in item_class:
        cla_path = os.path.join(root, cla)
        count = 0  # running index over the directory listing
        images = []  # primary (suffix-free) image of each pair
        # FIX: os.listdir order is arbitrary -- sort so that "<name>(1)<ext>"
        # reliably precedes "<name><ext>" ('(' sorts before '.') and every
        # second entry is the primary image of a pair.
        for i in sorted(os.listdir(cla_path)):
            count += 1
            if count % 2 == 0:
                images.append(os.path.join(cla_path, i))
        image_class = class_indices[cla]
        every_class_num.append(len(images) * 2)  # count both images of each pair
        # Randomly sample validation pairs at the requested rate.
        test_path = random.sample(images, k=int(len(images) * val_rate))
        for img_path in images:
            # FIX: use os.path.splitext instead of str.split('.') so paths
            # containing extra dots are handled correctly.
            base, ext = os.path.splitext(img_path)
            img_path2 = base + '(1)' + ext  # partner image, e.g. "D:/FM(1).jpeg"
            if img_path in test_path:  # sampled pair goes to the validation split
                test1_images_path.append(img_path)
                test2_images_path.append(img_path2)
                test_images_label.append(image_class)
            else:  # otherwise it goes to the training split
                train1_images_path.append(img_path)
                train2_images_path.append(img_path2)
                train_images_label.append(image_class)
    print("{} images were found in the dataset.".format(sum(every_class_num)))
    print("{} images for training1.".format(len(train1_images_path)))
    print("{} images for training2.".format(len(train2_images_path)))
    print("{} images for test1.".format(len(test1_images_path)))
    print("{} images for test2.".format(len(test2_images_path)))
    return train1_images_path, train2_images_path, train_images_label, test1_images_path, test2_images_path, test_images_label
6.完整代码
import torch
import torch.nn as nn
from torch.utils.data import Dataset
from PIL import Image
from torchvision import transforms
import os
import random
root = "D:/0_Data/_hackrf/fake"  # dataset root directory (one sub-folder per class)
EPOCH = 10  # number of training epochs
BATCH_SIZE = 10  # samples (image pairs) per batch
LR = 0.001  # Adam learning rate
class MyDataSet(Dataset):
    """Paired-image dataset: each sample yields (image1, image2, label)."""

    def __init__(self, images_path1: list, images_path2: list, images_class: list, transform=None):
        self.images_path1 = images_path1
        self.images_path2 = images_path2
        self.images_class = images_class
        self.transform = transform

    def __len__(self):
        # One entry in images_path1 per sample pair.
        return len(self.images_path1)

    def __getitem__(self, item):
        # Load the two images that make up one sample.
        img1 = Image.open(self.images_path1[item])
        img2 = Image.open(self.images_path2[item])
        # 'RGB' would be color, 'L' grayscale -- conversion left disabled.
        # img1 = img1.convert('L')
        # img2 = img2.convert('L')
        label = self.images_class[item]
        # Apply the same transform pipeline to both images, if any.
        if self.transform is not None:
            img1, img2 = self.transform(img1), self.transform(img2)
        return img1, img2, label
class C_lenet(nn.Module):
    """Two-branch (dual-channel) LeNet-style CNN.

    Each branch independently applies two conv+pool stages to its own
    3x32x32 RGB input; the flattened branch features are concatenated
    and classified by a three-layer fully connected head (10 classes).
    """

    def __init__(self):
        super(C_lenet, self).__init__()
        # Branch 1: 3x32x32 -> 16x28x28 -> pool 16x14x14 -> 32x10x10 -> pool 32x5x5
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        # Branch 2: identical architecture, independent weights.
        self.conv2 = nn.Sequential(
            nn.Conv2d(3, 16, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Conv2d(16, 32, 5),
            nn.ReLU(),
            nn.MaxPool2d(2, 2)
        )
        # Classifier head over the concatenated branch features.
        self.fc1 = nn.Linear(2 * 32 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x1, x2):
        """Run both branches on x1 / x2 and return class logits of shape (B, 10)."""
        x1 = self.conv1(x1)
        x2 = self.conv2(x2)
        x1 = torch.flatten(x1, 1)   # (B, 32*5*5)
        x2 = torch.flatten(x2, 1)   # (B, 32*5*5)
        x = torch.cat((x1, x2), 1)  # (B, 2*32*5*5)
        # BUG FIX: the original chained fc1 -> fc2 -> fc3 with no activation,
        # which collapses to one linear map; add ReLU between the FC layers.
        x = torch.relu(self.fc1(x))  # output(120)
        x = torch.relu(self.fc2(x))  # output(84)
        x = self.fc3(x)  # raw logits: CrossEntropyLoss applies softmax itself
        return x
def read_split_data(root: str, val_rate: float = 0.2):
    """Scan a class-per-folder dataset of paired images and split train/val.

    Every sample is a file pair "<name><ext>" and "<name>(1)<ext>" in the same
    class folder.  Returns six lists: train paths (image 1), train paths
    (image 2), train labels, then the same three for the validation split.
    """
    assert os.path.exists(root), "dataset root: {} does not exist.".format(root)
    # One sub-directory per class; sort for a deterministic label mapping.
    item_class = [cla for cla in os.listdir(root) if os.path.isdir(os.path.join(root, cla))]
    item_class.sort()
    class_indices = dict((k, v) for v, k in enumerate(item_class))  # class name -> 0,1,2,...
    train1_images_path = []
    train2_images_path = []
    train_images_label = []
    test1_images_path = []
    test2_images_path = []
    test_images_label = []
    every_class_num = []
    for cla in item_class:
        cla_path = os.path.join(root, cla)
        count = 0  # running index over the directory listing
        images = []  # primary (suffix-free) image of each pair
        # FIX: os.listdir order is arbitrary -- sort so that "<name>(1)<ext>"
        # reliably precedes "<name><ext>" ('(' sorts before '.') and every
        # second entry is the primary image of a pair.
        for i in sorted(os.listdir(cla_path)):
            count += 1
            if count % 2 == 0:
                images.append(os.path.join(cla_path, i))
        image_class = class_indices[cla]
        every_class_num.append(len(images) * 2)  # count both images of each pair
        # Randomly sample validation pairs at the requested rate.
        test_path = random.sample(images, k=int(len(images) * val_rate))
        for img_path in images:
            # FIX: use os.path.splitext instead of str.split('.') so paths
            # containing extra dots are handled correctly.
            base, ext = os.path.splitext(img_path)
            img_path2 = base + '(1)' + ext  # partner image, e.g. "D:/FM(1).jpeg"
            if img_path in test_path:  # sampled pair goes to the validation split
                test1_images_path.append(img_path)
                test2_images_path.append(img_path2)
                test_images_label.append(image_class)
            else:  # otherwise it goes to the training split
                train1_images_path.append(img_path)
                train2_images_path.append(img_path2)
                train_images_label.append(image_class)
    print("{} images were found in the dataset.".format(sum(every_class_num)))
    print("{} images for training1.".format(len(train1_images_path)))
    print("{} images for training2.".format(len(train2_images_path)))
    print("{} images for test1.".format(len(test1_images_path)))
    print("{} images for test2.".format(len(test2_images_path)))
    return train1_images_path, train2_images_path, train_images_label, test1_images_path, test2_images_path, test_images_label
def main():
    """Full pipeline: read paired data, build loaders, train the two-branch CNN, evaluate."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("using {} device.".format(device))
    train1_images_path, train2_images_path, train_images_label, test1_images_path, test2_images_path, test_images_label = read_split_data(root)
    # Identical preprocessing for train and val: resize to the 32x32 input the
    # network expects, then normalize each RGB channel to [-1, 1].
    data_transform = {
        "train": transforms.Compose([transforms.Resize([32, 32]),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]),
        "val": transforms.Compose([transforms.Resize([32, 32]),
                                   transforms.ToTensor(),
                                   transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])}
    train_data_set = MyDataSet(images_path1=train1_images_path,
                               images_path2=train2_images_path,
                               images_class=train_images_label,
                               transform=data_transform["train"])
    test_date_dataset = MyDataSet(images_path1=test1_images_path,
                                  images_path2=test2_images_path,
                                  images_class=test_images_label,
                                  transform=data_transform["val"])
    val_num = len(test_date_dataset)
    batch_size = BATCH_SIZE
    nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])  # number of workers
    print('Using {} dataloader workers'.format(nw))
    # Shuffle training batches each epoch; keep test order fixed.
    train_loader = torch.utils.data.DataLoader(train_data_set,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=nw,
                                               )
    test_loader = torch.utils.data.DataLoader(test_date_dataset,
                                              batch_size=batch_size,
                                              shuffle=False,
                                              num_workers=nw,
                                              )
    cnn = C_lenet().to(device)
    optimizer = torch.optim.Adam(cnn.parameters(), lr=LR)
    # CrossEntropyLoss consumes raw logits, so the model must not apply softmax.
    loss_function = nn.CrossEntropyLoss().to(device)
    for epoch in range(EPOCH):
        # ---- training ----
        cnn.train()
        for step1, data in enumerate(train_loader):
            images1, images2, labels = data
            optimizer.zero_grad()
            output = cnn(images1.to(device), images2.to(device))
            loss = loss_function(output, labels.to(device))
            loss.backward()
            optimizer.step()
        # ---- evaluation ----
        cnn.eval()
        acc = 0.0
        # FIX: evaluate under no_grad -- the original accumulated autograd
        # state during validation and papered over the resulting memory
        # growth with repeated torch.cuda.empty_cache() calls.
        with torch.no_grad():
            for val_images1, val_images2, val_labels in test_loader:
                outputs = cnn(val_images1.to(device), val_images2.to(device))
                predict_y = torch.max(outputs, dim=1)[1]  # argmax over classes
                acc += torch.eq(predict_y, val_labels.to(device)).sum().item()
        val_accurate = acc / val_num
        print('[epoch %d] val_accuracy: %.3f' %
              (epoch + 1, val_accurate))
# Script entry point: run the full train/evaluate pipeline.
if __name__ == '__main__':
    main()
三、部分数据展示
这里图片5-1和5-1(1)是一组数据,一组数据的命名规则没有什么要求,反正我是这么给图片命名的,每组数据的图片2都加了(1)的后缀。
更多推荐
已为社区贡献1条内容
所有评论(0)