try:
    import google.colab
    IN_COLAB = True
except:
    IN_COLAB = False

if IN_COLAB:
    !pip install -U gymnasium pygame moviepy
    !pip install gymnasium[box2d]
Q-learning
import numpy as np
rng = np.random.default_rng()
import matplotlib.pyplot as plt
import os

import gymnasium as gym
print("gym version:", gym.__version__)

from moviepy.editor import ImageSequenceClip, ipython_display
class GymRecorder(object):
    """
    Simple wrapper over moviepy to generate a .gif with the frames of a gym environment.

    The environment must have the render_mode `rgb_array_list`.
    """
    def __init__(self, env):
        self.env = env
        self._frames = []

    def record(self, frames):
        "To be called at the end of an episode."
        for frame in frames:
            self._frames.append(np.array(frame))

    def make_video(self, filename):
        "Generates the gif video."
        directory = os.path.dirname(os.path.abspath(filename))
        if not os.path.exists(directory):
            os.mkdir(directory)
        self.clip = ImageSequenceClip(list(self._frames), fps=self.env.metadata["render_fps"])
        self.clip.write_gif(filename, fps=self.env.metadata["render_fps"], loop=0)
        del self._frames
        self._frames = []

def running_average(x, N):
    kernel = np.ones(N) / N
    return np.convolve(x, kernel, mode='same')
gym version: 0.26.3
In this short exercise, we are going to apply Q-learning to the Taxi environment used last time for MC control.
As a reminder, Q-learning updates the Q-value of a state-action pair after each transition, using the update rule:
\Delta Q(s_t, a_t) = \alpha \, (r_{t+1} + \gamma \, \max_{a'} \, Q(s_{t+1}, a') - Q(s_t, a_t))
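As a minimal illustration of this rule, here is a single update on a made-up two-state, two-action table (the names Q_toy, alpha_toy, gamma_toy are purely illustrative and not part of the Taxi exercise):

# Toy example of one Q-learning update (hypothetical values, just to illustrate the rule)
Q_toy = np.zeros((2, 2))           # 2 states, 2 actions, all Q-values start at 0
alpha_toy, gamma_toy = 0.1, 0.9
s, a, r, s_next = 0, 1, 1.0, 1     # one observed transition, s_next is not terminal
target = r + gamma_toy * Q_toy[s_next, :].max()
Q_toy[s, a] += alpha_toy * (target - Q_toy[s, a])
print(Q_toy[s, a])                 # 0.1 = alpha * (r + gamma * 0 - 0)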
Q: Update the class you designed for online MC in the last exercise so that it implements Q-learning.

The main difference is that the `update()` method has to be called after each step of the episode, not at the end. This also simplifies the code a lot (no need to iterate backwards through the episode).
You can use the following parameters at the beginning, but feel free to change them:
- Discount factor \gamma = 0.9.
- Learning rate \alpha = 0.1.
- Epsilon-greedy action selection, with an initial exploration parameter of 1.0 and an exponential decay of 10^{-5} after each update (i.e. every step!); a quick sanity check of this schedule is sketched after this list.
- A total number of episodes of 20000.
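A quick check of the exploration schedule: after k updates, epsilon is (1 - 10^{-5})^k, so it decays quite slowly. The step counts below are only examples, not measured values from the Taxi runs:

# Rough check of the epsilon schedule: epsilon_k = (1 - decay)^k
decay = 1e-5
for k in [10_000, 100_000, 1_000_000]:
    print(k, (1 - decay)**k)   # ~0.905, ~0.368, ~0.000045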
Keep the general structure of the class: `train()` for the main loop, `test()` to run one episode without exploration, etc.
Plot the training and test performance at the end and render the learned deterministic policy for one episode.
Note: if s_{t+1} is terminal (`done` is true after the transition), the target should not be r_{t+1} + \gamma \, \max_{a'} \, Q(s_{t+1}, a'), but simply r_{t+1}, as there is no next action.
class QLearningAgent:
    """
    Q-learning agent.
    """
    def __init__(self, env, gamma, epsilon, decay_epsilon, alpha):
        """
        :param env: gym-like environment
        :param gamma: discount factor
        :param epsilon: exploration parameter
        :param decay_epsilon: exploration decay parameter
        :param alpha: learning rate
        """
        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        self.decay_epsilon = decay_epsilon
        self.alpha = alpha

        # Q-table
        self.Q = np.zeros([self.env.observation_space.n, self.env.action_space.n])

    def act(self, state):
        "Returns an action using epsilon-greedy action selection."
        # Greedy action, with random tie-breaking
        action = rng.choice(np.where(self.Q[state, :] == self.Q[state, :].max())[0])

        # Explore with probability epsilon
        if rng.random() < self.epsilon:
            action = self.env.action_space.sample()

        return action

    def update(self, state, action, reward, next_state, done):
        "Updates the agent using a single transition."
        # Bellman target: only bootstrap if the next state is not terminal
        target = reward
        if not done:
            target += self.gamma * self.Q[next_state, :].max()

        # Update the Q-value
        self.Q[state, action] += self.alpha * (target - self.Q[state, action])

        # Decay epsilon
        self.epsilon = self.epsilon * (1 - self.decay_epsilon)
    def train(self, nb_episodes, recorder=None):
        "Runs the agent on the environment for nb_episodes. Returns the list of obtained returns."
        # Returns and episode lengths
        returns = []
        steps = []

        # Fixed number of episodes
        for episode in range(nb_episodes):

            # Reset
            state, info = self.env.reset()
            done = False
            nb_steps = 0

            # Store rewards
            return_episode = 0.0

            # Sample the episode
            while not done:

                # Select an action
                action = self.act(state)

                # Perform the action
                next_state, reward, terminal, truncated, info = self.env.step(action)

                # End of the episode
                done = terminal or truncated

                # Learn from the transition
                self.update(state, action, reward, next_state, done)

                # Go to the next state
                state = next_state

                # Increment time
                nb_steps += 1
                return_episode += reward

            # Record at the end of the last episode
            if recorder is not None and episode == nb_episodes - 1:
                recorder.record(self.env.render())

            # Store info
            returns.append(return_episode)
            steps.append(nb_steps)

        return returns, steps
    def test(self, recorder=None):
        "Performs a test episode without exploration."
        # Switch off exploration
        previous_epsilon = self.epsilon
        self.epsilon = 0.0

        # Reset
        state, info = self.env.reset()
        done = False
        nb_steps = 0
        return_episode = 0

        # Sample the episode
        while not done:
            action = self.act(state)
            next_state, reward, terminal, truncated, info = self.env.step(action)
            done = terminal or truncated
            return_episode += reward
            state = next_state
            nb_steps += 1

        # Restore the exploration parameter
        self.epsilon = previous_epsilon

        if recorder is not None:
            recorder.record(self.env.render())

        return return_episode, nb_steps
# Parameters
gamma = 0.9
epsilon = 1.0
decay_epsilon = 1e-5
alpha = 0.1
nb_episodes = 20000

# Create the environment
env = gym.make("Taxi-v3")

# Create the agent
agent = QLearningAgent(env, gamma, epsilon, decay_epsilon, alpha)

# Train the agent
returns, steps = agent.train(nb_episodes)
# Plot training returns
plt.figure(figsize=(15, 6))
plt.subplot(121)
plt.plot(returns)
plt.plot(running_average(returns, 1000))
plt.xlabel("Episodes")
plt.ylabel("Returns")
plt.subplot(122)
plt.plot(steps)
plt.plot(running_average(steps, 1000))
plt.xlabel("Episodes")
plt.ylabel("Steps")
plt.show()
# Test the agent for 1000 episodes
test_returns = []
test_steps = []
for episode in range(1000):
    return_episode, nb_steps = agent.test()
    test_returns.append(return_episode)
    test_steps.append(nb_steps)
print("Test performance", np.mean(test_returns))

plt.figure(figsize=(15, 6))
plt.subplot(121)
plt.hist(test_returns)
plt.xlabel("Returns")
plt.subplot(122)
plt.hist(test_steps)
plt.xlabel("Number of steps")
plt.show()
Test performance 7.9
= gym.make("Taxi-v3", render_mode="rgb_array_list")
env = GymRecorder(env)
recorder = env
agent.env
= agent.test(recorder)
return_episode, nb_steps
= "videos/taxi-trained-td.gif"
video
recorder.make_video(video)=0, autoplay=1) ipython_display(video, loop
MoviePy - Building file videos/taxi-trained-td.gif with imageio.