Gym 介绍

Gym是一个用于测试和比较强化学习算法的工具包。它不依赖于强化学习算法(智能体)的具体结构,并且可以与多种数值计算库配合使用,例如TensorFlow、Theano。Gym库收集了大量测试环境,解决了环境测试过程中的许多问题,使你的强化学习算法能够方便地运行和评估。此外,它还带有可视化的游戏界面,能够帮助你编写出更适用的算法。

Gym 环境标准

基本的Gym环境用法如下面的代码所示:

import gym

# Create the CartPole-v0 environment by its registered id.
env = gym.make('CartPole-v0')
for i_episode in range(20):
    # reset() starts a new episode and returns its first observation.
    observation = env.reset()
    for t in range(100):
        env.render()
        print(observation)
        # Random policy: draw an arbitrary action from the action space.
        action = env.action_space.sample()
        observation, reward, done, info = env.step(action)
        if done:
            print("Episode finished after {} timesteps".format(t+1))
            break
# Release the environment's resources (e.g. the render window) when finished.
env.close()

每一步环境都会返回Observation、Reward、Done、Info 四个值,而输入则是一个action。
在日常做强化学习的时候,很多时候找不到合适的现成gym环境。对于用户自定义的环境,可以按照gym的标准来搭建强化学习环境,这样就可以很好地与其他强化学习算法或接口进行对接。
Gym的开发框架如下所示:

import gym
from gym import spaces

# Template only: the `...` placeholders and the upper-case constants must be
# replaced with real code before this class can run.
# NOTE: `np` is used below, so `import numpy as np` is also required.
class CustomEnv(gym.Env):
  """Custom Environment that follows gym interface"""
  # Render modes this environment declares support for.
  metadata = {'render.modes': ['human']}

  def __init__(self, arg1, arg2, ...):
    super(CustomEnv, self).__init__()
    # Define action and observation space
    # They must be gym.spaces objects
    # Example when using discrete actions:
    self.action_space = spaces.Discrete(N_DISCRETE_ACTIONS)
    # Example for using image as input:
    self.observation_space = spaces.Box(low=0, high=255,
                                        shape=(HEIGHT, WIDTH, N_CHANNELS), dtype=np.uint8)

  # Apply one action and return the 4-tuple (observation, reward, done, info).
  def step(self, action):
    ...
    return observation, reward, done, info

  # Start a new episode and return only the initial observation.
  def reset(self):
    ...
    return observation  # reward, done, info can't be included

  # Display the environment; 'human' is the only mode listed in `metadata`.
  def render(self, mode='human'):
    ...
    
  # Release any resources held by the environment.
  def close(self):
    ...

其中前三个方法(__init__、step、reset)为必须项,render和close为可选项。
搭建好后,可以使用stable_baselines3中的check_env检查环境

from stable_baselines3.common.env_checker import check_env

# Instantiate the custom environment with its own constructor arguments.
env = CustomEnv(arg1, ...)
# It will check your custom environment and output additional warnings if needed
check_env(env)

样例

下面是两个用户自定义的gym环境:第一个是一个离散时间非线性系统,第二个是FlexSim仿真软件与强化学习的对接:
1.

import numpy as np
import pandas as pd
# 定义非线性系统环境,按照GYM的格式
import gym
from gym import spaces, logger
from gym.utils import seeding

