该实验训练 Deep Q Network (DQN) 在 OpenAI Gym 上玩 Atari Breakout 游戏。它在多个进程上运行游戏环境以高效采样。
15import numpy as np
16import torch
17
18from labml import tracker, experiment, logger, monit
19from labml.internal.configs.dynamic_hyperparam import FloatDynamicHyperParam
20from labml_helpers.schedule import Piecewise
21from labml_nn.rl.dqn import QFuncLoss
22from labml_nn.rl.dqn.model import Model
23from labml_nn.rl.dqn.replay_buffer import ReplayBuffer
24from labml_nn.rl.game import Worker
选择设备
27if torch.cuda.is_available():
28 device = torch.device("cuda:0")
29else:
30 device = torch.device("cpu")
将观测值从缩放[0, 255]
到[0, 1]
33def obs_to_torch(obs: np.ndarray) -> torch.Tensor:
35 return torch.tensor(obs, dtype=torch.float32, device=device) / 255.
38class Trainer:
43 def __init__(self, *,
44 updates: int, epochs: int,
45 n_workers: int, worker_steps: int, mini_batch_size: int,
46 update_target_model: int,
47 learning_rate: FloatDynamicHyperParam,
48 ):
工作人员人数
50 self.n_workers = n_workers
每次更新时采样的步骤
52 self.worker_steps = worker_steps
训练迭代次数
54 self.train_epochs = epochs
更新次数
57 self.updates = updates
用于训练的微型批次的大小
59 self.mini_batch_size = mini_batch_size
每 250 次更新一次目标网络
62 self.update_target_model = update_target_model
学习率
65 self.learning_rate = learning_rate
作为更新函数的探索
68 self.exploration_coefficient = Piecewise(
69 [
70 (0, 1.0),
71 (25_000, 0.1),
72 (self.updates / 2, 0.01)
73 ], outside_value=0.01)
作为更新函数的重播缓冲区
76 self.prioritized_replay_beta = Piecewise(
77 [
78 (0, 0.4),
79 (self.updates, 1)
80 ], outside_value=1)
重播缓冲区。重播缓冲区的容量必须是 2 的幂。
83 self.replay_buffer = ReplayBuffer(2 ** 14, 0.6)
采样和训练模型
86 self.model = Model().to(device)
要获取的目标模型
88 self.target_model = Model().to(device)
创建工作人员
91 self.workers = [Worker(47 + i) for i in range(self.n_workers)]
初始化观测值的张量
94 self.obs = np.zeros((self.n_workers, 4, 84, 84), dtype=np.uint8)
重置工作人员
97 for worker in self.workers:
98 worker.child.send(("reset", None))
获得初步观测值
101 for i, worker in enumerate(self.workers):
102 self.obs[i] = worker.child.recv()
损失函数
105 self.loss_func = QFuncLoss(0.99)
优化者
107 self.optimizer = torch.optim.Adam(self.model.parameters(), lr=2.5e-4)
109 def _sample_action(self, q_value: torch.Tensor, exploration_coefficient: float):
采样不需要渐变
119 with torch.no_grad():
采样具有最高 Q 值的动作。这是贪婪的行动。
121 greedy_action = torch.argmax(q_value, dim=-1)
统一采样和行动
123 random_action = torch.randint(q_value.shape[-1], greedy_action.shape, device=q_value.device)
选择贪婪动作还是随机动作
125 is_choose_rand = torch.rand(greedy_action.shape, device=q_value.device) < exploration_coefficient
根据以下内容选择操作is_choose_rand
127 return torch.where(is_choose_rand, random_action, greedy_action).cpu().numpy()
129 def sample(self, exploration_coefficient: float):
这不需要渐变
133 with torch.no_grad():
样本worker_steps
135 for t in range(self.worker_steps):
获取当前观测值的 Q_Values
137 q_value = self.model(obs_to_torch(self.obs))
操作示例
139 actions = self._sample_action(q_value, exploration_coefficient)
对每个工作器运行采样操作
142 for w, worker in enumerate(self.workers):
143 worker.child.send(("step", actions[w]))
收集每位员工的信息
146 for w, worker in enumerate(self.workers):
执行操作后获取结果
148 next_obs, reward, done, info = worker.child.recv()
将过渡添加到重播缓冲区
151 self.replay_buffer.add(self.obs[w], actions[w], reward, next_obs, done)
更新剧集信息。收集剧集信息,如果剧集结束则可用;这包括总奖励和剧集时长——看看Game
它是如何运作的。
157 if info:
158 tracker.add('reward', info['reward'])
159 tracker.add('length', info['length'])
更新当前观测值
162 self.obs[w] = next_obs
164 def train(self, beta: float):
168 for _ in range(self.train_epochs):
来自优先级重播缓冲区的样本
170 samples = self.replay_buffer.sample(self.mini_batch_size, beta)
获取预测的 Q 值
172 q_value = self.model(obs_to_torch(samples['obs']))
获取 “双 Q 学习” 的下一个状态的 Q 值。梯度不应该为这些传播
176 with torch.no_grad():
得到
178 double_q_value = self.model(obs_to_torch(samples['next_obs']))
得到
180 target_q_value = self.target_model(obs_to_torch(samples['next_obs']))
计算时差 (TD) 误差和损失。
183 td_errors, loss = self.loss_func(q_value,
184 q_value.new_tensor(samples['action']),
185 double_q_value, target_q_value,
186 q_value.new_tensor(samples['done']),
187 q_value.new_tensor(samples['reward']),
188 q_value.new_tensor(samples['weights']))
计算重播缓冲区的优先级
191 new_priorities = np.abs(td_errors.cpu().numpy()) + 1e-6
更新重播缓冲区优先级
193 self.replay_buffer.update_priorities(samples['indexes'], new_priorities)
设置学习速率
196 for pg in self.optimizer.param_groups:
197 pg['lr'] = self.learning_rate()
将先前计算的梯度归零
199 self.optimizer.zero_grad()
计算梯度
201 loss.backward()
剪辑渐变
203 torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=0.5)
根据渐变更新参数
205 self.optimizer.step()
207 def run_training_loop(self):
最近 100 集信息
213 tracker.set_queue('reward', 100, True)
214 tracker.set_queue('length', 100, True)
最初复制到目标网络
217 self.target_model.load_state_dict(self.model.state_dict())
218
219 for update in monit.loop(self.updates):
,勘探分数
221 exploration = self.exploration_coefficient(update)
222 tracker.add('exploration', exploration)
用于优先重播
224 beta = self.prioritized_replay_beta(update)
225 tracker.add('beta', beta)
当前政策的示例
228 self.sample(exploration)
缓冲区满后开始训练
231 if self.replay_buffer.is_full():
训练模型
233 self.train(beta)
定期更新目标网络
236 if update % self.update_target_model == 0:
237 self.target_model.load_state_dict(self.model.state_dict())
保存跟踪的指标。
240 tracker.save()
定期在屏幕上添加新行
242 if (update + 1) % 1_000 == 0:
243 logger.log()
245 def destroy(self):
250 for worker in self.workers:
251 worker.child.send(("close", None))
254def main():
创建实验
256 experiment.create(name='dqn')
配置
259 configs = {
更新次数
261 'updates': 1_000_000,
采样数据训练模型的周期数。
263 'epochs': 8,
工作进程数
265 'n_workers': 8,
单次更新的每个进程要运行的步骤数
267 'worker_steps': 4,
小批量
269 'mini_batch_size': 32,
目标模型更新间隔
271 'update_target_model': 250,
学习率。
273 'learning_rate': FloatDynamicHyperParam(1e-4, (0, 1e-3)),
274 }
配置
277 experiment.configs(configs)
初始化训练器
280 m = Trainer(**configs)
运行并监控实验
282 with experiment.start():
283 m.run_training_loop()
阻止工人
285 m.destroy()
289if __name__ == "__main__":
290 main()