diff --git a/examples/SharedMemory/PhysicsServerCommandProcessor.cpp b/examples/SharedMemory/PhysicsServerCommandProcessor.cpp
index 8bbfe82b2..51fa00ec1 100644
--- a/examples/SharedMemory/PhysicsServerCommandProcessor.cpp
+++ b/examples/SharedMemory/PhysicsServerCommandProcessor.cpp
@@ -44,6 +44,7 @@
 #include "b3PluginManager.h"
 #include "../Extras/Serialize/BulletFileLoader/btBulletFile.h"
 #include "BulletCollision/NarrowPhaseCollision/btRaycastCallback.h"
+#include "LinearMath/TaskScheduler/btThreadSupportInterface.h"
 
 #ifndef SKIP_STATIC_PD_CONTROL_PLUGIN
 #include "plugins/pdControlPlugin/pdControlPlugin.h"
@@ -109,6 +110,59 @@ struct UrdfLinkNameMapUtil
 
 };
 
+// Small worker-thread pool built on btThreadSupportInterface. Every worker
+// slot has its own FunctionContext holding the callback/argument pair that
+// the static trampoline threadFunction() invokes.
+class b3ThreadPool {
+public:
+	b3ThreadPool(const char *name = "b3ThreadPool") {
+		btThreadSupportInterface::ConstructionInfo info(name, threadFunction);
+		m_threadSupportInterface = btThreadSupportInterface::create(info);
+	}
+
+	~b3ThreadPool() {
+		delete m_threadSupportInterface;
+	}
+
+	// Worker count as reported by the thread support interface.
+	int numWorkers() const { return m_threadSupportInterface->getNumWorkerThreads(); }
+
+	// Hands func/arg to worker threadIdx via that worker's context slot.
+	void runTask(int threadIdx, btThreadSupportInterface::ThreadFunc func, void *arg) {
+		// Bind the slot by reference: its address is what gets passed on, so a
+		// by-value copy would hand out a pointer to a stack local that is gone
+		// as soon as runTask() returns.
+		FunctionContext& ctx = m_functionContexts[threadIdx];
+		ctx.func = func;
+		ctx.arg = arg;
+		m_threadSupportInterface->runTask(threadIdx, (void *)&ctx);
+	}
+
+	// Forwards to btThreadSupportInterface::waitForAllTasks().
+	void waitForAllTasks() {
+		BT_PROFILE("b3ThreadPool_waitForAllTasks");
+		m_threadSupportInterface->waitForAllTasks();
+	}
+
+private:
+	// Callback plus opaque argument for one worker slot.
+	struct FunctionContext {
+		btThreadSupportInterface::ThreadFunc func;
+		void *arg;
+	};
+
+	// Entry point registered in the constructor; userPtr is expected to be
+	// the FunctionContext passed on by runTask().
+	static void threadFunction(void *userPtr) {
+		BT_PROFILE("b3ThreadPool_threadFunction");
+		FunctionContext* ctx = (FunctionContext *)userPtr;
+		ctx->func(ctx->arg);
+	}
+
+	btThreadSupportInterface *m_threadSupportInterface;
+	FunctionContext m_functionContexts[BT_MAX_THREAD_COUNT];
+};
+
 struct SharedMemoryDebugDrawer : public btIDebugDraw
 {
 
@@ -1659,7 +1713,7 @@ struct PhysicsServerCommandProcessorInternalData
 	b3HashMap m_profileEvents;
 	b3HashMap m_cachedVUrdfisualShapes;
 
-	btITaskScheduler* m_scheduler;
+	b3ThreadPool* m_threadPool;
 
 	PhysicsServerCommandProcessorInternalData(PhysicsCommandProcessorInterface* proc)
 		:m_pluginManager(proc),
@@ -1689,7 +1743,7 @@ struct PhysicsServerCommandProcessorInternalData
 		m_pickedConstraint(0),
 		m_pickingMultiBodyPoint2Point(0),
 		m_pdControlPlugin(-1),
-		m_scheduler(0)
+		m_threadPool(0)
 	{
 
 		{
@@ -1798,8 +1852,8 @@ PhysicsServerCommandProcessor::~PhysicsServerCommandProcessor()
 		char* event = *m_data->m_profileEvents.getAtIndex(i);
 		delete[] event;
 	}
-	if (m_data->m_scheduler)
-		delete m_data->m_scheduler;
+	if (m_data->m_threadPool)
+		delete m_data->m_threadPool;
 
 	delete m_data;
 }
@@ -4716,16 +4770,17 @@ struct CastSyncInfo {
 };
 #endif // __cplusplus >= 201103L
 
-struct BatchRayCaster : public btIParallelForBody
+struct BatchRayCaster
 {
+	b3ThreadPool* m_threadPool;
 	CastSyncInfo *m_syncInfo;
 	const btCollisionWorld *m_world;
 	const b3RayData *m_rayInputBuffer;
 	b3RayHitInfo *m_hitInfoOutputBuffer;
 	int m_numRays;
 
-	BatchRayCaster(const btCollisionWorld* world, const b3RayData *rayInputBuffer, b3RayHitInfo *hitInfoOutputBuffer, int numRays)
-		: m_world(world), m_rayInputBuffer(rayInputBuffer), m_hitInfoOutputBuffer(hitInfoOutputBuffer), m_numRays(numRays) {
+	BatchRayCaster(b3ThreadPool *threadPool, const btCollisionWorld* world, const b3RayData *rayInputBuffer, b3RayHitInfo *hitInfoOutputBuffer, int numRays)
+		: m_threadPool(threadPool), m_world(world), m_rayInputBuffer(rayInputBuffer), m_hitInfoOutputBuffer(hitInfoOutputBuffer), m_numRays(numRays) {
 		m_syncInfo = new CastSyncInfo;
 	}
 
@@ -4735,25 +4790,54 @@ struct BatchRayCaster : public btIParallelForBody
 
 	void castRays(int numWorkers) {
 #if BT_THREADSAFE
-		btParallelFor(0, numWorkers, 1, *this);
-#else // BT_THREADSAFE
-		for (int i = 0; i < m_numRays; i++) {
-			processRay(i);
+		if (numWorkers <= 1) {
+			castSequentially();
 		}
+		else {
+			{
+				BT_PROFILE("BatchRayCaster_startingWorkerThreads");
+				// The calling thread also runs rayCastWorker() below, so at most
+				// numWorkers-1 tasks are handed to the pool.
+				int numTasks = btMin(m_threadPool->numWorkers(), numWorkers-1);
+				for (int i=0;i<numTasks;i++) {
+					m_threadPool->runTask(i, BatchRayCaster::rayCastWorker, this);
+				}
+			}
+			rayCastWorker(this);
+			m_threadPool->waitForAllTasks();
+		}
+#else // BT_THREADSAFE
+		castSequentially();
 #endif // BT_THREADSAFE
 	}
 
-	void forLoop( int iBegin, int iEnd ) const
-	{
+	// Thread entry point: arg is the BatchRayCaster; repeatedly takes the next
+	// ray index from m_syncInfo until it reaches m_numRays.
+	static void rayCastWorker(void *arg) {
+		BT_PROFILE("BatchRayCaster_raycastWorker");
+		BatchRayCaster *const obj = (BatchRayCaster *)arg;
+		const int numRays = obj->m_numRays;
+		int taskNr;
 		while(true) {
-			const int taskNr = m_syncInfo->getNextTask();
-			if (taskNr >= m_numRays)
+			{
+				BT_PROFILE("CastSyncInfo_getNextTask");
+				taskNr = obj->m_syncInfo->getNextTask();
+			}
+			if (taskNr >= numRays)
 				return;
-			processRay(taskNr);
+			obj->processRay(taskNr);
 		}
 	}
 
-	void processRay(int ray) const {
+	// Casts every ray in index order on the calling thread.
+	void castSequentially() {
+		for (int i = 0; i < m_numRays; i++) {
+			processRay(i);
+		}
+	}
+
+	void processRay(int ray) {
+		BT_PROFILE("BatchRayCaster_processRay");
 		const double *from = m_rayInputBuffer[ray].m_rayFromPosition;
 		const double *to = m_rayInputBuffer[ray].m_rayToPosition;
 		btVector3 rayFromWorld(from[0], from[1], from[2]);
@@ -4811,15 +4895,12 @@ struct BatchRayCaster : public btIParallelForBody
 	}
 };
 
-void PhysicsServerCommandProcessor::createTaskScheduler()
+// Creates the thread pool on first use; a no-op unless BT_THREADSAFE is defined.
+void PhysicsServerCommandProcessor::createThreadPool()
 {
 #ifdef BT_THREADSAFE
-	if (btGetTaskScheduler() == 0) {
-		m_data->m_scheduler = btCreateDefaultTaskScheduler();
-		if (m_data->m_scheduler == 0) {
-			m_data->m_scheduler = btGetSequentialTaskScheduler();
-		}
-		btSetTaskScheduler(m_data->m_scheduler);
+	if (m_data->m_threadPool == 0) {
+		m_data->m_threadPool = new b3ThreadPool("PhysicsServerCommandProcessorThreadPool");
 	}
 #endif //BT_THREADSAFE
 }
@@ -4832,9 +4913,18 @@ bool PhysicsServerCommandProcessor::processRequestRaycastIntersectionsCommand(co
 
 	const int numCommandRays = clientCmd.m_requestRaycastIntersections.m_numCommandRays;
 	const int numStreamingRays = clientCmd.m_requestRaycastIntersections.m_numStreamingRays;
-	const int numThreads = clientCmd.m_requestRaycastIntersections.m_numThreads;
+	const int totalRays = numCommandRays+numStreamingRays;
+	int numThreads = clientCmd.m_requestRaycastIntersections.m_numThreads;
+	if (numThreads == 0) {
+		// When 0 is specified, Bullet can decide how many threads to use.
+		// About 16 rays per thread seems to work reasonably well.
+		numThreads = btMax(1, totalRays / 16);
+	}
+	// Single-threaded requests are cast sequentially and need no pool.
+	if (numThreads > 1) {
+		createThreadPool();
+	}
 
-	int totalRays = numCommandRays+numStreamingRays;
 	btAlignedObjectArray<b3RayData> rays;
 	rays.resize(totalRays);
 	if (numCommandRays)
@@ -4846,24 +4936,8 @@ bool PhysicsServerCommandProcessor::processRequestRaycastIntersectionsCommand(co
 		memcpy(&rays[numCommandRays],bufferServerToClient,numStreamingRays*sizeof(b3RayData));
 	}
 
-	BatchRayCaster batchRayCaster(m_data->m_dynamicsWorld, &rays[0], (b3RayHitInfo *)bufferServerToClient, totalRays);
-	if (numThreads == 0) {
-		createTaskScheduler();
-		// When 0 is specified, Bullet can decide how many threads to use.
-		// About 16 rays per thread seems to work reasonably well.
-		batchRayCaster.castRays(totalRays / 16);
-	} else if (numThreads == 1) {
-		// Sequentially trace all rays:
-		for (int i = 0; i < totalRays; i++) {
-			batchRayCaster.processRay(i);
-		}
-	} else {
-
-		// Otherwise, just use the user-specified number of threads. This is
-		// still limited by the number of virtual cores on the machine.
-		createTaskScheduler();
-		batchRayCaster.castRays(numThreads);
-	}
+	BatchRayCaster batchRayCaster(m_data->m_threadPool, m_data->m_dynamicsWorld, &rays[0], (b3RayHitInfo *)bufferServerToClient, totalRays);
+	batchRayCaster.castRays(numThreads);
 
 	serverStatusOut.m_raycastHits.m_numRaycastHits = totalRays;
 	serverStatusOut.m_type = CMD_REQUEST_RAY_CAST_INTERSECTIONS_COMPLETED;
diff --git a/examples/SharedMemory/PhysicsServerCommandProcessor.h b/examples/SharedMemory/PhysicsServerCommandProcessor.h
index e624aa676..e6166a206 100644
--- a/examples/SharedMemory/PhysicsServerCommandProcessor.h
+++ b/examples/SharedMemory/PhysicsServerCommandProcessor.h
@@ -21,7 +21,7 @@ class PhysicsServerCommandProcessor : public CommandProcessorInterface
 	struct PhysicsServerCommandProcessorInternalData* m_data;
 
 	void resetSimulation();
-	void createTaskScheduler();
+	void createThreadPool();
 
 protected:
 
diff --git a/examples/SharedMemory/SharedMemoryPublic.h b/examples/SharedMemory/SharedMemoryPublic.h
index 3023bfaf3..7e7338e36 100644
--- a/examples/SharedMemory/SharedMemoryPublic.h
+++ b/examples/SharedMemory/SharedMemoryPublic.h
@@ -585,7 +585,13 @@ typedef union {
 
 
 #define MAX_RAY_INTERSECTION_BATCH_SIZE 256
-#define MAX_RAY_INTERSECTION_BATCH_SIZE_STREAMING 16*1024
+
+#ifdef __APPLE__
+#define MAX_RAY_INTERSECTION_BATCH_SIZE_STREAMING (4*1024)
+#else
+#define MAX_RAY_INTERSECTION_BATCH_SIZE_STREAMING (16*1024)
+#endif
+
 #define MAX_RAY_HITS MAX_RAY_INTERSECTION_BATCH_SIZE
 #define VISUAL_SHAPE_MAX_PATH_LEN 1024
 
diff --git a/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp b/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp
index ccd7d1e12..50ca060df 100644
--- a/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp
+++ b/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp
@@ -48,14 +48,17 @@ subject to the following restrictions:
 
 int btGetNumHardwareThreads()
 {
-	return std::thread::hardware_concurrency();
+	// hardware_concurrency() may return 0 when the value is not computable.
+	return btMax(1u, btMin(BT_MAX_THREAD_COUNT, std::thread::hardware_concurrency()));
 }
 
 #else
 
 int btGetNumHardwareThreads()
 {
-	return sysconf( _SC_NPROCESSORS_ONLN );
+	// sysconf() returns long (-1 on error) while BT_MAX_THREAD_COUNT is
+	// unsigned int; btMin<int> gives the template a single argument type.
+	return btMax(1, btMin<int>(BT_MAX_THREAD_COUNT, sysconf( _SC_NPROCESSORS_ONLN )));
 }
 
 #endif
@@ -202,6 +205,7 @@ static void *threadFunction( void *argument )
 	}
 
 	printf( "Thread TERMINATED\n" );
+	return 0;
 }
 
 ///send messages to SPUs
diff --git a/src/LinearMath/btThreads.h b/src/LinearMath/btThreads.h
index ecd5a19cf..921fd088c 100644
--- a/src/LinearMath/btThreads.h
+++ b/src/LinearMath/btThreads.h
@@ -28,6 +28,8 @@ subject to the following restrictions:
 #define BT_OVERRIDE
 #endif
 
+// Don't set this higher than 64 without also modifying btThreadSupportPosix
+// and btThreadSupportWin32: they use UINT64 bit-masks.
 const unsigned int BT_MAX_THREAD_COUNT = 64; // only if BT_THREADSAFE is 1
 
 // for internal use only