class NonLinearEnv(gym.Env):
    """
    Description:
        A discrete-time nonlinear, non-affine system exposed through the Gym
        interface.

    Source:
        Paper "Policy Gradient Adaptive Dynamic Programming for Data-Based
        Optimal Control".

    State:
        State1, State2 (a float64 vector of length 2).

    Action:
        Single-input system, u (a Box of shape (1,) bounded to [-1, 1]).

    Reward:
        -(x' Q x + R u^2), evaluated on the state before the transition.

    Initial state:
        x0 = [0.2, 0.7]'

    Episode termination:
        None. step() always returns done = False, so the caller has to bound
        the episode length itself.
    """

    def __init__(self, Q: np.ndarray, R: np.ndarray):
        """Store the cost weights and define the spaces.

        Args:
            Q: 2x2 state weighting matrix of the quadratic cost.
            R: 1x1 (or scalar) action weight of the quadratic cost.
        """
        super().__init__()
        self.Q = Q
        self.R = R
        self.state = np.array([0.2, 0.7])
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(1, ), dtype=np.float64)
        self.observation_space = spaces.Box(low=-np.inf, high=np.inf, shape=(2, ), dtype=np.float64)

    def reset(self):
        """Restore the initial state x0 = [0.2, 0.7] and return it."""
        self.state = np.array([0.2, 0.7])
        return self.state

    def step(self, action):
        """Advance the system by one time step.

        Args:
            action: The control input u, either a scalar or an array holding
                exactly one element.

        Returns:
            A tuple (state, reward, done, info) where state is the new
            float64 vector of shape (2,), reward is the negated quadratic
            cost as a Python float, done is always False and info is an
            empty dict.

        Raises:
            ValueError: If ``action`` (or ``R``) holds more than one element.
        """
        # Work with plain scalars/ndarrays instead of np.matrix: .item()
        # accepts 0-d and single-element arrays and rejects anything larger.
        u = np.asarray(action, dtype=np.float64).item()
        x1, x2 = float(self.state[0]), float(self.state[1])

        next_x1 = (x1 + x2 ** 2 + u) * np.cos(x2)
        next_x2 = 0.5 * (x1 ** 2 + x2 + u) * np.sin(x2)

        # The cost is computed from the state *before* it is overwritten.
        q_weights = np.asarray(self.Q, dtype=np.float64)
        r_weight = np.asarray(self.R, dtype=np.float64).item()
        cost = float(self.state @ q_weights @ self.state) + u ** 2 * r_weight

        self.state = np.array([next_x1, next_x2], dtype=np.float64)
        done = False
        info = {}
        return self.state, -cost, done, info

    def render(self, mode='human'):
        """No-op; this environment has no visualisation."""
        pass

测试代码:

from NLEnv import NonLinearEnv
from stable_baselines3.common.env_checker import check_env
import numpy as np 

# Cost weights: Q (2x2) for the state and R (1x1) for the action.
# Plain ndarrays are used; np.matrix is no longer recommended by NumPy.
Q = np.array([[1, 0],
              [0, 1]])
R = np.array([[1]])
env = NonLinearEnv(Q, R)
# Check the environment and report any warnings.
check_env(env)
2. FlexSim
import gym
import os
import subprocess
import socket
import json
from gym import error, spaces, utils
from gym.utils import seeding
import numpy as np

