使用 Atari Breakout 进行 DQN 实验

该实验训练 Deep Q Network (DQN) 在 OpenAI Gym 上玩 Atari Breakout 游戏。它在多个进程上运行游戏环境以高效采样。

Open In Colab

15import numpy as np
16import torch
17
18from labml import tracker, experiment, logger, monit
19from labml.internal.configs.dynamic_hyperparam import FloatDynamicHyperParam
20from labml_helpers.schedule import Piecewise
21from labml_nn.rl.dqn import QFuncLoss
22from labml_nn.rl.dqn.model import Model
23from labml_nn.rl.dqn.replay_buffer import ReplayBuffer
24from labml_nn.rl.game import Worker

选择设备

27if torch.cuda.is_available():
28    device = torch.device("cuda:0")
29else:
30    device = torch.device("cpu")

将观测值从缩放[0, 255][0, 1]

33def obs_to_torch(obs: np.ndarray) -> torch.Tensor:
35    return torch.tensor(obs, dtype=torch.float32, device=device) / 255.

训练师

38class Trainer:
43    def __init__(self, *,
44                 updates: int, epochs: int,
45                 n_workers: int, worker_steps: int, mini_batch_size: int,
46                 update_target_model: int,
47                 learning_rate: FloatDynamicHyperParam,
48                 ):

工作人员人数

50        self.n_workers = n_workers

每次更新时采样的步骤

52        self.worker_steps = worker_steps

训练迭代次数

54        self.train_epochs = epochs

更新次数

57        self.updates = updates

用于训练的微型批次的大小

59        self.mini_batch_size = mini_batch_size

每 250 次更新一次目标网络

62        self.update_target_model = update_target_model

学习率

65        self.learning_rate = learning_rate

作为更新函数的探索

68        self.exploration_coefficient = Piecewise(
69            [
70                (0, 1.0),
71                (25_000, 0.1),
72                (self.updates / 2, 0.01)
73            ], outside_value=0.01)

作为更新函数的重播缓冲区

76        self.prioritized_replay_beta = Piecewise(
77            [
78                (0, 0.4),
79                (self.updates, 1)
80            ], outside_value=1)
使用@@

重播缓冲区。重播缓冲区的容量必须是 2 的幂。

83        self.replay_buffer = ReplayBuffer(2 ** 14, 0.6)

采样和训练模型

86        self.model = Model().to(device)

要获取的目标模型

88        self.target_model = Model().to(device)

创建工作人员

91        self.workers = [Worker(47 + i) for i in range(self.n_workers)]

初始化观测值的张量

94        self.obs = np.zeros((self.n_workers, 4, 84, 84), dtype=np.uint8)

重置工作人员

97        for worker in self.workers:
98            worker.child.send(("reset", None))

获得初步观测值

101        for i, worker in enumerate(self.workers):
102            self.obs[i] = worker.child.recv()

损失函数

105        self.loss_func = QFuncLoss(0.99)

优化者

107        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=2.5e-4)

-贪婪采样

在对@@

动作进行抽样时,我们使用-greedy策略,其中我们采取概率的贪婪动作,并随机采取概率动作。我们称之exploration_coefficient

109    def _sample_action(self, q_value: torch.Tensor, exploration_coefficient: float):

采样不需要渐变

119        with torch.no_grad():

采样具有最高 Q 值的动作。这是贪婪的行动。

121            greedy_action = torch.argmax(q_value, dim=-1)

统一采样和行动

123            random_action = torch.randint(q_value.shape[-1], greedy_action.shape, device=q_value.device)

选择贪婪动作还是随机动作

125            is_choose_rand = torch.rand(greedy_action.shape, device=q_value.device) < exploration_coefficient

根据以下内容选择操作is_choose_rand

127            return torch.where(is_choose_rand, random_action, greedy_action).cpu().numpy()

样本数据

129    def sample(self, exploration_coefficient: float):

这不需要渐变

133        with torch.no_grad():

样本worker_steps

135            for t in range(self.worker_steps):

获取当前观测值的 Q_Values

137                q_value = self.model(obs_to_torch(self.obs))

操作示例

139                actions = self._sample_action(q_value, exploration_coefficient)

