From bb6f4fb17ced07e49181daad54b4a525c41fcfa1 Mon Sep 17 00:00:00 2001
From: erwincoumans
Date: Sat, 24 Nov 2018 11:54:36 -0800
Subject: [PATCH] improve the ARS implementation: add multiprocessing Gym
 environment stepping, add command-line parameters to resume a policy,
 --render, --movie, --steps, --env

---
 .../pybullet/gym/pybullet_envs/ARS/ars.py | 191 +++++++++++++++---
 1 file changed, 160 insertions(+), 31 deletions(-)

diff --git a/examples/pybullet/gym/pybullet_envs/ARS/ars.py b/examples/pybullet/gym/pybullet_envs/ARS/ars.py
index b4c0c3127..92189d5d5 100644
--- a/examples/pybullet/gym/pybullet_envs/ARS/ars.py
+++ b/examples/pybullet/gym/pybullet_envs/ARS/ars.py
@@ -6,13 +6,17 @@ import numpy as np
 import gym
 from gym import wrappers
 import pybullet_envs
+import time
+import multiprocessing as mp
+from multiprocessing import Process, Pipe
+import argparse
+
 
 # Setting the Hyper Parameters
-
 class Hp():
 
   def __init__(self):
-    self.nb_steps = 1000
+    self.nb_steps = 10000
     self.episode_length = 1000
     self.learning_rate = 0.02
     self.nb_directions = 16
@@ -22,6 +26,58 @@ class Hp():
     self.seed = 1
     self.env_name = 'HalfCheetahBulletEnv-v0'
 
+
+# Multiprocess Exploring the policy on one specific direction and over one episode
+
+_RESET = 1
+_CLOSE = 2
+_EXPLORE = 3
+
+def ExploreWorker(rank,childPipe, envname, args):
+  env = gym.make(envname)
+  nb_inputs = env.observation_space.shape[0]
+  normalizer = Normalizer(nb_inputs)
+  observation_n = env.reset()
+  n=0
+  while True:
+    n+=1
+    try:
+      # Only block for short times to have keyboard exceptions be raised.
+      if not childPipe.poll(0.001):
+        continue
+      message, payload = childPipe.recv()
+    except (EOFError, KeyboardInterrupt):
+      break
+    if message == _RESET:
+      observation_n = env.reset()
+      childPipe.send(["reset ok"])
+      continue
+    if message == _EXPLORE:
+      #normalizer = payload[0] #use our local normalizer
+      policy = payload[1]
+      hp = payload[2]
+      direction = payload[3]
+      delta = payload[4]
+      state = env.reset()
+      done = False
+      num_plays = 0.
+      sum_rewards = 0
+      while not done and num_plays < hp.episode_length:
+        normalizer.observe(state)
+        state = normalizer.normalize(state)
+        action = policy.evaluate(state, delta, direction,hp)
+        state, reward, done, _ = env.step(action)
+        reward = max(min(reward, 1), -1)
+        sum_rewards += reward
+        num_plays += 1
+      childPipe.send([sum_rewards])
+      continue
+    if message == _CLOSE:
+      childPipe.send(["close ok"])
+      break
+  childPipe.close()
+
+
 # Normalizing the states
 
 class Normalizer():
@@ -47,11 +103,14 @@ class Normalizer():
 # Building the AI
 
 class Policy():
-
-  def __init__(self, input_size, output_size):
-    self.theta = np.zeros((output_size, input_size))
-    print("self.theta=",self.theta)
-  def evaluate(self, input, delta = None, direction = None):
+  def __init__(self, input_size, output_size, env_name, args):
+    try:
+      self.theta = np.load(args.policy)
+    except:
+      self.theta = np.zeros((output_size, input_size))
+    self.env_name = env_name
+    print("Starting policy theta=",self.theta)
+  def evaluate(self, input, delta, direction, hp):
     if direction is None:
       return np.clip(self.theta.dot(input), -1.0, 1.0)
     elif direction == "positive":
@@ -62,15 +121,18 @@ class Policy():
   def sample_deltas(self):
     return [np.random.randn(*self.theta.shape) for _ in range(hp.nb_directions)]
 
-  def update(self, rollouts, sigma_r):
+  def update(self, rollouts, sigma_r, args):
     step = np.zeros(self.theta.shape)
     for r_pos, r_neg, d in rollouts:
       step += (r_pos - r_neg) * d
     self.theta += hp.learning_rate / (hp.nb_best_directions * sigma_r) * step
+    timestr = time.strftime("%Y%m%d-%H%M%S")
+    np.save(args.logdir+"/policy_"+self.env_name+"_"+timestr+".npy", self.theta)
+
 
 # Exploring the policy on one specific direction and over one episode
 
-def explore(env, normalizer, policy, direction = None, delta = None):
+def explore(env, normalizer, policy, direction, delta, hp):
   state = env.reset()
   done = False
   num_plays = 0.
@@ -78,7 +140,7 @@ def explore(env, normalizer, policy, direction = None, delta = None):
   while not done and num_plays < hp.episode_length:
     normalizer.observe(state)
     state = normalizer.normalize(state)
-    action = policy.evaluate(state, delta, direction)
+    action = policy.evaluate(state, delta, direction, hp)
     state, reward, done, _ = env.step(action)
     reward = max(min(reward, 1), -1)
     sum_rewards += reward
@@ -87,7 +149,7 @@
 
 # Training the AI
 
-def train(env, policy, normalizer, hp):
+def train(env, policy, normalizer, hp, parentPipes, args):
 
   for step in range(hp.nb_steps):
 
@@ -96,13 +158,29 @@ def train(env, policy, normalizer, hp):
     positive_rewards = [0] * hp.nb_directions
     negative_rewards = [0] * hp.nb_directions
 
-    # Getting the positive rewards in the positive directions
-    for k in range(hp.nb_directions):
-      positive_rewards[k] = explore(env, normalizer, policy, direction = "positive", delta = deltas[k])
+    if parentPipes:
+      for k in range(hp.nb_directions):
+        parentPipe = parentPipes[k]
+        parentPipe.send([_EXPLORE,[normalizer, policy, hp, "positive", deltas[k]]])
+      for k in range(hp.nb_directions):
+        positive_rewards[k] = parentPipes[k].recv()[0]
+
+      for k in range(hp.nb_directions):
+        parentPipe = parentPipes[k]
+        parentPipe.send([_EXPLORE,[normalizer, policy, hp, "negative", deltas[k]]])
+      for k in range(hp.nb_directions):
+        negative_rewards[k] = parentPipes[k].recv()[0]
+
+    else:
+      # Getting the positive rewards in the positive directions
+      for k in range(hp.nb_directions):
+        positive_rewards[k] = explore(env, normalizer, policy, "positive", deltas[k], hp)
 
-    # Getting the negative rewards in the negative/opposite directions
-    for k in range(hp.nb_directions):
-      negative_rewards[k] = explore(env, normalizer, policy, direction = "negative", delta = deltas[k])
+
+      # Getting the negative rewards in the negative/opposite directions
+      for k in range(hp.nb_directions):
+        negative_rewards[k] = explore(env, normalizer, policy, "negative", deltas[k], hp)
+
 
     # Gathering all the positive/negative rewards to compute the standard deviation of these rewards
     all_rewards = np.array(positive_rewards + negative_rewards)
@@ -114,10 +192,10 @@ def train(env, policy, normalizer, hp):
     rollouts = [(positive_rewards[k], negative_rewards[k], deltas[k]) for k in order]
 
     # Updating our policy
-    policy.update(rollouts, sigma_r)
+    policy.update(rollouts, sigma_r, args)
 
     # Printing the final reward of the policy after the update
-    reward_evaluation = explore(env, normalizer, policy)
+    reward_evaluation = explore(env, normalizer, policy, None, None, hp)
     print('Step:', step, 'Reward:', reward_evaluation)
 
 # Running the main code
@@ -127,16 +205,67 @@ def mkdir(base, name):
   if not os.path.exists(path):
     os.makedirs(path)
   return path
 
-work_dir = mkdir('exp', 'brs')
-monitor_dir = mkdir(work_dir, 'monitor')
-hp = Hp()
-np.random.seed(hp.seed)
-env = gym.make(hp.env_name)
-# env.render(mode = "human")
-#env = wrappers.Monitor(env, monitor_dir, force = True)
-nb_inputs = env.observation_space.shape[0]
-nb_outputs = env.action_space.shape[0]
-policy = Policy(nb_inputs, nb_outputs)
-normalizer = Normalizer(nb_inputs)
-train(env, policy, normalizer, hp)
+
+
+
+if __name__ == "__main__":
+  mp.freeze_support()
+
+  parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+  parser.add_argument('--env', help='Gym environment name', type=str, default='HalfCheetahBulletEnv-v0')
+  parser.add_argument('--seed', help='RNG seed', type=int, default=1)
+  parser.add_argument('--render', help='OpenGL Visualizer', type=int, default=0)
+  parser.add_argument('--movie',help='rgb_array gym movie',type=int, default=0)
+  parser.add_argument('--steps', help='Number of steps', type=int, default=10000)
+  parser.add_argument('--policy', help='Starting policy file (npy)', type=str, default='')
+  parser.add_argument('--logdir', help='Directory root to log policy files (npy)', type=str, default='.')
+  parser.add_argument('--mp', help='Enable multiprocessing', type=int, default=1)
+
+  args = parser.parse_args()
+
+  hp = Hp()
+  hp.env_name = args.env
+  hp.seed = args.seed
+  hp.nb_steps = args.steps
+  print("seed = ", hp.seed)
+  np.random.seed(hp.seed)
+
+  parentPipes = None
+  if args.mp:
+    num_processes = hp.nb_directions
+    processes = []
+    childPipes = []
+    parentPipes = []
+
+    for pr in range (num_processes):
+      parentPipe, childPipe = Pipe()
+      parentPipes.append(parentPipe)
+      childPipes.append(childPipe)
+
+    for rank in range(num_processes):
+      p = mp.Process(target=ExploreWorker, args=(rank,childPipes[rank], hp.env_name, args))
+      p.start()
+      processes.append(p)
+
+  work_dir = mkdir('exp', 'brs')
+  monitor_dir = mkdir(work_dir, 'monitor')
+  env = gym.make(hp.env_name)
+  if args.render:
+    env.render(mode = "human")
+  if args.movie:
+    env = wrappers.Monitor(env, monitor_dir, force = True)
+  nb_inputs = env.observation_space.shape[0]
+  nb_outputs = env.action_space.shape[0]
+  policy = Policy(nb_inputs, nb_outputs,hp.env_name, args)
+  normalizer = Normalizer(nb_inputs)
+
+  print("start training")
+  train(env, policy, normalizer, hp, parentPipes, args)
+
+  if args.mp:
+    for parentPipe in parentPipes:
+      parentPipe.send([_CLOSE,"pay2"])
+
+    for p in processes:
+      p.join()
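
Usage sketch (not part of the patch): the flag names below are taken from the argparse block added
above; the .npy filename is a hypothetical example of the policy_<env>_<timestamp>.npy files that
Policy.update() writes to --logdir. Assumes the script is run from examples/pybullet/gym/pybullet_envs/ARS/
with pybullet_envs installed; exact paths may differ.

  # Train HalfCheetahBulletEnv-v0 with multiprocess rollouts (16 worker processes, one per direction).
  python ars.py --env=HalfCheetahBulletEnv-v0 --steps=10000 --mp=1 --logdir=.

  # Resume from a previously saved policy file and visualize it in the OpenGL window.
  python ars.py --env=HalfCheetahBulletEnv-v0 --render=1 --policy=policy_HalfCheetahBulletEnv-v0_20181124-120000.npy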