threads.h
// The libMesh Finite Element Library.
// Copyright (C) 2002-2014 Benjamin S. Kirk, John W. Peterson, Roy H. Stogner

// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.

// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.

// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA


#ifndef LIBMESH_THREADS_H
#define LIBMESH_THREADS_H

// Local includes
#include "libmesh/libmesh_config.h"
#include "libmesh/libmesh_common.h" // for libmesh_assert

// Threading Building Blocks includes
#ifdef LIBMESH_HAVE_TBB_API
# include "libmesh/libmesh_logging.h" // only mess with the perflog if we are really multithreaded
# include "tbb/tbb_stddef.h"
# include "tbb/blocked_range.h"
# include "tbb/parallel_for.h"
# include "tbb/parallel_reduce.h"
# include "tbb/task_scheduler_init.h"
# include "tbb/partitioner.h"
# include "tbb/spin_mutex.h"
# include "tbb/recursive_mutex.h"
# include "tbb/atomic.h"
#endif

// C++ includes
#ifdef LIBMESH_HAVE_STD_THREAD
# include <thread>
#elif LIBMESH_HAVE_TBB_CXX_THREAD
# include "tbb/tbb_thread.h"
#endif

#ifdef LIBMESH_HAVE_PTHREAD
# include "libmesh/libmesh_logging.h" // only mess with the perflog if we are really multithreaded
# include <pthread.h>
# include <algorithm>
# include <map> // for std::map<pthread_t, unsigned int> below
# include <vector>

#ifdef __APPLE__
#include <libkern/OSAtomic.h>
#endif

#endif

// Thread-Local-Storage macros

#ifdef LIBMESH_HAVE_STD_THREAD
# define LIBMESH_TLS_TYPE(type)  thread_local type
# define LIBMESH_TLS_REF(value)  (value)
#else
# ifdef LIBMESH_HAVE_TBB_API
#  include "tbb/enumerable_thread_specific.h"
#  define LIBMESH_TLS_TYPE(type)  tbb::enumerable_thread_specific<type>
#  define LIBMESH_TLS_REF(value)  (value).local()
# else // Maybe support gcc __thread eventually?
#  define LIBMESH_TLS_TYPE(type)  type
#  define LIBMESH_TLS_REF(value)  (value)
# endif
#endif
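
// Usage sketch (illustrative comment, not part of the original header):
// LIBMESH_TLS_TYPE declares a per-thread variable and LIBMESH_TLS_REF
// yields the calling thread's instance, whichever backend (C++11
// thread_local, TBB enumerable_thread_specific, or the serial fallback)
// was configured.  The variable name "scratch" is hypothetical.
//
//   static LIBMESH_TLS_TYPE(std::vector<double>) scratch;
//
//   void accumulate (double x)
//   {
//     std::vector<double> & my_scratch = LIBMESH_TLS_REF(scratch);
//     my_scratch.push_back(x); // touches only this thread's copy
//   }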


namespace libMesh
{

// The Threads namespace provides wrappers for common threading
// algorithms, compatible with the API of Intel's Threading Building
// Blocks.
namespace Threads
{
  // A boolean which is true iff we are inside a Threads:: function.
  // It may be useful to assert(!Threads::in_threads) in any code which
  // is known to not be thread-safe.
  extern bool in_threads;

  // We use a class to turn Threads::in_threads on and off, to be
  // exception-safe.
  class BoolAcquire {
  public:
    explicit
    BoolAcquire(bool& b) : _b(b) { libmesh_assert(!_b); _b = true; }

    ~BoolAcquire() { libmesh_assert(_b); _b = false; }
  private:
    bool& _b;
  };
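
  // Usage sketch (illustrative, not part of the original header): the
  // RAII class above sets the flag for the duration of a scope and
  // clears it on exit, even if an exception is thrown.
  //
  //   {
  //     BoolAcquire b(in_threads); // in_threads is now true
  //     // ... run work which must not be re-entered in parallel ...
  //   }                            // destructor resets in_threads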


#ifdef LIBMESH_HAVE_STD_THREAD
  //--------------------------------------------------------------------
  // Use std::thread when it is available.
  typedef std::thread Thread;

#elif LIBMESH_HAVE_TBB_CXX_THREAD
  //--------------------------------------------------------------------
  // Otherwise fall back on TBB's copy of std::thread.
  typedef tbb::tbb_thread Thread;

#else
  //--------------------------------------------------------------------
  // Simulated thread class for builds without any thread support:
  // it simply runs the callable immediately, in serial.
  class Thread
  {
  public:
    // "Spawn": execute the provided function object right away.
    template <typename Callable>
    Thread (Callable f) { f(); }

    // Join is a no-op; the constructor already ran the work.
    void join() {}

    // Always joinable.
    bool joinable() const { return true; }
  };

#endif
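
  // Usage sketch (illustrative, not part of the original header):
  // whichever backend is selected, Thread supports construction from a
  // callable followed by join().  "print_id" is a hypothetical function.
  //
  //   void print_id () { /* ... */ }
  //
  //   Threads::Thread t(print_id);
  //   t.join(); // with the serial fallback, print_id has already run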



#ifdef LIBMESH_HAVE_TBB_API
  //-------------------------------------------------------------------
  // Scheduler to manage threads.
  typedef tbb::task_scheduler_init task_scheduler_init;

  //-------------------------------------------------------------------
  // Dummy "splitting object" used to distinguish splitting constructors
  // from copy constructors.
  typedef tbb::split split;

  //-------------------------------------------------------------------
  // Execute the provided function object in parallel on the specified
  // range.
  template <typename Range, typename Body>
  inline
  void parallel_for (const Range &range, const Body &body)
  {
    BoolAcquire b(in_threads);

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    const bool logging_was_enabled = libMesh::perflog.logging_enabled();

    if (libMesh::n_threads() > 1)
      libMesh::perflog.disable_logging();
#endif

    if (libMesh::n_threads() > 1)
      tbb::parallel_for (range, body, tbb::auto_partitioner());
    else
      body(range);

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    if (libMesh::n_threads() > 1 && logging_was_enabled)
      libMesh::perflog.enable_logging();
#endif
  }
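
  // Usage sketch (illustrative, not part of the original header): a
  // Body for parallel_for only needs a const operator() taking the
  // Range.  "FillBody" and "vals" are hypothetical names; BlockedRange
  // is defined later in this header.
  //
  //   class FillBody
  //   {
  //   public:
  //     explicit FillBody (std::vector<double> &v) : _v(v) {}
  //
  //     void operator() (const Threads::BlockedRange<std::size_t> &r) const
  //     {
  //       for (std::size_t i = r.begin(); i != r.end(); ++i)
  //         _v[i] = 2.0*double(i);
  //     }
  //
  //   private:
  //     std::vector<double> &_v;
  //   };
  //
  //   std::vector<double> vals(1000);
  //   Threads::parallel_for
  //     (Threads::BlockedRange<std::size_t>(0, vals.size()), FillBody(vals));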



  //-------------------------------------------------------------------
  // Execute the provided function object in parallel on the specified
  // range with the specified partitioner.
  template <typename Range, typename Body, typename Partitioner>
  inline
  void parallel_for (const Range &range, const Body &body, const Partitioner &partitioner)
  {
    BoolAcquire b(in_threads);

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    const bool logging_was_enabled = libMesh::perflog.logging_enabled();

    if (libMesh::n_threads() > 1)
      libMesh::perflog.disable_logging();
#endif

    if (libMesh::n_threads() > 1)
      tbb::parallel_for (range, body, partitioner);
    else
      body(range);

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    if (libMesh::n_threads() > 1 && logging_was_enabled)
      libMesh::perflog.enable_logging();
#endif
  }



  //-------------------------------------------------------------------
  // Execute the provided reduction operation in parallel on the
  // specified range.
  template <typename Range, typename Body>
  inline
  void parallel_reduce (const Range &range, Body &body)
  {
    BoolAcquire b(in_threads);

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    const bool logging_was_enabled = libMesh::perflog.logging_enabled();

    if (libMesh::n_threads() > 1)
      libMesh::perflog.disable_logging();
#endif

    if (libMesh::n_threads() > 1)
      tbb::parallel_reduce (range, body, tbb::auto_partitioner());
    else
      body(range);

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    if (libMesh::n_threads() > 1 && logging_was_enabled)
      libMesh::perflog.enable_logging();
#endif
  }



  //-------------------------------------------------------------------
  // Execute the provided reduction operation in parallel on the
  // specified range with the specified partitioner.
  template <typename Range, typename Body, typename Partitioner>
  inline
  void parallel_reduce (const Range &range, Body &body, const Partitioner &partitioner)
  {
    BoolAcquire b(in_threads);

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    const bool logging_was_enabled = libMesh::perflog.logging_enabled();

    if (libMesh::n_threads() > 1)
      libMesh::perflog.disable_logging();
#endif

    if (libMesh::n_threads() > 1)
      tbb::parallel_reduce (range, body, partitioner);
    else
      body(range);

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    if (libMesh::n_threads() > 1 && logging_was_enabled)
      libMesh::perflog.enable_logging();
#endif
  }
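
  // Usage sketch (illustrative, not part of the original header): a
  // reduction Body additionally needs a "splitting constructor" and a
  // join() that merges partial results.  "SumBody" and "vals" are
  // hypothetical names.
  //
  //   class SumBody
  //   {
  //   public:
  //     explicit SumBody (const std::vector<double> &v) : sum(0.), _v(v) {}
  //     SumBody (SumBody &other, Threads::split) : sum(0.), _v(other._v) {}
  //
  //     void operator() (const Threads::BlockedRange<std::size_t> &r)
  //     {
  //       for (std::size_t i = r.begin(); i != r.end(); ++i)
  //         sum += _v[i];
  //     }
  //
  //     void join (const SumBody &other) { sum += other.sum; }
  //
  //     double sum;
  //
  //   private:
  //     const std::vector<double> &_v;
  //   };
  //
  //   std::vector<double> vals(1000, 1.0);
  //   SumBody s(vals);
  //   Threads::parallel_reduce
  //     (Threads::BlockedRange<std::size_t>(0, vals.size()), s);
  //   // s.sum now holds the total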



  //-------------------------------------------------------------------
  // Spin mutex.  Implements mutual exclusion by busy-waiting.
  typedef tbb::spin_mutex spin_mutex;

  //-------------------------------------------------------------------
  // Recursive mutex, which may be locked repeatedly by the owning
  // thread.
  typedef tbb::recursive_mutex recursive_mutex;

  //-------------------------------------------------------------------
  // Defines atomic operations which can only be executed on a single
  // thread at a time.
  template <typename T>
  class atomic : public tbb::atomic<T> {};

#else //LIBMESH_HAVE_TBB_API
#ifdef LIBMESH_HAVE_PTHREAD

  //-------------------------------------------------------------------
  // Spin mutex.  Implements mutual exclusion by busy-waiting.
#ifdef __APPLE__
  class spin_mutex
  {
  public:
    spin_mutex() : slock(0) {} // The convention is that the lock being zero is _unlocked_
    ~spin_mutex() {}

    void lock ()   { OSSpinLockLock(&slock); }
    void unlock () { OSSpinLockUnlock(&slock); }

    // RAII helper: locks on construction/acquire, unlocks on
    // release/destruction.
    class scoped_lock
    {
    public:
      scoped_lock () : smutex(NULL) {}
      explicit scoped_lock ( spin_mutex& in_smutex ) : smutex(&in_smutex) { smutex->lock(); }

      ~scoped_lock () { release(); }

      void acquire ( spin_mutex& in_smutex ) { smutex = &in_smutex; smutex->lock(); }
      void release () { if (smutex) smutex->unlock(); smutex = NULL; }

    private:
      spin_mutex * smutex;
    };

  private:
    OSSpinLock slock;
  };
#else
  class spin_mutex
  {
  public:
    // Might want to use PTHREAD_MUTEX_ADAPTIVE_NP on Linux, but it's not available on OSX.
    spin_mutex() { pthread_spin_init(&slock, PTHREAD_PROCESS_PRIVATE); }
    ~spin_mutex() { pthread_spin_destroy(&slock); }

    void lock ()   { pthread_spin_lock(&slock); }
    void unlock () { pthread_spin_unlock(&slock); }

    // RAII helper: locks on construction/acquire, unlocks on
    // release/destruction.
    class scoped_lock
    {
    public:
      scoped_lock () : smutex(NULL) {}
      explicit scoped_lock ( spin_mutex& in_smutex ) : smutex(&in_smutex) { smutex->lock(); }

      ~scoped_lock () { release(); }

      void acquire ( spin_mutex& in_smutex ) { smutex = &in_smutex; smutex->lock(); }
      void release () { if (smutex) smutex->unlock(); smutex = NULL; }

    private:
      spin_mutex * smutex;
    };

  private:
    pthread_spinlock_t slock;
  };
#endif
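
  // Usage sketch (illustrative, not part of the original header):
  // prefer scoped_lock to manual lock()/unlock() pairs so the mutex is
  // released on every path out of the block.  "mtx" and "counter" are
  // hypothetical names.
  //
  //   static Threads::spin_mutex mtx;
  //   static int counter = 0;
  //
  //   void bump ()
  //   {
  //     Threads::spin_mutex::scoped_lock lock(mtx);
  //     ++counter; // protected; unlock happens in lock's destructor
  //   }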

  //-------------------------------------------------------------------
  // Recursive mutex, built on a pthread mutex with the
  // PTHREAD_MUTEX_RECURSIVE attribute.
  class recursive_mutex
  {
  public:
    // Might want to use PTHREAD_MUTEX_ADAPTIVE_NP on Linux, but it's not available on OSX.
    recursive_mutex()
    {
      pthread_mutexattr_init(&attr);
      pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
      pthread_mutex_init(&mutex, &attr);
    }
    ~recursive_mutex() { pthread_mutex_destroy(&mutex); }

    void lock ()   { pthread_mutex_lock(&mutex); }
    void unlock () { pthread_mutex_unlock(&mutex); }

    // RAII helper: locks on construction/acquire, unlocks on
    // release/destruction.
    class scoped_lock
    {
    public:
      scoped_lock () : rmutex(NULL) {}
      explicit scoped_lock ( recursive_mutex& in_rmutex ) : rmutex(&in_rmutex) { rmutex->lock(); }

      ~scoped_lock () { release(); }

      void acquire ( recursive_mutex& in_rmutex ) { rmutex = &in_rmutex; rmutex->lock(); }
      void release () { if (rmutex) rmutex->unlock(); rmutex = NULL; }

    private:
      recursive_mutex * rmutex;
    };

  private:
    pthread_mutex_t mutex;
    pthread_mutexattr_t attr;
  };

  // Map from pthread handles to small unique integer ids, guarded by
  // its own mutex.
  extern std::map<pthread_t, unsigned int> _pthread_unique_ids;
  extern spin_mutex _pthread_unique_id_mutex;

  // When called from a thread created by parallel_for() or
  // parallel_reduce(), returns a unique small integer identifying the
  // calling thread.
  unsigned int pthread_unique_id();

  template <typename Range>
  unsigned int num_pthreads(Range & range)
  {
    // Never use more threads than there are items in the range, and
    // always use at least one.
    unsigned int min = std::min((std::size_t)libMesh::n_threads(), (std::size_t)range.size());
    return min > 0 ? min : 1;
  }

  // Simple aggregate used to pass a Range and a Body through
  // pthread_create's single void* argument.
  template <typename Range, typename Body>
  class RangeBody
  {
  public:
    Range * range;
    Body * body;
  };

  // Thread entry point: unpack the RangeBody and apply the Body to
  // the Range.
  template <typename Range, typename Body>
  void * run_body(void * args)
  {
    RangeBody<Range, Body> * range_body = (RangeBody<Range, Body>*)args;

    Body & body = *range_body->body;
    Range & range = *range_body->range;

    body(range);

    return NULL;
  }

  //-------------------------------------------------------------------
  // Scheduler to manage threads.  With pthreads there is nothing to
  // initialize, so this is a drop-in no-op stand-in for the TBB class.
  class task_scheduler_init
  {
  public:
    static const int automatic = -1;
    explicit task_scheduler_init (int = automatic) {}
    void initialize (int = automatic) {}
    void terminate () {}
  };
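
  // Usage sketch (illustrative, not part of the original header): code
  // written against the TBB API can construct the scheduler the same
  // way with any backend; here the construction is simply a no-op.
  //
  //   Threads::task_scheduler_init init(libMesh::n_threads());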

  //-------------------------------------------------------------------
  // Dummy "splitting object" used to distinguish splitting constructors
  // from copy constructors.
  class split {};




  //-------------------------------------------------------------------
  // Execute the provided function object in parallel on the specified
  // range.
  template <typename Range, typename Body>
  inline
  void parallel_for (const Range &range, const Body &body)
  {
    Threads::BoolAcquire b(Threads::in_threads);

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    const bool logging_was_enabled = libMesh::perflog.logging_enabled();

    if (libMesh::n_threads() > 1)
      libMesh::perflog.disable_logging();
#endif

    unsigned int n_threads = num_pthreads(range);

    std::vector<Range *> ranges(n_threads);
    std::vector<RangeBody<const Range, const Body> > range_bodies(n_threads);
    std::vector<pthread_t> threads(n_threads);

    // Create the ranges for each thread
    unsigned int range_size = range.size() / n_threads;

    typename Range::const_iterator current_beginning = range.begin();

    for (unsigned int i=0; i<n_threads; i++)
      {
        unsigned int this_range_size = range_size;

        if (i+1 == n_threads)
          this_range_size += range.size() % n_threads; // Give the last one the remaining work to do

        ranges[i] = new Range(range, current_beginning, current_beginning + this_range_size);

        current_beginning = current_beginning + this_range_size;
      }

    // Create the RangeBody arguments
    for (unsigned int i=0; i<n_threads; i++)
      {
        range_bodies[i].range = ranges[i];
        range_bodies[i].body = &body;
      }

    // Create the threads.  With OpenMP the bodies run directly on the
    // OpenMP threads; otherwise we spawn pthreads.
#pragma omp parallel for schedule (static)
    for (unsigned int i=0; i<n_threads; i++)
      {
#if LIBMESH_HAVE_OPENMP
        run_body<Range, Body>((void*)&range_bodies[i]);
#else // Just use Pthreads
        spin_mutex::scoped_lock lock(_pthread_unique_id_mutex);
        pthread_create(&threads[i], NULL, &run_body<Range, Body>, (void*)&range_bodies[i]);
        _pthread_unique_ids[threads[i]] = i;
#endif
      }

#if !LIBMESH_HAVE_OPENMP
    // Wait for them to finish
    for (unsigned int i=0; i<n_threads; i++)
      {
        pthread_join(threads[i], NULL);
        spin_mutex::scoped_lock lock(_pthread_unique_id_mutex);
        _pthread_unique_ids.erase(threads[i]);
      }
#endif

    // Clean up
    for (unsigned int i=0; i<n_threads; i++)
      delete ranges[i];

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    if (libMesh::n_threads() > 1 && logging_was_enabled)
      libMesh::perflog.enable_logging();
#endif
  }

  //-------------------------------------------------------------------
  // Execute the provided function object in parallel on the specified
  // range; the partitioner is ignored with the pthreads backend.
  template <typename Range, typename Body, typename Partitioner>
  inline
  void parallel_for (const Range &range, const Body &body, const Partitioner &)
  {
    parallel_for (range, body);
  }

  //-------------------------------------------------------------------
  // Execute the provided reduction operation in parallel on the
  // specified range.
  template <typename Range, typename Body>
  inline
  void parallel_reduce (const Range &range, Body &body)
  {
    Threads::BoolAcquire b(Threads::in_threads);

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    const bool logging_was_enabled = libMesh::perflog.logging_enabled();

    if (libMesh::n_threads() > 1)
      libMesh::perflog.disable_logging();
#endif

    unsigned int n_threads = num_pthreads(range);

    std::vector<Range *> ranges(n_threads);
    std::vector<Body *> bodies(n_threads);
    std::vector<RangeBody<Range, Body> > range_bodies(n_threads);

    // Create copies of the body for each thread
    bodies[0] = &body; // Use the original body for the first one
    for (unsigned int i=1; i<n_threads; i++)
      bodies[i] = new Body(body, Threads::split());

    // Create the ranges for each thread
    unsigned int range_size = range.size() / n_threads;

    typename Range::const_iterator current_beginning = range.begin();

    for (unsigned int i=0; i<n_threads; i++)
      {
        unsigned int this_range_size = range_size;

        if (i+1 == n_threads)
          this_range_size += range.size() % n_threads; // Give the last one the remaining work to do

        ranges[i] = new Range(range, current_beginning, current_beginning + this_range_size);

        current_beginning = current_beginning + this_range_size;
      }

    // Create the RangeBody arguments
    for (unsigned int i=0; i<n_threads; i++)
      {
        range_bodies[i].range = ranges[i];
        range_bodies[i].body = bodies[i];
      }

    // Create the threads
    std::vector<pthread_t> threads(n_threads);

#pragma omp parallel for schedule (static)
    for (unsigned int i=0; i<n_threads; i++)
      {
#if LIBMESH_HAVE_OPENMP
        run_body<Range, Body>((void*)&range_bodies[i]);
#else // Just use Pthreads
        spin_mutex::scoped_lock lock(_pthread_unique_id_mutex);
        pthread_create(&threads[i], NULL, &run_body<Range, Body>, (void*)&range_bodies[i]);
        _pthread_unique_ids[threads[i]] = i;
#endif
      }

#if !LIBMESH_HAVE_OPENMP
    // Wait for them to finish
    for (unsigned int i=0; i<n_threads; i++)
      {
        pthread_join(threads[i], NULL);
        spin_mutex::scoped_lock lock(_pthread_unique_id_mutex);
        _pthread_unique_ids.erase(threads[i]);
      }
#endif

    // Join them all down to the original Body
    for (unsigned int i=n_threads-1; i != 0; i--)
      bodies[i-1]->join(*bodies[i]);

    // Clean up
    for (unsigned int i=1; i<n_threads; i++)
      delete bodies[i];
    for (unsigned int i=0; i<n_threads; i++)
      delete ranges[i];

#ifdef LIBMESH_ENABLE_PERFORMANCE_LOGGING
    if (libMesh::n_threads() > 1 && logging_was_enabled)
      libMesh::perflog.enable_logging();
#endif
  }

  //-------------------------------------------------------------------
  // Execute the provided reduction operation in parallel on the
  // specified range; the partitioner is ignored with the pthreads
  // backend.
  template <typename Range, typename Body, typename Partitioner>
  inline
  void parallel_reduce (const Range &range, Body &body, const Partitioner &)
  {
    parallel_reduce (range, body);
  }


  //-------------------------------------------------------------------
  // Defines atomic operations which can only be executed on a single
  // thread at a time, implemented by serializing every access with a
  // spin mutex.
  template <typename T>
  class atomic
  {
  public:
    atomic () : val(0) {}
    operator T () { return val; }

    T operator=( T value )
    {
      spin_mutex::scoped_lock lock(smutex);
      val = value;
      return val;
    }

    atomic<T>& operator=( const atomic<T>& value )
    {
      spin_mutex::scoped_lock lock(smutex);
      val = value.val;
      return *this;
    }


    T operator+=(T value)
    {
      spin_mutex::scoped_lock lock(smutex);
      val += value;
      return val;
    }

    T operator-=(T value)
    {
      spin_mutex::scoped_lock lock(smutex);
      val -= value;
      return val;
    }

    T operator++()
    {
      spin_mutex::scoped_lock lock(smutex);
      val++;
      return val;
    }

    T operator++(int)
    {
      spin_mutex::scoped_lock lock(smutex);
      val++;
      return val;
    }

    T operator--()
    {
      spin_mutex::scoped_lock lock(smutex);
      val--;
      return val;
    }

    T operator--(int)
    {
      spin_mutex::scoped_lock lock(smutex);
      val--;
      return val;
    }

  private:
    T val;
    spin_mutex smutex;
  };
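
  // Usage sketch (illustrative, not part of the original header): the
  // counter name "n_done" is hypothetical.  Each operator above takes
  // the internal spin mutex, so concurrent increments do not race.
  //
  //   static Threads::atomic<unsigned int> n_done;
  //
  //   void worker_finished () { n_done++; }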


#else //LIBMESH_HAVE_PTHREAD

  //-------------------------------------------------------------------
  // Scheduler to manage threads.  A no-op without thread support.
  class task_scheduler_init
  {
  public:
    static const int automatic = -1;
    explicit task_scheduler_init (int = automatic) {}
    void initialize (int = automatic) {}
    void terminate () {}
  };

  //-------------------------------------------------------------------
  // Dummy "splitting object" used to distinguish splitting constructors
  // from copy constructors.
  class split {};

  //-------------------------------------------------------------------
  // Execute the provided function object in serial on the specified
  // range.
  template <typename Range, typename Body>
  inline
  void parallel_for (const Range &range, const Body &body)
  {
    BoolAcquire b(in_threads);
    body(range);
  }

  //-------------------------------------------------------------------
  // Execute the provided function object in serial on the specified
  // range; the partitioner is ignored.
  template <typename Range, typename Body, typename Partitioner>
  inline
  void parallel_for (const Range &range, const Body &body, const Partitioner &)
  {
    BoolAcquire b(in_threads);
    body(range);
  }

  //-------------------------------------------------------------------
  // Execute the provided reduction operation in serial on the
  // specified range.
  template <typename Range, typename Body>
  inline
  void parallel_reduce (const Range &range, Body &body)
  {
    BoolAcquire b(in_threads);
    body(range);
  }

  //-------------------------------------------------------------------
  // Execute the provided reduction operation in serial on the
  // specified range; the partitioner is ignored.
  template <typename Range, typename Body, typename Partitioner>
  inline
  void parallel_reduce (const Range &range, Body &body, const Partitioner &)
  {
    BoolAcquire b(in_threads);
    body(range);
  }

  //-------------------------------------------------------------------
  // Spin mutex.  A no-op without thread support.
  class spin_mutex
  {
  public:
    spin_mutex() {}
    void lock () {}
    void unlock () {}

    class scoped_lock
    {
    public:
      scoped_lock () {}
      explicit scoped_lock ( spin_mutex& ) {}
      void acquire ( spin_mutex& ) {}
      void release () {}
    };
  };

  //-------------------------------------------------------------------
  // Recursive mutex.  A no-op without thread support.
  class recursive_mutex
  {
  public:
    recursive_mutex() {}

    class scoped_lock
    {
    public:
      scoped_lock () {}
      explicit scoped_lock ( recursive_mutex& ) {}
      void acquire ( recursive_mutex& ) {}
      void release () {}
    };
  };

  //-------------------------------------------------------------------
  // Defines atomic operations; in serial there is nothing to protect.
  template <typename T>
  class atomic
  {
  public:
    atomic () : _val(0) {}
    operator T& () { return _val; }
  private:
    T _val;
  };

#endif // LIBMESH_HAVE_PTHREAD
#endif // #ifdef LIBMESH_HAVE_TBB_API



  //-------------------------------------------------------------------
  // Blocked range which can be subdivided and executed in parallel.
  template <typename T>
  class BlockedRange
  {
  public:
    // Allows a BlockedRange to behave like an STL container.
    typedef T const_iterator;

    // Constructor.  Optionally takes the grainsize parameter, which is
    // the smallest chunk the range may be broken into for parallel
    // execution.
    explicit BlockedRange (const unsigned int new_grainsize = 1000) :
      _grainsize(new_grainsize)
    {}

    // Constructor.  Takes the beginning and end of the range, and
    // optionally the grainsize.
    BlockedRange (const const_iterator first,
                  const const_iterator last,
                  const unsigned int new_grainsize = 1000) :
      _grainsize(new_grainsize)
    {
      this->reset(first, last);
    }

    // Copy constructor.  The BlockedRange can be copied into subranges
    // for parallel execution; in this way the copy constructor acts as
    // a splitting function.
    BlockedRange (const BlockedRange<T> &r) :
      _end(r._end),
      _begin(r._begin),
      _grainsize(r._grainsize)
    {}

    // Splitting constructor.  Splits the range r: the first half of
    // the split range stays in r, the second half is placed in *this.
    BlockedRange (BlockedRange<T> &r, Threads::split ) :
      _end(r._end),
      _begin(r._begin),
      _grainsize(r._grainsize)
    {
      const_iterator
        beginning = r._begin,
        ending    = r._end,
        middle    = beginning + (ending - beginning)/2u;

      r._end = _begin = middle;
    }

    // Resets the range to [first, last).
    void reset (const const_iterator first,
                const const_iterator last)
    {
      _begin = first;
      _end = last;
    }

    // Beginning of the range.
    const_iterator begin () const { return _begin; }

    // End of the range.
    const_iterator end () const { return _end; }

    // The grain size of the range; it will be subdivided into
    // subranges not to exceed this size.
    unsigned int grainsize () const { return _grainsize; }

    // Set the grain size.
    void grainsize (const unsigned int &gs) { _grainsize = gs; }

    // The size of the range.
    int size () const { return (_end - _begin); }

    //------------------------------------------------------------------------
    // Methods that implement the Range concept
    //------------------------------------------------------------------------

    // Returns true if the range is empty.
    bool empty() const { return (_begin == _end); }

    // Returns true if the range can be subdivided further.
    bool is_divisible() const { return ((_begin + this->grainsize()) < _end); }

  private:

    const_iterator _end;
    const_iterator _begin;
    unsigned int _grainsize;
  };
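
  // Usage sketch (illustrative, not part of the original header):
  // BlockedRange models the Range concept above, so it works with all
  // the parallel_for and parallel_reduce overloads.  A range of 10000
  // items with grainsize 1000 may be split recursively:
  //
  //   Threads::BlockedRange<std::size_t> r(0, 10000, 1000);
  //
  //   libmesh_assert(r.is_divisible());   // 0 + 1000 < 10000
  //   Threads::BlockedRange<std::size_t> upper(r, Threads::split());
  //   // now r covers [0, 5000) and upper covers [5000, 10000)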



  // A convenient spin mutex object which can be used for obtaining
  // locks.
  extern spin_mutex spin_mtx;

  // A convenient recursive mutex object which can be used for
  // obtaining locks.
  extern recursive_mutex recursive_mtx;
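
  // Usage sketch (illustrative, not part of the original header):
  // spin_mtx is a global convenience lock for short critical sections,
  // e.g. guarding a push_back into a shared vector from inside a
  // parallel_for body ("shared_vec" and "result" are hypothetical):
  //
  //   {
  //     Threads::spin_mutex::scoped_lock lock(Threads::spin_mtx);
  //     shared_vec.push_back(result);
  //   }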

} // namespace Threads

} // namespace libMesh

#endif // LIBMESH_THREADS_H
