From 362887987c2602c97ac6a4155fa3a86ed1158c2b Mon Sep 17 00:00:00 2001 From: Erwin Coumans Date: Fri, 21 Aug 2015 11:09:53 -0700 Subject: [PATCH] add example/MultiThreading functions --- build3/premake4.lua | 3 +- .../MultiThreading/b3PosixThreadSupport.cpp | 436 +++++++++++++++++ .../MultiThreading/b3PosixThreadSupport.h | 144 ++++++ .../b3ThreadSupportInterface.cpp | 22 + .../MultiThreading/b3ThreadSupportInterface.h | 90 ++++ .../MultiThreading/b3Win32ThreadSupport.cpp | 454 ++++++++++++++++++ .../MultiThreading/b3Win32ThreadSupport.h | 139 ++++++ examples/MultiThreading/main.cpp | 177 +++++++ examples/MultiThreading/premake4.lua | 51 ++ 9 files changed, 1515 insertions(+), 1 deletion(-) create mode 100644 examples/MultiThreading/b3PosixThreadSupport.cpp create mode 100644 examples/MultiThreading/b3PosixThreadSupport.h create mode 100644 examples/MultiThreading/b3ThreadSupportInterface.cpp create mode 100644 examples/MultiThreading/b3ThreadSupportInterface.h create mode 100644 examples/MultiThreading/b3Win32ThreadSupport.cpp create mode 100644 examples/MultiThreading/b3Win32ThreadSupport.h create mode 100644 examples/MultiThreading/main.cpp create mode 100644 examples/MultiThreading/premake4.lua diff --git a/build3/premake4.lua b/build3/premake4.lua index cb87cce0b..b19cf19c7 100644 --- a/build3/premake4.lua +++ b/build3/premake4.lua @@ -159,7 +159,8 @@ if not _OPTIONS["ios"] then include "../examples/ExampleBrowser" include "../examples/OpenGLWindow" - include "../examples/SharedMemory" + include "../examples/SharedMemory" + include "../examples/MultiThreading" include "../examples/ThirdPartyLibs/Gwen" include "../Extras" diff --git a/examples/MultiThreading/b3PosixThreadSupport.cpp b/examples/MultiThreading/b3PosixThreadSupport.cpp new file mode 100644 index 000000000..705e297ce --- /dev/null +++ b/examples/MultiThreading/b3PosixThreadSupport.cpp @@ -0,0 +1,436 @@ +/* +Bullet Continuous Collision Detection and Physics Library +Copyright (c) 2003-2007 Erwin Coumans http://bulletphysics.com + +This software is provided 'as-is', without any express or implied warranty. +In no event will the authors be held liable for any damages arising from the use of this software. +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it freely, +subject to the following restrictions: + +1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. +2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. +3. This notice may not be removed or altered from any source distribution. +*/ + +#include +#include "b3PosixThreadSupport.h" +#include +#include + + +#define checkPThreadFunction(returnValue) \ + if(0 != returnValue) { \ + printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__, returnValue, errno); \ + } + +// The number of threads should be equal to the number of available cores +// Todo: each worker should be linked to a single core, using SetThreadIdealProcessor. + + +b3PosixThreadSupport::b3PosixThreadSupport(ThreadConstructionInfo& threadConstructionInfo) +{ + startThreads(threadConstructionInfo); +} + +// cleanup/shutdown Libspe2 +b3PosixThreadSupport::~b3PosixThreadSupport() +{ + stopThreads(); +} + +#if (defined (__APPLE__)) +#define NAMED_SEMAPHORES +#endif + +// this semaphore will signal, if and how many threads are finished with their work +static sem_t* mainSemaphore=0; + +static sem_t* createSem(const char* baseName) +{ + static int semCount = 0; +#ifdef NAMED_SEMAPHORES + /// Named semaphore begin + char name[32]; + snprintf(name, 32, "/%s-%d-%4.4d", baseName, getpid(), semCount++); + sem_t* tempSem = sem_open(name, O_CREAT, 0600, 0); + + if (tempSem != reinterpret_cast(SEM_FAILED)) + { +// printf("Created \"%s\" Semaphore %p\n", name, tempSem); + } + else + { + //printf("Error creating Semaphore %d\n", errno); + exit(-1); + } + /// Named semaphore end +#else + sem_t* tempSem = new sem_t; + checkPThreadFunction(sem_init(tempSem, 0, 0)); +#endif + return tempSem; +} + +static void destroySem(sem_t* semaphore) +{ +#ifdef NAMED_SEMAPHORES + checkPThreadFunction(sem_close(semaphore)); +#else + checkPThreadFunction(sem_destroy(semaphore)); + delete semaphore; +#endif +} + +static void *threadFunction(void *argument) +{ + + b3PosixThreadSupport::b3ThreadStatus* status = (b3PosixThreadSupport::b3ThreadStatus*)argument; + + + while (1) + { + checkPThreadFunction(sem_wait(status->startSemaphore)); + + void* userPtr = status->m_userPtr; + + if (userPtr) + { + b3Assert(status->m_status); + status->m_userThreadFunc(userPtr,status->m_lsMemory); + status->m_status = 2; + checkPThreadFunction(sem_post(mainSemaphore)); + status->threadUsed++; + } else { + //exit Thread + status->m_status = 3; + checkPThreadFunction(sem_post(mainSemaphore)); + printf("Thread with taskId %i exiting\n",status->m_taskId); + break; + } + + } + + printf("Thread TERMINATED\n"); + return 0; + +} + +///send messages to SPUs +void b3PosixThreadSupport::sendRequest(int uiCommand, void* uiArgument0, int taskId) +{ + /// gMidphaseSPU.sendRequest(CMD_GATHER_AND_PROCESS_PAIRLIST, (int) &taskDesc); + + ///we should spawn an SPU task here, and in 'waitForResponse' it should wait for response of the (one of) the first tasks that finished + + + + switch (uiCommand) + { + case B3_THREAD_SCHEDULE_TASK: + { + b3ThreadStatus& spuStatus = m_activeThreadStatus[taskId]; + b3Assert(taskId >= 0); + b3Assert(taskId < m_activeThreadStatus.size()); + + spuStatus.m_commandId = uiCommand; + spuStatus.m_status = 1; + spuStatus.m_userPtr = (void*)uiArgument0; + + // fire event to start new task + checkPThreadFunction(sem_post(spuStatus.startSemaphore)); + break; + } + default: + { + ///not implemented + b3Assert(0); + } + + }; + + +} + +///non-blocking test if a task is completed. First implement all versions, and then enable this API +bool b3PosixThreadSupport::isTaskCompleted(int *puiArgument0, int *puiArgument1, int timeOutInMilliseconds) +{ + + b3Assert(m_activeThreadStatus.size()); + + // wait for any of the threads to finish + int result = sem_trywait(mainSemaphore); + if (result==0) + { + // get at least one thread which has finished + size_t last = -1; + + for(size_t t=0; t < size_t(m_activeThreadStatus.size()); ++t) { + if(2 == m_activeThreadStatus[t].m_status) { + last = t; + break; + } + } + + b3ThreadStatus& spuStatus = m_activeThreadStatus[last]; + + b3Assert(spuStatus.m_status > 1); + spuStatus.m_status = 0; + + // need to find an active spu + b3Assert(last >= 0); + + *puiArgument0 = spuStatus.m_taskId; + *puiArgument1 = spuStatus.m_status; + return true; + } + return false; +} + + +///check for messages from SPUs +void b3PosixThreadSupport::waitForResponse( int *puiArgument0, int *puiArgument1) +{ + ///We should wait for (one of) the first tasks to finish (or other SPU messages), and report its response + + ///A possible response can be 'yes, SPU handled it', or 'no, please do a PPU fallback' + + + b3Assert(m_activeThreadStatus.size()); + + // wait for any of the threads to finish + checkPThreadFunction(sem_wait(mainSemaphore)); + + // get at least one thread which has finished + size_t last = -1; + + for(size_t t=0; t < size_t(m_activeThreadStatus.size()); ++t) { + if(2 == m_activeThreadStatus[t].m_status) { + last = t; + break; + } + } + + b3ThreadStatus& spuStatus = m_activeThreadStatus[last]; + + b3Assert(spuStatus.m_status > 1); + spuStatus.m_status = 0; + + // need to find an active spu + b3Assert(last >= 0); + + *puiArgument0 = spuStatus.m_taskId; + *puiArgument1 = spuStatus.m_status; +} + + + +void b3PosixThreadSupport::startThreads(ThreadConstructionInfo& threadConstructionInfo) +{ + printf("%s creating %i threads.\n", __FUNCTION__, threadConstructionInfo.m_numThreads); + m_activeThreadStatus.resize(threadConstructionInfo.m_numThreads); + + mainSemaphore = createSem("main"); + //checkPThreadFunction(sem_wait(mainSemaphore)); + + for (int i=0;i < threadConstructionInfo.m_numThreads;i++) + { + printf("starting thread %d\n",i); + + b3ThreadStatus& spuStatus = m_activeThreadStatus[i]; + + spuStatus.startSemaphore = createSem("threadLocal"); + + checkPThreadFunction(pthread_create(&spuStatus.thread, NULL, &threadFunction, (void*)&spuStatus)); + + spuStatus.m_userPtr=0; + + spuStatus.m_taskId = i; + spuStatus.m_commandId = 0; + spuStatus.m_status = 0; + spuStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc(); + spuStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc; + spuStatus.threadUsed = 0; + + printf("started thread %d \n",i); + + } + +} + + + +///tell the task scheduler we are done with the SPU tasks +void b3PosixThreadSupport::stopThreads() +{ + for(size_t t=0; t < size_t(m_activeThreadStatus.size()); ++t) + { + b3ThreadStatus& spuStatus = m_activeThreadStatus[t]; + printf("%s: Thread %i used: %ld\n", __FUNCTION__, int(t), spuStatus.threadUsed); + + spuStatus.m_userPtr = 0; + checkPThreadFunction(sem_post(spuStatus.startSemaphore)); + checkPThreadFunction(sem_wait(mainSemaphore)); + + printf("destroy semaphore\n"); + destroySem(spuStatus.startSemaphore); + printf("semaphore destroyed\n"); + checkPThreadFunction(pthread_join(spuStatus.thread,0)); + + } + printf("destroy main semaphore\n"); + destroySem(mainSemaphore); + printf("main semaphore destroyed\n"); + m_activeThreadStatus.clear(); +} + +class b3PosixCriticalSection : public b3CriticalSection +{ + pthread_mutex_t m_mutex; + +public: + b3PosixCriticalSection() + { + pthread_mutex_init(&m_mutex, NULL); + } + virtual ~b3PosixCriticalSection() + { + pthread_mutex_destroy(&m_mutex); + } + + B3_ATTRIBUTE_ALIGNED16(unsigned int mCommonBuff[32]); + + virtual unsigned int getSharedParam(int i) + { + return mCommonBuff[i]; + } + virtual void setSharedParam(int i,unsigned int p) + { + mCommonBuff[i] = p; + } + + virtual void lock() + { + pthread_mutex_lock(&m_mutex); + } + virtual void unlock() + { + pthread_mutex_unlock(&m_mutex); + } +}; + + +#if defined(_POSIX_BARRIERS) && (_POSIX_BARRIERS - 20012L) >= 0 +/* OK to use barriers on this platform */ +class b3PosixBarrier : public b3Barrier +{ + pthread_barrier_t m_barr; + int m_numThreads; +public: + b3PosixBarrier() + :m_numThreads(0) { } + virtual ~b3PosixBarrier() { + pthread_barrier_destroy(&m_barr); + } + + virtual void sync() + { + int rc = pthread_barrier_wait(&m_barr); + if(rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD) + { + printf("Could not wait on barrier\n"); + exit(-1); + } + } + virtual void setMaxCount(int numThreads) + { + int result = pthread_barrier_init(&m_barr, NULL, numThreads); + m_numThreads = numThreads; + b3Assert(result==0); + } + virtual int getMaxCount() + { + return m_numThreads; + } +}; +#else +/* Not OK to use barriers on this platform - insert alternate code here */ +class b3PosixBarrier : public b3Barrier +{ + pthread_mutex_t m_mutex; + pthread_cond_t m_cond; + + int m_numThreads; + int m_called; + +public: + b3PosixBarrier() + :m_numThreads(0) + { + } + virtual ~b3PosixBarrier() + { + if (m_numThreads>0) + { + pthread_mutex_destroy(&m_mutex); + pthread_cond_destroy(&m_cond); + } + } + + virtual void sync() + { + pthread_mutex_lock(&m_mutex); + m_called++; + if (m_called == m_numThreads) { + m_called = 0; + pthread_cond_broadcast(&m_cond); + } else { + pthread_cond_wait(&m_cond,&m_mutex); + } + pthread_mutex_unlock(&m_mutex); + + } + virtual void setMaxCount(int numThreads) + { + if (m_numThreads>0) + { + pthread_mutex_destroy(&m_mutex); + pthread_cond_destroy(&m_cond); + } + m_called = 0; + pthread_mutex_init(&m_mutex,NULL); + pthread_cond_init(&m_cond,NULL); + m_numThreads = numThreads; + } + virtual int getMaxCount() + { + return m_numThreads; + } +}; + +#endif//_POSIX_BARRIERS + + + +b3Barrier* b3PosixThreadSupport::createBarrier() +{ + b3PosixBarrier* barrier = new b3PosixBarrier(); + barrier->setMaxCount(getNumTasks()); + return barrier; +} + +b3CriticalSection* b3PosixThreadSupport::createCriticalSection() +{ + return new b3PosixCriticalSection(); +} + +void b3PosixThreadSupport::deleteBarrier(b3Barrier* barrier) +{ + delete barrier; +} + +void b3PosixThreadSupport::deleteCriticalSection(b3CriticalSection* cs) +{ + delete cs; +} + diff --git a/examples/MultiThreading/b3PosixThreadSupport.h b/examples/MultiThreading/b3PosixThreadSupport.h new file mode 100644 index 000000000..17dde8d60 --- /dev/null +++ b/examples/MultiThreading/b3PosixThreadSupport.h @@ -0,0 +1,144 @@ +/* +Bullet Continuous Collision Detection and Physics Library +Copyright (c) 2003-2007 Erwin Coumans http://bulletphysics.com + +This software is provided 'as-is', without any express or implied warranty. +In no event will the authors be held liable for any damages arising from the use of this software. +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it freely, +subject to the following restrictions: + +1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. +2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. +3. This notice may not be removed or altered from any source distribution. +*/ + +#ifndef B3_POSIX_THREAD_SUPPORT_H +#define B3_POSIX_THREAD_SUPPORT_H + + +#include "Bullet3Common/b3Scalar.h" + + +#ifndef _XOPEN_SOURCE +#define _XOPEN_SOURCE 600 //for definition of pthread_barrier_t, see http://pages.cs.wisc.edu/~travitch/pthreads_primer.html +#endif //_XOPEN_SOURCE +#include +#include + + + +#include "Bullet3Common/b3AlignedObjectArray.h" + +#include "b3ThreadSupportInterface.h" + + +typedef void (*b3PosixThreadFunc)(void* userPtr,void* lsMemory); +typedef void* (*b3PosixlsMemorySetupFunc)(); + +// b3PosixThreadSupport helps to initialize/shutdown libspe2, start/stop SPU tasks and communication +class b3PosixThreadSupport : public b3ThreadSupportInterface +{ +public: + typedef enum sStatus { + STATUS_BUSY, + STATUS_READY, + STATUS_FINISHED + } Status; + + // placeholder, until libspe2 support is there + struct b3ThreadStatus + { + int m_taskId; + int m_commandId; + int m_status; + + b3PosixThreadFunc m_userThreadFunc; + void* m_userPtr; //for taskDesc etc + void* m_lsMemory; //initialized using PosixLocalStoreMemorySetupFunc + + pthread_t thread; + sem_t* startSemaphore; + + unsigned long threadUsed; + }; +private: + + b3AlignedObjectArray m_activeThreadStatus; +public: + ///Setup and initialize SPU/CELL/Libspe2 + + + + struct ThreadConstructionInfo + { + ThreadConstructionInfo(const char* uniqueName, + b3PosixThreadFunc userThreadFunc, + b3PosixlsMemorySetupFunc lsMemoryFunc, + int numThreads=1, + int threadStackSize=65535 + ) + :m_uniqueName(uniqueName), + m_userThreadFunc(userThreadFunc), + m_lsMemoryFunc(lsMemoryFunc), + m_numThreads(numThreads), + m_threadStackSize(threadStackSize) + { + + } + + const char* m_uniqueName; + b3PosixThreadFunc m_userThreadFunc; + b3PosixlsMemorySetupFunc m_lsMemoryFunc; + int m_numThreads; + int m_threadStackSize; + + }; + + b3PosixThreadSupport(ThreadConstructionInfo& threadConstructionInfo); + +///cleanup/shutdown Libspe2 + virtual ~b3PosixThreadSupport(); + + void startThreads(ThreadConstructionInfo& threadInfo); + + + virtual void sendRequest(int uiCommand, void* uiArgument0, int uiArgument1); + + virtual void waitForResponse(int *puiArgument0, int *puiArgument1); + + +///tell the task scheduler we are done with the SPU tasks + virtual void stopThreads(); + + virtual void setNumTasks(int numTasks) {} + + virtual int getNumTasks() const + { + return m_activeThreadStatus.size(); + } + + ///non-blocking test if a task is completed. First implement all versions, and then enable this API + virtual bool isTaskCompleted(int *puiArgument0, int *puiArgument1, int timeOutInMilliseconds); + + + virtual b3Barrier* createBarrier(); + + virtual b3CriticalSection* createCriticalSection(); + + virtual void deleteBarrier(b3Barrier* barrier); + + virtual void deleteCriticalSection(b3CriticalSection* criticalSection); + + + virtual void* getThreadLocalMemory(int taskId) + { + return m_activeThreadStatus[taskId].m_lsMemory; + } + +}; + + +#endif // B3_POSIX_THREAD_SUPPORT_H + + diff --git a/examples/MultiThreading/b3ThreadSupportInterface.cpp b/examples/MultiThreading/b3ThreadSupportInterface.cpp new file mode 100644 index 000000000..430242113 --- /dev/null +++ b/examples/MultiThreading/b3ThreadSupportInterface.cpp @@ -0,0 +1,22 @@ +/* +Bullet Continuous Collision Detection and Physics Library +Copyright (c) 2003-2007 Erwin Coumans http://bulletphysics.com + +This software is provided 'as-is', without any express or implied warranty. +In no event will the authors be held liable for any damages arising from the use of this software. +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it freely, +subject to the following restrictions: + +1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. +2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. +3. This notice may not be removed or altered from any source distribution. +*/ + +#include "b3ThreadSupportInterface.h" + +b3ThreadSupportInterface::~b3ThreadSupportInterface() +{ + +} + diff --git a/examples/MultiThreading/b3ThreadSupportInterface.h b/examples/MultiThreading/b3ThreadSupportInterface.h new file mode 100644 index 000000000..d53209abd --- /dev/null +++ b/examples/MultiThreading/b3ThreadSupportInterface.h @@ -0,0 +1,90 @@ +/* +Bullet Continuous Collision Detection and Physics Library +Copyright (c) 2003-2007 Erwin Coumans http://bulletphysics.com + +This software is provided 'as-is', without any express or implied warranty. +In no event will the authors be held liable for any damages arising from the use of this software. +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it freely, +subject to the following restrictions: + +1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. +2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. +3. This notice may not be removed or altered from any source distribution. +*/ + +#ifndef B3_THREAD_SUPPORT_INTERFACE_H +#define B3_THREAD_SUPPORT_INTERFACE_H + +enum +{ + B3_THREAD_SCHEDULE_TASK=1, +}; + +#include "Bullet3Common/b3Scalar.h" //for B3_ATTRIBUTE_ALIGNED16 +//#include "PlatformDefinitions.h" +//#include "PpuAddressSpace.h" + +class b3Barrier { +public: + b3Barrier() {} + virtual ~b3Barrier() {} + + virtual void sync() = 0; + virtual void setMaxCount(int n) = 0; + virtual int getMaxCount() = 0; +}; + +class b3CriticalSection { +public: + b3CriticalSection() {} + virtual ~b3CriticalSection() {} + + B3_ATTRIBUTE_ALIGNED16(unsigned int mCommonBuff[32]); + + virtual unsigned int getSharedParam(int i) = 0; + virtual void setSharedParam(int i,unsigned int p) = 0; + + virtual void lock() = 0; + virtual void unlock() = 0; +}; + + +class b3ThreadSupportInterface +{ +public: + + virtual ~b3ThreadSupportInterface(); + + + virtual void sendRequest(int uiCommand, void* uiArgument0, int uiArgument1) =0; + + + virtual void waitForResponse(int *puiArgument0, int *puiArgument1) =0; + + + ///non-blocking test if a task is completed. First implement all versions, and then enable this API + virtual bool isTaskCompleted(int *puiArgument0, int *puiArgument1, int timeOutInMilliseconds)=0; + + + virtual void stopThreads()=0; + + ///tell the task scheduler to use no more than numTasks tasks + virtual void setNumTasks(int numTasks)=0; + + virtual int getNumTasks() const = 0; + + virtual b3Barrier* createBarrier() = 0; + + virtual b3CriticalSection* createCriticalSection() = 0; + + virtual void deleteBarrier(b3Barrier* barrier)=0; + + virtual void deleteCriticalSection(b3CriticalSection* criticalSection)=0; + + virtual void* getThreadLocalMemory(int taskId) { return 0; } + +}; + +#endif //B3_THREAD_SUPPORT_INTERFACE_H + diff --git a/examples/MultiThreading/b3Win32ThreadSupport.cpp b/examples/MultiThreading/b3Win32ThreadSupport.cpp new file mode 100644 index 000000000..7eae9b1a3 --- /dev/null +++ b/examples/MultiThreading/b3Win32ThreadSupport.cpp @@ -0,0 +1,454 @@ +/* +Bullet Continuous Collision Detection and Physics Library +Copyright (c) 2003-2007 Erwin Coumans http://bulletphysics.com + +This software is provided 'as-is', without any express or implied warranty. +In no event will the authors be held liable for any damages arising from the use of this software. +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it freely, +subject to the following restrictions: + +1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. +2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. +3. This notice may not be removed or altered from any source distribution. +*/ + +#include "b3Win32ThreadSupport.h" + + +#include + + + + +///The number of threads should be equal to the number of available cores +///@todo: each worker should be linked to a single core, using SetThreadIdealProcessor. + +///b3Win32ThreadSupport helps to initialize/shutdown libspe2, start/stop SPU tasks and communication +///Setup and initialize SPU/CELL/Libspe2 +b3Win32ThreadSupport::b3Win32ThreadSupport(const Win32ThreadConstructionInfo & threadConstructionInfo) +{ + m_maxNumTasks = threadConstructionInfo.m_numThreads; + startThreads(threadConstructionInfo); +} + +///cleanup/shutdown Libspe2 +b3Win32ThreadSupport::~b3Win32ThreadSupport() +{ + stopThreads(); +} + + + + +#include + +DWORD WINAPI Thread_no_1( LPVOID lpParam ) +{ + + b3Win32ThreadSupport::b3ThreadStatus* status = (b3Win32ThreadSupport::b3ThreadStatus*)lpParam; + + + while (1) + { + WaitForSingleObject(status->m_eventStartHandle,INFINITE); + + void* userPtr = status->m_userPtr; + + if (userPtr) + { + b3Assert(status->m_status); + status->m_userThreadFunc(userPtr,status->m_lsMemory); + status->m_status = 2; + SetEvent(status->m_eventCompletetHandle); + } else + { + //exit Thread + status->m_status = 3; + printf("Thread with taskId %i with handle %p exiting\n",status->m_taskId, status->m_threadHandle); + SetEvent(status->m_eventCompletetHandle); + break; + } + + } + + printf("Thread TERMINATED\n"); + return 0; + +} + +///send messages to SPUs +void b3Win32ThreadSupport::sendRequest(int uiCommand, void* uiArgument0, int taskId) +{ + /// gMidphaseSPU.sendRequest(CMD_GATHER_AND_PROCESS_PAIRLIST, (void*) &taskDesc); + + ///we should spawn an SPU task here, and in 'waitForResponse' it should wait for response of the (one of) the first tasks that finished + + + + switch (uiCommand) + { + case B3_THREAD_SCHEDULE_TASK: + { + + +//#define SINGLE_THREADED 1 +#ifdef SINGLE_THREADED + + b3ThreadStatus& threadStatus = m_activeThreadStatus[0]; + threadStatus.m_userPtr=(void*)uiArgument0; + threadStatus.m_userThreadFunc(threadStatus.m_userPtr,threadStatus.m_lsMemory); + HANDLE handle =0; +#else + + + b3ThreadStatus& threadStatus = m_activeThreadStatus[taskId]; + b3Assert(taskId>=0); + b3Assert(int(taskId) 1); + threadStatus.m_status = 0; + + ///need to find an active spu + b3Assert(last>=0); + +#else + last=0; + b3ThreadStatus& threadStatus = m_activeThreadStatus[last]; +#endif //SINGLE_THREADED + + + + *puiArgument0 = threadStatus.m_taskId; + *puiArgument1 = threadStatus.m_status; + + +} + + +///check for messages from SPUs +bool b3Win32ThreadSupport::isTaskCompleted(int *puiArgument0, int *puiArgument1, int timeOutInMilliseconds) +{ + ///We should wait for (one of) the first tasks to finish (or other SPU messages), and report its response + + ///A possible response can be 'yes, SPU handled it', or 'no, please do a PPU fallback' + + + b3Assert(m_activeThreadStatus.size()); + + int last = -1; +#ifndef SINGLE_THREADED + DWORD res = WaitForMultipleObjects(m_completeHandles.size(), &m_completeHandles[0], FALSE, timeOutInMilliseconds); + + if ((res != STATUS_TIMEOUT) && (res != WAIT_FAILED)) + { + + b3Assert(res != WAIT_FAILED); + last = res - WAIT_OBJECT_0; + + b3ThreadStatus& threadStatus = m_activeThreadStatus[last]; + b3Assert(threadStatus.m_threadHandle); + b3Assert(threadStatus.m_eventCompletetHandle); + + //WaitForSingleObject(threadStatus.m_eventCompletetHandle, INFINITE); + b3Assert(threadStatus.m_status > 1); + threadStatus.m_status = 0; + + ///need to find an active spu + b3Assert(last>=0); + + #else + last=0; + b3ThreadStatus& threadStatus = m_activeThreadStatus[last]; + #endif //SINGLE_THREADED + + + + *puiArgument0 = threadStatus.m_taskId; + *puiArgument1 = threadStatus.m_status; + + return true; + } + + return false; +} + + +void b3Win32ThreadSupport::startThreads(const Win32ThreadConstructionInfo& threadConstructionInfo) +{ + + m_activeThreadStatus.resize(threadConstructionInfo.m_numThreads); + m_completeHandles.resize(threadConstructionInfo.m_numThreads); + + m_maxNumTasks = threadConstructionInfo.m_numThreads; + + for (int i=0;i0) + { + WaitForSingleObject(threadStatus.m_eventCompletetHandle, INFINITE); + } + + + threadStatus.m_userPtr = 0; + SetEvent(threadStatus.m_eventStartHandle); + WaitForSingleObject(threadStatus.m_eventCompletetHandle, INFINITE); + + CloseHandle(threadStatus.m_eventCompletetHandle); + CloseHandle(threadStatus.m_eventStartHandle); + CloseHandle(threadStatus.m_threadHandle); + + } + + m_activeThreadStatus.clear(); + m_completeHandles.clear(); + +} + + + +class b3Win32Barrier : public b3Barrier +{ +private: + CRITICAL_SECTION mExternalCriticalSection; + CRITICAL_SECTION mLocalCriticalSection; + HANDLE mRunEvent,mNotifyEvent; + int mCounter,mEnableCounter; + int mMaxCount; + +public: + b3Win32Barrier() + { + mCounter = 0; + mMaxCount = 1; + mEnableCounter = 0; + InitializeCriticalSection(&mExternalCriticalSection); + InitializeCriticalSection(&mLocalCriticalSection); + mRunEvent = CreateEvent(NULL,TRUE,FALSE,NULL); + mNotifyEvent = CreateEvent(NULL,TRUE,FALSE,NULL); + } + + virtual ~b3Win32Barrier() + { + DeleteCriticalSection(&mExternalCriticalSection); + DeleteCriticalSection(&mLocalCriticalSection); + CloseHandle(mRunEvent); + CloseHandle(mNotifyEvent); + } + + void sync() + { + int eventId; + + EnterCriticalSection(&mExternalCriticalSection); + + //PFX_PRINTF("enter taskId %d count %d stage %d phase %d mEnableCounter %d\n",taskId,mCounter,debug&0xff,debug>>16,mEnableCounter); + + if(mEnableCounter > 0) { + ResetEvent(mNotifyEvent); + LeaveCriticalSection(&mExternalCriticalSection); + WaitForSingleObject(mNotifyEvent,INFINITE); + EnterCriticalSection(&mExternalCriticalSection); + } + + eventId = mCounter; + mCounter++; + + if(eventId == mMaxCount-1) { + SetEvent(mRunEvent); + + mEnableCounter = mCounter-1; + mCounter = 0; + } + else { + ResetEvent(mRunEvent); + LeaveCriticalSection(&mExternalCriticalSection); + WaitForSingleObject(mRunEvent,INFINITE); + EnterCriticalSection(&mExternalCriticalSection); + mEnableCounter--; + } + + if(mEnableCounter == 0) { + SetEvent(mNotifyEvent); + } + + //PFX_PRINTF("leave taskId %d count %d stage %d phase %d mEnableCounter %d\n",taskId,mCounter,debug&0xff,debug>>16,mEnableCounter); + + LeaveCriticalSection(&mExternalCriticalSection); + } + + virtual void setMaxCount(int n) {mMaxCount = n;} + virtual int getMaxCount() {return mMaxCount;} +}; + +class b3Win32CriticalSection : public b3CriticalSection +{ +private: + CRITICAL_SECTION mCriticalSection; + +public: + b3Win32CriticalSection() + { + InitializeCriticalSection(&mCriticalSection); + } + + ~b3Win32CriticalSection() + { + DeleteCriticalSection(&mCriticalSection); + } + + unsigned int getSharedParam(int i) + { + b3Assert(i>=0&&i<31); + return mCommonBuff[i+1]; + } + + void setSharedParam(int i,unsigned int p) + { + b3Assert(i>=0&&i<31); + mCommonBuff[i+1] = p; + } + + void lock() + { + EnterCriticalSection(&mCriticalSection); + mCommonBuff[0] = 1; + } + + void unlock() + { + mCommonBuff[0] = 0; + LeaveCriticalSection(&mCriticalSection); + } +}; + + +b3Barrier* b3Win32ThreadSupport::createBarrier() +{ + unsigned char* mem = (unsigned char*)b3AlignedAlloc(sizeof(b3Win32Barrier),16); + b3Win32Barrier* barrier = new(mem) b3Win32Barrier(); + barrier->setMaxCount(getNumTasks()); + return barrier; +} + +b3CriticalSection* b3Win32ThreadSupport::createCriticalSection() +{ + unsigned char* mem = (unsigned char*) b3AlignedAlloc(sizeof(b3Win32CriticalSection),16); + b3Win32CriticalSection* cs = new(mem) b3Win32CriticalSection(); + return cs; +} + +void b3Win32ThreadSupport::deleteBarrier(b3Barrier* barrier) +{ + barrier->~b3Barrier(); + b3AlignedFree(barrier); +} + +void b3Win32ThreadSupport::deleteCriticalSection(b3CriticalSection* criticalSection) +{ + criticalSection->~b3CriticalSection(); + b3AlignedFree(criticalSection); +} + + + + + diff --git a/examples/MultiThreading/b3Win32ThreadSupport.h b/examples/MultiThreading/b3Win32ThreadSupport.h new file mode 100644 index 000000000..41cd96e3b --- /dev/null +++ b/examples/MultiThreading/b3Win32ThreadSupport.h @@ -0,0 +1,139 @@ +/* +Bullet Continuous Collision Detection and Physics Library +Copyright (c) 2003-2007 Erwin Coumans http://bulletphysics.com + +This software is provided 'as-is', without any express or implied warranty. +In no event will the authors be held liable for any damages arising from the use of this software. +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it freely, +subject to the following restrictions: + +1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. +2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. +3. This notice may not be removed or altered from any source distribution. +*/ + +#include "Bullet3Common/b3Scalar.h" + + +#ifndef BT_WIN32_THREAD_SUPPORT_H +#define BT_WIN32_THREAD_SUPPORT_H + +#include "Bullet3Common/b3AlignedObjectArray.h" + +#include "b3ThreadSupportInterface.h" + + +typedef void (*b3Win32ThreadFunc)(void* userPtr,void* lsMemory); +typedef void* (*b3Win32lsMemorySetupFunc)(); + + +///b3Win32ThreadSupport helps to initialize/shutdown libspe2, start/stop SPU tasks and communication +class b3Win32ThreadSupport : public b3ThreadSupportInterface +{ +public: + ///placeholder, until libspe2 support is there + struct b3ThreadStatus + { + int m_taskId; + int m_commandId; + int m_status; + + b3Win32ThreadFunc m_userThreadFunc; + void* m_userPtr; //for taskDesc etc + void* m_lsMemory; //initialized using Win32LocalStoreMemorySetupFunc + + void* m_threadHandle; //this one is calling 'Win32ThreadFunc' + + void* m_eventStartHandle; + char m_eventStartHandleName[32]; + + void* m_eventCompletetHandle; + char m_eventCompletetHandleName[32]; + + + }; +private: + + b3AlignedObjectArray m_activeThreadStatus; + b3AlignedObjectArray m_completeHandles; + + int m_maxNumTasks; +public: + ///Setup and initialize SPU/CELL/Libspe2 + + struct Win32ThreadConstructionInfo + { + Win32ThreadConstructionInfo(const char* uniqueName, + b3Win32ThreadFunc userThreadFunc, + b3Win32lsMemorySetupFunc lsMemoryFunc, + int numThreads=1, + int threadStackSize=65535 + ) + :m_uniqueName(uniqueName), + m_userThreadFunc(userThreadFunc), + m_lsMemoryFunc(lsMemoryFunc), + m_numThreads(numThreads), + m_threadStackSize(threadStackSize) + { + + } + + const char* m_uniqueName; + b3Win32ThreadFunc m_userThreadFunc; + b3Win32lsMemorySetupFunc m_lsMemoryFunc; + int m_numThreads; + int m_threadStackSize; + + }; + + + + b3Win32ThreadSupport(const Win32ThreadConstructionInfo& threadConstructionInfo); + +///cleanup/shutdown Libspe2 + virtual ~b3Win32ThreadSupport(); + + void startThreads(const Win32ThreadConstructionInfo& threadInfo); + + +///send messages to SPUs + virtual void sendRequest(int uiCommand, void* uiArgument0, int uiArgument1); + +///check for messages from SPUs + virtual void waitForResponse(int *puiArgument0, int *puiArgument1); + + virtual bool isTaskCompleted(int *puiArgument0, int *puiArgument1, int timeOutInMilliseconds); + +///start the spus (can be called at the beginning of each frame, to make sure that the right SPU program is loaded) + virtual void startThreads(); + +///tell the task scheduler we are done with the SPU tasks + virtual void stopThreads(); + + virtual void setNumTasks(int numTasks) + { + m_maxNumTasks = numTasks; + } + + virtual int getNumTasks() const + { + return m_maxNumTasks; + } + + virtual void* getThreadLocalMemory(int taskId) + { + return m_activeThreadStatus[taskId].m_lsMemory; + } + virtual b3Barrier* createBarrier(); + + virtual b3CriticalSection* createCriticalSection(); + + virtual void deleteBarrier(b3Barrier* barrier); + + virtual void deleteCriticalSection(b3CriticalSection* criticalSection); +}; + +#endif //BT_WIN32_THREAD_SUPPORT_H + + diff --git a/examples/MultiThreading/main.cpp b/examples/MultiThreading/main.cpp new file mode 100644 index 000000000..7d39e5e54 --- /dev/null +++ b/examples/MultiThreading/main.cpp @@ -0,0 +1,177 @@ +/* +Bullet Continuous Collision Detection and Physics Library +Copyright (c) 2010 Erwin Coumans http://bulletphysics.org + +This software is provided 'as-is', without any express or implied warranty. +In no event will the authors be held liable for any damages arising from the use of this software. +Permission is granted to anyone to use this software for any purpose, +including commercial applications, and to alter it and redistribute it freely, +subject to the following restrictions: + +1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. +2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. +3. This notice may not be removed or altered from any source distribution. +*/ + +/// ThreadingDemo shows how to use the cross platform thread support interface. +/// You can start threads and perform a blocking wait for completion +/// Under Windows it uses Win32 Threads. On Mac and Linux it uses pthreads. On PlayStation 3 Cell SPU it uses SPURS. + +/// June 2010 +/// New: critical section/barriers and non-blocking polling for completion + +void SampleThreadFunc(void* userPtr,void* lsMemory); +void* SamplelsMemoryFunc(); + +#include +//#include "BulletMultiThreaded/PlatformDefinitions.h" + +#ifndef _WIN32 +#include "b3PosixThreadSupport.h" + +b3ThreadSupportInterface* createThreadSupport(int numThreads) +{ + b3PosixThreadSupport::ThreadConstructionInfo constructionInfo("testThreads", + SampleThreadFunc, + SamplelsMemoryFunc, + numThreads); + b3ThreadSupportInterface* threadSupport = new b3PosixThreadSupport(constructionInfo); + + return threadSupport; + +} + + +#elif defined( _WIN32) +#include "b3Win32ThreadSupport.h" + +b3ThreadSupportInterface* createThreadSupport(int numThreads) +{ + b3Win32ThreadSupport::Win32ThreadConstructionInfo threadConstructionInfo("testThreads",SampleThreadFunc,SamplelsMemoryFunc,numThreads); + b3Win32ThreadSupport* threadSupport = new b3Win32ThreadSupport(threadConstructionInfo); + return threadSupport; + +} +#endif + + + +struct SampleArgs +{ + SampleArgs() + :m_fakeWork(1) + { + } + b3CriticalSection* m_cs; + float m_fakeWork; +}; + +struct SampleThreadLocalStorage +{ + int threadId; +}; + + +void SampleThreadFunc(void* userPtr,void* lsMemory) +{ + printf("thread started\n"); + + SampleThreadLocalStorage* localStorage = (SampleThreadLocalStorage*) lsMemory; + + SampleArgs* args = (SampleArgs*) userPtr; + int workLeft = true; + while (workLeft) + { + args->m_cs->lock(); + int count = args->m_cs->getSharedParam(0); + args->m_cs->setSharedParam(0,count-1); + args->m_cs->unlock(); + if (count>0) + { + printf("thread %d processed number %d\n",localStorage->threadId, count); + } + //do some fake work + for (int i=0;i<1000000;i++) + args->m_fakeWork = b3Scalar(1.21)*args->m_fakeWork; + workLeft = count>0; + } + printf("finished\n"); + //do nothing +} + + +void* SamplelsMemoryFunc() +{ + //don't create local store memory, just return 0 + return new SampleThreadLocalStorage; +} + + + + + + + + + + +int main(int argc,char** argv) +{ + int numThreads = 8; + + b3ThreadSupportInterface* threadSupport = createThreadSupport(numThreads); + + + + for (int i=0;igetNumTasks();i++) + { + SampleThreadLocalStorage* storage = (SampleThreadLocalStorage*)threadSupport->getThreadLocalMemory(i); + b3Assert(storage); + storage->threadId = i; + } + + + SampleArgs args; + args.m_cs = threadSupport->createCriticalSection(); + args.m_cs->setSharedParam(0,100); + + + int arg0,arg1; + int i; + for (i=0;isendRequest(B3_THREAD_SCHEDULE_TASK, (void*) &args, i); + } + + bool blockingWait =false; + if (blockingWait) + { + for (i=0;iwaitForResponse(&arg0,&arg1); + printf("finished waiting for response: %d %d\n", arg0,arg1); + } + } else + { + int numActiveThreads = numThreads; + while (numActiveThreads) + { + if (threadSupport->isTaskCompleted(&arg0,&arg1,0)) + { + numActiveThreads--; + printf("numActiveThreads = %d\n",numActiveThreads); + + } else + { +// printf("polling.."); + } + }; + } + +printf("stopping threads\n"); + + delete threadSupport; + printf("Press ENTER to quit\n"); + getchar(); + return 0; +} diff --git a/examples/MultiThreading/premake4.lua b/examples/MultiThreading/premake4.lua new file mode 100644 index 000000000..581408c17 --- /dev/null +++ b/examples/MultiThreading/premake4.lua @@ -0,0 +1,51 @@ + + project "App_ThreadingTest" + + kind "ConsoleApp" + +-- defines { } + + + includedirs + { + ".","../../src" + } + + + links { "Bullet3Common" } + + + files { + "b3ThreadSupportInterface.cpp", + "main.cpp", + "b3ThreadSupportInterface.h" + } + if os.is("Windows") then + + files { + "b3Win32ThreadSupport.cpp", + "b3Win32ThreadSupport.h" + } + --links {"winmm"} + --defines {"__WINDOWS_MM__", "WIN32"} + end + + if os.is("Linux") then + files { + "b3PosixThreadSupport.cpp", + "b3PosixThreadSupport.h" + } + + links {"pthread"} + end + + if os.is("MacOSX") then + files { + "b3PosixThreadSupport.cpp", + "b3PosixThreadSupport.h" + } + + links {"pthread"} + --links{"CoreAudio.framework", "coreMIDI.framework", "Cocoa.framework"} + --defines {"__MACOSX_CORE__"} + end