Pol  Revision:4b29d2b
message_queue.h
Go to the documentation of this file.
1 /*
2 ATTENTION:
3 This header is part of the PCH
4 Remove the include in all StdAfx.h files or live with the consequences :)
5 */
6 
7 #ifndef MESSAGE_QUEUE_H
8 #define MESSAGE_QUEUE_H
9 #include <chrono>
10 #include <condition_variable>
11 #include <list>
12 #include <mutex>
13 
14 #include <boost/noncopyable.hpp>
15 
16 namespace Pol
17 {
18 namespace Clib
19 {
20 template <typename Message>
21 class message_queue : boost::noncopyable
22 {
23 public:
24  message_queue();
26 
27  // push new message into queue and notify possible wait_pop
28  void push( Message const& msg );
29  void push( std::list<Message>& msg_list );
30  // push new message into queue and notify possible wait_pop
31  // will move the msg into the queue, thus the reference is likely to be
32  // invalid afterwards
33  void push_move( Message&& msg );
34 
35  // check if empty (a bit senseless)
36  bool empty() const;
37  // return current size (unsafe aka senseless)
38  std::size_t size() const;
39 
40  // tries to get a message true on success false otherwise
41  bool try_pop( Message* msg );
42  // waits till queue is non empty
43  void pop_wait( Message* msg );
44  // waits till queue is non empty and fill list
45  void pop_wait( std::list<Message>* msgs );
46  // empties the queue (unsafe)
47  void pop_remaining( std::list<Message>* msgs );
48 
49  void cancel();
50  struct Canceled
51  {
52  };
53 
54 private:
55  std::list<Message> _queue;
56  mutable std::mutex _mutex;
57  std::condition_variable _notifier;
58  bool _cancel;
59 };
60 
61 template <typename Message>
63 {
64 }
65 
66 template <typename Message>
68 {
69  cancel();
70 }
71 
72 template <typename Message>
73 void message_queue<Message>::push( Message const& msg )
74 {
75  std::list<Message> tmp;
76  tmp.push_back( msg ); // costly pushback outside the lock
77  bool signal = false;
78  {
79  std::lock_guard<std::mutex> lock( _mutex );
80  signal = _queue.empty();
81  _queue.splice( _queue.end(), tmp ); // fast splice inside
82  }
83  if ( signal )
84  _notifier.notify_one();
85 }
86 
87 template <typename Message>
88 void message_queue<Message>::push_move( Message&& msg )
89 {
90  std::list<Message> tmp;
91  tmp.emplace_back( std::move( msg ) ); // costly pushback outside the lock
92  bool signal = false;
93  {
94  std::lock_guard<std::mutex> lock( _mutex );
95  signal = _queue.empty();
96  _queue.splice( _queue.end(), tmp ); // fast splice inside
97  }
98  if ( signal )
99  _notifier.notify_one();
100 }
101 
102 template <typename Message>
103 void message_queue<Message>::push( std::list<Message>& msg_list )
104 {
105  bool signal = false;
106  {
107  std::lock_guard<std::mutex> lock( _mutex );
108  signal = _queue.empty();
109  _queue.splice( _queue.end(), msg_list ); // fast splice inside
110  }
111  if ( signal )
112  _notifier.notify_one();
113 }
114 
115 template <typename Message>
117 {
118  std::lock_guard<std::mutex> lock( _mutex );
119  return _queue.empty();
120 }
121 
122 template <typename Message>
123 std::size_t message_queue<Message>::size() const
124 {
125  std::lock_guard<std::mutex> lock( _mutex );
126  return _queue.size();
127 }
128 
130 template <typename Message>
132 {
133  std::lock_guard<std::mutex> lock( _mutex );
134  if ( _queue.empty() )
135  return false;
136  *msg = std::move( _queue.front() );
137  _queue.pop_front();
138  return true;
139 }
140 
141 template <typename Message>
143 {
144  std::unique_lock<std::mutex> lock( _mutex );
145  while ( _queue.empty() && !_cancel )
146  _notifier.wait( lock ); // will unlock mutex during wait
147  if ( _cancel )
148  throw Canceled();
149  *msg = std::move( _queue.front() );
150  _queue.pop_front();
151 }
152 
153 template <typename Message>
154 void message_queue<Message>::pop_wait( std::list<Message>* msgs )
155 {
156  std::unique_lock<std::mutex> lock( _mutex );
157  while ( _queue.empty() && !_cancel )
158  _notifier.wait( lock ); // will unlock mutex during wait
159  if ( _cancel )
160  throw Canceled();
161  msgs->splice( msgs->end(), _queue );
162 }
163 
164 template <typename Message>
165 void message_queue<Message>::pop_remaining( std::list<Message>* msgs )
166 {
167  std::unique_lock<std::mutex> lock( _mutex );
168  msgs->splice( msgs->end(), _queue );
169 }
170 template <typename Message>
172 {
173  {
174  std::lock_guard<std::mutex> lock( _mutex );
175  _cancel = true;
176  }
177  _notifier.notify_all();
178 }
179 }
180 }
181 
182 #endif
bool try_pop(Message *msg)
tries to get a message true on success false otherwise
void pop_remaining(std::list< Message > *msgs)
void pop_wait(Message *msg)
std::condition_variable _notifier
Definition: message_queue.h:57
void push_move(Message &&msg)
Definition: message_queue.h:88
void push(Message const &msg)
Definition: message_queue.h:73
std::size_t size() const
std::list< Message > _queue
Definition: message_queue.h:55
Definition: berror.cpp:12