From b84eb8af74763f46748e333dc7946e290b3b0028 Mon Sep 17 00:00:00 2001 From: Tigran Gasparian Date: Tue, 19 Jun 2018 18:28:48 +0200 Subject: [PATCH] Several fixes for the parallel raycasts - Limits the maximum number of threads to 64, since btThreadSupportPosix and btThreadsupportWin32 don't support more than 64 bits at this moment, due to the use of UINT64 bitmasks. This could be fixed by using std::bitset or some other alternative. - Introduces a threadpool class, b3ThreadPool, which is a simple wrapper around btThreadSupportInterface and uses this instead of the global task scheduler for parallel raycasting. This is actually quite a bit faster than the task scheduler (~10-15% in my tests for parallel raycasts), since the advanced features (parallelFor) are not necessary for the parallel raycasts. - Puts 16*1024 of MAX_RAY_INTERSECTION_MAX_SIZE_STREAMING in parentheses, since it otherwise causes problems with other operators of equal precedence and introduces a smaller constant for Apple targets. - Refactors the parallel raycasts code and adds some more profiling. --- .../PhysicsServerCommandProcessor.cpp | 143 ++++++++++++------ .../PhysicsServerCommandProcessor.h | 2 +- examples/SharedMemory/SharedMemoryPublic.h | 8 +- .../TaskScheduler/btThreadSupportPosix.cpp | 5 +- src/LinearMath/btThreads.h | 2 + 5 files changed, 112 insertions(+), 48 deletions(-) diff --git a/examples/SharedMemory/PhysicsServerCommandProcessor.cpp b/examples/SharedMemory/PhysicsServerCommandProcessor.cpp index 8bbfe82b2..51fa00ec1 100644 --- a/examples/SharedMemory/PhysicsServerCommandProcessor.cpp +++ b/examples/SharedMemory/PhysicsServerCommandProcessor.cpp @@ -44,6 +44,7 @@ #include "b3PluginManager.h" #include "../Extras/Serialize/BulletFileLoader/btBulletFile.h" #include "BulletCollision/NarrowPhaseCollision/btRaycastCallback.h" +#include "LinearMath/TaskScheduler/btThreadSupportInterface.h" #ifndef SKIP_STATIC_PD_CONTROL_PLUGIN #include "plugins/pdControlPlugin/pdControlPlugin.h" @@ -109,6 +110,47 @@ struct UrdfLinkNameMapUtil }; +class b3ThreadPool { +public: + b3ThreadPool(const char *name = "b3ThreadPool") { + btThreadSupportInterface::ConstructionInfo info(name, threadFunction); + m_threadSupportInterface = btThreadSupportInterface::create(info); + } + + ~b3ThreadPool() { + delete m_threadSupportInterface; + } + + const int numWorkers() const { return m_threadSupportInterface->getNumWorkerThreads(); } + + void runTask(int threadIdx, btThreadSupportInterface::ThreadFunc func, void *arg) { + FunctionContext ctx = m_functionContexts[threadIdx]; + ctx.func = func; + ctx.arg = arg; + m_threadSupportInterface->runTask(threadIdx, (void *)&ctx); + } + + void waitForAllTasks() { + BT_PROFILE("b3ThreadPool_waitForAllTasks"); + m_threadSupportInterface->waitForAllTasks(); + } + +private: + struct FunctionContext { + btThreadSupportInterface::ThreadFunc func; + void *arg; + }; + + static void threadFunction(void *userPtr) { + BT_PROFILE("b3ThreadPool_threadFunction"); + FunctionContext* ctx = (FunctionContext *)userPtr; + ctx->func(ctx->arg); + } + + btThreadSupportInterface *m_threadSupportInterface; + FunctionContext m_functionContexts[BT_MAX_THREAD_COUNT]; +}; + struct SharedMemoryDebugDrawer : public btIDebugDraw { @@ -1659,7 +1701,7 @@ struct PhysicsServerCommandProcessorInternalData b3HashMap m_profileEvents; b3HashMap m_cachedVUrdfisualShapes; - btITaskScheduler* m_scheduler; + b3ThreadPool* m_threadPool; PhysicsServerCommandProcessorInternalData(PhysicsCommandProcessorInterface* proc) :m_pluginManager(proc), @@ -1689,7 +1731,7 @@ struct PhysicsServerCommandProcessorInternalData m_pickedConstraint(0), m_pickingMultiBodyPoint2Point(0), m_pdControlPlugin(-1), - m_scheduler(0) + m_threadPool(0) { { @@ -1798,8 +1840,8 @@ PhysicsServerCommandProcessor::~PhysicsServerCommandProcessor() char* event = *m_data->m_profileEvents.getAtIndex(i); delete[] event; } - if (m_data->m_scheduler) - delete m_data->m_scheduler; + if (m_data->m_threadPool) + delete m_data->m_threadPool; delete m_data; } @@ -4716,16 +4758,17 @@ struct CastSyncInfo { }; #endif // __cplusplus >= 201103L -struct BatchRayCaster : public btIParallelForBody +struct BatchRayCaster { + b3ThreadPool* m_threadPool; CastSyncInfo *m_syncInfo; const btCollisionWorld *m_world; const b3RayData *m_rayInputBuffer; b3RayHitInfo *m_hitInfoOutputBuffer; int m_numRays; - BatchRayCaster(const btCollisionWorld* world, const b3RayData *rayInputBuffer, b3RayHitInfo *hitInfoOutputBuffer, int numRays) - : m_world(world), m_rayInputBuffer(rayInputBuffer), m_hitInfoOutputBuffer(hitInfoOutputBuffer), m_numRays(numRays) { + BatchRayCaster(b3ThreadPool *threadPool, const btCollisionWorld* world, const b3RayData *rayInputBuffer, b3RayHitInfo *hitInfoOutputBuffer, int numRays) + : m_threadPool(threadPool), m_world(world), m_rayInputBuffer(rayInputBuffer), m_hitInfoOutputBuffer(hitInfoOutputBuffer), m_numRays(numRays) { m_syncInfo = new CastSyncInfo; } @@ -4735,25 +4778,49 @@ struct BatchRayCaster : public btIParallelForBody void castRays(int numWorkers) { #if BT_THREADSAFE - btParallelFor(0, numWorkers, 1, *this); -#else // BT_THREADSAFE - for (int i = 0; i < m_numRays; i++) { - processRay(i); + if (numWorkers <= 1) { + castSequentially(); } + else { + { + BT_PROFILE("BatchRayCaster_startingWorkerThreads"); + int numTasks = btMin(m_threadPool->numWorkers(), numWorkers-1); + for (int i=0;irunTask(i, BatchRayCaster::rayCastWorker, this); + } + } + rayCastWorker(this); + m_threadPool->waitForAllTasks(); + } +#else // BT_THREADSAFE + castSequentially(); #endif // BT_THREADSAFE } - void forLoop( int iBegin, int iEnd ) const - { + static void rayCastWorker(void *arg) { + BT_PROFILE("BatchRayCaster_raycastWorker"); + BatchRayCaster *const obj = (BatchRayCaster *)arg; + const int numRays = obj->m_numRays; + int taskNr; while(true) { - const int taskNr = m_syncInfo->getNextTask(); - if (taskNr >= m_numRays) + { + BT_PROFILE("CastSyncInfo_getNextTask"); + taskNr = obj->m_syncInfo->getNextTask(); + } + if (taskNr >= numRays) return; - processRay(taskNr); + obj->processRay(taskNr); } } - void processRay(int ray) const { + void castSequentially() { + for (int i = 0; i < m_numRays; i++) { + processRay(i); + } + } + + void processRay(int ray) { + BT_PROFILE("BatchRayCaster_processRay"); const double *from = m_rayInputBuffer[ray].m_rayFromPosition; const double *to = m_rayInputBuffer[ray].m_rayToPosition; btVector3 rayFromWorld(from[0], from[1], from[2]); @@ -4811,15 +4878,11 @@ struct BatchRayCaster : public btIParallelForBody } }; -void PhysicsServerCommandProcessor::createTaskScheduler() +void PhysicsServerCommandProcessor::createThreadPool() { #ifdef BT_THREADSAFE - if (btGetTaskScheduler() == 0) { - m_data->m_scheduler = btCreateDefaultTaskScheduler(); - if (m_data->m_scheduler == 0) { - m_data->m_scheduler = btGetSequentialTaskScheduler(); - } - btSetTaskScheduler(m_data->m_scheduler); + if (m_data->m_threadPool == 0) { + m_data->m_threadPool = new b3ThreadPool("PhysicsServerCommandProcessorThreadPool"); } #endif //BT_THREADSAFE } @@ -4832,9 +4895,17 @@ bool PhysicsServerCommandProcessor::processRequestRaycastIntersectionsCommand(co const int numCommandRays = clientCmd.m_requestRaycastIntersections.m_numCommandRays; const int numStreamingRays = clientCmd.m_requestRaycastIntersections.m_numStreamingRays; - const int numThreads = clientCmd.m_requestRaycastIntersections.m_numThreads; + const int totalRays = numCommandRays+numStreamingRays; + int numThreads = clientCmd.m_requestRaycastIntersections.m_numThreads; + if (numThreads == 0) { + // When 0 is specified, Bullet can decide how many threads to use. + // About 16 rays per thread seems to work reasonably well. + numThreads = btMax(1, totalRays / 16); + } + if (numThreads > 1) { + createThreadPool(); + } - int totalRays = numCommandRays+numStreamingRays; btAlignedObjectArray rays; rays.resize(totalRays); if (numCommandRays) @@ -4846,24 +4917,8 @@ bool PhysicsServerCommandProcessor::processRequestRaycastIntersectionsCommand(co memcpy(&rays[numCommandRays],bufferServerToClient,numStreamingRays*sizeof(b3RayData)); } - BatchRayCaster batchRayCaster(m_data->m_dynamicsWorld, &rays[0], (b3RayHitInfo *)bufferServerToClient, totalRays); - if (numThreads == 0) { - createTaskScheduler(); - // When 0 is specified, Bullet can decide how many threads to use. - // About 16 rays per thread seems to work reasonably well. - batchRayCaster.castRays(totalRays / 16); - } else if (numThreads == 1) { - // Sequentially trace all rays: - for (int i = 0; i < totalRays; i++) { - batchRayCaster.processRay(i); - } - } else { - - // Otherwise, just use the user-specified number of threads. This is - // still limited by the number of virtual cores on the machine. - createTaskScheduler(); - batchRayCaster.castRays(numThreads); - } + BatchRayCaster batchRayCaster(m_data->m_threadPool, m_data->m_dynamicsWorld, &rays[0], (b3RayHitInfo *)bufferServerToClient, totalRays); + batchRayCaster.castRays(numThreads); serverStatusOut.m_raycastHits.m_numRaycastHits = totalRays; serverStatusOut.m_type = CMD_REQUEST_RAY_CAST_INTERSECTIONS_COMPLETED; diff --git a/examples/SharedMemory/PhysicsServerCommandProcessor.h b/examples/SharedMemory/PhysicsServerCommandProcessor.h index e624aa676..e6166a206 100644 --- a/examples/SharedMemory/PhysicsServerCommandProcessor.h +++ b/examples/SharedMemory/PhysicsServerCommandProcessor.h @@ -21,7 +21,7 @@ class PhysicsServerCommandProcessor : public CommandProcessorInterface struct PhysicsServerCommandProcessorInternalData* m_data; void resetSimulation(); - void createTaskScheduler(); + void createThreadPool(); protected: diff --git a/examples/SharedMemory/SharedMemoryPublic.h b/examples/SharedMemory/SharedMemoryPublic.h index 3023bfaf3..7e7338e36 100644 --- a/examples/SharedMemory/SharedMemoryPublic.h +++ b/examples/SharedMemory/SharedMemoryPublic.h @@ -585,7 +585,13 @@ typedef union { #define MAX_RAY_INTERSECTION_BATCH_SIZE 256 -#define MAX_RAY_INTERSECTION_BATCH_SIZE_STREAMING 16*1024 + +#ifdef __APPLE__ +#define MAX_RAY_INTERSECTION_BATCH_SIZE_STREAMING (4*1024) +#else +#define MAX_RAY_INTERSECTION_BATCH_SIZE_STREAMING (16*1024) +#endif + #define MAX_RAY_HITS MAX_RAY_INTERSECTION_BATCH_SIZE #define VISUAL_SHAPE_MAX_PATH_LEN 1024 diff --git a/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp b/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp index ccd7d1e12..50ca060df 100644 --- a/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp +++ b/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp @@ -48,14 +48,14 @@ subject to the following restrictions: int btGetNumHardwareThreads() { - return std::thread::hardware_concurrency(); + return btMin(BT_MAX_THREAD_COUNT, std::thread::hardware_concurrency()); } #else int btGetNumHardwareThreads() { - return sysconf( _SC_NPROCESSORS_ONLN ); + return btMin(BT_MAX_THREAD_COUNT, sysconf( _SC_NPROCESSORS_ONLN )); } #endif @@ -202,6 +202,7 @@ static void *threadFunction( void *argument ) } printf( "Thread TERMINATED\n" ); + return 0; } ///send messages to SPUs diff --git a/src/LinearMath/btThreads.h b/src/LinearMath/btThreads.h index ecd5a19cf..921fd088c 100644 --- a/src/LinearMath/btThreads.h +++ b/src/LinearMath/btThreads.h @@ -28,6 +28,8 @@ subject to the following restrictions: #define BT_OVERRIDE #endif +// Don't set this to larger than 64, without modifying btThreadSupportPosix +// and btThreadSupportWin32. They use UINT64 bit-masks. const unsigned int BT_MAX_THREAD_COUNT = 64; // only if BT_THREADSAFE is 1 // for internal use only