class FlexSimEnv(gym.Env):
    """Gym environment that drives a FlexSim model over a TCP socket.

    The constructor launches FlexSim as a subprocess, waits for it to connect
    to a server socket opened by this class, and then asks it for the action
    and observation spaces. Every message sent to FlexSim ends with ``?``;
    every message received from FlexSim is expected to end with ``!``.
    """

    metadata = {'render.modes': ['human', 'rgb_array', 'ansi']}

    def __init__(self, flexsimPath, modelPath, address='localhost', port=5005, verbose=False, visible=False):
        """Launch FlexSim and query it for the action/observation spaces.

        Args:
            flexsimPath: Path of the FlexSim executable to start.
            modelPath: Path of the model file passed to FlexSim.
            address: Host the server socket binds to.
            port: Port the server socket binds to.
            verbose: When true, print progress messages.
            visible: When false, the ``-maintenance nogui`` arguments are
                added to the FlexSim command line.
        """
        super().__init__()
        self.flexsimPath = flexsimPath
        self.modelPath = modelPath
        self.address = address
        self.port = port
        self.verbose = verbose
        self.visible = visible

        # Raw text of the most recent observation message, used by render().
        self.lastObservation = ""

        self._launch_flexsim()

        self.action_space = self._get_action_space()
        self.observation_space = self._get_observation_space()

    def reset(self):
        """Ask FlexSim to reset the model and return the first state."""
        self._reset_flexsim()
        state, reward, done = self._get_observation()
        return state

    def step(self, action):
        """Send ``action`` to FlexSim and return (state, reward, done, info)."""
        self._take_action(action)
        state, reward, done = self._get_observation()
        info = {}
        return state, reward, done, info

    def render(self, mode='human'):
        """Expose the last observation message according to ``mode``.

        ``'human'`` prints it, ``'ansi'`` returns it as a string and
        ``'rgb_array'`` returns a fixed placeholder array. Any other mode is
        delegated to the base class.
        """
        if mode == 'rgb_array':
            return np.array([0, 0, 0])
        elif mode == 'human':
            print(self.lastObservation)
        elif mode == 'ansi':
            return self.lastObservation
        else:
            super().render(mode=mode)

    def close(self):
        """Terminate FlexSim and release the sockets."""
        self._close_flexsim()

    def seed(self, seed=None):
        """Remember ``seed`` so the next reset message carries it; return it."""
        self.seedNum = seed
        return self.seedNum

    def _launch_flexsim(self):
        """Start the FlexSim subprocess and wait for it to connect."""
        if self.verbose:
            print("Launching " + self.flexsimPath + " " + self.modelPath)

        args = [self.flexsimPath, self.modelPath, "-training", self.address + ':' + str(self.port)]
        if not self.visible:
            args.append("-maintenance")
            args.append("nogui")
        self.flexsimProcess = subprocess.Popen(args)

        try:
            self._socket_init(self.address, self.port)
        except BaseException:
            # Do not leave an orphaned FlexSim process or open sockets behind
            # when binding, accepting or the READY handshake fails.
            self._close_flexsim()
            raise

    def _close_flexsim(self):
        """Kill the FlexSim process and close any sockets that were opened."""
        self.flexsimProcess.kill()
        # The sockets may not exist yet if start-up failed part-way through.
        for attr in ("clientsocket", "serversocket"):
            sock = getattr(self, attr, None)
            if sock is not None:
                sock.close()

    def _release_flexsim(self):
        """Send the StopWaiting message to FlexSim."""
        if self.verbose:
            print("Sending StopWaiting message")
        self._socket_send(b"StopWaiting?")

    def _get_action_space(self):
        """Request the action space from FlexSim and convert it to a gym space."""
        self._socket_send(b"ActionSpace?")
        if self.verbose:
            print("Waiting for ActionSpace message")
        actionSpaceBytes = self._socket_recv()

        return self._convert_to_gym_space(actionSpaceBytes)

    def _get_observation_space(self):
        """Request the observation space from FlexSim and convert it to a gym space."""
        self._socket_send(b"ObservationSpace?")
        if self.verbose:
            print("Waiting for ObservationSpace message")
        observationSpaceBytes = self._socket_recv()

        return self._convert_to_gym_space(observationSpaceBytes)

    def _reset_flexsim(self):
        """Send the Reset message, including the seed once seed() was called."""
        if self.verbose:
            print("Sending Reset message")
        resetString = "Reset?"
        if hasattr(self, "seedNum"):
            resetString = "Reset:" + str(self.seedNum) + "?"
        self._socket_send(resetString.encode())

    def _get_observation(self):
        """Receive one observation message and return (state, reward, done)."""
        if self.verbose:
            print("Waiting for Observation message")
        observationBytes = self._socket_recv()
        self.lastObservation = observationBytes.decode('utf-8')
        state, reward, done = self._convert_to_observation(observationBytes)

        return state, reward, done

    def _take_action(self, action):
        """Serialize ``action`` to JSON and send it as a TakeAction message."""
        actionStr = json.dumps(action, cls=NumpyEncoder)
        if self.verbose:
            print("Sending Action message: " + actionStr)
        actionMessage = "TakeAction:" + actionStr + "?"
        self._socket_send(actionMessage.encode())

    def _socket_init(self, host, port):
        """Open the server socket, accept one client and wait for READY.

        Raises:
            RuntimeError: If the first message received is not ``READY``.
        """
        if self.verbose:
            print("Waiting for FlexSim to connect to socket on " + self.address + ":" + str(self.port))

        self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.serversocket.bind((host, port))
        self.serversocket.listen()

        (self.clientsocket, self.socketaddress) = self.serversocket.accept()
        if self.verbose:
            print("Socket connected")

        if self.verbose:
            print("Waiting for READY message")
        message = self._socket_recv()
        if self.verbose:
            print(message.decode('utf-8'))
        if message != b"READY":
            raise RuntimeError("Did not receive READY! message")

    def _socket_send(self, msg):
        """Send all bytes of ``msg``, looping over partial sends.

        Raises:
            RuntimeError: If the socket reports that zero bytes were sent.
        """
        totalsent = 0
        while totalsent < len(msg):
            sent = self.clientsocket.send(msg[totalsent:])
            if sent == 0:
                raise RuntimeError("Socket connection broken")
            totalsent = totalsent + sent

    def _socket_recv(self):
        """Receive bytes until a chunk ends with ``!`` and return them without it.

        Raises:
            RuntimeError: If the peer closes the connection.
        """
        chunks = []
        while True:
            chunk = self.clientsocket.recv(2048)
            if chunk == b'':
                raise RuntimeError("Socket connection broken")
            if chunk.endswith(b'!'):
                # Drop the terminator; the message is complete.
                chunks.append(chunk[:-1])
                break
            chunks.append(chunk)
        return b''.join(chunks)

    def _convert_to_gym_space(self, spaceBytes):
        """Parse a ``Name(json-params)`` message into a gym space.

        Supported names are Discrete, Box, MultiDiscrete and MultiBinary.

        Raises:
            RuntimeError: If the space name is not one of the supported ones.
        """
        paramsStartIndex = spaceBytes.index(ord('('))
        paramsEndIndex = spaceBytes.index(ord(')'), paramsStartIndex)

        spaceType = spaceBytes[:paramsStartIndex]
        params = json.loads(spaceBytes[paramsStartIndex+1:paramsEndIndex])

        if spaceType == b'Discrete':
            return gym.spaces.Discrete(params)
        elif spaceType == b'Box':
            # params holds the lower bounds followed by the upper bounds.
            return gym.spaces.Box(np.array(params[0]), np.array(params[1]))
        elif spaceType == b'MultiDiscrete':
            return gym.spaces.MultiDiscrete(params)
        elif spaceType == b'MultiBinary':
            return gym.spaces.MultiBinary(params)

        raise RuntimeError("Could not parse gym space string")

    def _convert_to_observation(self, spaceBytes):
        """Decode a JSON observation message into (state, reward, done).

        The message must contain the keys ``state``, ``reward`` and ``done``.
        A list-valued state is converted to a NumPy array, and ``done`` is
        true exactly when the received value equals 1.
        """
        observation = json.loads(spaceBytes)
        state = observation["state"]
        if isinstance(state, list):
            state = np.array(observation["state"])
        reward = observation["reward"]
        done = (observation["done"] == 1)
        return state, reward, done


