Bullet Collision Detection & Physics Library
PosixThreadSupport.cpp
Go to the documentation of this file.
1 /*
2 Bullet Continuous Collision Detection and Physics Library
3 Copyright (c) 2003-2007 Erwin Coumans http://bulletphysics.com
4 
5 This software is provided 'as-is', without any express or implied warranty.
6 In no event will the authors be held liable for any damages arising from the use of this software.
7 Permission is granted to anyone to use this software for any purpose,
8 including commercial applications, and to alter it and redistribute it freely,
9 subject to the following restrictions:
10 
11 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
12 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13 3. This notice may not be removed or altered from any source distribution.
14 */
15 
16 #include <stdio.h>
17 #include "PosixThreadSupport.h"
18 #ifdef USE_PTHREADS
19 #include <errno.h>
20 #include <unistd.h>
21 
24 
// Reports a failed pthread/semaphore call: when the result is non-zero, prints
// the source line, file, the result value and the current errno.
// The argument is evaluated exactly once (a failing call is not re-executed for
// the printf), and the do/while(0) wrapper makes the macro a single statement so
// it is safe inside an unbraced if/else. Use it with a trailing semicolon.
#define checkPThreadFunction(returnValue) \
	do { \
		const int btCheckedPThreadResult_ = (returnValue); \
		if (0 != btCheckedPThreadResult_) { \
			printf("PThread problem at line %i in file %s: %i %d\n", __LINE__, __FILE__, btCheckedPThreadResult_, errno); \
		} \
	} while (0)
29 
30 // The number of threads should be equal to the number of available cores
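// Todo: each worker should be pinned to a single core (SetThreadIdealProcessor is the Win32 call; on POSIX systems use a thread-affinity API such as pthread_setaffinity_np where available).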
32 
33 // PosixThreadSupport helps to initialize/shutdown libspe2, start/stop SPU tasks and communication
34 // Setup and initialize SPU/CELL/Libspe2
36 {
37  startThreads(threadConstructionInfo);
38 }
39 
40 // cleanup/shutdown Libspe2
42 {
43  stopSPU();
44 }
45 
46 #if (defined (__APPLE__))
47 #define NAMED_SEMAPHORES
48 #endif
49 
50 // this semaphore will signal, if and how many threads are finished with their work
51 static sem_t* mainSemaphore=0;
52 
53 static sem_t* createSem(const char* baseName)
54 {
55  static int semCount = 0;
56 #ifdef NAMED_SEMAPHORES
57  char name[32];
59  snprintf(name, 32, "/%s-%d-%4.4d", baseName, getpid(), semCount++);
60  sem_t* tempSem = sem_open(name, O_CREAT, 0600, 0);
61 
62  if (tempSem != reinterpret_cast<sem_t *>(SEM_FAILED))
63  {
64 // printf("Created \"%s\" Semaphore %p\n", name, tempSem);
65  }
66  else
67  {
68  //printf("Error creating Semaphore %d\n", errno);
69  exit(-1);
70  }
72 #else
73  sem_t* tempSem = new sem_t;
74  checkPThreadFunction(sem_init(tempSem, 0, 0));
75 #endif
76  return tempSem;
77 }
78 
79 static void destroySem(sem_t* semaphore)
80 {
81 #ifdef NAMED_SEMAPHORES
82  checkPThreadFunction(sem_close(semaphore));
83 #else
84  checkPThreadFunction(sem_destroy(semaphore));
85  delete semaphore;
86 #endif
87 }
88 
89 static void *threadFunction(void *argument)
90 {
91 
93 
94 
95  while (1)
96  {
97  checkPThreadFunction(sem_wait(status->startSemaphore));
98 
99  void* userPtr = status->m_userPtr;
100 
101  if (userPtr)
102  {
103  btAssert(status->m_status);
104  status->m_userThreadFunc(userPtr,status->m_lsMemory);
105  status->m_status = 2;
107  status->threadUsed++;
108  } else {
109  //exit Thread
110  status->m_status = 3;
112  printf("Thread with taskId %i exiting\n",status->m_taskId);
113  break;
114  }
115 
116  }
117 
118  printf("Thread TERMINATED\n");
119  return 0;
120 
121 }
122 
125 {
127 
129 
130 
131 
132  switch (uiCommand)
133  {
135  {
136  btSpuStatus& spuStatus = m_activeSpuStatus[taskId];
137  btAssert(taskId >= 0);
138  btAssert(taskId < m_activeSpuStatus.size());
139 
140  spuStatus.m_commandId = uiCommand;
141  spuStatus.m_status = 1;
142  spuStatus.m_userPtr = (void*)uiArgument0;
143 
144  // fire event to start new task
145  checkPThreadFunction(sem_post(spuStatus.startSemaphore));
146  break;
147  }
148  default:
149  {
151  btAssert(0);
152  }
153 
154  };
155 
156 
157 }
158 
159 
161 void PosixThreadSupport::waitForResponse(unsigned int *puiArgument0, unsigned int *puiArgument1)
162 {
164 
166 
167 
168  btAssert(m_activeSpuStatus.size());
169 
170  // wait for any of the threads to finish
172 
173  // get at least one thread which has finished
174  size_t last = -1;
175 
176  for(size_t t=0; t < size_t(m_activeSpuStatus.size()); ++t) {
177  if(2 == m_activeSpuStatus[t].m_status) {
178  last = t;
179  break;
180  }
181  }
182 
183  btSpuStatus& spuStatus = m_activeSpuStatus[last];
184 
185  btAssert(spuStatus.m_status > 1);
186  spuStatus.m_status = 0;
187 
188  // need to find an active spu
189  btAssert(last >= 0);
190 
191  *puiArgument0 = spuStatus.m_taskId;
192  *puiArgument1 = spuStatus.m_status;
193 }
194 
195 
196 
198 {
199  printf("%s creating %i threads.\n", __FUNCTION__, threadConstructionInfo.m_numThreads);
200  m_activeSpuStatus.resize(threadConstructionInfo.m_numThreads);
201 
202  mainSemaphore = createSem("main");
203  //checkPThreadFunction(sem_wait(mainSemaphore));
204 
205  for (int i=0;i < threadConstructionInfo.m_numThreads;i++)
206  {
207  printf("starting thread %d\n",i);
208 
209  btSpuStatus& spuStatus = m_activeSpuStatus[i];
210 
211  spuStatus.startSemaphore = createSem("threadLocal");
212 
213  checkPThreadFunction(pthread_create(&spuStatus.thread, NULL, &threadFunction, (void*)&spuStatus));
214 
215  spuStatus.m_userPtr=0;
216 
217  spuStatus.m_taskId = i;
218  spuStatus.m_commandId = 0;
219  spuStatus.m_status = 0;
220  spuStatus.m_lsMemory = threadConstructionInfo.m_lsMemoryFunc();
221  spuStatus.m_userThreadFunc = threadConstructionInfo.m_userThreadFunc;
222  spuStatus.threadUsed = 0;
223 
224  printf("started thread %d \n",i);
225 
226  }
227 
228 }
229 
231 {
232 }
233 
234 
237 {
238  for(size_t t=0; t < size_t(m_activeSpuStatus.size()); ++t)
239  {
240  btSpuStatus& spuStatus = m_activeSpuStatus[t];
241  printf("%s: Thread %i used: %ld\n", __FUNCTION__, int(t), spuStatus.threadUsed);
242 
243  spuStatus.m_userPtr = 0;
244  checkPThreadFunction(sem_post(spuStatus.startSemaphore));
246 
247  printf("destroy semaphore\n");
248  destroySem(spuStatus.startSemaphore);
249  printf("semaphore destroyed\n");
250  checkPThreadFunction(pthread_join(spuStatus.thread,0));
251 
252  }
253  printf("destroy main semaphore\n");
255  printf("main semaphore destroyed\n");
256  m_activeSpuStatus.clear();
257 }
258 
260 {
261  pthread_mutex_t m_mutex;
262 
263 public:
265  {
266  pthread_mutex_init(&m_mutex, NULL);
267  }
269  {
270  pthread_mutex_destroy(&m_mutex);
271  }
272 
273  ATTRIBUTE_ALIGNED16(unsigned int mCommonBuff[32]);
274 
275  virtual unsigned int getSharedParam(int i)
276  {
277  return mCommonBuff[i];
278  }
279  virtual void setSharedParam(int i,unsigned int p)
280  {
281  mCommonBuff[i] = p;
282  }
283 
284  virtual void lock()
285  {
286  pthread_mutex_lock(&m_mutex);
287  }
288  virtual void unlock()
289  {
290  pthread_mutex_unlock(&m_mutex);
291  }
292 };
293 
294 
295 #if defined(_POSIX_BARRIERS) && (_POSIX_BARRIERS - 20012L) >= 0
296 /* OK to use barriers on this platform */
297 class PosixBarrier : public btBarrier
298 {
299  pthread_barrier_t m_barr;
300  int m_numThreads;
301 public:
302  PosixBarrier()
303  :m_numThreads(0) { }
304  virtual ~PosixBarrier() {
305  pthread_barrier_destroy(&m_barr);
306  }
307 
308  virtual void sync()
309  {
310  int rc = pthread_barrier_wait(&m_barr);
311  if(rc != 0 && rc != PTHREAD_BARRIER_SERIAL_THREAD)
312  {
313  printf("Could not wait on barrier\n");
314  exit(-1);
315  }
316  }
317  virtual void setMaxCount(int numThreads)
318  {
319  int result = pthread_barrier_init(&m_barr, NULL, numThreads);
320  m_numThreads = numThreads;
321  btAssert(result==0);
322  }
323  virtual int getMaxCount()
324  {
325  return m_numThreads;
326  }
327 };
328 #else
329 /* Not OK to use barriers on this platform - insert alternate code here */
330 class PosixBarrier : public btBarrier
331 {
332  pthread_mutex_t m_mutex;
333  pthread_cond_t m_cond;
334 
336  int m_called;
337 
338 public:
340  :m_numThreads(0)
341  {
342  }
343  virtual ~PosixBarrier()
344  {
345  if (m_numThreads>0)
346  {
347  pthread_mutex_destroy(&m_mutex);
348  pthread_cond_destroy(&m_cond);
349  }
350  }
351 
352  virtual void sync()
353  {
354  pthread_mutex_lock(&m_mutex);
355  m_called++;
356  if (m_called == m_numThreads) {
357  m_called = 0;
358  pthread_cond_broadcast(&m_cond);
359  } else {
360  pthread_cond_wait(&m_cond,&m_mutex);
361  }
362  pthread_mutex_unlock(&m_mutex);
363 
364  }
365  virtual void setMaxCount(int numThreads)
366  {
367  if (m_numThreads>0)
368  {
369  pthread_mutex_destroy(&m_mutex);
370  pthread_cond_destroy(&m_cond);
371  }
372  m_called = 0;
373  pthread_mutex_init(&m_mutex,NULL);
374  pthread_cond_init(&m_cond,NULL);
375  m_numThreads = numThreads;
376  }
377  virtual int getMaxCount()
378  {
379  return m_numThreads;
380  }
381 };
382 
383 #endif//_POSIX_BARRIERS
384 
385 
386 
388 {
390  barrier->setMaxCount(getNumTasks());
391  return barrier;
392 }
393 
395 {
396  return new PosixCriticalSection();
397 }
398 
400 {
401  delete barrier;
402 }
403 
405 {
406  delete cs;
407 }
408 #endif // USE_PTHREADS
409 
virtual btCriticalSection * createCriticalSection()
void startThreads(ThreadConstructionInfo &threadInfo)
static void * threadFunction(void *argument)
virtual void setMaxCount(int numThreads)
uint32_t ppu_address_t
#define btAssert(x)
Definition: btScalar.h:101
virtual void deleteBarrier(btBarrier *barrier)
virtual btBarrier * createBarrier()
virtual ~PosixThreadSupport()
cleanup/shutdown Libspe2
unsigned int mCommonBuff[32]
static void destroySem(sem_t *semaphore)
pthread_cond_t m_cond
unsigned int uint32_t
virtual unsigned int getSharedParam(int i)
virtual int getNumTasks() const
#define ATTRIBUTE_ALIGNED16(a)
Definition: btScalar.h:59
virtual void waitForResponse(unsigned int *puiArgument0, unsigned int *puiArgument1)
check for messages from SPUs
PosixThreadSupport(ThreadConstructionInfo &threadConstructionInfo)
#define checkPThreadFunction(returnValue)
virtual void setSharedParam(int i, unsigned int p)
btAlignedObjectArray< btSpuStatus > m_activeSpuStatus
static sem_t * mainSemaphore
virtual int getMaxCount()
static sem_t * createSem(const char *baseName)
virtual void sync()
virtual void stopSPU()
tell the task scheduler we are done with the SPU tasks
Setup and initialize SPU/CELL/Libspe2.
virtual void deleteCriticalSection(btCriticalSection *criticalSection)
static void barrier(unsigned int a)
virtual void sendRequest(uint32_t uiCommand, ppu_address_t uiArgument0, uint32_t uiArgument1)
send messages to SPUs
#define CMD_GATHER_AND_PROCESS_PAIRLIST
pthread_mutex_t m_mutex
virtual void startSPU()
start the spus (can be called at the beginning of each frame, to make sure that the right SPU program...