# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
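"""Distributed Evolution Strategies (ES) training entry point.

The Learner connects to a PARL cluster, starts one local thread per remote
Actor, and on every step: sends the latest flat weights to the actors,
collects noisy rollouts, updates the model from centered-rank rewards, and
synchronizes the global observation filter back to all actors.
"""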
import gym
import os
import parl
import numpy as np
import threading
import utils
from es import ES
from obs_filter import MeanStdFilter
from mujoco_agent import MujocoAgent
from mujoco_model import MujocoModel
from noise import SharedNoiseTable
from parl.utils import logger, summary
from parl.utils.window_stat import WindowStat
from six.moves import queue
from actor import Actor


class Learner(object):
    def __init__(self, config):
        self.config = config

        env = gym.make(self.config['env_name'])
        self.config['obs_dim'] = env.observation_space.shape[0]
        self.config['act_dim'] = env.action_space.shape[0]

        self.obs_filter = MeanStdFilter(self.config['obs_dim'])
        self.noise = SharedNoiseTable(self.config['noise_size'])

        model = MujocoModel(self.config['act_dim'])
        algorithm = ES(model)
        self.agent = MujocoAgent(algorithm, self.config)

        self.latest_flat_weights = self.agent.get_flat_weights()
        self.latest_obs_filter = self.obs_filter.as_serializable()

        self.sample_total_episodes = 0
        self.sample_total_steps = 0

        self.actors_signal_input_queues = []
        self.actors_output_queues = []

        self.create_actors()

        self.eval_rewards_stat = WindowStat(self.config['report_window_size'])
        self.eval_lengths_stat = WindowStat(self.config['report_window_size'])

    def create_actors(self):
        """ Create actors for parallel training.
        """
        parl.connect(self.config['master_address'])
        self.remote_count = 0

        for i in range(self.config['actor_num']):
            signal_queue = queue.Queue()
            output_queue = queue.Queue()
            self.actors_signal_input_queues.append(signal_queue)
            self.actors_output_queues.append(output_queue)

            self.remote_count += 1

            remote_thread = threading.Thread(
                target=self.run_remote_sample,
                args=(signal_queue, output_queue))
            remote_thread.daemon = True
            remote_thread.start()

        logger.info('All remote actors are ready, begin to learn.')

    def run_remote_sample(self, signal_queue, output_queue):
        """ Sample data from the remote actor, or fetch and sync its observation filter.
        """
        remote_actor = Actor(self.config)

        while True:
            info = signal_queue.get()
            if info['signal'] == 'sample':
                result = remote_actor.sample(self.latest_flat_weights)
                output_queue.put(result)
            elif info['signal'] == 'get_filter':
                actor_filter = remote_actor.get_filter(flush_after=True)
                output_queue.put(actor_filter)
            elif info['signal'] == 'set_filter':
                remote_actor.set_filter(self.latest_obs_filter)
            else:
                raise NotImplementedError

    def step(self):
        """Run one step of ES.

        1. Kick off all actors to synchronize weights and sample data.
        2. Update the model parameters based on the sampled data.
        3. Update the global observation filter based on the local filters
           of all actors, then synchronize the global filter back to them.
        """
        num_episodes, num_timesteps = 0, 0
        results = []

        while num_episodes < self.config['min_episodes_per_batch'] or \
                num_timesteps < self.config['min_steps_per_batch']:
            # Send sample signal to all actors
            for q in self.actors_signal_input_queues:
                q.put({'signal': 'sample'})

            # Collect results from all actors
            for q in self.actors_output_queues:
                result = q.get()
                results.append(result)

                # result['noisy_lengths'] is a list of lists, where the
                # inner lists have length 2.
                num_episodes += sum(
                    len(pair) for pair in result['noisy_lengths'])
                num_timesteps += sum(
                    sum(pair) for pair in result['noisy_lengths'])

        all_noise_indices = []
        all_training_rewards = []
        all_training_lengths = []
        all_eval_rewards = []
        all_eval_lengths = []

        for result in results:
            all_eval_rewards.extend(result['eval_rewards'])
            all_eval_lengths.extend(result['eval_lengths'])

            all_noise_indices.extend(result['noise_indices'])
            all_training_rewards.extend(result['noisy_rewards'])
            all_training_lengths.extend(result['noisy_lengths'])

        assert len(all_eval_rewards) == len(all_eval_lengths)
        assert (len(all_noise_indices) == len(all_training_rewards) ==
                len(all_training_lengths))

        self.sample_total_episodes += num_episodes
        self.sample_total_steps += num_timesteps

        eval_rewards = np.array(all_eval_rewards)
        eval_lengths = np.array(all_eval_lengths)
        noise_indices = np.array(all_noise_indices)
        noisy_rewards = np.array(all_training_rewards)
        noisy_lengths = np.array(all_training_lengths)

        # Normalize rewards to [-0.5, 0.5] with centered ranks.
        proc_noisy_rewards = utils.compute_centered_ranks(noisy_rewards)
        noises = [
            self.noise.get(index, self.agent.weights_total_size)
            for index in noise_indices
        ]

        # Update the parameters of the model.
        self.agent.learn(proc_noisy_rewards, noises)
        self.latest_flat_weights = self.agent.get_flat_weights()

        # Update the observation filter.
        self._update_filter()

        # Store the evaluation rewards.
        if len(all_eval_rewards) > 0:
            self.eval_rewards_stat.add(np.mean(eval_rewards))
            self.eval_lengths_stat.add(np.mean(eval_lengths))

        metrics = {
            "episodes_this_iter": noisy_lengths.size,
            "sample_total_episodes": self.sample_total_episodes,
            "sample_total_steps": self.sample_total_steps,
            "evaluate_rewards_mean": self.eval_rewards_stat.mean,
            "evaluate_steps_mean": self.eval_lengths_stat.mean,
            "timesteps_this_iter": noisy_lengths.sum(),
        }

        self.log_metrics(metrics)
        return metrics

    def _update_filter(self):
        # Send get_filter signal to all actors
        for q in self.actors_signal_input_queues:
            q.put({'signal': 'get_filter'})

        # Collect filters from all actors and update the global filter
        for q in self.actors_output_queues:
            actor_filter = q.get()
            self.obs_filter.apply_changes(actor_filter)

        # Send set_filter signal to all actors
        self.latest_obs_filter = self.obs_filter.as_serializable()
        for q in self.actors_signal_input_queues:
            q.put({'signal': 'set_filter'})

    def log_metrics(self, metrics):
        logger.info(metrics)
        for k, v in metrics.items():
            if v is not None:
                summary.add_scalar(k, v, self.sample_total_steps)


if __name__ == '__main__':
    from es_config import config

    logger.info(
        "Before training, it takes a few minutes to initialize a noise table for exploration"
    )

    learner = Learner(config)

    while True:
        learner.step()
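
# Launch sketch (not taken from this repo's docs; the port and CPU count below
# are assumptions, match them to config['master_address'] and
# config['actor_num'] in es_config.py):
#     xparl start --port 8010 --cpu_num 10
#     python train.py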