Using deep reinforcement learning to solve the Zelda Thrill Digger in PyTorch: static environment (part 1)
Views and opinions expressed are solely my own.
Background
In a previous post - about two years ago - I speculated that one could solve Thrill Digger using reinforcement learning (RL). This post outlines my progress and thoughts on attempting to solve Thrill Digger with RL. Note: none of this work was used for my coursework at Georgia Tech, and it is separate from anything I have submitted for those courses.
Limitations (in brief)
This post solves the game, but only in a fixed environment. Further work is needed to extend this to the general case, which I hope to cover in a future post.
Coding a custom environment
This was done using Google Colab with an A100 GPU. We will begin by loading packages and representing the Thrill Digger game as its own class.
import numpy as np
import torch
import scipy.signal  # for scipy.signal.convolve2d, used below
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
# https://stackoverflow.com/a/24818304/3625022
from IPython.display import clear_output
#import optuna
# https://stackoverflow.com/a/52300696/3625022
from google.colab import drive
drive.mount('/content/drive')
# https://docs.python.org/3/tutorial/classes.html
class ThrillDigger:
# Initialize the environment, figure out rewards
def __init__(self, mode, seed = None):
super(ThrillDigger, self).__init__()
# https://numpy.org/doc/stable/reference/random/generator.html
self.rng = np.random.default_rng(seed)
self.mode = mode
self.seed = seed
# Set level of the game
# https://game8.co/games/Zelda-Skyward-Sword/archives/335443#hl_2
if mode == "beginner":
self.nrow = 4
self.ncol = 5
self.nBombs = 4
self.nRupoors = 0
self.initReward = -30
if mode == "intermediate":
self.nrow = 5
self.ncol = 6
self.nBombs = 4
self.nRupoors = 4
self.initReward = -50
if mode == "expert":
self.nrow = 5
self.ncol = 8
self.nBombs = 8
self.nRupoors = 8
self.initReward = -70
# Randomly place bombs and rupoors
# https://numpy.org/doc/stable/reference/random/generated/numpy.random.Generator.choice.html
idx = self.rng.choice(range(self.nrow * self.ncol),
size = self.nBombs + self.nRupoors,
replace = False)
bombIdx = idx[0 : self.nBombs]
rupoorIdx = None
if self.nRupoors > 0:
rupoorIdx = idx[self.nBombs : self.nBombs + self.nRupoors]
self.info = {"seed" : self.seed,
"mode" : self.mode,
"n_rows" : self.nrow,
"n_columns" : self.ncol,
"n_bombs" : self.nBombs}
# https://numpy.org/doc/stable/reference/arrays.scalars.html#numpy.byte
grid = np.zeros(self.nrow * self.ncol, dtype = np.int16)
grid[bombIdx] = -128 # smallest possible reward, game ends
if self.nRupoors > 0:
grid[rupoorIdx] = -10 # lose 10 rupees
# generate all negative grid spaces
grid = grid.reshape((self.nrow, self.ncol))
gridSign = grid.copy()
gridSign[gridSign < 0] = -1
# add up everything within distance 1 of a pixel via convolution
def neighbor_sums(grid):
# Generate kernel
kernel = np.ones((3, 3))
kernel[1, 1] = 0 # do not include the center
# https://docs.scipy.org/doc/scipy/reference/generated/scipy.signal.convolve2d.html
return(scipy.signal.convolve2d(grid, kernel, mode = 'same'))
# Gather the number of bad spaces around each pixel
nBad = -neighbor_sums(gridSign)
rewards = nBad
# Change rewards based on rupee amounts
# https://www.ign.com/wikis/the-legend-of-zelda-skyward-sword/Thrill_Digger
# https://numpy.org/doc/stable/reference/generated/numpy.isin.html
whereGreen = np.where(nBad == 0)
whereBlue = np.where(np.isin(nBad, [1, 2]))
whereRed = np.where(np.isin(nBad, [3, 4]))
whereSilver = np.where(np.isin(nBad, [5, 6]))
whereGold = np.where(np.isin(nBad, [7, 8]))
def set_rewards(grid, where, reward):
new_grid = grid.copy()
new_grid[where[0], where[1]] = reward
return(new_grid)
rewards = set_rewards(rewards, whereGreen, 1)
rewards = set_rewards(rewards, whereBlue, 5)
rewards = set_rewards(rewards, whereRed, 20)
rewards = set_rewards(rewards, whereSilver, 100)
rewards = set_rewards(rewards, whereGold, 300)
# Wherever the grid does not have a rupoor or bomb,
# change to the reward amount above
grid[grid == 0] = rewards[grid == 0]
# https://stackoverflow.com/questions/551038/private-implementation-class-in-python
self._grid = grid.copy()
self.allActions = np.indices(grid.shape).reshape(2, -1).T.reshape(-1, 2).tolist()
self.allActions = [tuple(action) for action in self.allActions]
self.actionIdx = [i for i in range(0, len(self.allActions))]
# Actions that may be taken
self.actions = self.allActions.copy()
def reset(self):
# Actions that may be taken
self.actions = self.allActions.copy()
# Number of rupees remaining
self._nRupeesRemain = len(np.where(self._grid.copy().flatten() > 0)[0])
# All actions and rewards taken to this point
self.actionsTaken = None
# stateNN is for processing states through a convolutional NN
# First channel: binary indicator for knowing/not knowing what is in the cell
# Second: binary indicator for if there is a bomb there. 0 by default.
# Third: rupee/rupoor amount. 0 by default.
self.stateNN = np.zeros((3, self.nrow, self.ncol))
# https://numpy.org/doc/stable/reference/generated/numpy.indices.html
        # The state is a list of five values: the reward from the most recent step
        # (initially the cost to play), the actions taken so far, the remaining
        # valid actions, the 3-channel grid for the neural net, and a termination flag.
self.state = [self.initReward, self.actionsTaken, self.actions, self.stateNN, False]
return(self.state)
# Choose a cell
def step(self, idxTup):
terminated = False
reward = 0
# If not a valid action, return 0 reward
actIdx = self.allActions.index(idxTup)
if idxTup not in self.actions:
reward = 0
# if a bomb is chosen, terminate
elif self._grid[idxTup[0], idxTup[1]] == -128:
terminated = True
# https://www.programiz.com/python-programming/methods/list/index
self.stateNN[0, idxTup[0], idxTup[1]] = 1 # we know what's there
self.stateNN[1, idxTup[0], idxTup[1]] = 1 # it's a bomb
else:
reward = self._grid[idxTup[0], idxTup[1]]
self.stateNN[0, idxTup[0], idxTup[1]] = 1 # we know what's there
self.stateNN[2, idxTup[0], idxTup[1]] = reward # throw the reward in
actionsArr = np.array(self.actions)
# print(actionsArr)
self._nRupeesRemain = len(np.where(self._grid[actionsArr[:,0], actionsArr[:,1]].copy().flatten() > 0)[0])
# If no rupees remain, return a reward of 200 for the rare item
# https://www.ign.com/wikis/the-legend-of-zelda-skyward-sword/Thrill_Digger
if self._nRupeesRemain == 0:
reward = 200
terminated = True
# (x, y, reward) from those chosen thus far
gridReward = (idxTup[0], idxTup[1], reward)
if self.actionsTaken is None:
self.actionsTaken = [gridReward]
elif idxTup in self.actions: # exclude repeating actions
self.actionsTaken.append(gridReward)
self.actions = [idx for idx in self.state[2] if idx != idxTup]
self.state = [reward, self.actionsTaken, self.actions, self.stateNN, terminated]
return(self.state)
# gather actions that are valid
def getValidActions(self):
return(self.actions)
# gather all actions
def getAllActions(self):
return(self.allActions)
# gather all action indices (for the neural net)
def getActionIdx(self):
return(self.actionIdx)
def getActionsTaken(self):
return(self.actionsTaken)
def getNNState(self):
return(self.stateNN)
At a high level, the code above makes several enhancements compared to the original post:
- All three modes - beginner, intermediate, and expert - have been incorporated into a single class.
- As actions are taken in the grid, a 3-channel grid is outputted. These channels consist of:
- A channel with 0/1 values describing whether or not the contents of the cells are known.
- A channel with 0/1 values describing whether or not the cells consist of a bomb.
- A channel consisting of the rupee reward amount.
- To determine the number of bad spaces (rupoors, bombs) adjacent to a space, I performed a two-dimensional convolution, convolving the grid of bad-space indicators (-1 for a bomb or rupoor, 0 otherwise) with \[\begin{bmatrix} 1 & 1 & 1 \\ 1 & 0 & 1 \\ 1 & 1 & 1 \end{bmatrix}\text{,}\] as sketched just below this list.
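Concretely, here is a minimal standalone sketch of the neighbor-counting convolution on a toy 3 x 4 grid (the bad-space positions here are made up), using the same scipy.signal.convolve2d call as in the class above:
import numpy as np
from scipy import signal
# Toy grid: -1 marks a bomb or rupoor, 0 marks a safe cell
bad = np.array([[0, -1,  0, 0],
                [0,  0,  0, 0],
                [0,  0, -1, 0]])
# 3 x 3 kernel of ones with the center zeroed out, so a cell does not count itself
kernel = np.ones((3, 3))
kernel[1, 1] = 0
# Negate so each cell holds the (nonnegative) count of bad cells among its neighbors
nBad = -signal.convolve2d(bad, kernel, mode = 'same')
print(nBad)
# [[1. 0. 1. 0.]
#  [1. 2. 2. 1.]
#  [0. 1. 0. 1.]]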
Given the above, one can view the state representation as three channels, each a 2D grid. This is very similar to how images are represented: an image is a 2D matrix with three channels (red, green, and blue). For this reason, analogous to how images are fed into neural networks, I am using a convolutional neural network to learn how to play Thrill Digger.
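Before defining the network, here is a quick interaction sketch showing the 3-channel state the network will consume. The seed value is arbitrary and chosen only for reproducibility; depending on the hidden board, the first dig could of course hit a bomb.
env = ThrillDigger("beginner", seed = 0)  # arbitrary seed, for reproducibility only
state = env.reset()
state = env.step((0, 0))                  # dig the top-left cell
reward, taken, remaining, stateNN, done = state
print(reward, done)                       # the rupee value of (0, 0), or 0 and True if it held a bomb
print(stateNN.shape)                      # (3, 4, 5): known / bomb / reward channels
print(len(remaining))                     # 19 valid actions left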
class NeuralNetwork(nn.Module):
def __init__(self, OUTPUT_SIZE):
super(NeuralNetwork, self).__init__()
layers = []
# https://pytorch.org/docs/stable/generated/torch.nn.Conv2d.html#torch.nn.Conv2d
layers.append(nn.Conv2d(3, 3, 3, padding = 1))
layers.append(nn.ReLU())
layers.append(nn.Conv2d(3, 8, 3, padding = 1))
layers.append(nn.ReLU())
layers.append(nn.Conv2d(8, 16, 3, padding = 1))
layers.append(nn.ReLU())
layers.append(nn.Conv2d(16, 32, 3, padding = 1))
layers.append(nn.ReLU())
layers.append(nn.Conv2d(32, 64, 3, padding = 1))
layers.append(nn.ReLU())
layers.append(nn.Conv2d(64, 32, 3, padding = 1))
layers.append(nn.ReLU())
layers.append(nn.Conv2d(32, 8, 3, padding = 1))
layers.append(nn.ReLU())
layers.append(nn.Conv2d(8, 3, 3, padding = 1))
layers.append(nn.ReLU())
layers.append(nn.Flatten())
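        # 60 = 3 channels x 4 rows x 5 columns, the flattened feature size for the beginner grid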
layers.append(nn.Linear(60, OUTPUT_SIZE))
self.stack = nn.Sequential(*layers)
def forward(self, x):
return self.stack(x)
Some design principles are worth outlining here:
- Padding is set to 1 in each convolutional layer because a 3 x 3 kernel would otherwise shrink the grid at every layer; the padding keeps the spatial dimensions consistent from layer to layer.
- Powers of 2 are used when increasing and then decreasing the number of channels in the convolutional neural network.
- A linear layer at the end maps the flattened features to OUTPUT_SIZE values, one per possible action, from which the action to take is chosen (see the quick shape check below).
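As a quick sanity check on the shapes (assuming the beginner-mode 4 x 5 grid, so 20 actions and 3 x 4 x 5 = 60 flattened features), a forward pass on a dummy state should produce one value per action:
net = NeuralNetwork(OUTPUT_SIZE = 20)  # 20 cells on the beginner grid -> 20 actions
dummyState = torch.zeros(1, 3, 4, 5)   # batch of one 3-channel state
print(net(dummyState).shape)           # torch.Size([1, 20])
Note that the nn.Linear(60, OUTPUT_SIZE) layer hard-codes the 60 flattened features of the beginner grid, so that input size would need to change for the intermediate or expert boards.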
Now we continue with some setup as needed:
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using {} device".format(device))
M = 1000000 # number of episodes for training
mode = "beginner"
grid = ThrillDigger(mode)
initState = grid.reset()
MAXACTIONS = len(initState[2]) # gives the number of possible actions
loss = torch.nn.MSELoss() # MSE loss will be used for training
rewardEp = []
REWARD_AVG_EPS = 100 # how far back we will calculate a moving avg for the reward
REWARDTHRESH = 100 # how much the moving avg needs to be until we stop training
We define an \(\epsilon\)-greedy action: fix an \(\epsilon \in (0, 1)\). Generate a uniform random variate in \((0, 1)\), say \(u\). If \(u < \epsilon\), choose a random square in the grid (exploration). Otherwise, choose the action the neural net estimates to be best (exploitation).
def greedyAction(grid, model, epsilon):
prob = np.random.random()
isGreedy = (prob < epsilon)
allActions = grid.getAllActions()
if isGreedy:
action = np.random.randint(0, MAXACTIONS)
else:
newState = torch.from_numpy(grid.getNNState()).to(device).float().unsqueeze(0)
#print("non-greedy action:")
#print(newState)
with torch.no_grad():
# # https://pytorch.org/docs/stable/generated/torch.cat.html
# float32 is used by default,
# see https://discuss.pytorch.org/t/runtimeerror-mat1-and-mat2-must-have-the-same-dtype/166759/5
action = torch.argmax(model(newState).to(device))
action = action.item()
return(allActions[action])
Next, we do some additional setup and additional variable storage for capturing the behavior of the neural nets:
C = 200 # how often to update the target neural net
expReplay = []
rewardEp = []
movingAvg = []
DECAYRATE = 0.01
# For each episode
epsilon = 1
replayCapacity = 10000
gamma = 0.99
learningRate = 0.001
minibatchSize = 45
# Initialize two neural nets: one for learning optimal actions and
# a "target" one for calibration.
model = NeuralNetwork(MAXACTIONS).to(device)
modelTarget = NeuralNetwork(MAXACTIONS).to(device)
# https://pytorch.org/docs/stable/generated/torch.nn.Module.html#torch.nn.Module.load_state_dict
modelTarget.load_state_dict(model.state_dict())
adam = torch.optim.NAdam(model.parameters(), lr = learningRate)
totalSteps = 0
epsilonFloor = 0
staticMap = True
if staticMap:
grid = ThrillDigger(mode)
You may be wondering at this point why I am defining some of the above variables - that's fine! Below, I go through the pseudocode, which shows where these variables come in. I draw from the approach in https://arxiv.org/abs/1312.5602, as modified by https://huggingface.co/learn/deep-rl-course/en/unit3/deep-q-algorithm.
Algorithm. Deep Q-learning with a target network, adapted to Thrill Digger with a static environment:
Initialize an initial epsilon
Initialize expReplay with capacity replayCapacity
Initialize model with parameters theta
Initialize modelTarget with the same parameters theta
for each episode:
    decay epsilon
    while the episode has not terminated:
        observe a 3-channel state s_t
        execute an epsilon-greedy action a_t on the grid
        observe a reward r_t due to a_t
        observe a 3-channel state s_(t+1) due to a_t
        determine whether a_t terminates the episode
        append (s_t, a_t, r_t, s_(t+1), terminated) to expReplay, dropping the oldest entry if over capacity
        when expReplay is sufficiently large:
            sample a minibatch of size minibatchSize from expReplay without replacement
            for each transition j in the minibatch, set y_j = r_j + gamma * max_a'[modelTarget(s_(j+1), a')] * (1 - terminated_j)
            take a gradient step (NAdam) on model, minimizing the MSE between model(s_j)[a_j] and y_j
        every C steps:
            copy the parameters of model into modelTarget
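Before showing the full loop, the target computation from the pseudocode can be illustrated in isolation. This is a toy sketch with made-up numbers, separate from the training loop; it shows how multiplying by the not-terminated flag zeroes out the bootstrapped term whenever a transition ended the episode.
gamma = 0.99
rewards = torch.tensor([[5.0], [20.0], [-10.0]])    # r_j for three toy transitions
maxNextQ = torch.tensor([[12.0], [30.0], [8.0]])    # made-up values for max_a' modelTarget(s_(j+1), a')
notTerminated = torch.tensor([[1.0], [0.0], [1.0]]) # 0 where the episode ended
y = rewards + gamma * maxNextQ * notTerminated
print(y)  # values: 16.88, 20.00, -2.08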
Here, I show the code for executing the above, with a plot of the results:
for i in range(M):
if epsilon < epsilonFloor:
epsilon = epsilonFloor
else:
epsilon *= (1 - DECAYRATE)
if not staticMap:
grid = ThrillDigger(mode)
initState = grid.reset()
terminated = False
step = 0
totalReward = grid.initReward
while not terminated:
#print(grid.getNNState())
adam.zero_grad()
action = greedyAction(grid, model, epsilon)
# print(action)
nextState = grid.step(action)
step += 1
totalSteps += 1
reward = nextState[0]
terminated = nextState[-1]
# print(terminated)
totalReward += reward
if len(expReplay) > replayCapacity:
expReplay = expReplay[1:]
expReplay.append([initState[3], action, reward, nextState[3], terminated])
# print(flattenState(initState[3]))
if len(expReplay) >= minibatchSize:
minibatch = np.random.choice(range(len(expReplay)), size = minibatchSize, replace = False)
initStates = [expReplay[m][0] for m in minibatch]
actions = [expReplay[m][1] for m in minibatch]
rewards = [expReplay[m][2] for m in minibatch]
nextStates = [expReplay[m][3] for m in minibatch]
terminatedTens = [expReplay[m][4] for m in minibatch]
notTerminatedTens = [not t for t in terminatedTens]
# Convert to action indices
actions = [grid.getAllActions().index(action) for action in actions]
initStates = torch.tensor(initStates).float().to(device)
actions = torch.tensor(actions).int().to(device)
rewards = torch.tensor(rewards).float().to(device).unsqueeze(1)
nextStates = torch.tensor(nextStates).float().to(device)
terminatedTens = torch.tensor(terminatedTens).float().to(device).unsqueeze(1)
notTerminatedTens = torch.tensor(notTerminatedTens).float().to(device).unsqueeze(1)
#print("nextStates.shape: " + str(nextStates.shape))
with torch.no_grad():
maxQ = torch.max(modelTarget(nextStates), 1)[0].unsqueeze(1) * notTerminatedTens
y = torch.add(rewards, torch.mul(maxQ, gamma))
q = model(initStates)
# Gather Q(state, action)
q = q[torch.arange(initStates.size(0)), actions].unsqueeze(1)
diff = loss(q, y)
diff.backward()
adam.step()
initState = nextState
if totalSteps % C == 0:
modelTarget.load_state_dict(model.state_dict())
rewardEp.append(totalReward)
if i > REWARD_AVG_EPS - 1:
movingAvgReward = np.mean(rewardEp[-REWARD_AVG_EPS:])
movingAvg.append(movingAvgReward)
if movingAvgReward >= REWARDTHRESH:
print("Reward threshold met with " + str(REWARD_AVG_EPS) +
"-moving average reward of " + str(round(movingAvgReward, 2)) +
". Terminating training...")
break
else:
movingAvgReward = 0
movingAvg.append(None)
if i < M - 1 and movingAvgReward < REWARDTHRESH:
clear_output(wait = True)
# Display status
print("Episode {0}".format(i), end=": ")
print("total reward: {0}".format(totalReward), end = ", ")
print("moving avg reward: {0}".format(movingAvgReward), end = ", ")
print("epsilon: " + str(epsilon))
# # Plot the reward
plt.figure(0)
plt.xlabel('Episode')
plt.ylabel('Reward')
plt.plot(np.array(rewardEp[:i]))
plt.plot(np.array(movingAvg[:i]), color = 'red')
plt.show()
After 411 episodes, here is the behavior of model. The blue line is the reward for each episode, and the red line is the average reward over the most recent 100 episodes.
Limitations (in further detail)
Though this model performs excellently during training, it fails to generalize: it has essentially learned how to play one specific Thrill Digger board. It cannot play new Thrill Digger environments particularly well.
Thrill Digger, as I soon discovered, breaks one of the core principles of reinforcement learning. It is not a Markov decision process (MDP) for the following reasons:
- The environment is not fully observable. We only learn more about the board as we choose squares in the game (a short sketch after this list makes this concrete).
- Not only is the environment not fully observable, the hidden board also changes from game to game, making it extra difficult to train an agent to play this game.
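As a concrete illustration of the first point, two boards generated with different (arbitrarily chosen) seeds present the agent with exactly the same all-zero initial observation, even though their hidden grids almost certainly differ:
envA = ThrillDigger("beginner", seed = 1)  # arbitrary seeds; any two distinct values make the point
envB = ThrillDigger("beginner", seed = 2)
envA.reset()
envB.reset()
print(np.array_equal(envA.getNNState(), envB.getNNState()))  # True: both observations start as all zeros
print(np.array_equal(envA._grid, envB._grid))                # almost certainly False: the hidden boards differ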
Next steps
This problem, I believe, is not an MDP but a Partially Observable Markov Decision Process (POMDP). I will be spending time (this summer, hopefully) learning deep reinforcement learning methods for solving POMDPs.
Acknowledgments
This project, though it is far from being finished, draws from material that I have learned through OMSCS via Computer Vision, Reinforcement Learning, and Deep Learning. I am thankful for the many people I have interacted with throughout these courses and appreciate that these courses have provided me a path toward solving this problem. And who knows, maybe this problem isn’t solvable at all. But regardless, I have learned a lot through this one problem that has followed me the last few years.