Several fixes for the parallel raycasts

- Limits the maximum number of threads to 64, since btThreadSupportPosix
and btThreadSupportWin32 don't support more than 64 threads at the moment,
due to the use of UINT64 bitmasks. This could be fixed by using
std::bitset or a similar alternative (a rough sketch follows the commit
metadata below).
- Introduces a thread pool class, b3ThreadPool, a simple wrapper around
btThreadSupportInterface, and uses it instead of the global task
scheduler for parallel raycasting. This is noticeably faster than the
task scheduler (~10-15% in my tests for parallel raycasts), since its
more advanced features (such as parallelFor) are not needed here.
- Puts the 16*1024 value of MAX_RAY_INTERSECTION_BATCH_SIZE_STREAMING in
parentheses, since the unparenthesized macro otherwise misbehaves next to
operators of equal precedence (see the short example after this list),
and introduces a smaller constant for Apple targets.
- Refactors the parallel raycast code and adds some more profiling.
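
For reference, a minimal, self-contained illustration of the precedence issue mentioned above. The macro names and the surrounding expression are hypothetical; they only show how an unparenthesized 16*1024 can bind with an adjacent operator of equal precedence.

    #include <cstdio>

    // Hypothetical macros, mirroring the parenthesization fix described above.
    #define STREAM_SIZE_UNSAFE 16*1024    // expands textually, no grouping
    #define STREAM_SIZE_SAFE   (16*1024)  // grouping preserved

    int main()
    {
        const int totalRays = 40000;
        // Unsafe: "totalRays / STREAM_SIZE_UNSAFE" expands to "totalRays / 16*1024",
        // which parses as "(totalRays / 16) * 1024", because / and * have equal
        // precedence and associate left to right.
        printf("unsafe: %d\n", totalRays / STREAM_SIZE_UNSAFE);  // prints 2560000
        // Safe: the parentheses keep the constant as a single operand.
        printf("safe:   %d\n", totalRays / STREAM_SIZE_SAFE);    // prints 2
        return 0;
    }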
Tigran Gasparian
2018-06-19 18:28:48 +02:00
parent 8c851a70c7
commit b84eb8af74
5 changed files with 112 additions and 48 deletions
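
As a rough illustration of the std::bitset alternative suggested in the first bullet, the sketch below shows how a fixed-size bitset could track busy worker threads without the 64-thread ceiling of a UINT64 mask. This is not Bullet's actual btThreadSupportPosix/btThreadSupportWin32 code; the class and member names are hypothetical.

    #include <bitset>
    #include <cstddef>

    // Hypothetical sketch: a worker-status mask that is not limited to 64 threads.
    // The real btThreadSupport* classes use a UINT64 busy mask; a std::bitset like
    // this is one way BT_MAX_THREAD_COUNT could grow past 64.
    static const unsigned int kMaxThreadCount = 128;  // example value > 64

    class ThreadStatusMask
    {
    public:
        void markBusy(std::size_t threadIdx) { m_busy.set(threadIdx); }
        void markIdle(std::size_t threadIdx) { m_busy.reset(threadIdx); }
        bool isBusy(std::size_t threadIdx) const { return m_busy.test(threadIdx); }
        bool anyBusy() const { return m_busy.any(); }

        // Find the first idle worker, or return -1 if all are busy.
        int findIdleWorker() const
        {
            for (std::size_t i = 0; i < kMaxThreadCount; ++i)
                if (!m_busy.test(i))
                    return (int)i;
            return -1;
        }

    private:
        std::bitset<kMaxThreadCount> m_busy;  // one bit per worker thread
    };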

View File

@@ -44,6 +44,7 @@
 #include "b3PluginManager.h"
 #include "../Extras/Serialize/BulletFileLoader/btBulletFile.h"
 #include "BulletCollision/NarrowPhaseCollision/btRaycastCallback.h"
+#include "LinearMath/TaskScheduler/btThreadSupportInterface.h"
 #ifndef SKIP_STATIC_PD_CONTROL_PLUGIN
 #include "plugins/pdControlPlugin/pdControlPlugin.h"
@@ -109,6 +110,47 @@ struct UrdfLinkNameMapUtil
 };
+
+class b3ThreadPool
+{
+public:
+	b3ThreadPool(const char* name = "b3ThreadPool")
+	{
+		btThreadSupportInterface::ConstructionInfo info(name, threadFunction);
+		m_threadSupportInterface = btThreadSupportInterface::create(info);
+	}
+
+	~b3ThreadPool()
+	{
+		delete m_threadSupportInterface;
+	}
+
+	int numWorkers() const { return m_threadSupportInterface->getNumWorkerThreads(); }
+
+	void runTask(int threadIdx, btThreadSupportInterface::ThreadFunc func, void* arg)
+	{
+		// Store the task in the per-thread slot and hand the worker a pointer to
+		// that slot. A reference is required here: a local copy would dangle once
+		// this method returns, while the worker thread is still using it.
+		FunctionContext& ctx = m_functionContexts[threadIdx];
+		ctx.func = func;
+		ctx.arg = arg;
+		m_threadSupportInterface->runTask(threadIdx, (void*)&ctx);
+	}
+
+	void waitForAllTasks()
+	{
+		BT_PROFILE("b3ThreadPool_waitForAllTasks");
+		m_threadSupportInterface->waitForAllTasks();
+	}
+
+private:
+	struct FunctionContext
+	{
+		btThreadSupportInterface::ThreadFunc func;
+		void* arg;
+	};
+
+	static void threadFunction(void* userPtr)
+	{
+		BT_PROFILE("b3ThreadPool_threadFunction");
+		FunctionContext* ctx = (FunctionContext*)userPtr;
+		ctx->func(ctx->arg);
+	}
+
+	btThreadSupportInterface* m_threadSupportInterface;
+	FunctionContext m_functionContexts[BT_MAX_THREAD_COUNT];
+};
+
 struct SharedMemoryDebugDrawer : public btIDebugDraw
 {
@@ -1659,7 +1701,7 @@ struct PhysicsServerCommandProcessorInternalData
 	b3HashMap<b3HashString, char*> m_profileEvents;
 	b3HashMap<b3HashString, UrdfVisualShapeCache> m_cachedVUrdfisualShapes;
 
-	btITaskScheduler* m_scheduler;
+	b3ThreadPool* m_threadPool;
 
 	PhysicsServerCommandProcessorInternalData(PhysicsCommandProcessorInterface* proc)
 		:m_pluginManager(proc),
@@ -1689,7 +1731,7 @@ struct PhysicsServerCommandProcessorInternalData
 	m_pickedConstraint(0),
 	m_pickingMultiBodyPoint2Point(0),
 	m_pdControlPlugin(-1),
-	m_scheduler(0)
+	m_threadPool(0)
 {
 	{
@@ -1798,8 +1840,8 @@ PhysicsServerCommandProcessor::~PhysicsServerCommandProcessor()
 		char* event = *m_data->m_profileEvents.getAtIndex(i);
 		delete[] event;
 	}
-	if (m_data->m_scheduler)
-		delete m_data->m_scheduler;
+	if (m_data->m_threadPool)
+		delete m_data->m_threadPool;
 
 	delete m_data;
 }
@@ -4716,16 +4758,17 @@ struct CastSyncInfo {
 };
 #endif // __cplusplus >= 201103L
 
-struct BatchRayCaster : public btIParallelForBody
+struct BatchRayCaster
 {
+	b3ThreadPool* m_threadPool;
 	CastSyncInfo *m_syncInfo;
 	const btCollisionWorld *m_world;
 	const b3RayData *m_rayInputBuffer;
 	b3RayHitInfo *m_hitInfoOutputBuffer;
 	int m_numRays;
 
-	BatchRayCaster(const btCollisionWorld* world, const b3RayData *rayInputBuffer, b3RayHitInfo *hitInfoOutputBuffer, int numRays)
-		: m_world(world), m_rayInputBuffer(rayInputBuffer), m_hitInfoOutputBuffer(hitInfoOutputBuffer), m_numRays(numRays)
+	BatchRayCaster(b3ThreadPool *threadPool, const btCollisionWorld* world, const b3RayData *rayInputBuffer, b3RayHitInfo *hitInfoOutputBuffer, int numRays)
+		: m_threadPool(threadPool), m_world(world), m_rayInputBuffer(rayInputBuffer), m_hitInfoOutputBuffer(hitInfoOutputBuffer), m_numRays(numRays)
 	{
 		m_syncInfo = new CastSyncInfo;
 	}
@@ -4735,25 +4778,49 @@ struct BatchRayCaster : public btIParallelForBody
 	void castRays(int numWorkers) {
 #if BT_THREADSAFE
-		btParallelFor(0, numWorkers, 1, *this);
+		if (numWorkers <= 1) {
+			castSequentially();
+		}
+		else {
+			{
+				BT_PROFILE("BatchRayCaster_startingWorkerThreads");
+				int numTasks = btMin(m_threadPool->numWorkers(), numWorkers - 1);
+				for (int i = 0; i < numTasks; i++) {
+					m_threadPool->runTask(i, BatchRayCaster::rayCastWorker, this);
+				}
+			}
+			rayCastWorker(this);
+			m_threadPool->waitForAllTasks();
+		}
 #else // BT_THREADSAFE
-		for (int i = 0; i < m_numRays; i++) {
-			processRay(i);
-		}
+		castSequentially();
 #endif // BT_THREADSAFE
 	}
 
-	void forLoop( int iBegin, int iEnd ) const
-	{
-		while(true) {
-			const int taskNr = m_syncInfo->getNextTask();
-			if (taskNr >= m_numRays)
-				return;
-			processRay(taskNr);
-		}
-	}
+	static void rayCastWorker(void *arg) {
+		BT_PROFILE("BatchRayCaster_raycastWorker");
+		BatchRayCaster *const obj = (BatchRayCaster *)arg;
+		const int numRays = obj->m_numRays;
+		int taskNr;
+		while(true) {
+			{
+				BT_PROFILE("CastSyncInfo_getNextTask");
+				taskNr = obj->m_syncInfo->getNextTask();
+			}
+			if (taskNr >= numRays)
+				return;
+			obj->processRay(taskNr);
+		}
+	}
 
-	void processRay(int ray) const {
+	void castSequentially() {
+		for (int i = 0; i < m_numRays; i++) {
+			processRay(i);
+		}
+	}
+
+	void processRay(int ray) {
+		BT_PROFILE("BatchRayCaster_processRay");
 		const double *from = m_rayInputBuffer[ray].m_rayFromPosition;
 		const double *to = m_rayInputBuffer[ray].m_rayToPosition;
 		btVector3 rayFromWorld(from[0], from[1], from[2]);
@@ -4811,15 +4878,11 @@ struct BatchRayCaster : public btIParallelForBody
 	}
 };
 
-void PhysicsServerCommandProcessor::createTaskScheduler()
+void PhysicsServerCommandProcessor::createThreadPool()
 {
 #ifdef BT_THREADSAFE
-	if (btGetTaskScheduler() == 0) {
-		m_data->m_scheduler = btCreateDefaultTaskScheduler();
-		if (m_data->m_scheduler == 0) {
-			m_data->m_scheduler = btGetSequentialTaskScheduler();
-		}
-		btSetTaskScheduler(m_data->m_scheduler);
+	if (m_data->m_threadPool == 0) {
+		m_data->m_threadPool = new b3ThreadPool("PhysicsServerCommandProcessorThreadPool");
 	}
 #endif //BT_THREADSAFE
 }
@@ -4832,9 +4895,17 @@ bool PhysicsServerCommandProcessor::processRequestRaycastIntersectionsCommand(co
 	const int numCommandRays = clientCmd.m_requestRaycastIntersections.m_numCommandRays;
 	const int numStreamingRays = clientCmd.m_requestRaycastIntersections.m_numStreamingRays;
-	const int numThreads = clientCmd.m_requestRaycastIntersections.m_numThreads;
+	const int totalRays = numCommandRays+numStreamingRays;
+	int numThreads = clientCmd.m_requestRaycastIntersections.m_numThreads;
+	if (numThreads == 0) {
+		// When 0 is specified, Bullet can decide how many threads to use.
+		// About 16 rays per thread seems to work reasonably well.
+		numThreads = btMax(1, totalRays / 16);
+	}
+	if (numThreads > 1) {
+		createThreadPool();
+	}
 
-	int totalRays = numCommandRays+numStreamingRays;
 	btAlignedObjectArray<b3RayData> rays;
 	rays.resize(totalRays);
 	if (numCommandRays)
@@ -4846,24 +4917,8 @@ bool PhysicsServerCommandProcessor::processRequestRaycastIntersectionsCommand(co
 		memcpy(&rays[numCommandRays],bufferServerToClient,numStreamingRays*sizeof(b3RayData));
 	}
 
-	BatchRayCaster batchRayCaster(m_data->m_dynamicsWorld, &rays[0], (b3RayHitInfo *)bufferServerToClient, totalRays);
-	if (numThreads == 0) {
-		createTaskScheduler();
-		// When 0 is specified, Bullet can decide how many threads to use.
-		// About 16 rays per thread seems to work reasonably well.
-		batchRayCaster.castRays(totalRays / 16);
-	} else if (numThreads == 1) {
-		// Sequentially trace all rays:
-		for (int i = 0; i < totalRays; i++) {
-			batchRayCaster.processRay(i);
-		}
-	} else {
-		// Otherwise, just use the user-specified number of threads. This is
-		// still limited by the number of virtual cores on the machine.
-		createTaskScheduler();
-		batchRayCaster.castRays(numThreads);
-	}
+	BatchRayCaster batchRayCaster(m_data->m_threadPool, m_data->m_dynamicsWorld, &rays[0], (b3RayHitInfo *)bufferServerToClient, totalRays);
+	batchRayCaster.castRays(numThreads);
 
 	serverStatusOut.m_raycastHits.m_numRaycastHits = totalRays;
 	serverStatusOut.m_type = CMD_REQUEST_RAY_CAST_INTERSECTIONS_COMPLETED;

View File

@@ -21,7 +21,7 @@ class PhysicsServerCommandProcessor : public CommandProcessorInterface
 	struct PhysicsServerCommandProcessorInternalData* m_data;
 
 	void resetSimulation();
-	void createTaskScheduler();
+	void createThreadPool();
 
 protected:

View File

@@ -585,7 +585,13 @@ typedef union {
 #define MAX_RAY_INTERSECTION_BATCH_SIZE 256
-#define MAX_RAY_INTERSECTION_BATCH_SIZE_STREAMING 16*1024
+
+#ifdef __APPLE__
+#define MAX_RAY_INTERSECTION_BATCH_SIZE_STREAMING (4*1024)
+#else
+#define MAX_RAY_INTERSECTION_BATCH_SIZE_STREAMING (16*1024)
+#endif
+
 #define MAX_RAY_HITS MAX_RAY_INTERSECTION_BATCH_SIZE
 #define VISUAL_SHAPE_MAX_PATH_LEN 1024

View File

@@ -48,14 +48,14 @@ subject to the following restrictions:
 int btGetNumHardwareThreads()
 {
-	return std::thread::hardware_concurrency();
+	return btMin<int>(BT_MAX_THREAD_COUNT, std::thread::hardware_concurrency());
 }
 #else
 int btGetNumHardwareThreads()
 {
-	return sysconf( _SC_NPROCESSORS_ONLN );
+	return btMin<int>(BT_MAX_THREAD_COUNT, sysconf( _SC_NPROCESSORS_ONLN ));
 }
 #endif
@@ -202,6 +202,7 @@ static void *threadFunction( void *argument )
 	}
 	printf( "Thread TERMINATED\n" );
+	return 0;
 }
 
 ///send messages to SPUs

View File

@@ -28,6 +28,8 @@ subject to the following restrictions:
 #define BT_OVERRIDE
 #endif
 
+// Don't set this to larger than 64 without modifying btThreadSupportPosix
+// and btThreadSupportWin32. They use UINT64 bit-masks.
 const unsigned int BT_MAX_THREAD_COUNT = 64; // only if BT_THREADSAFE is 1
 
 // for internal use only