Deep Q Networks

Sources:

  1. Mnih et al., Playing Atari with Deep Reinforcement Learning (2013), the original DQN paper
  2. Reinforcement Learning Explained Visually (Part 5): Deep Q Networks, step-by-step, by Ketan Doshi
  3. My github repo for DQN

# TODO

Recall that Q-Learning builds a Q-table that maps state-action pairs to Q-values. In a real-world scenario, however, the number of states can be huge, making it computationally intractable to build such a table.

To address this limitation, DQN replaces the Q-table with a Q-function: a neural network that takes a state as input and outputs one Q-value per action.
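A toy sketch of the difference, with illustrative sizes that are not from the source:

```python
import numpy as np
import torch
import torch.nn as nn

num_states, num_actions, state_dim = 500, 2, 4   # illustrative sizes only

# Tabular Q-learning: one entry per (state, action) pair, so the table grows with the state count
q_table = np.zeros((num_states, num_actions))
q_value = q_table[42, 1]                          # look up a single state-action pair

# DQN: a network maps a state vector to one Q-value per action and generalizes across states
q_network = nn.Sequential(nn.Linear(state_dim, 128), nn.ReLU(), nn.Linear(128, num_actions))
q_values = q_network(torch.zeros(1, state_dim))   # shape: (1, num_actions)
```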

Replay buffer

The replay buffer (called experience replay in the original paper) stores past transitions (state, action, reward, next state, done) and serves random mini-batches of them for training. Sampling at random breaks the correlation between consecutive transitions, so the training samples are approximately i.i.d., which stabilizes learning.

```python
import random
from collections import deque

import numpy as np


class ReplayBuffer(object):
    def __init__(self, capacity):
        # A deque with maxlen discards the oldest transitions once capacity is reached
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        # Add a batch dimension so sampled states can be concatenated into a batch
        state = np.expand_dims(state, 0)
        next_state = np.expand_dims(next_state, 0)

        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # Draw a random mini-batch and regroup it by field
        state, action, reward, next_state, done = zip(*random.sample(self.buffer, batch_size))
        return np.concatenate(state), action, reward, np.concatenate(next_state), done

    def __len__(self):
        return len(self.buffer)
```
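A quick usage sketch; the observation size and capacity are illustrative, not from the source:

```python
buffer = ReplayBuffer(capacity=1000)

# Push a few dummy transitions with 4-dimensional observations
for i in range(64):
    obs = np.random.randn(4).astype(np.float32)
    next_obs = np.random.randn(4).astype(np.float32)
    buffer.push(obs, action=i % 2, reward=1.0, next_state=next_obs, done=False)

states, actions, rewards, next_states, dones = buffer.sample(32)
print(states.shape)   # (32, 4), thanks to expand_dims + concatenate in the buffer
```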

Target network

It is possible to build a DQN with a single Q Network and no Target Network. In that case we do two passes through the Q Network: first to output the Predicted Q value, and then to output the Target Q value.

But that creates a problem. The Q Network’s weights get updated at each time step, which improves the prediction of the Predicted Q value. However, since the same network with the same weights also produces the Target Q values, the targets shift after every update instead of staying steady. Training ends up chasing a moving target.

By employing a second network that doesn’t get trained, we ensure that the Target Q values remain stable, at least for a short period. But those Target Q values are also predictions after all and we do want them to improve, so a compromise is made. After a pre-configured number of time-steps, the learned weights from the Q Network are copied over to the Target Network.

This is similar in spirit to keeping an exponential moving average (EMA) of the weights, but not identical: DQN does a hard copy of all weights every N steps, whereas soft-update variants (e.g. the Polyak averaging used in DDPG) blend the online weights into the target a little at every step.
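A minimal sketch of that hard copy, assuming a second network target_model with the same architecture (the name and the sync interval are assumptions, not from the source):

```python
def update_target(model, target_model):
    # Hard update: overwrite the target network's weights with the online network's
    target_model.load_state_dict(model.state_dict())

# In the training loop, e.g. every 100 frames:
# if frame_idx % 100 == 0:
#     update_target(model, target_model)
```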

DQN

```python
import torch
import torch.nn as nn


class DQN(nn.Module):
    def __init__(self, num_inputs, num_actions, device='cuda'):
        super(DQN, self).__init__()

        self.device = device
        self.num_inputs = num_inputs
        self.num_actions = num_actions
        self.layers = nn.Sequential(
            nn.Linear(num_inputs, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, num_actions)
        ).to(self.device)

    def forward(self, x):
        return self.layers(x)

    def act(self, state, epsilon):
        # Epsilon-greedy action selection
        if random.random() > epsilon:
            with torch.no_grad():  # No gradients needed for action selection
                state = torch.FloatTensor(state).unsqueeze(0).to(self.device)  # Add batch dimension
                q_value = self.forward(state)  # Q-values for all actions
                action = q_value.max(1)[1].item()  # Greedy action as a Python int
        else:
            action = random.randrange(self.num_actions)
        return action
```
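The loss and training code below also assume an environment, an optimizer, and a replay buffer. A rough setup sketch, assuming Gymnasium's CartPole-v1; the environment choice, learning rate, and buffer capacity are illustrative assumptions, not necessarily what the repo uses:

```python
import gymnasium as gym
import torch
import torch.optim as optim

env = gym.make("CartPole-v1")
num_inputs = env.observation_space.shape[0]   # 4-dimensional observation for CartPole
num_actions = env.action_space.n              # 2 discrete actions for CartPole

device = 'cuda' if torch.cuda.is_available() else 'cpu'
model = DQN(num_inputs, num_actions, device=device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)   # assumed learning rate
replay_buffer = ReplayBuffer(capacity=1000)           # assumed capacity
```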


Loss
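In symbols, the code below minimizes the mean squared TD error over a sampled mini-batch, with the bootstrap term zeroed out on terminal transitions ($d$ is the done flag, $\mathcal{D}$ is the replay buffer):

$$
L(\theta) = \mathbb{E}_{(s, a, r, s', d) \sim \mathcal{D}} \Big[ \big( r + \gamma \, (1 - d) \max_{a'} Q_\theta(s', a') - Q_\theta(s, a) \big)^2 \Big]
$$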

```python
def compute_td_loss(batch_size, replay_buffer, model, gamma, optimizer):
    state, action, reward, next_state, done = replay_buffer.sample(batch_size)

    # Convert numpy arrays / tuples to torch tensors on the model's device
    state = torch.FloatTensor(state).to(model.device)
    next_state = torch.FloatTensor(next_state).to(model.device)
    action = torch.LongTensor(action).to(model.device)
    reward = torch.FloatTensor(reward).to(model.device)
    done = torch.FloatTensor(done).to(model.device)

    # Q-values for the current states
    q_values = model(state)

    # Q-values for the next states; no gradients flow through the bootstrap term
    with torch.no_grad():
        next_q_values = model(next_state)
        next_q_value = next_q_values.max(1)[0]  # Max Q-value over actions

    # Bellman target: reward plus discounted next-state value, zeroed out on terminal transitions
    expected_q_value = reward + gamma * next_q_value * (1 - done)

    # Q-value of the action actually taken in each sampled transition
    q_value = q_values.gather(1, action.unsqueeze(1)).squeeze(1)

    # Mean squared TD error; detach keeps gradients out of the target
    loss = (q_value - expected_q_value.detach()).pow(2).mean()

    # Backpropagation
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    return loss
```
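Note that compute_td_loss above bootstraps from the same model, i.e. the single-network variant discussed in the Target network section. With a separate target network, only the next-state evaluation changes; a hedged sketch of that variant (target_model is an assumed name, kept in sync with update_target above):

```python
def compute_td_loss_with_target(batch_size, replay_buffer, model, target_model, gamma, optimizer):
    state, action, reward, next_state, done = replay_buffer.sample(batch_size)

    state = torch.FloatTensor(state).to(model.device)
    next_state = torch.FloatTensor(next_state).to(model.device)
    action = torch.LongTensor(action).to(model.device)
    reward = torch.FloatTensor(reward).to(model.device)
    done = torch.FloatTensor(done).to(model.device)

    # Q-value of the action taken, from the online network
    q_value = model(state).gather(1, action.unsqueeze(1)).squeeze(1)

    # The only difference: bootstrap from the frozen target network
    with torch.no_grad():
        next_q_value = target_model(next_state).max(1)[0]

    expected_q_value = reward + gamma * next_q_value * (1 - done)
    loss = (q_value - expected_q_value).pow(2).mean()

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    return loss
```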

Training process

```python
num_frames = 10000
batch_size = 32
gamma = 0.99

losses = []
all_rewards = []
episode_reward = 0

state, info = env.reset()
for frame_idx in range(1, num_frames + 1):
    # epsilon_by_frame is the exploration schedule (see the sketch after this block)
    epsilon = epsilon_by_frame(frame_idx)
    action = model.act(state, epsilon)

    next_state, reward, terminated, truncated, info = env.step(action)

    # Store the transition; only true termination counts as "done" for bootstrapping
    replay_buffer.push(state, action, reward, next_state, terminated)

    state = next_state
    episode_reward += reward

    # Reset on both natural termination and time-limit truncation
    if terminated or truncated:
        state, info = env.reset()
        all_rewards.append(episode_reward)
        episode_reward = 0

    # Start training once the buffer holds at least one full batch
    if len(replay_buffer) > batch_size:
        loss = compute_td_loss(batch_size, replay_buffer, model, gamma, optimizer)
        losses.append(loss.item())

    if frame_idx % 200 == 0:
        plot(frame_idx, all_rewards, losses)
```
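The loop above calls epsilon_by_frame, which is not defined in this section. A common choice is exponential decay from 1.0 toward a small floor; the constants below are assumptions for illustration, not necessarily what the repo uses:

```python
import math

epsilon_start = 1.0
epsilon_final = 0.01
epsilon_decay = 500

def epsilon_by_frame(frame_idx):
    # Anneal epsilon exponentially from epsilon_start toward epsilon_final
    return epsilon_final + (epsilon_start - epsilon_final) * math.exp(-frame_idx / epsilon_decay)
```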