Improve the ARS implementation: add multiprocessing Gym environment stepping, and add command-line parameters to resume a policy (--policy), plus --render, --movie, --steps and --env.

erwincoumans
2018-11-24 11:54:36 -08:00
parent ca36a82c62
commit bb6f4fb17c
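The new command-line interface makes it possible to pick the environment, resume from a saved policy, render, record a movie and run the Gym rollouts in parallel worker processes. A few illustrative invocations (a sketch only: the training script is assumed to be the ARS example, here called ars.py, and the .npy filename is a hypothetical example of the policy_<env>_<timestamp>.npy pattern written by Policy.update):

  # train HalfCheetahBulletEnv-v0 for 10000 ARS steps, one worker process per sampled direction
  python ars.py --env=HalfCheetahBulletEnv-v0 --steps=10000 --mp=1

  # resume from a previously saved policy and watch it in the OpenGL visualizer, single process
  python ars.py --env=HalfCheetahBulletEnv-v0 --policy=policy_HalfCheetahBulletEnv-v0_20181124-120000.npy --render=1 --mp=0

  # record an rgb_array movie with the gym Monitor wrapper
  python ars.py --env=HalfCheetahBulletEnv-v0 --movie=1 --mp=0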


@@ -6,13 +6,17 @@ import numpy as np
 import gym
 from gym import wrappers
 import pybullet_envs
+import time
+import multiprocessing as mp
+from multiprocessing import Process, Pipe
+import argparse
 
 # Setting the Hyper Parameters
 class Hp():
 
   def __init__(self):
-    self.nb_steps = 1000
+    self.nb_steps = 10000
     self.episode_length = 1000
     self.learning_rate = 0.02
     self.nb_directions = 16
@@ -22,6 +26,58 @@ class Hp():
     self.seed = 1
     self.env_name = 'HalfCheetahBulletEnv-v0'
 
+# Multiprocess Exploring the policy on one specific direction and over one episode
+_RESET = 1
+_CLOSE = 2
+_EXPLORE = 3
+
+def ExploreWorker(rank, childPipe, envname, args):
+  env = gym.make(envname)
+  nb_inputs = env.observation_space.shape[0]
+  normalizer = Normalizer(nb_inputs)
+  observation_n = env.reset()
+  n = 0
+  while True:
+    n += 1
+    try:
+      # Only block for short times to have keyboard exceptions be raised.
+      if not childPipe.poll(0.001):
+        continue
+      message, payload = childPipe.recv()
+    except (EOFError, KeyboardInterrupt):
+      break
+    if message == _RESET:
+      observation_n = env.reset()
+      childPipe.send(["reset ok"])
+      continue
+    if message == _EXPLORE:
+      #normalizer = payload[0] #use our local normalizer
+      policy = payload[1]
+      hp = payload[2]
+      direction = payload[3]
+      delta = payload[4]
+      state = env.reset()
+      done = False
+      num_plays = 0.
+      sum_rewards = 0
+      while not done and num_plays < hp.episode_length:
+        normalizer.observe(state)
+        state = normalizer.normalize(state)
+        action = policy.evaluate(state, delta, direction, hp)
+        state, reward, done, _ = env.step(action)
+        reward = max(min(reward, 1), -1)
+        sum_rewards += reward
+        num_plays += 1
+      childPipe.send([sum_rewards])
+      continue
+    if message == _CLOSE:
+      childPipe.send(["close ok"])
+      break
+  childPipe.close()
+
 # Normalizing the states
 class Normalizer():
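For orientation, a minimal sketch of how the parent side drives one such worker over its Pipe (illustration only, not part of the commit; it assumes normalizer, policy, hp, delta and args already exist in the parent):

  parentPipe, childPipe = Pipe()
  p = mp.Process(target=ExploreWorker, args=(0, childPipe, hp.env_name, args))
  p.start()
  # request one rollout in the positive direction and wait for the episode reward
  parentPipe.send([_EXPLORE, [normalizer, policy, hp, "positive", delta]])
  episode_reward = parentPipe.recv()[0]
  # tell the worker to shut down, then reap the process
  parentPipe.send([_CLOSE, None])
  p.join()

Note that each worker keeps its own local Normalizer; the normalizer passed in the payload is deliberately ignored (see the commented-out payload[0] line above).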
@@ -47,11 +103,14 @@ class Normalizer():
 # Building the AI
 class Policy():
 
-  def __init__(self, input_size, output_size):
-    self.theta = np.zeros((output_size, input_size))
-    print("self.theta=", self.theta)
+  def __init__(self, input_size, output_size, env_name, args):
+    try:
+      self.theta = np.load(args.policy)
+    except:
+      self.theta = np.zeros((output_size, input_size))
+    self.env_name = env_name
+    print("Starting policy theta=", self.theta)
 
-  def evaluate(self, input, delta = None, direction = None):
+  def evaluate(self, input, delta, direction, hp):
     if direction is None:
       return np.clip(self.theta.dot(input), -1.0, 1.0)
     elif direction == "positive":
@@ -62,15 +121,18 @@ class Policy():
   def sample_deltas(self):
     return [np.random.randn(*self.theta.shape) for _ in range(hp.nb_directions)]
 
-  def update(self, rollouts, sigma_r):
+  def update(self, rollouts, sigma_r, args):
     step = np.zeros(self.theta.shape)
     for r_pos, r_neg, d in rollouts:
       step += (r_pos - r_neg) * d
     self.theta += hp.learning_rate / (hp.nb_best_directions * sigma_r) * step
+    timestr = time.strftime("%Y%m%d-%H%M%S")
+    np.save(args.logdir + "/policy_" + self.env_name + "_" + timestr + ".npy", self.theta)
 
 # Exploring the policy on one specific direction and over one episode
-def explore(env, normalizer, policy, direction = None, delta = None):
+def explore(env, normalizer, policy, direction, delta, hp):
   state = env.reset()
   done = False
   num_plays = 0.
@@ -78,7 +140,7 @@ def explore(env, normalizer, policy, direction = None, delta = None):
   while not done and num_plays < hp.episode_length:
     normalizer.observe(state)
     state = normalizer.normalize(state)
-    action = policy.evaluate(state, delta, direction)
+    action = policy.evaluate(state, delta, direction, hp)
     state, reward, done, _ = env.step(action)
     reward = max(min(reward, 1), -1)
     sum_rewards += reward
@@ -87,7 +149,7 @@ def explore(env, normalizer, policy, direction = None, delta = None):
 # Training the AI
-def train(env, policy, normalizer, hp):
+def train(env, policy, normalizer, hp, parentPipes, args):
   for step in range(hp.nb_steps):
@@ -96,13 +158,29 @@ def train(env, policy, normalizer, hp):
     positive_rewards = [0] * hp.nb_directions
     negative_rewards = [0] * hp.nb_directions
 
-    # Getting the positive rewards in the positive directions
-    for k in range(hp.nb_directions):
-      positive_rewards[k] = explore(env, normalizer, policy, direction = "positive", delta = deltas[k])
-
-    # Getting the negative rewards in the negative/opposite directions
-    for k in range(hp.nb_directions):
-      negative_rewards[k] = explore(env, normalizer, policy, direction = "negative", delta = deltas[k])
+    if parentPipes:
+      for k in range(hp.nb_directions):
+        parentPipe = parentPipes[k]
+        parentPipe.send([_EXPLORE, [normalizer, policy, hp, "positive", deltas[k]]])
+      for k in range(hp.nb_directions):
+        positive_rewards[k] = parentPipes[k].recv()[0]
+
+      for k in range(hp.nb_directions):
+        parentPipe = parentPipes[k]
+        parentPipe.send([_EXPLORE, [normalizer, policy, hp, "negative", deltas[k]]])
+      for k in range(hp.nb_directions):
+        negative_rewards[k] = parentPipes[k].recv()[0]
+    else:
+      # Getting the positive rewards in the positive directions
+      for k in range(hp.nb_directions):
+        positive_rewards[k] = explore(env, normalizer, policy, "positive", deltas[k], hp)
+
+      # Getting the negative rewards in the negative/opposite directions
+      for k in range(hp.nb_directions):
+        negative_rewards[k] = explore(env, normalizer, policy, "negative", deltas[k], hp)
 
     # Gathering all the positive/negative rewards to compute the standard deviation of these rewards
     all_rewards = np.array(positive_rewards + negative_rewards)
@@ -114,10 +192,10 @@ def train(env, policy, normalizer, hp):
     rollouts = [(positive_rewards[k], negative_rewards[k], deltas[k]) for k in order]
 
     # Updating our policy
-    policy.update(rollouts, sigma_r)
+    policy.update(rollouts, sigma_r, args)
 
     # Printing the final reward of the policy after the update
-    reward_evaluation = explore(env, normalizer, policy)
+    reward_evaluation = explore(env, normalizer, policy, None, None, hp)
     print('Step:', step, 'Reward:', reward_evaluation)
 
 # Running the main code
@@ -127,16 +205,67 @@ def mkdir(base, name):
   if not os.path.exists(path):
     os.makedirs(path)
   return path
 
-work_dir = mkdir('exp', 'brs')
-monitor_dir = mkdir(work_dir, 'monitor')
-
-hp = Hp()
-np.random.seed(hp.seed)
-
-env = gym.make(hp.env_name)
-# env.render(mode = "human")
-#env = wrappers.Monitor(env, monitor_dir, force = True)
-nb_inputs = env.observation_space.shape[0]
-nb_outputs = env.action_space.shape[0]
-policy = Policy(nb_inputs, nb_outputs)
-normalizer = Normalizer(nb_inputs)
-train(env, policy, normalizer, hp)
+if __name__ == "__main__":
+  mp.freeze_support()
+
+  parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+  parser.add_argument('--env', help='Gym environment name', type=str, default='HalfCheetahBulletEnv-v0')
+  parser.add_argument('--seed', help='RNG seed', type=int, default=1)
+  parser.add_argument('--render', help='OpenGL Visualizer', type=int, default=0)
+  parser.add_argument('--movie', help='rgb_array gym movie', type=int, default=0)
+  parser.add_argument('--steps', help='Number of steps', type=int, default=10000)
+  parser.add_argument('--policy', help='Starting policy file (npy)', type=str, default='')
+  parser.add_argument('--logdir', help='Directory root to log policy files (npy)', type=str, default='.')
+  parser.add_argument('--mp', help='Enable multiprocessing', type=int, default=1)
+  args = parser.parse_args()
+
+  hp = Hp()
+  hp.env_name = args.env
+  hp.seed = args.seed
+  hp.nb_steps = args.steps
+  print("seed = ", hp.seed)
+  np.random.seed(hp.seed)
+
+  parentPipes = None
+  if args.mp:
+    num_processes = hp.nb_directions
+    processes = []
+    childPipes = []
+    parentPipes = []
+
+    for pr in range(num_processes):
+      parentPipe, childPipe = Pipe()
+      parentPipes.append(parentPipe)
+      childPipes.append(childPipe)
+
+    for rank in range(num_processes):
+      p = mp.Process(target=ExploreWorker, args=(rank, childPipes[rank], hp.env_name, args))
+      p.start()
+      processes.append(p)
+
+  work_dir = mkdir('exp', 'brs')
+  monitor_dir = mkdir(work_dir, 'monitor')
+
+  env = gym.make(hp.env_name)
+  if args.render:
+    env.render(mode = "human")
+  if args.movie:
+    env = wrappers.Monitor(env, monitor_dir, force = True)
+  nb_inputs = env.observation_space.shape[0]
+  nb_outputs = env.action_space.shape[0]
+  policy = Policy(nb_inputs, nb_outputs, hp.env_name, args)
+  normalizer = Normalizer(nb_inputs)
+
+  print("start training")
+  train(env, policy, normalizer, hp, parentPipes, args)
+
+  if args.mp:
+    for parentPipe in parentPipes:
+      parentPipe.send([_CLOSE, "pay2"])
+    for p in processes:
+      p.join()
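Since Policy.update() now saves theta to <logdir>/policy_<env>_<timestamp>.npy after every update, a saved policy can also be replayed outside the training script. A minimal sketch, assuming such a file exists (the filename below is hypothetical); note that only theta is stored, not the running Normalizer statistics, so this raw rollout feeds unnormalized observations to the linear policy:

  import numpy as np
  import gym
  import pybullet_envs

  theta = np.load("policy_HalfCheetahBulletEnv-v0_20181124-120000.npy")
  env = gym.make("HalfCheetahBulletEnv-v0")
  state = env.reset()
  done = False
  total_reward = 0.0
  while not done:
    # linear deterministic policy, clipped to the action range as in Policy.evaluate
    action = np.clip(theta.dot(state), -1.0, 1.0)
    state, reward, done, _ = env.step(action)
    total_reward += reward
  print("episode reward:", total_reward)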