import os, inspect currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) parentdir = os.path.dirname(os.path.dirname(currentdir)) os.sys.path.insert(0,parentdir) import math import gym from gym import spaces from gym.utils import seeding import numpy as np import time import pybullet as p from . import kuka import random import pybullet_data from pkg_resources import parse_version maxSteps = 1000 RENDER_HEIGHT = 720 RENDER_WIDTH = 960 class KukaCamGymEnv(gym.Env): metadata = { 'render.modes': ['human', 'rgb_array'], 'video.frames_per_second' : 50 } def __init__(self, urdfRoot=pybullet_data.getDataPath(), actionRepeat=1, isEnableSelfCollision=True, renders=False, isDiscrete=False): self._timeStep = 1./240. self._urdfRoot = urdfRoot self._actionRepeat = actionRepeat self._isEnableSelfCollision = isEnableSelfCollision self._observation = [] self._envStepCounter = 0 self._renders = renders self._width = 341 self._height = 256 self._isDiscrete=isDiscrete self.terminated = 0 self._p = p if self._renders: cid = p.connect(p.SHARED_MEMORY) if (cid<0): p.connect(p.GUI) p.resetDebugVisualizerCamera(1.3,180,-41,[0.52,-0.2,-0.33]) else: p.connect(p.DIRECT) #timinglog = p.startStateLogging(p.STATE_LOGGING_PROFILE_TIMINGS, "kukaTimings.json") self.seed() self.reset() observationDim = len(self.getExtendedObservation()) #print("observationDim") #print(observationDim) observation_high = np.array([np.finfo(np.float32).max] * observationDim) if (self._isDiscrete): self.action_space = spaces.Discrete(7) else: action_dim = 3 self._action_bound = 1 action_high = np.array([self._action_bound] * action_dim) self.action_space = spaces.Box(-action_high, action_high, dtype=np.float32) self.observation_space = spaces.Box(low=0, high=255, shape=(self._height, self._width, 4), dtype=np.uint8) self.viewer = None def reset(self): self.terminated = 0 p.resetSimulation() p.setPhysicsEngineParameter(numSolverIterations=150) p.setTimeStep(self._timeStep) p.loadURDF(os.path.join(self._urdfRoot,"plane.urdf"),[0,0,-1]) p.loadURDF(os.path.join(self._urdfRoot,"table/table.urdf"), 0.5000000,0.00000,-.820000,0.000000,0.000000,0.0,1.0) xpos = 0.5 +0.2*random.random() ypos = 0 +0.25*random.random() ang = 3.1415925438*random.random() orn = p.getQuaternionFromEuler([0,0,ang]) self.blockUid =p.loadURDF(os.path.join(self._urdfRoot,"block.urdf"), xpos,ypos,-0.1,orn[0],orn[1],orn[2],orn[3]) p.setGravity(0,0,-10) self._kuka = kuka.Kuka(urdfRootPath=self._urdfRoot, timeStep=self._timeStep) self._envStepCounter = 0 p.stepSimulation() self._observation = self.getExtendedObservation() return np.array(self._observation) def __del__(self): p.disconnect() def seed(self, seed=None): self.np_random, seed = seeding.np_random(seed) return [seed] def getExtendedObservation(self): #camEyePos = [0.03,0.236,0.54] #distance = 1.06 #pitch=-56 #yaw = 258 #roll=0 #upAxisIndex = 2 #camInfo = p.getDebugVisualizerCamera() #print("width,height") #print(camInfo[0]) #print(camInfo[1]) #print("viewMatrix") #print(camInfo[2]) #print("projectionMatrix") #print(camInfo[3]) #viewMat = camInfo[2] #viewMat = p.computeViewMatrixFromYawPitchRoll(camEyePos,distance,yaw, pitch,roll,upAxisIndex) viewMat = [-0.5120397806167603, 0.7171027660369873, -0.47284144163131714, 0.0, -0.8589617609977722, -0.42747554183006287, 0.28186774253845215, 0.0, 0.0, 0.5504802465438843, 0.8348482847213745, 0.0, 0.1925382763147354, -0.24935829639434814, -0.4401884973049164, 1.0] #projMatrix = camInfo[3]#[0.7499999403953552, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, -1.0000200271606445, -1.0, 0.0, 0.0, -0.02000020071864128, 0.0] projMatrix = [0.75, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, -1.0000200271606445, -1.0, 0.0, 0.0, -0.02000020071864128, 0.0] img_arr = p.getCameraImage(width=self._width,height=self._height,viewMatrix=viewMat,projectionMatrix=projMatrix) rgb=img_arr[2] np_img_arr = np.reshape(rgb, (self._height, self._width, 4)) self._observation = np_img_arr return self._observation def step(self, action): if (self._isDiscrete): dv = 0.01 dx = [0,-dv,dv,0,0,0,0][action] dy = [0,0,0,-dv,dv,0,0][action] da = [0,0,0,0,0,-0.1,0.1][action] f = 0.3 realAction = [dx,dy,-0.002,da,f] else: dv = 0.01 dx = action[0] * dv dy = action[1] * dv da = action[2] * 0.1 f = 0.3 realAction = [dx,dy,-0.002,da,f] return self.step2( realAction) def step2(self, action): for i in range(self._actionRepeat): self._kuka.applyAction(action) p.stepSimulation() if self._termination(): break #self._observation = self.getExtendedObservation() self._envStepCounter += 1 self._observation = self.getExtendedObservation() if self._renders: time.sleep(self._timeStep) #print("self._envStepCounter") #print(self._envStepCounter) done = self._termination() reward = self._reward() #print("len=%r" % len(self._observation)) return np.array(self._observation), reward, done, {} def render(self, mode='human', close=False): if mode != "rgb_array": return np.array([]) base_pos,orn = self._p.getBasePositionAndOrientation(self._racecar.racecarUniqueId) view_matrix = self._p.computeViewMatrixFromYawPitchRoll( cameraTargetPosition=base_pos, distance=self._cam_dist, yaw=self._cam_yaw, pitch=self._cam_pitch, roll=0, upAxisIndex=2) proj_matrix = self._p.computeProjectionMatrixFOV( fov=60, aspect=float(RENDER_WIDTH)/RENDER_HEIGHT, nearVal=0.1, farVal=100.0) (_, _, px, _, _) = self._p.getCameraImage( width=RENDER_WIDTH, height=RENDER_HEIGHT, viewMatrix=view_matrix, projectionMatrix=proj_matrix, renderer=pybullet.ER_BULLET_HARDWARE_OPENGL) rgb_array = np.array(px) rgb_array = rgb_array[:, :, :3] return rgb_array def _termination(self): #print (self._kuka.endEffectorPos[2]) state = p.getLinkState(self._kuka.kukaUid,self._kuka.kukaEndEffectorIndex) actualEndEffectorPos = state[0] #print("self._envStepCounter") #print(self._envStepCounter) if (self.terminated or self._envStepCounter>maxSteps): self._observation = self.getExtendedObservation() return True maxDist = 0.005 closestPoints = p.getClosestPoints(self._kuka.trayUid, self._kuka.kukaUid,maxDist) if (len(closestPoints)):#(actualEndEffectorPos[2] <= -0.43): self.terminated = 1 #print("closing gripper, attempting grasp") #start grasp and terminate fingerAngle = 0.3 for i in range (100): graspAction = [0,0,0.0001,0,fingerAngle] self._kuka.applyAction(graspAction) p.stepSimulation() fingerAngle = fingerAngle-(0.3/100.) if (fingerAngle<0): fingerAngle=0 for i in range (1000): graspAction = [0,0,0.001,0,fingerAngle] self._kuka.applyAction(graspAction) p.stepSimulation() blockPos,blockOrn=p.getBasePositionAndOrientation(self.blockUid) if (blockPos[2] > 0.23): #print("BLOCKPOS!") #print(blockPos[2]) break state = p.getLinkState(self._kuka.kukaUid,self._kuka.kukaEndEffectorIndex) actualEndEffectorPos = state[0] if (actualEndEffectorPos[2]>0.5): break self._observation = self.getExtendedObservation() return True return False def _reward(self): #rewards is height of target object blockPos,blockOrn=p.getBasePositionAndOrientation(self.blockUid) closestPoints = p.getClosestPoints(self.blockUid,self._kuka.kukaUid,1000, -1, self._kuka.kukaEndEffectorIndex) reward = -1000 numPt = len(closestPoints) #print(numPt) if (numPt>0): #print("reward:") reward = -closestPoints[0][8]*10 if (blockPos[2] >0.2): #print("grasped a block!!!") #print("self._envStepCounter") #print(self._envStepCounter) reward = reward+1000 #print("reward") #print(reward) return reward if parse_version(gym.__version__) < parse_version('0.9.6'): _render = render _reset = reset _seed = seed _step = step