parallel solver: various changes

 - threading: adding btSequentialImpulseConstraintSolverMt
 - task scheduler: added parallelSum so that parallel solver can compute residuals
 - CommonRigidBodyMTBase: add slider for solver least-squares residual and allow multithreading without needing OpenMP, TBB, or PPL (see the usage sketch after this list)
 - task scheduler: don't wait for workers to sleep/signal at the end of each parallel block
 - parallel solver: convertContacts split into an allocContactConstraints and setupContactConstraints stage, the latter of which is done in parallel
 - parallel solver: rolling friction is now interleaved along with normal friction
 - parallel solver: batchified split impulse solving + some cleanup
 - parallel solver: sorting batches from largest to smallest
 - parallel solver: added parallel batch creation
 - parallel solver: added warmstartingWriteBackContacts func + other cleanup
 - task scheduler: truncate low bits to preserve determinism with parallelSum
 - parallel solver: reducing dynamic mem allocs and trying to parallelize more of the batch setup
 - parallel solver: parallelize updating constraint batch ids for merging
 - parallel solver: adding debug visualization
 - task scheduler: make TBB task scheduler parallelSum deterministic
 - parallel solver: split batch gen code into separate file; allow selection of batch gen method
 - task scheduler: add sleepWorkerThreadsHint() at end of simulation
 - parallel solver: added grain size per phase
 - task scheduler: fix for strange threading issue; also no need for main thread to wait for workers to sleep
 - base constraint solver: break out joint setup into separate function for profiling/overriding
 - parallel solver: allow different batching method for contacts vs joints
 - base constraint solver: add convertJoint and convertBodies to make it possible to parallelize joint and body conversion
 - parallel solver: convert joints and bodies in parallel now
 - parallel solver: speed up batch creation with run-length encoding
 - parallel solver: batch gen: run-length expansion in parallel; collect constraint info in parallel
 - parallel solver: adding spatial grid batching method
 - parallel solver: enhancements to spatial grid batching
 - sequential solver: moving code for writing back into functions that derived classes can call
 - parallel solver: do write back of bodies and joints in parallel
 - parallel solver: removed all batching methods except for spatial grid (others were ineffective)
 - parallel solver: added 2D or 3D grid batching options; and a bit of cleanup
 - move btDefaultTaskScheduler into LinearMath project
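
A minimal usage sketch (hypothetical code, not part of this commit's diff), assuming a BT_THREADSAFE build: this is how an application opts into the new default task scheduler so the parallel solver and btParallelSum work without OpenMP, TBB, or PPL:

#include "LinearMath/btThreads.h"

void enableMultithreadingSketch()
{
    // btCreateDefaultTaskScheduler() is added by this commit (Win32- or
    // pthreads-based); it returns NULL when built without BT_THREADSAFE.
    if ( btITaskScheduler* scheduler = btCreateDefaultTaskScheduler() )
    {
        scheduler->setNumThreads( scheduler->getMaxNumThreads() );
        btSetTaskScheduler( scheduler );
    }
}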
Lunkhound
2017-06-04 17:57:25 -07:00
parent 94bc897067
commit b8720f2161
25 changed files with 5236 additions and 767 deletions

src/LinearMath/CMakeLists.txt

@@ -14,6 +14,9 @@ SET(LinearMath_SRCS
btSerializer64.cpp
btThreads.cpp
btVector3.cpp
TaskScheduler/btTaskScheduler.cpp
TaskScheduler/btThreadSupportPosix.cpp
TaskScheduler/btThreadSupportWin32.cpp
)
SET(LinearMath_HDRS
@@ -44,6 +47,7 @@ SET(LinearMath_HDRS
btTransform.h
btTransformUtil.h
btVector3.h
TaskScheduler/btThreadSupportInterface.h
)
ADD_LIBRARY(LinearMath ${LinearMath_SRCS} ${LinearMath_HDRS})

src/LinearMath/TaskScheduler/btTaskScheduler.cpp

@@ -0,0 +1,619 @@
#include "LinearMath/btMinMax.h"
#include "LinearMath/btAlignedObjectArray.h"
#include "LinearMath/btThreads.h"
#include "LinearMath/btQuickprof.h"
#include <stdio.h>
#include <algorithm>
typedef void( *btThreadFunc )( void* userPtr, void* lsMemory );
typedef void* ( *btThreadLocalStorageFunc )();
#if BT_THREADSAFE
#include "btThreadSupportInterface.h"
///
/// getNumHardwareThreads()
///
///
/// https://stackoverflow.com/questions/150355/programmatically-find-the-number-of-cores-on-a-machine
///
#if __cplusplus >= 201103L
#include <thread>
int getNumHardwareThreads()
{
return std::thread::hardware_concurrency();
}
#elif defined( _WIN32 )
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
int getNumHardwareThreads()
{
// caps out at 32
SYSTEM_INFO info;
GetSystemInfo( &info );
return info.dwNumberOfProcessors;
}
#else
int getNumHardwareThreads()
{
return 0; // don't know
}
#endif
void btSpinPause()
{
#if defined( _WIN32 )
YieldProcessor();
#endif
}
struct WorkerThreadStatus
{
enum Type
{
kInvalid,
kWaitingForWork,
kWorking,
kSleeping,
};
};
struct IJob
{
virtual void executeJob(int threadId) = 0;
};
class ParallelForJob : public IJob
{
const btIParallelForBody* mBody;
int mBegin;
int mEnd;
public:
ParallelForJob( int iBegin, int iEnd, const btIParallelForBody& body )
{
mBody = &body;
mBegin = iBegin;
mEnd = iEnd;
}
virtual void executeJob(int threadId) BT_OVERRIDE
{
BT_PROFILE( "executeJob" );
// call the functor body to do the work
mBody->forLoop( mBegin, mEnd );
}
};
static const int kCacheLineSize = 64;
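// Each worker accumulates into its own ThreadLocalSum slot; the padding below
// keeps the slots on separate cache lines so concurrent writes don't false-share.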
struct ThreadLocalSum
{
btScalar mSum;
char mCachePadding[ kCacheLineSize - sizeof( btScalar ) ];
};
class ParallelSumJob : public IJob
{
const btIParallelSumBody* mBody;
ThreadLocalSum* mSumArray;
int mBegin;
int mEnd;
public:
ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalSum* sums )
{
mBody = &body;
mSumArray = sums;
mBegin = iBegin;
mEnd = iEnd;
}
virtual void executeJob( int threadId ) BT_OVERRIDE
{
BT_PROFILE( "executeJob" );
// call the functor body to do the work
btScalar val = mBody->sumLoop( mBegin, mEnd );
// by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
const float TRUNC_SCALE = float(1<<19);
val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE; // truncate some bits
mSumArray[threadId].mSum += val;
}
};
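// Standalone sketch (hypothetical, not in the diff) of why the truncation above
// makes parallelSum deterministic: once each partial result is quantized to a
// multiple of 1/2^19, small sums are exactly representable in a float, so the
// order in which the threads' contributions are added no longer changes the result:
//
// #include <math.h>
// #include <stdio.h>
//
// int main()
// {
//     const float TRUNC_SCALE = float( 1 << 19 );
//     float vals[ 3 ] = { 0.1234567f, 2.7182818f, 3.1415926f };
//     for ( int i = 0; i < 3; ++i )
//     {
//         vals[ i ] = floorf( vals[ i ] * TRUNC_SCALE + 0.5f ) / TRUNC_SCALE; // same truncation as ParallelSumJob
//     }
//     float ab = ( vals[ 0 ] + vals[ 1 ] ) + vals[ 2 ]; // one addition order
//     float ba = ( vals[ 2 ] + vals[ 1 ] ) + vals[ 0 ]; // another addition order
//     printf( "%.9g %.9g equal=%d\n", ab, ba, int( ab == ba ) ); // prints equal=1
//     return 0;
// }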
struct JobContext
{
JobContext()
{
m_queueLock = NULL;
m_headIndex = 0;
m_tailIndex = 0;
m_queueIsEmpty = true; // workers poll this flag lock-free, so don't leave it uninitialized
m_workersShouldCheckQueue = false;
m_workersShouldSleep = false;
m_useSpinMutex = false;
m_coolDownTime = 1000; // 1000 microseconds
}
btCriticalSection* m_queueLock;
btSpinMutex m_mutex;
volatile bool m_workersShouldCheckQueue;
volatile bool m_workersShouldSleep;
btAlignedObjectArray<IJob*> m_jobQueue;
bool m_queueIsEmpty;
int m_tailIndex;
int m_headIndex;
bool m_useSpinMutex;
unsigned int m_coolDownTime;
btClock m_clock;
void lockQueue()
{
if ( m_useSpinMutex )
{
m_mutex.lock();
}
else
{
m_queueLock->lock();
}
}
void unlockQueue()
{
if ( m_useSpinMutex )
{
m_mutex.unlock();
}
else
{
m_queueLock->unlock();
}
}
void clearQueue()
{
lockQueue();
m_headIndex = 0;
m_tailIndex = 0;
m_queueIsEmpty = true;
unlockQueue();
m_jobQueue.resizeNoInitialize( 0 );
}
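// Note: submitJob is only ever called from the main thread, and the scheduler
// reserves the queue's capacity before submitting, so push_back below never
// reallocates while workers are consuming under the lock.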
void submitJob( IJob* job )
{
m_jobQueue.push_back( job );
lockQueue();
m_tailIndex++;
m_queueIsEmpty = false;
unlockQueue();
}
IJob* consumeJob()
{
if ( m_queueIsEmpty )
{
// lock free path. even if this is taken erroneously it isn't harmful
return NULL;
}
IJob* job = NULL;
lockQueue();
if ( !m_queueIsEmpty )
{
job = m_jobQueue[ m_headIndex++ ];
if ( m_headIndex == m_tailIndex )
{
m_queueIsEmpty = true;
}
}
unlockQueue();
return job;
}
};
struct WorkerThreadLocalStorage
{
int threadId;
WorkerThreadStatus::Type status;
int numJobsFinished;
btSpinMutex m_mutex;
};
static void WorkerThreadFunc( void* userPtr, void* lsMemory )
{
BT_PROFILE( "WorkerThreadFunc" );
WorkerThreadLocalStorage* localStorage = (WorkerThreadLocalStorage*) lsMemory;
JobContext* jobContext = (JobContext*) userPtr;
bool shouldSleep = false;
while (! shouldSleep)
{
// do work
localStorage->m_mutex.lock();
while ( IJob* job = jobContext->consumeJob() )
{
localStorage->status = WorkerThreadStatus::kWorking;
job->executeJob( localStorage->threadId );
localStorage->numJobsFinished++;
}
localStorage->status = WorkerThreadStatus::kWaitingForWork;
localStorage->m_mutex.unlock();
unsigned long long int clockStart = jobContext->m_clock.getTimeMicroseconds();
// while queue is empty,
while (jobContext->m_queueIsEmpty)
{
// todo: spin wait a bit to avoid hammering the empty queue
btSpinPause();
if ( jobContext->m_workersShouldSleep )
{
shouldSleep = true;
break;
}
// if jobs are incoming,
if (jobContext->m_workersShouldCheckQueue)
{
clockStart = jobContext->m_clock.getTimeMicroseconds(); // reset clock
}
else
{
// if no jobs incoming and queue has been empty for the cooldown time, sleep
unsigned long long int timeElapsed = jobContext->m_clock.getTimeMicroseconds() - clockStart;
if (timeElapsed > jobContext->m_coolDownTime)
{
shouldSleep = true;
break;
}
}
}
}
// go idle
localStorage->m_mutex.lock();
localStorage->status = WorkerThreadStatus::kSleeping;
localStorage->m_mutex.unlock();
}
static void* WorkerThreadAllocFunc()
{
return new WorkerThreadLocalStorage;
}
class btTaskSchedulerDefault : public btITaskScheduler
{
JobContext m_jobContext;
btThreadSupportInterface* m_threadSupport;
btAlignedObjectArray<char> m_jobMem;
btAlignedObjectArray<char> m_threadLocalMem;
btSpinMutex m_antiNestingLock; // prevent nested parallel-for
int m_numThreads;
int m_numWorkerThreads;
int m_maxNumThreads;
int m_numJobs;
public:
btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
{
m_threadSupport = NULL;
}
virtual ~btTaskSchedulerDefault()
{
shutdown();
}
void init()
{
btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc, WorkerThreadAllocFunc );
m_threadSupport = btThreadSupportInterface::create( constructionInfo );
m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
m_numThreads = m_maxNumThreads;
m_jobContext.m_queueLock = m_threadSupport->createCriticalSection();
for ( int i = 0; i < m_numWorkerThreads; i++ )
{
WorkerThreadLocalStorage* storage = (WorkerThreadLocalStorage*) m_threadSupport->getThreadLocalMemory( i );
btAssert( storage );
storage->threadId = i + 1; // workers start at 1
storage->status = WorkerThreadStatus::kSleeping;
}
setWorkersActive( false ); // no work for them yet
setNumThreads( m_threadSupport->getCacheFriendlyNumThreads() );
}
virtual void shutdown()
{
setWorkersActive( false );
waitForWorkersToSleep();
m_threadSupport->deleteCriticalSection( m_jobContext.m_queueLock );
m_jobContext.m_queueLock = NULL;
delete m_threadSupport;
m_threadSupport = NULL;
}
void setWorkersActive( bool active )
{
m_jobContext.m_workersShouldCheckQueue = active;
}
virtual int getMaxNumThreads() const BT_OVERRIDE
{
return m_maxNumThreads;
}
virtual int getNumThreads() const BT_OVERRIDE
{
return m_numThreads;
}
virtual void setNumThreads( int numThreads ) BT_OVERRIDE
{
m_numThreads = btMax( btMin(numThreads, int(m_maxNumThreads)), 1 );
m_numWorkerThreads = m_numThreads - 1;
}
void waitJobs()
{
BT_PROFILE( "waitJobs" );
// have the main thread work until the job queue is empty
int numMainThreadJobsFinished = 0;
while ( IJob* job = m_jobContext.consumeJob() )
{
job->executeJob( 0 );
numMainThreadJobsFinished++;
}
// done with jobs for now, tell workers to rest
setWorkersActive( false );
unsigned long long int clockStart = m_jobContext.m_clock.getTimeMicroseconds();
// wait for workers to finish any jobs in progress
while ( true )
{
int numWorkerJobsFinished = 0;
for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
{
WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) );
storage->m_mutex.lock();
numWorkerJobsFinished += storage->numJobsFinished;
storage->m_mutex.unlock();
}
if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
{
break;
}
unsigned long long int timeElapsed = m_jobContext.m_clock.getTimeMicroseconds() - clockStart;
btAssert(timeElapsed < 1000);
if (timeElapsed > 100000)
{
break;
}
btSpinPause();
}
}
void wakeWorkers(int numWorkersToWake)
{
BT_PROFILE( "wakeWorkers" );
btAssert( m_jobContext.m_workersShouldCheckQueue );
int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
int numActiveWorkers = 0;
for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
{
// note this count of active workers is not necessarily totally reliable, because a worker thread could be
// just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) );
if (storage->status != WorkerThreadStatus::kSleeping)
{
numActiveWorkers++;
}
}
for ( int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker )
{
WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) );
if (storage->status == WorkerThreadStatus::kSleeping)
{
m_threadSupport->runTask( iWorker, &m_jobContext );
numActiveWorkers++;
}
}
}
void waitForWorkersToSleep()
{
BT_PROFILE( "waitForWorkersToSleep" );
m_jobContext.m_workersShouldSleep = true;
m_threadSupport->waitForAllTasks();
for ( int i = 0; i < m_numWorkerThreads; i++ )
{
WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory(i) );
btAssert( storage );
btAssert( storage->status == WorkerThreadStatus::kSleeping );
}
}
virtual void sleepWorkerThreadsHint() BT_OVERRIDE
{
BT_PROFILE( "sleepWorkerThreadsHint" );
// hint the task scheduler that we may not be using these threads for a little while
m_jobContext.m_workersShouldSleep = true;
}
void prepareWorkerThreads()
{
for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
{
WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) );
storage->m_mutex.lock();
storage->numJobsFinished = 0;
storage->m_mutex.unlock();
}
m_jobContext.m_workersShouldSleep = false;
setWorkersActive( true );
}
virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE
{
BT_PROFILE( "parallelFor_ThreadSupport" );
btAssert( iEnd >= iBegin );
btAssert( grainSize >= 1 );
int iterationCount = iEnd - iBegin;
if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() )
{
typedef ParallelForJob JobType;
int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
m_numJobs = jobCount;
btAssert( jobCount >= 2 ); // need more than one job for multithreading
int jobSize = sizeof( JobType );
int jobBufSize = jobSize * jobCount;
// make sure we have enough memory allocated to store jobs
if ( jobBufSize > m_jobMem.size() )
{
m_jobMem.resize( jobBufSize );
}
// make sure job queue is big enough
if ( jobCount > m_jobContext.m_jobQueue.capacity() )
{
m_jobContext.m_jobQueue.reserve( jobCount );
}
m_jobContext.clearQueue();
// prepare worker threads for incoming work
prepareWorkerThreads();
// submit all of the jobs
int iJob = 0;
JobType* jobs = reinterpret_cast<JobType*>( &m_jobMem[ 0 ] );
for ( int i = iBegin; i < iEnd; i += grainSize )
{
btAssert( iJob < jobCount );
int iE = btMin( i + grainSize, iEnd );
JobType& job = jobs[ iJob ];
new ( (void*) &job ) ParallelForJob( i, iE, body ); // placement new
m_jobContext.submitJob( &job );
iJob++;
}
wakeWorkers( jobCount - 1 );
// put the main thread to work on emptying the job queue and then wait for all workers to finish
waitJobs();
m_antiNestingLock.unlock();
}
else
{
BT_PROFILE( "parallelFor_mainThread" );
// just run on main thread
body.forLoop( iBegin, iEnd );
}
}
virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE
{
BT_PROFILE( "parallelSum_ThreadSupport" );
btAssert( iEnd >= iBegin );
btAssert( grainSize >= 1 );
int iterationCount = iEnd - iBegin;
if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() )
{
typedef ParallelSumJob JobType;
int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
m_numJobs = jobCount;
btAssert( jobCount >= 2 ); // need more than one job for multithreading
int jobSize = sizeof( JobType );
int jobBufSize = jobSize * jobCount;
// make sure we have enough memory allocated to store jobs
if ( jobBufSize > m_jobMem.size() )
{
m_jobMem.resize( jobBufSize );
}
// make sure job queue is big enough
if ( jobCount > m_jobContext.m_jobQueue.capacity() )
{
m_jobContext.m_jobQueue.reserve( jobCount );
}
// make sure thread local area is big enough
int threadLocalSize = m_numThreads * sizeof( ThreadLocalSum );
if ( threadLocalSize > m_threadLocalMem.size() )
{
m_threadLocalMem.resize( threadLocalSize );
}
// initialize summation
ThreadLocalSum* threadLocalSum = reinterpret_cast<ThreadLocalSum*>( &m_threadLocalMem[ 0 ] );
for ( int iThread = 0; iThread < m_numThreads; ++iThread )
{
threadLocalSum[ iThread ].mSum = btScalar( 0 );
}
m_jobContext.clearQueue();
// prepare worker threads for incoming work
prepareWorkerThreads();
// submit all of the jobs
int iJob = 0;
JobType* jobs = reinterpret_cast<JobType*>( &m_jobMem[ 0 ] );
for ( int i = iBegin; i < iEnd; i += grainSize )
{
btAssert( iJob < jobCount );
int iE = btMin( i + grainSize, iEnd );
JobType& job = jobs[ iJob ];
new ( (void*) &job ) ParallelSumJob( i, iE, body, threadLocalSum ); // placement new
m_jobContext.submitJob( &job );
iJob++;
}
wakeWorkers( jobCount - 1 );
// put the main thread to work on emptying the job queue and then wait for all workers to finish
waitJobs();
m_antiNestingLock.unlock();
// add up all the thread sums
btScalar sum = btScalar(0);
for ( int iThread = 0; iThread < m_numThreads; ++iThread )
{
sum += threadLocalSum[ iThread ].mSum;
}
return sum;
}
else
{
BT_PROFILE( "parallelSum_mainThread" );
// just run on main thread
return body.sumLoop( iBegin, iEnd );
}
}
};
btITaskScheduler* btCreateDefaultTaskScheduler()
{
btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
ts->init();
return ts;
}
#else // #if BT_THREADSAFE
btITaskScheduler* btCreateDefaultTaskScheduler()
{
return NULL;
}
#endif // #else // #if BT_THREADSAFE
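
For reference, a usage sketch (hypothetical code, not in the diff) showing how the scheduler is driven through btParallelFor; work ranges no larger than the grain size simply run on the calling thread:

#include "LinearMath/btThreads.h"

// hypothetical body: scales an array in parallel
struct ScaleBody : public btIParallelForBody
{
    btScalar* m_data;
    btScalar m_factor;
    virtual void forLoop( int iBegin, int iEnd ) const BT_OVERRIDE
    {
        for ( int i = iBegin; i < iEnd; ++i )
        {
            m_data[ i ] *= m_factor;
        }
    }
};

void scaleAllSketch( btScalar* data, int count, btScalar factor )
{
    ScaleBody body;
    body.m_data = data;
    body.m_factor = factor;
    btParallelFor( 0, count, 64, body ); // grain size 64 is an arbitrary choice here
}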

src/LinearMath/TaskScheduler/btThreadSupportInterface.h

@@ -0,0 +1,75 @@
/*
Bullet Continuous Collision Detection and Physics Library
Copyright (c) 2003-2018 Erwin Coumans http://bulletphysics.com
This software is provided 'as-is', without any express or implied warranty.
In no event will the authors be held liable for any damages arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it freely,
subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution.
*/
#ifndef BT_THREAD_SUPPORT_INTERFACE_H
#define BT_THREAD_SUPPORT_INTERFACE_H
class btCriticalSection
{
public:
btCriticalSection() {}
virtual ~btCriticalSection() {}
virtual void lock() = 0;
virtual void unlock() = 0;
};
class btThreadSupportInterface
{
public:
virtual ~btThreadSupportInterface() {}
virtual int getNumWorkerThreads() const = 0; // number of worker threads (total number of logical processors - 1)
virtual int getCacheFriendlyNumThreads() const = 0; // the number of logical processors sharing a single L3 cache
virtual void runTask( int threadIndex, void* userData ) = 0;
virtual void waitForAllTasks() = 0;
virtual btCriticalSection* createCriticalSection() = 0;
virtual void deleteCriticalSection( btCriticalSection* criticalSection ) = 0;
virtual void* getThreadLocalMemory( int taskId ) { return NULL; }
typedef void( *ThreadFunc )( void* userPtr, void* lsMemory );
typedef void* ( *MemorySetupFunc )( );
struct ConstructionInfo
{
ConstructionInfo( const char* uniqueName,
ThreadFunc userThreadFunc,
MemorySetupFunc lsMemoryFunc,
int threadStackSize = 65535
)
:m_uniqueName( uniqueName ),
m_userThreadFunc( userThreadFunc ),
m_lsMemoryFunc( lsMemoryFunc ),
m_threadStackSize( threadStackSize )
{
}
const char* m_uniqueName;
ThreadFunc m_userThreadFunc;
MemorySetupFunc m_lsMemoryFunc;
int m_threadStackSize;
};
static btThreadSupportInterface* create( const ConstructionInfo& info );
};
#endif //BT_THREAD_SUPPORT_INTERFACE_H
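
A minimal consumption sketch (hypothetical names, mirroring what btTaskSchedulerDefault::init() does in btTaskScheduler.cpp), assuming a BT_THREADSAFE build with at least one worker thread:

static void myWorkerFunc( void* userPtr, void* lsMemory )
{
    // runs on a worker thread each time runTask() targets it
}

static void* myWorkerAllocFunc()
{
    return new int( 0 ); // per-thread local storage, later returned by getThreadLocalMemory()
}

void threadSupportSketch()
{
    btThreadSupportInterface::ConstructionInfo ci( "sketch", myWorkerFunc, myWorkerAllocFunc );
    btThreadSupportInterface* ts = btThreadSupportInterface::create( ci );
    int dummy = 42;
    ts->runTask( 0, &dummy ); // a NULL userPtr would tell the worker to exit
    ts->waitForAllTasks();
    delete ts;                // stopThreads() shuts the workers down
}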

src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp

@@ -0,0 +1,369 @@
/*
Bullet Continuous Collision Detection and Physics Library
Copyright (c) 2003-2018 Erwin Coumans http://bulletphysics.com
This software is provided 'as-is', without any express or implied warranty.
In no event will the authors be held liable for any damages arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it freely,
subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution.
*/
#if BT_THREADSAFE && !defined( _WIN32 )
#include "LinearMath/btScalar.h"
#include "LinearMath/btAlignedObjectArray.h"
#include "LinearMath/btThreads.h"
#include "LinearMath/btMinMax.h"
#include "btThreadSupportInterface.h"
#include <stdio.h>
#include <errno.h>
#include <unistd.h> //for sysconf
#include <fcntl.h> //for O_CREAT (needed by sem_open in the named-semaphore path)
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 600 //for definition of pthread_barrier_t, see http://pages.cs.wisc.edu/~travitch/pthreads_primer.html
#endif //_XOPEN_SOURCE
#include <pthread.h>
#include <semaphore.h>
///
/// getNumHardwareThreads()
///
///
/// https://stackoverflow.com/questions/150355/programmatically-find-the-number-of-cores-on-a-machine
///
#if __cplusplus >= 201103L
#include <thread>
int btGetNumHardwareThreads()
{
return std::thread::hardware_concurrency();
}
#else
int btGetNumHardwareThreads()
{
return sysconf( _SC_NPROCESSORS_ONLN );
}
#endif
// btThreadSupportPosix -- manages a pool of pthreads workers: start/stop, task dispatch and completion signaling
class btThreadSupportPosix : public btThreadSupportInterface
{
public:
struct btThreadStatus
{
int m_taskId;
int m_commandId;
int m_status;
ThreadFunc m_userThreadFunc;
void* m_userPtr; //for taskDesc etc
void* m_lsMemory; //initialized using PosixLocalStoreMemorySetupFunc
pthread_t thread;
//each thread will wait until this signal to start its work
sem_t* startSemaphore;
// this is a copy of m_mainSemaphore,
//each thread will signal once it is finished with its work
sem_t* m_mainSemaphore;
unsigned long threadUsed;
};
private:
typedef unsigned long long UINT64;
btAlignedObjectArray<btThreadStatus> m_activeThreadStatus;
// m_mainSemaphore will be signaled once for each thread that finishes its work
sem_t* m_mainSemaphore;
int m_numThreads;
UINT64 m_startedThreadsMask;
void startThreads( const ConstructionInfo& threadInfo );
void stopThreads();
int waitForResponse();
public:
btThreadSupportPosix( const ConstructionInfo& threadConstructionInfo );
virtual ~btThreadSupportPosix();
virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; }
// TODO: return the number of logical processors sharing the first L3 cache
virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return m_numThreads + 1; }
virtual void runTask( int threadIndex, void* userData ) BT_OVERRIDE;
virtual void waitForAllTasks() BT_OVERRIDE;
virtual btCriticalSection* createCriticalSection() BT_OVERRIDE;
virtual void deleteCriticalSection( btCriticalSection* criticalSection ) BT_OVERRIDE;
virtual void* getThreadLocalMemory( int taskId ) BT_OVERRIDE
{
return m_activeThreadStatus[ taskId ].m_lsMemory;
}
};
#define checkPThreadFunction(returnValue) \
if(0 != returnValue) { \
printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__, returnValue, errno); \
}
// The number of threads should be equal to the number of available cores
// Todo: each worker should be linked to a single core, using SetThreadIdealProcessor.
btThreadSupportPosix::btThreadSupportPosix( const ConstructionInfo& threadConstructionInfo )
{
startThreads( threadConstructionInfo );
}
// cleanup/shutdown the worker threads
btThreadSupportPosix::~btThreadSupportPosix()
{
stopThreads();
}
#if (defined (__APPLE__))
#define NAMED_SEMAPHORES
#endif
static sem_t* createSem( const char* baseName )
{
static int semCount = 0;
#ifdef NAMED_SEMAPHORES
/// Named semaphore begin
char name[ 32 ];
snprintf( name, 32, "/%.8s-%d-%4.4d", baseName, getpid(), semCount++ );
sem_t* tempSem = sem_open( name, O_CREAT, 0600, 0 );
if ( tempSem != reinterpret_cast<sem_t *>( SEM_FAILED ) )
{
// printf("Created \"%s\" Semaphore %p\n", name, tempSem);
}
else
{
//printf("Error creating Semaphore %d\n", errno);
exit( -1 );
}
/// Named semaphore end
#else
sem_t* tempSem = new sem_t;
checkPThreadFunction( sem_init( tempSem, 0, 0 ) );
#endif
return tempSem;
}
static void destroySem( sem_t* semaphore )
{
#ifdef NAMED_SEMAPHORES
checkPThreadFunction( sem_close( semaphore ) );
#else
checkPThreadFunction( sem_destroy( semaphore ) );
delete semaphore;
#endif
}
static void *threadFunction( void *argument )
{
btThreadSupportPosix::btThreadStatus* status = ( btThreadSupportPosix::btThreadStatus* )argument;
while ( 1 )
{
checkPThreadFunction( sem_wait( status->startSemaphore ) );
void* userPtr = status->m_userPtr;
if ( userPtr )
{
btAssert( status->m_status );
status->m_userThreadFunc( userPtr, status->m_lsMemory );
status->m_status = 2;
checkPThreadFunction( sem_post( status->m_mainSemaphore ) );
status->threadUsed++;
}
else
{
//exit Thread
status->m_status = 3;
checkPThreadFunction( sem_post( status->m_mainSemaphore ) );
printf( "Thread with taskId %i exiting\n", status->m_taskId );
break;
}
}
printf( "Thread TERMINATED\n" );
}
///wake the indexed worker thread and hand it a task; 'waitForResponse' later waits for whichever worker finishes first
void btThreadSupportPosix::runTask( int threadIndex, void* userData )
{
btAssert( threadIndex >= 0 );
btAssert( threadIndex < m_activeThreadStatus.size() );
btThreadStatus& threadStatus = m_activeThreadStatus[ threadIndex ];
threadStatus.m_commandId = 1;
threadStatus.m_status = 1;
threadStatus.m_userPtr = userData;
m_startedThreadsMask |= UINT64( 1 ) << threadIndex;
// fire event to start new task
checkPThreadFunction( sem_post( threadStatus.startSemaphore ) );
}
///wait for one of the worker threads to finish its task
///returns the index of the thread that finished
int btThreadSupportPosix::waitForResponse()
{
btAssert( m_activeThreadStatus.size() );
// wait for any of the threads to finish
checkPThreadFunction( sem_wait( m_mainSemaphore ) );
// get at least one thread which has finished
int last = -1;
for ( size_t t = 0; t < size_t( m_activeThreadStatus.size() ); ++t )
{
if ( 2 == m_activeThreadStatus[ t ].m_status )
{
last = int( t );
break;
}
}
btThreadStatus& threadStatus = m_activeThreadStatus[ last ];
btAssert( threadStatus.m_status > 1 );
threadStatus.m_status = 0;
// we must have found a finished thread
btAssert( last >= 0 );
m_startedThreadsMask &= ~( UINT64( 1 ) << last );
return last;
}
void btThreadSupportPosix::waitForAllTasks()
{
while ( m_startedThreadsMask )
{
waitForResponse();
}
}
void btThreadSupportPosix::startThreads( const ConstructionInfo& threadConstructionInfo )
{
m_numThreads = btMin( btGetNumHardwareThreads(), int( BT_MAX_THREAD_COUNT ) ) - 1; // main thread exists already; cap so the 64-bit started-threads mask can't overflow
printf( "%s creating %i threads.\n", __FUNCTION__, m_numThreads );
m_activeThreadStatus.resize( m_numThreads );
m_startedThreadsMask = 0;
m_mainSemaphore = createSem( "main" );
//checkPThreadFunction(sem_wait(mainSemaphore));
for ( int i = 0; i < m_numThreads; i++ )
{
printf( "starting thread %d\n", i );
btThreadStatus& threadStatus = m_activeThreadStatus[ i ];
threadStatus.startSemaphore = createSem( "threadLocal" );
checkPThreadFunction( pthread_create( &threadStatus.thread, NULL, &threadFunction, (void*) &threadStatus ) );
threadStatus.m_userPtr = 0;
threadStatus.m_taskId = i;
threadStatus.m_commandId = 0;
threadStatus.m_status = 0;
threadStatus.m_mainSemaphore = m_mainSemaphore;
threadStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
threadStatus.threadUsed = 0;
printf( "started thread %d \n", i );
}
}
///tell all the worker threads to exit and clean up
void btThreadSupportPosix::stopThreads()
{
for ( size_t t = 0; t < size_t( m_activeThreadStatus.size() ); ++t )
{
btThreadStatus& threadStatus = m_activeThreadStatus[ t ];
printf( "%s: Thread %i used: %ld\n", __FUNCTION__, int( t ), threadStatus.threadUsed );
threadStatus.m_userPtr = 0;
checkPThreadFunction( sem_post( threadStatus.startSemaphore ) );
checkPThreadFunction( sem_wait( m_mainSemaphore ) );
printf( "destroy semaphore\n" );
destroySem( threadStatus.startSemaphore );
printf( "semaphore destroyed\n" );
checkPThreadFunction( pthread_join( threadStatus.thread, 0 ) );
}
printf( "destroy main semaphore\n" );
destroySem( m_mainSemaphore );
printf( "main semaphore destroyed\n" );
m_activeThreadStatus.clear();
}
class btCriticalSectionPosix : public btCriticalSection
{
pthread_mutex_t m_mutex;
public:
btCriticalSectionPosix()
{
pthread_mutex_init( &m_mutex, NULL );
}
virtual ~btCriticalSectionPosix()
{
pthread_mutex_destroy( &m_mutex );
}
virtual void lock()
{
pthread_mutex_lock( &m_mutex );
}
virtual void unlock()
{
pthread_mutex_unlock( &m_mutex );
}
};
btCriticalSection* btThreadSupportPosix::createCriticalSection()
{
return new btCriticalSectionPosix();
}
void btThreadSupportPosix::deleteCriticalSection( btCriticalSection* cs )
{
delete cs;
}
btThreadSupportInterface* btThreadSupportInterface::create( const ConstructionInfo& info )
{
return new btThreadSupportPosix( info );
}
#endif // BT_THREADSAFE && !defined( _WIN32 )

src/LinearMath/TaskScheduler/btThreadSupportWin32.cpp

@@ -0,0 +1,480 @@
/*
Bullet Continuous Collision Detection and Physics Library
Copyright (c) 2003-2018 Erwin Coumans http://bulletphysics.com
This software is provided 'as-is', without any express or implied warranty.
In no event will the authors be held liable for any damages arising from the use of this software.
Permission is granted to anyone to use this software for any purpose,
including commercial applications, and to alter it and redistribute it freely,
subject to the following restrictions:
1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution.
*/
#if defined( _WIN32 ) && BT_THREADSAFE
#include "LinearMath/btScalar.h"
#include "LinearMath/btMinMax.h"
#include "LinearMath/btAlignedObjectArray.h"
#include "LinearMath/btThreads.h"
#include "btThreadSupportInterface.h"
#include <windows.h>
#include <stdio.h>
struct btProcessorInfo
{
int numLogicalProcessors;
int numCores;
int numNumaNodes;
int numL1Cache;
int numL2Cache;
int numL3Cache;
int numPhysicalPackages;
static const int maxNumTeamMasks = 32;
int numTeamMasks;
UINT64 processorTeamMasks[ maxNumTeamMasks ];
};
UINT64 getProcessorTeamMask( const btProcessorInfo& procInfo, int procId )
{
UINT64 procMask = UINT64( 1 ) << procId;
for ( int i = 0; i < procInfo.numTeamMasks; ++i )
{
if ( procMask & procInfo.processorTeamMasks[ i ] )
{
return procInfo.processorTeamMasks[ i ];
}
}
return 0;
}
int getProcessorTeamIndex( const btProcessorInfo& procInfo, int procId )
{
UINT64 procMask = UINT64( 1 ) << procId;
for ( int i = 0; i < procInfo.numTeamMasks; ++i )
{
if ( procMask & procInfo.processorTeamMasks[ i ] )
{
return i;
}
}
return -1;
}
int countSetBits( ULONG64 bits )
{
int count = 0;
while ( bits )
{
if ( bits & 1 )
{
count++;
}
bits >>= 1;
}
return count;
}
typedef BOOL( WINAPI *Pfn_GetLogicalProcessorInformation )( PSYSTEM_LOGICAL_PROCESSOR_INFORMATION, PDWORD );
void getProcessorInformation( btProcessorInfo* procInfo )
{
memset( procInfo, 0, sizeof( *procInfo ) );
Pfn_GetLogicalProcessorInformation getLogicalProcInfo =
(Pfn_GetLogicalProcessorInformation) GetProcAddress( GetModuleHandle( TEXT( "kernel32" ) ), "GetLogicalProcessorInformation" );
if ( getLogicalProcInfo == NULL )
{
// no info
return;
}
PSYSTEM_LOGICAL_PROCESSOR_INFORMATION buf = NULL;
DWORD bufSize = 0;
while ( true )
{
if ( getLogicalProcInfo( buf, &bufSize ) )
{
break;
}
else
{
if ( GetLastError() == ERROR_INSUFFICIENT_BUFFER )
{
if ( buf )
{
free( buf );
}
buf = (PSYSTEM_LOGICAL_PROCESSOR_INFORMATION) malloc( bufSize );
}
else
{
// any other error: give up rather than spin forever
if ( buf )
{
free( buf );
}
return;
}
}
}
int len = bufSize / sizeof( *buf );
for ( int i = 0; i < len; ++i )
{
PSYSTEM_LOGICAL_PROCESSOR_INFORMATION info = buf + i;
switch ( info->Relationship )
{
case RelationNumaNode:
procInfo->numNumaNodes++;
break;
case RelationProcessorCore:
procInfo->numCores++;
procInfo->numLogicalProcessors += countSetBits( info->ProcessorMask );
break;
case RelationCache:
if ( info->Cache.Level == 1 )
{
procInfo->numL1Cache++;
}
else if ( info->Cache.Level == 2 )
{
procInfo->numL2Cache++;
}
else if ( info->Cache.Level == 3 )
{
procInfo->numL3Cache++;
// processors that share L3 cache are considered to be on the same team
// because they can more easily work together on the same data.
// Large performance penalties will occur if 2 or more threads from different
// teams attempt to frequently read and modify the same cache lines.
//
// On the AMD Ryzen 7 CPU for example, the 8 cores on the CPU are split into
// 2 CCX units of 4 cores each. Each CCX has a separate L3 cache, so if both
// CCXs are operating on the same data, many cycles will be spent keeping the
// two caches coherent.
if ( procInfo->numTeamMasks < btProcessorInfo::maxNumTeamMasks )
{
procInfo->processorTeamMasks[ procInfo->numTeamMasks ] = info->ProcessorMask;
procInfo->numTeamMasks++;
}
}
break;
case RelationProcessorPackage:
procInfo->numPhysicalPackages++;
break;
}
}
free( buf );
}
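// For illustration, a small sketch (hypothetical, not in the diff) that prints
// the shared-L3 "team" of each logical processor using the helpers above:
//
// void printProcessorTeamsSketch()
// {
//     btProcessorInfo info;
//     getProcessorInformation( &info );
//     printf( "%d logical processors, %d L3 caches\n", info.numLogicalProcessors, info.numL3Cache );
//     for ( int procId = 0; procId < info.numLogicalProcessors; ++procId )
//     {
//         printf( "processor %d -> team %d (mask 0x%llx)\n", procId,
//                 getProcessorTeamIndex( info, procId ),
//                 (unsigned long long) getProcessorTeamMask( info, procId ) );
//     }
// }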
///btThreadSupportWin32 -- manages a pool of Win32 worker threads: start/stop, task dispatch and completion events
class btThreadSupportWin32 : public btThreadSupportInterface
{
public:
struct btThreadStatus
{
int m_taskId;
int m_commandId;
int m_status;
ThreadFunc m_userThreadFunc;
void* m_userPtr; //for taskDesc etc
void* m_lsMemory; //initialized using Win32LocalStoreMemorySetupFunc
void* m_threadHandle; //this one is calling 'Win32ThreadFunc'
void* m_eventStartHandle;
char m_eventStartHandleName[ 32 ];
void* m_eventCompleteHandle;
char m_eventCompleteHandleName[ 32 ];
};
private:
btAlignedObjectArray<btThreadStatus> m_activeThreadStatus;
btAlignedObjectArray<void*> m_completeHandles;
int m_numThreads;
DWORD_PTR m_startedThreadMask;
btProcessorInfo m_processorInfo;
void startThreads( const ConstructionInfo& threadInfo );
void stopThreads();
int waitForResponse();
public:
btThreadSupportWin32( const ConstructionInfo& threadConstructionInfo );
virtual ~btThreadSupportWin32();
virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; }
virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return countSetBits(m_processorInfo.processorTeamMasks[0]); }
virtual void runTask( int threadIndex, void* userData ) BT_OVERRIDE;
virtual void waitForAllTasks() BT_OVERRIDE;
virtual void* getThreadLocalMemory( int taskId ) BT_OVERRIDE
{
return m_activeThreadStatus[ taskId ].m_lsMemory;
}
virtual btCriticalSection* createCriticalSection() BT_OVERRIDE;
virtual void deleteCriticalSection( btCriticalSection* criticalSection ) BT_OVERRIDE;
};
btThreadSupportWin32::btThreadSupportWin32( const ConstructionInfo & threadConstructionInfo )
{
startThreads( threadConstructionInfo );
}
btThreadSupportWin32::~btThreadSupportWin32()
{
stopThreads();
}
DWORD WINAPI win32threadStartFunc( LPVOID lpParam )
{
btThreadSupportWin32::btThreadStatus* status = ( btThreadSupportWin32::btThreadStatus* )lpParam;
while ( 1 )
{
WaitForSingleObject( status->m_eventStartHandle, INFINITE );
void* userPtr = status->m_userPtr;
if ( userPtr )
{
btAssert( status->m_status );
status->m_userThreadFunc( userPtr, status->m_lsMemory );
status->m_status = 2;
SetEvent( status->m_eventCompleteHandle );
}
else
{
//exit Thread
status->m_status = 3;
printf( "Thread with taskId %i with handle %p exiting\n", status->m_taskId, status->m_threadHandle );
SetEvent( status->m_eventCompleteHandle );
break;
}
}
printf( "Thread TERMINATED\n" );
return 0;
}
void btThreadSupportWin32::runTask( int threadIndex, void* userData )
{
btThreadStatus& threadStatus = m_activeThreadStatus[ threadIndex ];
btAssert( taskId >= 0 );
btAssert( int( taskId ) < m_activeThreadStatus.size() );
threadStatus.m_commandId = 1;
threadStatus.m_status = 1;
threadStatus.m_userPtr = userData;
m_startedThreadMask |= DWORD_PTR( 1 ) << threadIndex;
///fire event to start new task
SetEvent( threadStatus.m_eventStartHandle );
}
int btThreadSupportWin32::waitForResponse()
{
btAssert( m_activeThreadStatus.size() );
int last = -1;
DWORD res = WaitForMultipleObjects( m_completeHandles.size(), &m_completeHandles[ 0 ], FALSE, INFINITE );
btAssert( res != WAIT_FAILED );
last = res - WAIT_OBJECT_0;
btThreadStatus& threadStatus = m_activeThreadStatus[ last ];
btAssert( threadStatus.m_threadHandle );
btAssert( threadStatus.m_eventCompleteHandle );
//WaitForSingleObject(threadStatus.m_eventCompleteHandle, INFINITE);
btAssert( threadStatus.m_status > 1 );
threadStatus.m_status = 0;
// we must have found a finished thread
btAssert( last >= 0 );
m_startedThreadMask &= ~( DWORD_PTR( 1 ) << last );
return last;
}
void btThreadSupportWin32::waitForAllTasks()
{
while ( m_startedThreadMask )
{
waitForResponse();
}
}
void btThreadSupportWin32::startThreads( const ConstructionInfo& threadConstructionInfo )
{
static int uniqueId = 0;
uniqueId++;
btProcessorInfo& procInfo = m_processorInfo;
getProcessorInformation( &procInfo );
DWORD_PTR dwProcessAffinityMask = 0;
DWORD_PTR dwSystemAffinityMask = 0;
if ( !GetProcessAffinityMask( GetCurrentProcess(), &dwProcessAffinityMask, &dwSystemAffinityMask ) )
{
dwProcessAffinityMask = 0;
}
///The number of threads should be equal to the number of available cores - 1
m_numThreads = btMin(procInfo.numLogicalProcessors, int(BT_MAX_THREAD_COUNT)) - 1; // cap to max thread count (-1 because main thread already exists)
m_activeThreadStatus.resize( m_numThreads );
m_completeHandles.resize( m_numThreads );
m_startedThreadMask = 0;
// set main thread affinity
if ( DWORD_PTR mask = dwProcessAffinityMask & getProcessorTeamMask( procInfo, 0 ))
{
SetThreadAffinityMask( GetCurrentThread(), mask );
SetThreadIdealProcessor( GetCurrentThread(), 0 );
}
for ( int i = 0; i < m_numThreads; i++ )
{
printf( "starting thread %d\n", i );
btThreadStatus& threadStatus = m_activeThreadStatus[ i ];
LPSECURITY_ATTRIBUTES lpThreadAttributes = NULL;
SIZE_T dwStackSize = threadConstructionInfo.m_threadStackSize;
LPTHREAD_START_ROUTINE lpStartAddress = &win32threadStartFunc;
LPVOID lpParameter = &threadStatus;
DWORD dwCreationFlags = 0;
LPDWORD lpThreadId = 0;
threadStatus.m_userPtr = 0;
sprintf( threadStatus.m_eventStartHandleName, "es%.8s%d%d", threadConstructionInfo.m_uniqueName, uniqueId, i );
threadStatus.m_eventStartHandle = CreateEventA( 0, false, false, threadStatus.m_eventStartHandleName );
sprintf( threadStatus.m_eventCompleteHandleName, "ec%.8s%d%d", threadConstructionInfo.m_uniqueName, uniqueId, i );
threadStatus.m_eventCompleteHandle = CreateEventA( 0, false, false, threadStatus.m_eventCompleteHandleName );
m_completeHandles[ i ] = threadStatus.m_eventCompleteHandle;
HANDLE handle = CreateThread( lpThreadAttributes, dwStackSize, lpStartAddress, lpParameter, dwCreationFlags, lpThreadId );
//SetThreadPriority( handle, THREAD_PRIORITY_HIGHEST );
// highest priority -- can cause erratic performance when numThreads > numCores
// we don't want worker threads to be higher priority than the main thread or the main thread could get
// totally shut out and unable to tell the workers to stop
//SetThreadPriority( handle, THREAD_PRIORITY_BELOW_NORMAL );
{
int processorId = i + 1; // leave processor 0 for main thread
DWORD_PTR teamMask = getProcessorTeamMask( procInfo, processorId );
if ( teamMask )
{
// bind each thread to only execute on processors of its assigned team
// - for single-socket Intel x86 CPUs this has no effect (only a single, shared L3 cache so there is only 1 team)
// - for multi-socket Intel this will keep threads from migrating from one socket to another
// - for AMD Ryzen this will keep threads from migrating from one CCX to another
DWORD_PTR mask = teamMask & dwProcessAffinityMask;
if ( mask )
{
SetThreadAffinityMask( handle, mask );
}
}
SetThreadIdealProcessor( handle, processorId );
}
threadStatus.m_taskId = i;
threadStatus.m_commandId = 0;
threadStatus.m_status = 0;
threadStatus.m_threadHandle = handle;
threadStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
printf( "started %s thread %d with threadHandle %p\n", threadConstructionInfo.m_uniqueName, i, handle );
}
}
///tell all the worker threads to exit and clean up
void btThreadSupportWin32::stopThreads()
{
for ( int i = 0; i < m_activeThreadStatus.size(); i++ )
{
btThreadStatus& threadStatus = m_activeThreadStatus[ i ];
if ( threadStatus.m_status > 0 )
{
WaitForSingleObject( threadStatus.m_eventCompleteHandle, INFINITE );
}
delete threadStatus.m_lsMemory;
threadStatus.m_userPtr = 0;
SetEvent( threadStatus.m_eventStartHandle );
WaitForSingleObject( threadStatus.m_eventCompleteHandle, INFINITE );
CloseHandle( threadStatus.m_eventCompleteHandle );
CloseHandle( threadStatus.m_eventStartHandle );
CloseHandle( threadStatus.m_threadHandle );
}
m_activeThreadStatus.clear();
m_completeHandles.clear();
}
class btWin32CriticalSection : public btCriticalSection
{
private:
CRITICAL_SECTION mCriticalSection;
public:
btWin32CriticalSection()
{
InitializeCriticalSection( &mCriticalSection );
}
~btWin32CriticalSection()
{
DeleteCriticalSection( &mCriticalSection );
}
void lock()
{
EnterCriticalSection( &mCriticalSection );
}
void unlock()
{
LeaveCriticalSection( &mCriticalSection );
}
};
btCriticalSection* btThreadSupportWin32::createCriticalSection()
{
unsigned char* mem = (unsigned char*) btAlignedAlloc( sizeof( btWin32CriticalSection ), 16 );
btWin32CriticalSection* cs = new( mem ) btWin32CriticalSection();
return cs;
}
void btThreadSupportWin32::deleteCriticalSection( btCriticalSection* criticalSection )
{
criticalSection->~btCriticalSection();
btAlignedFree( criticalSection );
}
btThreadSupportInterface* btThreadSupportInterface::create( const ConstructionInfo& info )
{
return new btThreadSupportWin32( info );
}
#endif //defined(_WIN32) && BT_THREADSAFE

src/LinearMath/btThreads.cpp

@@ -453,6 +453,33 @@ void btParallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBod
#endif// #if BT_THREADSAFE
}
btScalar btParallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body )
{
#if BT_THREADSAFE
#if BT_DETECT_BAD_THREAD_INDEX
if ( !btThreadsAreRunning() )
{
// clear out thread ids
for ( int i = 0; i < BT_MAX_THREAD_COUNT; ++i )
{
gDebugThreadIds[ i ] = kInvalidThreadId;
}
}
#endif // #if BT_DETECT_BAD_THREAD_INDEX
btAssert( gBtTaskScheduler != NULL ); // call btSetTaskScheduler() with a valid task scheduler first!
return gBtTaskScheduler->parallelSum( iBegin, iEnd, grainSize, body );
#else // #if BT_THREADSAFE
// non-parallel version of btParallelSum
btAssert( !"called btParallelFor in non-threadsafe build. enable BT_THREADSAFE" );
return body.sumLoop( iBegin, iEnd );
#endif //#else // #if BT_THREADSAFE
}
///
/// btTaskSchedulerSequential -- non-threaded implementation of task scheduler
@@ -470,6 +497,11 @@ public:
BT_PROFILE( "parallelFor_sequential" );
body.forLoop( iBegin, iEnd );
}
virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE
{
BT_PROFILE( "parallelSum_sequential" );
return body.sumLoop( iBegin, iEnd );
}
};
@@ -514,11 +546,25 @@ public:
#pragma omp parallel for schedule( static, 1 )
for ( int i = iBegin; i < iEnd; i += grainSize )
{
BT_PROFILE( "OpenMP_job" );
BT_PROFILE( "OpenMP_forJob" );
body.forLoop( i, ( std::min )( i + grainSize, iEnd ) );
}
btPopThreadsAreRunning();
}
virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE
{
BT_PROFILE( "parallelFor_OpenMP" );
btPushThreadsAreRunning();
btScalar sum = btScalar( 0 );
#pragma omp parallel for schedule( static, 1 ) reduction(+:sum)
for ( int i = iBegin; i < iEnd; i += grainSize )
{
BT_PROFILE( "OpenMP_sumJob" );
sum += body.sumLoop( i, ( std::min )( i + grainSize, iEnd ) );
}
btPopThreadsAreRunning();
return sum;
}
};
#endif // #if BT_USE_OPENMP && BT_THREADSAFE
@@ -571,22 +617,21 @@ public:
btResetThreadIndexCounter();
}
}
-struct BodyAdapter
+struct ForBodyAdapter
{
const btIParallelForBody* mBody;
ForBodyAdapter( const btIParallelForBody* body ) : mBody( body ) {}
void operator()( const tbb::blocked_range<int>& range ) const
{
BT_PROFILE( "TBB_job" );
BT_PROFILE( "TBB_forJob" );
mBody->forLoop( range.begin(), range.end() );
}
};
virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE
{
BT_PROFILE( "parallelFor_TBB" );
// TBB dispatch
-BodyAdapter tbbBody;
-tbbBody.mBody = &body;
+ForBodyAdapter tbbBody( &body );
btPushThreadsAreRunning();
tbb::parallel_for( tbb::blocked_range<int>( iBegin, iEnd, grainSize ),
tbbBody,
@@ -594,6 +639,29 @@ public:
);
btPopThreadsAreRunning();
}
struct SumBodyAdapter
{
const btIParallelSumBody* mBody;
btScalar mSum;
SumBodyAdapter( const btIParallelSumBody* body ) : mBody( body ), mSum( btScalar( 0 ) ) {}
SumBodyAdapter( const SumBodyAdapter& src, tbb::split ) : mBody( src.mBody ), mSum( btScalar( 0 ) ) {}
void join( const SumBodyAdapter& src ) { mSum += src.mSum; }
void operator()( const tbb::blocked_range<int>& range )
{
BT_PROFILE( "TBB_sumJob" );
mSum += mBody->sumLoop( range.begin(), range.end() );
}
};
virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE
{
BT_PROFILE( "parallelSum_TBB" );
SumBodyAdapter tbbBody( &body );
btPushThreadsAreRunning();
tbb::parallel_deterministic_reduce( tbb::blocked_range<int>( iBegin, iEnd, grainSize ), tbbBody );
btPopThreadsAreRunning();
return tbbBody.mSum;
}
};
#endif // #if BT_USE_TBB && BT_THREADSAFE
@@ -605,6 +673,7 @@ public:
class btTaskSchedulerPPL : public btITaskScheduler
{
int m_numThreads;
concurrency::combinable<btScalar> m_sum; // for parallelSum
public:
btTaskSchedulerPPL() : btITaskScheduler( "PPL" )
{
@@ -644,15 +713,16 @@ public:
btResetThreadIndexCounter();
}
}
-struct BodyAdapter
+struct ForBodyAdapter
{
const btIParallelForBody* mBody;
int mGrainSize;
int mIndexEnd;
ForBodyAdapter( const btIParallelForBody* body, int grainSize, int end ) : mBody( body ), mGrainSize( grainSize ), mIndexEnd( end ) {}
void operator()( int i ) const
{
BT_PROFILE( "PPL_job" );
BT_PROFILE( "PPL_forJob" );
mBody->forLoop( i, ( std::min )( i + mGrainSize, mIndexEnd ) );
}
};
@@ -660,10 +730,7 @@ public:
{
BT_PROFILE( "parallelFor_PPL" );
// PPL dispatch
-BodyAdapter pplBody;
-pplBody.mBody = &body;
-pplBody.mGrainSize = grainSize;
-pplBody.mIndexEnd = iEnd;
+ForBodyAdapter pplBody( &body, grainSize, iEnd );
btPushThreadsAreRunning();
// note: MSVC 2010 doesn't support partitioner args, so avoid them
concurrency::parallel_for( iBegin,
@@ -673,6 +740,36 @@ public:
);
btPopThreadsAreRunning();
}
struct SumBodyAdapter
{
const btIParallelSumBody* mBody;
concurrency::combinable<btScalar>* mSum;
int mGrainSize;
int mIndexEnd;
SumBodyAdapter( const btIParallelSumBody* body, concurrency::combinable<btScalar>* sum, int grainSize, int end ) : mBody( body ), mSum(sum), mGrainSize( grainSize ), mIndexEnd( end ) {}
void operator()( int i ) const
{
BT_PROFILE( "PPL_sumJob" );
mSum->local() += mBody->sumLoop( i, ( std::min )( i + mGrainSize, mIndexEnd ) );
}
};
static btScalar sumFunc( btScalar a, btScalar b ) { return a + b; }
virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE
{
BT_PROFILE( "parallelSum_PPL" );
m_sum.clear();
SumBodyAdapter pplBody( &body, &m_sum, grainSize, iEnd );
btPushThreadsAreRunning();
// note: MSVC 2010 doesn't support partitioner args, so avoid them
concurrency::parallel_for( iBegin,
iEnd,
grainSize,
pplBody
);
btPopThreadsAreRunning();
return m_sum.combine( sumFunc );
}
};
#endif // #if BT_USE_PPL && BT_THREADSAFE

src/LinearMath/btThreads.h

@@ -107,6 +107,17 @@ public:
virtual void forLoop( int iBegin, int iEnd ) const = 0;
};
//
// btIParallelSumBody -- subclass this to express work that can be done in parallel
// and produces a sum over all loop elements
//
class btIParallelSumBody
{
public:
virtual ~btIParallelSumBody() {}
virtual btScalar sumLoop( int iBegin, int iEnd ) const = 0;
};
//
// btITaskScheduler -- subclass this to implement a task scheduler that can dispatch work to
// worker threads
@@ -122,6 +133,8 @@ public:
virtual int getNumThreads() const = 0;
virtual void setNumThreads( int numThreads ) = 0;
virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) = 0;
virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) = 0;
virtual void sleepWorkerThreadsHint() {} // hint the task scheduler that we may not be using these threads for a little while
// internal use only
virtual void activate();
@@ -143,6 +156,9 @@ btITaskScheduler* btGetTaskScheduler();
// get non-threaded task scheduler (always available)
btITaskScheduler* btGetSequentialTaskScheduler();
// create a default task scheduler (Win32 or pthreads based)
btITaskScheduler* btCreateDefaultTaskScheduler();
// get OpenMP task scheduler (if available, otherwise returns null)
btITaskScheduler* btGetOpenMPTaskScheduler();
@@ -156,5 +172,9 @@ btITaskScheduler* btGetPPLTaskScheduler();
// (iterations may be done out of order, so no dependencies are allowed)
void btParallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body );
// btParallelSum -- call this to dispatch work like a for-loop, returns the sum of all iterations
// (iterations may be done out of order, so no dependencies are allowed)
btScalar btParallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body );
#endif
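
A usage sketch for the new sum dispatch (hypothetical body; the pattern matches how the parallel solver computes its least-squares residual per the commit message):

#include "LinearMath/btThreads.h"

struct ResidualSumBody : public btIParallelSumBody
{
    const btScalar* m_deltas;
    virtual btScalar sumLoop( int iBegin, int iEnd ) const BT_OVERRIDE
    {
        btScalar sum = btScalar( 0 );
        for ( int i = iBegin; i < iEnd; ++i )
        {
            sum += m_deltas[ i ] * m_deltas[ i ]; // sum of squared deltas
        }
        return sum;
    }
};

btScalar computeResidualSketch( const btScalar* deltas, int count )
{
    ResidualSumBody body;
    body.m_deltas = deltas;
    return btParallelSum( 0, count, 64, body );
}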