diff --git a/src/LinearMath/TaskScheduler/btTaskScheduler.cpp b/src/LinearMath/TaskScheduler/btTaskScheduler.cpp
index 1aa7d44d4..02fe07ab1 100644
--- a/src/LinearMath/TaskScheduler/btTaskScheduler.cpp
+++ b/src/LinearMath/TaskScheduler/btTaskScheduler.cpp
@@ -7,16 +7,11 @@
 #include <stdio.h>
 
-typedef void( *btThreadFunc )( void* userPtr, void* lsMemory );
-typedef void* ( *btThreadLocalStorageFunc )();
 
 #if BT_THREADSAFE
 
 #include "btThreadSupportInterface.h"
-
-
-
 #if defined( _WIN32 )
 
 #define WIN32_LEAN_AND_MEAN
@@ -26,6 +21,9 @@ typedef void* ( *btThreadLocalStorageFunc )();
 
 #endif
 
+typedef unsigned long long btU64;
+static const int kCacheLineSize = 64;
+
 void btSpinPause()
 {
 #if defined( _WIN32 )
@@ -46,6 +44,62 @@ struct WorkerThreadStatus
 };
 
 
+ATTRIBUTE_ALIGNED64(class) WorkerThreadDirectives
+{
+    static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;
+    // directives for all worker threads packed into a single cacheline
+    char m_threadDirs[kMaxThreadCount];
+
+public:
+    enum Type
+    {
+        kInvalid,
+        kGoToSleep,         // go to sleep
+        kStayAwakeButIdle,  // stay awake, but don't check the job queue
+        kScanForJobs,       // actively scan job queue for jobs
+    };
+    WorkerThreadDirectives()
+    {
+        for ( int i = 0; i < kMaxThreadCount; ++i )
+        {
+            m_threadDirs[ i ] = 0;
+        }
+    }
+
+    Type getDirective(int threadId)
+    {
+        btAssert(threadId < kMaxThreadCount);
+        return static_cast<Type>(m_threadDirs[threadId]);
+    }
+
+    void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
+    {
+        btAssert( threadBegin < threadEnd );
+        btAssert( threadEnd <= kMaxThreadCount );
+        char dirChar = static_cast<char>(dir);
+        for ( int i = threadBegin; i < threadEnd; ++i )
+        {
+            m_threadDirs[ i ] = dirChar;
+        }
+    }
+};
+
+class JobQueue;
+
+ATTRIBUTE_ALIGNED64(struct) ThreadLocalStorage
+{
+    int m_threadId;
+    WorkerThreadStatus::Type m_status;
+    int m_numJobsFinished;
+    btSpinMutex m_mutex;
+    btScalar m_sumResult;
+    WorkerThreadDirectives* m_directive;
+    JobQueue* m_queue;
+    btClock* m_clock;
+    unsigned int m_cooldownTime;
+};
+
+
 struct IJob
 {
     virtual void executeJob(int threadId) = 0;
@@ -53,88 +107,152 @@ struct IJob
 
 class ParallelForJob : public IJob
 {
-    const btIParallelForBody* mBody;
-    int mBegin;
-    int mEnd;
+    const btIParallelForBody* m_body;
+    int m_begin;
+    int m_end;
 
 public:
     ParallelForJob( int iBegin, int iEnd, const btIParallelForBody& body )
     {
-        mBody = &body;
-        mBegin = iBegin;
-        mEnd = iEnd;
+        m_body = &body;
+        m_begin = iBegin;
+        m_end = iEnd;
     }
     virtual void executeJob(int threadId) BT_OVERRIDE
     {
         BT_PROFILE( "executeJob" );
         // call the functor body to do the work
-        mBody->forLoop( mBegin, mEnd );
+        m_body->forLoop( m_begin, m_end );
     }
 };
 
-static const int kCacheLineSize = 64;
-
-struct ThreadLocalSum
-{
-    btScalar mSum;
-    char mCachePadding[ kCacheLineSize - sizeof( btScalar ) ];
-};
 
 class ParallelSumJob : public IJob
 {
-    const btIParallelSumBody* mBody;
-    ThreadLocalSum* mSumArray;
-    int mBegin;
-    int mEnd;
+    const btIParallelSumBody* m_body;
+    ThreadLocalStorage* m_threadLocalStoreArray;
+    int m_begin;
+    int m_end;
 
 public:
-    ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalSum* sums )
+    ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls )
    {
-        mBody = &body;
-        mSumArray = sums;
-        mBegin = iBegin;
-        mEnd = iEnd;
+        m_body = &body;
+        m_threadLocalStoreArray = tls;
+        m_begin = iBegin;
+        m_end = iEnd;
    }
     virtual void executeJob( int threadId ) BT_OVERRIDE
     {
         BT_PROFILE( "executeJob" );
         // call the functor body to do the work
-        btScalar val = mBody->sumLoop( mBegin, mEnd );
+        btScalar val = m_body->sumLoop( m_begin, m_end );
+#if BT_PARALLEL_SUM_DETERMINISTISM
         // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
         const float TRUNC_SCALE = float(1<<19);
         val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE;  // truncate some bits
-        mSumArray[threadId].mSum += val;
+#endif
+        m_threadLocalStoreArray[threadId].m_sumResult += val;
    }
 };
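[Note: the BT_PARALLEL_SUM_DETERMINISTISM block above quantizes each job's partial result before it is accumulated into per-thread storage, so the final total is much less sensitive to how ranges get grouped onto threads. A standalone sketch of the same rounding, not part of the patch; the constant mirrors the code above, the sample inputs are made up:

// Illustrative only. Snaps a partial sum to a grid of 1/2^19 before
// accumulation, as ParallelSumJob::executeJob does above.
#include <cmath>
#include <cstdio>

static float quantize(float val)
{
    const float TRUNC_SCALE = float(1 << 19);               // same constant as the patch
    return floorf(val * TRUNC_SCALE + 0.5f) / TRUNC_SCALE;  // round to nearest grid point
}

int main()
{
    // two partial sums that differ only in low-order bits (hypothetical values)
    float a = 0.333333343f;
    float b = 0.333333313f;
    printf("%.9f %.9f\n", quantize(a), quantize(b));  // both snap to the same grid point
    return 0;
}
]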
 
 
-struct JobContext
+ATTRIBUTE_ALIGNED64(class) JobQueue
 {
-    JobContext()
-    {
-        m_queueLock = NULL;
-        m_headIndex = 0;
-        m_tailIndex = 0;
-        m_workersShouldCheckQueue = false;
-        m_workersShouldSleep = false;
-        m_useSpinMutex = false;
-        m_coolDownTime = 1000;  // 1000 microseconds
-    }
+    btThreadSupportInterface* m_threadSupport;
     btCriticalSection* m_queueLock;
     btSpinMutex m_mutex;
 
-    volatile bool m_workersShouldCheckQueue;
-    volatile bool m_workersShouldSleep;
     btAlignedObjectArray<IJob*> m_jobQueue;
+    char* m_jobMem;
+    int m_jobMemSize;
     bool m_queueIsEmpty;
     int m_tailIndex;
     int m_headIndex;
+    int m_allocSize;
     bool m_useSpinMutex;
-    unsigned int m_coolDownTime;
-    btClock m_clock;
+    btAlignedObjectArray<JobQueue*> m_neighborContexts;
+    char m_cachePadding[kCacheLineSize];  // prevent false sharing
 
+    void freeJobMem()
+    {
+        if ( m_jobMem )
+        {
+            // free old
+            btAlignedFree(m_jobMem);
+            m_jobMem = NULL;
+        }
+    }
+    void resizeJobMem(int newSize)
+    {
+        if (newSize > m_jobMemSize)
+        {
+            freeJobMem();
+            m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
+            m_jobMemSize = newSize;
+        }
+    }
+
+public:
+
+    JobQueue()
+    {
+        m_jobMem = NULL;
+        m_jobMemSize = 0;
+        m_threadSupport = NULL;
+        m_queueLock = NULL;
+        m_headIndex = 0;
+        m_tailIndex = 0;
+        m_useSpinMutex = false;
+    }
+    ~JobQueue()
+    {
+        freeJobMem();
+        if (m_queueLock && m_threadSupport)
+        {
+            m_threadSupport->deleteCriticalSection(m_queueLock);
+            m_queueLock = NULL;
+        }
+    }
+    void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
+    {
+        m_threadSupport = threadSup;
+        if (threadSup)
+        {
+            m_queueLock = m_threadSupport->createCriticalSection();
+        }
+        setupJobStealing(contextArray, contextArray->size());
+    }
+    void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
+    {
+        btAlignedObjectArray<JobQueue>& contexts = *contextArray;
+        int selfIndex = 0;
+        for (int i = 0; i < contexts.size(); ++i)
+        {
+            if ( this == &contexts[ i ] )
+            {
+                selfIndex = i;
+                break;
+            }
+        }
+        int numNeighbors = btMin(2, contexts.size() - 1);
+        int neighborOffsets[] = {-1, 1, -2, 2, -3, 3};
+        int numOffsets = sizeof(neighborOffsets)/sizeof(neighborOffsets[0]);
+        m_neighborContexts.reserve( numNeighbors );
+        m_neighborContexts.resizeNoInitialize(0);
+        for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
+        {
+            int neighborIndex = selfIndex + neighborOffsets[i];
+            if ( neighborIndex >= 0 && neighborIndex < numActiveContexts)
+            {
+                m_neighborContexts.push_back( &contexts[ neighborIndex ] );
+            }
+        }
+    }
+
+    bool isQueueEmpty() const {return m_queueIsEmpty;}
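[Note: setupJobStealing() above gives each queue at most two victim queues, chosen from its nearest live neighbors via the fixed offset table. A standalone sketch, not part of the patch, that reproduces the selection for a hypothetical pool of four queues:

// Illustrative only. Queue i tries offsets -1,1,-2,2,-3,3 and keeps the
// first two indices that land inside [0, numQueues).
#include <cstdio>

int main()
{
    const int numQueues = 4;
    const int offsets[] = {-1, 1, -2, 2, -3, 3};
    for (int self = 0; self < numQueues; ++self)
    {
        printf("queue %d steals from:", self);
        int found = 0;
        for (int i = 0; i < 6 && found < 2; ++i)
        {
            int n = self + offsets[i];
            if (n >= 0 && n < numQueues)
            {
                printf(" %d", n);
                ++found;
            }
        }
        printf("\n");  // e.g. queue 0 steals from: 1 2
    }
    return 0;
}
]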
     void lockQueue()
     {
         if ( m_useSpinMutex )
         {
@@ -157,24 +275,44 @@ struct JobContext
             m_queueLock->unlock();
         }
     }
-    void clearQueue()
+    void clearQueue(int jobCount, int jobSize)
     {
         lockQueue();
         m_headIndex = 0;
         m_tailIndex = 0;
+        m_allocSize = 0;
         m_queueIsEmpty = true;
+        int jobBufSize = jobSize * jobCount;
+        // make sure we have enough memory allocated to store jobs
+        if ( jobBufSize > m_jobMemSize )
+        {
+            resizeJobMem( jobBufSize );
+        }
+        // make sure job queue is big enough
+        if ( jobCount > m_jobQueue.capacity() )
+        {
+            m_jobQueue.reserve( jobCount );
+        }
         unlockQueue();
         m_jobQueue.resizeNoInitialize( 0 );
     }
+    void* allocJobMem(int jobSize)
+    {
+        btAssert(m_jobMemSize >= (m_allocSize + jobSize));
+        void* jobMem = &m_jobMem[m_allocSize];
+        m_allocSize += jobSize;
+        return jobMem;
+    }
     void submitJob( IJob* job )
     {
+        btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
         m_jobQueue.push_back( job );
         lockQueue();
         m_tailIndex++;
         m_queueIsEmpty = false;
         unlockQueue();
     }
-    IJob* consumeJob()
+    IJob* consumeJobFromOwnQueue()
     {
         if ( m_queueIsEmpty )
         {
@@ -186,6 +324,7 @@ struct JobContext
         if ( !m_queueIsEmpty )
         {
             job = m_jobQueue[ m_headIndex++ ];
+            btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
             if ( m_headIndex == m_tailIndex )
             {
                 m_queueIsEmpty = true;
@@ -194,58 +333,78 @@ struct JobContext
         unlockQueue();
         return job;
     }
+    IJob* consumeJob()
+    {
+        if (IJob* job = consumeJobFromOwnQueue())
+        {
+            return job;
+        }
+        // own queue is empty, try to steal from neighbor
+        for (int i = 0; i < m_neighborContexts.size(); ++i)
+        {
+            JobQueue* otherContext = m_neighborContexts[ i ];
+            if ( IJob* job = otherContext->consumeJobFromOwnQueue() )
+            {
+                return job;
+            }
+        }
+        return NULL;
+    }
 };
 
-struct WorkerThreadLocalStorage
-{
-    int threadId;
-    WorkerThreadStatus::Type status;
-    int numJobsFinished;
-    btSpinMutex m_mutex;
-};
-
-
-static void WorkerThreadFunc( void* userPtr, void* lsMemory )
+static void WorkerThreadFunc( void* userPtr )
 {
     BT_PROFILE( "WorkerThreadFunc" );
-    WorkerThreadLocalStorage* localStorage = (WorkerThreadLocalStorage*) lsMemory;
-    JobContext* jobContext = (JobContext*) userPtr;
+    ThreadLocalStorage* localStorage = (ThreadLocalStorage*) userPtr;
+    JobQueue* jobQueue = localStorage->m_queue;
 
     bool shouldSleep = false;
+    int threadId = localStorage->m_threadId;
     while (!shouldSleep)
     {
         // do work
         localStorage->m_mutex.lock();
-        while ( IJob* job = jobContext->consumeJob() )
+        while ( IJob* job = jobQueue->consumeJob() )
         {
-            localStorage->status = WorkerThreadStatus::kWorking;
-            job->executeJob( localStorage->threadId );
-            localStorage->numJobsFinished++;
+            localStorage->m_status = WorkerThreadStatus::kWorking;
+            job->executeJob( threadId );
+            localStorage->m_numJobsFinished++;
         }
-        localStorage->status = WorkerThreadStatus::kWaitingForWork;
+        localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
         localStorage->m_mutex.unlock();
-        unsigned long long int clockStart = jobContext->m_clock.getTimeMicroseconds();
+        btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
         // while queue is empty,
-        while (jobContext->m_queueIsEmpty)
+        while (jobQueue->isQueueEmpty())
         {
             // todo: spin wait a bit to avoid hammering the empty queue
             btSpinPause();
-            if ( jobContext->m_workersShouldSleep )
+            if ( localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep )
             {
                 shouldSleep = true;
                 break;
             }
             // if jobs are incoming,
-            if (jobContext->m_workersShouldCheckQueue)
+            if ( localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs )
             {
-                clockStart = jobContext->m_clock.getTimeMicroseconds();  // reset clock
+                clockStart = localStorage->m_clock->getTimeMicroseconds();  // reset clock
             }
             else
             {
+                for ( int i = 0; i < 50; ++i )
+                {
+                    btSpinPause();
+                    btSpinPause();
+                    btSpinPause();
+                    btSpinPause();
+                    if (localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
+                    {
+                        break;
+                    }
+                }
                 // if no jobs incoming and queue has been empty for the cooldown time, sleep
-                unsigned long long int timeElapsed = jobContext->m_clock.getTimeMicroseconds() - clockStart;
-                if (timeElapsed > jobContext->m_coolDownTime)
+                btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
+                if (timeElapsed > localStorage->m_cooldownTime)
                 {
                     shouldSleep = true;
                     break;
@@ -254,77 +413,107 @@ static void WorkerThreadFunc( void* userPtr, void* lsMemory )
             }
         }
     }
 
-    // go idle
+    // go sleep
     localStorage->m_mutex.lock();
-    localStorage->status = WorkerThreadStatus::kSleeping;
+    localStorage->m_status = WorkerThreadStatus::kSleeping;
     localStorage->m_mutex.unlock();
 }
 
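[Note: the idle loop in WorkerThreadFunc above is a staged backoff: keep scanning under kScanForJobs, pause in short bursts under kStayAwakeButIdle, and sleep once the queue has stayed empty for m_cooldownTime microseconds. A generic, standalone sketch of the pause-burst pattern using std::atomic; the names and the _mm_pause choice are mine, not the patch's, and it assumes x86:

// Illustrative only. An idle thread bursts a few CPU pause hints between
// flag checks instead of hammering shared state on every iteration.
#include <atomic>
#include <thread>
#include <cstdio>
#include <immintrin.h>  // _mm_pause; use an equivalent yield on non-x86

static void spinWaitUntil(const std::atomic<bool>& flag)
{
    while (!flag.load(std::memory_order_acquire))
    {
        for (int i = 0; i < 50 && !flag.load(std::memory_order_relaxed); ++i)
        {
            _mm_pause();  // hint to the core that this is a spin-wait
            _mm_pause();
            _mm_pause();
            _mm_pause();
        }
    }
}

int main()
{
    std::atomic<bool> ready(false);
    std::thread t([&] { spinWaitUntil(ready); std::puts("woke"); });
    ready.store(true, std::memory_order_release);
    t.join();
    return 0;
}
]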
-static void* WorkerThreadAllocFunc()
-{
-    return new WorkerThreadLocalStorage;
-}
-
-
-
 class btTaskSchedulerDefault : public btITaskScheduler
 {
-    JobContext m_jobContext;
     btThreadSupportInterface* m_threadSupport;
-    btAlignedObjectArray<char> m_jobMem;
-    btAlignedObjectArray<char> m_threadLocalMem;
+    WorkerThreadDirectives* m_workerDirective;
+    btAlignedObjectArray<JobQueue> m_jobQueues;
+    btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
+    btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
     btSpinMutex m_antiNestingLock;  // prevent nested parallel-for
+    btClock m_clock;
     int m_numThreads;
     int m_numWorkerThreads;
+    int m_numActiveJobQueues;
     int m_maxNumThreads;
     int m_numJobs;
+    static const int kFirstWorkerThreadId = 1;
 
 public:
 
     btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
     {
         m_threadSupport = NULL;
+        m_workerDirective = NULL;
     }
 
     virtual ~btTaskSchedulerDefault()
     {
-        shutdown();
+        waitForWorkersToSleep();
+
+        if (m_threadSupport)
+        {
+            delete m_threadSupport;
+            m_threadSupport = NULL;
+        }
+
+        if (m_workerDirective)
+        {
+            btAlignedFree(m_workerDirective);
+            m_workerDirective = NULL;
+        }
     }
 
     void init()
     {
-        btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc, WorkerThreadAllocFunc );
+        btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc );
         m_threadSupport = btThreadSupportInterface::create( constructionInfo );
+        m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));
 
         m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
         m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
         m_numThreads = m_maxNumThreads;
-        m_jobContext.m_queueLock = m_threadSupport->createCriticalSection();
-        for ( int i = 0; i < m_numWorkerThreads; i++ )
+        // ideal to have one job queue for each physical processor (except for the main thread which needs no queue)
+        int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
+        int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads-1) : (m_maxNumThreads / numThreadsPerQueue);
+        m_jobQueues.resize(numJobQueues);
+        m_numActiveJobQueues = numJobQueues;
+        for ( int i = 0; i < m_jobQueues.size(); ++i )
         {
-            WorkerThreadLocalStorage* storage = (WorkerThreadLocalStorage*) m_threadSupport->getThreadLocalMemory( i );
-            btAssert( storage );
-            storage->threadId = i + 1;  // workers start at 1
-            storage->status = WorkerThreadStatus::kSleeping;
+            m_jobQueues[i].init( m_threadSupport, &m_jobQueues );
         }
-        setWorkersActive( false );  // no work for them yet
+        m_perThreadJobQueues.resize(m_numThreads);
+        for ( int i = 0; i < m_numThreads; i++ )
+        {
+            JobQueue* jq = NULL;
+            // only worker threads get a job queue
+            if (i > 0)
+            {
+                if (numThreadsPerQueue == 1)
+                {
+                    // one queue per worker thread
+                    jq = &m_jobQueues[ i - kFirstWorkerThreadId ];
+                }
+                else
+                {
+                    // 2 threads share each queue
+                    jq = &m_jobQueues[ i / numThreadsPerQueue ];
+                }
+            }
+            m_perThreadJobQueues[i] = jq;
+        }
+        m_threadLocalStorage.resize(m_numThreads);
+        for ( int i = 0; i < m_numThreads; i++ )
+        {
+            ThreadLocalStorage& storage = m_threadLocalStorage[i];
+            storage.m_threadId = i;
+            storage.m_directive = m_workerDirective;
+            storage.m_status = WorkerThreadStatus::kSleeping;
+            storage.m_cooldownTime = 1000;  // 1000 microseconds, threads go to sleep after this long if they have nothing to do
+            storage.m_clock = &m_clock;
+            storage.m_queue = m_perThreadJobQueues[i];
+        }
+        setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );  // no work for them yet
         setNumThreads( m_threadSupport->getCacheFriendlyNumThreads() );
     }
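[Note: init() above sizes the queue pool from the SMT ratio: one queue per worker when there is no hyperthreading, otherwise one queue per physical core shared by its logical processors. A sketch, not part of the patch, of the resulting thread-to-queue mapping for a hypothetical machine with 8 logical processors on 4 cores:

// Illustrative only. Reproduces init()'s queue-count arithmetic for
// getLogicalToPhysicalCoreRatio() == 2 and 8 logical processors.
#include <cstdio>

int main()
{
    const int kFirstWorkerThreadId = 1;
    int maxNumThreads = 8;       // 7 workers + 1 main thread
    int numThreadsPerQueue = 2;  // logical/physical ratio
    int numJobQueues = (numThreadsPerQueue == 1) ? (maxNumThreads - 1)
                                                 : (maxNumThreads / numThreadsPerQueue);
    printf("numJobQueues = %d\n", numJobQueues);  // 4
    for (int i = 0; i < maxNumThreads; ++i)
    {
        if (i == 0)
            printf("thread 0 (main): no queue\n");
        else if (numThreadsPerQueue == 1)
            printf("thread %d -> queue %d\n", i, i - kFirstWorkerThreadId);
        else
            printf("thread %d -> queue %d\n", i, i / numThreadsPerQueue);
    }
    return 0;
}
]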
 
-    virtual void shutdown()
+    void setWorkerDirectives(WorkerThreadDirectives::Type dir)
     {
-        setWorkersActive( false );
-        waitForWorkersToSleep();
-        m_threadSupport->deleteCriticalSection( m_jobContext.m_queueLock );
-        m_jobContext.m_queueLock = NULL;
-
-        delete m_threadSupport;
-        m_threadSupport = NULL;
-    }
-
-    void setWorkersActive( bool active )
-    {
-        m_jobContext.m_workersShouldCheckQueue = active;
+        m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
     }
 
     virtual int getMaxNumThreads() const BT_OVERRIDE
@@ -341,38 +530,56 @@
     {
         m_numThreads = btMax( btMin(numThreads, int(m_maxNumThreads)), 1 );
         m_numWorkerThreads = m_numThreads - 1;
+        m_numActiveJobQueues = 0;
+        // if there is at least 1 worker,
+        if ( m_numWorkerThreads > 0 )
+        {
+            // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
+            JobQueue* lastActiveContext = m_perThreadJobQueues[ m_numThreads - 1 ];
+            int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
+            m_numActiveJobQueues = iLastActiveContext + 1;
+            for ( int i = 0; i < m_jobQueues.size(); ++i )
+            {
+                m_jobQueues[ i ].setupJobStealing( &m_jobQueues, m_numActiveJobQueues );
+            }
+        }
+        m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
     }
 
     void waitJobs()
     {
         BT_PROFILE( "waitJobs" );
-        // have the main thread work until the job queue is empty
+        // have the main thread work until the job queues are empty
         int numMainThreadJobsFinished = 0;
-        while ( IJob* job = m_jobContext.consumeJob() )
+        for ( int i = 0; i < m_numActiveJobQueues; ++i )
         {
-            job->executeJob( 0 );
-            numMainThreadJobsFinished++;
+            while ( IJob* job = m_jobQueues[i].consumeJob() )
+            {
+                job->executeJob( 0 );
+                numMainThreadJobsFinished++;
+            }
         }
-        // done with jobs for now, tell workers to rest
-        setWorkersActive( false );
-        unsigned long long int clockStart = m_jobContext.m_clock.getTimeMicroseconds();
+
+        // done with jobs for now, tell workers to rest (but not sleep)
+        setWorkerDirectives( WorkerThreadDirectives::kStayAwakeButIdle );
+
+        btU64 clockStart = m_clock.getTimeMicroseconds();
         // wait for workers to finish any jobs in progress
         while ( true )
         {
             int numWorkerJobsFinished = 0;
-            for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
+            for ( int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread )
             {
-                WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) );
+                ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
                 storage->m_mutex.lock();
-                numWorkerJobsFinished += storage->numJobsFinished;
+                numWorkerJobsFinished += storage->m_numJobsFinished;
                 storage->m_mutex.unlock();
             }
             if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
             {
                 break;
             }
-            unsigned long long int timeElapsed = m_jobContext.m_clock.getTimeMicroseconds() - clockStart;
+            btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
             btAssert(timeElapsed < 1000);
             if (timeElapsed > 100000)
             {
@@ -385,25 +592,25 @@
     void wakeWorkers(int numWorkersToWake)
     {
         BT_PROFILE( "wakeWorkers" );
-        btAssert( m_jobContext.m_workersShouldCheckQueue );
+        btAssert( m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs );
         int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
         int numActiveWorkers = 0;
         for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
         {
             // note this count of active workers is not necessarily totally reliable, because a worker thread could be
             // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
-            WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) );
-            if (storage->status != WorkerThreadStatus::kSleeping)
+            ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
+            if (storage.m_status != WorkerThreadStatus::kSleeping)
             {
                 numActiveWorkers++;
             }
         }
         for ( int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker )
         {
-            WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) );
-            if (storage->status == WorkerThreadStatus::kSleeping)
+            ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
+            if (storage.m_status == WorkerThreadStatus::kSleeping)
             {
-                m_threadSupport->runTask( iWorker, &m_jobContext );
+                m_threadSupport->runTask( iWorker, &storage );
                 numActiveWorkers++;
             }
         }
@@ -412,13 +619,12 @@ public:
     void waitForWorkersToSleep()
     {
         BT_PROFILE( "waitForWorkersToSleep" );
-        m_jobContext.m_workersShouldSleep = true;
+        setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
         m_threadSupport->waitForAllTasks();
-        for ( int i = 0; i < m_numWorkerThreads; i++ )
+        for ( int i = kFirstWorkerThreadId; i < m_numThreads; i++ )
         {
-            WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory(i) );
-            btAssert( storage );
-            btAssert( storage->status == WorkerThreadStatus::kSleeping );
+            ThreadLocalStorage& storage = m_threadLocalStorage[i];
+            btAssert( storage.m_status == WorkerThreadStatus::kSleeping );
         }
     }
 
@@ -426,20 +632,19 @@
     {
         BT_PROFILE( "sleepWorkerThreadsHint" );
         // hint the task scheduler that we may not be using these threads for a little while
-        m_jobContext.m_workersShouldSleep = true;
+        setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
     }
 
     void prepareWorkerThreads()
     {
-        for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
+        for ( int i = kFirstWorkerThreadId; i < m_numThreads; ++i )
         {
-            WorkerThreadLocalStorage* storage = static_cast<WorkerThreadLocalStorage*>( m_threadSupport->getThreadLocalMemory( iWorker ) );
-            storage->m_mutex.lock();
-            storage->numJobsFinished = 0;
-            storage->m_mutex.unlock();
+            ThreadLocalStorage& storage = m_threadLocalStorage[i];
+            storage.m_mutex.lock();
+            storage.m_numJobsFinished = 0;
+            storage.m_mutex.unlock();
         }
-        m_jobContext.m_workersShouldSleep = false;
-        setWorkersActive( true );
+        setWorkerDirectives( WorkerThreadDirectives::kScanForJobs );
     }
 
     virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE
@@ -455,32 +660,32 @@
         m_numJobs = jobCount;
         btAssert( jobCount >= 2 );  // need more than one job for multithreading
         int jobSize = sizeof( JobType );
-        int jobBufSize = jobSize * jobCount;
-        // make sure we have enough memory allocated to store jobs
-        if ( jobBufSize > m_jobMem.size() )
-        {
-            m_jobMem.resize( jobBufSize );
-        }
-        // make sure job queue is big enough
-        if ( jobCount > m_jobContext.m_jobQueue.capacity() )
-        {
-            m_jobContext.m_jobQueue.reserve( jobCount );
-        }
-        m_jobContext.clearQueue();
+
+        for (int i = 0; i < m_numActiveJobQueues; ++i)
+        {
+            m_jobQueues[i].clearQueue( jobCount, jobSize );
+        }
         // prepare worker threads for incoming work
         prepareWorkerThreads();
         // submit all of the jobs
         int iJob = 0;
-        JobType* jobs = reinterpret_cast<JobType*>( &m_jobMem[ 0 ] );
+        int iThread = kFirstWorkerThreadId;  // first worker thread
         for ( int i = iBegin; i < iEnd; i += grainSize )
         {
             btAssert( iJob < jobCount );
             int iE = btMin( i + grainSize, iEnd );
-            JobType& job = jobs[ iJob ];
-            new ( (void*) &job ) ParallelForJob( i, iE, body );  // placement new
-            m_jobContext.submitJob( &job );
+            JobQueue* jq = m_perThreadJobQueues[ iThread ];
+            btAssert(jq);
+            btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
+            void* jobMem = jq->allocJobMem(jobSize);
+            JobType* job = new ( jobMem ) ParallelForJob( i, iE, body );  // placement new
+            jq->submitJob( job );
             iJob++;
+            iThread++;
+            if ( iThread >= m_numThreads )
+            {
+                iThread = kFirstWorkerThreadId;  // first worker thread
+            }
         }
         wakeWorkers( jobCount - 1 );
 
@@ -508,44 +713,38 @@
         m_numJobs = jobCount;
         btAssert( jobCount >= 2 );  // need more than one job for multithreading
         int jobSize = sizeof( JobType );
-        int jobBufSize = jobSize * jobCount;
-        // make sure we have enough memory allocated to store jobs
-        if ( jobBufSize > m_jobMem.size() )
+        for (int i = 0; i < m_numActiveJobQueues; ++i)
         {
-            m_jobMem.resize( jobBufSize );
-        }
-        // make sure job queue is big enough
-        if ( jobCount > m_jobContext.m_jobQueue.capacity() )
-        {
-            m_jobContext.m_jobQueue.reserve( jobCount );
-        }
-        // make sure thread local area is big enough
-        int threadLocalSize = m_numThreads * sizeof( ThreadLocalSum );
-        if ( threadLocalSize > m_threadLocalMem.size() )
-        {
-            m_threadLocalMem.resize( threadLocalSize );
-        }
-        // initialize summation
-        ThreadLocalSum* threadLocalSum = reinterpret_cast<ThreadLocalSum*>( &m_threadLocalMem[ 0 ] );
-        for ( int iThread = 0; iThread < m_numThreads; ++iThread )
-        {
-            threadLocalSum[ iThread ].mSum = btScalar( 0 );
+            m_jobQueues[i].clearQueue( jobCount, jobSize );
+        }
+
+        // initialize summation
+        for ( int iThread = 0; iThread < m_numThreads; ++iThread )
+        {
+            m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
         }
-        m_jobContext.clearQueue();
         // prepare worker threads for incoming work
         prepareWorkerThreads();
         // submit all of the jobs
         int iJob = 0;
-        JobType* jobs = reinterpret_cast<JobType*>( &m_jobMem[ 0 ] );
+        int iThread = kFirstWorkerThreadId;  // first worker thread
         for ( int i = iBegin; i < iEnd; i += grainSize )
         {
             btAssert( iJob < jobCount );
             int iE = btMin( i + grainSize, iEnd );
-            JobType& job = jobs[ iJob ];
-            new ( (void*) &job ) ParallelSumJob( i, iE, body, threadLocalSum );  // placement new
-            m_jobContext.submitJob( &job );
+            JobQueue* jq = m_perThreadJobQueues[ iThread ];
+            btAssert(jq);
+            btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
+            void* jobMem = jq->allocJobMem(jobSize);
+            JobType* job = new ( jobMem ) ParallelSumJob( i, iE, body, &m_threadLocalStorage[0] );  // placement new
+            jq->submitJob( job );
             iJob++;
+            iThread++;
+            if ( iThread >= m_numThreads )
+            {
+                iThread = kFirstWorkerThreadId;  // first worker thread
+            }
         }
         wakeWorkers( jobCount - 1 );
@@ -556,7 +755,7 @@
         btScalar sum = btScalar(0);
         for ( int iThread = 0; iThread < m_numThreads; ++iThread )
         {
-            sum += threadLocalSum[ iThread ].mSum;
+            sum += m_threadLocalStorage[ iThread ].m_sumResult;
         }
         m_antiNestingLock.unlock();
         return sum;
@@ -586,4 +785,4 @@ btITaskScheduler* btCreateDefaultTaskScheduler()
     return NULL;
 }
 
-#endif // #else // #if BT_THREADSAFE
\ No newline at end of file
+#endif // #else // #if BT_THREADSAFE
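[Note: for context, this is how client code would drive the refactored scheduler through Bullet's public btThreads.h API (btCreateDefaultTaskScheduler, btSetTaskScheduler, btParallelSum); SquareSum is a made-up workload, not part of the patch:

// Illustrative only. Sums i*i over [0, 10000) in parallel jobs of up to
// 256 iterations each.
#include "LinearMath/btThreads.h"
#include <cstdio>

struct SquareSum : public btIParallelSumBody
{
    // each call may run on a different worker thread
    virtual btScalar sumLoop(int iBegin, int iEnd) const BT_OVERRIDE
    {
        btScalar sum = btScalar(0);
        for (int i = iBegin; i < iEnd; ++i)
        {
            sum += btScalar(i) * btScalar(i);
        }
        return sum;
    }
};

int main()
{
    btITaskScheduler* sched = btCreateDefaultTaskScheduler();  // NULL if !BT_THREADSAFE
    if (sched)
    {
        btSetTaskScheduler(sched);
    }
    SquareSum body;
    btScalar total = btParallelSum(0, 10000, 256, body);  // grainSize 256 -> ~40 jobs
    printf("sum = %f\n", (double)total);
    return 0;
}
]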
diff --git a/src/LinearMath/TaskScheduler/btThreadSupportInterface.h b/src/LinearMath/TaskScheduler/btThreadSupportInterface.h
index d537d7095..a0ad802b1 100644
--- a/src/LinearMath/TaskScheduler/btThreadSupportInterface.h
+++ b/src/LinearMath/TaskScheduler/btThreadSupportInterface.h
@@ -37,34 +37,29 @@ public:
 
     virtual int getNumWorkerThreads() const = 0;  // number of worker threads (total number of logical processors - 1)
     virtual int getCacheFriendlyNumThreads() const = 0;  // the number of logical processors sharing a single L3 cache
+    virtual int getLogicalToPhysicalCoreRatio() const = 0;  // the number of logical processors per physical processor (usually 1 or 2)
 
     virtual void runTask( int threadIndex, void* userData ) = 0;
     virtual void waitForAllTasks() = 0;
 
     virtual btCriticalSection* createCriticalSection() = 0;
     virtual void deleteCriticalSection( btCriticalSection* criticalSection ) = 0;
 
-    virtual void* getThreadLocalMemory( int taskId ) { return NULL; }
-
-    typedef void( *ThreadFunc )( void* userPtr, void* lsMemory );
-    typedef void* ( *MemorySetupFunc )( );
+    typedef void( *ThreadFunc )( void* userPtr );
 
     struct ConstructionInfo
     {
         ConstructionInfo( const char* uniqueName,
                           ThreadFunc userThreadFunc,
-                          MemorySetupFunc lsMemoryFunc,
                           int threadStackSize = 65535 )
             :m_uniqueName( uniqueName ),
             m_userThreadFunc( userThreadFunc ),
-            m_lsMemoryFunc( lsMemoryFunc ),
             m_threadStackSize( threadStackSize )
         {
         }
 
         const char* m_uniqueName;
         ThreadFunc m_userThreadFunc;
-        MemorySetupFunc m_lsMemoryFunc;
         int m_threadStackSize;
     };
diff --git a/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp b/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp
index 5521fc555..ccd7d1e12 100644
--- a/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp
+++ b/src/LinearMath/TaskScheduler/btThreadSupportPosix.cpp
@@ -73,7 +73,6 @@ public:
         ThreadFunc m_userThreadFunc;
         void* m_userPtr;  //for taskDesc etc
-        void* m_lsMemory;  //initialized using PosixLocalStoreMemorySetupFunc
 
         pthread_t thread;
         //each tread will wait until this signal to start its work
@@ -103,17 +102,14 @@ public:
     virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; }
     // TODO: return the number of logical processors sharing the first L3 cache
     virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return m_numThreads + 1; }
+    // TODO: detect if CPU has hyperthreading enabled
+    virtual int getLogicalToPhysicalCoreRatio() const BT_OVERRIDE { return 1; }
 
     virtual void runTask( int threadIndex, void* userData ) BT_OVERRIDE;
     virtual void waitForAllTasks() BT_OVERRIDE;
 
     virtual btCriticalSection* createCriticalSection() BT_OVERRIDE;
     virtual void deleteCriticalSection( btCriticalSection* criticalSection ) BT_OVERRIDE;
-
-    virtual void* getThreadLocalMemory( int taskId ) BT_OVERRIDE
-    {
-        return m_activeThreadStatus[ taskId ].m_lsMemory;
-    }
 };
 
@@ -190,7 +186,7 @@ static void *threadFunction( void *argument )
         if ( userPtr )
         {
             btAssert( status->m_status );
-            status->m_userThreadFunc( userPtr, status->m_lsMemory );
+            status->m_userThreadFunc( userPtr );
             status->m_status = 2;
             checkPThreadFunction( sem_post( status->m_mainSemaphore ) );
             status->threadUsed++;
@@ -292,7 +288,6 @@ void btThreadSupportPosix::startThreads( const ConstructionInfo& threadConstruct
         threadStatus.m_commandId = 0;
         threadStatus.m_status = 0;
         threadStatus.m_mainSemaphore = m_mainSemaphore;
-        threadStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
         threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
         threadStatus.threadUsed = 0;
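[Note: the POSIX backend above leaves getLogicalToPhysicalCoreRatio() at 1 with a TODO. One possible way to close it on Linux, an assumption on my part and not in the patch, is to count cpu0's siblings via sysfs:

// Illustrative only -- a possible Linux implementation of the TODO above.
// Counts logical siblings of core 0; falls back to 1 if the sysfs interface
// is missing (non-Linux, old kernels, restricted containers). Approximate:
// it treats a "0-1" range as a pair, which is fine for typical 2-way SMT.
#include <stdio.h>

static int getLogicalToPhysicalCoreRatioLinux()
{
    FILE* f = fopen("/sys/devices/system/cpu/cpu0/topology/thread_siblings_list", "r");
    if (!f)
    {
        return 1;
    }
    int siblings = 1;  // the file holds a list like "0,4" or "0-1"
    int c;
    while ((c = fgetc(f)) != EOF)
    {
        if (c == ',' || c == '-')
        {
            siblings++;
        }
    }
    fclose(f);
    return siblings;
}
]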
diff --git a/src/LinearMath/TaskScheduler/btThreadSupportWin32.cpp b/src/LinearMath/TaskScheduler/btThreadSupportWin32.cpp
index de693590e..00edac650 100644
--- a/src/LinearMath/TaskScheduler/btThreadSupportWin32.cpp
+++ b/src/LinearMath/TaskScheduler/btThreadSupportWin32.cpp
@@ -179,7 +179,6 @@ public:
         ThreadFunc m_userThreadFunc;
         void* m_userPtr;  //for taskDesc etc
-        void* m_lsMemory;  //initialized using Win32LocalStoreMemorySetupFunc
 
         void* m_threadHandle;  //this one is calling 'Win32ThreadFunc'
 
@@ -208,15 +207,11 @@ public:
     virtual int getNumWorkerThreads() const BT_OVERRIDE { return m_numThreads; }
     virtual int getCacheFriendlyNumThreads() const BT_OVERRIDE { return countSetBits(m_processorInfo.processorTeamMasks[0]); }
+    virtual int getLogicalToPhysicalCoreRatio() const BT_OVERRIDE { return m_processorInfo.numLogicalProcessors / m_processorInfo.numCores; }
 
     virtual void runTask( int threadIndex, void* userData ) BT_OVERRIDE;
     virtual void waitForAllTasks() BT_OVERRIDE;
 
-    virtual void* getThreadLocalMemory( int taskId ) BT_OVERRIDE
-    {
-        return m_activeThreadStatus[ taskId ].m_lsMemory;
-    }
-
     virtual btCriticalSection* createCriticalSection() BT_OVERRIDE;
     virtual void deleteCriticalSection( btCriticalSection* criticalSection ) BT_OVERRIDE;
 };
@@ -246,7 +241,7 @@ DWORD WINAPI win32threadStartFunc( LPVOID lpParam )
         if ( userPtr )
         {
             btAssert( status->m_status );
-            status->m_userThreadFunc( userPtr, status->m_lsMemory );
+            status->m_userThreadFunc( userPtr );
             status->m_status = 2;
             SetEvent( status->m_eventCompleteHandle );
         }
@@ -392,7 +387,6 @@ void btThreadSupportWin32::startThreads( const ConstructionInfo& threadConstruct
         threadStatus.m_commandId = 0;
         threadStatus.m_status = 0;
         threadStatus.m_threadHandle = handle;
-        threadStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
         threadStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
 
         printf( "started %s thread %d with threadHandle %p\n", threadConstructionInfo.m_uniqueName, i, handle );
@@ -410,9 +404,7 @@ void btThreadSupportWin32::stopThreads()
             WaitForSingleObject( threadStatus.m_eventCompleteHandle, INFINITE );
         }
 
-        delete threadStatus.m_lsMemory;
-
-        threadStatus.m_userPtr = 0;
+        threadStatus.m_userPtr = NULL;
 
         SetEvent( threadStatus.m_eventStartHandle );
         WaitForSingleObject( threadStatus.m_eventCompleteHandle, INFINITE );
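[Note: across both backends the worker entry point loses its lsMemory parameter; per-thread state now travels inside userPtr (the scheduler passes a ThreadLocalStorage* to runTask). A minimal sketch of the new contract for custom users of btThreadSupportInterface; MyThreadState and MyThreadFunc are hypothetical names:

// Illustrative only. With the single-argument ThreadFunc, whatever
// per-thread data a client needs is bundled into the userPtr given to
// runTask(), replacing the old lsMemory side channel.
struct MyThreadState  // hypothetical client-side per-thread data
{
    int m_threadId;
    int m_workCounter;
};

static void MyThreadFunc(void* userPtr)  // matches btThreadSupportInterface::ThreadFunc
{
    MyThreadState* state = static_cast<MyThreadState*>(userPtr);
    state->m_workCounter++;  // ... do per-thread work using state ...
}
// usage: threadSupport->runTask(workerIndex, &stateForThatWorker);
]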