系列文章

手把手教你:人脸识别考勤系统



项目简介

本文主要介绍如何使用python搭建:一个基于:粒子群优化算法(PSO:Particle swarm optimization) 优化CNN网络,并实现文本的分类。

博主也参考过网上其他博主介绍:粒子群优化算法(PSO)的文章,但大多是理论大于方法。并且很少有用到优化CNN或其他网络的代码。很多同学肯定对原理不需要过多了解,只需要搭建出一个分类或预测系统即可。

本文只会告诉你如何快速搭建一个基于粒子群优化算法优化CNN的系统并运行,原理的东西可以参考其他博主

也正是因为我发现网上大多的帖子只是针对原理进行介绍,功能实现的相对很少。

如果您有以上想法,那就找对地方了!


提示:以下是本篇文章正文内容

一、粒子群算法(PSO)简介

粒子群优化算法(PSO)是一种最优化算法,源于对鸟群捕食的行为研究。粒子群优化算法的基本思想:是通过群体中个体之间的协作和信息共享来寻找最优解

PSO的优势:在于简单容易实现并且没有许多参数的调节。

目前已被广泛应用于函数优化、神经网络训练、模糊系统控制以及其他遗传算法的应用领域

二、项目展示

模型预测:
模型评估
模型训练:
模型训练
特征工程:
特征工程

项目演示可以参考我在B站发的同名视频
手把手教你:基于粒子群优化算法(PSO)优化卷积神经网络(CNN)的文本分类

二、环境需求

因为本项目基于TensorFlow因此需要以下环境:

  • tensorflow==2.0
  • pandas
  • scikit-learn
  • numpy
  • Word2Vec

环境安装实例

环境都可以通过pip进行安装。如果只是想要功能跑起来,这边建议tensorflow安装cpu版的。

如果没使用过pycharm通过pip安装包的同学可以参考如下:

环境安装方法
点开“终端”,然后通过pip进行安装tensorflow,其他环境包也可以通过上面的方法安装。

三、重要功能模块介绍

1.数据预处理模块(data_create.py)

首先读取书本信息内容并构建训练数据

部分功能代码:

def read_book_path(path):
    """
    读取书本内容
    :param path:
    :return:
    """
    book_msg_list = []
    for root, dirs, files in os.walk(path):
        # 获取作者
        for author_dir in dirs:
            # 子文件夹目录
            file_path = os.path.join(root, author_dir)
            for root_2, dirs_2, files_2 in os.walk(file_path):
                for file in files_2:
                    # 获取书本地址
                    book_path = os.path.join(file_path, file)
                    # 获取书本名称
                    book_name = str(file.split('.txt')[0])
                    with open(book_path, "r", encoding='utf-8') as f:  # 打开文件
                        try:
                            txt_data = f.read()  # 读取文件
                        except UnicodeDecodeError:
                            with open(book_path, "r", encoding='gbk') as f:
                                try:
                                    txt_data = f.read()  # 读取文件
                                except UnicodeDecodeError as e:
                                    print(e)
                                    print("错误文件:" + str(book_path))
                                # gbk编码
                                else:
                                    book_msg = [author_dir, book_name, txt_data]
                                    book_msg_list.append(book_msg)
                        # utf-8编码
                        else:
                            book_msg = [author_dir, book_name, txt_data]
                            book_msg_list.append(book_msg)
    return book_msg_list


def word_2_vec(data_in, words_num=2000, vec_num=128):
    """
    构建书本-文字特征向量
    :param vec_num: 单词的文本向量维度大小
    :param words_num:段落大小
    :param data_in:输入数据,格式为list:作者,书名,内容
    :return:
    """
    label_list = []
    name_list = []
    word_split_list = []
    # 获取所有文本,文本分割
    for i in tqdm(range(len(data_in))):
        label = data_in[i][0]
        book_name = data_in[i][1]
        content = data_in[i][2]
        # 文本处理
        new_content = words_regularized(content)
        words_list = new_content.split()
        # 文本截取
        words_list = words_list[:len(words_list) - len(words_list) % words_num]
        # 将文本按定义的维度进行截取
        for e in range(1, int(len(words_list) / words_num) + 1):
            word_split_list.append(words_list[(e - 1) * words_num:e * words_num])
            label_list.append(label)
            name_list.append(book_name)
    time.sleep(1)
    print("完成文本预处理,共计获取:", len(word_split_list), "个段落。")
    time_s = datetime.now()
    print("****开始预训练词向量,此处预计耗时20秒(根据文本多少变化)")
    # 词向量训练
    # model = Word2Vec(sentences=word_split_list, vector_size=vec_num, min_count=1)
    time_e = datetime.now()
    time_cql = int((time_e - time_s).total_seconds())
    model = Word2Vec.load('models/word2vec.model')
    # model.save('models/word2vec.model')
    print("完成文本词向量特征预训练,耗时:", time_cql, "秒。", "预训练词向量保存地址:models/word2vec.model")
    # 完成特征构建
    words_vec_list = []
    for i in tqdm(range(len(word_split_list))):
        content = word_split_list[i]
        vec_list = []
        for word in content:
            vec = model.wv[word]
            vec_list.append(vec)
        words_vec_list.append(vec_list)
    time.sleep(1)
    print("完成特征构建。")
    return words_vec_list, label_list, name_list

