allow pybullet to connect to GRPC server. (need to use flag --enable_grpc in premake build system)
add grpcPlugin, it can work in GUI, SHARED_MEMORY_SERVER, DIRECT and other modes.
example script to start server from pybullet:
import pybullet as p
p.connect(p.GUI)
#if statically linked plugin
id = p.loadPlugin("grpcPlugin")
#dynamics loading the plugin
#id = p.loadPlugin("E:/develop/bullet3/bin/pybullet_grpcPlugin_vs2010_x64_debug.dll", postFix="_grpcPlugin")
#start the GRPC server at hostname, port
if (id>=0):
p.executePluginCommand(id, "localhost:1234")
Only in DIRECT mode, since there is no 'ping' you need to call to handle RCPs:
numRPC = 10
while (1):
p.executePluginCommand(id, intArgs=[numRPC])
This commit is contained in:
@@ -18,6 +18,9 @@ struct b3PluginContext
|
||||
int m_numMouseEvents;
|
||||
const struct b3Notification* m_notifications;
|
||||
int m_numNotifications;
|
||||
|
||||
//only used for grpc/processClientCommands
|
||||
class PhysicsCommandProcessorInterface* m_rpcCommandProcessorInterface;
|
||||
};
|
||||
|
||||
|
||||
|
||||
354
examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp
Normal file
354
examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.cpp
Normal file
@@ -0,0 +1,354 @@
|
||||
|
||||
//tinyRendererPlugin implements the TinyRenderer as a plugin
|
||||
//it is statically linked when using preprocessor #define STATIC_LINK_VR_PLUGIN
|
||||
//otherwise you can dynamically load it using pybullet.loadPlugin
|
||||
|
||||
#include "grpcPlugin.h"
|
||||
#include "../../SharedMemoryPublic.h"
|
||||
#include "../b3PluginContext.h"
|
||||
#include "Bullet3Common/b3AlignedObjectArray.h"
|
||||
#include "SharedMemoryCommands.h"
|
||||
#include "PhysicsCommandProcessorInterface.h"
|
||||
|
||||
#include <stdio.h>
|
||||
|
||||
#include <grpc++/grpc++.h>
|
||||
#include <grpc/support/log.h>
|
||||
#include "../../../Utils/b3Clock.h"
|
||||
#include "../../grpc/pybullet.grpc.pb.h"
|
||||
#include "../../grpc/ConvertGRPCBullet.h"
|
||||
using grpc::Server;
|
||||
using grpc::ServerAsyncResponseWriter;
|
||||
using grpc::ServerBuilder;
|
||||
using grpc::ServerContext;
|
||||
using grpc::ServerCompletionQueue;
|
||||
using grpc::Status;
|
||||
using pybullet_grpc::PyBulletCommand;
|
||||
using pybullet_grpc::PyBulletStatus;
|
||||
using pybullet_grpc::PyBulletAPI;
|
||||
|
||||
|
||||
bool gVerboseNetworkMessagesServer4 = true;
|
||||
|
||||
|
||||
class ServerImpl final {
|
||||
public:
|
||||
|
||||
ServerImpl()
|
||||
{
|
||||
}
|
||||
~ServerImpl() {
|
||||
Exit();
|
||||
}
|
||||
|
||||
void Exit()
|
||||
{
|
||||
if (server_)
|
||||
{
|
||||
server_->Shutdown();
|
||||
// Always shutdown the completion queue after the server.
|
||||
cq_->Shutdown();
|
||||
server_ = 0;
|
||||
}
|
||||
}
|
||||
|
||||
void Init(PhysicsCommandProcessorInterface* comProc, const std::string& hostNamePort) {
|
||||
|
||||
// Listen on the given address without any authentication mechanism.
|
||||
m_builder.AddListeningPort(hostNamePort, grpc::InsecureServerCredentials());
|
||||
// Register "service_" as the instance through which we'll communicate with
|
||||
// clients. In this case it corresponds to an *asynchronous* service.
|
||||
m_builder.RegisterService(&service_);
|
||||
// Get hold of the completion queue used for the asynchronous communication
|
||||
// with the gRPC runtime.
|
||||
cq_ = m_builder.AddCompletionQueue();
|
||||
// Finally assemble the server.
|
||||
server_ = m_builder.BuildAndStart();
|
||||
std::cout << "Server listening on " << hostNamePort << std::endl;
|
||||
|
||||
// Proceed to the server's main loop.
|
||||
InitRpcs(comProc);
|
||||
}
|
||||
|
||||
// This can be run in multiple threads if needed.
|
||||
bool HandleSingleRpc()
|
||||
{
|
||||
|
||||
CallData::CallStatus status = CallData::CallStatus::CREATE;
|
||||
|
||||
{
|
||||
void* tag; // uniquely identifies a request.
|
||||
bool ok;
|
||||
|
||||
// Block waiting to read the next event from the completion queue. The
|
||||
// event is uniquely identified by its tag, which in this case is the
|
||||
// memory address of a CallData instance.
|
||||
// The return value of Next should always be checked. This return value
|
||||
// tells us whether there is any kind of event or cq_ is shutting down.
|
||||
|
||||
grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC));
|
||||
if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT)
|
||||
{
|
||||
//GPR_ASSERT(cq_->Next(&tag, &ok));
|
||||
GPR_ASSERT(ok);
|
||||
status = static_cast<CallData*>(tag)->Proceed();
|
||||
}
|
||||
}
|
||||
return status == CallData::CallStatus::TERMINATE;
|
||||
}
|
||||
|
||||
private:
|
||||
// Class encompasing the state and logic needed to serve a request.
|
||||
class CallData {
|
||||
public:
|
||||
// Take in the "service" instance (in this case representing an asynchronous
|
||||
// server) and the completion queue "cq" used for asynchronous communication
|
||||
// with the gRPC runtime.
|
||||
CallData(PyBulletAPI::AsyncService* service, ServerCompletionQueue* cq, PhysicsCommandProcessorInterface* comProc)
|
||||
: service_(service), cq_(cq), responder_(&ctx_), status_(CREATE), m_finished(false), m_comProc(comProc) {
|
||||
// Invoke the serving logic right away.
|
||||
Proceed();
|
||||
}
|
||||
|
||||
enum CallStatus { CREATE, PROCESS, FINISH, TERMINATE };
|
||||
|
||||
CallStatus Proceed() {
|
||||
if (status_ == CREATE) {
|
||||
// Make this instance progress to the PROCESS state.
|
||||
status_ = PROCESS;
|
||||
|
||||
// As part of the initial CREATE state, we *request* that the system
|
||||
// start processing SayHello requests. In this request, "this" acts are
|
||||
// the tag uniquely identifying the request (so that different CallData
|
||||
// instances can serve different requests concurrently), in this case
|
||||
// the memory address of this CallData instance.
|
||||
|
||||
|
||||
service_->RequestSubmitCommand(&ctx_, &m_command, &responder_, cq_, cq_,
|
||||
this);
|
||||
}
|
||||
else if (status_ == PROCESS) {
|
||||
// Spawn a new CallData instance to serve new clients while we process
|
||||
// the one for this CallData. The instance will deallocate itself as
|
||||
// part of its FINISH state.
|
||||
new CallData(service_, cq_, m_comProc);
|
||||
status_ = FINISH;
|
||||
|
||||
std::string replyString;
|
||||
// The actual processing.
|
||||
|
||||
SharedMemoryStatus serverStatus;
|
||||
b3AlignedObjectArray<char> buffer;
|
||||
buffer.resize(SHARED_MEMORY_MAX_STREAM_CHUNK_SIZE);
|
||||
SharedMemoryCommand cmd;
|
||||
SharedMemoryCommand* cmdPtr = 0;
|
||||
|
||||
m_status.set_statustype(CMD_UNKNOWN_COMMAND_FLUSHED);
|
||||
|
||||
|
||||
if (m_command.has_checkversioncommand())
|
||||
{
|
||||
m_status.set_statustype(CMD_CLIENT_COMMAND_COMPLETED);
|
||||
m_status.mutable_checkversionstatus()->set_serverversion(SHARED_MEMORY_MAGIC_NUMBER);
|
||||
}
|
||||
else
|
||||
{
|
||||
cmdPtr = convertGRPCToBulletCommand(m_command, cmd);
|
||||
|
||||
|
||||
if (cmdPtr)
|
||||
{
|
||||
bool hasStatus = m_comProc->processCommand(*cmdPtr, serverStatus, &buffer[0], buffer.size());
|
||||
|
||||
double timeOutInSeconds = 10;
|
||||
b3Clock clock;
|
||||
double startTimeSeconds = clock.getTimeInSeconds();
|
||||
double curTimeSeconds = clock.getTimeInSeconds();
|
||||
|
||||
while ((!hasStatus) && ((curTimeSeconds - startTimeSeconds) < timeOutInSeconds))
|
||||
{
|
||||
hasStatus = m_comProc->receiveStatus(serverStatus, &buffer[0], buffer.size());
|
||||
curTimeSeconds = clock.getTimeInSeconds();
|
||||
}
|
||||
|
||||
if (gVerboseNetworkMessagesServer4)
|
||||
{
|
||||
//printf("buffer.size = %d\n", buffer.size());
|
||||
printf("serverStatus.m_numDataStreamBytes = %d\n", serverStatus.m_numDataStreamBytes);
|
||||
}
|
||||
if (hasStatus)
|
||||
{
|
||||
b3AlignedObjectArray<unsigned char> packetData;
|
||||
unsigned char* statBytes = (unsigned char*)&serverStatus;
|
||||
|
||||
convertStatusToGRPC(serverStatus, &buffer[0], buffer.size(), m_status);
|
||||
}
|
||||
}
|
||||
|
||||
if (m_command.has_terminateservercommand())
|
||||
{
|
||||
status_ = TERMINATE;
|
||||
}
|
||||
}
|
||||
|
||||
// And we are done! Let the gRPC runtime know we've finished, using the
|
||||
// memory address of this instance as the uniquely identifying tag for
|
||||
// the event.
|
||||
|
||||
responder_.Finish(m_status, Status::OK, this);
|
||||
}
|
||||
else {
|
||||
GPR_ASSERT(status_ == FINISH);
|
||||
// Once in the FINISH state, deallocate ourselves (CallData).
|
||||
delete this;
|
||||
}
|
||||
return status_;
|
||||
}
|
||||
|
||||
private:
|
||||
// The means of communication with the gRPC runtime for an asynchronous
|
||||
// server.
|
||||
PyBulletAPI::AsyncService* service_;
|
||||
// The producer-consumer queue where for asynchronous server notifications.
|
||||
ServerCompletionQueue* cq_;
|
||||
// Context for the rpc, allowing to tweak aspects of it such as the use
|
||||
// of compression, authentication, as well as to send metadata back to the
|
||||
// client.
|
||||
ServerContext ctx_;
|
||||
|
||||
|
||||
|
||||
// What we get from the client.
|
||||
PyBulletCommand m_command;
|
||||
// What we send back to the client.
|
||||
PyBulletStatus m_status;
|
||||
|
||||
// The means to get back to the client.
|
||||
ServerAsyncResponseWriter<PyBulletStatus> responder_;
|
||||
|
||||
// Let's implement a tiny state machine with the following states.
|
||||
|
||||
CallStatus status_; // The current serving state.
|
||||
|
||||
bool m_finished;
|
||||
|
||||
PhysicsCommandProcessorInterface* m_comProc; //physics server command processor
|
||||
};
|
||||
|
||||
// This can be run in multiple threads if needed.
|
||||
void InitRpcs(PhysicsCommandProcessorInterface* comProc)
|
||||
{
|
||||
// Spawn a new CallData instance to serve new clients.
|
||||
new CallData(&service_, cq_.get(), comProc);
|
||||
}
|
||||
|
||||
|
||||
ServerBuilder m_builder;
|
||||
std::unique_ptr<ServerCompletionQueue> cq_;
|
||||
PyBulletAPI::AsyncService service_;
|
||||
std::unique_ptr<Server> server_;
|
||||
};
|
||||
|
||||
|
||||
struct grpcMyClass
|
||||
{
|
||||
int m_testData;
|
||||
|
||||
ServerImpl m_grpcServer;
|
||||
bool m_grpcInitialized;
|
||||
bool m_grpcTerminated;
|
||||
|
||||
grpcMyClass()
|
||||
:m_testData(42),
|
||||
m_grpcInitialized(false),
|
||||
m_grpcTerminated(false)
|
||||
{
|
||||
}
|
||||
virtual ~grpcMyClass()
|
||||
{
|
||||
}
|
||||
};
|
||||
|
||||
B3_SHARED_API int initPlugin_grpcPlugin(struct b3PluginContext* context)
|
||||
{
|
||||
grpcMyClass* obj = new grpcMyClass();
|
||||
context->m_userPointer = obj;
|
||||
|
||||
|
||||
return SHARED_MEMORY_MAGIC_NUMBER;
|
||||
}
|
||||
|
||||
|
||||
B3_SHARED_API int preTickPluginCallback_grpcPlugin(struct b3PluginContext* context)
|
||||
{
|
||||
//process grpc server messages
|
||||
return 0;
|
||||
}
|
||||
|
||||
B3_SHARED_API int processClientCommands_grpcPlugin(struct b3PluginContext* context)
|
||||
{
|
||||
grpcMyClass* obj = (grpcMyClass*)context->m_userPointer;
|
||||
|
||||
if (obj->m_grpcInitialized && !obj->m_grpcTerminated)
|
||||
{
|
||||
obj->m_grpcTerminated = obj->m_grpcServer.HandleSingleRpc();
|
||||
}
|
||||
|
||||
obj->m_testData++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
B3_SHARED_API int postTickPluginCallback_grpcPlugin(struct b3PluginContext* context)
|
||||
{
|
||||
grpcMyClass* obj = (grpcMyClass* )context->m_userPointer;
|
||||
obj->m_testData++;
|
||||
return 0;
|
||||
}
|
||||
|
||||
B3_SHARED_API int executePluginCommand_grpcPlugin(struct b3PluginContext* context, const struct b3PluginArguments* arguments)
|
||||
{
|
||||
///3 cases:
|
||||
/// 1: send a non-empty string to start the GRPC server
|
||||
/// 2: send some integer n, to call n times to HandleSingleRpc
|
||||
/// 3: send nothing to terminate the GRPC server
|
||||
|
||||
grpcMyClass* obj = (grpcMyClass*)context->m_userPointer;
|
||||
|
||||
if (arguments->m_text && strlen(arguments->m_text))
|
||||
{
|
||||
if (!obj->m_grpcInitialized && context->m_rpcCommandProcessorInterface)
|
||||
{
|
||||
obj->m_grpcServer.Init(context->m_rpcCommandProcessorInterface, arguments->m_text);
|
||||
}
|
||||
obj->m_grpcInitialized = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (arguments->m_numInts > 0)
|
||||
{
|
||||
for (int i = 0; i < arguments->m_ints[0]; i++)
|
||||
{
|
||||
if (obj->m_grpcInitialized && !obj->m_grpcTerminated)
|
||||
{
|
||||
obj->m_grpcTerminated = obj->m_grpcServer.HandleSingleRpc();
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
obj->m_grpcServer.Exit();
|
||||
obj->m_grpcInitialized = false;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
B3_SHARED_API void exitPlugin_grpcPlugin(struct b3PluginContext* context)
|
||||
{
|
||||
grpcMyClass* obj = (grpcMyClass*) context->m_userPointer;
|
||||
obj->m_grpcServer.Exit();
|
||||
delete obj;
|
||||
context->m_userPointer = 0;
|
||||
}
|
||||
29
examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.h
Normal file
29
examples/SharedMemory/plugins/grpcPlugin/grpcPlugin.h
Normal file
@@ -0,0 +1,29 @@
|
||||
#ifndef GRPC_PLUGIN_H
|
||||
#define GRPC_PLUGIN_H
|
||||
|
||||
#include "../b3PluginAPI.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C"
|
||||
{
|
||||
#endif
|
||||
|
||||
//the following 3 APIs are required
|
||||
B3_SHARED_API int initPlugin_grpcPlugin(struct b3PluginContext* context);
|
||||
B3_SHARED_API void exitPlugin_grpcPlugin(struct b3PluginContext* context);
|
||||
B3_SHARED_API int executePluginCommand_grpcPlugin(struct b3PluginContext* context, const struct b3PluginArguments* arguments);
|
||||
|
||||
//all the APIs below are optional
|
||||
B3_SHARED_API int preTickPluginCallback_grpcPlugin(struct b3PluginContext* context);
|
||||
B3_SHARED_API int postTickPluginCallback_grpcPlugin(struct b3PluginContext* context);
|
||||
|
||||
B3_SHARED_API int processClientCommands_grpcPlugin(struct b3PluginContext* context);
|
||||
|
||||
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
};
|
||||
#endif
|
||||
|
||||
#endif//#define GRPC_PLUGIN_H
|
||||
43
examples/SharedMemory/plugins/grpcPlugin/premake4.lua
Normal file
43
examples/SharedMemory/plugins/grpcPlugin/premake4.lua
Normal file
@@ -0,0 +1,43 @@
|
||||
|
||||
|
||||
project ("pybullet_grpcPlugin")
|
||||
language "C++"
|
||||
kind "SharedLib"
|
||||
|
||||
includedirs {".","../../../../src", "../../../../examples",
|
||||
"../../../ThirdPartyLibs"}
|
||||
defines {"PHYSICS_IN_PROCESS_EXAMPLE_BROWSER"}
|
||||
|
||||
initGRPC()
|
||||
|
||||
links{"BulletFileLoader", "Bullet3Common", "LinearMath"}
|
||||
|
||||
|
||||
if os.is("MacOSX") then
|
||||
-- targetextension {"so"}
|
||||
links{"Cocoa.framework"}
|
||||
end
|
||||
|
||||
|
||||
files {
|
||||
"grpcPlugin.cpp",
|
||||
"../../PhysicsClient.cpp",
|
||||
"../../PhysicsClient.h",
|
||||
"../../PhysicsClientSharedMemory.cpp",
|
||||
"../../PhysicsClientSharedMemory.h",
|
||||
"../../PhysicsClientSharedMemory_C_API.cpp",
|
||||
"../../PhysicsClientSharedMemory_C_API.h",
|
||||
"../../PhysicsClientC_API.cpp",
|
||||
"../../PhysicsClientC_API.h",
|
||||
"../../Win32SharedMemory.cpp",
|
||||
"../../Win32SharedMemory.h",
|
||||
"../../PosixSharedMemory.cpp",
|
||||
"../../PosixSharedMemory.h",
|
||||
"../../../Utils/b3Clock.cpp",
|
||||
"../../../Utils/b3Clock.h",
|
||||
"../../../Utils/b3ResourcePath.cpp",
|
||||
"../../../Utils/b3ResourcePath.h",
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user