9import multiprocessing
10import multiprocessing.connection
11
12import cv2
13import gym
14import numpy as np
这是 OpenAI 健身游戏环境的封装。我们在这里做了几件事:
1.在四帧上应用相同的动作并获得最后一帧 2。将观测帧转换为灰色并将其缩放为 (84, 84) 3。堆叠最后四个动作中的四帧 4.添加剧集信息(整个剧集的总奖励)以进行监控 5.将一集限制为单人生命(游戏有 5 条生命,我们在每一次生命之后重置)
观测值是大小为张量(4、84、84)。它是堆叠在第一轴上的四个帧(游戏屏幕的图像)。也就是说,每个通道都是一个帧。
17class Game:
38 def __init__(self, seed: int):
创建环境
40 self.env = gym.make('BreakoutNoFrameskip-v4')
41 self.env.seed(seed)
4 帧堆栈的张量
44 self.obs_4 = np.zeros((4, 84, 84))
缓冲区以保持最后 2 帧的最大值
47 self.obs_2_max = np.zeros((2, 84, 84))
跟踪剧集奖励
50 self.rewards = []
和剩下的生命数量
52 self.lives = 0
执行 4 个action
时间步长并返回一个元组(观察、奖励、完成、episode_info)。
observation
: 堆叠 4 帧(此帧和最后 3 个动作的帧)reward
: 执行行动时的总奖励done
: 剧集是否结束(失去生命)episode_info
: 剧集信息(如果已完成)54 def step(self, action):
66 reward = 0.
67 done = None
跑 4 步
70 for i in range(4):
在 OpenAI Gym 环境中执行动作
72 obs, r, done, info = self.env.step(action)
73
74 if i >= 2:
75 self.obs_2_max[i % 2] = self._process_obs(obs)
76
77 reward += r
获得剩余的生命数
80 lives = self.env.unwrapped.ale.lives()
如果生命丧失则重置
82 if lives < self.lives:
83 done = True
84 break
保持每一步的奖励
87 self.rewards.append(reward)
88
89 if done:
如果已完成,则在剧集结束时设置剧集信息,然后重置
91 episode_info = {"reward": sum(self.rewards), "length": len(self.rewards)}
92 self.reset()
93 else:
94 episode_info = None
获得最后两帧的最大值
97 obs = self.obs_2_max.max(axis=0)
把它推到 4 帧的堆栈中
100 self.obs_4 = np.roll(self.obs_4, shift=-1, axis=0)
101 self.obs_4[-1] = obs
102
103 return self.obs_4, reward, done, episode_info
105 def reset(self):
重置 OpenAI 健身房环境
112 obs = self.env.reset()
重置缓存
115 obs = self._process_obs(obs)
116 for i in range(4):
117 self.obs_4[i] = obs
118 self.rewards = []
119
120 self.lives = self.env.unwrapped.ale.lives()
121
122 return self.obs_4
124 @staticmethod
125 def _process_obs(obs):
130 obs = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
131 obs = cv2.resize(obs, (84, 84), interpolation=cv2.INTER_AREA)
132 return obs
135def worker_process(remote: multiprocessing.connection.Connection, seed: int):
创建游戏
143 game = Game(seed)
等待来自连接的指令并执行它们
146 while True:
147 cmd, data = remote.recv()
148 if cmd == "step":
149 remote.send(game.step(data))
150 elif cmd == "reset":
151 remote.send(game.reset())
152 elif cmd == "close":
153 remote.close()
154 break
155 else:
156 raise NotImplementedError
创建一个新的 worker 并在单独的进程中运行它。
159class Worker:
164 def __init__(self, seed):
165 self.child, parent = multiprocessing.Pipe()
166 self.process = multiprocessing.Process(target=worker_process, args=(parent, seed))
167 self.process.start()