本文公开一个基于强化学习算法DQN的五子棋游戏自动下棋算法源码,并对思路进行讲解。

完整代码和预训练模型(Saver文件夹)地址:

python_强化学习算法DQN_玩五子棋游戏

一个基于CNN构成的DQN算法的8*8的五子棋游戏

在这里插入图片描述

1、Q-Learning介绍

Q-Learning的思想并不是很复杂,很多文章都有详细的介绍,这里只是简单举个例子,不做详细讲解。

如何用简单例子讲解 Q - learning 的具体过程?

2、DQN介绍

DQN也叫deepQ-Learning,在Q-Learning前面加一个Deep。Q-Learning有一个缺点,如果状态特别多,比如五子棋的棋盘,每个位置都有(空白、黑子、白子)三个状态,那么假如一个10*10的棋盘 就有3^100个状态,那么这个Q表是没办法做出来的。那么我们就没办法构建这个Q表来获取状态价值状态转移价值了。

DQN就是搭建一个人工神经网络,输入是当前状态,输出是状态转移价值。或者输入是当前状态,输出是当前状态的Q值。通过多次迭代训练,使得神经网络输出逼近真实的Q值(逼近而不是等于,因为毕竟是神经网络,参数数量,存储占用量远小于Q表,如果能做到完全等于的话,还要存储干啥)

那么神经网络的训练的损失就是 预测Q值和(max(下一步的真实Q值)乘系数 +奖励值)的差的平方。 预测Q值就是神经网络一次前向传播输出的Q值,真实Q值就是神经网络曾经预测的Q值。为什么真实Q值是神经网络曾经预测过的Q值呢?因为神经网络每次训练都会对输出值产生影响,如果这个真实Q值一直变化的话,那么神经网络是没办法收敛的。所以需要搭建另一个参数一模一样的神经网络来生成真实Q值。这个生成真实Q值的网络不需要训练,只需要迭代一定次数以后,复制一份预测网络的参数即可。就好比一个笨老师教一个学生,学生学会了以后当了老师,教新的学生,然后青出于蓝而胜于蓝,这个学生越来越强。

本文中代码用的方法是,保存历史预测的Q值,等一个棋局结束后,再用这些Q值来训练每一步的预测Q值,这样做到一个神经网络就可以了。相当于一个聪明的学生,不停的复习,归纳,总结,然后逐渐变强。

3、对抗算法介绍

根据上面介绍的Q-Learning算法,解决的是一个单智能体的问题,这个智能体如何能够用最小的代价获得最大的回报。但是对弈的学习过程不一样,博弈中存在两个智能体,当前状态和当前动作对应的下一个状态会有很多,因为对手怎么下子我们不知道。那么当前状态和当前动作对应的什么状态是固定的呢?对手的状态。那么我能不能预测一下对手下一步能达到的最大的Q值呢?对手的Q值和我的Q值又有什么关系呢?对于零和博弈,对手的优势就是我得劣势,对手的劣势就是我的优势,那么我就可以用对手的Q值乘一个负的系数来训练当前的Q值。这样就解决了。

训练的过程就是,先自己和自己下一局棋,并记录每一步和每一步预测的最大Q值。等棋局结束后,再把整个棋局用神经网络"回顾"一遍,用记录的步子,Q值训练。

4、训练过程中注意的地方

下子的时候按照常理,咱们都是选择Q值最大的动作来下子,这样下子是没问题的,但是我们是来训练网络的,如果每次选择最大的步子下子的话容易陷入一个僵局。获胜方一直用同样或相似的套路打败败方,神经网络很快损失下降很快,但是还是不会正确的落子,或者说它只对某一种棋局局面的风格掌握得很好,对不按照套路出牌的人就没办法应对。那么我们就要加一个随即事件,一部分步子是按照最大值去走的,一部分步子是随机走的,但是最大Q值是每次都要计算出来保存用于回顾训练用的。

不同的棋子最好放在不同的channel里面,我发现如果用0背景1白棋2黑棋这样标注放到一个棋盘里面神经网络无法收敛

5、完整代码

运行代码入口如下,已经写了很详细的注释了,我很辛苦的
如果需要预训练模型需要从文章开始的链接下载Saver文件夹

import numpy as np
import random
import os
import tensorflow.compat.v1 as tf

tf.disable_v2_behavior()

from DQN_point_game import Map
'''
此文件主要用于实现强化学习算法DQN玩五子棋
'''

