Cartpole is one of the many environments offered by the OpenAI Gym repository, a collection of games ranging from the simple to the complex. What makes Gym unique, though, is that it provides an API for interacting with the games programmatically, which lets us train an AI to play the games it hosts. Using reinforcement learning, paired here with a deep neural network, we can teach an agent to master a game without hard-coding any aspects of the game into it. Intuitively, as a game's action space grows, so does the difficulty of training an agent to learn it, which is why we'll start with Cartpole, a simple game that allows just two actions: moving left or right.

The objective in Cartpole is to balance the pole on the moving cart without letting the cart drift past the boundaries of the game window. OpenAI Gym considers this environment solved when an agent achieves an average reward of 195 over 100 runs, meaning the agent can balance the pole for an average of 195 frames per run. To learn how to achieve this goal, our agent will first need some information about the game it's playing. The Gym API gives our agent an easy way to see the state of the game at every frame and to receive a signal telling it whether that state is beneficial or harmful. But before we go into that, let's quickly go over how our agent will use this data to learn.

I like the above picture because I think it succinctly describes how a typical agent learns to solve a problem through reinforcement learning. By giving our agent a view of the state of the game, having it perform an action (move left or right), and giving it a reward or punishment based on that action, we can teach it how to play the game.
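
To make that loop concrete, here's a minimal sketch of the interaction cycle the Gym API exposes, using a random agent in place of the neural network we'll build shortly. It uses the classic four-value env.step() API, which is the same one we rely on throughout this post.

import gym

env = gym.make('CartPole-v0')

state = env.reset()   # the initial observation of the environment
done = False
frames = 0

while not done:
    action = env.action_space.sample()             # pick a random action: 0 = left, 1 = right
    state, reward, done, info = env.step(action)   # apply it and observe the outcome
    frames += 1

print('The random agent lasted {} frames'.format(frames))
env.close()

A purely random agent usually topples the pole within a few dozen frames; everything that follows is about replacing that env.action_space.sample() call with something smarter.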

Now that we have a basic understanding of how reinforcement learning works, let's take a look at how we would actually implement this system with code.


from os import makedirs
import numpy as np
import random
import math
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import gym
from gym import wrappers


use_cuda = torch.cuda.is_available()
device = torch.device('cuda:0' if use_cuda else 'cpu')
          

First we'll import some standard Python libraries that we'll need later on, along with the PyTorch modules we'll use to build our agent, and finally the OpenAI Gym library so we can create our Cartpole environment.

You might remember the last two lines in the block above from our first deep learning project on the MNIST dataset. These two lines specify which device PyTorch should use when backpropagating error through our neural network: the CPU or the GPU. Over the last decade, graphics cards have proven to be a much more efficient way to process large amounts of data than CPUs. CUDA, Nvidia's library for running general-purpose computation on its GPUs, is currently the most widely used way to unlock that number-crunching potential, which is why most deep learning libraries support it. If you want to follow along but aren't sure whether you have an Nvidia GPU, or you don't have CUDA installed, don't worry! You can still run this code on your CPU just fine, but you might need to scale down some of our hyperparameters later on.
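
As a quick illustration of what that device object actually does (building on the imports above), any tensor or model we create can be moved onto the chosen device with .to(device); you'll see this pattern throughout the Agent class below.

x = torch.FloatTensor([[0.0, 0.0, 0.0, 0.0]])   # created on the CPU by default
x = x.to(device)                                # moved to the GPU if one is available
print(x.device)                                 # e.g. cuda:0 or cpu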

Next, we'll define our neural network: a shallow network consisting of just three layers, an input layer, one hidden layer, and an output layer.


class Network(nn.Module):
    def __init__(self):
        nn.Module.__init__(self)
        
        self.hidden_layer = 256
        
        # 4 observation values in, 2 Q-values out (one per action)
        self.l1 = nn.Linear(4, self.hidden_layer)
        self.l2 = nn.Linear(self.hidden_layer, 2)
    
    def forward(self, x):
        x = F.relu(self.l1(x))
        x = self.l2(x)
        return x
          

Notice that the output from the model is just a 1x2 array: these are the network's estimates of how good moving left or right is in the current state, and the agent will pick the action with the larger value. Also notice that the input is a 1x4 array, which has to do with the data the Gym API passes back on each step. Instead of handing us a numpy array representing a single rendered frame of the environment, the Gym API gives us only the information relevant to solving the problem (though it is certainly possible to solve Cartpole by passing an array of RGB pixels to our network, like we did with the MNIST dataset). In the case of the Cartpole environment we are given the cart's position on the track, the cart's velocity, the angle of the pole, and the velocity of the tip of the pole. Using just these four values and an associated reward or punishment, we can solve this problem!
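
As a quick sanity check on those shapes, here's a small sketch that passes a single made-up observation through the network and inspects the 1x2 output of Q-values (the numbers in the state are just for illustration):

network = Network().to(device)

# a made-up observation: [cart position, cart velocity, pole angle, pole tip velocity]
state = torch.FloatTensor([[0.0, 0.1, 0.02, -0.1]]).to(device)

q_values = network(state)
print(q_values.shape)      # torch.Size([1, 2]) -- one value per action
print(q_values.max(1)[1])  # index of the larger value: 0 = move left, 1 = move right

But before we can jump into setting up our agent and training it, we need to define one more helper class.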


class ReplayMemory:
    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []

    def push(self, transition):
        self.memory.append(transition)
        if len(self.memory) > self.capacity:
            # discard the oldest memory once we're over capacity
            del self.memory[0]

    def sample(self, batch_size):
        try:
            sample = random.sample(self.memory, batch_size)
        except ValueError:
            # not enough memories stored yet, so return everything we have
            sample = self.memory
        return sample

    def __len__(self):
        return len(self.memory)
          

The ReplayMemory class stores a running list of transitions that our agent accumulates during training, each one containing the current state, the action it took, the next state, and the reward it received. During training, our agent will randomly sample from this list of memories in order to learn which actions it should take in a given state.
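
Here's a quick sketch of how the class gets used, with placeholder tensors standing in for a real transition; the training loop below follows this same push/sample pattern.

memory = ReplayMemory(capacity=10000)

# each transition is a (state, action, next_state, reward) tuple of tensors
memory.push((torch.FloatTensor([[0.0, 0.0, 0.0, 0.0]]),
             torch.tensor([[1]], dtype=torch.long),
             torch.FloatTensor([[0.0, 0.1, 0.01, -0.1]]),
             torch.FloatTensor([1.0])))

batch = memory.sample(32)   # returns everything stored if we have fewer than 32 memories
print(len(memory), len(batch))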

Now that we have the preliminaries out of the way, let's setup our agent and training loop.


class Agent:
    def __init__(self, eps_start=0.9, eps_end=0.05, eps_decay=200, gamma=0.8, learning_rate=0.001, batch_size=512):
        
        # network, replay memory, and hyperparameters
        self.network = Network().to(device)
        self.replay_memory = ReplayMemory(10000)
        self.num_episodes_trained = 0
        self.eps_start = eps_start  # e-greedy threshold start value
        self.eps_end = eps_end  # e-greedy threshold end value
        self.eps_decay = eps_decay  # e-greedy threshold decay
        self.gamma = gamma  # Q-learning discount factor
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        self.steps_done = 0
        self.optimizer = optim.Adam(self.network.parameters(), self.learning_rate)
        self.episode_durations = []
    
    def train(self, episodes, environment):
        for e in range(episodes):
            state = environment.reset()
            steps = 0
            episode_ended = False
            while not episode_ended:
                action = self.select_action(torch.FloatTensor([state]).to(device))
                next_state, reward, done, _ = environment.step(action.item())

                # negative reward when the episode ends before reaching 195 steps
                if done and steps < 195:
                    reward = -1

                self.replay_memory.push((torch.FloatTensor([state]).to(device),
                                         action,
                                         torch.FloatTensor([next_state]),
                                         torch.FloatTensor([reward])))

                # random transition batch is taken from ReplayMemory
                transitions = self.replay_memory.sample(self.batch_size)
                batch_state, batch_action, batch_next_state, batch_reward = zip(*transitions)

                batch_state = torch.cat(batch_state).to(device)
                batch_action = torch.cat(batch_action).to(device)
                batch_reward = torch.cat(batch_reward).to(device)
                batch_next_state = torch.cat(batch_next_state).to(device)

                # current Q values are estimated by network for all actions
                current_q_values = self.network(batch_state).gather(1, batch_action).squeeze().to(device)
                # expected Q values are estimated from the actions that give the maximum Q value
                max_next_q_values = self.network(batch_next_state).detach().max(1)[0].to(device)
                expected_q_values = batch_reward + (self.gamma * max_next_q_values).to(device)

                # loss is measured from error between current and newly expected Q values
                loss = F.smooth_l1_loss(current_q_values, expected_q_values).to(device)

                self.optimizer.zero_grad()
                loss.backward()
                self.optimizer.step()

                state = next_state
                steps += 1

                if done:
                    self.num_episodes_trained += 1
                    self.episode_durations.append(steps)
                    if (e+1) % 25 == 0:
                        self.plot_durations()
                    episode_ended = True
        environment.close()
        self.replay_memory.memory = []
        
    def select_action(self, state):
        sample = random.random()
        eps_threshold = self.eps_end + (self.eps_start - self.eps_end) * math.exp(-1. * self.steps_done / self.eps_decay)
        self.steps_done += 1
        if sample > eps_threshold:
            # exploit: pick the action with the highest estimated Q value
            return self.network(state).detach().max(1)[1].view(1, 1).to(device)
        else:
            # explore: pick a random action
            return torch.tensor([[random.randrange(2)]], dtype=torch.long).to(device)
        
    def plot_durations(self):
        plt.figure(2)
        plt.clf()
        plt.xlabel('Episode')
        plt.ylabel('Steps per Episode')
        plt.plot(self.episode_durations)
        plt.pause(0.001)
        
    def test(self, episodes, environment):
        for e in range(episodes):
            state = environment.reset()
            steps = 0
            episode_ended = False
            while not episode_ended:
                environment.render()
                action = self.network(torch.FloatTensor([state]).to(device)).detach().max(1)[1].view(1, 1).to(device)
                next_state, reward, done, _ = environment.step(action.item())
                state = next_state
                steps += 1
                if done:
                    episode_ended = True
                    print('Ran for {} steps'.format(steps))
        environment.close()
          

There's quite a bit to unpack here, but the Agent class is essentially joining our neural network with the Gym environment and training the network on the agent's past experience. To ensure that our agent builds a wide breadth of knowledge about how Cartpole works, we define an epsilon value that causes the agent to act randomly almost all of the time at first, and then less and less as training progresses.

The learning rate is a hyperparameter that we saw in the MNIST project and will continue to see; it controls how much of the error gets backpropagated through our network on each update. If the learning rate is too low, our network will take a long time to learn what we want it to do, but if it is too high, gradient descent can overshoot and the network won't learn anything useful. Don't worry if this doesn't make sense yet; we will come back to the learning rate and gradient descent in greater detail in future posts! Finally, the batch_size parameter defines how many memories get sampled from the replay memory at each training step. If you are worried that your CPU doesn't have enough processing power, I would recommend lowering this value to 100 or less and feeling it out from there. This is where having a GPU comes in handy: we can feed more memories to our agent at once and train it faster.
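
To get a feel for how that epsilon threshold shrinks, here's a quick sketch that evaluates the same decay formula used in select_action() at a few step counts, using the default values eps_start=0.9, eps_end=0.05, eps_decay=200:

eps_start, eps_end, eps_decay = 0.9, 0.05, 200

for steps_done in [0, 200, 500, 1000, 2000]:
    eps = eps_end + (eps_start - eps_end) * math.exp(-1. * steps_done / eps_decay)
    print('after {:>4} steps, probability of a random action: {:.2f}'.format(steps_done, eps))

Early on, almost every action is random; after a couple of thousand steps the agent is relying almost entirely on its own Q-value estimates.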

I know I'm skipping over a lot, but the concepts, parameters, and functions used here recur in nearly every deep learning/reinforcement learning problem, and I would rather introduce these ideas gradually than throw them all at you at once.

Now that we have everything defined we can run our training loop.


env = gym.make('CartPole-v0')
env = wrappers.Monitor(env, './tmp/cartpole-v0-1', video_callable=False, force=True)
agent = Agent()
agent.train(episodes=100, environment=env)
          

We instantiate our Cartpole environment without rendering (to speed up training) and then instantiate our Agent before training it for the desired number of episodes. After running this you should see a matplotlib chart pop up showing the number of steps the agent survived in each episode; the chart updates every 25 episodes. Using the default parameters I've given above, my agent was able to reach 200 steps fairly consistently.
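
If you want to check your agent against the "solved" criterion mentioned at the start (an average of 195 over 100 runs), a quick way to do it after training is to average the last 100 entries in episode_durations:

last_100 = agent.episode_durations[-100:]
print('Average steps over the last 100 episodes: {:.1f}'.format(np.mean(last_100)))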

Finally, if you want to see your agent in action you can call the test() method on your agent like I've shown below.


env = gym.make('CartPole-v0')
env = wrappers.Monitor(env, './tmp/cartpole-v0-test_100', force=True)

agent.test(episodes=5, environment=env)
          

This will run your trained agent in a rendered environment for 5 episodes and print the number of steps achieved in each one. It might not be much to look at, but the concepts covered here generalize to a multitude of problems, letting us build agents for far more than balancing poles. Skynet awaits!

Just like last time, you can find a slightly more comprehensive version of this code on my GitHub, where I've added features for saving and loading our agent in case you need to interrupt training for some reason.
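
If you just want the bare minimum, here's a sketch of the idea using PyTorch's built-in serialization. It checkpoints only the network's weights, so treat it as a simplified stand-in rather than the exact code from the repo, and note that the filename is just a placeholder:

# save the trained network's weights to disk
torch.save(agent.network.state_dict(), 'cartpole_agent.pth')

# later, rebuild the agent and load the weights back in
agent = Agent()
agent.network.load_state_dict(torch.load('cartpole_agent.pth'))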