def words_regularized(text):
    """
    过滤特殊符号以及还原常见缩写单词
    :param text:原始文本
    :return:处理后文本
    """

2.定义粒子群优化算法(n_PSO.py)

部分功能代码:

import numpy as np
import random
import n_model as md
import tensorflow as tf


def fit_fun(param, X):  # 适应函数,此处为模型训练
    # 设置GPU按需使用
    gpus = tf.config.experimental.list_physical_devices('GPU')
    tf.config.experimental.set_virtual_device_configuration(gpus[0], [
        tf.config.experimental.VirtualDeviceConfiguration(memory_limit=5120)])
    # 获取模型参数
    label_count = param['label_count']
    a_shape = param['a_shape']
    b_shape = param['b_shape']
    train_data = param['data']
    train_label = param['label']
    model = md.cnn_model(label_count, data_shape=(a_shape, b_shape))
    # 传入待优化参数learning_rate
    res_model = model.model_create(X[-1])
    history = res_model.fit(train_data, train_label, epochs=5, batch_size=8, validation_split=0.2)
    # 获取最小的loss值,优化为loss最小时learning_rate
    val_loss = 1 - max(history.history['val_acc'])
    return val_loss


class Particle:
    # 初始化
    def __init__(self, model_param, x_max, x_min, max_vel, dim):
        self.__pos = [random.uniform(x_min, x_max) for i in range(dim)]  # 粒子的位置
        self.__vel = [random.uniform(-max_vel, max_vel) for i in range(dim)]  # 粒子的速度
        self.__bestPos = [0.0 for i in range(dim)]  # 粒子最好的位置
        self.__fitnessValue = fit_fun(model_param, self.__pos)  # 适应度函数值

    def set_pos(self, i, value):
        self.__pos[i] = value

    def get_pos(self):
        return self.__pos

    def set_best_pos(self, i, value):
        self.__bestPos[i] = value

    def get_best_pos(self):
        return self.__bestPos

    def set_vel(self, i, value):
        self.__vel[i] = value

    def get_vel(self):
        return self.__vel

    def set_fitness_value(self, value):
        self.__fitnessValue = value

    def get_fitness_value(self):
        return self.__fitnessValue


class PSO:
    def __init__(self, model_param, pso_param, best_fitness_value=float('Inf'), C1=2,
                 C2=2, W=1):
        self.C1 = C1
        self.C2 = C2
        self.W = W
        self.dim = pso_param['dim']  # 粒子的维度
        self.size = pso_param['size']  # 粒子个数
        self.iter_num = pso_param['iter_num']  # 迭代次数
        self.x_max = pso_param['x_max']  # 粒子最大位置
        self.x_min = pso_param['x_min']  # 粒子最小位置
        self.max_vel = pso_param['max_vel']  # 粒子最大速度
        self.best_position = [0.0 for i in range(pso_param['dim'])]  # 种群最优位置
        self.model_param = model_param  # 模型参数
        self.best_fitness_value = best_fitness_value
        self.fitness_val_list = []  # 每次迭代最优适应值

        # 对种群进行初始化
        self.Particle_list = [Particle(self.model_param, self.x_max, self.x_min, self.max_vel, self.dim) for i in
                              range(self.size)]

3.定义被优化CNN模型

部分功能代码:

from tensorflow import keras
from tensorflow.keras import layers, models