class DQN():
    def __init__(self):
        self.n_input = Map.mapsize * Map.mapsize
        self.n_output = 1
        self.current_q_step = 0
        self.avg_loss = 0
        # placeholder是在神经网络构建graph的时候在模型中的占位,此时并没有把要输入的数据传入模型,它只会分配必要的内存。
        # 建立完session后,在会话中,运行模型的时候通过feed_dict()函数向占位符喂入数据。
        self.x = tf.placeholder("float", [None, Map.mapsize, Map.mapsize], name='x')
        self.y = tf.placeholder("float", [None, self.n_output], name='y')
        self.create_Q_network()
        self.create_training_method()
        self.saver = tf.train.Saver()
        self.sess = tf.Session()
        # 它能让你在运行图的时候,插入一些计算
        self.sess = tf.InteractiveSession()
        self.sess.run(tf.global_variables_initializer())

    def create_Q_network(self):
        # tf.random_normal()函数用于从“服从指定正态分布的序列”中随机取出指定个数的值。  stddev: 正态分布的标准差
        wc1 = tf.Variable(tf.random_normal([3, 3, 1, 64], stddev=0.1), dtype=tf.float32, name='wc1')
        wc2 = tf.Variable(tf.random_normal([3, 3, 64, 128], stddev=0.1), dtype=tf.float32, name='wc2')
        wc3 = tf.Variable(tf.random_normal([3, 3, 128, 256], stddev=0.1), dtype=tf.float32, name='wc3')
        wd1 = tf.Variable(tf.random_normal([256, 128], stddev=0.1), dtype=tf.float32, name='wd1')
        wd2 = tf.Variable(tf.random_normal([128, self.n_output], stddev=0.1), dtype=tf.float32, name='wd2')
        # tf.Variable 得到的是张量,而张量并不是具体的值,而是计算过程
        bc1 = tf.Variable(tf.random_normal([64], stddev=0.1), dtype=tf.float32, name='bc1')
        bc2 = tf.Variable(tf.random_normal([128], stddev=0.1), dtype=tf.float32, name='bc2')
        bc3 = tf.Variable(tf.random_normal([256], stddev=0.1), dtype=tf.float32, name='bc3')
        bd1 = tf.Variable(tf.random_normal([128], stddev=0.1), dtype=tf.float32, name='bd1')
        bd2 = tf.Variable(tf.random_normal([self.n_output], stddev=0.1), dtype=tf.float32, name='bd2')

        weights = {
            'wc1': wc1,
            'wc2': wc2,
            'wc3': wc3,
            'wd1': wd1,
            'wd2': wd2
        }

        biases = {
            'bc1': bc1,
            'bc2': bc2,
            'bc3': bc3,
            'bd1': bd1,
            'bd2': bd2
        }

        self.Q_value = self.conv_basic(self.x, weights, biases)
        self.Q_Weihgts = [weights, biases]

    def conv_basic(self, _input, _w, _b):
        # input
        _out = tf.reshape(_input, shape=[-1, Map.mapsize, Map.mapsize, 1])
        # conv layer 1  conv2d 用于做二维卷积  strides, # 步长参数  padding, # 卷积方式
        _out = tf.nn.conv2d(_out, _w['wc1'], strides=[1, 1, 1, 1], padding='SAME')
        # bias_add 一个叫bias的向量加到一个叫value的矩阵上,是向量与矩阵的每一行进行相加
        _out = tf.nn.relu(tf.nn.bias_add(_out, _b['bc1']))
        # ksize 池化窗口的大小,取一个四维向量  padding: 填充的方法,SAME或VALID,SAME表示添加全0填充,VALID表示不添加
        _out = tf.nn.max_pool(_out, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
        # conv layer2
        _out = tf.nn.conv2d(_out, _w['wc2'], strides=[1, 1, 1, 1], padding='SAME')
        _out = tf.nn.relu(tf.nn.bias_add(_out, _b['bc2']))
        _out = tf.nn.max_pool(_out, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME')
        # conv layer3
        _out = tf.nn.conv2d(_out, _w['wc3'], strides=[1, 1, 1, 1], padding='SAME')
        _out = tf.nn.relu(tf.nn.bias_add(_out, _b['bc3']))
        # 计算张量tensor沿着指定的数轴(tensor的某一维度)上的的平均值,主要用作降维或者计算tensor(图像)的平均值。
        _out = tf.reduce_mean(_out, [1, 2])
        # fully connected layer1 matmul 两个矩阵中对应元素各自相乘
        _out = tf.nn.relu(tf.add(tf.matmul(_out, _w['wd1']), _b['bd1']))
        # fully connected layer2
        _out = tf.add(tf.matmul(_out, _w['wd2']), _b['bd2'])
        return _out

    def create_training_method(self):
        # squared_difference 计算张量 x、y 对应元素差平方
        self.cost = tf.reduce_mean(tf.squared_difference(self.Q_value, self.y))
        self.optm = tf.train.AdamOptimizer(learning_rate=0.001, name='Adam').minimize(self.cost)

    def restore(self):
        if os.path.exists('Saver/cnnsaver.ckpt-0.index'):
            self.saver.restore(self.sess, os.path.abspath('Saver/cnnsaver.ckpt-0'))

    def computerPlay(self, IsTurnWhite):
        if IsTurnWhite:
            print('白旗走')
            # 如果该白旗走的话 用黑的棋盘,1代表黑,-1代表白
            board = np.array(Map.blackBoard)
        else:
            print('黑旗走')
            # 如果该黑旗走的话 用白的棋盘 1代表白,-1代表黑
            board = np.array(Map.whiteBoard)
        # 建立所有可下位置的数组,每下一个位置一个数组
        boards = []
        # 当前棋谱中空白的地方
        positions = []
        for i in range(Map.mapsize):
            for j in range(Map.mapsize):
                # 如果这个当前棋谱这个位置是空白的
                if board[j][i] == Map.backcode:
                    predx = np.copy(board)
                    # -1代表自己,更方便计算
                    predx[j][i] = -1
                    boards.append(predx)
                    positions.append([i, j])
        if len(positions) == 0:
            return 0, 0, 0
        # 计算所有可下的位置的价值
        nextStep = self.sess.run(self.Q_value, feed_dict={self.x: boards})
        maxx = 0
        maxy = 0
        maxValue = -1000  # 实际最大价值  用于后续学习
        # 从所有可下的地方找一个价值最大的位置下棋
        for i in range(len(positions)):
            value = nextStep[i] + random.randint(0, 10) / 1000  # 如果没有最优步子 则随机选择一步
            if value > maxValue:
                maxValue = value
                maxx = positions[i][0]
                maxy = positions[i][1]
        print(str(maxx) + ',' + str(maxy))
        print('此位置的价值为:' + str(maxValue[0]))
        return maxx, maxy, maxValue

    # 下完了一局就更新一下AI模型
    def TrainOnce(self, winner):
        # 记录棋图
        # board1 白棋 board2 黑棋
        board1 = np.array(Map.mapRecords1)
        board2 = np.array(Map.mapRecords2)
        # 记录棋步
        step1 = np.array(Map.stepRecords1)
        step2 = np.array(Map.stepRecords2)
        # 记录得分
        scoreR1 = np.array(Map.scoreRecords1)
        scoreR2 = np.array(Map.scoreRecords2)
        board1 = np.reshape(board1, [-1, Map.mapsize, Map.mapsize])
        board2 = np.reshape(board2, [-1, Map.mapsize, Map.mapsize])
        step1 = np.reshape(step1, [-1, Map.mapsize, Map.mapsize])
        step2 = np.reshape(step2, [-1, Map.mapsize, Map.mapsize])

        score1 = []
        score2 = []

        board1 = (board1 * (1 - step1)) + step1 * Map.blackcode
        board2 = (board2 * (1 - step2)) + step2 * Map.blackcode
        # 每步的价值 = 奖励(胜1 负-0.9) + 对方棋盘能达到的最大价值(max taget Q) * (-0.9)
        for i in range(len(board1)):
            if i == len(scoreR2):  # 白方已经五连  白方赢
                print('白方已经五连,白方赢')
                score1.append([1.0])  # 白方的最后一步获得1分奖励
            else:
                # 白方的价值为:黑方棋盘能达到的最大价值(max taget Q) * (-0.9)
                score1.append([scoreR2[i][0] * -0.9])
        if winner == 2:
            print('惩罚白方的最后一步,将其价值设为 -0.9')
            score1[len(score1) - 1][0] = -0.9

        # 1 白棋 2 黑棋
        for i in range(len(board2)):
            if i == len(scoreR1) - 1:  # 黑方赢
                print('黑方已经五连,黑方赢')
                score2.append([1.0])
            else:
                # 黑棋的得分为:白方棋盘能达到的最大价值(max taget Q) * (-0.9)
                score2.append([scoreR1[i + 1][0] * -0.9])
        if winner == 1:
            print('惩罚黑方的最后一步,将其价值设为 -0.9')
            # 惩罚黑方的最后一步
            score2[len(score2) - 1][0] = -0.9

        # 一次完成多个数组的拼接
        borders = np.concatenate([board1, board2], axis=0)
        scores = np.concatenate([score1, score2], axis=0)
        _, totalLoss = self.sess.run([self.optm, self.cost], feed_dict={self.x: borders,
                                                                        self.y: scores})
        self.avg_loss += totalLoss
        print('train avg loss ' + str(self.avg_loss))
        self.avg_loss = 0
        # os.path.abspath取决于os.getcwd,如果是一个绝对路径,就返回,
        # 如果不是绝对路径,根据编码执行getcwd/getcwdu.然后把path和当前工作路径连接起来
        self.saver.save(self.sess, os.path.abspath('Saver/cnnsaver.ckpt'), global_step=0)

    def PlayWidthHuman(self):
        # 读取历史存储的模型
        self.restore()
        Map.PlayWithComputer = self.computerPlay
        Map.TrainNet = self.TrainOnce
        Map.ShowWind()


if __name__ == '__main__':
    dqn = DQN()
    dqn.PlayWidthHuman()

用于构建棋谱的代码
Map.py

import tkinter as tk
import os
import time
import copy

# 定义窗口
top = tk.Tk()
top.title("AI自动玩五子棋")
top.geometry('400x300')

# 定义地图尺寸
mapsize = 8

# 元素尺寸
pixsize = 20

# 连子个数
winSet = 5

# 空白编号
backcode = 0
# 白棋
whitecode = 1
# 黑棋
blackcode = -1

# 定义画布
canvas = tk.Canvas(top, height=mapsize * pixsize, width=mapsize * pixsize,
                   bg="gray")
canvas.pack(pady=25)

for i in range(mapsize):
    canvas.create_line(i * pixsize, 0,
                       i * pixsize, mapsize * pixsize,
                       fill='black')
    canvas.create_line(0, i * pixsize,
                       mapsize * pixsize, i * pixsize,
                       fill='black')

# 初始棋盘
whiteBoard = []
stepBoard = []
for i in range(mapsize):
    row = []
    rowBak = []
    for j in range(mapsize):
        row.append(0)
        rowBak.append(backcode)
    whiteBoard.append(rowBak)
    stepBoard.append(row)
blackBoard = copy.deepcopy(whiteBoard)

# 棋子列表
childMap = []

# 记录棋图
mapRecords1 = []
mapRecords2 = []

# 记录棋步
stepRecords1 = []
stepRecords2 = []
# 记录得分
scoreRecords1 = []
scoreRecords2 = []

isGameOver = False

IsTurnWhite = True


def Restart():
    global isGameOver
    global IsTurnWhite
    for child in childMap:
        canvas.delete(child)
    childMap.clear()
    isGameOver = False
    IsTurnWhite = True
    mapRecords1.clear()
    mapRecords2.clear()
    stepRecords1.clear()
    stepRecords2.clear()
    scoreRecords1.clear()
    scoreRecords2.clear()
    for i in range(mapsize):
        for j in range(mapsize):
            whiteBoard[j][i] = backcode
            blackBoard[j][i] = backcode


WinDataSetPath = 'DataSets\\win'
LosDataSetPath = 'DataSets\\los'

TrainNet = None


def SaveDataSet(tag):
    if TrainNet != None:
        TrainNet(tag)
    else:
        winfilename = WinDataSetPath + '\\' + time.strftime("%Y%m%d%H%M%S", time.localtime()) + '.txt'
        losfilename = LosDataSetPath + '\\' + time.strftime("%Y%m%d%H%M%S", time.localtime()) + '.txt'
        if not os.path.exists('DataSets'):
            os.mkdir('DataSets')
        if not os.path.exists(WinDataSetPath):
            os.mkdir(WinDataSetPath)
        if not os.path.exists(LosDataSetPath):
            os.mkdir(LosDataSetPath)
        strInfo1 = ''
        for i in range(len(mapRecords1)):
            for j in range(mapsize):
                for k in range(mapsize):
                    strInfo1 += str(mapRecords1[i][j][k]) + ','
            strInfo1 += '\n'
            for j in range(mapsize):
                for k in range(mapsize):
                    strInfo1 += str(stepRecords1[i][j][k]) + ','
            strInfo1 += '\n'
        strInfo2 = ''
        for i in range(len(mapRecords2)):
            for j in range(mapsize):
                for k in range(mapsize):
                    strInfo2 += str(mapRecords2[i][j][k]) + ','
            strInfo2 += '\n'
            for j in range(mapsize):
                for k in range(mapsize):
                    strInfo2 += str(stepRecords2[i][j][k]) + ','
            strInfo2 += '\n'
        if tag == 1:
            with open(winfilename, "w") as f:
                f.write(strInfo1)
            with open(losfilename, "w") as f:
                f.write(strInfo2)
        else:
            with open(winfilename, "w") as f:
                f.write(strInfo2)
            with open(losfilename, "w") as f:
                f.write(strInfo1)


def JudgementResult():
    global isGameOver
    judgemap = whiteBoard
    for i in range(mapsize):
        for j in range(mapsize):
            if judgemap[j][i] != backcode:
                tag = judgemap[j][i]
                checkrow = True
                checkCol = True
                checkLine = True
                checkLine2 = True
                for k in range(winSet - 1):
                    if i + k + 1 < mapsize:  # 行
                        if (judgemap[j][i + k + 1] != tag) and checkrow:
                            checkrow = False
                        if j + k + 1 < mapsize:  # 斜线
                            if (judgemap[j + k + 1][i + k + 1] != tag) and checkLine:
                                checkLine = False
                        else:
                            checkLine = False
                    else:
                        checkrow = False
                        checkLine = False
                    if j + k + 1 < mapsize:  # 列
                        if (judgemap[j + k + 1][i] != tag) and checkCol:
                            checkCol = False
                        if i - k - 1 >= 0:  # 斜线
                            if (judgemap[j + k + 1][i - k - 1] != tag) and checkLine2:
                                checkLine2 = False
                        else:
                            checkLine2 = False
                    else:
                        checkCol = False
                        checkLine2 = False
                    if not checkrow and not checkCol and not checkLine and not checkLine2:
                        break
                if checkrow or checkCol or checkLine or checkLine2:
                    isGameOver = True
                    SaveDataSet(tag)
                    return tag
    return 0


PlayWithComputer = None

GetMaxScore = None


def playChess(event):
    if isGameOver:
        print('game is over, restart!')
        Restart()
        return
    x = event.x // pixsize
    y = event.y // pixsize
    if x >= mapsize or y >= mapsize:
        return
    if whiteBoard[y][x] != backcode:
        return
    score = 0
    if PlayWithComputer != None:
        _x, _y, score = PlayWithComputer(IsTurnWhite)
    res = chess(x, y, score)
    if res == 0:
        if PlayWithComputer != None:
            x, y, score = PlayWithComputer(IsTurnWhite)
            res = chess(x, y, score)


def chess(x, y, score):
    global IsTurnWhite
    if isGameOver:
        print('game is over, restart!')
        Restart()
        return -1
    if whiteBoard[y][x] != backcode:
        print('game is over, restart!')
        Restart()
        return -1
    step = copy.deepcopy(stepBoard)
    step[y][x] = 1
    if IsTurnWhite:  # 白棋是人工走的 如果过用来当训练集 用反转棋盘
        mapRecords1.append(copy.deepcopy(blackBoard))
        stepRecords1.append(step)
        scoreRecords1.append(score)
        whiteBoard[y][x] = whitecode  # 1白 -1黑
        blackBoard[y][x] = blackcode
        child = canvas.create_oval(x * pixsize,
                                   y * pixsize,
                                   x * pixsize + pixsize,
                                   y * pixsize + pixsize, fill='white')
    else:
        mapRecords2.append(copy.deepcopy(whiteBoard))
        stepRecords2.append(step)
        scoreRecords2.append(score)
        whiteBoard[y][x] = blackcode  # 1白 -1黑
        blackBoard[y][x] = whitecode
        child = canvas.create_oval(x * pixsize,
                                   y * pixsize,
                                   x * pixsize + pixsize,
                                   y * pixsize + pixsize, fill='black')
    IsTurnWhite = not IsTurnWhite
    childMap.append(child)
    return JudgementResult()



# 按钮的点击事件
def AutoPlayOnce():
    if PlayWithComputer != None:
        x, y, score = PlayWithComputer(IsTurnWhite)
        chess(x, y, score)


btnAuto = tk.Button(top, text="重新开始或者自动走1次", command=AutoPlayOnce)
btnAuto.pack()
# 画布与鼠标左键进行绑定
# canvas.bind("<B1-Motion>", playChess)
canvas.bind("<Button-1>", playChess)


# 按钮的点击事件
def AutoPlayOne():
    global isGameOver
    if PlayWithComputer != None:
        for i in range(222):
            if isGameOver:
                break
            x, y, score = PlayWithComputer(IsTurnWhite)
            chess(x, y, score)

btnAuto = tk.Button(top, text="自动玩一局", command=AutoPlayOne)
btnAuto.pack()
canvas.bind("<Button-2>", playChess)



# 显示游戏窗口
def ShowWind():
    top.mainloop()

喜欢请记得一键三连,之后会更新更有趣的算法,欢迎大家一起交流!

Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