diff --git a/examples/ExampleBrowser/CMakeLists.txt b/examples/ExampleBrowser/CMakeLists.txt index 4a4db93db..9382f41f6 100644 --- a/examples/ExampleBrowser/CMakeLists.txt +++ b/examples/ExampleBrowser/CMakeLists.txt @@ -220,6 +220,7 @@ SET(BulletExampleBrowser_SRCS ../MultiThreading/b3PosixThreadSupport.cpp ../MultiThreading/b3Win32ThreadSupport.cpp ../MultiThreading/b3ThreadSupportInterface.cpp + ../MultiThreading/btTaskScheduler.cpp ../RenderingExamples/TinyRendererSetup.cpp ../RenderingExamples/TimeSeriesCanvas.cpp ../RenderingExamples/TimeSeriesCanvas.h diff --git a/examples/MultiThreadedDemo/CommonRigidBodyMTBase.cpp b/examples/MultiThreadedDemo/CommonRigidBodyMTBase.cpp index a47b76da6..83eecca61 100644 --- a/examples/MultiThreadedDemo/CommonRigidBodyMTBase.cpp +++ b/examples/MultiThreadedDemo/CommonRigidBodyMTBase.cpp @@ -35,6 +35,7 @@ class btCollisionShape; #include "BulletDynamics/MLCPSolvers/btSolveProjectedGaussSeidel.h" #include "BulletDynamics/MLCPSolvers/btDantzigSolver.h" #include "BulletDynamics/MLCPSolvers/btLemkeSolver.h" +#include "../MultiThreading/btTaskScheduler.h" static int gNumIslands = 0; @@ -173,7 +174,7 @@ void myParallelIslandDispatch( btAlignedObjectArray m_taskSchedulers; + btAlignedObjectArray m_allocatedTaskSchedulers; public: btTaskSchedulerManager() {} @@ -251,6 +253,11 @@ public: { addTaskScheduler( btGetSequentialTaskScheduler() ); #if BT_THREADSAFE + if ( btITaskScheduler* ts = createDefaultTaskScheduler() ) + { + m_allocatedTaskSchedulers.push_back( ts ); + addTaskScheduler( ts ); + } addTaskScheduler( btGetOpenMPTaskScheduler() ); addTaskScheduler( btGetTBBTaskScheduler() ); addTaskScheduler( btGetPPLTaskScheduler() ); @@ -263,9 +270,16 @@ public: { btSetTaskScheduler( m_taskSchedulers[ 0 ] ); } - btGetTaskScheduler()->setNumThreads( btGetTaskScheduler()->getMaxNumThreads() ); #endif // #if BT_THREADSAFE } + void shutdown() + { + for ( int i = 0; i < m_allocatedTaskSchedulers.size(); ++i ) + { + delete m_allocatedTaskSchedulers[ i ]; + } + m_allocatedTaskSchedulers.clear(); + } void addTaskScheduler( btITaskScheduler* ts ) { @@ -281,8 +295,13 @@ public: static btTaskSchedulerManager gTaskSchedulerMgr; +#if BT_THREADSAFE +static bool gMultithreadedWorld = true; +static bool gDisplayProfileInfo = true; +#else static bool gMultithreadedWorld = false; static bool gDisplayProfileInfo = false; +#endif static SolverType gSolverType = SOLVER_TYPE_SEQUENTIAL_IMPULSE; static int gSolverMode = SOLVER_SIMD | SOLVER_USE_WARMSTARTING | diff --git a/examples/MultiThreadedDemo/MultiThreadedDemo.cpp b/examples/MultiThreadedDemo/MultiThreadedDemo.cpp index 3dff09ae8..96c856f53 100644 --- a/examples/MultiThreadedDemo/MultiThreadedDemo.cpp +++ b/examples/MultiThreadedDemo/MultiThreadedDemo.cpp @@ -28,11 +28,10 @@ class btCollisionShape; #include "btBulletCollisionCommon.h" -#define BT_OVERRIDE static btScalar gSliderStackRows = 8.0f; static btScalar gSliderStackColumns = 6.0f; -static btScalar gSliderStackHeight = 15.0f; +static btScalar gSliderStackHeight = 10.0f; static btScalar gSliderGroundHorizontalAmplitude = 0.0f; static btScalar gSliderGroundVerticalAmplitude = 0.0f; @@ -240,7 +239,7 @@ void MultiThreadedDemo::createSceneObjects() const btVector3 halfExtents = btVector3( 0.5f, 0.25f, 0.5f ); int numStackRows = btMax(1, int(gSliderStackRows)); int numStackCols = btMax(1, int(gSliderStackColumns)); - int stackHeight = 15; + int stackHeight = int(gSliderStackHeight); float stackZSpacing = 3.0f; float stackXSpacing = 20.0f; diff --git a/examples/MultiThreading/btTaskScheduler.cpp b/examples/MultiThreading/btTaskScheduler.cpp new file mode 100644 index 000000000..c4de30ebc --- /dev/null +++ b/examples/MultiThreading/btTaskScheduler.cpp @@ -0,0 +1,448 @@ + +#include "LinearMath/btTransform.h" +#include "../Utils/b3Clock.h" +#include "LinearMath/btAlignedObjectArray.h" +#include "LinearMath/btThreads.h" +#include "LinearMath/btQuickprof.h" +#include +#include + + +typedef void( *btThreadFunc )( void* userPtr, void* lsMemory ); +typedef void* ( *btThreadLocalStorageFunc )(); + +#if BT_THREADSAFE + +#if defined( _WIN32 ) + +#include "b3Win32ThreadSupport.h" + +b3ThreadSupportInterface* createThreadSupport( int numThreads, btThreadFunc threadFunc, btThreadLocalStorageFunc localStoreFunc, const char* uniqueName ) +{ + b3Win32ThreadSupport::Win32ThreadConstructionInfo constructionInfo( uniqueName, threadFunc, localStoreFunc, numThreads ); + //constructionInfo.m_priority = 0; // highest priority (the default) -- can cause erratic performance when numThreads > numCores + // we don't want worker threads to be higher priority than the main thread or the main thread could get + // totally shut out and unable to tell the workers to stop + constructionInfo.m_priority = -1; // normal priority + b3Win32ThreadSupport* threadSupport = new b3Win32ThreadSupport( constructionInfo ); + return threadSupport; +} + +#else // #if defined( _WIN32 ) + +#include "b3PosixThreadSupport.h" + +b3ThreadSupportInterface* createThreadSupport( int numThreads, btThreadFunc threadFunc, btThreadLocalStorageFunc localStoreFunc, const char* uniqueName) +{ + b3PosixThreadSupport::ThreadConstructionInfo constructionInfo( uniqueName, threadFunc, localStoreFunc, numThreads ); + b3ThreadSupportInterface* threadSupport = new b3PosixThreadSupport( constructionInfo ); + return threadSupport; +} + +#endif // #else // #if defined( _WIN32 ) + + +/// +/// getNumHardwareThreads() +/// +/// +/// https://stackoverflow.com/questions/150355/programmatically-find-the-number-of-cores-on-a-machine +/// +#if __cplusplus >= 201103L + +#include + +int getNumHardwareThreads() +{ + return std::thread::hardware_concurrency(); +} + +#elif defined( _WIN32 ) + +#define WIN32_LEAN_AND_MEAN + +#include + +int getNumHardwareThreads() +{ + // caps out at 32 + SYSTEM_INFO info; + GetSystemInfo( &info ); + return info.dwNumberOfProcessors; +} + +#else + +int getNumHardwareThreads() +{ + return 0; // don't know +} + +#endif + + +struct WorkerThreadStatus +{ + enum Type + { + kInvalid, + kWaitingForWork, + kWorking, + kSleeping, + }; +}; + + +struct IJob +{ + virtual void executeJob() = 0; +}; + +class ParallelForJob : public IJob +{ + const btIParallelForBody* mBody; + int mBegin; + int mEnd; + +public: + ParallelForJob() + { + mBody = NULL; + mBegin = 0; + mEnd = 0; + } + void init( int iBegin, int iEnd, const btIParallelForBody& body ) + { + mBody = &body; + mBegin = iBegin; + mEnd = iEnd; + } + virtual void executeJob() override + { + BT_PROFILE( "executeJob" ); + + // call the functor body to do the work + mBody->forLoop( mBegin, mEnd ); + } +}; + + +struct JobContext +{ + JobContext() + { + m_queueLock = NULL; + m_headIndex = 0; + m_tailIndex = 0; + m_workersShouldCheckQueue = false; + m_useSpinMutex = false; + } + b3CriticalSection* m_queueLock; + btSpinMutex m_mutex; + volatile bool m_workersShouldCheckQueue; + + btAlignedObjectArray m_jobQueue; + bool m_queueIsEmpty; + int m_tailIndex; + int m_headIndex; + bool m_useSpinMutex; + + void lockQueue() + { + if ( m_useSpinMutex ) + { + m_mutex.lock(); + } + else + { + m_queueLock->lock(); + } + } + void unlockQueue() + { + if ( m_useSpinMutex ) + { + m_mutex.unlock(); + } + else + { + m_queueLock->unlock(); + } + } + void clearQueue() + { + lockQueue(); + m_headIndex = 0; + m_tailIndex = 0; + m_queueIsEmpty = true; + unlockQueue(); + m_jobQueue.resizeNoInitialize( 0 ); + } + void submitJob( IJob* job ) + { + m_jobQueue.push_back( job ); + lockQueue(); + m_tailIndex++; + m_queueIsEmpty = false; + unlockQueue(); + } + IJob* consumeJob() + { + if ( m_queueIsEmpty ) + { + // lock free path. even if this is taken erroneously it isn't harmful + return NULL; + } + IJob* job = NULL; + lockQueue(); + if ( !m_queueIsEmpty ) + { + job = m_jobQueue[ m_headIndex++ ]; + if ( m_headIndex == m_tailIndex ) + { + m_queueIsEmpty = true; + } + } + unlockQueue(); + return job; + } +}; + + +struct WorkerThreadLocalStorage +{ + int threadId; + WorkerThreadStatus::Type status; +}; + + +static void WorkerThreadFunc( void* userPtr, void* lsMemory ) +{ + BT_PROFILE( "WorkerThreadFunc" ); + WorkerThreadLocalStorage* localStorage = (WorkerThreadLocalStorage*) lsMemory; + localStorage->status = WorkerThreadStatus::kWaitingForWork; + //printf( "WorkerThreadFunc: worker %d start working\n", localStorage->threadId ); + + JobContext* jobContext = (JobContext*) userPtr; + + while ( jobContext->m_workersShouldCheckQueue ) + { + if ( IJob* job = jobContext->consumeJob() ) + { + localStorage->status = WorkerThreadStatus::kWorking; + job->executeJob(); + localStorage->status = WorkerThreadStatus::kWaitingForWork; + } + else + { + // todo: spin wait a bit to avoid hammering the empty queue + } + } + + //printf( "WorkerThreadFunc stop working\n" ); + localStorage->status = WorkerThreadStatus::kSleeping; + // go idle +} + + +static void* WorkerThreadAllocFunc() +{ + return new WorkerThreadLocalStorage; +} + + + +class btTaskSchedulerDefault : public btITaskScheduler +{ + JobContext m_jobContext; + b3ThreadSupportInterface* m_threadSupport; + btAlignedObjectArray m_jobs; + btSpinMutex m_antiNestingLock; // prevent nested parallel-for + int m_numThreads; + int m_numWorkerThreads; + int m_numWorkersRunning; +public: + + btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport") + { + m_threadSupport = NULL; + m_numThreads = getNumHardwareThreads(); + // if can't detect number of cores, + if ( m_numThreads == 0 ) + { + // take a guess + m_numThreads = 4; + } + m_numWorkerThreads = m_numThreads - 1; + m_numWorkersRunning = 0; + } + + virtual ~btTaskSchedulerDefault() + { + shutdown(); + } + + void init() + { + int maxNumWorkerThreads = BT_MAX_THREAD_COUNT - 1; + m_threadSupport = createThreadSupport( maxNumWorkerThreads, WorkerThreadFunc, WorkerThreadAllocFunc, "TaskScheduler" ); + m_jobContext.m_queueLock = m_threadSupport->createCriticalSection(); + for ( int i = 0; i < maxNumWorkerThreads; i++ ) + { + WorkerThreadLocalStorage* storage = (WorkerThreadLocalStorage*) m_threadSupport->getThreadLocalMemory( i ); + btAssert( storage ); + storage->threadId = i; + storage->status = WorkerThreadStatus::kSleeping; + } + setWorkersActive( false ); // no work for them yet + } + + virtual void shutdown() + { + setWorkersActive( false ); + waitForWorkersToSleep(); + m_threadSupport->deleteCriticalSection( m_jobContext.m_queueLock ); + m_jobContext.m_queueLock = NULL; + + delete m_threadSupport; + m_threadSupport = NULL; + } + + void setWorkersActive( bool active ) + { + m_jobContext.m_workersShouldCheckQueue = active; + } + + virtual int getMaxNumThreads() const BT_OVERRIDE + { + return BT_MAX_THREAD_COUNT; + } + + virtual int getNumThreads() const BT_OVERRIDE + { + return m_numThreads; + } + + virtual void setNumThreads( int numThreads ) BT_OVERRIDE + { + m_numThreads = btMax( btMin(numThreads, int(BT_MAX_THREAD_COUNT)), 1 ); + m_numWorkerThreads = m_numThreads - 1; + } + + void waitJobs() + { + BT_PROFILE( "waitJobs" ); + // have the main thread work until the job queue is empty + for ( ;; ) + { + if ( IJob* job = m_jobContext.consumeJob() ) + { + job->executeJob(); + } + else + { + break; + } + } + // done with jobs for now, tell workers to rest + setWorkersActive( false ); + waitForWorkersToSleep(); + } + + void wakeWorkers() + { + BT_PROFILE( "wakeWorkers" ); + btAssert( m_jobContext.m_workersShouldCheckQueue ); + // tell each worker thread to start working + for ( int i = 0; i < m_numWorkerThreads; i++ ) + { + m_threadSupport->runTask( B3_THREAD_SCHEDULE_TASK, &m_jobContext, i ); + m_numWorkersRunning++; + } + } + + void waitForWorkersToSleep() + { + BT_PROFILE( "waitForWorkersToSleep" ); + while ( m_numWorkersRunning > 0 ) + { + int iThread; + int threadStatus; + m_threadSupport->waitForResponse( &iThread, &threadStatus ); // wait for worker threads to finish working + m_numWorkersRunning--; + } + //m_threadSupport->waitForAllTasksToComplete(); + for ( int i = 0; i < m_numWorkerThreads; i++ ) + { + //m_threadSupport->waitForTaskCompleted( i ); + WorkerThreadLocalStorage* storage = (WorkerThreadLocalStorage*) m_threadSupport->getThreadLocalMemory( i ); + btAssert( storage ); + btAssert( storage->status == WorkerThreadStatus::kSleeping ); + } + } + + virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE + { + BT_PROFILE( "parallelFor_ThreadSupport" ); + btAssert( iEnd >= iBegin ); + btAssert( grainSize >= 1 ); + int iterationCount = iEnd - iBegin; + if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() ) + { + int jobCount = ( iterationCount + grainSize - 1 ) / grainSize; + btAssert( jobCount >= 2 ); // need more than one job for multithreading + if ( jobCount > m_jobs.size() ) + { + m_jobs.resize( jobCount ); + } + if ( jobCount > m_jobContext.m_jobQueue.capacity() ) + { + m_jobContext.m_jobQueue.reserve( jobCount ); + } + + m_jobContext.clearQueue(); + // prepare worker threads for incoming work + setWorkersActive( true ); + wakeWorkers(); + // submit all of the jobs + int iJob = 0; + for ( int i = iBegin; i < iEnd; i += grainSize ) + { + btAssert( iJob < jobCount ); + int iE = btMin( i + grainSize, iEnd ); + ParallelForJob& job = m_jobs[ iJob ]; + job.init( i, iE, body ); + m_jobContext.submitJob( &job ); + iJob++; + } + + // put the main thread to work on emptying the job queue and then wait for all workers to finish + waitJobs(); + m_antiNestingLock.unlock(); + } + else + { + BT_PROFILE( "parallelFor_mainThread" ); + // just run on main thread + body.forLoop( iBegin, iEnd ); + } + } +}; + + + +btITaskScheduler* createDefaultTaskScheduler() +{ + btTaskSchedulerDefault* ts = new btTaskSchedulerDefault(); + ts->init(); + return ts; +} + +#else // #if BT_THREADSAFE + +btITaskScheduler* createDefaultTaskScheduler() +{ + return NULL; +} + +#endif // #else // #if BT_THREADSAFE \ No newline at end of file diff --git a/examples/MultiThreading/btTaskScheduler.h b/examples/MultiThreading/btTaskScheduler.h new file mode 100644 index 000000000..a83b635eb --- /dev/null +++ b/examples/MultiThreading/btTaskScheduler.h @@ -0,0 +1,26 @@ +/* +Copyright (c) 2003-2014 Erwin Coumans http://bullet.googlecode.com + +This software is provided 'as-is', without any express or implied warranty. +In no event will the authors be held liable for any damages arising from the use of this software. +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it freely, +subject to the following restrictions: + +1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. +2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. +3. This notice may not be removed or altered from any source distribution. +*/ + + + +#ifndef BT_TASK_SCHEDULER_H +#define BT_TASK_SCHEDULER_H + + +class btITaskScheduler; + +btITaskScheduler* createDefaultTaskScheduler(); + + +#endif // BT_TASK_SCHEDULER_H