fix one more multi-threading issue

This commit is contained in:
Erwin Coumans
2016-07-20 12:48:01 -07:00
parent 2278385802
commit d3a94248d4
2 changed files with 22 additions and 14 deletions

View File

@@ -44,8 +44,6 @@ b3PosixThreadSupport::~b3PosixThreadSupport()
#define NAMED_SEMAPHORES #define NAMED_SEMAPHORES
#endif #endif
// this semaphore will signal, if and how many threads are finished with their work
static sem_t* mainSemaphore=0;
static sem_t* createSem(const char* baseName) static sem_t* createSem(const char* baseName)
{ {
@@ -100,12 +98,12 @@ static void *threadFunction(void *argument)
b3Assert(status->m_status); b3Assert(status->m_status);
status->m_userThreadFunc(userPtr,status->m_lsMemory); status->m_userThreadFunc(userPtr,status->m_lsMemory);
status->m_status = 2; status->m_status = 2;
checkPThreadFunction(sem_post(mainSemaphore)); checkPThreadFunction(sem_post(status->m_mainSemaphore));
status->threadUsed++; status->threadUsed++;
} else { } else {
//exit Thread //exit Thread
status->m_status = 3; status->m_status = 3;
checkPThreadFunction(sem_post(mainSemaphore)); checkPThreadFunction(sem_post(status->m_mainSemaphore));
printf("Thread with taskId %i exiting\n",status->m_taskId); printf("Thread with taskId %i exiting\n",status->m_taskId);
break; break;
} }
@@ -160,13 +158,14 @@ bool b3PosixThreadSupport::isTaskCompleted(int *puiArgument0, int *puiArgument1,
b3Assert(m_activeThreadStatus.size()); b3Assert(m_activeThreadStatus.size());
// wait for any of the threads to finish // wait for any of the threads to finish
int result = sem_trywait(mainSemaphore); int result = sem_trywait(m_mainSemaphore);
if (result==0) if (result==0)
{ {
// get at least one thread which has finished // get at least one thread which has finished
size_t last = -1; int last = -1;
int status = -1;
for(size_t t=0; t < size_t(m_activeThreadStatus.size()); ++t) { for(int t=0; t < int (m_activeThreadStatus.size()); ++t) {
status = m_activeThreadStatus[t].m_status;
if(2 == m_activeThreadStatus[t].m_status) { if(2 == m_activeThreadStatus[t].m_status) {
last = t; last = t;
break; break;
@@ -200,7 +199,7 @@ void b3PosixThreadSupport::waitForResponse( int *puiArgument0, int *puiArgument
b3Assert(m_activeThreadStatus.size()); b3Assert(m_activeThreadStatus.size());
// wait for any of the threads to finish // wait for any of the threads to finish
checkPThreadFunction(sem_wait(mainSemaphore)); checkPThreadFunction(sem_wait(m_mainSemaphore));
// get at least one thread which has finished // get at least one thread which has finished
size_t last = -1; size_t last = -1;
@@ -231,7 +230,7 @@ void b3PosixThreadSupport::startThreads(ThreadConstructionInfo& threadConstructi
printf("%s creating %i threads.\n", __FUNCTION__, threadConstructionInfo.m_numThreads); printf("%s creating %i threads.\n", __FUNCTION__, threadConstructionInfo.m_numThreads);
m_activeThreadStatus.resize(threadConstructionInfo.m_numThreads); m_activeThreadStatus.resize(threadConstructionInfo.m_numThreads);
mainSemaphore = createSem("main"); m_mainSemaphore = createSem("main");
//checkPThreadFunction(sem_wait(mainSemaphore)); //checkPThreadFunction(sem_wait(mainSemaphore));
for (int i=0;i < threadConstructionInfo.m_numThreads;i++) for (int i=0;i < threadConstructionInfo.m_numThreads;i++)
@@ -249,6 +248,7 @@ void b3PosixThreadSupport::startThreads(ThreadConstructionInfo& threadConstructi
spuStatus.m_taskId = i; spuStatus.m_taskId = i;
spuStatus.m_commandId = 0; spuStatus.m_commandId = 0;
spuStatus.m_status = 0; spuStatus.m_status = 0;
spuStatus.m_mainSemaphore = m_mainSemaphore;
spuStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc(); spuStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
spuStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc; spuStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
spuStatus.threadUsed = 0; spuStatus.threadUsed = 0;
@@ -271,7 +271,7 @@ void b3PosixThreadSupport::stopThreads()
spuStatus.m_userPtr = 0; spuStatus.m_userPtr = 0;
checkPThreadFunction(sem_post(spuStatus.startSemaphore)); checkPThreadFunction(sem_post(spuStatus.startSemaphore));
checkPThreadFunction(sem_wait(mainSemaphore)); checkPThreadFunction(sem_wait(m_mainSemaphore));
printf("destroy semaphore\n"); printf("destroy semaphore\n");
destroySem(spuStatus.startSemaphore); destroySem(spuStatus.startSemaphore);
@@ -280,7 +280,7 @@ void b3PosixThreadSupport::stopThreads()
} }
printf("destroy main semaphore\n"); printf("destroy main semaphore\n");
destroySem(mainSemaphore); destroySem(m_mainSemaphore);
printf("main semaphore destroyed\n"); printf("main semaphore destroyed\n");
m_activeThreadStatus.clear(); m_activeThreadStatus.clear();
} }

View File

@@ -57,14 +57,22 @@ public:
void* m_userPtr; //for taskDesc etc void* m_userPtr; //for taskDesc etc
void* m_lsMemory; //initialized using PosixLocalStoreMemorySetupFunc void* m_lsMemory; //initialized using PosixLocalStoreMemorySetupFunc
pthread_t thread; pthread_t thread;
sem_t* startSemaphore; //each tread will wait until this signal to start its work
sem_t* startSemaphore;
// this is a copy of m_mainSemaphore,
//each tread will signal once it is finished with its work
sem_t* m_mainSemaphore;
unsigned long threadUsed; unsigned long threadUsed;
}; };
private: private:
b3AlignedObjectArray<b3ThreadStatus> m_activeThreadStatus; b3AlignedObjectArray<b3ThreadStatus> m_activeThreadStatus;
// m_mainSemaphoresemaphore will signal, if and how many threads are finished with their work
sem_t* m_mainSemaphore;
public: public:
///Setup and initialize SPU/CELL/Libspe2 ///Setup and initialize SPU/CELL/Libspe2