task scheduler: add multiple job queues to improve performance when there are many threads

This commit is contained in:
Lunkhound
2018-03-16 16:38:11 -07:00
parent 04e0d57dc1
commit bdc3c2bafb
4 changed files with 386 additions and 205 deletions

View File

@@ -7,16 +7,11 @@
#include <algorithm> #include <algorithm>
typedef void( *btThreadFunc )( void* userPtr, void* lsMemory );
typedef void* ( *btThreadLocalStorageFunc )();
#if BT_THREADSAFE #if BT_THREADSAFE
#include "btThreadSupportInterface.h" #include "btThreadSupportInterface.h"
#if defined( _WIN32 ) #if defined( _WIN32 )
#define WIN32_LEAN_AND_MEAN #define WIN32_LEAN_AND_MEAN
@@ -26,6 +21,9 @@ typedef void* ( *btThreadLocalStorageFunc )();
#endif #endif
typedef unsigned long long btU64;
static const int kCacheLineSize = 64;
void btSpinPause() void btSpinPause()
{ {
#if defined( _WIN32 ) #if defined( _WIN32 )
@@ -46,6 +44,62 @@ struct WorkerThreadStatus
}; };
// Table holding one directive byte per thread id; directives are written over
// a range of thread ids with setDirectiveByRange() and read back per thread
// with getDirective().
ATTRIBUTE_ALIGNED64(class) WorkerThreadDirectives
{
	static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;
	// directives for all worker threads packed into a single cacheline
	char m_threadDirs[kMaxThreadCount];

public:
	enum Type
	{
		kInvalid,
		kGoToSleep,         // go to sleep
		kStayAwakeButIdle,  // stay awake, but idle instead of scanning the job queue
		kScanForJobs,       // actively scan job queue for jobs
	};

	// Every slot starts out as kInvalid (0).
	WorkerThreadDirectives()
	{
		for (int i = 0; i < kMaxThreadCount; ++i)
		{
			m_threadDirs[i] = static_cast<char>(kInvalid);
		}
	}

	// Returns the directive currently stored for threadId.
	// threadId must lie in [0, kMaxThreadCount).
	Type getDirective(int threadId) const
	{
		btAssert(threadId >= 0);
		btAssert(threadId < kMaxThreadCount);
		return static_cast<Type>(m_threadDirs[threadId]);
	}

	// Stores dir for every thread id in the half-open range [threadBegin, threadEnd).
	// An empty range (threadBegin == threadEnd) is allowed and is a no-op; this
	// happens when there are no worker threads, or when every slot is in use.
	void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
	{
		btAssert(threadBegin >= 0);
		btAssert(threadBegin <= threadEnd);
		btAssert(threadEnd <= kMaxThreadCount);
		const char dirChar = static_cast<char>(dir);
		for (int i = threadBegin; i < threadEnd; ++i)
		{
			m_threadDirs[i] = dirChar;
		}
	}
};
class JobQueue;
// Per-thread bookkeeping record, one entry per thread (index 0 is the main thread).
// The scheduler hands a pointer to a worker's entry to that worker as its userPtr.
ATTRIBUTE_ALIGNED64(struct) ThreadLocalStorage
{
// index of this thread; 0 is the main thread, workers start at 1
int m_threadId;
// current worker state (working / waiting for work / sleeping); written under m_mutex
WorkerThreadStatus::Type m_status;
// number of jobs this thread has executed since the counter was last reset; accessed under m_mutex
int m_numJobsFinished;
// guards m_status and m_numJobsFinished
btSpinMutex m_mutex;
// running total accumulated by ParallelSumJob for this thread
btScalar m_sumResult;
// directive table shared by all threads; polled by the worker loop
WorkerThreadDirectives * m_directive;
// job queue this thread pulls from (NULL for the main thread)
JobQueue* m_queue;
// clock shared with the scheduler, used for cooldown timing
btClock* m_clock;
// microseconds a worker may sit idle with an empty queue before going to sleep
unsigned int m_cooldownTime;
};
struct IJob struct IJob
{ {
virtual void executeJob(int threadId) = 0; virtual void executeJob(int threadId) = 0;
@@ -53,88 +107,152 @@ struct IJob
class ParallelForJob : public IJob class ParallelForJob : public IJob
{ {
const btIParallelForBody* mBody; const btIParallelForBody* m_body;
int mBegin; int m_begin;
int mEnd; int m_end;
public: public:
ParallelForJob( int iBegin, int iEnd, const btIParallelForBody& body ) ParallelForJob( int iBegin, int iEnd, const btIParallelForBody& body )
{ {
mBody = &body; m_body = &body;
mBegin = iBegin; m_begin = iBegin;
mEnd = iEnd; m_end = iEnd;
} }
virtual void executeJob(int threadId) BT_OVERRIDE virtual void executeJob(int threadId) BT_OVERRIDE
{ {
BT_PROFILE( "executeJob" ); BT_PROFILE( "executeJob" );
// call the functor body to do the work // call the functor body to do the work
mBody->forLoop( mBegin, mEnd ); m_body->forLoop( m_begin, m_end );
} }
}; };
static const int kCacheLineSize = 64;
struct ThreadLocalSum
{
btScalar mSum;
char mCachePadding[ kCacheLineSize - sizeof( btScalar ) ];
};
class ParallelSumJob : public IJob class ParallelSumJob : public IJob
{ {
const btIParallelSumBody* mBody; const btIParallelSumBody* m_body;
ThreadLocalSum* mSumArray; ThreadLocalStorage* m_threadLocalStoreArray;
int mBegin; int m_begin;
int mEnd; int m_end;
public: public:
ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalSum* sums ) ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls )
{ {
mBody = &body; m_body = &body;
mSumArray = sums; m_threadLocalStoreArray = tls;
mBegin = iBegin; m_begin = iBegin;
mEnd = iEnd; m_end = iEnd;
} }
virtual void executeJob( int threadId ) BT_OVERRIDE virtual void executeJob( int threadId ) BT_OVERRIDE
{ {
BT_PROFILE( "executeJob" ); BT_PROFILE( "executeJob" );
// call the functor body to do the work // call the functor body to do the work
btScalar val = mBody->sumLoop( mBegin, mEnd ); btScalar val = m_body->sumLoop( m_begin, m_end );
#if BT_PARALLEL_SUM_DETERMINISTISM
// by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision) // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
const float TRUNC_SCALE = float(1<<19); const float TRUNC_SCALE = float(1<<19);
val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE; // truncate some bits val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE; // truncate some bits
mSumArray[threadId].mSum += val; #endif
m_threadLocalStoreArray[threadId].m_sumResult += val;
} }
}; };
struct JobContext ATTRIBUTE_ALIGNED64(class) JobQueue
{ {
JobContext() btThreadSupportInterface* m_threadSupport;
{
m_queueLock = NULL;
m_headIndex = 0;
m_tailIndex = 0;
m_workersShouldCheckQueue = false;
m_workersShouldSleep = false;
m_useSpinMutex = false;
m_coolDownTime = 1000; // 1000 microseconds
}
btCriticalSection* m_queueLock; btCriticalSection* m_queueLock;
btSpinMutex m_mutex; btSpinMutex m_mutex;
volatile bool m_workersShouldCheckQueue;
volatile bool m_workersShouldSleep;
btAlignedObjectArray<IJob*> m_jobQueue; btAlignedObjectArray<IJob*> m_jobQueue;
char* m_jobMem;
int m_jobMemSize;
bool m_queueIsEmpty; bool m_queueIsEmpty;
int m_tailIndex; int m_tailIndex;
int m_headIndex; int m_headIndex;
int m_allocSize;
bool m_useSpinMutex; bool m_useSpinMutex;
unsigned int m_coolDownTime; btAlignedObjectArray<JobQueue*> m_neighborContexts;
btClock m_clock; char m_cachePadding[kCacheLineSize]; // prevent false sharing
void freeJobMem()
{
if ( m_jobMem )
{
// free old
btAlignedFree(m_jobMem);
m_jobMem = NULL;
}
}
void resizeJobMem(int newSize)
{
if (newSize > m_jobMemSize)
{
freeJobMem();
m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
m_jobMemSize = newSize;
}
}
public:
JobQueue()
{
m_jobMem = NULL;
m_jobMemSize = 0;
m_threadSupport = NULL;
m_queueLock = NULL;
m_headIndex = 0;
m_tailIndex = 0;
m_useSpinMutex = false;
}
// Frees the job storage and, when both a lock and the thread support that
// created it are present, hands the lock back for deletion.
~JobQueue()
{
	freeJobMem();
	if (m_queueLock == NULL || m_threadSupport == NULL)
	{
		return;
	}
	m_threadSupport->deleteCriticalSection(m_queueLock);
	m_queueLock = NULL;
}
// Attaches the queue to its thread support (creating the queue lock when one is
// supplied) and wires up job stealing with every queue in contextArray treated as active.
void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
{
	m_threadSupport = threadSup;
	if (m_threadSupport != NULL)
	{
		m_queueLock = threadSup->createCriticalSection();
	}
	const int queueCount = contextArray->size();
	setupJobStealing(contextArray, queueCount);
}
// Rebuilds the list of queues this queue may steal from.
// Candidates are the nearest slots in contextArray (offsets -1, +1, -2, +2, -3, +3),
// restricted to indices in [0, numActiveContexts); at most two neighbors are kept.
void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
{
	btAlignedObjectArray<JobQueue>& queues = *contextArray;

	// locate this queue's slot in the array (stays 0 if it is not found)
	int myIndex = 0;
	for (int q = 0; q < queues.size(); ++q)
	{
		if ( &queues[ q ] == this )
		{
			myIndex = q;
			break;
		}
	}

	const int maxNeighbors = btMin(2, queues.size() - 1);
	const int offsets[ ] = {-1, 1, -2, 2, -3, 3};
	const int offsetCount = sizeof(offsets)/sizeof(offsets[0]);

	m_neighborContexts.reserve( maxNeighbors );
	m_neighborContexts.resizeNoInitialize(0);
	for (int k = 0; k < offsetCount; k++)
	{
		if ( m_neighborContexts.size() >= maxNeighbors )
		{
			break;  // have all the neighbors we want
		}
		const int candidate = myIndex + offsets[k];
		if ( candidate < 0 || candidate >= numActiveContexts )
		{
			continue;  // outside the array or refers to an inactive queue
		}
		m_neighborContexts.push_back( &queues[ candidate ] );
	}
}
// Returns the cached empty flag; the flag is read here without taking the queue lock.
bool isQueueEmpty() const
{
	return m_queueIsEmpty;
}
void lockQueue() void lockQueue()
{ {
if ( m_useSpinMutex ) if ( m_useSpinMutex )
@@ -157,24 +275,44 @@ struct JobContext
m_queueLock->unlock(); m_queueLock->unlock();
} }
} }
void clearQueue() void clearQueue(int jobCount, int jobSize)
{ {
lockQueue(); lockQueue();
m_headIndex = 0; m_headIndex = 0;
m_tailIndex = 0; m_tailIndex = 0;
m_allocSize = 0;
m_queueIsEmpty = true; m_queueIsEmpty = true;
int jobBufSize = jobSize * jobCount;
// make sure we have enough memory allocated to store jobs
if ( jobBufSize > m_jobMemSize )
{
resizeJobMem( jobBufSize );
}
// make sure job queue is big enough
if ( jobCount > m_jobQueue.capacity() )
{
m_jobQueue.reserve( jobCount );
}
unlockQueue(); unlockQueue();
m_jobQueue.resizeNoInitialize( 0 ); m_jobQueue.resizeNoInitialize( 0 );
} }
void* allocJobMem(int jobSize)
{
btAssert(m_jobMemSize >= (m_allocSize + jobSize));
void* jobMem = &m_jobMem[m_allocSize];
m_allocSize += jobSize;
return jobMem;
}
void submitJob( IJob* job ) void submitJob( IJob* job )
{ {
btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
m_jobQueue.push_back( job ); m_jobQueue.push_back( job );
lockQueue(); lockQueue();
m_tailIndex++; m_tailIndex++;
m_queueIsEmpty = false; m_queueIsEmpty = false;
unlockQueue(); unlockQueue();
} }
IJob* consumeJob() IJob* consumeJobFromOwnQueue()
{ {
if ( m_queueIsEmpty ) if ( m_queueIsEmpty )
{ {
@@ -186,6 +324,7 @@ struct JobContext
if ( !m_queueIsEmpty ) if ( !m_queueIsEmpty )
{ {
job = m_jobQueue[ m_headIndex++ ]; job = m_jobQueue[ m_headIndex++ ];
btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
if ( m_headIndex == m_tailIndex ) if ( m_headIndex == m_tailIndex )
{ {
m_queueIsEmpty = true; m_queueIsEmpty = true;
@@ -194,58 +333,78 @@ struct JobContext
unlockQueue(); unlockQueue();
return job; return job;
} }
// Fetches the next job for the caller: first from this queue, and if that is
// empty, from each neighbor queue in order. Returns NULL when none has a job.
IJob* consumeJob()
{
	IJob* job = consumeJobFromOwnQueue();
	// own queue is empty, try to steal from the neighbors until one yields a job
	for (int n = 0; job == NULL && n < m_neighborContexts.size(); ++n)
	{
		JobQueue* victim = m_neighborContexts[ n ];
		job = victim->consumeJobFromOwnQueue();
	}
	return job;
}
}; };
struct WorkerThreadLocalStorage static void WorkerThreadFunc( void* userPtr )
{
int threadId;
WorkerThreadStatus::Type status;
int numJobsFinished;
btSpinMutex m_mutex;
};
static void WorkerThreadFunc( void* userPtr, void* lsMemory )
{ {
BT_PROFILE( "WorkerThreadFunc" ); BT_PROFILE( "WorkerThreadFunc" );
WorkerThreadLocalStorage* localStorage = (WorkerThreadLocalStorage*) lsMemory; ThreadLocalStorage* localStorage = (ThreadLocalStorage*) userPtr;
JobContext* jobContext = (JobContext*) userPtr; JobQueue* jobQueue = localStorage->m_queue;
bool shouldSleep = false; bool shouldSleep = false;
int threadId = localStorage->m_threadId;
while (! shouldSleep) while (! shouldSleep)
{ {
// do work // do work
localStorage->m_mutex.lock(); localStorage->m_mutex.lock();
while ( IJob* job = jobContext->consumeJob() ) while ( IJob* job = jobQueue->consumeJob() )
{ {
localStorage->status = WorkerThreadStatus::kWorking; localStorage->m_status = WorkerThreadStatus::kWorking;
job->executeJob( localStorage->threadId ); job->executeJob( threadId );
localStorage->numJobsFinished++; localStorage->m_numJobsFinished++;
} }
localStorage->status = WorkerThreadStatus::kWaitingForWork; localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
localStorage->m_mutex.unlock(); localStorage->m_mutex.unlock();
unsigned long long int clockStart = jobContext->m_clock.getTimeMicroseconds(); btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
// while queue is empty, // while queue is empty,
while (jobContext->m_queueIsEmpty) while (jobQueue->isQueueEmpty())
{ {
// todo: spin wait a bit to avoid hammering the empty queue // todo: spin wait a bit to avoid hammering the empty queue
btSpinPause(); btSpinPause();
if ( jobContext->m_workersShouldSleep ) if ( localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep )
{ {
shouldSleep = true; shouldSleep = true;
break; break;
} }
// if jobs are incoming, // if jobs are incoming,
if (jobContext->m_workersShouldCheckQueue) if ( localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs )
{ {
clockStart = jobContext->m_clock.getTimeMicroseconds(); // reset clock clockStart = localStorage->m_clock->getTimeMicroseconds(); // reset clock
} }
else else
{ {
for ( int i = 0; i < 50; ++i )
{
btSpinPause();
btSpinPause();
btSpinPause();
btSpinPause();
if (localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
{
break;
}
}
// if no jobs incoming and queue has been empty for the cooldown time, sleep // if no jobs incoming and queue has been empty for the cooldown time, sleep
unsigned long long int timeElapsed = jobContext->m_clock.getTimeMicroseconds() - clockStart; btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
if (timeElapsed > jobContext->m_coolDownTime) if (timeElapsed > localStorage->m_cooldownTime)
{ {
shouldSleep = true; shouldSleep = true;
break; break;
@@ -254,77 +413,107 @@ static void WorkerThreadFunc( void* userPtr, void* lsMemory )
} }
} }
// go idle // go sleep
localStorage->m_mutex.lock(); localStorage->m_mutex.lock();
localStorage->status = WorkerThreadStatus::kSleeping; localStorage->m_status = WorkerThreadStatus::kSleeping;
localStorage->m_mutex.unlock(); localStorage->m_mutex.unlock();
} }
static void* WorkerThreadAllocFunc()
{
return new WorkerThreadLocalStorage;
}
class btTaskSchedulerDefault : public btITaskScheduler class btTaskSchedulerDefault : public btITaskScheduler
{ {
JobContext m_jobContext;
btThreadSupportInterface* m_threadSupport; btThreadSupportInterface* m_threadSupport;
btAlignedObjectArray<char> m_jobMem; WorkerThreadDirectives* m_workerDirective;
btAlignedObjectArray<char> m_threadLocalMem; btAlignedObjectArray<JobQueue> m_jobQueues;
btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
btSpinMutex m_antiNestingLock; // prevent nested parallel-for btSpinMutex m_antiNestingLock; // prevent nested parallel-for
btClock m_clock;
int m_numThreads; int m_numThreads;
int m_numWorkerThreads; int m_numWorkerThreads;
int m_numActiveJobQueues;
int m_maxNumThreads; int m_maxNumThreads;
int m_numJobs; int m_numJobs;
static const int kFirstWorkerThreadId = 1;
public: public:
btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport") btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
{ {
m_threadSupport = NULL; m_threadSupport = NULL;
m_workerDirective = NULL;
} }
virtual ~btTaskSchedulerDefault() virtual ~btTaskSchedulerDefault()
{ {
shutdown(); waitForWorkersToSleep();
if (m_threadSupport)
{
delete m_threadSupport;
m_threadSupport = NULL;
}
if (m_workerDirective)
{
btAlignedFree(m_workerDirective);
m_workerDirective = NULL;
}
} }
void init() void init()
{ {
btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc, WorkerThreadAllocFunc ); btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc );
m_threadSupport = btThreadSupportInterface::create( constructionInfo ); m_threadSupport = btThreadSupportInterface::create( constructionInfo );
m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));
m_numWorkerThreads = m_threadSupport->getNumWorkerThreads(); m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1; m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
m_numThreads = m_maxNumThreads; m_numThreads = m_maxNumThreads;
m_jobContext.m_queueLock = m_threadSupport->createCriticalSection(); // ideal to have one job queue for each physical processor (except for the main thread which needs no queue)
for ( int i = 0; i < m_numWorkerThreads; i++ ) int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads-1) : (m_maxNumThreads / numThreadsPerQueue);
m_jobQueues.resize(numJobQueues);
m_numActiveJobQueues = numJobQueues;
for ( int i = 0; i < m_jobQueues.size(); ++i )
{ {
WorkerThreadLocalStorage* storage = (WorkerThreadLocalStorage*) m_threadSupport->getThreadLocalMemory( i ); m_jobQueues[i].init( m_threadSupport, &m_jobQueues );
btAssert( storage );
storage->threadId = i + 1; // workers start at 1
storage->status = WorkerThreadStatus::kSleeping;
} }
setWorkersActive( false ); // no work for them yet m_perThreadJobQueues.resize(m_numThreads);
for ( int i = 0; i < m_numThreads; i++ )
{
JobQueue* jq = NULL;
// only worker threads get a job queue
if (i > 0)
{
if (numThreadsPerQueue == 1)
{
// one queue per worker thread
jq = &m_jobQueues[ i - kFirstWorkerThreadId ];
}
else
{
// 2 threads share each queue
jq = &m_jobQueues[ i / numThreadsPerQueue ];
}
}
m_perThreadJobQueues[i] = jq;
}
m_threadLocalStorage.resize(m_numThreads);
for ( int i = 0; i < m_numThreads; i++ )
{
ThreadLocalStorage& storage = m_threadLocalStorage[i];
storage.m_threadId = i;
storage.m_directive = m_workerDirective;
storage.m_status = WorkerThreadStatus::kSleeping;
storage.m_cooldownTime = 1000; // 1000 microseconds, threads go to sleep after this long if they have nothing to do
storage.m_clock = &m_clock;
storage.m_queue = m_perThreadJobQueues[i];
}
setWorkerDirectives( WorkerThreadDirectives::kGoToSleep ); // no work for them yet
setNumThreads( m_threadSupport->getCacheFriendlyNumThreads() ); setNumThreads( m_threadSupport->getCacheFriendlyNumThreads() );
} }
virtual void shutdown() void setWorkerDirectives(WorkerThreadDirectives::Type dir)
{ {
setWorkersActive( false ); m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
waitForWorkersToSleep();
m_threadSupport->deleteCriticalSection( m_jobContext.m_queueLock );
m_jobContext.m_queueLock = NULL;
delete m_threadSupport;
m_threadSupport = NULL;
}
void setWorkersActive( bool active )
{
m_jobContext.m_workersShouldCheckQueue = active;
} }
virtual int getMaxNumThreads() const BT_OVERRIDE virtual int getMaxNumThreads() const BT_OVERRIDE
@@ -341,38 +530,56 @@ public:
{ {
m_numThreads = btMax( btMin(numThreads, int(m_maxNumThreads)), 1 ); m_numThreads = btMax( btMin(numThreads, int(m_maxNumThreads)), 1 );
m_numWorkerThreads = m_numThreads - 1; m_numWorkerThreads = m_numThreads - 1;
m_numActiveJobQueues = 0;
// if there is at least 1 worker,
if ( m_numWorkerThreads > 0 )
{
// re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
JobQueue* lastActiveContext = m_perThreadJobQueues[ m_numThreads - 1 ];
int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
m_numActiveJobQueues = iLastActiveContext + 1;
for ( int i = 0; i < m_jobQueues.size(); ++i )
{
m_jobQueues[ i ].setupJobStealing( &m_jobQueues, m_numActiveJobQueues );
}
}
m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
} }
void waitJobs() void waitJobs()
{ {
BT_PROFILE( "waitJobs" ); BT_PROFILE( "waitJobs" );
// have the main thread work until the job queue is empty // have the main thread work until the job queues are empty
int numMainThreadJobsFinished = 0; int numMainThreadJobsFinished = 0;
while ( IJob* job = m_jobContext.consumeJob() ) for ( int i = 0; i < m_numActiveJobQueues; ++i )
{ {
job->executeJob( 0 ); while ( IJob* job = m_jobQueues[i].consumeJob() )
numMainThreadJobsFinished++; {
job->executeJob( 0 );
numMainThreadJobsFinished++;
}
} }
// done with jobs for now, tell workers to rest
setWorkersActive( false );
unsigned long long int clockStart = m_jobContext.m_clock.getTimeMicroseconds(); // done with jobs for now, tell workers to rest (but not sleep)
setWorkerDirectives( WorkerThreadDirectives::kStayAwakeButIdle );
btU64 clockStart = m_clock.getTimeMicroseconds();
// wait for workers to finish any jobs in progress // wait for workers to finish any jobs in progress
while ( true ) while ( true )
{ {
int numWorkerJobsFinished = 0; int numWorkerJobsFinished = 0;
for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker ) for ( int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread )
{ {
WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) ); ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
storage->m_mutex.lock(); storage->m_mutex.lock();
numWorkerJobsFinished += storage->numJobsFinished; numWorkerJobsFinished += storage->m_numJobsFinished;
storage->m_mutex.unlock(); storage->m_mutex.unlock();
} }
if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs) if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
{ {
break; break;
} }
unsigned long long int timeElapsed = m_jobContext.m_clock.getTimeMicroseconds() - clockStart; btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
btAssert(timeElapsed < 1000); btAssert(timeElapsed < 1000);
if (timeElapsed > 100000) if (timeElapsed > 100000)
{ {
@@ -385,25 +592,25 @@ public:
void wakeWorkers(int numWorkersToWake) void wakeWorkers(int numWorkersToWake)
{ {
BT_PROFILE( "wakeWorkers" ); BT_PROFILE( "wakeWorkers" );
btAssert( m_jobContext.m_workersShouldCheckQueue ); btAssert( m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs );
int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads); int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
int numActiveWorkers = 0; int numActiveWorkers = 0;
for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker ) for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
{ {
// note this count of active workers is not necessarily totally reliable, because a worker thread could be // note this count of active workers is not necessarily totally reliable, because a worker thread could be
// just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare. // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) ); ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
if (storage->status != WorkerThreadStatus::kSleeping) if (storage.m_status != WorkerThreadStatus::kSleeping)
{ {
numActiveWorkers++; numActiveWorkers++;
} }
} }
for ( int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker ) for ( int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker )
{ {
WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) ); ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
if (storage->status == WorkerThreadStatus::kSleeping) if (storage.m_status == WorkerThreadStatus::kSleeping)
{ {
m_threadSupport->runTask( iWorker, &m_jobContext ); m_threadSupport->runTask( iWorker, &storage );
numActiveWorkers++; numActiveWorkers++;
} }
} }
@@ -412,13 +619,12 @@ public:
void waitForWorkersToSleep() void waitForWorkersToSleep()
{ {
BT_PROFILE( "waitForWorkersToSleep" ); BT_PROFILE( "waitForWorkersToSleep" );
m_jobContext.m_workersShouldSleep = true; setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
m_threadSupport->waitForAllTasks(); m_threadSupport->waitForAllTasks();
for ( int i = 0; i < m_numWorkerThreads; i++ ) for ( int i = kFirstWorkerThreadId; i < m_numThreads; i++ )
{ {
WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory(i) ); ThreadLocalStorage& storage = m_threadLocalStorage[i];
btAssert( storage ); btAssert( storage.m_status == WorkerThreadStatus::kSleeping );
btAssert( storage->status == WorkerThreadStatus::kSleeping );
} }
} }
@@ -426,20 +632,19 @@ public:
{ {
BT_PROFILE( "sleepWorkerThreadsHint" ); BT_PROFILE( "sleepWorkerThreadsHint" );
// hint the task scheduler that we may not be using these threads for a little while // hint the task scheduler that we may not be using these threads for a little while
m_jobContext.m_workersShouldSleep = true; setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
} }
void prepareWorkerThreads() void prepareWorkerThreads()
{ {
for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker ) for ( int i = kFirstWorkerThreadId; i < m_numThreads; ++i )
{ {
WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) ); ThreadLocalStorage& storage = m_threadLocalStorage[i];
storage->m_mutex.lock(); storage.m_mutex.lock();
storage->numJobsFinished = 0; storage.m_numJobsFinished = 0;
storage->m_mutex.unlock(); storage.m_mutex.unlock();
} }
m_jobContext.m_workersShouldSleep = false; setWorkerDirectives( WorkerThreadDirectives::kScanForJobs );
setWorkersActive( true );
} }
virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE
@@ -455,32 +660,32 @@ public:
m_numJobs = jobCount; m_numJobs = jobCount;
btAssert( jobCount >= 2 ); // need more than one job for multithreading btAssert( jobCount >= 2 ); // need more than one job for multithreading
int jobSize = sizeof( JobType ); int jobSize = sizeof( JobType );
int jobBufSize = jobSize * jobCount;
// make sure we have enough memory allocated to store jobs
if ( jobBufSize > m_jobMem.size() )
{
m_jobMem.resize( jobBufSize );
}
// make sure job queue is big enough
if ( jobCount > m_jobContext.m_jobQueue.capacity() )
{
m_jobContext.m_jobQueue.reserve( jobCount );
}
m_jobContext.clearQueue(); for (int i = 0; i < m_numActiveJobQueues; ++i)
{
m_jobQueues[i].clearQueue( jobCount, jobSize );
}
// prepare worker threads for incoming work // prepare worker threads for incoming work
prepareWorkerThreads(); prepareWorkerThreads();
// submit all of the jobs // submit all of the jobs
int iJob = 0; int iJob = 0;
JobType* jobs = reinterpret_cast<JobType*>( &m_jobMem[ 0 ] ); int iThread = kFirstWorkerThreadId; // first worker thread
for ( int i = iBegin; i < iEnd; i += grainSize ) for ( int i = iBegin; i < iEnd; i += grainSize )
{ {
btAssert( iJob < jobCount ); btAssert( iJob < jobCount );
int iE = btMin( i + grainSize, iEnd ); int iE = btMin( i + grainSize, iEnd );
JobType& job = jobs[ iJob ]; JobQueue* jq = m_perThreadJobQueues[ iThread ];
new ( (void*) &job ) ParallelForJob( i, iE, body ); // placement new btAssert(jq);
m_jobContext.submitJob( &job ); btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
void* jobMem = jq->allocJobMem(jobSize);
JobType* job = new ( jobMem ) ParallelForJob( i, iE, body ); // placement new
jq->submitJob( job );
iJob++; iJob++;
iThread++;
if ( iThread >= m_numThreads )
{
iThread = kFirstWorkerThreadId; // first worker thread
}
} }
wakeWorkers( jobCount - 1 ); wakeWorkers( jobCount - 1 );
@@ -508,44 +713,38 @@ public:
m_numJobs = jobCount; m_numJobs = jobCount;
btAssert( jobCount >= 2 ); // need more than one job for multithreading btAssert( jobCount >= 2 ); // need more than one job for multithreading
int jobSize = sizeof( JobType ); int jobSize = sizeof( JobType );
int jobBufSize = jobSize * jobCount; for (int i = 0; i < m_numActiveJobQueues; ++i)
// make sure we have enough memory allocated to store jobs
if ( jobBufSize > m_jobMem.size() )
{ {
m_jobMem.resize( jobBufSize ); m_jobQueues[i].clearQueue( jobCount, jobSize );
} }
// make sure job queue is big enough
if ( jobCount > m_jobContext.m_jobQueue.capacity() ) // initialize summation
{ for ( int iThread = 0; iThread < m_numThreads; ++iThread )
m_jobContext.m_jobQueue.reserve( jobCount ); {
} m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
// make sure thread local area is big enough
int threadLocalSize = m_numThreads * sizeof( ThreadLocalSum );
if ( threadLocalSize > m_threadLocalMem.size() )
{
m_threadLocalMem.resize( threadLocalSize );
}
// initialize summation
ThreadLocalSum* threadLocalSum = reinterpret_cast<ThreadLocalSum*>( &m_threadLocalMem[ 0 ] );
for ( int iThread = 0; iThread < m_numThreads; ++iThread )
{
threadLocalSum[ iThread ].mSum = btScalar( 0 );
} }
m_jobContext.clearQueue();
// prepare worker threads for incoming work // prepare worker threads for incoming work
prepareWorkerThreads(); prepareWorkerThreads();
// submit all of the jobs // submit all of the jobs
int iJob = 0; int iJob = 0;
JobType* jobs = reinterpret_cast<JobType*>( &m_jobMem[ 0 ] ); int iThread = kFirstWorkerThreadId; // first worker thread
for ( int i = iBegin; i < iEnd; i += grainSize ) for ( int i = iBegin; i < iEnd; i += grainSize )
{ {
btAssert( iJob < jobCount ); btAssert( iJob < jobCount );
int iE = btMin( i + grainSize, iEnd ); int iE = btMin( i + grainSize, iEnd );
JobType& job = jobs[ iJob ]; JobQueue* jq = m_perThreadJobQueues[ iThread ];
new ( (void*) &job ) ParallelSumJob( i, iE, body, threadLocalSum ); // placement new btAssert(jq);
m_jobContext.submitJob( &job ); btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
void* jobMem = jq->allocJobMem(jobSize);
JobType* job = new ( jobMem ) ParallelSumJob( i, iE, body, &m_threadLocalStorage[0] ); // placement new
jq->submitJob( job );
iJob++; iJob++;
iThread++;
if ( iThread >= m_numThreads )
{
iThread = kFirstWorkerThreadId; // first worker thread
}
} }
wakeWorkers( jobCount - 1 ); wakeWorkers( jobCount - 1 );
@@ -556,7 +755,7 @@ public:
btScalar sum = btScalar(0); btScalar sum = btScalar(0);
for ( int iThread = 0; iThread < m_numThreads; ++iThread ) for ( int iThread = 0; iThread < m_numThreads; ++iThread )
{ {
sum += threadLocalSum[ iThread ].mSum; sum += m_threadLocalStorage[ iThread ].m_sumResult;
} }
m_antiNestingLock.unlock(); m_antiNestingLock.unlock();
return sum; return sum;
@@ -586,4 +785,4 @@ btITaskScheduler* btCreateDefaultTaskScheduler()
return NULL; return NULL;
} }
#endif // #else // #if BT_THREADSAFE #endif // #else // #if BT_THREADSAFE

