From 9b66074949381eeccc3e0f0667ce3f00e514c1f8 Mon Sep 17 00:00:00 2001 From: erwincoumans Date: Tue, 4 Sep 2018 09:10:57 -0700 Subject: [PATCH] add some missing GRPC files --- examples/SharedMemory/PhysicsClientGRPC.cpp | 236 ++++++++++++++++++ examples/SharedMemory/PhysicsClientGRPC.h | 41 +++ .../SharedMemory/PhysicsClientGRPC_C_API.cpp | 29 +++ .../SharedMemory/PhysicsClientGRPC_C_API.h | 19 ++ .../SharedMemory/grpc/ConvertGRPCBullet.cpp | 12 +- examples/pybullet/pybullet.c | 2 +- 6 files changed, 332 insertions(+), 7 deletions(-) create mode 100644 examples/SharedMemory/PhysicsClientGRPC.cpp create mode 100644 examples/SharedMemory/PhysicsClientGRPC.h create mode 100644 examples/SharedMemory/PhysicsClientGRPC_C_API.cpp create mode 100644 examples/SharedMemory/PhysicsClientGRPC_C_API.h diff --git a/examples/SharedMemory/PhysicsClientGRPC.cpp b/examples/SharedMemory/PhysicsClientGRPC.cpp new file mode 100644 index 000000000..05198b30f --- /dev/null +++ b/examples/SharedMemory/PhysicsClientGRPC.cpp @@ -0,0 +1,236 @@ +#ifdef BT_ENABLE_GRPC +#include "PhysicsClientGRPC.h" +#include "grpc/pybullet.grpc.pb.h" +#include +using grpc::Channel; +#include +#include +#include "../Utils/b3Clock.h" +#include "PhysicsClient.h" +//#include "LinearMath/btVector3.h" +#include "SharedMemoryCommands.h" +#include +#include "Bullet3Common/b3Logging.h" +#include "Bullet3Common/b3AlignedObjectArray.h" +#include "grpc/ConvertGRPCBullet.h" + + +static unsigned int b3DeserializeInt2(const unsigned char* input) +{ + unsigned int tmp = (input[3] << 24) + (input[2] << 16) + (input[1] << 8) + input[0]; + return tmp; +} + +bool gVerboseNetworkMessagesClient3 = false; + + +struct GRPCNetworkedInternalData +{ + std::shared_ptr m_grpcChannel; + std::unique_ptr< pybullet_grpc::PyBulletAPI::Stub> m_stub; + + + bool m_isConnected; + + SharedMemoryCommand m_clientCmd; + bool m_hasCommand; + + SharedMemoryStatus m_lastStatus; + b3AlignedObjectArray m_stream; + + std::string m_hostName; + int m_port; + + b3AlignedObjectArray m_tempBuffer; + double m_timeOutInSeconds; + + GRPCNetworkedInternalData() + : + m_isConnected(false), + m_hasCommand(false), + m_timeOutInSeconds(60) + { + + } + + void disconnect() + { + if (m_isConnected) + { + m_stub = 0; + m_grpcChannel = 0; + m_isConnected = false; + } + } + bool connectGRPC() + { + if (m_isConnected) + return true; + std::string hostport = m_hostName + ':' + std::to_string(m_port); + m_grpcChannel = grpc::CreateChannel( + hostport, grpc::InsecureChannelCredentials()); + + m_stub = pybullet_grpc::PyBulletAPI::NewStub(m_grpcChannel); + + + + // Set timeout for API + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now()+std::chrono::seconds((long long)m_timeOutInSeconds); + grpc::ClientContext context; + context.set_deadline(deadline); + ::pybullet_grpc::PyBulletCommand request; + pybullet_grpc::CheckVersionCommand* cmd1 = request.mutable_checkversioncommand(); + cmd1->set_clientversion(SHARED_MEMORY_MAGIC_NUMBER); + ::pybullet_grpc::PyBulletStatus response; + // The actual RPC. + grpc::Status status = m_stub->SubmitCommand(&context, request, &response); + if (response.has_checkversionstatus()) + { + if (response.checkversionstatus().serverversion() == SHARED_MEMORY_MAGIC_NUMBER) + { + m_isConnected = true; + } + else + { + printf("Error: Client version (%d) is different from server version (%d)", SHARED_MEMORY_MAGIC_NUMBER, response.checkversionstatus().serverversion()); + } + } + else + { + printf("Error: cannot connect to GRPC server\n"); + } + + + + return m_isConnected; + } + + bool checkData() + { + + bool hasStatus = false; + return hasStatus; + } + +}; + + +GRPCNetworkedPhysicsProcessor::GRPCNetworkedPhysicsProcessor(const char* hostName, int port) +{ + m_data = new GRPCNetworkedInternalData; + if (hostName) + { + m_data->m_hostName = hostName; + } + m_data->m_port = port; + +} + +GRPCNetworkedPhysicsProcessor::~GRPCNetworkedPhysicsProcessor() +{ + disconnect(); + delete m_data; +} + +bool GRPCNetworkedPhysicsProcessor::processCommand(const struct SharedMemoryCommand& clientCmd, struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes) +{ + if(gVerboseNetworkMessagesClient3) + { + printf("GRPCNetworkedPhysicsProcessor::processCommand\n"); + } + + ::pybullet_grpc::PyBulletCommand grpcCommand; + pybullet_grpc::PyBulletCommand* grpcCmdPtr = convertBulletToGRPCCommand(clientCmd, grpcCommand); + + if (grpcCmdPtr) + { + grpc::ClientContext context; + std::chrono::system_clock::time_point deadline = + std::chrono::system_clock::now() + std::chrono::seconds((long long)m_data->m_timeOutInSeconds); + context.set_deadline(deadline); + ::pybullet_grpc::PyBulletStatus status; + // The actual RPC. + grpc::Status grpcStatus = m_data->m_stub->SubmitCommand(&context, grpcCommand, &status); + + //convert grpc status to Bullet status + bool convertedOk = convertGRPCToStatus(status, serverStatusOut, bufferServerToClient, bufferSizeInBytes); + return convertedOk; + + } + + return false; +} + +bool GRPCNetworkedPhysicsProcessor::receiveStatus(struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes) +{ + bool hasStatus = m_data->checkData(); + + if (hasStatus) + { + if (gVerboseNetworkMessagesClient3) + { + printf("GRPCNetworkedPhysicsProcessor::receiveStatus\n"); + } + + serverStatusOut = m_data->m_lastStatus; + int numStreamBytes = m_data->m_stream.size(); + + if (numStreamBytes < bufferSizeInBytes) + { + for (int i = 0; i < numStreamBytes; i++) + { + bufferServerToClient[i] = m_data->m_stream[i]; + } + } + else + { + printf("Error: steam buffer overflow\n"); + } + + } + + + return hasStatus; + +} + + +void GRPCNetworkedPhysicsProcessor::renderScene(int renderFlags) +{ +} + +void GRPCNetworkedPhysicsProcessor::physicsDebugDraw(int debugDrawFlags) +{ +} + +void GRPCNetworkedPhysicsProcessor::setGuiHelper(struct GUIHelperInterface* guiHelper) +{ +} + +bool GRPCNetworkedPhysicsProcessor::isConnected() const +{ + return m_data->m_isConnected; +} + + +bool GRPCNetworkedPhysicsProcessor::connect() +{ + bool isConnected = m_data->connectGRPC(); + return isConnected; +} + +void GRPCNetworkedPhysicsProcessor::disconnect() +{ + m_data->disconnect(); + +} + + +void GRPCNetworkedPhysicsProcessor::setTimeOut(double timeOutInSeconds) +{ + m_data->m_timeOutInSeconds = timeOutInSeconds; +} + +#endif //BT_ENABLE_GRPC + diff --git a/examples/SharedMemory/PhysicsClientGRPC.h b/examples/SharedMemory/PhysicsClientGRPC.h new file mode 100644 index 000000000..e146666f4 --- /dev/null +++ b/examples/SharedMemory/PhysicsClientGRPC.h @@ -0,0 +1,41 @@ +#ifndef PHYSICS_CLIENT_GRPC_H +#define PHYSICS_CLIENT_GRPC_H + +#include "PhysicsDirect.h" +#include "PhysicsCommandProcessorInterface.h" + +class GRPCNetworkedPhysicsProcessor : public PhysicsCommandProcessorInterface +{ + + struct GRPCNetworkedInternalData* m_data; + +public: + GRPCNetworkedPhysicsProcessor(const char* hostName, int port); + + virtual ~GRPCNetworkedPhysicsProcessor(); + + virtual bool connect(); + + virtual void disconnect(); + + virtual bool isConnected() const; + + virtual bool processCommand(const struct SharedMemoryCommand& clientCmd, struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes); + + virtual bool receiveStatus(struct SharedMemoryStatus& serverStatusOut, char* bufferServerToClient, int bufferSizeInBytes); + + virtual void renderScene(int renderFlags); + + virtual void physicsDebugDraw(int debugDrawFlags); + + virtual void setGuiHelper(struct GUIHelperInterface* guiHelper); + + virtual void setTimeOut(double timeOutInSeconds); + + virtual void reportNotifications() {} + +}; + + +#endif //PHYSICS_CLIENT_GRPC_H + diff --git a/examples/SharedMemory/PhysicsClientGRPC_C_API.cpp b/examples/SharedMemory/PhysicsClientGRPC_C_API.cpp new file mode 100644 index 000000000..4289d0335 --- /dev/null +++ b/examples/SharedMemory/PhysicsClientGRPC_C_API.cpp @@ -0,0 +1,29 @@ +#ifdef BT_ENABLE_GRPC + +#include "PhysicsClientGRPC_C_API.h" +#include "PhysicsClientGRPC.h" +#include "PhysicsDirect.h" +#include + +B3_SHARED_API b3PhysicsClientHandle b3ConnectPhysicsGRPC(const char* hostName, int port) +{ + + GRPCNetworkedPhysicsProcessor* tcp = new GRPCNetworkedPhysicsProcessor(hostName, port); + + PhysicsDirect* direct = new PhysicsDirect(tcp, true); + + bool connected; + connected = direct->connect(); + if (connected) + { + printf("b3ConnectPhysicsGRPC connected successfully.\n"); + } + else + { + printf("b3ConnectPhysicsGRPC connection failed.\n"); + + } + return (b3PhysicsClientHandle)direct; +} + +#endif //BT_ENABLE_GRPC \ No newline at end of file diff --git a/examples/SharedMemory/PhysicsClientGRPC_C_API.h b/examples/SharedMemory/PhysicsClientGRPC_C_API.h new file mode 100644 index 000000000..1e801e18e --- /dev/null +++ b/examples/SharedMemory/PhysicsClientGRPC_C_API.h @@ -0,0 +1,19 @@ +#ifndef PHYSICS_CLIENT_GRPC_C_API_H +#define PHYSICS_CLIENT_GRPC_C_API_H + +#include "PhysicsClientC_API.h" + +#ifdef __cplusplus +extern "C" { +#endif + + +///send physics commands using GRPC connection +B3_SHARED_API b3PhysicsClientHandle b3ConnectPhysicsGRPC(const char* hostName, int port); + + +#ifdef __cplusplus +} +#endif + +#endif //PHYSICS_CLIENT_GRPC_C_API_H diff --git a/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp b/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp index 163aba08a..00bf11096 100644 --- a/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp +++ b/examples/SharedMemory/grpc/ConvertGRPCBullet.cpp @@ -10,7 +10,7 @@ #include "pybullet.grpc.pb.h" #include "LinearMath/btMinMax.h" -//#define ALLOW_GRPC_COMMAND_CONVERSION +#define ALLOW_GRPC_COMMAND_CONVERSION #define ALLOW_GRPC_STATUS_CONVERSION using grpc::Server; @@ -483,7 +483,7 @@ pybullet_grpc::PyBulletCommand* convertBulletToGRPCCommand(const struct SharedMe if (0 == grpcCmdPtr) { grpcCmdPtr = &grpcCommand; - printf("Warning: slow fallback of convertBulletToGRPCCommand (%d)", clientCmd.m_type); + //printf("Warning: slow fallback of convertBulletToGRPCCommand (%d)", clientCmd.m_type); //convert an unknown command as binary blob int sz = sizeof(SharedMemoryCommand); if (sz > 0) @@ -559,7 +559,7 @@ SharedMemoryCommand* convertGRPCToBulletCommand(const PyBulletCommand& grpcComma { memcpy(&cmd, data, numBytes); } - printf("slow fallback on command type %d\n", cmd.m_type); + //printf("slow fallback on command type %d\n", cmd.m_type); } else { @@ -1491,7 +1491,7 @@ bool convertGRPCToStatus(const PyBulletStatus& grpcReply, SharedMemoryStatus& se { if (grpcReply.unknownstatusbinaryblob_size() == 1) { - printf("convertStatusToGRPC: slow fallback status (%d), slow fallback", grpcReply.statustype()); + //printf("convertStatusToGRPC: slow fallback status (%d), slow fallback", grpcReply.statustype()); const char* data = grpcReply.unknownstatusbinaryblob().Get(0).c_str(); int numBytes = grpcReply.unknownstatusbinaryblob().Get(0).size(); @@ -1501,7 +1501,7 @@ bool convertGRPCToStatus(const PyBulletStatus& grpcReply, SharedMemoryStatus& se { memcpy(&serverStatus, data, numBytes); } - printf("slow fallback on command type %d\n", serverStatus.m_type); + //printf("slow fallback on command type %d\n", serverStatus.m_type); btAssert(grpcReply.statustype() == serverStatus.m_type); converted = true; } @@ -1822,7 +1822,7 @@ bool convertStatusToGRPC(const SharedMemoryStatus& serverStatus, char* bufferSer default: { #endif //ALLOW_GRPC_STATUS_CONVERSION - printf("convertStatusToGRPC: unknown status (%d), slow fallback", serverStatus.m_type); + //printf("convertStatusToGRPC: unknown status (%d), slow fallback", serverStatus.m_type); int sz = sizeof(SharedMemoryStatus); grpcReply.add_unknownstatusbinaryblob((const char*)&serverStatus, sz); converted = true; diff --git a/examples/pybullet/pybullet.c b/examples/pybullet/pybullet.c index 6ae55b9d0..5697ec0e9 100644 --- a/examples/pybullet/pybullet.c +++ b/examples/pybullet/pybullet.c @@ -439,7 +439,7 @@ static PyObject* pybullet_connectPhysicsServer(PyObject* self, PyObject* args, P #ifdef BT_ENABLE_GRPC sm = b3ConnectPhysicsGRPC(hostName.c_str(), tcpPort); #else - b3Warning("GRPC is not enabled in this pybullet build"); + PyErr_SetString(SpamError, "GRPC is not enabled in this pybullet build"); #endif break; }