try:
    import google.colab
    IN_COLAB = True
except:
    IN_COLAB = False

if IN_COLAB:
    !pip install -U gymnasium pygame moviepy
    !pip install gymnasium[box2d]
Eligibility traces
import numpy as np
rng = np.random.default_rng()

import matplotlib.pyplot as plt
import os
# os.environ["IMAGEIO_FFMPEG_EXE"] = "/opt/homebrew/bin/ffmpeg" # if moviepy complains about not finding ffmpeg, put its path here

from IPython.display import clear_output

import gymnasium as gym
print("gym version:", gym.__version__)

import pygame
from moviepy.editor import ImageSequenceClip, ipython_display
class GymRecorder(object):
    """
    Simple wrapper over moviepy to generate a .gif with the frames of a gym environment.

    The environment must have the render_mode `rgb_array_list`.
    """
    def __init__(self, env):
        self.env = env
        self._frames = []

    def record(self, frames):
        "To be called at the end of an episode."
        for frame in frames:
            self._frames.append(np.array(frame))

    def make_video(self, filename):
        "Generates the gif video."
        directory = os.path.dirname(os.path.abspath(filename))
        if not os.path.exists(directory):
            os.mkdir(directory)
        self.clip = ImageSequenceClip(list(self._frames), fps=self.env.metadata["render_fps"])
        self.clip.write_gif(filename, fps=self.env.metadata["render_fps"], loop=0)
        del self._frames
        self._frames = []

def running_average(x, N):
    kernel = np.ones(N) / N
    return np.convolve(x, kernel, mode='same')
gym version: 0.29.1
Q-learning in Gridworld
Random interaction with the environment
The goal of this exercise is to solve the Gridworld problem using Q-learning. The code is adapted from https://gymnasium.farama.org/tutorials/environment_creation/
The agent is represented by the blue circle: the state s of the agent is its position in the 5x5 grid, i.e. a number between 0 and 24.
The agent can move left, right, up or down. When the agent tries to move outside of the environment, it stays at its current position. The four available actions a are deterministic.
Its goal is to reach the green square while avoiding the red ones. Actions leading to the green square receive a reward r of +100, actions leading to a red square receive a reward of -100, and the episode ends in those states. All other actions have a reward of -1. An episode also stops after 100 steps if a goal state has not been reached.
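As a quick sketch of how the state index relates to grid coordinates (the helper functions below are illustrative; the environment class defined next implements the same mapping as methods), the position (x, y) is flattened row by row into a single index:

size = 5

def coord2state(x, y, size=size):
    # Flatten (x, y) into a single index in [0, size**2 - 1], row by row
    return y * size + x

def state2coord(s, size=size):
    # Inverse mapping back to (x, y)
    return (s % size, s // size)

print(coord2state(3, 2))  # 13, the state index of the target cell
print(state2coord(13))    # (3, 2)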
class GridWorldEnv(gym.Env):
    metadata = {"render_modes": ["human", "rgb_array", "rgb_array_list"], "render_fps": 4}

    def __init__(self, render_mode=None, size=5, rewards=[100, -100, -1]):
        self.size = size # The size of the square grid
        self.window_size = 512 # The size of the PyGame window
        self.rewards = rewards
        self._step = 0

        # The state is the flattened (x, y) coordinate of the agent
        self.observation_space = gym.spaces.Discrete(size**2)

        # Goal location
        self._target_location = np.array([3, 2], dtype=int)
        self._distractor1_location = np.array([3, 1], dtype=int)
        self._distractor2_location = np.array([2, 2], dtype=int)

        # We have 4 actions, corresponding to "right", "down", "left", "up"
        self.action_space = gym.spaces.Discrete(4)
        self._action_to_direction = {
            0: np.array([1, 0]),   # right
            1: np.array([0, 1]),   # down
            2: np.array([-1, 0]),  # left
            3: np.array([0, -1]),  # up
        }

        assert render_mode is None or render_mode in self.metadata["render_modes"]
        self.render_mode = render_mode
        if self.render_mode == "rgb_array_list":
            self._frames = []

        self.window = None
        self.clock = None
        pygame.font.init() # the font module must be initialized before creating the font
        self.font = pygame.font.SysFont(None, 16)

        self.Q = np.zeros((self.observation_space.n, self.action_space.n))
    def _state2coordinates(self, state):
        "Returns the (x, y) coordinates of a state."
        return (state % self.size, int(state / self.size))

    def _coordinate2state(self, coord):
        "Returns the state corresponding to the coordinates."
        return coord[1] * self.size + coord[0]

    def reset(self, seed=None, options=None):
        self._step = 0

        # Initial location
        self._agent_location = np.array([0, 0], dtype=int)

        if self.render_mode == "human":
            self._render_frame()
        if self.render_mode == "rgb_array_list":
            self._frames = []
            self._render_frame()

        return self._coordinate2state(self._agent_location), {}
    def step(self, action):
        # Map the action (element of {0,1,2,3}) to the direction we walk in
        direction = self._action_to_direction[action]

        # We use `np.clip` to make sure we don't leave the grid
        self._agent_location = np.clip(
            self._agent_location + direction, 0, self.size - 1
        )

        # An episode is done if the agent has reached the target or a distractor
        if np.array_equal(self._agent_location, self._target_location):
            terminal = True
            reward = self.rewards[0]
        elif np.array_equal(self._agent_location, self._distractor1_location) \
            or np.array_equal(self._agent_location, self._distractor2_location):
            terminal = True
            reward = self.rewards[1]
        else:
            terminal = False
            reward = self.rewards[2]

        if self.render_mode == "human" or self.render_mode == "rgb_array_list":
            self._render_frame()

        self._step += 1
        if self._step == 100:
            truncated = True
        else:
            truncated = False

        return self._coordinate2state(self._agent_location), reward, terminal, truncated, {}
    def render(self):
        if self.render_mode == "rgb_array":
            return self._render_frame()
        elif self.render_mode == "rgb_array_list":
            f = self._frames.copy()
            self._frames = []
            return f
    def _render_frame(self):
        if self.window is None and self.render_mode == "human":
            pygame.init()
            pygame.display.init()
            self.window = pygame.display.set_mode(
                (self.window_size, self.window_size)
            )
        if self.clock is None and self.render_mode == "human":
            self.clock = pygame.time.Clock()

        canvas = pygame.Surface((self.window_size, self.window_size))
        canvas.fill((255, 255, 255))
        pix_square_size = (
            self.window_size / self.size
        ) # The size of a single grid square in pixels

        # First we draw the target and the distractors
        pygame.draw.rect(
            canvas,
            (0, 255, 0),
            pygame.Rect(
                pix_square_size * self._target_location,
                (pix_square_size, pix_square_size),
            ),
        )
        pygame.draw.rect(
            canvas,
            (255, 0, 0),
            pygame.Rect(
                pix_square_size * self._distractor1_location,
                (pix_square_size, pix_square_size),
            ),
        )
        pygame.draw.rect(
            canvas,
            (255, 0, 0),
            pygame.Rect(
                pix_square_size * self._distractor2_location,
                (pix_square_size, pix_square_size),
            ),
        )

        # Now we draw the agent
        pygame.draw.circle(
            canvas,
            (0, 0, 255),
            (self._agent_location + 0.5) * pix_square_size,
            pix_square_size / 3,
        )

        # Add some gridlines
        for x in range(self.size + 1):
            pygame.draw.line(
                canvas,
                0,
                (0, pix_square_size * x),
                (self.window_size, pix_square_size * x),
                width=3,
            )
            pygame.draw.line(
                canvas,
                0,
                (pix_square_size * x, 0),
                (pix_square_size * x, self.window_size),
                width=3,
            )

        # Print Q-values
        for x in range(self.size):
            for y in range(self.size):
                s = self._coordinate2state((x, y))

                # Up
                val = f"{self.Q[s, 3]:+.2f}"
                text = self.font.render(val, True, (0, 0, 0))
                canvas.blit(text,
                    ((x + 0.5) * pix_square_size - 6,
                     y * pix_square_size + 6)
                )
                # Down
                val = f"{self.Q[s, 1]:+.2f}"
                text = self.font.render(val, True, (0, 0, 0))
                canvas.blit(text,
                    ((x + 0.5) * pix_square_size - 6,
                     (y + 1) * pix_square_size - 12)
                )
                # Left
                val = f"{self.Q[s, 2]:+.2f}"
                text = self.font.render(val, True, (0, 0, 0))
                canvas.blit(text,
                    (x * pix_square_size + 6,
                     (y + 0.5) * pix_square_size - 6)
                )
                # Right
                val = f"{self.Q[s, 0]:+.2f}"
                text = self.font.render(val, True, (0, 0, 0))
                canvas.blit(text,
                    ((x + 1) * pix_square_size - 32,
                     (y + 0.5) * pix_square_size - 6)
                )

        if self.render_mode == "human":
            # The following line copies our drawings from `canvas` to the visible window
            self.window.blit(canvas, canvas.get_rect())
            pygame.event.pump()
            pygame.display.update()

            # We need to ensure that human-rendering occurs at the predefined framerate.
            # The following line will automatically add a delay to keep the framerate stable.
            self.clock.tick(self.metadata["render_fps"])
        elif self.render_mode == "rgb_array":
            return np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )
        elif self.render_mode == "rgb_array_list":
            array = np.transpose(
                np.array(pygame.surfarray.pixels3d(canvas)), axes=(1, 0, 2)
            )
            self._frames.append(array)

    def close(self):
        if self.window is not None:
            pygame.display.quit()
            pygame.quit()
class RandomAgent:
    def __init__(self, env):
        self.env = env
        self.Q = np.zeros((self.env.observation_space.n, self.env.action_space.n))

    def act(self, state):
        "Selects an action randomly."
        return self.env.action_space.sample()

    def train(self, nb_episodes, recorder=None):
        "Runs the agent on the environment for nb_episodes."

        # Returns
        returns = []
        steps = []

        # Fixed number of episodes
        for episode in range(nb_episodes):

            # Reset
            state, info = self.env.reset()
            done = False
            nb_steps = 0

            # Store rewards
            return_episode = 0.0

            # Sample the episode
            while not done:

                # Select an action
                action = self.act(state)

                # Perform the action
                next_state, reward, terminal, truncated, info = self.env.step(action)

                # Append reward
                return_episode += reward

                # Go in the next state
                state = next_state

                # Increment time
                nb_steps += 1

                # Terminal state
                done = terminal or truncated

            # Pass the Q table to the GUI
            self.env.Q = self.Q

            # Store info
            returns.append(return_episode)
            steps.append(nb_steps)

        return returns, steps
# Create the environment
env = GridWorldEnv(render_mode='human')

# Create the agent
agent = RandomAgent(env)

# Perform random episodes
returns, steps = agent.train(2)
Q: Adapt your Q-learning agent from the last exercise to this problem. The main difference is the call to self.env.Q = self.Q, so that the GUI displays the Q-values; the rest is similar. Train it for 100 episodes with suitable hyperparameters and without rendering.
class QLearningAgent:
    """
    Q-learning agent.
    """
    def __init__(self, env, gamma, exploration, decay, alpha):
        """
        :param env: gym-like environment
        :param gamma: discount factor
        :param exploration: exploration parameter
        :param decay: exploration decay parameter
        :param alpha: learning rate
        """
        self.env = env
        self.gamma = gamma
        self.exploration = exploration
        self.decay = decay
        self.alpha = alpha

        # Q_table
        self.Q = np.zeros([self.env.observation_space.n, self.env.action_space.n])

    def act(self, state):
        "Returns an action using epsilon-greedy action selection."
        action = rng.choice(np.where(self.Q[state, :] == self.Q[state, :].max())[0])

        if rng.random() < self.exploration:
            action = self.env.action_space.sample()

        return action

    def update(self, state, action, reward, next_state, done):
        "Updates the agent using a single transition."

        # Bellman target
        target = reward

        if not done:
            target += self.gamma * self.Q[next_state, :].max()

        # Update the Q-value
        self.Q[state, action] += self.alpha * (target - self.Q[state, action])

        # Decay exploration parameter
        self.exploration = self.exploration * (1 - self.decay)
    def train(self, nb_episodes, recorder=None):
        "Runs the agent on the environment for nb_episodes."

        # Returns
        returns = []
        steps = []

        # Fixed number of episodes
        for episode in range(nb_episodes):

            # Reset
            state, info = self.env.reset()
            done = False
            nb_steps = 0

            # Store rewards
            return_episode = 0.0

            # Sample the episode
            while not done:

                # Select an action
                action = self.act(state)

                # Perform the action
                next_state, reward, terminal, truncated, info = self.env.step(action)

                # Terminal state
                done = terminal or truncated

                # Append reward
                return_episode += reward

                # Learn from the transition
                self.update(state, action, reward, next_state, done)

                # Go in the next state
                state = next_state

                # Increment time
                nb_steps += 1

            # Pass the Q table to the GUI
            self.env.Q = self.Q

            # Store info
            returns.append(return_episode)
            steps.append(nb_steps)

            if recorder is not None:
                recorder.record(self.env.render())

        return returns, steps
    def test(self, recorder=None):
        "Performs a test episode without exploration."
        previous_exploration = self.exploration
        self.exploration = 0.0

        # Reset
        state, info = self.env.reset()
        done = False
        nb_steps = 0
        return_episode = 0

        # Sample the episode
        while not done:
            action = self.act(state)
            next_state, reward, terminal, truncated, info = self.env.step(action)
            done = terminal or truncated
            return_episode += reward
            state = next_state
            nb_steps += 1

        self.exploration = previous_exploration

        if recorder is not None:
            recorder.record(self.env.render())

        return return_episode, nb_steps
# Parameters
gamma = 0.99
epsilon = 0.1
decay_epsilon = 0
alpha = 0.1
nb_episodes = 100

# Create the environment
env = GridWorldEnv(render_mode=None)

# Create the agent
agent = QLearningAgent(env, gamma, epsilon, decay_epsilon, alpha)

# Train the agent
returns, steps = agent.train(nb_episodes)

plt.figure(figsize=(12, 5))
plt.subplot(121)
plt.plot(returns)
plt.xlabel("Episodes")
plt.ylabel("Return")
plt.subplot(122)
plt.plot(steps)
plt.xlabel("Episodes")
plt.ylabel("Steps")
plt.show()
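If the learning curve is too noisy, the running_average() helper defined at the beginning of the notebook can be used to smooth it (optional; the window size of 10 is an arbitrary choice):

plt.figure()
plt.plot(returns, alpha=0.4)                 # raw returns
plt.plot(running_average(returns, 10))       # smoothed returns
plt.xlabel("Episodes")
plt.ylabel("Return (smoothed)")
plt.show()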
Q: Train a Q-learning agent with rendering on. Observe in particular which Q-values are updated when the agent reaches the target. Is it efficient?
# Parameters
gamma = 0.99
epsilon = 0.1
decay_epsilon = 0
alpha = 0.1
nb_episodes = 10

# Create the environment
env = GridWorldEnv(render_mode='human')

# Create the agent
agent = QLearningAgent(env, gamma, epsilon, decay_epsilon, alpha)

# Train the agent
returns, steps = agent.train(nb_episodes)
Q: Modify your agent so that it uses softmax action selection, with a temperature \tau = 1.0 and a suitable decay. What does this change?
If you have time, write a generic class for the Q-learning agent where the action selection method can be chosen flexibly.
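For reference, softmax (Boltzmann) action selection draws the action from the following distribution over the Q-values, where the temperature \tau controls the amount of exploration (standard definition, stated here because the exercise does not spell it out):

\pi(a | s_t) = \dfrac{\exp(Q(s_t, a) / \tau)}{\sum_{a'} \exp(Q(s_t, a') / \tau)}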
class SoftQLearningAgent(QLearningAgent):
    """
    Q-learning agent with softmax or epsilon-greedy action selection.
    """
    def __init__(self, env, gamma, action_selection, alpha):
        """
        :param env: gym-like environment
        :param gamma: discount factor
        :param action_selection: exploration mechanism
        :param alpha: learning rate
        """
        self.action_selection = action_selection
        super().__init__(env, gamma, action_selection['param'], action_selection['decay'], alpha)

    def act(self, state):
        "Returns an action using epsilon-greedy or softmax action selection."

        if self.action_selection['type'] == "egreedy":
            # epsilon-greedy
            if rng.uniform(0, 1, 1) < self.exploration:
                action = self.env.action_space.sample()
            else:
                action = rng.choice(np.where(self.Q[state, :] == self.Q[state, :].max())[0])
        else:
            # softmax
            logits = np.exp((self.Q[state, :] - self.Q[state, :].max()) / self.exploration)
            probas = logits / np.sum(logits)
            action = rng.choice(range(4), p=probas)

        return action
# Parameters
gamma = 0.99
#action_selection = {'type': "egreedy", "param": 0.1, "decay": 0.0}
action_selection = {'type': "softmax", "param": 1.0, "decay": 0.0}
alpha = 0.1
nb_episodes = 100

# Create the environment
env = GridWorldEnv(render_mode=None)

# Create the agent
agent = SoftQLearningAgent(env, gamma, action_selection, alpha)

# Train the agent
returns, steps = agent.train(nb_episodes)

plt.figure(figsize=(12, 5))
plt.subplot(121)
plt.plot(returns)
plt.xlabel("Episodes")
plt.ylabel("Return")
plt.subplot(122)
plt.plot(steps)
plt.xlabel("Episodes")
plt.ylabel("Steps")
plt.show()
A: The agent explores much less at the end of training, as the differences between the Q-values become large enough for the softmax to act almost greedily. In particular, it quickly stops going to the red squares. In this environment, there is no real need to decay \tau.
Eligibility traces
The main drawback of Q-learning is that it needs many episodes to converge (sample complexity).
One way to speed up learning is to use eligibility traces, one per state-action pair:
traces = np.zeros((nb_states, nb_actions))
After each transition (s_t, a_t), Q(\lambda) updates the trace e(s_t, a_t) and modifies all Q-values as follows (a compact sketch of the full update is given after this list):
- The trace of the last transition is incremented by 1:
e(s_t, a_t) = e(s_t, a_t) + 1
- The Q(\lambda)-learning update is applied to ALL Q-values, using the TD error at time t:
Q(s, a) = Q(s, a) + \alpha \, (r_{t+1} + \gamma \, \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t)) \, e(s, a)
- All traces are exponentially decreased using the trace parameter \lambda (e.g. 0.7):
e(s, a) = \lambda \, \gamma \, e(s, a)
All traces are reset to 0 at the beginning of an episode.
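Putting these steps together, a minimal sketch of one Q(\lambda) update after a single transition might look like this (the variable names Q, traces, gamma, alpha and lbda are illustrative; the full agent class is the subject of the next question):

# Sketch of one Q(lambda) update for a non-terminal transition (s, a, r, s_next)
traces[s, a] += 1.0                                   # increment the trace of the visited pair
td_error = r + gamma * Q[s_next, :].max() - Q[s, a]   # TD error of the transition
Q += alpha * td_error * traces                        # update ALL Q-values, weighted by their traces
traces *= gamma * lbda                                # exponential decay of all traces
# (for a terminal transition, the bootstrap term gamma * max Q(s_next, .) is dropped)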
Q: Implement eligibility traces in your Q(\lambda)-learning agent and see if it improves convergence. Train it with rendering on and observe how all Q-values are updated.
class QLambdaLearningAgent(SoftQLearningAgent):
    """
    Q(lambda)-learning agent with softmax or epsilon-greedy action selection and eligibility traces.
    """
    def __init__(self, env, gamma, lbda, action_selection, alpha):
        """
        :param env: gym-like environment
        :param gamma: discount factor
        :param lbda: trace parameter
        :param action_selection: exploration mechanism
        :param alpha: learning rate
        """
        self.lbda = lbda

        # Traces
        self.traces = np.zeros([env.observation_space.n, env.action_space.n])

        super().__init__(env, gamma, action_selection, alpha)

    def update(self, state, action, reward, next_state, done):

        # Bellman target
        target = reward
        if not done:
            target += self.gamma * self.Q[next_state, :].max()

        # Update ALL Q-values
        self.Q += self.alpha * (target - self.Q[state, action]) * self.traces

        # Decay exploration parameter
        self.exploration = self.exploration * (1 - self.decay)
    def train(self, nb_episodes, recorder=None):
        "Runs the agent on the environment for nb_episodes."

        # Returns
        returns = []
        steps = []

        # Fixed number of episodes
        for episode in range(nb_episodes):

            # Reset
            state, info = self.env.reset()
            done = False
            nb_steps = 0

            # Reset traces
            self.traces *= 0.0

            # Store rewards
            return_episode = 0.0

            # Sample the episode
            while not done:

                # Select an action
                action = self.act(state)

                # Perform the action
                next_state, reward, terminal, truncated, info = self.env.step(action)

                # Terminal state
                done = terminal or truncated

                # Update return
                return_episode += reward

                # Increment trace
                self.traces[state, action] += 1

                # Learn from the transition
                self.update(state, action, reward, next_state, done)

                # Update all traces
                self.traces *= self.gamma * self.lbda

                # Go in the next state
                state = next_state

                # Increment time
                nb_steps += 1

            # Pass the Q table to the GUI
            self.env.Q = self.Q

            # Store info
            returns.append(return_episode)
            steps.append(nb_steps)

            if recorder is not None:
                recorder.record(self.env.render())

        return returns, steps
# Parameters
gamma = 0.99
lbda = 0.7
#action_selection = {'type': "egreedy", "param": 0.1, "decay": 0.0}
action_selection = {'type': "softmax", "param": 1.0, "decay": 0.0}
alpha = 0.1
nb_episodes = 100

# Create the environment
env = GridWorldEnv()

# Create the agent
agent = QLambdaLearningAgent(env, gamma, lbda, action_selection, alpha)

# Train the agent
returns, steps = agent.train(nb_episodes)

plt.figure(figsize=(12, 5))
plt.subplot(121)
plt.plot(returns)
plt.xlabel("Episodes")
plt.ylabel("Return")
plt.subplot(122)
plt.plot(steps)
plt.xlabel("Episodes")
plt.ylabel("Steps")
plt.show()
# Parameters
gamma = 0.99
lbda = 0.7
#action_selection = {'type': "egreedy", "param": 0.1, "decay": 0.0}
action_selection = {'type': "softmax", "param": 1.0, "decay": 0.0}
alpha = 0.1
nb_episodes = 10

# Create the environment
env = GridWorldEnv(render_mode='human')

# Create the agent
agent = QLambdaLearningAgent(env, gamma, lbda, action_selection, alpha)

# Train the agent
returns, steps = agent.train(nb_episodes)
Q: Vary the trace parameter \lambda and discuss its influence.
# Parameters
gamma = 0.99
#action_selection = {'type': "egreedy", "param": 0.1, "decay": 0.0}
action_selection = {'type': "softmax", "param": 1.0, "decay": 0.0}
alpha = 0.1
nb_episodes = 100

list_returns = []

for lbda in np.linspace(0.1, 1.0, 10):

    # Create the environment
    env = GridWorldEnv()

    # Create the agent
    agent = QLambdaLearningAgent(env, gamma, lbda, action_selection, alpha)

    # Train the agent
    returns, steps = agent.train(nb_episodes)

    list_returns.append(returns)

plt.figure(figsize=(12, 6))
for idx, lbda in enumerate(np.linspace(0.1, 1.0, 10)):
    plt.plot(list_returns[idx], label=str(lbda))
plt.legend()
plt.show()
A: \lambda should be neither too high nor too low in order to speed up learning: it controls the bias/variance trade-off.
Q: Increase the size of the Gridworld to 100x100 and observe how long it takes to learn the optimal strategy, with and without eligibility traces.
env = GridWorldEnv(size=100)
Comment on the curse of dimensionality and on how suitable tabular RL is for complex tasks with large state spaces and sparse rewards (e.g. robotics).
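For scale: the default 5x5 grid has 25 states and 25 x 4 = 100 Q-values, while a 100x100 grid has 10,000 states and 40,000 Q-values, still with a single rewarded cell, so the chance of stumbling on the target during random exploration drops dramatically.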
# Parameters
gamma = 0.99
lbda = 0.7
#action_selection = {'type': "egreedy", "param": 0.1, "decay": 0.0}
action_selection = {'type': "softmax", "param": 1.0, "decay": 0.0}
alpha = 0.1
nb_episodes = 100

# Create the environment
env = GridWorldEnv(size=100)

# Create the agent
agent = SoftQLearningAgent(env, gamma, action_selection, alpha)
#agent = QLambdaLearningAgent(env, gamma, lbda, action_selection, alpha)

# Train the agent
returns, steps = agent.train(nb_episodes)

plt.figure(figsize=(12, 5))
plt.subplot(121)
plt.plot(returns)
plt.xlabel("Episodes")
plt.ylabel("Return")
plt.subplot(122)
plt.plot(steps)
plt.xlabel("Episodes")
plt.ylabel("Steps")
plt.show()
A: When the Gridworld is too big, the probability of reaching the target by chance during exploration is very low. There are many unsuccessful episodes before learning starts to occur, but it does happen eventually.