View File

@@ -37,34 +37,29 @@ public:
virtual int getNumWorkerThreads() const = 0; // number of worker threads (total number of logical processors - 1) virtual int getNumWorkerThreads() const = 0; // number of worker threads (total number of logical processors - 1)
virtual int getCacheFriendlyNumThreads() const = 0; // the number of logical processors sharing a single L3 cache virtual int getCacheFriendlyNumThreads() const = 0; // the number of logical processors sharing a single L3 cache
virtual int getLogicalToPhysicalCoreRatio() const = 0; // the number of logical processors per physical processor (usually 1 or 2)
virtual void runTask( int threadIndex, void* userData ) = 0; virtual void runTask( int threadIndex, void* userData ) = 0;
virtual void waitForAllTasks() = 0; virtual void waitForAllTasks() = 0;
virtual btCriticalSection* createCriticalSection() = 0; virtual btCriticalSection* createCriticalSection() = 0;
virtual void deleteCriticalSection( btCriticalSection* criticalSection ) = 0; virtual void deleteCriticalSection( btCriticalSection* criticalSection ) = 0;
virtual void* getThreadLocalMemory( int taskId ) { return NULL; } typedef void( *ThreadFunc )( void* userPtr );
typedef void( *ThreadFunc )( void* userPtr, void* lsMemory );
typedef void* ( *MemorySetupFunc )( );
struct ConstructionInfo struct ConstructionInfo
{ {
ConstructionInfo( const char* uniqueName, ConstructionInfo( const char* uniqueName,
ThreadFunc userThreadFunc, ThreadFunc userThreadFunc,
MemorySetupFunc lsMemoryFunc,
int threadStackSize = 65535 int threadStackSize = 65535
) )
:m_uniqueName( uniqueName ), :m_uniqueName( uniqueName ),
m_userThreadFunc( userThreadFunc ), m_userThreadFunc( userThreadFunc ),
m_lsMemoryFunc( lsMemoryFunc ),
m_threadStackSize( threadStackSize ) m_threadStackSize( threadStackSize )
{ {
} }
const char* m_uniqueName; const char* m_uniqueName;
ThreadFunc m_userThreadFunc; ThreadFunc m_userThreadFunc;
MemorySetupFunc m_lsMemoryFunc;
int m_threadStackSize; int m_threadStackSize;
}; };