class NumpyEncoder(json.JSONEncoder):
    """JSON encoder that also serializes NumPy scalars and arrays."""

    def default(self, obj):
        """Convert NumPy values to built-in types; defer anything else."""
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        # Unknown types go to the base class, which raises TypeError.
        return super().default(obj)


def main():
    """Run two seeded episodes of a FlexSim model using random actions.

    The environment is always closed on exit, including when an episode
    raises, so the FlexSim process is not left running.
    """
    env = FlexSimEnv(
        flexsimPath="C:/Program Files/FlexSim 2022/program/flexsim.exe",
        modelPath="./ChangeoverTimesRL.fsm",
        verbose=True,
        visible=True,
    )

    try:
        for i in range(2):
            env.seed(i)
            observation = env.reset()
            env.render()
            done = False
            rewards = []
            while not done:
                action = env.action_space.sample()
                observation, reward, done, info = env.step(action)
                env.render()
                rewards.append(reward)
            # The loop only exits once the episode has finished.
            cumulative_reward = sum(rewards)
            print("Reward: ", cumulative_reward, "\n")
        env._release_flexsim()
        input("Waiting for input to close FlexSim...")
    finally:
        env.close()


# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Logo

为开发者提供学习成长、分享交流、生态实践、资源工具等服务,帮助开发者快速成长。

更多推荐