Reduces wait in the gRPC plugin, fix state bug.

Updates the gRPC plugin to gather messages in its own thread, and
corrects the state details not being correctly added to shared memory.
This commit is contained in:
mbennice
2019-05-07 11:11:01 -07:00
parent 111d06abd0
commit cf16c5c51f
2 changed files with 77 additions and 43 deletions

View File

@@ -11,6 +11,9 @@
#include <stdio.h>
#include <mutex>
#include <thread>
#include <grpc++/grpc++.h>
#include <grpc/support/log.h>
#include "../../../Utils/b3Clock.h"
@@ -44,6 +47,9 @@ public:
if (server_)
{
server_->Shutdown();
m_requestThreadCancelled = true;
m_requestThread->join();
delete m_requestThread;
// Always shutdown the completion queue after the server.
cq_->Shutdown();
server_ = 0;
@@ -64,6 +70,10 @@ public:
server_ = m_builder.BuildAndStart();
std::cout << "grpcPlugin Bullet Physics GRPC server listening on " << hostNamePort << std::endl;
//Start the thread to gather the requests.
m_requestThreadCancelled = false;
m_requestThread = new std::thread(&ServerImpl::GatherRequests, this);
// Proceed to the server's main loop.
InitRpcs(comProc);
}
@@ -72,25 +82,13 @@ public:
bool HandleSingleRpc()
{
CallData::CallStatus status = CallData::CallStatus::CREATE;
std::lock_guard<std::mutex> guard(m_queueMutex);
if (!m_requestQueue.empty()) {
void* tag = m_requestQueue.front();
m_requestQueue.pop_front();
status = static_cast<CallData*>(tag)->Proceed();
}
{
void* tag; // uniquely identifies a request.
bool ok;
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC));
if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT)
{
//GPR_ASSERT(cq_->Next(&tag, &ok));
GPR_ASSERT(ok);
status = static_cast<CallData*>(tag)->Proceed();
}
}
return status == CallData::CallStatus::TERMINATE;
}
@@ -252,6 +250,39 @@ private:
std::unique_ptr<ServerCompletionQueue> cq_;
PyBulletAPI::AsyncService service_;
std::unique_ptr<Server> server_;
// Mutex to protect access to the request queue variables (m_requestQueue,
// m_requestThread, m_requestThreadCancelled).
std::mutex m_queueMutex;
// List of outstanding request tags.
std::list<void*> m_requestQueue;
// Whether or not the gathering thread is cancelled.
bool m_requestThreadCancelled;
// Thread to gather requests from the completion queue.
std::thread* m_requestThread;
void GatherRequests() {
void* tag; // uniquely identifies a request.
bool ok;
while(!m_requestThreadCancelled) {
// Block waiting to read the next event from the completion queue. The
// event is uniquely identified by its tag, which in this case is the
// memory address of a CallData instance.
// The return value of Next should always be checked. This return value
// tells us whether there is any kind of event or cq_ is shutting down.
grpc::CompletionQueue::NextStatus nextStatus = cq_->AsyncNext(&tag, &ok, gpr_now(GPR_CLOCK_MONOTONIC));
if (nextStatus == grpc::CompletionQueue::NextStatus::GOT_EVENT)
{
GPR_ASSERT(ok);
std::lock_guard<std::mutex> guard(m_queueMutex);
m_requestQueue.push_back(tag);
}
}
}
};
struct grpcMyClass