View File

@@ -73,7 +73,6 @@ public:
ThreadFunc m_userThreadFunc; ThreadFunc m_userThreadFunc;
void* m_userPtr; //for taskDesc etc void* m_userPtr; //for taskDesc etc
void* m_lsMemory; //initialized using PosixLocalStoreMemorySetupFunc
pthread_t thread; pthread_t thread;
//each thread will wait until this signal to start its work //each thread will wait until this signal to start its work
@@ -103,17 +102,14 @@ public:
virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; } virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; }
// TODO: return the number of logical processors sharing the first L3 cache // TODO: return the number of logical processors sharing the first L3 cache
virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return m_numThreads + 1; } virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return m_numThreads + 1; }
// TODO: detect if CPU has hyperthreading enabled
virtual int getLogicalToPhysicalCoreRatio() const BT_OVERRIDE { return 1; }
virtual void runTask( int threadIndex, void* userData ) BT_OVERRIDE; virtual void runTask( int threadIndex, void* userData ) BT_OVERRIDE;
virtual void waitForAllTasks() BT_OVERRIDE; virtual void waitForAllTasks() BT_OVERRIDE;
virtual btCriticalSection* createCriticalSection() BT_OVERRIDE; virtual btCriticalSection* createCriticalSection() BT_OVERRIDE;
virtual void deleteCriticalSection( btCriticalSection* criticalSection ) BT_OVERRIDE; virtual void deleteCriticalSection( btCriticalSection* criticalSection ) BT_OVERRIDE;
virtual void* getThreadLocalMemory( int taskId ) BT_OVERRIDE
{
return m_activeThreadStatus[ taskId ].m_lsMemory;
}
}; };
@@ -190,7 +186,7 @@ static void *threadFunction( void *argument )
if ( userPtr ) if ( userPtr )
{ {
btAssert( status->m_status ); btAssert( status->m_status );
status->m_userThreadFunc( userPtr, status->m_lsMemory ); status->m_userThreadFunc( userPtr );
status->m_status = 2; status->m_status = 2;
checkPThreadFunction( sem_post( status->m_mainSemaphore ) ); checkPThreadFunction( sem_post( status->m_mainSemaphore ) );
status->threadUsed++; status->threadUsed++;
@@ -292,7 +288,6 @@ void btThreadSupportPosix::startThreads( const ConstructionInfo& threadConstruct
threadStatus.m_commandId = 0; threadStatus.m_commandId = 0;
threadStatus.m_status = 0; threadStatus.m_status = 0;
threadStatus.m_mainSemaphore = m_mainSemaphore; threadStatus.m_mainSemaphore = m_mainSemaphore;
threadStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc; threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
threadStatus.threadUsed = 0; threadStatus.threadUsed = 0;

View File

@@ -179,7 +179,6 @@ public:
ThreadFunc m_userThreadFunc; ThreadFunc m_userThreadFunc;
void* m_userPtr; //for taskDesc etc void* m_userPtr; //for taskDesc etc
void* m_lsMemory; //initialized using Win32LocalStoreMemorySetupFunc
void* m_threadHandle; //this one is calling 'Win32ThreadFunc' void* m_threadHandle; //this one is calling 'Win32ThreadFunc'
@@ -208,15 +207,11 @@ public:
virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; } virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; }
virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return countSetBits(m_processorInfo.processorTeamMasks[0]); } virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return countSetBits(m_processorInfo.processorTeamMasks[0]); }
virtual int getLogicalToPhysicalCoreRatio() const BT_OVERRIDE { return m_processorInfo.numLogicalProcessors / m_processorInfo.numCores; }
virtual void runTask( int threadIndex, void* userData ) BT_OVERRIDE; virtual void runTask( int threadIndex, void* userData ) BT_OVERRIDE;
virtual void waitForAllTasks() BT_OVERRIDE; virtual void waitForAllTasks() BT_OVERRIDE;
virtual void* getThreadLocalMemory( int taskId ) BT_OVERRIDE
{
return m_activeThreadStatus[ taskId ].m_lsMemory;
}
virtual btCriticalSection* createCriticalSection() BT_OVERRIDE; virtual btCriticalSection* createCriticalSection() BT_OVERRIDE;
virtual void deleteCriticalSection( btCriticalSection* criticalSection ) BT_OVERRIDE; virtual void deleteCriticalSection( btCriticalSection* criticalSection ) BT_OVERRIDE;
}; };
@@ -246,7 +241,7 @@ DWORD WINAPI win32threadStartFunc( LPVOID lpParam )
if ( userPtr ) if ( userPtr )
{ {
btAssert( status->m_status ); btAssert( status->m_status );
status->m_userThreadFunc( userPtr, status->m_lsMemory ); status->m_userThreadFunc( userPtr );
status->m_status = 2; status->m_status = 2;
SetEvent( status->m_eventCompleteHandle ); SetEvent( status->m_eventCompleteHandle );
} }
@@ -392,7 +387,6 @@ void btThreadSupportWin32::startThreads( const ConstructionInfo& threadConstruct
threadStatus.m_commandId = 0; threadStatus.m_commandId = 0;
threadStatus.m_status = 0; threadStatus.m_status = 0;
threadStatus.m_threadHandle = handle; threadStatus.m_threadHandle = handle;
threadStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc; threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
printf( "started %s thread %d with threadHandle %p\n", threadConstructionInfo.m_uniqueName, i, handle ); printf( "started %s thread %d with threadHandle %p\n", threadConstructionInfo.m_uniqueName, i, handle );
@@ -410,9 +404,7 @@ void btThreadSupportWin32::stopThreads()
WaitForSingleObject( threadStatus.m_eventCompleteHandle, INFINITE ); WaitForSingleObject( threadStatus.m_eventCompleteHandle, INFINITE );
} }
delete threadStatus.m_lsMemory; threadStatus.m_userPtr = NULL;
threadStatus.m_userPtr = 0;
SetEvent( threadStatus.m_eventStartHandle ); SetEvent( threadStatus.m_eventStartHandle );
WaitForSingleObject( threadStatus.m_eventCompleteHandle, INFINITE ); WaitForSingleObject( threadStatus.m_eventCompleteHandle, INFINITE );