Merge pull request #2244 from erwincoumans/master

add minitaur extended to pybullet_envs
Author: erwincoumans
Committed: 2019-05-08 10:36:51 -07:00 (committed by GitHub)
2 changed files with 264 additions and 0 deletions


@@ -39,6 +39,14 @@ register(
reward_threshold=5.0,
)
register(
id='MinitaurExtendedEnv-v0',
entry_point='pybullet_envs.minitaur.envs:MinitaurExtendedEnv',
max_episode_steps=1000,
reward_threshold=5.0,
)
register(
id='MinitaurReactiveEnv-v0',
entry_point='pybullet_envs.minitaur.envs:MinitaurReactiveEnv',
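
Once this registration is picked up (importing pybullet_envs runs the register() calls above), the new environment should be constructible through Gym's standard factory. A minimal sketch, assuming gym and pybullet are installed:

    import gym
    import pybullet_envs  # noqa: F401 -- imported for its side effect of registering the Minitaur envs

    env = gym.make('MinitaurExtendedEnv-v0')
    obs = env.reset()
    obs, reward, done, info = env.step(env.action_space.sample())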


@@ -0,0 +1,256 @@
"""Extends the environment by adding observation and action history.
The implementation is a bit dirty import of the implementation in
the experimental branch.
"""
from gym import spaces
import numpy as np
from pybullet_envs.minitaur.envs.minitaur_reactive_env import MinitaurReactiveEnv
class MinitaurExtendedEnv(MinitaurReactiveEnv):
"""The 'extended' environment for Markovian property.
This class implements to include prior actions and observations to the
observation vector, thus making the environment "more" Markovian. This is
especially useful for systems with latencies.
Args:
history_length: the length of the history buffer
history_include_actions: a flag for including past actions in the history
history_include_states: a flag for including past states in the history
include_state_difference: a flag for including the first-order state
differences in the observation
include_second_state_difference: a flag for including the second-order
state differences in the observation
include_base_position: a flag for including the base position in the
observation
include_leg_model: a flag for including the leg pose model (swing and
extension) in the observation
never_terminate: if this is on, the environment unconditionally never
terminates
action_scale: the scale applied to actions
"""
MAX_BUFFER_SIZE = 1001
ACTION_DIM = 8
PARENT_OBSERVATION_DIM = 12
INIT_EXTENSION_POS = 2.0
INIT_SWING_POS = 0.0
metadata = {
"render.modes": ["human", "rgb_array"],
"video.frames_per_second": 50,
}
def __init__(self,
history_length=1,
history_include_actions=True,
history_include_states=False,
include_state_difference=False,
include_second_state_difference=False,
include_base_position=False,
include_leg_model=False,
never_terminate=False,
action_scale=0.5,
**kwargs):
self._kwargs = kwargs
self._history_length = history_length
self._history_include_actions = history_include_actions
self._history_include_states = history_include_states
self._include_state_difference = include_state_difference
self._include_second_state_difference = include_second_state_difference
self._include_base_position = include_base_position
self._include_leg_model = include_leg_model
self._never_terminate = never_terminate
self._action_scale = action_scale
self._past_parent_observations = np.zeros((self.MAX_BUFFER_SIZE + 1,
self.PARENT_OBSERVATION_DIM))
self._past_motor_angles = np.zeros((self.MAX_BUFFER_SIZE + 1, 8))
self._past_actions = np.zeros((self.MAX_BUFFER_SIZE, self.ACTION_DIM))
self._counter = 0
super(MinitaurExtendedEnv, self).__init__(**kwargs)
self.action_space = spaces.Box(-1.0, 1.0, self.action_space.shape)
self.observation_space = spaces.Box(-np.inf, np.inf,
self._get_observation().shape)
# This is mainly for the TF-Agents compatibility
self.action_space.flat_dim = len(self.action_space.low)
self.observation_space.flat_dim = len(self.observation_space.low)
def _get_observation(self):
"""Maybe concatenate motor velocity and torque into observations."""
parent_observation = super(MinitaurExtendedEnv, self)._get_observation()
parent_observation = np.array(parent_observation)
# Base class might require this.
self._observation = parent_observation
self._past_parent_observations[self._counter] = parent_observation
num_motors = self.minitaur.num_motors
self._past_motor_angles[self._counter] = parent_observation[-num_motors:]
history_states = []
history_actions = []
for i in range(self._history_length):
t = max(self._counter - i - 1, 0)
if self._history_include_states:
history_states.append(self._past_parent_observations[t])
if self._history_include_actions:
history_actions.append(self._past_actions[t])
t = self._counter
tm, tmm = max(0, self._counter - 1), max(0, self._counter - 2)
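# t indexes the current step; tm and tmm index the two preceding steps
# (clamped at zero early in an episode). Used for the finite differences below.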
state_difference, second_state_difference = [], []
if self._include_state_difference:
state_difference = [
self._past_motor_angles[t] - self._past_motor_angles[tm]
]
if self._include_second_state_difference:
second_state_difference = [
self._past_motor_angles[t] - 2 * self._past_motor_angles[tm] +
self._past_motor_angles[tmm]
]
base_position = []
if self._include_base_position:
base_position = np.array(self.minitaur.GetBasePosition())
leg_model = []
if self._include_leg_model:
raw_motor_angles = self.minitaur.GetMotorAngles()
leg_model = self.convert_to_leg_model(raw_motor_angles)
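# The final observation is the parent observation followed by (when enabled):
# state history, action history, first- and second-order motor-angle
# differences, the base position, and the leg model.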
observation_list = (
[parent_observation] + history_states + history_actions +
state_difference + second_state_difference + [base_position] +
[leg_model])
full_observation = np.concatenate(observation_list)
return full_observation
def reset(self):
"""Resets the time and history buffer."""
self._counter = 0
self._signal(self._counter) # This sets the current phase
self._past_parent_observations = np.zeros((self.MAX_BUFFER_SIZE + 1,
self.PARENT_OBSERVATION_DIM))
self._past_motor_angles = np.zeros((self.MAX_BUFFER_SIZE + 1, 8))
self._past_actions = np.zeros((self.MAX_BUFFER_SIZE, self.ACTION_DIM))
self._counter = 0
return np.array(super(MinitaurExtendedEnv, self).reset())
def step(self, action):
"""Step function wrapper can be used to add shaping terms to the reward.
Args:
action: an array of the given action
Returns:
next_obs: the next observation
reward: the reward for this experience tuple
done: the terminal flag
info: an additional information
"""
action *= self._action_scale
self._past_actions[self._counter] = action
self._counter += 1
next_obs, _, done, info = super(MinitaurExtendedEnv, self).step(action)
reward = self.reward()
info.update(base_reward=reward)
return next_obs, reward, done, info
def terminate(self):
"""The helper function to terminate the environment."""
super(MinitaurExtendedEnv, self)._close()
def _termination(self):
"""Determines whether the env is terminated or not.
checks whether 1) the front leg is bent too much or 2) the time exceeds
the manually set weights.
Returns:
terminal: the terminal flag whether the env is terminated or not
"""
if self._never_terminate:
return False
leg_model = self.convert_to_leg_model(self.minitaur.GetMotorAngles())
swing0 = leg_model[0]
swing1 = leg_model[2]
maximum_swing_angle = 0.8
if swing0 > maximum_swing_angle or swing1 > maximum_swing_angle:
return True
if self._counter >= 500:
return True
return False
def reward(self):
"""Compute rewards for the given time step.
It considers two terms: 1) forward velocity reward and 2) action
acceleration penalty.
Returns:
reward: the computed reward.
"""
current_base_position = self.minitaur.GetBasePosition()
dt = self.control_time_step
velocity = (current_base_position[0] - self._last_base_position[0]) / dt
velocity_reward = np.clip(velocity, -0.5, 0.5)
action = self._past_actions[self._counter - 1]
prev_action = self._past_actions[max(self._counter - 2, 0)]
prev_prev_action = self._past_actions[max(self._counter - 3, 0)]
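# The second-order finite difference of the last three actions approximates
# the action acceleration; its mean absolute value is penalized below.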
acc = action - 2 * prev_action + prev_prev_action
action_acceleration_penalty = np.mean(np.abs(acc))
reward = 0.0
reward += 1.0 * velocity_reward
reward -= 0.1 * action_acceleration_penalty
return reward
@staticmethod
def convert_to_leg_model(motor_angles):
"""A helper function to convert motor angles to leg model.
Args:
motor_angles: the raw motor angles.
Returns:
leg_angles: the leg pose model represented in swing and extension.
"""
# TODO(sehoonha): clean up model conversion codes
num_legs = 4
# motor_angles = motor_angles / (np.pi / 4.)
leg_angles = np.zeros(num_legs * 2)
for i in range(num_legs):
motor1, motor2 = motor_angles[2 * i:2 * i + 2]
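# Swing is half the signed difference of the two motor angles (the sign
# alternates between the front and back leg pairs); extension is their mean.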
swing = (-1)**(i // 2) * 0.5 * (motor2 - motor1)
extension = 0.5 * (motor1 + motor2)
leg_angles[i] = swing
leg_angles[i + num_legs] = extension
return leg_angles
def __getstate__(self):
"""A helper get state function for pickling."""
return {"kwargs": self._kwargs}
def __setstate__(self, state):
"""A helper set state function for pickling."""
self.__init__(**state["kwargs"])
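
As a usage note, the extended environment can also be constructed directly rather than through gym.make. A minimal sketch with history augmentation enabled; the parameter values are illustrative, not taken from the PR:

    from pybullet_envs.minitaur.envs.minitaur_extended_env import MinitaurExtendedEnv

    # Illustrative settings: keep three past states/actions plus first-order
    # state differences in the observation.
    env = MinitaurExtendedEnv(
        history_length=3,
        history_include_actions=True,
        history_include_states=True,
        include_state_difference=True)
    obs = env.reset()
    for _ in range(100):
        obs, reward, done, info = env.step(env.action_space.sample())
        if done:
            obs = env.reset()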