Pol  Revision:cb584c9
threadhelp.cpp
Go to the documentation of this file.
1 
14 #include "threadhelp.h"
15 
16 #include <cstring>
17 #include <exception>
18 #include <thread>
19 
20 #include "esignal.h"
21 #include "logfacility.h"
22 #include "passert.h"
23 
24 #ifndef _WIN32
25 #include <errno.h>
26 #include <pthread.h>
27 #include <unistd.h>
28 #endif
29 
30 // TODO: fix trunc cast warnings
31 #ifdef _MSC_VER
32 #pragma warning( disable : 4311 ) // trunc cast
33 #pragma warning( disable : 4302 ) // trunc cast
34 #endif
35 
36 namespace Pol
37 {
38 namespace threadhelp
39 {
/**
 * Number of child threads currently accounted for: start_thread() increments it before a
 * thread is launched, run_thread() (or create_thread() on a failed launch) decrements it.
 */
std::atomic<unsigned int> child_threads{ 0 };

/** Running counter that is only advanced when create_thread() logs a launch failure. */
static int threads{ 0 };
43 
44 #ifdef _WIN32
/** Windows variant of the thread-helper initialisation: intentionally a no-op. */
void init_threadhelp()
{
}
46 
47 void thread_sleep_ms( unsigned millis )
48 {
49  Sleep( millis );
50 }
51 size_t thread_pid()
52 {
53  return GetCurrentThreadId();
54 }
55 
56 const DWORD MS_VC_EXCEPTION = 0x406D1388;
57 
58 #pragma pack( push, 8 )
59 typedef struct tagTHREADNAME_INFO
60 {
61  DWORD dwType; // Must be 0x1000.
62  LPCSTR szName; // Pointer to name (in user addr space).
63  DWORD dwThreadID; // Thread ID (-1=caller thread).
64  DWORD dwFlags; // Reserved for future use, must be zero.
65 } THREADNAME_INFO;
66 #pragma pack( pop )
67 
68 void _SetThreadName( DWORD dwThreadID, const char* name )
69 {
70  THREADNAME_INFO info;
71  info.dwType = 0x1000;
72  info.szName = name;
73  info.dwThreadID = dwThreadID;
74  info.dwFlags = 0;
75 
76  __try
77  { // oh my god i hate ms ...
78  RaiseException( MS_VC_EXCEPTION, 0, sizeof( info ) / sizeof( ULONG_PTR ), (ULONG_PTR*)&info );
79  }
80  __except ( EXCEPTION_EXECUTE_HANDLER )
81  {
82  }
83 }
84 void SetThreadName( int threadid, std::string threadName )
85 {
86  // This redirection is needed because std::string has a destructor
87  // which isn't compatible with __try
88  _SetThreadName( threadid, threadName.c_str() );
89 }
90 #else
namespace
{
/** Thread attributes handed to every pthread_create() call issued by create_thread(). */
pthread_attr_t create_detached_attr;
}  // namespace
93 
95 {
96  int res;
97  res = pthread_attr_init( &create_detached_attr );
98  passert_always( res == 0 );
99  res = pthread_attr_setdetachstate( &create_detached_attr, PTHREAD_CREATE_DETACHED );
100  passert_always( res == 0 );
101 }
102 
/**
 * Suspends the calling thread via usleep().
 * @param millis  duration in milliseconds; converted to microseconds before the call
 */
void thread_sleep_ms( unsigned millis )
{
  const auto micros = millis * 1000L;
  usleep( micros );
}
/**
 * Identifies the calling thread.
 * @return the value of pthread_self(), converted to size_t
 */
size_t thread_pid()
{
  const pthread_t self = pthread_self();
  return self;
}
111 #endif
112 
113 void run_thread( void ( *threadf )( void ) )
114 {
115  // thread creator calls inc_child_thread_count before starting thread
116  try
117  {
118  ( *threadf )();
119  }
120  catch ( std::exception& ex )
121  {
122  ERROR_PRINT << "Thread exception: " << ex.what() << "\n";
123  }
124 
125  --child_threads;
126 
127  threadmap.Unregister( thread_pid() );
128 }
129 void run_thread( void ( *threadf )( void* ), void* arg )
130 {
131  // thread creator calls inc_child_thread_count before starting thread
132  try
133  {
134  ( *threadf )( arg );
135  }
136  catch ( std::exception& ex )
137  {
138  ERROR_PRINT << "Thread exception: " << ex.what() << "\n";
139  }
140 
141  --child_threads;
142 
143  threadmap.Unregister( thread_pid() );
144 }
145 
147 {
148 public:
149  std::string name;
150  void ( *entry )( void* );
151  void ( *entry_noparam )( void );
152  void* arg;
153 };
154 
155 #ifdef _WIN32
156 unsigned __stdcall thread_stub2( void* v_td )
157 #else
158 void* thread_stub2( void* v_td )
159 #endif
160 {
161  ThreadData* td = reinterpret_cast<ThreadData*>( v_td );
162 
163  void ( *entry )( void* ) = td->entry;
164  void ( *entry_noparam )( void ) = td->entry_noparam;
165  void* arg = td->arg;
166 
167  threadmap.Register( thread_pid(), td->name );
168 
169  delete td;
170  td = NULL;
171 
172  if ( entry != NULL )
173  run_thread( entry, arg );
174  else
175  run_thread( entry_noparam );
176 
177 #ifdef _WIN32
178  _endthreadex( 0 );
179 #else
180  pthread_exit( NULL );
181 #endif
182  return 0;
183 }
184 
185 #ifdef _WIN32
186 void create_thread( ThreadData* td, bool dec_child = false )
187 {
188  // If the thread starts successfully, td will be deleted by thread_stub2.
189  // So we must save the threadName for later.
190  std::string threadName = td->name;
191 
192  unsigned threadid = 0;
193  HANDLE h = (HANDLE)_beginthreadex( NULL, 0, thread_stub2, td, 0, &threadid );
194  if ( h == 0 ) // added for better debugging
195  {
196  POLLOG.Format(
197  "error in create_thread: {:d} {:d} \"{:s}\" \"{:s}\" {:d} {:d} {:s} {:d} {:d} {:d}\n" )
198  << errno << _doserrno << strerror( errno ) << strerror( _doserrno ) << threads++
199  << (unsigned)thread_stub2 << td->name.c_str() << (unsigned)td->entry
200  << (unsigned)td->entry_noparam << td->arg;
201 
202  // dec_child says that we should dec_child_threads when there's an error... :)
203  if ( dec_child )
204  --child_threads;
205  }
206  else
207  {
208  SetThreadName( threadid, threadName );
209  CloseHandle( h );
210  }
211 }
212 #else
213 void create_thread( ThreadData* td, bool dec_child = false )
214 {
215  Clib::SpinLockGuard guard( pthread_attr_lock );
216  pthread_t thread;
217  int result = pthread_create( &thread, &create_detached_attr, thread_stub2, td );
218  if ( result != 0 ) // added for better debugging
219  {
220  POLLOG.Format( "error in create_thread: {:d} {:d} \"{:s}\" {:d} {:} {:s} {:} {:} {:d}\n" )
221  << result << errno << strerror( errno ) << threads++
222  << reinterpret_cast<const void*>( thread_stub2 ) << td->name.c_str()
223  << reinterpret_cast<const void*>( td->entry )
224  << reinterpret_cast<const void*>( td->entry_noparam ) << td->arg;
225 
226  // dec_child says that we should dec_child_threads when there's an error... :)
227  if ( dec_child )
228  --child_threads;
229  }
230 }
231 #endif
232 
233 void start_thread( void ( *entry )( void* ), const char* thread_name, void* arg )
234 {
235  auto td = new ThreadData;
236  td->name = thread_name;
237  td->entry = entry;
238  td->entry_noparam = NULL;
239  td->arg = arg;
240 
241  ++child_threads;
242 
243  create_thread( td, true );
244 }
245 
246 void start_thread( void ( *entry )( void ), const char* thread_name )
247 {
248  auto td = new ThreadData;
249  td->name = thread_name;
250  td->entry = NULL;
251  td->entry_noparam = entry;
252  td->arg = NULL;
253 
254  ++child_threads;
255 
256  create_thread( td, true );
257 }
258 
260  : _spinlock(),
261  _contents()
262 #ifdef _WIN32
263  ,
264  _handles()
265 #endif
266 {
267 }
268 
269 #ifdef _WIN32
270 HANDLE ThreadMap::getThreadHandle( size_t pid ) const
271 {
273  auto itr = _handles.find( pid );
274  if ( itr == _handles.end() )
275  {
276  return 0;
277  }
278  return itr->second;
279 }
280 #endif
281 void ThreadMap::Register( size_t pid, const std::string& name )
282 {
284  _contents.insert( std::make_pair( pid, name ) );
285 #ifdef _WIN32
286  HANDLE hThread = 0;
287  if ( !DuplicateHandle( GetCurrentProcess(), GetCurrentThread(), GetCurrentProcess(), &hThread, 0,
288  FALSE, DUPLICATE_SAME_ACCESS ) )
289  {
290  ERROR_PRINT << "failed to duplicate thread handle\n";
291  return;
292  }
293  _handles.insert( std::make_pair( pid, hThread ) );
294 #endif
295 }
296 void ThreadMap::Unregister( size_t pid )
297 {
299  _contents.erase( pid );
300 #ifdef _WIN32
301  auto itr = _handles.find( pid );
302  if ( itr != _handles.end() )
303  CloseHandle( itr->second );
304  _handles.erase( pid );
305 #endif
306 }
308 {
310  out = _contents;
311 }
312 
313 ThreadRegister::ThreadRegister( const std::string& name )
314 {
315  threadmap.Register( thread_pid(), name );
316 }
318 {
319  threadmap.Unregister( thread_pid() );
320 }
321 
322 
329 TaskThreadPool::TaskThreadPool() : _done( false ), _msg_queue() {}
330 
331 TaskThreadPool::TaskThreadPool( const std::string& name ) : _done( false ), _msg_queue()
332 {
333  // get the count of processors
334  unsigned int max_count = std::thread::hardware_concurrency();
335  if ( !max_count ) // can fail so at least one
336  max_count = 1;
337  init( max_count, name );
338 }
339 
340 TaskThreadPool::TaskThreadPool( unsigned int max_count, const std::string& name )
341  : _done( false ), _msg_queue()
342 {
343  init( max_count, name );
344 }
345 
346 void TaskThreadPool::init( unsigned int max_count, const std::string& name )
347 {
348  for ( unsigned int i = 0; i < max_count; ++i )
349  {
350  _threads.emplace_back( [=]() {
351  ThreadRegister register_thread( "TaskPool " + name );
352  auto f = msg();
353  try
354  {
355  while ( !_done )
356  {
357  _msg_queue.pop_wait( &f );
358  f();
359  }
360  }
361  catch ( msg_queue::Canceled& )
362  {
363  }
364  catch ( std::exception& ex )
365  {
366  ERROR_PRINT << "Thread exception: " << ex.what() << "\n";
367  Clib::force_backtrace( true );
368  return;
369  }
370  // purge the queue empty
371  std::list<msg> remaining;
372  _msg_queue.pop_remaining( &remaining );
373  for ( auto& _f : remaining )
374  _f();
375  } );
376  }
377 }
378 
379 void TaskThreadPool::init_pool( unsigned int max_count, const std::string& name )
380 {
381  if ( !_threads.empty() )
382  return;
383  init( max_count, name );
384 }
385 
387 {
388  if ( _threads.empty() )
389  return;
390  // send both done and cancel to wake up all workers
391  _msg_queue.push( [&]() {
392  _done = true;
393  _msg_queue.cancel();
394  } );
395  for ( auto& thread : _threads )
396  thread.join();
397  _threads.clear();
398 }
400 {
401  deinit_pool();
402 }
403 
406 {
407  _msg_queue.push( msg );
408 }
409 
411 std::future<bool> TaskThreadPool::checked_push( const msg& msg )
412 {
413  auto promise = std::make_shared<std::promise<bool>>();
414  auto ret = promise->get_future();
415  _msg_queue.push( [=]() {
416  try
417  {
418  msg();
419  promise->set_value( true );
420  }
421  catch ( ... )
422  {
423  promise->set_exception( std::current_exception() );
424  }
425  } );
426  return ret;
427 }
428 
429 size_t TaskThreadPool::size() const
430 {
431  return _threads.size();
432 }
433 
434 
435 class DynTaskThreadPool::PoolWorker : boost::noncopyable
436 {
437 public:
438  PoolWorker( DynTaskThreadPool* parent, const std::string& name );
439  bool isbusy() const;
440  void join();
441  void run();
442 
443 private:
444  std::string _name;
445  bool _busy;
446  std::thread _thread;
448  struct BusyGuard
449  {
450  bool* _busy;
451  BusyGuard( bool* busy ) : _busy( busy ) { ( *_busy ) = true; }
452  ~BusyGuard() { ( *_busy ) = false; }
453  };
454 };
456  : _name( name ), _busy( false ), _thread(), _parent( parent )
457 {
458  run();
459 }
461 {
462  return _busy;
463 }
464 
466 {
467  _thread.join();
468 }
469 
471 {
472  _thread = std::thread( [&]() {
473  ThreadRegister register_thread( _name );
474  auto f = msg();
475  try
476  {
477  while ( !_parent->_done && !Clib::exit_signalled )
478  {
479  _parent->_msg_queue.pop_wait( &f );
480  {
481  BusyGuard busy( &_busy );
482  f();
483  }
484  }
485  }
486  catch ( msg_queue::Canceled& )
487  {
488  }
489  catch ( std::exception& ex )
490  {
491  ERROR_PRINT << "Thread exception: " << ex.what() << "\n";
492  Clib::force_backtrace( true );
493  return;
494  }
495  } );
496 }
497 
505 DynTaskThreadPool::DynTaskThreadPool( const std::string& name )
506  : _done( false ), _msg_queue(), _pool_mutex(), _name( "DynTaskPool" + name )
507 {
508 }
509 
511 {
512  std::lock_guard<std::mutex> guard( _pool_mutex );
513  return _threads.size();
514 }
515 
517 {
518  for ( const auto& worker : _threads )
519  {
520  if ( !worker->isbusy() ) // check for a idle instance
521  {
522  return;
523  }
524  }
525  std::lock_guard<std::mutex> guard( _pool_mutex );
526  size_t thread_num = _threads.size();
527  _threads.emplace_back( new PoolWorker( this, _name + " " + fmt::FormatInt( thread_num ).str() ) );
528  ERROR_PRINT << "create pool worker " << _name << " " << thread_num << "\n";
529 }
530 
532 {
533  // send both done and cancel to wake up all workers
534  _msg_queue.push( [&]() {
535  _done = true;
536  _msg_queue.cancel();
537  } );
538  for ( auto& thread : _threads )
539  thread->join();
540 }
541 
544 {
545  create_thread();
546  _msg_queue.push( msg );
547 }
548 
550 std::future<bool> DynTaskThreadPool::checked_push( const msg& msg )
551 {
552  auto promise = std::make_shared<std::promise<bool>>();
553  auto ret = promise->get_future();
554  create_thread();
555  _msg_queue.push( [=]() {
556  try
557  {
558  msg();
559  promise->set_value( true );
560  }
561  catch ( ... )
562  {
563  promise->set_exception( std::current_exception() );
564  }
565  } );
566  return ret;
567 }
568 }
569 }
std::vector< std::unique_ptr< PoolWorker > > _threads
Definition: threadhelp.h:119
void init(unsigned int max_count, const std::string &name)
Definition: threadhelp.cpp:346
PoolWorker(DynTaskThreadPool *parent, const std::string &name)
Definition: threadhelp.cpp:455
static int threads
Definition: threadhelp.cpp:42
void pop_remaining(std::list< Message > *msgs)
void create_thread(ThreadData *td, bool dec_child=false)
Definition: threadhelp.cpp:213
size_t thread_pid()
Definition: threadhelp.cpp:107
typedef DWORD(WINAPI *__SymGetOptions)(VOID)
std::future< bool > checked_push(const msg &msg)
returns a future which will be set once the msg is processed
Definition: threadhelp.cpp:411
void start_thread(void(*entry)(void *), const char *thread_name, void *arg)
Definition: threadhelp.cpp:233
void force_backtrace(bool complete)
Definition: passert.cpp:53
u64 pid
Definition: osmod.cpp:945
void init_pool(unsigned int max_count, const std::string &name)
Definition: threadhelp.cpp:379
std::function< void()> msg
Definition: threadhelp.h:103
void pop_wait(Message *msg)
void init_threadhelp()
Definition: threadhelp.cpp:94
void Unregister(size_t pid)
Definition: threadhelp.cpp:296
void CopyContents(Contents &out) const
Definition: threadhelp.cpp:307
std::future< bool > checked_push(const msg &msg)
returns a future which will be set once the msg is processed
Definition: threadhelp.cpp:550
void push(const msg &msg)
Simply fire and forget; only the destructor ensures that the msg has finished.
Definition: threadhelp.cpp:543
static pthread_attr_t create_detached_attr
Definition: threadhelp.cpp:91
#define POLLOG
Definition: logfacility.h:219
TaskThreadPool()
Creates a threadpool of workers. Blocks on destruction, e.g.: TaskThreadPool workers; for (...
Definition: threadhelp.cpp:329
ThreadRegister(const std::string &name)
Definition: threadhelp.cpp:313
void run_thread(void(*threadf)(void))
Definition: threadhelp.cpp:113
void push(Message const &msg)
Definition: message_queue.h:73
std::atomic< unsigned int > child_threads
std::map< size_t, std::string > Contents
Definition: threadhelp.h:43
static Clib::SpinLock pthread_attr_lock
Definition: threadhelp.cpp:92
std::string name
Definition: osmod.cpp:943
Clib::SpinLock _spinlock
Definition: threadhelp.h:55
#define ERROR_PRINT
Definition: logfacility.h:230
void push(const msg &msg)
Simply fire and forget; only the destructor ensures that the msg has finished.
Definition: threadhelp.cpp:405
void thread_sleep_ms(unsigned millis)
Definition: threadhelp.cpp:103
DynTaskThreadPool(const std::string &name)
Creates a dynamic threadpool of workers. If no idle worker is found, creates a new worker thread. block...
Definition: threadhelp.cpp:505
bool run(int argc, char **argv)
#define passert_always(exp)
Definition: passert.h:80
void * thread_stub2(void *v_td)
Definition: threadhelp.cpp:158
std::lock_guard< SpinLock > SpinLockGuard
Definition: spinlock.h:33
std::atomic< bool > _done
Definition: threadhelp.h:93
Definition: berror.cpp:12
std::atomic< bool > exit_signalled
ThreadMap threadmap
Definition: threadhelp.cpp:40
std::function< void()> msg
Definition: threadhelp.h:76
void Register(size_t pid, const std::string &name)
Definition: threadhelp.cpp:281
std::vector< std::thread > _threads
Definition: threadhelp.h:95