import os, inspect currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) print("current_dir=" + currentdir) os.sys.path.insert(0, currentdir) import math import gym from gym import spaces from gym.utils import seeding import numpy as np import time import pybullet as p from . import kuka import random import pybullet_data from pkg_resources import parse_version largeValObservation = 100 RENDER_HEIGHT = 720 RENDER_WIDTH = 960 class KukaGymEnv(gym.Env): metadata = {'render.modes': ['human', 'rgb_array'], 'video.frames_per_second': 50} def __init__(self, urdfRoot=pybullet_data.getDataPath(), actionRepeat=1, isEnableSelfCollision=True, renders=False, isDiscrete=False, maxSteps=1000): #print("KukaGymEnv __init__") self._isDiscrete = isDiscrete self._timeStep = 1. / 240. self._urdfRoot = urdfRoot self._actionRepeat = actionRepeat self._isEnableSelfCollision = isEnableSelfCollision self._observation = [] self._envStepCounter = 0 self._renders = renders self._maxSteps = maxSteps self.terminated = 0 self._cam_dist = 1.3 self._cam_yaw = 180 self._cam_pitch = -40 self._p = p if self._renders: cid = p.connect(p.SHARED_MEMORY) if (cid < 0): cid = p.connect(p.GUI) p.resetDebugVisualizerCamera(1.3, 180, -41, [0.52, -0.2, -0.33]) else: p.connect(p.DIRECT) #timinglog = p.startStateLogging(p.STATE_LOGGING_PROFILE_TIMINGS, "kukaTimings.json") self.seed() self.reset() observationDim = len(self.getExtendedObservation()) #print("observationDim") #print(observationDim) observation_high = np.array([largeValObservation] * observationDim) if (self._isDiscrete): self.action_space = spaces.Discrete(7) else: action_dim = 3 self._action_bound = 1 action_high = np.array([self._action_bound] * action_dim) self.action_space = spaces.Box(-action_high, action_high) self.observation_space = spaces.Box(-observation_high, observation_high) self.viewer = None def reset(self): #print("KukaGymEnv _reset") self.terminated = 0 p.resetSimulation() p.setPhysicsEngineParameter(numSolverIterations=150) p.setTimeStep(self._timeStep) p.loadURDF(os.path.join(self._urdfRoot, "plane.urdf"), [0, 0, -1]) p.loadURDF(os.path.join(self._urdfRoot, "table/table.urdf"), 0.5000000, 0.00000, -.820000, 0.000000, 0.000000, 0.0, 1.0) xpos = 0.55 + 0.12 * random.random() ypos = 0 + 0.2 * random.random() ang = 3.14 * 0.5 + 3.1415925438 * random.random() orn = p.getQuaternionFromEuler([0, 0, ang]) self.blockUid = p.loadURDF(os.path.join(self._urdfRoot, "block.urdf"), xpos, ypos, -0.15, orn[0], orn[1], orn[2], orn[3]) p.setGravity(0, 0, -10) self._kuka = kuka.Kuka(urdfRootPath=self._urdfRoot, timeStep=self._timeStep) self._envStepCounter = 0 p.stepSimulation() self._observation = self.getExtendedObservation() return np.array(self._observation) def __del__(self): p.disconnect() def seed(self, seed=None): self.np_random, seed = seeding.np_random(seed) return [seed] def getExtendedObservation(self): self._observation = self._kuka.getObservation() gripperState = p.getLinkState(self._kuka.kukaUid, self._kuka.kukaGripperIndex) gripperPos = gripperState[0] gripperOrn = gripperState[1] blockPos, blockOrn = p.getBasePositionAndOrientation(self.blockUid) invGripperPos, invGripperOrn = p.invertTransform(gripperPos, gripperOrn) gripperMat = p.getMatrixFromQuaternion(gripperOrn) dir0 = [gripperMat[0], gripperMat[3], gripperMat[6]] dir1 = [gripperMat[1], gripperMat[4], gripperMat[7]] dir2 = [gripperMat[2], gripperMat[5], gripperMat[8]] gripperEul = p.getEulerFromQuaternion(gripperOrn) #print("gripperEul") #print(gripperEul) blockPosInGripper, blockOrnInGripper = p.multiplyTransforms(invGripperPos, invGripperOrn, blockPos, blockOrn) projectedBlockPos2D = [blockPosInGripper[0], blockPosInGripper[1]] blockEulerInGripper = p.getEulerFromQuaternion(blockOrnInGripper) #print("projectedBlockPos2D") #print(projectedBlockPos2D) #print("blockEulerInGripper") #print(blockEulerInGripper) #we return the relative x,y position and euler angle of block in gripper space blockInGripperPosXYEulZ = [blockPosInGripper[0], blockPosInGripper[1], blockEulerInGripper[2]] #p.addUserDebugLine(gripperPos,[gripperPos[0]+dir0[0],gripperPos[1]+dir0[1],gripperPos[2]+dir0[2]],[1,0,0],lifeTime=1) #p.addUserDebugLine(gripperPos,[gripperPos[0]+dir1[0],gripperPos[1]+dir1[1],gripperPos[2]+dir1[2]],[0,1,0],lifeTime=1) #p.addUserDebugLine(gripperPos,[gripperPos[0]+dir2[0],gripperPos[1]+dir2[1],gripperPos[2]+dir2[2]],[0,0,1],lifeTime=1) self._observation.extend(list(blockInGripperPosXYEulZ)) return self._observation def step(self, action): if (self._isDiscrete): dv = 0.005 dx = [0, -dv, dv, 0, 0, 0, 0][action] dy = [0, 0, 0, -dv, dv, 0, 0][action] da = [0, 0, 0, 0, 0, -0.05, 0.05][action] f = 0.3 realAction = [dx, dy, -0.002, da, f] else: #print("action[0]=", str(action[0])) dv = 0.005 dx = action[0] * dv dy = action[1] * dv da = action[2] * 0.05 f = 0.3 realAction = [dx, dy, -0.002, da, f] return self.step2(realAction) def step2(self, action): for i in range(self._actionRepeat): self._kuka.applyAction(action) p.stepSimulation() if self._termination(): break self._envStepCounter += 1 if self._renders: time.sleep(self._timeStep) self._observation = self.getExtendedObservation() #print("self._envStepCounter") #print(self._envStepCounter) done = self._termination() npaction = np.array([ action[3] ]) #only penalize rotation until learning works well [action[0],action[1],action[3]]) actionCost = np.linalg.norm(npaction) * 10. #print("actionCost") #print(actionCost) reward = self._reward() - actionCost #print("reward") #print(reward) #print("len=%r" % len(self._observation)) return np.array(self._observation), reward, done, {} def render(self, mode="rgb_array", close=False): if mode != "rgb_array": return np.array([]) base_pos, orn = self._p.getBasePositionAndOrientation(self._kuka.kukaUid) view_matrix = self._p.computeViewMatrixFromYawPitchRoll(cameraTargetPosition=base_pos, distance=self._cam_dist, yaw=self._cam_yaw, pitch=self._cam_pitch, roll=0, upAxisIndex=2) proj_matrix = self._p.computeProjectionMatrixFOV(fov=60, aspect=float(RENDER_WIDTH) / RENDER_HEIGHT, nearVal=0.1, farVal=100.0) (_, _, px, _, _) = self._p.getCameraImage(width=RENDER_WIDTH, height=RENDER_HEIGHT, viewMatrix=view_matrix, projectionMatrix=proj_matrix, renderer=self._p.ER_BULLET_HARDWARE_OPENGL) #renderer=self._p.ER_TINY_RENDERER) rgb_array = np.array(px, dtype=np.uint8) rgb_array = np.reshape(rgb_array, (RENDER_HEIGHT, RENDER_WIDTH, 4)) rgb_array = rgb_array[:, :, :3] return rgb_array def _termination(self): #print (self._kuka.endEffectorPos[2]) state = p.getLinkState(self._kuka.kukaUid, self._kuka.kukaEndEffectorIndex) actualEndEffectorPos = state[0] #print("self._envStepCounter") #print(self._envStepCounter) if (self.terminated or self._envStepCounter > self._maxSteps): self._observation = self.getExtendedObservation() return True maxDist = 0.005 closestPoints = p.getClosestPoints(self._kuka.trayUid, self._kuka.kukaUid, maxDist) if (len(closestPoints)): #(actualEndEffectorPos[2] <= -0.43): self.terminated = 1 #print("terminating, closing gripper, attempting grasp") #start grasp and terminate fingerAngle = 0.3 for i in range(100): graspAction = [0, 0, 0.0001, 0, fingerAngle] self._kuka.applyAction(graspAction) p.stepSimulation() fingerAngle = fingerAngle - (0.3 / 100.) if (fingerAngle < 0): fingerAngle = 0 for i in range(1000): graspAction = [0, 0, 0.001, 0, fingerAngle] self._kuka.applyAction(graspAction) p.stepSimulation() blockPos, blockOrn = p.getBasePositionAndOrientation(self.blockUid) if (blockPos[2] > 0.23): #print("BLOCKPOS!") #print(blockPos[2]) break state = p.getLinkState(self._kuka.kukaUid, self._kuka.kukaEndEffectorIndex) actualEndEffectorPos = state[0] if (actualEndEffectorPos[2] > 0.5): break self._observation = self.getExtendedObservation() return True return False def _reward(self): #rewards is height of target object blockPos, blockOrn = p.getBasePositionAndOrientation(self.blockUid) closestPoints = p.getClosestPoints(self.blockUid, self._kuka.kukaUid, 1000, -1, self._kuka.kukaEndEffectorIndex) reward = -1000 numPt = len(closestPoints) #print(numPt) if (numPt > 0): #print("reward:") reward = -closestPoints[0][8] * 10 if (blockPos[2] > 0.2): reward = reward + 10000 print("successfully grasped a block!!!") #print("self._envStepCounter") #print(self._envStepCounter) #print("self._envStepCounter") #print(self._envStepCounter) #print("reward") #print(reward) #print("reward") #print(reward) return reward if parse_version(gym.__version__) < parse_version('0.9.6'): _render = render _reset = reset _seed = seed _step = step