libzypp  17.37.5
networkrequestdispatcher.cc
Go to the documentation of this file.
1 /*---------------------------------------------------------------------\
2 | ____ _ __ __ ___ |
3 | |__ / \ / / . \ . \ |
4 | / / \ V /| _/ _/ |
5 | / /__ | | | | | | |
6 | /_____||_| |_| |_| |
7 | |
8 ----------------------------------------------------------------------*/
9 #include <zypp-core/Globals.h>
14 #include <zypp-core/zyppng/base/Timer>
15 #include <zypp-core/zyppng/base/SocketNotifier>
16 #include <zypp-core/zyppng/base/EventDispatcher>
18 #include <assert.h>
19 
20 #include <zypp/base/Logger.h>
21 #include <zypp/base/String.h>
22 #include <zypp-core/base/DtorReset>
23 
24 using namespace boost;
25 
27 
28 
29 namespace zyppng {
30 
31 static const std::string & defaultAgentString()
32 {
33  // we need to add the release and identifier to the
34  // agent string.
35  // The target could be not initialized, and then this information
36  // is guessed.
37  static const std::string _value(
39  "ZYpp " LIBZYPP_VERSION_STRING " (curl %s)"
40  , curl_version_info(CURLVERSION_NOW)->version
41  )
42  );
43  return _value;
44 }
45 
46 
47 NetworkRequestDispatcherPrivate::NetworkRequestDispatcherPrivate( NetworkRequestDispatcher &p )
48  : BasePrivate( p )
49  , _timer( Timer::create() )
50  , _multi ( curl_multi_init() )
51  , _userAgent( defaultAgentString() )
52 {
54 
55  curl_multi_setopt( _multi, CURLMOPT_TIMERFUNCTION, NetworkRequestDispatcherPrivate::multi_timer_cb );
56  curl_multi_setopt( _multi, CURLMOPT_TIMERDATA, reinterpret_cast<void *>( this ) );
57  curl_multi_setopt( _multi, CURLMOPT_SOCKETFUNCTION, NetworkRequestDispatcherPrivate::static_socket_callback );
58  curl_multi_setopt( _multi, CURLMOPT_SOCKETDATA, reinterpret_cast<void *>( this ) );
59 
60  // disabled explicit pipelining since it breaks our tests on releases < 15.2
61  // we could consider enabling it starting with a specific CURL version
62  // curl_multi_setopt( _multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX|CURLPIPE_HTTP1 );
63 
64  _timer->setSingleShot( true );
66 }
67 
69 {
71  curl_multi_cleanup( _multi );
72 }
73 
74 //called by curl to setup a timer
75 int NetworkRequestDispatcherPrivate::multi_timer_cb( CURLM *, long timeout_ms, void *thatPtr )
76 {
77  NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( thatPtr );
78  assert( that != nullptr );
79 
80  if ( timeout_ms >= 0 ) {
81  that->_timer->start( static_cast<uint64_t>(timeout_ms) );
82  } else {
83  //cancel the timer
84  that->_timer->stop();
85  }
86  return 0;
87 }
88 
90 {
91  handleMultiSocketAction( CURL_SOCKET_TIMEOUT, 0 );
92 }
93 
94 int NetworkRequestDispatcherPrivate::static_socket_callback(CURL * easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp )
95 {
96  NetworkRequestDispatcherPrivate *that = reinterpret_cast<NetworkRequestDispatcherPrivate *>( userp );
97  assert( that != nullptr );
98  return that->socketCallback( easy, s, what, socketp );
99 }
100 
101 int NetworkRequestDispatcherPrivate::socketCallback(CURL *easy, curl_socket_t s, int what, void * )
102 {
103  std::shared_ptr<SocketNotifier> socketp;
104 
105  if ( _socketHandler.count( s ) == 0 ) {
106  if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
107  return 0;
108 
109  socketp = SocketNotifier::create( s, SocketNotifier::Read, false );
110  _socketHandler.insert( std::make_pair( s, socketp ) );
111 
113  } else {
114  socketp = _socketHandler[s];
115  }
116 
117  //should never happen
118  if ( !socketp ) {
119  if ( what == CURL_POLL_REMOVE || what == CURL_POLL_NONE )
120  return 0;
121 
122  if ( _socketHandler.count( s ) > 0 )
123  _socketHandler.erase( s );
124 
125  void *privatePtr = nullptr;
126  if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
127  privatePtr = nullptr; //make sure this was not filled with bad info
128  }
129 
130  if ( privatePtr ) {
131  NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
132  //we stop the download, if we can not listen for socket changes we can not correctly do anything
133  setFinished( *request->z_func(), NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Unable to assign socket listener." ) );
134  return 0;
135  } else {
136  //a broken handle without anything assigned, also should never happen but make sure and clean it up
137  WAR << "Cleaning up unassigned easy handle" << std::endl;
138  curl_multi_remove_handle( _multi, easy );
139  curl_easy_cleanup( easy );
140  return 0;
141  }
142  }
143 
144  //remove the socket
145  if ( what == CURL_POLL_REMOVE ) {
146  socketp->setEnabled( false );
147  _socketHandler.erase( s );
148  return 0;
149  }
150 
151  if ( what == CURL_POLL_IN ) {
152  socketp->setMode( SocketNotifier::Read );
153  } else if ( what == CURL_POLL_OUT ) {
154  socketp->setMode( SocketNotifier::Write );
155  } else if ( what == CURL_POLL_INOUT ) {
156  socketp->setMode( SocketNotifier::Read | SocketNotifier::Write );
157  }
158 
159  socketp->setEnabled();
160  return 0;
161 }
162 
164 {
165  int evBitmask = 0;
166  if ( (events & SocketNotifier::Read) == SocketNotifier::Read )
167  evBitmask |= CURL_CSELECT_IN;
168  if ( (events & SocketNotifier::Write) == SocketNotifier::Write )
169  evBitmask |= CURL_CSELECT_OUT;
170  if ( (events & SocketNotifier::Error) == SocketNotifier::Error )
171  evBitmask |= CURL_CSELECT_ERR;
172 
173  handleMultiSocketAction( listener.socket(), evBitmask );
174 }
175 
176 void NetworkRequestDispatcherPrivate::handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
177 {
178  int running = 0;
179 
180  // when inside a curl callback we can not call another multi curl API,
181  // for now just lock the thing, but we should consider rewriting this
182  // to post events instead of doing direct calls simply to decouple from
183  // that limitation
184  CURLMcode rc = CURLM_OK;
185  {
186  zypp::DtorReset lockSet( _locked );
187  _locked = true;
188  rc = curl_multi_socket_action( _multi, nativeSocket, evBitmask, &running );
189  }
190  if (rc != 0) {
191  //we can not recover from a error like that, cancel all and stop
193  cancelAll( err );
194  //emit error
195  _lastError = err;
196  _sigError.emit( *z_func() );
197  return;
198  }
199 
200  // make sure we dequeue pending requests ( in case a call to dequeue was blocked during the API call )
201  zypp::OnScopeExit scopeFinally([this](){
202  this->dequeuePending();
203  });
204 
205  int msgs_left = 0;
206  CURLMsg *msg = nullptr;
207  while( (msg = curl_multi_info_read( _multi, &msgs_left )) ) {
208  if(msg->msg == CURLMSG_DONE) {
209  CURL *easy = msg->easy_handle;
210  CURLcode res = msg->data.result;
211 
212  void *privatePtr = nullptr;
213  if ( curl_easy_getinfo( easy, CURLINFO_PRIVATE, &privatePtr ) != CURLE_OK ) {
214  WAR << "Unable to get CURLINFO_PRIVATE" << std::endl;
215  continue;
216  }
217 
218  if ( !privatePtr ) {
219  //broken easy handle not associated, should never happen but clean it up
220  WAR << "Cleaning up unassigned easy handle" << std::endl;
221  curl_multi_remove_handle( _multi, easy );
222  curl_easy_cleanup( easy );
223  continue;
224  }
225 
226  NetworkRequestPrivate *request = reinterpret_cast<NetworkRequestPrivate *>( privatePtr );
227  request->dequeueNotify();
228 
229  if ( request->hasMoreWork() && ( res == CURLE_OK || request->canRecover() ) ) {
230  std::string errBuf = "Broken easy handle in request";
231  if ( !request->_easyHandle ) {
233  setFinished( *request->z_func(), e );
234  continue;
235  }
236 
237  // remove the handle from multi to change options
238  curl_multi_remove_handle( _multi, request->_easyHandle );
239 
240  errBuf = "Failed to reinitialize the request";
241  if ( !request->prepareToContinue ( errBuf ) ) {
243  setFinished( *request->z_func(), e );
244  } else {
245  // add the request back to the multi handle, it is not done
246  if ( !addRequestToMultiHandle( *request->z_func() ) )
247  continue;
248 
249  request->aboutToStart( );
250  }
251  } else {
252  // trigger notification about file downloaded
253  // we create a error from the CURL code, there might be a already cached Result which will be used instead
254  // in cases like a RangeFail where we could not recover but there also is no real error code
255  NetworkRequestError e = NetworkRequestErrorPrivate::fromCurlError( *request->z_func(), res, request->errorMessage() );
256  setFinished( *request->z_func(), e );
257  }
258  //attention request could be deleted from here on
259  }
260  }
261 }
262 
264 {
265  //prevent dequeuePending from filling up the runningDownloads again
266  zypp::DtorReset lockReset( _locked );
267  _locked = true;
268 
269  while ( _runningDownloads.size() ) {
270  std::shared_ptr<NetworkRequest> &req = _runningDownloads.back();
271  setFinished(*req, result );
272  }
273  while ( _pendingDownloads.size() ) {
274  std::shared_ptr<NetworkRequest> &req = _pendingDownloads.back();
275  setFinished(*req, result );
276  }
277 }
278 
280 {
281  auto delReq = []( auto &list, NetworkRequest &req ) -> std::shared_ptr<NetworkRequest> {
282  auto it = std::find_if( list.begin(), list.end(), [ &req ]( const std::shared_ptr<NetworkRequest> &r ) {
283  return req.d_func() == r->d_func();
284  } );
285  if ( it != list.end() ) {
286  auto ptr = *it;
287  list.erase( it );
288  return ptr;
289  }
290  return nullptr;
291  };
292 
293  // We have a tricky situation if a network request is called when inside a callback. In those cases, it is
294  // not allowed to call curl_multi_remove_handle. We need to tell the callback to fail, so the download
295  // is cancelled by curl itself. We also need to store the current result for later
296  auto rmode = std::get_if<NetworkRequestPrivate::running_t>( &req.d_func()->_runningMode );
297  if ( rmode ) {
298  if ( rmode->_isInCallback ) {
299  // the first cached result wins)
300  if ( !rmode->_cachedResult )
301  rmode->_cachedResult = result;
302  return;
303  } else if ( rmode->_cachedResult ) {
304  result = rmode->_cachedResult.value();
305  }
306  }
307 
308  auto rLocked = delReq( _runningDownloads, req );
309  if ( !rLocked )
310  rLocked = delReq( _pendingDownloads, req );
311 
312  void *easyHandle = req.d_func()->_easyHandle;
313  if ( easyHandle ) {
314  MIL_MEDIA << "Removing easy handle: " << easyHandle << std::endl;
315  curl_multi_remove_handle( _multi, easyHandle );
316  }
317 
318  req.d_func()->_dispatcher = nullptr;
319 
320  //first set the result, the Request might have a checksum to check as well so a currently
321  //successful request could fail later on
322  req.d_func()->setResult( std::move(result) );
323  _sigDownloadFinished.emit( *z_func(), req );
324 
325  //we got a open slot, try to dequeue or send the finished signals if all queues are empty
326  dequeuePending();
327 }
328 
330 {
331  CURLMcode rc = curl_multi_add_handle( _multi, req.d_func()->_easyHandle );
332  if ( rc != 0 ) {
334  return false;
335  }
336 
337  MIL_MEDIA << "Added easy handle: " << req.d_func()->_easyHandle << std::endl;
338  // make sure to wake up once to register what we have now
339  _timer->start(0);
340  return true;
341 }
342 
344 {
345  if ( !_isRunning || _locked )
346  return;
347 
348  while ( _maxConnections == -1 || ( (std::size_t)_maxConnections > _runningDownloads.size() ) ) {
349  if ( !_pendingDownloads.size() )
350  break;
351 
352  std::shared_ptr<NetworkRequest> req = std::move( _pendingDownloads.front() );
353  _pendingDownloads.pop_front();
354 
355  std::string errBuf = "Failed to initialize easy handle";
356  if ( !req->d_func()->initialize( errBuf ) ) {
357  //@TODO store the CURL error in the errors extra info
359  continue;
360  }
361 
362  if ( !addRequestToMultiHandle( *req ) )
363  continue;
364 
365  req->d_func()->aboutToStart();
366  _sigDownloadStarted.emit( *z_func(), *req );
367 
368  _runningDownloads.push_back( std::move(req) );
369  }
370 
371  //check for empty queues
372  if ( _pendingDownloads.size() == 0 && _runningDownloads.size() == 0 ) {
373  //once we finished all requests, cancel the timer too, so curl is not called without requests
374  _timer->stop();
375  _sigQueueFinished.emit( *z_func() );
376  }
377 }
378 
379 ZYPP_IMPL_PRIVATE(NetworkRequestDispatcher)
380 
381 NetworkRequestDispatcher::NetworkRequestDispatcher( )
382  : Base( * new NetworkRequestDispatcherPrivate ( *this ) )
383 {
384 
385 }
386 
387 bool NetworkRequestDispatcher::supportsProtocol( const Url &url )
388 {
389  curl_version_info_data *curl_info = nullptr;
390  curl_info = curl_version_info(CURLVERSION_NOW);
391  // curl_info does not need any free (is static)
392  if (curl_info->protocols)
393  {
394  const char * const *proto = nullptr;
395  std::string scheme( url.getScheme() );
396  bool found = false;
397  for(proto=curl_info->protocols; !found && *proto; ++proto) {
398  if( scheme == std::string((const char *)*proto))
399  found = true;
400  }
401  return found;
402  }
403  return true;
404 }
405 
406 void NetworkRequestDispatcher::setMaximumConcurrentConnections( const int maxConn )
407 {
408  d_func()->_maxConnections = maxConn;
409 }
410 
411 int NetworkRequestDispatcher::maximumConcurrentConnections () const
412 {
413  return d_func()->_maxConnections;
414 }
415 
416 void NetworkRequestDispatcher::enqueue(const std::shared_ptr<NetworkRequest> &req )
417 {
418  if ( !req )
419  return;
420  Z_D();
421 
422  if ( std::find( d->_runningDownloads.begin(), d->_runningDownloads.end(), req ) != d->_runningDownloads.end() ) {
423  WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already running " << std::endl;
424  return;
425  }
426 
427  if ( std::find( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), req ) != d->_pendingDownloads.end() ) {
428  WAR << "Ignoring request to enqueue download " << req->url().asString() << " request is already enqueued " << std::endl;
429  return;
430  }
431 
432  req->d_func()->_dispatcher = this;
433  if ( req->priority() == NetworkRequest::Normal )
434  d->_pendingDownloads.push_back( req );
435  else {
436  auto it = std::find_if( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), [ prio = req->priority() ]( const auto &pendingReq ){
437  return pendingReq->priority() < prio;
438  });
439 
440  //if we have a valid iterator, decrement we found a pending download request with lower prio, insert before that
441  if ( it != d->_pendingDownloads.end() && it != d->_pendingDownloads.begin() )
442  it--;
443  d->_pendingDownloads.insert( it, req );
444  }
445 
446  //dequeue if running and we have capacity
447  d->dequeuePending();
448 }
449 
450 void NetworkRequestDispatcher::setAgentString( const std::string &agent )
451 {
452  Z_D();
453  if ( agent.empty() )
454  d->_userAgent = defaultAgentString();
455  else
456  d->_userAgent = agent;
457 }
458 
459 const std::string &NetworkRequestDispatcher::agentString() const
460 {
461  return d_func()->_userAgent;
462 }
463 
464 void NetworkRequestDispatcher::setHostSpecificHeader( const std::string &host, const std::string &headerName, const std::string &value )
465 {
466  Z_D();
467  if ( value.empty() ) {
468  if ( auto i = d->_customHeaders.find( host ); i != d->_customHeaders.end() ) {
469  if ( auto v = i->second.find( headerName ); v != i->second.end() ) {
470  i->second.erase (v);
471  }
472  if ( i->second.empty() )
473  d->_customHeaders.erase(i);
474  }
475  return;
476  }
477  d->_customHeaders[host][headerName] = value;
478 }
479 
480 const NetworkRequestDispatcher::SpecificHeaderMap &NetworkRequestDispatcher::hostSpecificHeaders() const
481 {
482  return d_func()->_customHeaders;
483 }
484 
485 void NetworkRequestDispatcher::cancel( NetworkRequest &req, std::string reason )
486 {
487  cancel( req, NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, reason.size() ? std::move(reason) : "Request explicitly cancelled" ) );
488 }
489 
490 void NetworkRequestDispatcher::cancel(NetworkRequest &req, const NetworkRequestError &err)
491 {
492  Z_D();
493 
494  if ( req.d_func()->_dispatcher != this ) {
495  //TODO throw exception
496  return;
497  }
498 
499  d->setFinished( req, err );
500 }
501 
502 void NetworkRequestDispatcher::cancelAll(std::string reason)
503 {
504  cancelAll( NetworkRequestErrorPrivate::customError( NetworkRequestError::Cancelled, reason.size() ? std::move(reason) : "Request explicitly cancelled" ) );
505 }
506 
507 void NetworkRequestDispatcher::cancelAll(const NetworkRequestError &err)
508 {
509  d_func()->cancelAll ( err );
510 }
511 
512 void NetworkRequestDispatcher::run()
513 {
514  Z_D();
515  d->_isRunning = true;
516 
517  if ( d->_pendingDownloads.size() )
518  d->dequeuePending();
519 }
520 
521 void NetworkRequestDispatcher::reschedule()
522 {
523  Z_D();
524  if ( !d->_pendingDownloads.size() )
525  return;
526 
527  std::stable_sort( d->_pendingDownloads.begin(), d->_pendingDownloads.end(), []( const auto &a, const auto &b ){
528  return a->priority() < b->priority();
529  });
530 
531  d->dequeuePending();
532 }
533 
534 size_t NetworkRequestDispatcher::count()
535 {
536  Z_D();
537  return d->_pendingDownloads.size() + d->_runningDownloads.size();
538 }
539 
540 const zyppng::NetworkRequestError &NetworkRequestDispatcher::lastError() const
541 {
542  return d_func()->_lastError;
543 }
544 
545 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadStarted()
546 {
547  return d_func()->_sigDownloadStarted;
548 }
549 
550 SignalProxy<void (NetworkRequestDispatcher &, NetworkRequest &)> NetworkRequestDispatcher::sigDownloadFinished()
551 {
552  return d_func()->_sigDownloadFinished;
553 }
554 
555 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigQueueFinished()
556 {
557  return d_func()->_sigQueueFinished;
558 }
559 
560 SignalProxy<void ( NetworkRequestDispatcher &)> NetworkRequestDispatcher::sigError()
561 {
562  return d_func()->_sigError;
563 }
564 
565 }
void globalInitCurlOnce()
Definition: curlhelper.cc:64
std::string errorMessage() const
Definition: request.cc:569
static int multi_timer_cb(CURLM *multi, long timeout_ms, void *g)
Definition: String.h:30
unsigned short b
int socketCallback(CURL *easy, curl_socket_t s, int what, void *)
static const std::string & defaultAgentString()
static zyppng::NetworkRequestError fromCurlMError(int nativeCode)
Provides API related macros.
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
Definition: Logger.h:117
std::string form(const char *format,...) __attribute__((format(printf
Printf style construction of std::string.
Definition: String.cc:39
static Ptr create(int socket, int evTypes, bool enable=true)
Edition * _value
Definition: SysContent.cc:311
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition: timer.cc:120
#define Z_D()
Definition: zyppglobal.h:105
Signal< void(NetworkRequestDispatcher &)> _sigQueueFinished
Assign a variable a certain value when going out of scope.
Definition: dtorreset.h:49
std::map< curl_socket_t, std::shared_ptr< SocketNotifier > > _socketHandler
static zyppng::NetworkRequestError fromCurlError(NetworkRequest &req, int nativeCode, const std::string &nativeError)
static int static_socket_callback(CURL *easy, curl_socket_t s, int what, void *userp, SocketNotifier *socketp)
std::deque< std::shared_ptr< NetworkRequest > > _pendingDownloads
#define WAR
Definition: Logger.h:101
void handleMultiSocketAction(curl_socket_t nativeSocket, int evBitmask)
The NetworkRequestError class Represents an error that occurred in.
const long & ZYPP_MEDIA_CURL_DEBUG()
const long& for setting CURLOPT_DEBUGDATA Returns a reference to a static variable, so it's safe to pass ...
Definition: curlhelper.cc:36
The Timer class provides repetitive and single-shot timers.
Definition: timer.h:44
Signal< void(NetworkRequestDispatcher &)> _sigError
zypp::Url Url
Definition: url.h:15
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadStarted
#define MIL_MEDIA
Definition: mediadebug_p.h:29
void setFinished(NetworkRequest &req, NetworkRequestError result)
ZYPP_IMPL_PRIVATE(UnixSignalSource)
unsigned short a
SignalProxy< void(const SocketNotifier &sock, int evTypes)> sigActivated()
Signal< void(NetworkRequestDispatcher &, NetworkRequest &)> _sigDownloadFinished
std::vector< std::shared_ptr< NetworkRequest > > _runningDownloads
void onSocketActivated(const SocketNotifier &listener, int events)
static zyppng::NetworkRequestError customError(NetworkRequestError::Type t, std::string &&errorMsg="", std::map< std::string, boost::any > &&extraInfo={})
bool prepareToContinue(std::string &errBuf)
Definition: request.cc:414
void cancelAll(const NetworkRequestError &result)