9import multiprocessing
10import multiprocessing.connection
11
12import cv2
13import gym
14import numpy as np
This is a wrapper for OpenAI gym game environment. We do a few things here:
1. Apply the same action on four frames and get the last frame 2. Convert observation frames to gray and scale it to (84, 84) 3. Stack four frames of the last four actions 4. Add episode information (total reward for the entire episode) for monitoring 5. Restrict an episode to a single life (game has 5 lives, we reset after every single life)
Observation is tensor of size (4, 84, 84). It is four frames (images of the game screen) stacked on first axis. i.e, each channel is a frame.
17class Game:
38 def __init__(self, seed: int):
create environment
40 self.env = gym.make('BreakoutNoFrameskip-v4')
41 self.env.seed(seed)
tensor for a stack of 4 frames
44 self.obs_4 = np.zeros((4, 84, 84))
buffer to keep the maximum of last 2 frames
47 self.obs_2_max = np.zeros((2, 84, 84))
keep track of the episode rewards
50 self.rewards = []
and number of lives left
52 self.lives = 0
Executes action
for 4 time steps and returns a tuple of (observation, reward, done, episode_info).
observation
: stacked 4 frames (this frame and frames for last 3 actions) reward
: total reward while the action was executed done
: whether the episode finished (a life lost) episode_info
: episode information if completed54 def step(self, action):
66 reward = 0.
67 done = None
run for 4 steps
70 for i in range(4):
execute the action in the OpenAI Gym environment
72 obs, r, done, info = self.env.step(action)
73
74 if i >= 2:
75 self.obs_2_max[i % 2] = self._process_obs(obs)
76
77 reward += r
get number of lives left
80 lives = self.env.unwrapped.ale.lives()
reset if a life is lost
82 if lives < self.lives:
83 done = True
84 break
maintain rewards for each step
87 self.rewards.append(reward)
88
89 if done:
if finished, set episode information if episode is over, and reset
91 episode_info = {"reward": sum(self.rewards), "length": len(self.rewards)}
92 self.reset()
93 else:
94 episode_info = None
get the max of last two frames
97 obs = self.obs_2_max.max(axis=0)
push it to the stack of 4 frames
100 self.obs_4 = np.roll(self.obs_4, shift=-1, axis=0)
101 self.obs_4[-1] = obs
102
103 return self.obs_4, reward, done, episode_info
105 def reset(self):
reset OpenAI Gym environment
112 obs = self.env.reset()
reset caches
115 obs = self._process_obs(obs)
116 for i in range(4):
117 self.obs_4[i] = obs
118 self.rewards = []
119
120 self.lives = self.env.unwrapped.ale.lives()
121
122 return self.obs_4
124 @staticmethod
125 def _process_obs(obs):
130 obs = cv2.cvtColor(obs, cv2.COLOR_RGB2GRAY)
131 obs = cv2.resize(obs, (84, 84), interpolation=cv2.INTER_AREA)
132 return obs
135def worker_process(remote: multiprocessing.connection.Connection, seed: int):
create game
143 game = Game(seed)
wait for instructions from the connection and execute them
146 while True:
147 cmd, data = remote.recv()
148 if cmd == "step":
149 remote.send(game.step(data))
150 elif cmd == "reset":
151 remote.send(game.reset())
152 elif cmd == "close":
153 remote.close()
154 break
155 else:
156 raise NotImplementedError
Creates a new worker and runs it in a separate process.
159class Worker:
164 def __init__(self, seed):
165 self.child, parent = multiprocessing.Pipe()
166 self.process = multiprocessing.Process(target=worker_process, args=(parent, seed))
167 self.process.start()