From 7b030426c150ef8a8d488190078113518a5c766f Mon Sep 17 00:00:00 2001 From: Erwin Coumans Date: Thu, 16 Nov 2017 16:47:14 +0000 Subject: [PATCH] add a temp copy of TF agents (until the API stops changing or configs.py are included) --- .../gym/pybullet_envs/agents/__init__.py | 21 + .../gym/pybullet_envs/agents/configs.py | 27 +- .../gym/pybullet_envs/agents/configs2.py | 130 ++++ .../gym/pybullet_envs/agents/networks.py | 129 ++++ .../gym/pybullet_envs/agents/ppo/__init__.py | 21 + .../gym/pybullet_envs/agents/ppo/algorithm.py | 515 ++++++++++++++++ .../gym/pybullet_envs/agents/ppo/memory.py | 152 +++++ .../gym/pybullet_envs/agents/ppo/normalize.py | 168 ++++++ .../gym/pybullet_envs/agents/ppo/utility.py | 213 +++++++ .../pybullet_envs/agents/tools/__init__.py | 31 + .../pybullet_envs/agents/tools/attr_dict.py | 54 ++ .../pybullet_envs/agents/tools/batch_env.py | 124 ++++ .../agents/tools/count_weights.py | 48 ++ .../agents/tools/in_graph_batch_env.py | 178 ++++++ .../agents/tools/in_graph_env.py | 162 +++++ .../gym/pybullet_envs/agents/tools/loop.py | 233 ++++++++ .../agents/tools/mock_algorithm.py | 49 ++ .../agents/tools/mock_environment.py | 86 +++ .../pybullet_envs/agents/tools/simulate.py | 147 +++++ .../agents/tools/streaming_mean.py | 67 +++ .../pybullet_envs/agents/tools/wrappers.py | 558 ++++++++++++++++++ .../gym/pybullet_envs/agents/train_ppo.py | 10 +- .../gym/pybullet_envs/agents/utility.py | 190 ++++++ .../gym/pybullet_envs/agents/visualize_ppo.py | 8 +- 24 files changed, 3294 insertions(+), 27 deletions(-) create mode 100644 examples/pybullet/gym/pybullet_envs/agents/configs2.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/networks.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/ppo/__init__.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/ppo/algorithm.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/ppo/memory.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/ppo/normalize.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/ppo/utility.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/__init__.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/attr_dict.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/batch_env.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/count_weights.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/in_graph_batch_env.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/in_graph_env.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/loop.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/mock_algorithm.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/mock_environment.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/simulate.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/streaming_mean.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/tools/wrappers.py create mode 100644 examples/pybullet/gym/pybullet_envs/agents/utility.py diff --git a/examples/pybullet/gym/pybullet_envs/agents/__init__.py b/examples/pybullet/gym/pybullet_envs/agents/__init__.py index 139597f9c..9e2c9f092 100644 --- a/examples/pybullet/gym/pybullet_envs/agents/__init__.py +++ b/examples/pybullet/gym/pybullet_envs/agents/__init__.py @@ -1,2 +1,23 @@ +# Copyright 2017 The TensorFlow Agents Authors. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Executable scripts for reinforcement learning.""" +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from . import train_ppo +from . import utility +from . import visualize diff --git a/examples/pybullet/gym/pybullet_envs/agents/configs.py b/examples/pybullet/gym/pybullet_envs/agents/configs.py index edf4cd23d..d613c29cb 100644 --- a/examples/pybullet/gym/pybullet_envs/agents/configs.py +++ b/examples/pybullet/gym/pybullet_envs/agents/configs.py @@ -20,21 +20,21 @@ from __future__ import print_function import functools -from agents import ppo -from agents.scripts import networks +from . import ppo +from . import networks from pybullet_envs.bullet import minitaur_gym_env from pybullet_envs.bullet import minitaur_duck_gym_env from pybullet_envs.bullet import minitaur_env_randomizer import pybullet_envs.bullet.minitaur_gym_env as minitaur_gym_env import pybullet_envs - +import tensorflow as tf def default(): """Default configuration for PPO.""" # General algorithm = ppo.PPOAlgorithm - num_agents = 10 - eval_episodes = 20 + num_agents = 30 + eval_episodes = 30 use_gpu = False # Network network = networks.feed_forward_gaussian @@ -44,18 +44,17 @@ def default(): value=r'.*/value/.*') policy_layers = 200, 100 value_layers = 200, 100 - init_mean_factor = 0.05 + init_mean_factor = 0.1 init_logstd = -1 # Optimization - update_every = 20 - policy_optimizer = 'AdamOptimizer' - value_optimizer = 'AdamOptimizer' - update_epochs_policy = 50 - update_epochs_value = 50 - policy_lr = 1e-4 - value_lr = 3e-4 + update_every = 30 + update_epochs = 25 + optimizer = tf.train.AdamOptimizer + update_epochs_policy = 64 + update_epochs_value = 64 + learning_rate = 1e-4 # Losses - discount = 0.985 + discount = 0.995 kl_target = 1e-2 kl_cutoff_factor = 2 kl_cutoff_coef = 1000 diff --git a/examples/pybullet/gym/pybullet_envs/agents/configs2.py b/examples/pybullet/gym/pybullet_envs/agents/configs2.py new file mode 100644 index 000000000..27bd70d1f --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/configs2.py @@ -0,0 +1,130 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Example configurations using the PPO algorithm.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +# pylint: disable=unused-variable + +import tensorflow as tf + +from . import ppo +from . 
import networks + + +def default(): + """Default configuration for PPO.""" + # General + algorithm = ppo.PPOAlgorithm + num_agents = 30 + eval_episodes = 30 + use_gpu = False + # Network + network = networks.feed_forward_gaussian + weight_summaries = dict( + all=r'.*', policy=r'.*/policy/.*', value=r'.*/value/.*') + policy_layers = 200, 100 + value_layers = 200, 100 + init_mean_factor = 0.1 + init_logstd = -1 + # Optimization + update_every = 30 + update_epochs = 25 + optimizer = tf.train.AdamOptimizer + learning_rate = 1e-4 + # Losses + discount = 0.995 + kl_target = 1e-2 + kl_cutoff_factor = 2 + kl_cutoff_coef = 1000 + kl_init_penalty = 1 + return locals() + + +def pendulum(): + """Configuration for the pendulum classic control task.""" + locals().update(default()) + # Environment + env = 'Pendulum-v0' + max_length = 200 + steps = 2e6 # 2M + return locals() + + +def reacher(): + """Configuration for MuJoCo's reacher task.""" + locals().update(default()) + # Environment + env = 'Reacher-v1' + max_length = 1000 + steps = 5e6 # 5M + discount = 0.985 + update_every = 60 + return locals() + + +def cheetah(): + """Configuration for MuJoCo's half cheetah task.""" + locals().update(default()) + # Environment + env = 'HalfCheetah-v1' + max_length = 1000 + steps = 1e7 # 10M + discount = 0.99 + return locals() + + +def walker(): + """Configuration for MuJoCo's walker task.""" + locals().update(default()) + # Environment + env = 'Walker2d-v1' + max_length = 1000 + steps = 1e7 # 10M + return locals() + + +def hopper(): + """Configuration for MuJoCo's hopper task.""" + locals().update(default()) + # Environment + env = 'Hopper-v1' + max_length = 1000 + steps = 1e7 # 10M + update_every = 60 + return locals() + + +def ant(): + """Configuration for MuJoCo's ant task.""" + locals().update(default()) + # Environment + env = 'Ant-v1' + max_length = 1000 + steps = 2e7 # 20M + return locals() + + +def humanoid(): + """Configuration for MuJoCo's humanoid task.""" + locals().update(default()) + # Environment + env = 'Humanoid-v1' + max_length = 1000 + steps = 5e7 # 50M + update_every = 60 + return locals() diff --git a/examples/pybullet/gym/pybullet_envs/agents/networks.py b/examples/pybullet/gym/pybullet_envs/agents/networks.py new file mode 100644 index 000000000..3d5de1fbb --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/networks.py @@ -0,0 +1,129 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Network definitions for the PPO algorithm.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import collections +import functools +import operator + +import tensorflow as tf + + +NetworkOutput = collections.namedtuple( + 'NetworkOutput', 'policy, mean, logstd, value, state') + + +def feed_forward_gaussian( + config, action_size, observations, unused_length, state=None): + """Independent feed forward networks for policy and value. 
+ + The policy network outputs the mean action and the log standard deviation + is learned as independent parameter vector. + + Args: + config: Configuration object. + action_size: Length of the action vector. + observations: Sequences of observations. + unused_length: Batch of sequence lengths. + state: Batch of initial recurrent states. + + Returns: + NetworkOutput tuple. + """ + mean_weights_initializer = tf.contrib.layers.variance_scaling_initializer( + factor=config.init_mean_factor) + logstd_initializer = tf.random_normal_initializer(config.init_logstd, 1e-10) + flat_observations = tf.reshape(observations, [ + tf.shape(observations)[0], tf.shape(observations)[1], + functools.reduce(operator.mul, observations.shape.as_list()[2:], 1)]) + with tf.variable_scope('policy'): + x = flat_observations + for size in config.policy_layers: + x = tf.contrib.layers.fully_connected(x, size, tf.nn.relu) + mean = tf.contrib.layers.fully_connected( + x, action_size, tf.tanh, + weights_initializer=mean_weights_initializer) + logstd = tf.get_variable( + 'logstd', mean.shape[2:], tf.float32, logstd_initializer) + logstd = tf.tile( + logstd[None, None], + [tf.shape(mean)[0], tf.shape(mean)[1]] + [1] * (mean.shape.ndims - 2)) + with tf.variable_scope('value'): + x = flat_observations + for size in config.value_layers: + x = tf.contrib.layers.fully_connected(x, size, tf.nn.relu) + value = tf.contrib.layers.fully_connected(x, 1, None)[..., 0] + mean = tf.check_numerics(mean, 'mean') + logstd = tf.check_numerics(logstd, 'logstd') + value = tf.check_numerics(value, 'value') + policy = tf.contrib.distributions.MultivariateNormalDiag( + mean, tf.exp(logstd)) + return NetworkOutput(policy, mean, logstd, value, state) + + +def recurrent_gaussian( + config, action_size, observations, length, state=None): + """Independent recurrent policy and feed forward value networks. + + The policy network outputs the mean action and the log standard deviation + is learned as independent parameter vector. The last policy layer is + recurrent and uses a GRU cell. + + Args: + config: Configuration object. + action_size: Length of the action vector. + observations: Sequences of observations. + length: Batch of sequence lengths. + state: Batch of initial recurrent states. + + Returns: + NetworkOutput tuple. 
+ """ + mean_weights_initializer = tf.contrib.layers.variance_scaling_initializer( + factor=config.init_mean_factor) + logstd_initializer = tf.random_normal_initializer(config.init_logstd, 1e-10) + cell = tf.contrib.rnn.GRUBlockCell(config.policy_layers[-1]) + flat_observations = tf.reshape(observations, [ + tf.shape(observations)[0], tf.shape(observations)[1], + functools.reduce(operator.mul, observations.shape.as_list()[2:], 1)]) + with tf.variable_scope('policy'): + x = flat_observations + for size in config.policy_layers[:-1]: + x = tf.contrib.layers.fully_connected(x, size, tf.nn.relu) + x, state = tf.nn.dynamic_rnn(cell, x, length, state, tf.float32) + mean = tf.contrib.layers.fully_connected( + x, action_size, tf.tanh, + weights_initializer=mean_weights_initializer) + logstd = tf.get_variable( + 'logstd', mean.shape[2:], tf.float32, logstd_initializer) + logstd = tf.tile( + logstd[None, None], + [tf.shape(mean)[0], tf.shape(mean)[1]] + [1] * (mean.shape.ndims - 2)) + with tf.variable_scope('value'): + x = flat_observations + for size in config.value_layers: + x = tf.contrib.layers.fully_connected(x, size, tf.nn.relu) + value = tf.contrib.layers.fully_connected(x, 1, None)[..., 0] + mean = tf.check_numerics(mean, 'mean') + logstd = tf.check_numerics(logstd, 'logstd') + value = tf.check_numerics(value, 'value') + policy = tf.contrib.distributions.MultivariateNormalDiag( + mean, tf.exp(logstd)) + # assert state.shape.as_list()[0] is not None + return NetworkOutput(policy, mean, logstd, value, state) diff --git a/examples/pybullet/gym/pybullet_envs/agents/ppo/__init__.py b/examples/pybullet/gym/pybullet_envs/agents/ppo/__init__.py new file mode 100644 index 000000000..26a87baf9 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/ppo/__init__.py @@ -0,0 +1,21 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Proximal Policy Optimization algorithm.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from .algorithm import PPOAlgorithm diff --git a/examples/pybullet/gym/pybullet_envs/agents/ppo/algorithm.py b/examples/pybullet/gym/pybullet_envs/agents/ppo/algorithm.py new file mode 100644 index 000000000..9e97425f9 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/ppo/algorithm.py @@ -0,0 +1,515 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Proximal Policy Optimization algorithm. 
+ +Based on John Schulman's implementation in Python and Theano: +https://github.com/joschu/modular_rl/blob/master/modular_rl/ppo.py +""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import functools + +import tensorflow as tf + +from . import memory +from . import normalize +from . import utility + + +class PPOAlgorithm(object): + """A vectorized implementation of the PPO algorithm by John Schulman.""" + + def __init__(self, batch_env, step, is_training, should_log, config): + """Create an instance of the PPO algorithm. + + Args: + batch_env: In-graph batch environment. + step: Integer tensor holding the current training step. + is_training: Boolean tensor for whether the algorithm should train. + should_log: Boolean tensor for whether summaries should be returned. + config: Object containing the agent configuration as attributes. + """ + self._batch_env = batch_env + self._step = step + self._is_training = is_training + self._should_log = should_log + self._config = config + self._observ_filter = normalize.StreamingNormalize( + self._batch_env.observ[0], center=True, scale=True, clip=5, + name='normalize_observ') + self._reward_filter = normalize.StreamingNormalize( + self._batch_env.reward[0], center=False, scale=True, clip=10, + name='normalize_reward') + # Memory stores tuple of observ, action, mean, logstd, reward. + template = ( + self._batch_env.observ[0], self._batch_env.action[0], + self._batch_env.action[0], self._batch_env.action[0], + self._batch_env.reward[0]) + self._memory = memory.EpisodeMemory( + template, config.update_every, config.max_length, 'memory') + self._memory_index = tf.Variable(0, False) + use_gpu = self._config.use_gpu and utility.available_gpus() + with tf.device('/gpu:0' if use_gpu else '/cpu:0'): + # Create network variables for later calls to reuse. + action_size = self._batch_env.action.shape[1].value + self._network = tf.make_template( + 'network', functools.partial(config.network, config, action_size)) + output = self._network( + tf.zeros_like(self._batch_env.observ)[:, None], + tf.ones(len(self._batch_env))) + with tf.variable_scope('ppo_temporary'): + self._episodes = memory.EpisodeMemory( + template, len(batch_env), config.max_length, 'episodes') + if output.state is None: + self._last_state = None + else: + # Ensure the batch dimension is set. + tf.contrib.framework.nest.map_structure( + lambda x: x.set_shape([len(batch_env)] + x.shape.as_list()[1:]), + output.state) + # pylint: disable=undefined-variable + self._last_state = tf.contrib.framework.nest.map_structure( + lambda x: tf.Variable(lambda: tf.zeros_like(x), False), + output.state) + self._last_action = tf.Variable( + tf.zeros_like(self._batch_env.action), False, name='last_action') + self._last_mean = tf.Variable( + tf.zeros_like(self._batch_env.action), False, name='last_mean') + self._last_logstd = tf.Variable( + tf.zeros_like(self._batch_env.action), False, name='last_logstd') + self._penalty = tf.Variable( + self._config.kl_init_penalty, False, dtype=tf.float32) + self._optimizer = self._config.optimizer(self._config.learning_rate) + + def begin_episode(self, agent_indices): + """Reset the recurrent states and stored episode. + + Args: + agent_indices: Tensor containing current batch indices. + + Returns: + Summary tensor. 
+ """ + with tf.name_scope('begin_episode/'): + if self._last_state is None: + reset_state = tf.no_op() + else: + reset_state = utility.reinit_nested_vars( + self._last_state, agent_indices) + reset_buffer = self._episodes.clear(agent_indices) + with tf.control_dependencies([reset_state, reset_buffer]): + return tf.constant('') + + def perform(self, agent_indices, observ): + """Compute batch of actions and a summary for a batch of observation. + + Args: + agent_indices: Tensor containing current batch indices. + observ: Tensor of a batch of observations for all agents. + + Returns: + Tuple of action batch tensor and summary tensor. + """ + with tf.name_scope('perform/'): + observ = self._observ_filter.transform(observ) + if self._last_state is None: + state = None + else: + state = tf.contrib.framework.nest.map_structure( + lambda x: tf.gather(x, agent_indices), self._last_state) + output = self._network(observ[:, None], tf.ones(observ.shape[0]), state) + action = tf.cond( + self._is_training, output.policy.sample, lambda: output.mean) + logprob = output.policy.log_prob(action)[:, 0] + # pylint: disable=g-long-lambda + summary = tf.cond(self._should_log, lambda: tf.summary.merge([ + tf.summary.histogram('mean', output.mean[:, 0]), + tf.summary.histogram('std', tf.exp(output.logstd[:, 0])), + tf.summary.histogram('action', action[:, 0]), + tf.summary.histogram('logprob', logprob)]), str) + # Remember current policy to append to memory in the experience callback. + if self._last_state is None: + assign_state = tf.no_op() + else: + assign_state = utility.assign_nested_vars( + self._last_state, output.state, agent_indices) + with tf.control_dependencies([ + assign_state, + tf.scatter_update( + self._last_action, agent_indices, action[:, 0]), + tf.scatter_update( + self._last_mean, agent_indices, output.mean[:, 0]), + tf.scatter_update( + self._last_logstd, agent_indices, output.logstd[:, 0])]): + return tf.check_numerics(action[:, 0], 'action'), tf.identity(summary) + + def experience( + self, agent_indices, observ, action, reward, unused_done, unused_nextob): + """Process the transition tuple of the current step. + + When training, add the current transition tuple to the memory and update + the streaming statistics for observations and rewards. A summary string is + returned if requested at this step. + + Args: + agent_indices: Tensor containing current batch indices. + observ: Batch tensor of observations. + action: Batch tensor of actions. + reward: Batch tensor of rewards. + unused_done: Batch tensor of done flags. + unused_nextob: Batch tensor of successor observations. + + Returns: + Summary tensor. + """ + with tf.name_scope('experience/'): + return tf.cond( + self._is_training, + # pylint: disable=g-long-lambda + lambda: self._define_experience( + agent_indices, observ, action, reward), str) + + def _define_experience(self, agent_indices, observ, action, reward): + """Implement the branch of experience() entered during training.""" + update_filters = tf.summary.merge([ + self._observ_filter.update(observ), + self._reward_filter.update(reward)]) + with tf.control_dependencies([update_filters]): + if self._config.train_on_agent_action: + # NOTE: Doesn't seem to change much. 
+ action = self._last_action + batch = ( + observ, action, tf.gather(self._last_mean, agent_indices), + tf.gather(self._last_logstd, agent_indices), reward) + append = self._episodes.append(batch, agent_indices) + with tf.control_dependencies([append]): + norm_observ = self._observ_filter.transform(observ) + norm_reward = tf.reduce_mean(self._reward_filter.transform(reward)) + # pylint: disable=g-long-lambda + summary = tf.cond(self._should_log, lambda: tf.summary.merge([ + update_filters, + self._observ_filter.summary(), + self._reward_filter.summary(), + tf.summary.scalar('memory_size', self._memory_index), + tf.summary.histogram('normalized_observ', norm_observ), + tf.summary.histogram('action', self._last_action), + tf.summary.scalar('normalized_reward', norm_reward)]), str) + return summary + + def end_episode(self, agent_indices): + """Add episodes to the memory and perform update steps if memory is full. + + During training, add the collected episodes of the batch indices that + finished their episode to the memory. If the memory is full, train on it, + and then clear the memory. A summary string is returned if requested at + this step. + + Args: + agent_indices: Tensor containing current batch indices. + + Returns: + Summary tensor. + """ + with tf.name_scope('end_episode/'): + return tf.cond( + self._is_training, + lambda: self._define_end_episode(agent_indices), str) + + def _define_end_episode(self, agent_indices): + """Implement the branch of end_episode() entered during training.""" + episodes, length = self._episodes.data(agent_indices) + space_left = self._config.update_every - self._memory_index + use_episodes = tf.range(tf.minimum( + tf.shape(agent_indices)[0], space_left)) + episodes = [tf.gather(elem, use_episodes) for elem in episodes] + append = self._memory.replace( + episodes, tf.gather(length, use_episodes), + use_episodes + self._memory_index) + with tf.control_dependencies([append]): + inc_index = self._memory_index.assign_add(tf.shape(use_episodes)[0]) + with tf.control_dependencies([inc_index]): + memory_full = self._memory_index >= self._config.update_every + return tf.cond(memory_full, self._training, str) + + def _training(self): + """Perform multiple training iterations of both policy and value baseline. + + Training on the episodes collected in the memory. Reset the memory + afterwards. Always returns a summary string. + + Returns: + Summary tensor. 
+ """ + with tf.name_scope('training'): + assert_full = tf.assert_equal( + self._memory_index, self._config.update_every) + with tf.control_dependencies([assert_full]): + data = self._memory.data() + (observ, action, old_mean, old_logstd, reward), length = data + with tf.control_dependencies([tf.assert_greater(length, 0)]): + length = tf.identity(length) + observ = self._observ_filter.transform(observ) + reward = self._reward_filter.transform(reward) + update_summary = self._perform_update_steps( + observ, action, old_mean, old_logstd, reward, length) + with tf.control_dependencies([update_summary]): + penalty_summary = self._adjust_penalty( + observ, old_mean, old_logstd, length) + with tf.control_dependencies([penalty_summary]): + clear_memory = tf.group( + self._memory.clear(), self._memory_index.assign(0)) + with tf.control_dependencies([clear_memory]): + weight_summary = utility.variable_summaries( + tf.trainable_variables(), self._config.weight_summaries) + return tf.summary.merge([ + update_summary, penalty_summary, weight_summary]) + + def _perform_update_steps( + self, observ, action, old_mean, old_logstd, reward, length): + """Perform multiple update steps of value function and policy. + + The advantage is computed once at the beginning and shared across + iterations. We need to decide for the summary of one iteration, and thus + choose the one after half of the iterations. + + Args: + observ: Sequences of observations. + action: Sequences of actions. + old_mean: Sequences of action means of the behavioral policy. + old_logstd: Sequences of action log stddevs of the behavioral policy. + reward: Sequences of rewards. + length: Batch of sequence lengths. + + Returns: + Summary tensor. + """ + return_ = utility.discounted_return( + reward, length, self._config.discount) + value = self._network(observ, length).value + if self._config.gae_lambda: + advantage = utility.lambda_return( + reward, value, length, self._config.discount, + self._config.gae_lambda) + else: + advantage = return_ - value + mean, variance = tf.nn.moments(advantage, axes=[0, 1], keep_dims=True) + advantage = (advantage - mean) / (tf.sqrt(variance) + 1e-8) + advantage = tf.Print( + advantage, [tf.reduce_mean(return_), tf.reduce_mean(value)], + 'return and value: ') + advantage = tf.Print( + advantage, [tf.reduce_mean(advantage)], + 'normalized advantage: ') + # pylint: disable=g-long-lambda + value_loss, policy_loss, summary = tf.scan( + lambda _1, _2: self._update_step( + observ, action, old_mean, old_logstd, reward, advantage, length), + tf.range(self._config.update_epochs), + [0., 0., ''], parallel_iterations=1) + print_losses = tf.group( + tf.Print(0, [tf.reduce_mean(value_loss)], 'value loss: '), + tf.Print(0, [tf.reduce_mean(policy_loss)], 'policy loss: ')) + with tf.control_dependencies([value_loss, policy_loss, print_losses]): + return summary[self._config.update_epochs // 2] + + def _update_step( + self, observ, action, old_mean, old_logstd, reward, advantage, length): + """Compute the current combined loss and perform a gradient update step. + + Args: + observ: Sequences of observations. + action: Sequences of actions. + old_mean: Sequences of action means of the behavioral policy. + old_logstd: Sequences of action log stddevs of the behavioral policy. + reward: Sequences of reward. + advantage: Sequences of advantages. + length: Batch of sequence lengths. + + Returns: + Tuple of value loss, policy loss, and summary tensor. 
+ """ + value_loss, value_summary = self._value_loss(observ, reward, length) + network = self._network(observ, length) + policy_loss, policy_summary = self._policy_loss( + network.mean, network.logstd, old_mean, old_logstd, action, + advantage, length) + value_gradients, value_variables = ( + zip(*self._optimizer.compute_gradients(value_loss))) + policy_gradients, policy_variables = ( + zip(*self._optimizer.compute_gradients(policy_loss))) + all_gradients = value_gradients + policy_gradients + all_variables = value_variables + policy_variables + optimize = self._optimizer.apply_gradients( + zip(all_gradients, all_variables)) + summary = tf.summary.merge([ + value_summary, policy_summary, + tf.summary.scalar( + 'value_gradient_norm', tf.global_norm(value_gradients)), + tf.summary.scalar( + 'policy_gradient_norm', tf.global_norm(policy_gradients)), + utility.gradient_summaries( + zip(value_gradients, value_variables), dict(value=r'.*')), + utility.gradient_summaries( + zip(policy_gradients, policy_variables), dict(policy=r'.*'))]) + with tf.control_dependencies([optimize]): + return [tf.identity(x) for x in (value_loss, policy_loss, summary)] + + def _value_loss(self, observ, reward, length): + """Compute the loss function for the value baseline. + + The value loss is the difference between empirical and approximated returns + over the collected episodes. Returns the loss tensor and a summary strin. + + Args: + observ: Sequences of observations. + reward: Sequences of reward. + length: Batch of sequence lengths. + + Returns: + Tuple of loss tensor and summary tensor. + """ + with tf.name_scope('value_loss'): + value = self._network(observ, length).value + return_ = utility.discounted_return( + reward, length, self._config.discount) + advantage = return_ - value + value_loss = 0.5 * self._mask(advantage ** 2, length) + summary = tf.summary.merge([ + tf.summary.histogram('value_loss', value_loss), + tf.summary.scalar('avg_value_loss', tf.reduce_mean(value_loss))]) + value_loss = tf.reduce_mean(value_loss) + return tf.check_numerics(value_loss, 'value_loss'), summary + + def _policy_loss( + self, mean, logstd, old_mean, old_logstd, action, advantage, length): + """Compute the policy loss composed of multiple components. + + 1. The policy gradient loss is importance sampled from the data-collecting + policy at the beginning of training. + 2. The second term is a KL penalty between the policy at the beginning of + training and the current policy. + 3. Additionally, if this KL already changed more than twice the target + amount, we activate a strong penalty discouraging further divergence. + + Args: + mean: Sequences of action means of the current policy. + logstd: Sequences of action log stddevs of the current policy. + old_mean: Sequences of action means of the behavioral policy. + old_logstd: Sequences of action log stddevs of the behavioral policy. + action: Sequences of actions. + advantage: Sequences of advantages. + length: Batch of sequence lengths. + + Returns: + Tuple of loss tensor and summary tensor. 
+ """ + with tf.name_scope('policy_loss'): + entropy = utility.diag_normal_entropy(mean, logstd) + kl = tf.reduce_mean(self._mask(utility.diag_normal_kl( + old_mean, old_logstd, mean, logstd), length), 1) + policy_gradient = tf.exp( + utility.diag_normal_logpdf(mean, logstd, action) - + utility.diag_normal_logpdf(old_mean, old_logstd, action)) + surrogate_loss = -tf.reduce_mean(self._mask( + policy_gradient * tf.stop_gradient(advantage), length), 1) + kl_penalty = self._penalty * kl + cutoff_threshold = self._config.kl_target * self._config.kl_cutoff_factor + cutoff_count = tf.reduce_sum( + tf.cast(kl > cutoff_threshold, tf.int32)) + with tf.control_dependencies([tf.cond( + cutoff_count > 0, + lambda: tf.Print(0, [cutoff_count], 'kl cutoff! '), int)]): + kl_cutoff = ( + self._config.kl_cutoff_coef * + tf.cast(kl > cutoff_threshold, tf.float32) * + (kl - cutoff_threshold) ** 2) + policy_loss = surrogate_loss + kl_penalty + kl_cutoff + summary = tf.summary.merge([ + tf.summary.histogram('entropy', entropy), + tf.summary.histogram('kl', kl), + tf.summary.histogram('surrogate_loss', surrogate_loss), + tf.summary.histogram('kl_penalty', kl_penalty), + tf.summary.histogram('kl_cutoff', kl_cutoff), + tf.summary.histogram('kl_penalty_combined', kl_penalty + kl_cutoff), + tf.summary.histogram('policy_loss', policy_loss), + tf.summary.scalar('avg_surr_loss', tf.reduce_mean(surrogate_loss)), + tf.summary.scalar('avg_kl_penalty', tf.reduce_mean(kl_penalty)), + tf.summary.scalar('avg_policy_loss', tf.reduce_mean(policy_loss))]) + policy_loss = tf.reduce_mean(policy_loss, 0) + return tf.check_numerics(policy_loss, 'policy_loss'), summary + + def _adjust_penalty(self, observ, old_mean, old_logstd, length): + """Adjust the KL policy between the behavioral and current policy. + + Compute how much the policy actually changed during the multiple + update steps. Adjust the penalty strength for the next training phase if we + overshot or undershot the target divergence too much. + + Args: + observ: Sequences of observations. + old_mean: Sequences of action means of the behavioral policy. + old_logstd: Sequences of action log stddevs of the behavioral policy. + length: Batch of sequence lengths. + + Returns: + Summary tensor. + """ + with tf.name_scope('adjust_penalty'): + network = self._network(observ, length) + assert_change = tf.assert_equal( + tf.reduce_all(tf.equal(network.mean, old_mean)), False, + message='policy should change') + print_penalty = tf.Print(0, [self._penalty], 'current penalty: ') + with tf.control_dependencies([assert_change, print_penalty]): + kl_change = tf.reduce_mean(self._mask(utility.diag_normal_kl( + old_mean, old_logstd, network.mean, network.logstd), length)) + kl_change = tf.Print(kl_change, [kl_change], 'kl change: ') + maybe_increase = tf.cond( + kl_change > 1.3 * self._config.kl_target, + # pylint: disable=g-long-lambda + lambda: tf.Print(self._penalty.assign( + self._penalty * 1.5), [0], 'increase penalty '), + float) + maybe_decrease = tf.cond( + kl_change < 0.7 * self._config.kl_target, + # pylint: disable=g-long-lambda + lambda: tf.Print(self._penalty.assign( + self._penalty / 1.5), [0], 'decrease penalty '), + float) + with tf.control_dependencies([maybe_increase, maybe_decrease]): + return tf.summary.merge([ + tf.summary.scalar('kl_change', kl_change), + tf.summary.scalar('penalty', self._penalty)]) + + def _mask(self, tensor, length): + """Set padding elements of a batch of sequences to zero. + + Useful to then safely sum along the time dimension. 
+ + Args: + tensor: Tensor of sequences. + length: Batch of sequence lengths. + + Returns: + Masked sequences. + """ + with tf.name_scope('mask'): + range_ = tf.range(tensor.shape[1].value) + mask = tf.cast(range_[None, :] < length[:, None], tf.float32) + masked = tensor * mask + return tf.check_numerics(masked, 'masked') diff --git a/examples/pybullet/gym/pybullet_envs/agents/ppo/memory.py b/examples/pybullet/gym/pybullet_envs/agents/ppo/memory.py new file mode 100644 index 000000000..10e79ef72 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/ppo/memory.py @@ -0,0 +1,152 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Memory that stores episodes.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf + + +class EpisodeMemory(object): + """Memory that stores episodes.""" + + def __init__(self, template, capacity, max_length, scope): + """Create a memory that stores episodes. + + Each transition tuple consists of quantities specified by the template. + These quantities would typically be be observartions, actions, rewards, and + done indicators. + + Args: + template: List of tensors to derive shapes and dtypes of each transition. + capacity: Number of episodes, or rows, hold by the memory. + max_length: Allocated sequence length for the episodes. + scope: Variable scope to use for internal variables. + """ + self._capacity = capacity + self._max_length = max_length + with tf.variable_scope(scope) as var_scope: + self._scope = var_scope + self._length = tf.Variable(tf.zeros(capacity, tf.int32), False) + self._buffers = [ + tf.Variable(tf.zeros( + [capacity, max_length] + elem.shape.as_list(), + elem.dtype), False) + for elem in template] + + def length(self, rows=None): + """Tensor holding the current length of episodes. + + Args: + rows: Episodes to select length from, defaults to all. + + Returns: + Batch tensor of sequence lengths. + """ + rows = tf.range(self._capacity) if rows is None else rows + return tf.gather(self._length, rows) + + def append(self, transitions, rows=None): + """Append a batch of transitions to rows of the memory. + + Args: + transitions: Tuple of transition quantities with batch dimension. + rows: Episodes to append to, defaults to all. + + Returns: + Operation. 
+ """ + rows = tf.range(self._capacity) if rows is None else rows + assert rows.shape.ndims == 1 + assert_capacity = tf.assert_less( + rows, self._capacity, + message='capacity exceeded') + with tf.control_dependencies([assert_capacity]): + assert_max_length = tf.assert_less( + tf.gather(self._length, rows), self._max_length, + message='max length exceeded') + append_ops = [] + with tf.control_dependencies([assert_max_length]): + for buffer_, elements in zip(self._buffers, transitions): + timestep = tf.gather(self._length, rows) + indices = tf.stack([rows, timestep], 1) + append_ops.append(tf.scatter_nd_update(buffer_, indices, elements)) + with tf.control_dependencies(append_ops): + episode_mask = tf.reduce_sum(tf.one_hot( + rows, self._capacity, dtype=tf.int32), 0) + return self._length.assign_add(episode_mask) + + def replace(self, episodes, length, rows=None): + """Replace full episodes. + + Args: + episodes: Tuple of transition quantities with batch and time dimensions. + length: Batch of sequence lengths. + rows: Episodes to replace, defaults to all. + + Returns: + Operation. + """ + rows = tf.range(self._capacity) if rows is None else rows + assert rows.shape.ndims == 1 + assert_capacity = tf.assert_less( + rows, self._capacity, message='capacity exceeded') + with tf.control_dependencies([assert_capacity]): + assert_max_length = tf.assert_less_equal( + length, self._max_length, message='max length exceeded') + replace_ops = [] + with tf.control_dependencies([assert_max_length]): + for buffer_, elements in zip(self._buffers, episodes): + replace_op = tf.scatter_update(buffer_, rows, elements) + replace_ops.append(replace_op) + with tf.control_dependencies(replace_ops): + return tf.scatter_update(self._length, rows, length) + + def data(self, rows=None): + """Access a batch of episodes from the memory. + + Padding elements after the length of each episode are unspecified and might + contain old data. + + Args: + rows: Episodes to select, defaults to all. + + Returns: + Tuple containing a tuple of transition quantiries with batch and time + dimensions, and a batch of sequence lengths. + """ + rows = tf.range(self._capacity) if rows is None else rows + assert rows.shape.ndims == 1 + episode = [tf.gather(buffer_, rows) for buffer_ in self._buffers] + length = tf.gather(self._length, rows) + return episode, length + + def clear(self, rows=None): + """Reset episodes in the memory. + + Internally, this only sets their lengths to zero. The memory entries will + be overridden by future calls to append() or replace(). + + Args: + rows: Episodes to clear, defaults to all. + + Returns: + Operation. + """ + rows = tf.range(self._capacity) if rows is None else rows + assert rows.shape.ndims == 1 + return tf.scatter_update(self._length, rows, tf.zeros_like(rows)) diff --git a/examples/pybullet/gym/pybullet_envs/agents/ppo/normalize.py b/examples/pybullet/gym/pybullet_envs/agents/ppo/normalize.py new file mode 100644 index 000000000..6c4170519 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/ppo/normalize.py @@ -0,0 +1,168 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Normalize tensors based on streaming estimates of mean and variance.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf + + +class StreamingNormalize(object): + """Normalize tensors based on streaming estimates of mean and variance.""" + + def __init__( + self, template, center=True, scale=True, clip=10, name='normalize'): + """Normalize tensors based on streaming estimates of mean and variance. + + Centering the value, scaling it by the standard deviation, and clipping + outlier values are optional. + + Args: + template: Example tensor providing shape and dtype of the vaule to track. + center: Python boolean indicating whether to subtract mean from values. + scale: Python boolean indicating whether to scale values by stddev. + clip: If and when to clip normalized values. + name: Parent scope of operations provided by this class. + """ + self._center = center + self._scale = scale + self._clip = clip + self._name = name + with tf.name_scope(name): + self._count = tf.Variable(0, False) + self._mean = tf.Variable(tf.zeros_like(template), False) + self._var_sum = tf.Variable(tf.zeros_like(template), False) + + def transform(self, value): + """Normalize a single or batch tensor. + + Applies the activated transformations in the constructor using current + estimates of mean and variance. + + Args: + value: Batch or single value tensor. + + Returns: + Normalized batch or single value tensor. + """ + with tf.name_scope(self._name + '/transform'): + no_batch_dim = value.shape.ndims == self._mean.shape.ndims + if no_batch_dim: + # Add a batch dimension if necessary. + value = value[None, ...] + if self._center: + value -= self._mean[None, ...] + if self._scale: + # We cannot scale before seeing at least two samples. + value /= tf.cond( + self._count > 1, lambda: self._std() + 1e-8, + lambda: tf.ones_like(self._var_sum))[None] + if self._clip: + value = tf.clip_by_value(value, -self._clip, self._clip) + # Remove batch dimension if necessary. + if no_batch_dim: + value = value[0] + return tf.check_numerics(value, 'value') + + def update(self, value): + """Update the mean and variance estimates. + + Args: + value: Batch or single value tensor. + + Returns: + Summary tensor. + """ + with tf.name_scope(self._name + '/update'): + if value.shape.ndims == self._mean.shape.ndims: + # Add a batch dimension if necessary. + value = value[None, ...] 
+ count = tf.shape(value)[0] + with tf.control_dependencies([self._count.assign_add(count)]): + step = tf.cast(self._count, tf.float32) + mean_delta = tf.reduce_sum(value - self._mean[None, ...], 0) + new_mean = self._mean + mean_delta / step + new_mean = tf.cond(self._count > 1, lambda: new_mean, lambda: value[0]) + var_delta = ( + value - self._mean[None, ...]) * (value - new_mean[None, ...]) + new_var_sum = self._var_sum + tf.reduce_sum(var_delta, 0) + with tf.control_dependencies([new_mean, new_var_sum]): + update = self._mean.assign(new_mean), self._var_sum.assign(new_var_sum) + with tf.control_dependencies(update): + if value.shape.ndims == 1: + value = tf.reduce_mean(value) + return self._summary('value', tf.reduce_mean(value)) + + def reset(self): + """Reset the estimates of mean and variance. + + Resets the full state of this class. + + Returns: + Operation. + """ + with tf.name_scope(self._name + '/reset'): + return tf.group( + self._count.assign(0), + self._mean.assign(tf.zeros_like(self._mean)), + self._var_sum.assign(tf.zeros_like(self._var_sum))) + + def summary(self): + """Summary string of mean and standard deviation. + + Returns: + Summary tensor. + """ + with tf.name_scope(self._name + '/summary'): + mean_summary = tf.cond( + self._count > 0, lambda: self._summary('mean', self._mean), str) + std_summary = tf.cond( + self._count > 1, lambda: self._summary('stddev', self._std()), str) + return tf.summary.merge([mean_summary, std_summary]) + + def _std(self): + """Computes the current estimate of the standard deviation. + + Note that the standard deviation is not defined until at least two samples + were seen. + + Returns: + Tensor of current variance. + """ + variance = tf.cond( + self._count > 1, + lambda: self._var_sum / tf.cast(self._count - 1, tf.float32), + lambda: tf.ones_like(self._var_sum) * float('nan')) + # The epsilon corrects for small negative variance values caused by + # the algorithm. It was empirically chosen to work with all environments + # tested. + return tf.sqrt(variance + 1e-4) + + def _summary(self, name, tensor): + """Create a scalar or histogram summary matching the rank of the tensor. + + Args: + name: Name for the summary. + tensor: Tensor to summarize. + + Returns: + Summary tensor. + """ + if tensor.shape.ndims == 0: + return tf.summary.scalar(name, tensor) + else: + return tf.summary.histogram(name, tensor) diff --git a/examples/pybullet/gym/pybullet_envs/agents/ppo/utility.py b/examples/pybullet/gym/pybullet_envs/agents/ppo/utility.py new file mode 100644 index 000000000..c46934c49 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/ppo/utility.py @@ -0,0 +1,213 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Utilities for the PPO algorithm.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import collections +import math +import re + +import tensorflow as tf +from tensorflow.python.client import device_lib + + +def reinit_nested_vars(variables, indices=None): + """Reset all variables in a nested tuple to zeros. + + Args: + variables: Nested tuple or list of variaables. + indices: Batch indices to reset, defaults to all. + + Returns: + Operation. + """ + if isinstance(variables, (tuple, list)): + return tf.group(*[ + reinit_nested_vars(variable, indices) for variable in variables]) + if indices is None: + return variables.assign(tf.zeros_like(variables)) + else: + zeros = tf.zeros([tf.shape(indices)[0]] + variables.shape[1:].as_list()) + return tf.scatter_update(variables, indices, zeros) + + +def assign_nested_vars(variables, tensors, indices=None): + """Assign tensors to matching nested tuple of variables. + + Args: + variables: Nested tuple or list of variables to update. + tensors: Nested tuple or list of tensors to assign. + indices: Batch indices to assign to; default to all. + + Returns: + Operation. + """ + if isinstance(variables, (tuple, list)): + return tf.group(*[ + assign_nested_vars(variable, tensor) + for variable, tensor in zip(variables, tensors)]) + if indices is None: + return variables.assign(tensors) + else: + return tf.scatter_update(variables, indices, tensors) + + +def discounted_return(reward, length, discount): + """Discounted Monte-Carlo returns.""" + timestep = tf.range(reward.shape[1].value) + mask = tf.cast(timestep[None, :] < length[:, None], tf.float32) + return_ = tf.reverse(tf.transpose(tf.scan( + lambda agg, cur: cur + discount * agg, + tf.transpose(tf.reverse(mask * reward, [1]), [1, 0]), + tf.zeros_like(reward[:, -1]), 1, False), [1, 0]), [1]) + return tf.check_numerics(tf.stop_gradient(return_), 'return') + + +def fixed_step_return(reward, value, length, discount, window): + """N-step discounted return.""" + timestep = tf.range(reward.shape[1].value) + mask = tf.cast(timestep[None, :] < length[:, None], tf.float32) + return_ = tf.zeros_like(reward) + for _ in range(window): + return_ += reward + reward = discount * tf.concat( + [reward[:, 1:], tf.zeros_like(reward[:, -1:])], 1) + return_ += discount ** window * tf.concat( + [value[:, window:], tf.zeros_like(value[:, -window:]), 1]) + return tf.check_numerics(tf.stop_gradient(mask * return_), 'return') + + +def lambda_return(reward, value, length, discount, lambda_): + """TD-lambda returns.""" + timestep = tf.range(reward.shape[1].value) + mask = tf.cast(timestep[None, :] < length[:, None], tf.float32) + sequence = mask * reward + discount * value * (1 - lambda_) + discount = mask * discount * lambda_ + sequence = tf.stack([sequence, discount], 2) + return_ = tf.reverse(tf.transpose(tf.scan( + lambda agg, cur: cur[0] + cur[1] * agg, + tf.transpose(tf.reverse(sequence, [1]), [1, 2, 0]), + tf.zeros_like(value[:, -1]), 1, False), [1, 0]), [1]) + return tf.check_numerics(tf.stop_gradient(return_), 'return') + + +def lambda_advantage(reward, value, length, discount): + """Generalized Advantage Estimation.""" + timestep = tf.range(reward.shape[1].value) + mask = tf.cast(timestep[None, :] < length[:, None], tf.float32) + next_value = tf.concat([value[:, 1:], tf.zeros_like(value[:, -1:])], 1) + delta = reward + discount * next_value - value + advantage = tf.reverse(tf.transpose(tf.scan( + lambda agg, cur: cur + discount * agg, + 
tf.transpose(tf.reverse(mask * delta, [1]), [1, 0]), + tf.zeros_like(delta[:, -1]), 1, False), [1, 0]), [1]) + return tf.check_numerics(tf.stop_gradient(advantage), 'advantage') + + +def diag_normal_kl(mean0, logstd0, mean1, logstd1): + """Epirical KL divergence of two normals with diagonal covariance.""" + logstd0_2, logstd1_2 = 2 * logstd0, 2 * logstd1 + return 0.5 * ( + tf.reduce_sum(tf.exp(logstd0_2 - logstd1_2), -1) + + tf.reduce_sum((mean1 - mean0) ** 2 / tf.exp(logstd1_2), -1) + + tf.reduce_sum(logstd1_2, -1) - tf.reduce_sum(logstd0_2, -1) - + mean0.shape[-1].value) + + +def diag_normal_logpdf(mean, logstd, loc): + """Log density of a normal with diagonal covariance.""" + constant = -0.5 * math.log(2 * math.pi) - logstd + value = -0.5 * ((loc - mean) / tf.exp(logstd)) ** 2 + return tf.reduce_sum(constant + value, -1) + + +def diag_normal_entropy(mean, logstd): + """Empirical entropy of a normal with diagonal covariance.""" + constant = mean.shape[-1].value * math.log(2 * math.pi * math.e) + return (constant + tf.reduce_sum(2 * logstd, 1)) / 2 + + +def available_gpus(): + """List of GPU device names detected by TensorFlow.""" + local_device_protos = device_lib.list_local_devices() + return [x.name for x in local_device_protos if x.device_type == 'GPU'] + + +def gradient_summaries(grad_vars, groups=None, scope='gradients'): + """Create histogram summaries of the gradient. + + Summaries can be grouped via regexes matching variables names. + + Args: + grad_vars: List of (gradient, variable) tuples as returned by optimizers. + groups: Mapping of name to regex for grouping summaries. + scope: Name scope for this operation. + + Returns: + Summary tensor. + """ + groups = groups or {r'all': r'.*'} + grouped = collections.defaultdict(list) + for grad, var in grad_vars: + if grad is None: + continue + for name, pattern in groups.items(): + if re.match(pattern, var.name): + name = re.sub(pattern, name, var.name) + grouped[name].append(grad) + for name in groups: + if name not in grouped: + tf.logging.warn("No variables matching '{}' group.".format(name)) + summaries = [] + for name, grads in grouped.items(): + grads = [tf.reshape(grad, [-1]) for grad in grads] + grads = tf.concat(grads, 0) + summaries.append(tf.summary.histogram(scope + '/' + name, grads)) + return tf.summary.merge(summaries) + + +def variable_summaries(vars_, groups=None, scope='weights'): + """Create histogram summaries for the provided variables. + + Summaries can be grouped via regexes matching variables names. + + Args: + vars_: List of variables to summarize. + groups: Mapping of name to regex for grouping summaries. + scope: Name scope for this operation. + + Returns: + Summary tensor. 
+ """ + groups = groups or {r'all': r'.*'} + grouped = collections.defaultdict(list) + for var in vars_: + for name, pattern in groups.items(): + if re.match(pattern, var.name): + name = re.sub(pattern, name, var.name) + grouped[name].append(var) + for name in groups: + if name not in grouped: + tf.logging.warn("No variables matching '{}' group.".format(name)) + summaries = [] + # pylint: disable=redefined-argument-from-local + for name, vars_ in grouped.items(): + vars_ = [tf.reshape(var, [-1]) for var in vars_] + vars_ = tf.concat(vars_, 0) + summaries.append(tf.summary.histogram(scope + '/' + name, vars_)) + return tf.summary.merge(summaries) diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/__init__.py b/examples/pybullet/gym/pybullet_envs/agents/tools/__init__.py new file mode 100644 index 000000000..0201de98f --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/__init__.py @@ -0,0 +1,31 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tools for reinforcement learning.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +from . import wrappers +from .attr_dict import AttrDict +from .batch_env import BatchEnv +from .count_weights import count_weights +from .in_graph_batch_env import InGraphBatchEnv +from .in_graph_env import InGraphEnv +from .loop import Loop +from .mock_algorithm import MockAlgorithm +from .mock_environment import MockEnvironment +from .simulate import simulate +from .streaming_mean import StreamingMean diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/attr_dict.py b/examples/pybullet/gym/pybullet_envs/agents/tools/attr_dict.py new file mode 100644 index 000000000..1707486d8 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/attr_dict.py @@ -0,0 +1,54 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Wrap a dictionary to access keys as attributes.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import contextlib + + +class AttrDict(dict): + """Wrap a dictionary to access keys as attributes.""" + + def __init__(self, *args, **kwargs): + super(AttrDict, self).__init__(*args, **kwargs) + super(AttrDict, self).__setattr__('_mutable', False) + + def __getattr__(self, key): + # Do not provide None for unimplemented magic attributes. 
+ if key.startswith('__'): + raise AttributeError + return self.get(key, None) + + def __setattr__(self, key, value): + if not self._mutable: + message = "Cannot set attribute '{}'.".format(key) + message += " Use 'with obj.unlocked:' scope to set attributes." + raise RuntimeError(message) + if key.startswith('__'): + raise AttributeError("Cannot set magic attribute '{}'".format(key)) + self[key] = value + + @property + @contextlib.contextmanager + def unlocked(self): + super(AttrDict, self).__setattr__('_mutable', True) + yield + super(AttrDict, self).__setattr__('_mutable', False) + + def copy(self): + return type(self)(super(AttrDict, self).copy()) diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/batch_env.py b/examples/pybullet/gym/pybullet_envs/agents/tools/batch_env.py new file mode 100644 index 000000000..946fd7211 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/batch_env.py @@ -0,0 +1,124 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Combine multiple environments to step them in batch.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import numpy as np + + +class BatchEnv(object): + """Combine multiple environments to step them in batch.""" + + def __init__(self, envs, blocking): + """Combine multiple environments to step them in batch. + + To step environments in parallel, environments must support a + `blocking=False` argument to their step and reset functions that makes them + return callables instead to receive the result at a later time. + + Args: + envs: List of environments. + blocking: Step environments after another rather than in parallel. + + Raises: + ValueError: Environments have different observation or action spaces. + """ + self._envs = envs + self._blocking = blocking + observ_space = self._envs[0].observation_space + if not all(env.observation_space == observ_space for env in self._envs): + raise ValueError('All environments must use the same observation space.') + action_space = self._envs[0].action_space + if not all(env.action_space == action_space for env in self._envs): + raise ValueError('All environments must use the same observation space.') + + def __len__(self): + """Number of combined environments.""" + return len(self._envs) + + def __getitem__(self, index): + """Access an underlying environment by index.""" + return self._envs[index] + + def __getattr__(self, name): + """Forward unimplemented attributes to one of the original environments. + + Args: + name: Attribute that was accessed. + + Returns: + Value behind the attribute name one of the wrapped environments. + """ + return getattr(self._envs[0], name) + + def step(self, actions): + """Forward a batch of actions to the wrapped environments. + + Args: + actions: Batched action to apply to the environment. + + Raises: + ValueError: Invalid actions. + + Returns: + Batch of observations, rewards, and done flags. 
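+
+    Example (illustrative sketch added by the editor; `envs` is a hypothetical
+    list of identical gym environments created by the caller):
+      batch_env = BatchEnv(envs, blocking=True)
+      actions = np.stack([env.action_space.sample() for env in envs])
+      observ, reward, done, info = batch_env.step(actions)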
+ """ + for index, (env, action) in enumerate(zip(self._envs, actions)): + if not env.action_space.contains(action): + message = 'Invalid action at index {}: {}' + raise ValueError(message.format(index, action)) + if self._blocking: + transitions = [ + env.step(action) + for env, action in zip(self._envs, actions)] + else: + transitions = [ + env.step(action, blocking=False) + for env, action in zip(self._envs, actions)] + transitions = [transition() for transition in transitions] + observs, rewards, dones, infos = zip(*transitions) + observ = np.stack(observs) + reward = np.stack(rewards) + done = np.stack(dones) + info = tuple(infos) + return observ, reward, done, info + + def reset(self, indices=None): + """Reset the environment and convert the resulting observation. + + Args: + indices: The batch indices of environments to reset; defaults to all. + + Returns: + Batch of observations. + """ + if indices is None: + indices = np.arange(len(self._envs)) + if self._blocking: + observs = [self._envs[index].reset() for index in indices] + else: + observs = [self._envs[index].reset(blocking=False) for index in indices] + observs = [observ() for observ in observs] + observ = np.stack(observs) + return observ + + def close(self): + """Send close messages to the external process and join them.""" + for env in self._envs: + if hasattr(env, 'close'): + env.close() diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/count_weights.py b/examples/pybullet/gym/pybullet_envs/agents/tools/count_weights.py new file mode 100644 index 000000000..dd0d870f6 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/count_weights.py @@ -0,0 +1,48 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Count learnable parameters.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import re + +import numpy as np +import tensorflow as tf + + +def count_weights(scope=None, exclude=None, graph=None): + """Count learnable parameters. + + Args: + scope: Resrict the count to a variable scope. + exclude: Regex to match variable names to exclude. + graph: Operate on a graph other than the current default graph. + + Returns: + Number of learnable parameters as integer. 
+ """ + if scope: + scope = scope if scope.endswith('/') else scope + '/' + graph = graph or tf.get_default_graph() + vars_ = graph.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES) + if scope: + vars_ = [var for var in vars_ if var.name.startswith(scope)] + if exclude: + exclude = re.compile(exclude) + vars_ = [var for var in vars_ if not exclude.match(var.name)] + shapes = [var.get_shape().as_list() for var in vars_] + return int(sum(np.prod(shape) for shape in shapes)) diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/in_graph_batch_env.py b/examples/pybullet/gym/pybullet_envs/agents/tools/in_graph_batch_env.py new file mode 100644 index 000000000..d4e1644d3 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/in_graph_batch_env.py @@ -0,0 +1,178 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Batch of environments inside the TensorFlow graph.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import gym +import tensorflow as tf + + +class InGraphBatchEnv(object): + """Batch of environments inside the TensorFlow graph. + + The batch of environments will be stepped and reset inside of the graph using + a tf.py_func(). The current batch of observations, actions, rewards, and done + flags are held in according variables. + """ + + def __init__(self, batch_env): + """Batch of environments inside the TensorFlow graph. + + Args: + batch_env: Batch environment. + """ + self._batch_env = batch_env + observ_shape = self._parse_shape(self._batch_env.observation_space) + observ_dtype = self._parse_dtype(self._batch_env.observation_space) + action_shape = self._parse_shape(self._batch_env.action_space) + action_dtype = self._parse_dtype(self._batch_env.action_space) + with tf.variable_scope('env_temporary'): + self._observ = tf.Variable( + tf.zeros((len(self._batch_env),) + observ_shape, observ_dtype), + name='observ', trainable=False) + self._action = tf.Variable( + tf.zeros((len(self._batch_env),) + action_shape, action_dtype), + name='action', trainable=False) + self._reward = tf.Variable( + tf.zeros((len(self._batch_env),), tf.float32), + name='reward', trainable=False) + self._done = tf.Variable( + tf.cast(tf.ones((len(self._batch_env),)), tf.bool), + name='done', trainable=False) + + def __getattr__(self, name): + """Forward unimplemented attributes to one of the original environments. + + Args: + name: Attribute that was accessed. + + Returns: + Value behind the attribute name in one of the original environments. + """ + return getattr(self._batch_env, name) + + def __len__(self): + """Number of combined environments.""" + return len(self._batch_env) + + def __getitem__(self, index): + """Access an underlying environment by index.""" + return self._batch_env[index] + + def simulate(self, action): + """Step the batch of environments. + + The results of the step can be accessed from the variables defined below. 
+ + Args: + action: Tensor holding the batch of actions to apply. + + Returns: + Operation. + """ + with tf.name_scope('environment/simulate'): + if action.dtype in (tf.float16, tf.float32, tf.float64): + action = tf.check_numerics(action, 'action') + observ_dtype = self._parse_dtype(self._batch_env.observation_space) + observ, reward, done = tf.py_func( + lambda a: self._batch_env.step(a)[:3], [action], + [observ_dtype, tf.float32, tf.bool], name='step') + observ = tf.check_numerics(observ, 'observ') + reward = tf.check_numerics(reward, 'reward') + return tf.group( + self._observ.assign(observ), + self._action.assign(action), + self._reward.assign(reward), + self._done.assign(done)) + + def reset(self, indices=None): + """Reset the batch of environments. + + Args: + indices: The batch indices of the environments to reset; defaults to all. + + Returns: + Batch tensor of the new observations. + """ + if indices is None: + indices = tf.range(len(self._batch_env)) + observ_dtype = self._parse_dtype(self._batch_env.observation_space) + observ = tf.py_func( + self._batch_env.reset, [indices], observ_dtype, name='reset') + observ = tf.check_numerics(observ, 'observ') + reward = tf.zeros_like(indices, tf.float32) + done = tf.zeros_like(indices, tf.bool) + with tf.control_dependencies([ + tf.scatter_update(self._observ, indices, observ), + tf.scatter_update(self._reward, indices, reward), + tf.scatter_update(self._done, indices, done)]): + return tf.identity(observ) + + @property + def observ(self): + """Access the variable holding the current observation.""" + return self._observ + + @property + def action(self): + """Access the variable holding the last recieved action.""" + return self._action + + @property + def reward(self): + """Access the variable holding the current reward.""" + return self._reward + + @property + def done(self): + """Access the variable indicating whether the episode is done.""" + return self._done + + def close(self): + """Send close messages to the external process and join them.""" + self._batch_env.close() + + def _parse_shape(self, space): + """Get a tensor shape from a OpenAI Gym space. + + Args: + space: Gym space. + + Returns: + Shape tuple. + """ + if isinstance(space, gym.spaces.Discrete): + return () + if isinstance(space, gym.spaces.Box): + return space.shape + raise NotImplementedError() + + def _parse_dtype(self, space): + """Get a tensor dtype from a OpenAI Gym space. + + Args: + space: Gym space. + + Returns: + TensorFlow data type. + """ + if isinstance(space, gym.spaces.Discrete): + return tf.int32 + if isinstance(space, gym.spaces.Box): + return tf.float32 + raise NotImplementedError() diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/in_graph_env.py b/examples/pybullet/gym/pybullet_envs/agents/tools/in_graph_env.py new file mode 100644 index 000000000..33ff31d07 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/in_graph_env.py @@ -0,0 +1,162 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +"""Put an OpenAI Gym environment into the TensorFlow graph.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import gym +import tensorflow as tf + + +class InGraphEnv(object): + """Put an OpenAI Gym environment into the TensorFlow graph. + + The environment will be stepped and reset inside of the graph using + tf.py_func(). The current observation, action, reward, and done flag are held + in according variables. + """ + + def __init__(self, env): + """Put an OpenAI Gym environment into the TensorFlow graph. + + Args: + env: OpenAI Gym environment. + """ + self._env = env + observ_shape = self._parse_shape(self._env.observation_space) + observ_dtype = self._parse_dtype(self._env.observation_space) + action_shape = self._parse_shape(self._env.action_space) + action_dtype = self._parse_dtype(self._env.action_space) + with tf.name_scope('environment'): + self._observ = tf.Variable( + tf.zeros(observ_shape, observ_dtype), name='observ', trainable=False) + self._action = tf.Variable( + tf.zeros(action_shape, action_dtype), name='action', trainable=False) + self._reward = tf.Variable( + 0.0, dtype=tf.float32, name='reward', trainable=False) + self._done = tf.Variable( + True, dtype=tf.bool, name='done', trainable=False) + self._step = tf.Variable( + 0, dtype=tf.int32, name='step', trainable=False) + + def __getattr__(self, name): + """Forward unimplemented attributes to the original environment. + + Args: + name: Attribute that was accessed. + + Returns: + Value behind the attribute name in the wrapped environment. + """ + return getattr(self._env, name) + + def simulate(self, action): + """Step the environment. + + The result of the step can be accessed from the variables defined below. + + Args: + action: Tensor holding the action to apply. + + Returns: + Operation. + """ + with tf.name_scope('environment/simulate'): + if action.dtype in (tf.float16, tf.float32, tf.float64): + action = tf.check_numerics(action, 'action') + observ_dtype = self._parse_dtype(self._env.observation_space) + observ, reward, done = tf.py_func( + lambda a: self._env.step(a)[:3], [action], + [observ_dtype, tf.float32, tf.bool], name='step') + observ = tf.check_numerics(observ, 'observ') + reward = tf.check_numerics(reward, 'reward') + return tf.group( + self._observ.assign(observ), + self._action.assign(action), + self._reward.assign(reward), + self._done.assign(done), + self._step.assign_add(1)) + + def reset(self): + """Reset the environment. + + Returns: + Tensor of the current observation. 
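+
+    Example (illustrative sketch added by the editor; `env` stands in for a
+    gym.Env whose observations are already float32):
+      in_graph_env = InGraphEnv(env)
+      reset_op = in_graph_env.reset()
+      with tf.Session() as sess:
+        sess.run(tf.global_variables_initializer())
+        sess.run(reset_op)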
+ """ + observ_dtype = self._parse_dtype(self._env.observation_space) + observ = tf.py_func(self._env.reset, [], observ_dtype, name='reset') + observ = tf.check_numerics(observ, 'observ') + with tf.control_dependencies([ + self._observ.assign(observ), + self._reward.assign(0), + self._done.assign(False)]): + return tf.identity(observ) + + @property + def observ(self): + """Access the variable holding the current observation.""" + return self._observ + + @property + def action(self): + """Access the variable holding the last recieved action.""" + return self._action + + @property + def reward(self): + """Access the variable holding the current reward.""" + return self._reward + + @property + def done(self): + """Access the variable indicating whether the episode is done.""" + return self._done + + @property + def step(self): + """Access the variable containg total steps of this environment.""" + return self._step + + def _parse_shape(self, space): + """Get a tensor shape from a OpenAI Gym space. + + Args: + space: Gym space. + + Returns: + Shape tuple. + """ + if isinstance(space, gym.spaces.Discrete): + return () + if isinstance(space, gym.spaces.Box): + return space.shape + raise NotImplementedError() + + def _parse_dtype(self, space): + """Get a tensor dtype from a OpenAI Gym space. + + Args: + space: Gym space. + + Returns: + TensorFlow data type. + """ + if isinstance(space, gym.spaces.Discrete): + return tf.int32 + if isinstance(space, gym.spaces.Box): + return tf.float32 + raise NotImplementedError() diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/loop.py b/examples/pybullet/gym/pybullet_envs/agents/tools/loop.py new file mode 100644 index 000000000..b8f118c85 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/loop.py @@ -0,0 +1,233 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Execute operations in a loop and coordinate logging and checkpoints.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import collections +import os + +import tensorflow as tf + +from . import streaming_mean + + +_Phase = collections.namedtuple( + 'Phase', + 'name, writer, op, batch, steps, feed, report_every, log_every,' + 'checkpoint_every') + + +class Loop(object): + """Execute operations in a loop and coordinate logging and checkpoints. + + Supports multiple phases, that define their own operations to run, and + intervals for reporting scores, logging summaries, and storing checkpoints. + All class state is stored in-graph to properly recover from checkpoints. + """ + + def __init__(self, logdir, step=None, log=None, report=None, reset=None): + """Execute operations in a loop and coordinate logging and checkpoints. + + The step, log, report, and report arguments will get created if not + provided. 
Reset is used to indicate switching to a new phase, so that the + model can start a new computation in case its computation is split over + multiple training steps. + + Args: + logdir: Will contain checkpoints and summaries for each phase. + step: Variable of the global step (optional). + log: Tensor indicating to the model to compute summary tensors. + report: Tensor indicating to the loop to report the current mean score. + reset: Tensor indicating to the model to start a new computation. + """ + self._logdir = logdir + self._step = ( + tf.Variable(0, False, name='global_step') if step is None else step) + self._log = tf.placeholder(tf.bool) if log is None else log + self._report = tf.placeholder(tf.bool) if report is None else report + self._reset = tf.placeholder(tf.bool) if reset is None else reset + self._phases = [] + + def add_phase( + self, name, done, score, summary, steps, + report_every=None, log_every=None, checkpoint_every=None, feed=None): + """Add a phase to the loop protocol. + + If the model breaks long computation into multiple steps, the done tensor + indicates whether the current score should be added to the mean counter. + For example, in reinforcement learning we only have a valid score at the + end of the episode. + + Score and done tensors can either be scalars or vectors, to support + single and batched computations. + + Args: + name: Name for the phase, used for the summary writer. + done: Tensor indicating whether current score can be used. + score: Tensor holding the current, possibly intermediate, score. + summary: Tensor holding summary string to write if not an empty string. + steps: Duration of the phase in steps. + report_every: Yield mean score every this number of steps. + log_every: Request summaries via `log` tensor every this number of steps. + checkpoint_every: Write checkpoint every this number of steps. + feed: Additional feed dictionary for the session run call. + + Raises: + ValueError: Unknown rank for done or score tensors. + """ + done = tf.convert_to_tensor(done, tf.bool) + score = tf.convert_to_tensor(score, tf.float32) + summary = tf.convert_to_tensor(summary, tf.string) + feed = feed or {} + if done.shape.ndims is None or score.shape.ndims is None: + raise ValueError("Rank of 'done' and 'score' tensors must be known.") + writer = self._logdir and tf.summary.FileWriter( + os.path.join(self._logdir, name), tf.get_default_graph(), + flush_secs=60) + op = self._define_step(done, score, summary) + batch = 1 if score.shape.ndims == 0 else score.shape[0].value + self._phases.append(_Phase( + name, writer, op, batch, int(steps), feed, report_every, + log_every, checkpoint_every)) + + def run(self, sess, saver, max_step=None): + """Run the loop schedule for a specified number of steps. + + Call the operation of the current phase until the global step reaches the + specified maximum step. Phases are repeated over and over in the order they + were added. + + Args: + sess: Session to use to run the phase operation. + saver: Saver used for checkpointing. + max_step: Run the operations until the step reaches this limit. + + Yields: + Reported mean scores. + """ + global_step = sess.run(self._step) + steps_made = 1 + while True: + if max_step and global_step >= max_step: + break + phase, epoch, steps_in = self._find_current_phase(global_step) + phase_step = epoch * phase.steps + steps_in + if steps_in % phase.steps < steps_made: + message = '\n' + ('-' * 50) + '\n' + message += 'Phase {} (phase step {}, global step {}).' 
+ tf.logging.info(message.format(phase.name, phase_step, global_step)) + # Populate book keeping tensors. + phase.feed[self._reset] = (steps_in < steps_made) + phase.feed[self._log] = ( + phase.writer and + self._is_every_steps(phase_step, phase.batch, phase.log_every)) + phase.feed[self._report] = ( + self._is_every_steps(phase_step, phase.batch, phase.report_every)) + summary, mean_score, global_step, steps_made = sess.run( + phase.op, phase.feed) + if self._is_every_steps(phase_step, phase.batch, phase.checkpoint_every): + self._store_checkpoint(sess, saver, global_step) + if self._is_every_steps(phase_step, phase.batch, phase.report_every): + yield mean_score + if summary and phase.writer: + # We want smaller phases to catch up at the beginnig of each epoch so + # that their graphs are aligned. + longest_phase = max(phase.steps for phase in self._phases) + summary_step = epoch * longest_phase + steps_in + phase.writer.add_summary(summary, summary_step) + + def _is_every_steps(self, phase_step, batch, every): + """Determine whether a periodic event should happen at this step. + + Args: + phase_step: The incrementing step. + batch: The number of steps progressed at once. + every: The interval of the periode. + + Returns: + Boolean of whether the event should happen. + """ + if not every: + return False + covered_steps = range(phase_step, phase_step + batch) + return any((step + 1) % every == 0 for step in covered_steps) + + def _find_current_phase(self, global_step): + """Determine the current phase based on the global step. + + This ensures continuing the correct phase after restoring checkoints. + + Args: + global_step: The global number of steps performed across all phases. + + Returns: + Tuple of phase object, epoch number, and phase steps within the epoch. + """ + epoch_size = sum(phase.steps for phase in self._phases) + epoch = int(global_step // epoch_size) + steps_in = global_step % epoch_size + for phase in self._phases: + if steps_in < phase.steps: + return phase, epoch, steps_in + steps_in -= phase.steps + + def _define_step(self, done, score, summary): + """Combine operations of a phase. + + Keeps track of the mean score and when to report it. + + Args: + done: Tensor indicating whether current score can be used. + score: Tensor holding the current, possibly intermediate, score. + summary: Tensor holding summary string to write if not an empty string. + + Returns: + Tuple of summary tensor, mean score, and new global step. The mean score + is zero for non reporting steps. + """ + if done.shape.ndims == 0: + done = done[None] + if score.shape.ndims == 0: + score = score[None] + score_mean = streaming_mean.StreamingMean((), tf.float32) + with tf.control_dependencies([done, score, summary]): + done_score = tf.gather(score, tf.where(done)[:, 0]) + submit_score = tf.cond( + tf.reduce_any(done), lambda: score_mean.submit(done_score), tf.no_op) + with tf.control_dependencies([submit_score]): + mean_score = tf.cond(self._report, score_mean.clear, float) + steps_made = tf.shape(score)[0] + next_step = self._step.assign_add(steps_made) + with tf.control_dependencies([mean_score, next_step]): + return tf.identity(summary), mean_score, next_step, steps_made + + def _store_checkpoint(self, sess, saver, global_step): + """Store a checkpoint if a log directory was provided to the constructor. + + The directory will be created if needed. + + Args: + sess: Session containing variables to store. + saver: Saver used for checkpointing. + global_step: Step number of the checkpoint name. 
+ """ + if not self._logdir or not saver: + return + tf.gfile.MakeDirs(self._logdir) + filename = os.path.join(self._logdir, 'model.ckpt') + saver.save(sess, filename, global_step) diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/mock_algorithm.py b/examples/pybullet/gym/pybullet_envs/agents/tools/mock_algorithm.py new file mode 100644 index 000000000..c60712d8b --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/mock_algorithm.py @@ -0,0 +1,49 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Mock algorithm for testing reinforcement learning code.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf + + +class MockAlgorithm(object): + """Produce random actions and empty summaries.""" + + def __init__(self, envs): + """Produce random actions and empty summaries. + + Args: + envs: List of in-graph environments. + """ + self._envs = envs + + def begin_episode(self, unused_agent_indices): + return tf.constant('') + + def perform(self, agent_indices, unused_observ): + shape = (tf.shape(agent_indices)[0],) + self._envs[0].action_space.shape + low = self._envs[0].action_space.low + high = self._envs[0].action_space.high + action = tf.random_uniform(shape) * (high - low) + low + return action, tf.constant('') + + def experience(self, unused_agent_indices, *unused_transition): + return tf.constant('') + + def end_episode(self, unused_agent_indices): + return tf.constant('') diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/mock_environment.py b/examples/pybullet/gym/pybullet_envs/agents/tools/mock_environment.py new file mode 100644 index 000000000..248f515b1 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/mock_environment.py @@ -0,0 +1,86 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Mock environment for testing reinforcement learning code.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import gym +import gym.spaces +import numpy as np + + +class MockEnvironment(object): + """Generate random agent input and keep track of statistics.""" + + def __init__(self, observ_shape, action_shape, min_duration, max_duration): + """Generate random agent input and keep track of statistics. + + Args: + observ_shape: Shape for the random observations. 
+ action_shape: Shape for the action space. + min_duration: Minimum number of steps per episode. + max_duration: Maximum number of steps per episode. + + Attributes: + steps: List of actual simulated lengths for all episodes. + durations: List of decided lengths for all episodes. + """ + self._observ_shape = observ_shape + self._action_shape = action_shape + self._min_duration = min_duration + self._max_duration = max_duration + self._random = np.random.RandomState(0) + self.steps = [] + self.durations = [] + + @property + def observation_space(self): + low = np.zeros(self._observ_shape) + high = np.ones(self._observ_shape) + return gym.spaces.Box(low, high) + + @property + def action_space(self): + low = np.zeros(self._action_shape) + high = np.ones(self._action_shape) + return gym.spaces.Box(low, high) + + @property + def unwrapped(self): + return self + + def step(self, action): + assert self.action_space.contains(action) + assert self.steps[-1] < self.durations[-1] + self.steps[-1] += 1 + observ = self._current_observation() + reward = self._current_reward() + done = self.steps[-1] >= self.durations[-1] + info = {} + return observ, reward, done, info + + def reset(self): + duration = self._random.randint(self._min_duration, self._max_duration + 1) + self.steps.append(0) + self.durations.append(duration) + return self._current_observation() + + def _current_observation(self): + return self._random.uniform(0, 1, self._observ_shape) + + def _current_reward(self): + return self._random.uniform(-1, 1) diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/simulate.py b/examples/pybullet/gym/pybullet_envs/agents/tools/simulate.py new file mode 100644 index 000000000..ebe9a898b --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/simulate.py @@ -0,0 +1,147 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""In-graph simulation step of a vectorized algorithm with environments.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf + +from . import streaming_mean + + +def simulate(batch_env, algo, log=True, reset=False): + """Simulation step of a vecrotized algorithm with in-graph environments. + + Integrates the operations implemented by the algorithm and the environments + into a combined operation. + + Args: + batch_env: In-graph batch environment. + algo: Algorithm instance implementing required operations. + log: Tensor indicating whether to compute and return summaries. + reset: Tensor causing all environments to reset. + + Returns: + Tuple of tensors containing done flags for the current episodes, possibly + intermediate scores for the episodes, and a summary tensor. + """ + + def _define_begin_episode(agent_indices): + """Reset environments, intermediate scores and durations for new episodes. + + Args: + agent_indices: Tensor containing batch indices starting an episode. + + Returns: + Summary tensor. 
+ """ + assert agent_indices.shape.ndims == 1 + zero_scores = tf.zeros_like(agent_indices, tf.float32) + zero_durations = tf.zeros_like(agent_indices) + reset_ops = [ + batch_env.reset(agent_indices), + tf.scatter_update(score, agent_indices, zero_scores), + tf.scatter_update(length, agent_indices, zero_durations)] + with tf.control_dependencies(reset_ops): + return algo.begin_episode(agent_indices) + + def _define_step(): + """Request actions from the algorithm and apply them to the environments. + + Increments the lengths of all episodes and increases their scores by the + current reward. After stepping the environments, provides the full + transition tuple to the algorithm. + + Returns: + Summary tensor. + """ + prevob = batch_env.observ + 0 # Ensure a copy of the variable value. + agent_indices = tf.range(len(batch_env)) + action, step_summary = algo.perform(agent_indices, prevob) + action.set_shape(batch_env.action.shape) + with tf.control_dependencies([batch_env.simulate(action)]): + add_score = score.assign_add(batch_env.reward) + inc_length = length.assign_add(tf.ones(len(batch_env), tf.int32)) + with tf.control_dependencies([add_score, inc_length]): + agent_indices = tf.range(len(batch_env)) + experience_summary = algo.experience( + agent_indices, prevob, batch_env.action, batch_env.reward, + batch_env.done, batch_env.observ) + return tf.summary.merge([step_summary, experience_summary]) + + def _define_end_episode(agent_indices): + """Notify the algorithm of ending episodes. + + Also updates the mean score and length counters used for summaries. + + Args: + agent_indices: Tensor holding batch indices that end their episodes. + + Returns: + Summary tensor. + """ + assert agent_indices.shape.ndims == 1 + submit_score = mean_score.submit(tf.gather(score, agent_indices)) + submit_length = mean_length.submit( + tf.cast(tf.gather(length, agent_indices), tf.float32)) + with tf.control_dependencies([submit_score, submit_length]): + return algo.end_episode(agent_indices) + + def _define_summaries(): + """Reset the average score and duration, and return them as summary. + + Returns: + Summary string. 
+ """ + score_summary = tf.cond( + tf.logical_and(log, tf.cast(mean_score.count, tf.bool)), + lambda: tf.summary.scalar('mean_score', mean_score.clear()), str) + length_summary = tf.cond( + tf.logical_and(log, tf.cast(mean_length.count, tf.bool)), + lambda: tf.summary.scalar('mean_length', mean_length.clear()), str) + return tf.summary.merge([score_summary, length_summary]) + + with tf.name_scope('simulate'): + log = tf.convert_to_tensor(log) + reset = tf.convert_to_tensor(reset) + with tf.variable_scope('simulate_temporary'): + score = tf.Variable( + tf.zeros(len(batch_env), dtype=tf.float32), False, name='score') + length = tf.Variable( + tf.zeros(len(batch_env), dtype=tf.int32), False, name='length') + mean_score = streaming_mean.StreamingMean((), tf.float32) + mean_length = streaming_mean.StreamingMean((), tf.float32) + agent_indices = tf.cond( + reset, + lambda: tf.range(len(batch_env)), + lambda: tf.cast(tf.where(batch_env.done)[:, 0], tf.int32)) + begin_episode = tf.cond( + tf.cast(tf.shape(agent_indices)[0], tf.bool), + lambda: _define_begin_episode(agent_indices), str) + with tf.control_dependencies([begin_episode]): + step = _define_step() + with tf.control_dependencies([step]): + agent_indices = tf.cast(tf.where(batch_env.done)[:, 0], tf.int32) + end_episode = tf.cond( + tf.cast(tf.shape(agent_indices)[0], tf.bool), + lambda: _define_end_episode(agent_indices), str) + with tf.control_dependencies([end_episode]): + summary = tf.summary.merge([ + _define_summaries(), begin_episode, step, end_episode]) + with tf.control_dependencies([summary]): + done, score = tf.identity(batch_env.done), tf.identity(score) + return done, score, summary diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/streaming_mean.py b/examples/pybullet/gym/pybullet_envs/agents/tools/streaming_mean.py new file mode 100644 index 000000000..3f620fe37 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/streaming_mean.py @@ -0,0 +1,67 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Compute a streaming estimation of the mean of submitted tensors.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import tensorflow as tf + + +class StreamingMean(object): + """Compute a streaming estimation of the mean of submitted tensors.""" + + def __init__(self, shape, dtype): + """Specify the shape and dtype of the mean to be estimated. + + Note that a float mean to zero submitted elements is NaN, while computing + the integer mean of zero elements raises a division by zero error. + + Args: + shape: Shape of the mean to compute. + dtype: Data type of the mean to compute. 
+ """ + self._dtype = dtype + self._sum = tf.Variable(lambda: tf.zeros(shape, dtype), False) + self._count = tf.Variable(lambda: 0, trainable=False) + + @property + def value(self): + """The current value of the mean.""" + return self._sum / tf.cast(self._count, self._dtype) + + @property + def count(self): + """The number of submitted samples.""" + return self._count + + def submit(self, value): + """Submit a single or batch tensor to refine the streaming mean.""" + # Add a batch dimension if necessary. + if value.shape.ndims == self._sum.shape.ndims: + value = value[None, ...] + return tf.group( + self._sum.assign_add(tf.reduce_sum(value, 0)), + self._count.assign_add(tf.shape(value)[0])) + + def clear(self): + """Return the mean estimate and reset the streaming statistics.""" + value = self._sum / tf.cast(self._count, self._dtype) + with tf.control_dependencies([value]): + reset_value = self._sum.assign(tf.zeros_like(self._sum)) + reset_count = self._count.assign(0) + with tf.control_dependencies([reset_value, reset_count]): + return tf.identity(value) diff --git a/examples/pybullet/gym/pybullet_envs/agents/tools/wrappers.py b/examples/pybullet/gym/pybullet_envs/agents/tools/wrappers.py new file mode 100644 index 000000000..e7c7543e3 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/tools/wrappers.py @@ -0,0 +1,558 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +"""Wrappers for OpenAI Gym environments.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import atexit +import multiprocessing +import sys +import traceback + +import gym +import gym.spaces +import numpy as np +import tensorflow as tf + + +class AutoReset(object): + """Automatically reset environment when the episode is done.""" + + def __init__(self, env): + self._env = env + self._done = True + + def __getattr__(self, name): + return getattr(self._env, name) + + def step(self, action): + if self._done: + observ, reward, done, info = self._env.reset(), 0.0, False, {} + else: + observ, reward, done, info = self._env.step(action) + self._done = done + return observ, reward, done, info + + def reset(self): + self._done = False + return self._env.reset() + + +class ActionRepeat(object): + """Repeat the agent action multiple steps.""" + + def __init__(self, env, amount): + self._env = env + self._amount = amount + + def __getattr__(self, name): + return getattr(self._env, name) + + def step(self, action): + done = False + total_reward = 0 + current_step = 0 + while current_step < self._amount and not done: + observ, reward, done, info = self._env.step(action) + total_reward += reward + current_step += 1 + return observ, total_reward, done, info + + +class RandomStart(object): + """Perform random number of random actions at the start of the episode.""" + + def __init__(self, env, max_steps): + self._env = env + self._max_steps = max_steps + + def __getattr__(self, name): + return getattr(self._env, name) + + def reset(self): + observ = self._env.reset() + random_steps = np.random.randint(0, self._max_steps) + for _ in range(random_steps): + action = self._env.action_space.sample() + observ, unused_reward, done, unused_info = self._env.step(action) + if done: + tf.logging.warning('Episode ended during random start.') + return self.reset() + return observ + + +class FrameHistory(object): + """Augment the observation with past observations.""" + + def __init__(self, env, past_indices, flatten): + """Augment the observation with past observations. + + Implemented as a Numpy ring buffer holding the necessary past observations. + + Args: + env: OpenAI Gym environment to wrap. + past_indices: List of non-negative integers indicating the time offsets + from the current time step of observations to include. + flatten: Concatenate the past observations rather than stacking them. + + Raises: + KeyError: The current observation is not included in the indices. 
+ """ + if 0 not in past_indices: + raise KeyError('Past indices should include 0 for the current frame.') + self._env = env + self._past_indices = past_indices + self._step = 0 + self._buffer = None + self._capacity = max(past_indices) + self._flatten = flatten + + def __getattr__(self, name): + return getattr(self._env, name) + + @property + def observation_space(self): + low = self._env.observation_space.low + high = self._env.observation_space.high + low = np.repeat(low[None, ...], len(self._past_indices), 0) + high = np.repeat(high[None, ...], len(self._past_indices), 0) + if self._flatten: + low = np.reshape(low, (-1,) + low.shape[2:]) + high = np.reshape(high, (-1,) + high.shape[2:]) + return gym.spaces.Box(low, high) + + def step(self, action): + observ, reward, done, info = self._env.step(action) + self._step += 1 + self._buffer[self._step % self._capacity] = observ + observ = self._select_frames() + return observ, reward, done, info + + def reset(self): + observ = self._env.reset() + self._buffer = np.repeat(observ[None, ...], self._capacity, 0) + self._step = 0 + return self._select_frames() + + def _select_frames(self): + indices = [ + (self._step - index) % self._capacity for index in self._past_indices] + observ = self._buffer[indices] + if self._flatten: + observ = np.reshape(observ, (-1,) + observ.shape[2:]) + return observ + + +class FrameDelta(object): + """Convert the observation to a difference from the previous observation.""" + + def __init__(self, env): + self._env = env + self._last = None + + def __getattr__(self, name): + return getattr(self._env, name) + + @property + def observation_space(self): + low = self._env.observation_space.low + high = self._env.observation_space.high + low, high = low - high, high - low + return gym.spaces.Box(low, high) + + def step(self, action): + observ, reward, done, info = self._env.step(action) + delta = observ - self._last + self._last = observ + return delta, reward, done, info + + def reset(self): + observ = self._env.reset() + self._last = observ + return observ + + +class RangeNormalize(object): + """Normalize the specialized observation and action ranges to [-1, 1].""" + + def __init__(self, env, observ=None, action=None): + self._env = env + self._should_normalize_observ = ( + observ is not False and self._is_finite(self._env.observation_space)) + if observ is True and not self._should_normalize_observ: + raise ValueError('Cannot normalize infinite observation range.') + if observ is None and not self._should_normalize_observ: + tf.logging.info('Not normalizing infinite observation range.') + self._should_normalize_action = ( + action is not False and self._is_finite(self._env.action_space)) + if action is True and not self._should_normalize_action: + raise ValueError('Cannot normalize infinite action range.') + if action is None and not self._should_normalize_action: + tf.logging.info('Not normalizing infinite action range.') + + def __getattr__(self, name): + return getattr(self._env, name) + + @property + def observation_space(self): + space = self._env.observation_space + if not self._should_normalize_observ: + return space + return gym.spaces.Box(-np.ones(space.shape), np.ones(space.shape)) + + @property + def action_space(self): + space = self._env.action_space + if not self._should_normalize_action: + return space + return gym.spaces.Box(-np.ones(space.shape), np.ones(space.shape)) + + def step(self, action): + if self._should_normalize_action: + action = self._denormalize_action(action) + observ, reward, done, info 
= self._env.step(action) + if self._should_normalize_observ: + observ = self._normalize_observ(observ) + return observ, reward, done, info + + def reset(self): + observ = self._env.reset() + if self._should_normalize_observ: + observ = self._normalize_observ(observ) + return observ + + def _denormalize_action(self, action): + min_ = self._env.action_space.low + max_ = self._env.action_space.high + action = (action + 1) / 2 * (max_ - min_) + min_ + return action + + def _normalize_observ(self, observ): + min_ = self._env.observation_space.low + max_ = self._env.observation_space.high + observ = 2 * (observ - min_) / (max_ - min_) - 1 + return observ + + def _is_finite(self, space): + return np.isfinite(space.low).all() and np.isfinite(space.high).all() + + +class ClipAction(object): + """Clip out of range actions to the action space of the environment.""" + + def __init__(self, env): + self._env = env + + def __getattr__(self, name): + return getattr(self._env, name) + + @property + def action_space(self): + shape = self._env.action_space.shape + return gym.spaces.Box(-np.inf * np.ones(shape), np.inf * np.ones(shape)) + + def step(self, action): + action_space = self._env.action_space + action = np.clip(action, action_space.low, action_space.high) + return self._env.step(action) + + +class LimitDuration(object): + """End episodes after specified number of steps.""" + + def __init__(self, env, duration): + self._env = env + self._duration = duration + self._step = None + + def __getattr__(self, name): + return getattr(self._env, name) + + def step(self, action): + if self._step is None: + raise RuntimeError('Must reset environment.') + observ, reward, done, info = self._env.step(action) + self._step += 1 + if self._step >= self._duration: + done = True + self._step = None + return observ, reward, done, info + + def reset(self): + self._step = 0 + return self._env.reset() + + +class ExternalProcess(object): + """Step environment in a separate process for lock free paralellism.""" + + # Message types for communication via the pipe. + _ACCESS = 1 + _CALL = 2 + _RESULT = 3 + _EXCEPTION = 4 + _CLOSE = 5 + + def __init__(self, constructor): + """Step environment in a separate process for lock free paralellism. + + The environment will be created in the external process by calling the + specified callable. This can be an environment class, or a function + creating the environment and potentially wrapping it. The returned + environment should not access global variables. + + Args: + constructor: Callable that creates and returns an OpenAI gym environment. + + Attributes: + observation_space: The cached observation space of the environment. + action_space: The cached action space of the environment. + """ + self._conn, conn = multiprocessing.Pipe() + self._process = multiprocessing.Process( + target=self._worker, args=(constructor, conn)) + atexit.register(self.close) + self._process.start() + self._observ_space = None + self._action_space = None + + @property + def observation_space(self): + if not self._observ_space: + self._observ_space = self.__getattr__('observation_space') + return self._observ_space + + @property + def action_space(self): + if not self._action_space: + self._action_space = self.__getattr__('action_space') + return self._action_space + + def __getattr__(self, name): + """Request an attribute from the environment. + + Note that this involves communication with the external process, so it can + be slow. + + Args: + name: Attribute to access. + + Returns: + Value of the attribute. 
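+
+    Example (illustrative sketch added by the editor; MyEnv stands in for any
+    picklable gym.Env constructor):
+      env = ExternalProcess(MyEnv)
+      space = env.action_space  # Resolved by the worker process; can be slow.
+      env.close()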
+ """ + self._conn.send((self._ACCESS, name)) + return self._receive() + + def call(self, name, *args, **kwargs): + """Asynchronously call a method of the external environment. + + Args: + name: Name of the method to call. + *args: Positional arguments to forward to the method. + **kwargs: Keyword arguments to forward to the method. + + Returns: + Promise object that blocks and provides the return value when called. + """ + payload = name, args, kwargs + self._conn.send((self._CALL, payload)) + return self._receive + + def close(self): + """Send a close message to the external process and join it.""" + try: + self._conn.send((self._CLOSE, None)) + self._conn.close() + except IOError: + # The connection was already closed. + pass + self._process.join() + + def step(self, action, blocking=True): + """Step the environment. + + Args: + action: The action to apply to the environment. + blocking: Whether to wait for the result. + + Returns: + Transition tuple when blocking, otherwise callable that returns the + transition tuple. + """ + promise = self.call('step', action) + if blocking: + return promise() + else: + return promise + + def reset(self, blocking=True): + """Reset the environment. + + Args: + blocking: Whether to wait for the result. + + Returns: + New observation when blocking, otherwise callable that returns the new + observation. + """ + promise = self.call('reset') + if blocking: + return promise() + else: + return promise + + def _receive(self): + """Wait for a message from the worker process and return its payload. + + Raises: + Exception: An exception was raised inside the worker process. + KeyError: The reveived message is of an unknown type. + + Returns: + Payload object of the message. + """ + message, payload = self._conn.recv() + # Re-raise exceptions in the main process. + if message == self._EXCEPTION: + stacktrace = payload + raise Exception(stacktrace) + if message == self._RESULT: + return payload + raise KeyError('Received message of unexpected type {}'.format(message)) + + def _worker(self, constructor, conn): + """The process waits for actions and sends back environment results. + + Args: + constructor: Constructor for the OpenAI Gym environment. + conn: Connection for communication to the main process. + """ + try: + env = constructor() + while True: + try: + # Only block for short times to have keyboard exceptions be raised. + if not conn.poll(0.1): + continue + message, payload = conn.recv() + except (EOFError, KeyboardInterrupt): + break + if message == self._ACCESS: + name = payload + result = getattr(env, name) + conn.send((self._RESULT, result)) + continue + if message == self._CALL: + name, args, kwargs = payload + result = getattr(env, name)(*args, **kwargs) + conn.send((self._RESULT, result)) + continue + if message == self._CLOSE: + assert payload is None + break + raise KeyError('Received message of unknown type {}'.format(message)) + except Exception: # pylint: disable=broad-except + stacktrace = ''.join(traceback.format_exception(*sys.exc_info())) + tf.logging.error('Error in environment process: {}'.format(stacktrace)) + conn.send((self._EXCEPTION, stacktrace)) + conn.close() + + +class ConvertTo32Bit(object): + """Convert data types of an OpenAI Gym environment to 32 bit.""" + + def __init__(self, env): + """Convert data types of an OpenAI Gym environment to 32 bit. + + Args: + env: OpenAI Gym environment. + """ + self._env = env + + def __getattr__(self, name): + """Forward unimplemented attributes to the original environment. 
+ + Args: + name: Attribute that was accessed. + + Returns: + Value behind the attribute name in the wrapped environment. + """ + return getattr(self._env, name) + + def step(self, action): + """Forward action to the wrapped environment. + + Args: + action: Action to apply to the environment. + + Raises: + ValueError: Invalid action. + + Returns: + Converted observation, converted reward, done flag, and info object. + """ + observ, reward, done, info = self._env.step(action) + observ = self._convert_observ(observ) + reward = self._convert_reward(reward) + return observ, reward, done, info + + def reset(self): + """Reset the environment and convert the resulting observation. + + Returns: + Converted observation. + """ + observ = self._env.reset() + observ = self._convert_observ(observ) + return observ + + def _convert_observ(self, observ): + """Convert the observation to 32 bits. + + Args: + observ: Numpy observation. + + Raises: + ValueError: Observation contains infinite values. + + Returns: + Numpy observation with 32-bit data type. + """ + if not np.isfinite(observ).all(): + raise ValueError('Infinite observation encountered.') + if observ.dtype == np.float64: + return observ.astype(np.float32) + if observ.dtype == np.int64: + return observ.astype(np.int32) + return observ + + def _convert_reward(self, reward): + """Convert the reward to 32 bits. + + Args: + reward: Numpy reward. + + Raises: + ValueError: Rewards contain infinite values. + + Returns: + Numpy reward with 32-bit data type. + """ + if not np.isfinite(reward).all(): + raise ValueError('Infinite reward encountered.') + return np.array(reward, dtype=np.float32) diff --git a/examples/pybullet/gym/pybullet_envs/agents/train_ppo.py b/examples/pybullet/gym/pybullet_envs/agents/train_ppo.py index 561de7628..e2f3b4114 100644 --- a/examples/pybullet/gym/pybullet_envs/agents/train_ppo.py +++ b/examples/pybullet/gym/pybullet_envs/agents/train_ppo.py @@ -24,15 +24,14 @@ from __future__ import division from __future__ import print_function import datetime -import functools import os import gym import tensorflow as tf -from agents import tools +from . import tools from . import configs -from agents.scripts import utility +from . import utility def _create_environment(config): @@ -73,7 +72,7 @@ def _define_loop(graph, logdir, train_steps, eval_steps): graph.force_reset) loop.add_phase( 'train', graph.done, graph.score, graph.summary, train_steps, - report_every=None, + report_every=train_steps, log_every=train_steps // 2, checkpoint_every=None, feed={graph.is_training: True}) @@ -100,9 +99,6 @@ def train(config, env_processes): Evaluation scores. """ tf.reset_default_graph() - with config.unlocked: - config.policy_optimizer = getattr(tf.train, config.policy_optimizer) - config.value_optimizer = getattr(tf.train, config.value_optimizer) if config.update_every % config.num_agents: tf.logging.warn('Number of agents should divide episodes per update.') with tf.device('/cpu:0'): diff --git a/examples/pybullet/gym/pybullet_envs/agents/utility.py b/examples/pybullet/gym/pybullet_envs/agents/utility.py new file mode 100644 index 000000000..9e45fc127 --- /dev/null +++ b/examples/pybullet/gym/pybullet_envs/agents/utility.py @@ -0,0 +1,190 @@ +# Copyright 2017 The TensorFlow Agents Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""Utilities for using reinforcement learning algorithms."""
+
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+
+import logging
+import os
+import re
+
+import ruamel.yaml as yaml
+import tensorflow as tf
+
+from . import tools
+
+
+def define_simulation_graph(batch_env, algo_cls, config):
+  """Define the algorithm and environment interaction.
+
+  Args:
+    batch_env: In-graph environments object.
+    algo_cls: Constructor of a batch algorithm.
+    config: Configuration object for the algorithm.
+
+  Returns:
+    Object providing graph elements via attributes.
+  """
+  # pylint: disable=unused-variable
+  step = tf.Variable(0, False, dtype=tf.int32, name='global_step')
+  is_training = tf.placeholder(tf.bool, name='is_training')
+  should_log = tf.placeholder(tf.bool, name='should_log')
+  do_report = tf.placeholder(tf.bool, name='do_report')
+  force_reset = tf.placeholder(tf.bool, name='force_reset')
+  algo = algo_cls(batch_env, step, is_training, should_log, config)
+  done, score, summary = tools.simulate(
+      batch_env, algo, should_log, force_reset)
+  message = 'Graph contains {} trainable variables.'
+  tf.logging.info(message.format(tools.count_weights()))
+  # pylint: enable=unused-variable
+  return tools.AttrDict(locals())
+
+
+def define_batch_env(constructor, num_agents, env_processes):
+  """Create environments and apply all desired wrappers.
+
+  Args:
+    constructor: Constructor of an OpenAI Gym environment.
+    num_agents: Number of environments to combine in the batch.
+    env_processes: Whether to step environments in external processes.
+
+  Returns:
+    In-graph environments object.
+  """
+  with tf.variable_scope('environments'):
+    if env_processes:
+      envs = [
+          tools.wrappers.ExternalProcess(constructor)
+          for _ in range(num_agents)]
+    else:
+      envs = [constructor() for _ in range(num_agents)]
+    batch_env = tools.BatchEnv(envs, blocking=not env_processes)
+    batch_env = tools.InGraphBatchEnv(batch_env)
+  return batch_env
+
+
+def define_saver(exclude=None):
+  """Create a saver for the variables we want to checkpoint.
+
+  Args:
+    exclude: List of regexes to match variable names to exclude.
+
+  Returns:
+    Saver object.
+  """
+  variables = []
+  exclude = exclude or []
+  exclude = [re.compile(regex) for regex in exclude]
+  for variable in tf.global_variables():
+    if any(regex.match(variable.name) for regex in exclude):
+      continue
+    variables.append(variable)
+  saver = tf.train.Saver(variables, keep_checkpoint_every_n_hours=5)
+  return saver
+
+
+def initialize_variables(sess, saver, logdir, checkpoint=None, resume=None):
+  """Initialize or restore variables from a checkpoint if available.
+
+  Args:
+    sess: Session to initialize variables in.
+    saver: Saver to restore variables.
+    logdir: Directory to search for checkpoints.
+    checkpoint: Specify what checkpoint name to use; defaults to most recent.
+    resume: Whether to expect recovering a checkpoint or starting a new run.
+
+  Raises:
+    ValueError: If resume expected but no log directory specified.
+    RuntimeError: If no resume expected but a checkpoint was found.
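+
+  Example (an illustrative sketch only; the exclude pattern and the log
+  directory path are placeholders, not values this function requires):
+    saver = define_saver(exclude=(r'.*_temporary.*',))
+    with tf.Session() as sess:
+      initialize_variables(sess, saver, 'path/to/logdir', resume=False)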
+  """
+  sess.run(tf.group(
+      tf.local_variables_initializer(),
+      tf.global_variables_initializer()))
+  if resume and not (logdir or checkpoint):
+    raise ValueError('Need to specify logdir to resume a checkpoint.')
+  if logdir:
+    state = tf.train.get_checkpoint_state(logdir)
+    if checkpoint:
+      checkpoint = os.path.join(logdir, checkpoint)
+    if not checkpoint and state and state.model_checkpoint_path:
+      checkpoint = state.model_checkpoint_path
+    if checkpoint and resume is False:
+      message = 'Found unexpected checkpoint when starting a new run.'
+      raise RuntimeError(message)
+    if checkpoint:
+      saver.restore(sess, checkpoint)
+
+
+def save_config(config, logdir=None):
+  """Save a new configuration by name.
+
+  If a logging directory is specified, it will be created and the configuration
+  will be stored there. Otherwise, a log message will be printed.
+
+  Args:
+    config: Configuration object.
+    logdir: Location for writing summaries and checkpoints if specified.
+
+  Returns:
+    Configuration object.
+  """
+  if logdir:
+    with config.unlocked:
+      config.logdir = logdir
+    message = 'Start a new run and write summaries and checkpoints to {}.'
+    tf.logging.info(message.format(config.logdir))
+    tf.gfile.MakeDirs(config.logdir)
+    config_path = os.path.join(config.logdir, 'config.yaml')
+    with tf.gfile.FastGFile(config_path, 'w') as file_:
+      yaml.dump(config, file_, default_flow_style=False)
+  else:
+    message = (
+        'Start a new run without storing summaries and checkpoints since no '
+        'logging directory was specified.')
+    tf.logging.info(message)
+  return config
+
+
+def load_config(logdir):
+  """Load a configuration from the log directory.
+
+  Args:
+    logdir: The logging directory containing the configuration file.
+
+  Raises:
+    IOError: The logging directory does not contain a configuration file.
+
+  Returns:
+    Configuration object.
+  """
+  config_path = logdir and os.path.join(logdir, 'config.yaml')
+  if not config_path or not tf.gfile.Exists(config_path):
+    message = (
+        'Cannot resume an existing run since the logging directory does not '
+        'contain a configuration file.')
+    raise IOError(message)
+  with tf.gfile.FastGFile(config_path, 'r') as file_:
+    config = yaml.load(file_, Loader=yaml.Loader)
+  message = 'Resume run and write summaries and checkpoints to {}.'
+  tf.logging.info(message.format(config.logdir))
+  return config
+
+
+def set_up_logging():
+  """Configure the TensorFlow logger."""
+  tf.logging.set_verbosity(tf.logging.INFO)
+  logging.getLogger('tensorflow').propagate = False
diff --git a/examples/pybullet/gym/pybullet_envs/agents/visualize_ppo.py b/examples/pybullet/gym/pybullet_envs/agents/visualize_ppo.py
index f7a67c7ea..dd27a546d 100644
--- a/examples/pybullet/gym/pybullet_envs/agents/visualize_ppo.py
+++ b/examples/pybullet/gym/pybullet_envs/agents/visualize_ppo.py
@@ -24,14 +24,13 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import functools
 import os
 
 import gym
 import tensorflow as tf
 
-from agents import tools
-from agents.scripts import utility
+from . import tools
+from . import utility
 
 
 def _create_environment(config, outdir):
@@ -97,9 +96,6 @@ def visualize(
     env_processes: Whether to step environments in separate processes.
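+
+  The hyperparameters and checkpoint location are recovered from the
+  config.yaml that utility.save_config() wrote during training, for example
+  (sketch; the path is a placeholder for a real training log directory):
+    config = utility.load_config('path/to/train_logdir')
+    print(config.logdir)  # where summaries and checkpoints were written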
""" config = utility.load_config(logdir) - with config.unlocked: - config.policy_optimizer = getattr(tf.train, config.policy_optimizer) - config.value_optimizer = getattr(tf.train, config.value_optimizer) with tf.device('/cpu:0'): batch_env = utility.define_batch_env( lambda: _create_environment(config, outdir),