libzypp  17.37.5
asyncqueue.h
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 ----------------------------------------------------------------------*/
9 #ifndef ZYPP_NG_THREAD_ASYNCQUEUE_H_INCLUDED
10 #define ZYPP_NG_THREAD_ASYNCQUEUE_H_INCLUDED
11 
12 #include <zypp-core/zyppng/base/AbstractEventSource>
13 
14 #include <queue>
15 #include <set>
16 #include <mutex>
17 #include <memory>
18 #include <optional>
19 #include <condition_variable>
20 
21 namespace zyppng {
22 
23  class AsyncQueueWatch;
24 
26  public:
27  virtual ~AsyncQueueBase();
28 
29  void addWatch ( AsyncQueueWatch &watch );
30  void removeWatch ( AsyncQueueWatch &watch );
31  void notifyWatches ( );
32 
33  private:
34  std::set<AsyncQueueWatch *> _watches;
35  std::recursive_mutex _watchLock;
36  };
37 
42  template< class Message >
43  class AsyncQueue : public AsyncQueueBase {
44 
45  public:
46 
47  using Ptr = std::shared_ptr<AsyncQueue>;
48 
49  static Ptr create () {
50  return Ptr( new AsyncQueue() );
51  }
52 
53  AsyncQueue(const AsyncQueue&) = delete;
54  AsyncQueue& operator=(const AsyncQueue&) = delete;
55 
63  template< typename T = Message >
64  void pushUnlocked ( T &&value ) {
65  _messages.push( std::forward<T>(value) );
66  }
67 
72  template< typename T = Message >
73  void push ( T &&value ) {
74  {
75  std::lock_guard lk( _mut );
76  pushUnlocked( std::forward<T>(value) );
77  }
78  notify();
79  }
80 
84  Message pop () {
85  std::unique_lock<std::mutex> lk( _mut );
86  _cv.wait( lk, [this](){ return _messages.size() > 0; } );
87  Message msg = std::move( _messages.front() );
88  _messages.pop();
89  return msg;
90  }
91 
96  std::optional<Message> tryPop () {
97  std::lock_guard lk( _mut );
98  return tryPopUnlocked();
99  }
100 
105  std::optional<Message> tryPopUnlocked () {
106  if ( _messages.size() ) {
107  Message msg = std::move( _messages.front() );
108  _messages.pop();
109  return msg;
110  }
111  return {};
112  }
113 
118  void lock () {
119  _mut.lock();
120  }
121 
126  void unlock () {
127  _mut.unlock();
128  }
129 
134  void notify () {
135  _cv.notify_all();
136  notifyWatches();
137  }
138 
139  private:
140  AsyncQueue() = default;
141  std::queue<Message> _messages;
142  std::mutex _mut;
143  std::condition_variable _cv;
144  };
145 
148  {
150  public:
151 
152  static std::shared_ptr<AsyncQueueWatch> create ( std::shared_ptr<AsyncQueueBase> queue );
153  ~AsyncQueueWatch() override;
154 
155  void postNotifyEvent ();
156 
157  SignalProxy<void()> sigMessageAvailable();
158 
159  // AbstractEventSource interface
160  void onFdReady(int fd, int events) override;
161  void onSignal(int signal) override;
162 
163  protected:
164  AsyncQueueWatch( std::shared_ptr<AsyncQueueBase> &&queue );
166  };
167 
168 }
169 
170 
171 #endif
static Ptr create()
Definition: asyncqueue.h:49
AsyncQueue & operator=(const AsyncQueue &)=delete
void removeWatch(AsyncQueueWatch &watch)
Definition: asyncqueue.cc:18
std::shared_ptr< AsyncQueue > Ptr
Definition: asyncqueue.h:47
void push(T &&value)
Definition: asyncqueue.h:73
std::recursive_mutex _watchLock
Definition: asyncqueue.h:35
std::optional< Message > tryPopUnlocked()
Definition: asyncqueue.h:105
std::set< AsyncQueueWatch * > _watches
Definition: asyncqueue.h:34
void addWatch(AsyncQueueWatch &watch)
Definition: asyncqueue.cc:12
AsyncQueue()=default
virtual ~AsyncQueueBase()
Definition: asyncqueue.cc:9
void pushUnlocked(T &&value)
Definition: asyncqueue.h:64
#define ZYPP_DECLARE_PRIVATE(Class)
Definition: zyppglobal.h:87
std::queue< Message > _messages
Definition: asyncqueue.h:141
#define LIBZYPP_NG_EXPORT
Definition: zyppglobal.h:8
std::mutex _mut
Definition: asyncqueue.h:142
std::condition_variable _cv
Definition: asyncqueue.h:143
std::optional< Message > tryPop()
Definition: asyncqueue.h:96