class cnn_model:
    def __init__(self, label_num, data_shape=(2000, 128)):  # 默认输入张量为(2000,128)
        # res块数量
        self.num_blocks = 2
        # 过滤器数量
        self.filters = 64
        # 步长
        self.conv_size = 3
        # 分类类别数
        self.label_num = label_num
        # 数据输入的shape
        self.data_shape = data_shape
        self.loss = 'sparse_categorical_crossentropy'
        self.metrics = ['acc']

    def res_net_block(self, input_data):
        # CNN层
        x = layers.Conv1D(self.filters, self.conv_size, activation='relu', padding='same')(input_data)
        x = layers.BatchNormalization()(x)
        x = layers.Conv1D(self.filters, self.conv_size, activation=None, padding='same')(x)
        # 第二层没有激活函数


    def model_create(self, learning_rate):

4.使用PSO优化CNN初始化学习率(ModelTrain.py)

import os
from collections import Counter
import numpy as np
from n_PSO import PSO
import n_model as md
import tensorflow as tf
import json

if __name__ == '__main__':
    # 加载数据
    data, label, label_count = load_data()
    # 生成训练集测试集
    train_data, train_label, val_data, val_label = create_train_data(data, label, 0.9)
    # 输入数据的shape值,用于模型构造
    a_shape = data.shape[1]
    b_shape = data.shape[2]
    # 模型参数
    model_param = {
        "a_shape": a_shape,
        "b_shape": b_shape,
        "label_count": label_count,
        "data": train_data,
        "label": train_label
    }
    """
    用粒子群优化算法对训练模型初始化参数进行优化
    """
    # 设置粒子群优化参数
    # 被优化参数
    dim = 1
    # 粒子数
    size = 5
    # 优化算法迭代次数
    iter_num = 20
    # 被优化参数最大值
    x_max = 0.01
    # 被优化参数最小值
    x_min = 0.00001
    # 每个粒子每次移动的最大速度
    max_vel = 0.0005
    # 粒子群算法参数
    pso_param = {
        "dim": dim,
        "size": size,
        "iter_num": iter_num,
        "x_max": x_max,
        "x_min": x_min,
        "max_vel": max_vel
    }
    # 实例化粒子群算法
    pso = PSO(model_param, pso_param)
    # 寻最优解
    best_err, best_learn_rate = pso.update()
    print("粒子群优化后最优准确率为:", 1 - best_err)
    print("粒子群优化后最优初始化learning_rate:", best_learn_rate)
    # 保存pso优化后参数
    os.path.join("app210323")
    best_param = {
        "acc": 1 - best_err,
        "learn_rate": best_learn_rate,
    }
    b = json.dumps(best_param)
    file = open('models/pso_out_param.json', 'w')
    file.write(b)
    file.close()
    """
    使用最优化初始参数进行训练
    """
    # 模型训练
    model = md.cnn_model(label_count, data_shape=(a_shape, b_shape))
    # 使用最优学习率进行训练
    cnn_model = model.model_create(best_learn_rate)

5.模型分类预测

import tensorflow as tf
import numpy as np
from collections import Counter
from tensorflow import keras
# 评估模型
from sklearn.metrics import roc_curve, roc_auc_score, classification_report, accuracy_score


if __name__ == '__main__':
    # -------------------设置显存按需分配-----------------
    # 设置显存
    gpu_memory = 5120
    gpus = tf.config.experimental.list_physical_devices('GPU')
    # 对需要进行限制的GPU进行设置
    tf.config.experimental.set_virtual_device_configuration(gpus[0], [
        tf.config.experimental.VirtualDeviceConfiguration(memory_limit=gpu_memory)])
    # 查看GPU是否可用
    print("检查GPU是否可用:", tf.test.is_gpu_available())
    # 加载测试数据
    val_data, val_label = load_data()
    # 加载模型
    # 模型地址名称,如重新训练了模型,请在此处修改地址和名称
    model_path = 'models/cnn_model_epoch-30_valAcc-0.94545454.h5'
    model = keras.models.load_model(model_path)
    print("模型:", model_path, "。加载成功!")
    print("*****完成预处理,进行模型评估*****")
    y_pred = model.predict(val_data)
    y_pred = [np.argmax(x) for x in y_pred]
    print('------------------测试集上得分:------------------------')
    print('*' * 5)
    print('测试集准确率得分:', accuracy_score(val_label, y_pred))
    print('*' * 5)
    print('准确率、召回率、f1-值测试报告如下:\n', classification_report(val_label, y_pred))

四、完整代码地址

由于项目代码量较大,感兴趣的同学可以下载完整代码,使用过程中如遇到任何问题可以私信我,我都会一一解答。

完整代码下载:
手把手教你:基于粒子群优化算法(PSO)优化卷积神经网络(CNN)的文本分类

Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