对每个工作器运行采样操作

142                for w, worker in enumerate(self.workers):
143                    worker.child.send(("step", actions[w]))

收集每位员工的信息

146                for w, worker in enumerate(self.workers):

执行操作后获取结果

148                    next_obs, reward, done, info = worker.child.recv()

将过渡添加到重播缓冲区

151                    self.replay_buffer.add(self.obs[w], actions[w], reward, next_obs, done)

更新剧集信息。收集剧集信息,如果剧集结束则可用;这包括总奖励和剧集时长——看看Game 它是如何运作的。

157                    if info:
158                        tracker.add('reward', info['reward'])
159                        tracker.add('length', info['length'])

更新当前观测值

162                    self.obs[w] = next_obs

训练模型

164    def train(self, beta: float):
168        for _ in range(self.train_epochs):

来自优先级重播缓冲区的样本

170            samples = self.replay_buffer.sample(self.mini_batch_size, beta)

获取预测的 Q 值

172            q_value = self.model(obs_to_torch(samples['obs']))

获取 “双 Q 学习” 的下一个状态的 Q 值。梯度不应该为这些传播

176            with torch.no_grad():

得到

178                double_q_value = self.model(obs_to_torch(samples['next_obs']))

得到

180                target_q_value = self.target_model(obs_to_torch(samples['next_obs']))

计算时差 (TD) 误差和损失

183            td_errors, loss = self.loss_func(q_value,
184                                             q_value.new_tensor(samples['action']),
185                                             double_q_value, target_q_value,
186                                             q_value.new_tensor(samples['done']),
187                                             q_value.new_tensor(samples['reward']),
188                                             q_value.new_tensor(samples['weights']))

计算重播缓冲区的优先级

191            new_priorities = np.abs(td_errors.cpu().numpy()) + 1e-6

更新重播缓冲区优先级

193            self.replay_buffer.update_priorities(samples['indexes'], new_priorities)

设置学习速率

196            for pg in self.optimizer.param_groups:
197                pg['lr'] = self.learning_rate()

将先前计算的梯度归零

199            self.optimizer.zero_grad()

计算梯度

201            loss.backward()

剪辑渐变

203            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=0.5)

根据渐变更新参数

205            self.optimizer.step()

跑步训练循环

207    def run_training_loop(self):

最近 100 集信息

213        tracker.set_queue('reward', 100, True)
214        tracker.set_queue('length', 100, True)

最初复制到目标网络

217        self.target_model.load_state_dict(self.model.state_dict())
218
219        for update in monit.loop(self.updates):

,勘探分数

221            exploration = self.exploration_coefficient(update)
222            tracker.add('exploration', exploration)

用于优先重播

224            beta = self.prioritized_replay_beta(update)
225            tracker.add('beta', beta)

当前政策的示例

228            self.sample(exploration)

缓冲区满后开始训练

231            if self.replay_buffer.is_full():

训练模型

233                self.train(beta)

定期更新目标网络

236                if update % self.update_target_model == 0:
237                    self.target_model.load_state_dict(self.model.state_dict())

保存跟踪的指标。

240            tracker.save()

定期在屏幕上添加新行

242            if (update + 1) % 1_000 == 0:
243                logger.log()

摧毁

阻止工人

245    def destroy(self):
250        for worker in self.workers:
251            worker.child.send(("close", None))
254def main():

创建实验

256    experiment.create(name='dqn')

配置

259    configs = {

更新次数

261        'updates': 1_000_000,
使用@@

采样数据训练模型的周期数。

263        'epochs': 8,

工作进程数

265        'n_workers': 8,

单次更新的每个进程要运行的步骤数

267        'worker_steps': 4,

小批量

269        'mini_batch_size': 32,

目标模型更新间隔

271        'update_target_model': 250,

学习率。

273        'learning_rate': FloatDynamicHyperParam(1e-4, (0, 1e-3)),
274    }

配置

277    experiment.configs(configs)

初始化训练器

280    m = Trainer(**configs)

运行并监控实验

282    with experiment.start():
283        m.run_training_loop()

阻止工人

285    m.destroy()

运行它

289if __name__ == "__main__":
290    main()