libzypp  17.37.5
provide.cc
Go to the documentation of this file.
1 #include "private/provide_p.h"
2 #include "private/providedbg_p.h"
5 #include <zypp-core/zyppng/io/IODevice>
6 #include <zypp-core/Url.h>
7 #include <zypp-core/base/DtorReset>
9 #include <zypp-media/MediaException>
10 #include <zypp-media/FileCheckException>
11 #include <zypp-media/CDTools>
12 
13 // required to generate uuids
14 #include <glib.h>
15 
16 
17 L_ENV_CONSTR_DEFINE_FUNC(ZYPP_MEDIA_PROVIDER_DEBUG)
18 
19 namespace zyppng {
20 
22  : BasePrivate(pub)
23  , _workDir( std::move(workDir) )
24  , _workerPath( constants::DEFAULT_PROVIDE_WORKER_PATH.data() )
25  {
26  if ( _workDir.empty() ) {
28  } else {
30  }
31 
32  MIL << "Provider workdir is: " << _workDir << std::endl;
33 
34  _scheduleTrigger->setSingleShot(true);
36  }
37 
39  {
40  if ( provideDebugEnabled () ) {
41  std::string_view reasonStr;
42  switch( reason ) {
43  case ProvideStart:
44  reasonStr = "ProvideStart";
45  break;
46  case QueueIdle:
47  reasonStr = "QueueIdle";
48  break;
49  case EnqueueItem:
50  reasonStr = "EnqueueItem";
51  break;
52  case EnqueueReq:
53  reasonStr = "EnqueueReq";
54  break;
55  case FinishReq:
56  reasonStr = "FinishReq";
57  break;
58  case RestartAttach:
59  reasonStr = "RestartAttach";
60  break;
61  }
62  DBG << "Triggering the schedule timer (" << reasonStr << ")" << std::endl;
63  }
64 
65  // we use a single shot timer that instantly times out when the event loop is entered the next time
66  // this way we compress many schedule requests that happen during a eventloop run into one
67  _scheduleTrigger->start(0);
68  }
69 
71  {
72  if ( !_isRunning ) {
73  MIL << "Provider is not started, NOT scheduling" << std::endl;
74  return;
75  }
76 
77  if ( _isScheduling ) {
78  DBG_PRV << "Scheduling triggered during scheduling, returning immediately." << std::endl;
79  return;
80  }
81 
82  const int cpuLimit =
83 #ifdef _SC_NPROCESSORS_ONLN
84  sysconf(_SC_NPROCESSORS_ONLN) * 2;
85 #else
87 #endif
88 
89  // helper lambda to find the worker that is idle for the longest time
90  constexpr auto findLaziestWorker = []( const auto &workerQueues, const auto &idleNames ) {
91  auto candidate = workerQueues.end();
92  ProvideQueue::TimePoint candidateIdleSince = ProvideQueue::TimePoint::max();
93 
94  //find the worker thats idle the longest
95  for ( const auto &name : idleNames ) {
96  auto thisElem = workerQueues.find(name);
97  if ( thisElem == workerQueues.end() )
98  continue;
99 
100  const auto idleS = thisElem->second->idleSince();
101  if ( idleS
102  && ( candidate == workerQueues.end() || *idleS < candidateIdleSince ) ) {
103  candidateIdleSince = *idleS;
104  candidate = thisElem;
105  }
106  }
107 
108  if ( candidate != workerQueues.end() )
109  MIL_PRV << "Found idle worker:" << candidate->first << " idle since: " << candidateIdleSince.time_since_epoch().count() << std::endl;
110 
111  return candidate;
112  };
113 
114  // clean up old media
115 
116  for ( auto iMedia = _attachedMediaInfos.begin(); iMedia != _attachedMediaInfos.end(); ) {
117  if ( (*iMedia)->refCount() > 1 ) {
118  MIL_PRV << "Not releasing media " << (*iMedia)->_name << " refcount is not zero" << std::endl;
119  ++iMedia;
120  continue;
121  }
122  if ( (*iMedia)->_workerType == ProvideQueue::Config::Downloading ) {
123  // we keep the information around for an hour so we do not constantly download the media files for no reason
124  if ( (*iMedia)->_idleSince && std::chrono::steady_clock::now() - (*iMedia)->_idleSince.value() >= std::chrono::hours(1) ) {
125  MIL << "Detaching medium " << (*iMedia)->_name << " for baseUrl " << (*iMedia)->attachedUrl() << std::endl;
126  iMedia = _attachedMediaInfos.erase(iMedia);
127  continue;
128  } else {
129  MIL_PRV << "Not releasing media " << (*iMedia)->_name << " downloading worker and not timed out yet." << std::endl;
130  }
131  } else {
132  // mounting handlers, we need to send a request to the workers
133  auto bQueue = (*iMedia)->_backingQueue.lock();
134  if ( bQueue ) {
135  zypp::Url url = (*iMedia)->attachedUrl();
136  url.setScheme( url.getScheme() + std::string( constants::ATTACHED_MEDIA_SUFFIX) );
137  url.setAuthority( (*iMedia)->_name );
138  const auto &req = ProvideRequest::createDetach( url );
139  if ( req ) {
140  MIL << "Detaching medium " << (*iMedia)->_name << " for baseUrl " << (*iMedia)->attachedUrl() << std::endl;
141  bQueue->enqueue ( *req );
142  iMedia = _attachedMediaInfos.erase(iMedia);
143  continue;
144  } else {
145  ERR << "Could not send detach request, creating the request failed" << std::endl;
146  }
147  } else {
148  ERR << "Could not send detach request since no backing queue was defined" << std::endl;
149  }
150  }
151  ++iMedia;
152  }
153 
154  zypp::DtorReset schedFlag( _isScheduling, false );
155  _isScheduling = true;
156 
157  const auto schedStart = std::chrono::steady_clock::now();
158  MIL_PRV << "Start scheduling" << std::endl;
159 
160  zypp::OnScopeExit deferExitMessage( [&](){
161  const auto dur = std::chrono::steady_clock::now() - schedStart;
162  MIL_PRV << "Exit scheduling after:" << std::chrono::duration_cast<std::chrono::milliseconds>( dur ).count () << std::endl;
163  });
164 
165  // bump inactive items
166  for ( auto it = _items.begin (); it != _items.end(); ) {
167  // was maybe released during scheduling
168  if ( !(*it) )
169  it = _items.erase(it);
170  else {
171  auto &item = *it;
172  if ( item->state() == ProvideItem::Uninitialized ) {
173  item->initialize();
174  }
175  it++;
176  }
177  }
178 
179  // we are scheduling now, everything that triggered the timer until now we can forget about
180  _scheduleTrigger->stop();
181 
182  for( auto queueIter = _queues.begin(); queueIter != _queues.end(); queueIter ++ ) {
183 
184  const auto &scheme = queueIter->_schemeName;
185  auto &queue = queueIter->_requests;
186 
187  if ( !queue.size() )
188  continue;
189 
190  const auto &configOpt = schemeConfig ( scheme );
191 
192  MIL_PRV << "Start scheduling for scheme:" << scheme << " queue size is: " << queue.size() << std::endl;
193 
194  if ( !configOpt ) {
195  // FAIL all requests in this queue
196  ERR << "Scheme: " << scheme << " failed to return a valid configuration." << std::endl;
197 
198  while( queue.size() ) {
199  auto item = std::move( queue.front() );
200  queue.pop_front();
201  if ( item->owner() )
202  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("Failed to query scheme config.")) );
203  }
204 
205  continue;
206  }
207 
208  // the scheme config that defines how we schedule requests on this set of queues
209  const auto &config = configOpt.get();
210  const auto isSingleInstance = ( (config.cfg_flags() & ProvideQueue::Config::SingleInstance) == ProvideQueue::Config::SingleInstance );
211  if ( config.worker_type() == ProvideQueue::Config::Downloading && !isSingleInstance ) {
212 
213  for( auto i = queue.begin (); i != queue.end(); ) {
214 
215  // this is the only place where we remove elements from the queue when the scheduling flag is active
216  // other code just nulls out requests in the queue if during scheduling items need to be removed
217  while ( i != queue.end() && !(*i) ) {
218  i = queue.erase(i);
219  }
220 
221  if ( i == queue.end() )
222  break;
223 
224  ProvideRequestRef item = *i;
225 
226  // Downloading queues do not support attaching via a AttachRequest, this is handled by simply providing the media verification files
227  // If we hit this code path, its a bug
228  if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
229  i = queue.erase(i);
230  if ( item->owner() )
231  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Downloading Queues do not support ProvideMessage::Code::Attach requests") ) );
232  continue;
233  }
234 
235  MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
236 
237  // how many workers for this type do already exist
238  int existingTypeWorkers = 0;
239 
240  // how many currently active connections are there
241  int existingConnections = 0;
242 
243  // all currently available possible queues for the request
244  std::vector< std::pair<zypp::Url, ProvideQueue*> > possibleHostWorkers;
245 
246  // currently idle workers
247  std::vector<std::string> idleWorkers;
248 
249  // all mirrors without a existing worker
250  std::vector<zypp::Url> mirrsWithoutWorker;
251  for ( const auto &url : item->urls() ) {
252 
253  if ( effectiveScheme( url.getScheme() ) != scheme ) {
254  MIL << "Mirror URL " << url << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
255  continue;
256  }
257 
258  if( item->owner()->canRedirectTo( item, url ) )
259  mirrsWithoutWorker.push_back( url );
260  else {
261  MIL_PRV << "URL was rejected" << url << std::endl;
262  }
263  }
264 
265  // at this point the list contains all useable mirrors, if this list is empty the request needs to fail
266  if( mirrsWithoutWorker.size() == 0 ) {
267  MIL << "Request has NO usable URLs" << std::endl;
268  if ( item->owner() )
269  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
270  i = queue.erase(i);
271  continue;
272  }
273 
274 
275  for ( auto &[ queueName, workerQueue ] : _workerQueues ) {
276  if ( ProvideQueue::Config::Downloading != workerQueue->workerConfig().worker_type() )
277  continue;
278 
279  existingTypeWorkers ++;
280  existingConnections += workerQueue->activeRequests();
281 
282  if ( workerQueue->isIdle() )
283  idleWorkers.push_back (queueName);
284 
285  if ( !zypp::str::startsWith( queueName, scheme ) )
286  continue;
287 
288  for ( auto i = mirrsWithoutWorker.begin (); i != mirrsWithoutWorker.end(); ) {
289  const auto &u = *i;
290  if ( u.getHost() == workerQueue->hostname() ) {
291  if ( workerQueue->requestCount() < constants::DEFAULT_ACTIVE_CONN_PER_HOST )
292  possibleHostWorkers.push_back( {u, workerQueue.get()} );
293  i = mirrsWithoutWorker.erase( i );
294  // we can not stop after removing the first hit, since there could be multiple mirrors with the same hostname
295  } else {
296  ++i;
297  }
298  }
299  }
300 
301  if( provideDebugEnabled() ) {
302  MIL << "Current stats: " << std::endl;
303  MIL << "Existing type workers: " << existingTypeWorkers << std::endl;
304  MIL << "Existing active connections: " << existingConnections << std::endl;
305  MIL << "Possible host workers: "<< possibleHostWorkers.size() << std::endl;
306  MIL << "Mirrors without worker: " << mirrsWithoutWorker.size() << std::endl;
307  }
308 
309  // need to wait for requests to finish in order to schedule more requests
310  if ( existingConnections >= constants::DEFAULT_ACTIVE_CONN ) {
311  MIL_PRV << "Reached maximum nr of connections, break" << std::endl;
312  break;
313  }
314 
315  // if no workers are running, take the first mirror and start a worker for it
316  // if < nr of workers are running, use a mirror we do not have a conn yet to
317  if ( existingTypeWorkers < constants::DEFAULT_MAX_DYNAMIC_WORKERS
318  && mirrsWithoutWorker.size() ) {
319 
320  MIL_PRV << "Free worker slots and available mirror URLs, starting a new worker" << std::endl;
321 
322  //@TODO out of the available mirrors use the best one based on statistics ( if available )
323  bool found = false;
324  for( const auto &url : mirrsWithoutWorker ) {
325 
326  // mark this URL as used now, in case the queue can not be started we won't try it anymore
327  if ( !item->owner()->safeRedirectTo ( item, url ) )
328  continue;
329 
330  ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
331  if ( !q->startup( scheme, _workDir / scheme / url.getHost(), url.getHost() ) ) {
332  break;
333  } else {
334 
335  MIL_PRV << "Started worker for " << url.getHost() << " enqueing request" << std::endl;
336 
337  item->setActiveUrl(url);
338  found = true;
339 
340  std::string str = zypp::str::Format("%1%://%2%") % scheme % url.getHost();
341  _workerQueues[str] = q;
342  q->enqueue( item );
343  break;
344  }
345  }
346 
347  if( found ) {
348  i = queue.erase(i);
349  continue;
350  }
351  }
352 
353  // if we cannot start a new worker, find the best queue where we can push the item into
354  if ( possibleHostWorkers.size() ) {
355 
356  MIL_PRV << "No free worker slots, looking for the best existing worker" << std::endl;
357  bool found = false;
358  while( possibleHostWorkers.size () ) {
359  std::vector< std::pair<zypp::Url, ProvideQueue *> >::iterator candidate = possibleHostWorkers.begin();
360  for ( auto i = candidate+1; i != possibleHostWorkers.end(); i++ ) {
361  if ( i->second->activeRequests () < candidate->second->activeRequests () )
362  candidate = i;
363  }
364 
365  if ( !item->owner()->safeRedirectTo( item, candidate->first ) ) {
366  possibleHostWorkers.erase( candidate );
367  continue;
368  }
369 
370  MIL_PRV << "Using existing worker " << candidate->first.getHost() << " to download request" << std::endl;
371 
372  found = true;
373  item->setActiveUrl( candidate->first );
374  candidate->second->enqueue( item );
375  break;
376  }
377 
378  if( found ) {
379  i = queue.erase(i);
380  continue;
381  }
382  }
383 
384  // if we reach this place all we can now try is to decomission idle queues and use the new slot to start
385  // a new worker
386  if ( idleWorkers.size() && mirrsWithoutWorker.size() ) {
387 
388  MIL_PRV << "No free worker slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
389 
390  auto candidate = findLaziestWorker( _workerQueues, idleWorkers );
391  if ( candidate != _workerQueues.end() ) {
392 
393  // for now we decomission the worker and start a new one, should we instead introduce a "reset" message
394  // that repurposes the worker to another hostname/workdir config?
395  _workerQueues.erase(candidate);
396 
397  //@TODO out of the available mirrors use the best one based on statistics ( if available )
398  bool found = false;
399  for( const auto &url : mirrsWithoutWorker ) {
400 
401  if ( !item->owner()->safeRedirectTo ( item, url ) )
402  continue;
403 
404  ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
405  if ( !q->startup( scheme, _workDir / scheme / url.getHost(), url.getHost() ) ) {
406  break;
407  } else {
408 
409  MIL_PRV << "Replaced worker for " << url.getHost() << ", enqueing request" << std::endl;
410 
411  item->setActiveUrl(url);
412  found = true;
413 
414  auto str = zypp::str::Format("%1%://%2%") % scheme % url.getHost();
415  _workerQueues[str] = q;
416  q->enqueue( item );
417  }
418  }
419 
420  if( found ) {
421  i = queue.erase(i);
422  continue;
423  }
424  }
425  }
426 
427  // if we reach here we skip over the item and try to schedule it again later
428  MIL_PRV << "End of line, deferring request for next try." << std::endl;
429  i++;
430 
431  }
432  } else if ( config.worker_type() == ProvideQueue::Config::CPUBound && !isSingleInstance ) {
433 
434  for( auto i = queue.begin (); i != queue.end(); ) {
435 
436  // this is the only place where we remove elements from the queue when the scheduling flag is active
437  // other code just nulls out requests in the queue if during scheduling items need to be removed
438  while ( i != queue.end() && !(*i) ) {
439  i = queue.erase(i);
440  }
441 
442  if ( i == queue.end() )
443  break;
444 
445  // make a real reference so it does not dissapear when we remove it from the queue
446  ProvideRequestRef item = *i;
447 
448  // CPU bound queues do not support attaching via a AttachRequest, this is handled by simply providing the media verification files
449  // If we hit this code path, its a bug
450  if( item->code() == ProvideMessage::Code::Attach || item->code() == ProvideMessage::Code::Detach ) {
451  i = queue.erase(i);
452  if ( item->owner () )
453  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("CPU bound Queues do not support ProvideAttachSpecRef requests") ) );
454  continue;
455  }
456 
457  MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
458 
459  // how many workers for this type do already exist
460  int existingTypeWorkers = 0;
461  int existingSchemeWorkers = 0;
462 
463  // all currently available possible queues for the request
464  std::vector< ProvideQueue* > possibleWorkers;
465 
466  // currently idle workers
467  std::vector<std::string> idleWorkers;
468 
469  // the URL we are going to use this time
470  zypp::Url url;
471 
472  //CPU bound queues do not spawn per mirrors, we use the first compatible URL
473  for ( const auto &tmpurl : item->urls() ) {
474  if ( effectiveScheme( tmpurl.getScheme() ) != scheme ) {
475  MIL << "Mirror URL " << tmpurl << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
476  continue;
477  }
478  url = tmpurl;
479  break;
480  }
481 
482  // at this point if the URL is empty the request needs to fail
483  if( !url.isValid() ) {
484  MIL << "Request has NO usable URLs" << std::endl;
485  if ( item->owner() )
486  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
487  i = queue.erase(i);
488  continue;
489  }
490 
491  for ( auto &[ queueName, workerQueue ] : _workerQueues ) {
492 
493  if ( ProvideQueue::Config::CPUBound != workerQueue->workerConfig().worker_type() )
494  continue;
495 
496  const bool thisScheme = zypp::str::startsWith( queueName, scheme );
497 
498  existingTypeWorkers ++;
499  if ( thisScheme ) {
500  existingSchemeWorkers++;
501  if ( workerQueue->canScheduleMore() )
502  possibleWorkers.push_back(workerQueue.get());
503  }
504 
505  if ( workerQueue->isIdle() )
506  idleWorkers.push_back(queueName);
507  }
508 
509  if( provideDebugEnabled() ) {
510  MIL << "Current stats: " << std::endl;
511  MIL << "Existing type workers: " << existingTypeWorkers << std::endl;
512  MIL << "Possible CPU workers: "<< possibleWorkers.size() << std::endl;
513  }
514 
515  // first we use existing idle workers of the current type
516  if ( possibleWorkers.size() ) {
517  bool found = false;
518  for ( auto &w : possibleWorkers ) {
519  if ( w->isIdle() ) {
520  MIL_PRV << "Using existing idle worker to provide request" << std::endl;
521  // this is not really required because we are not doing redirect checks
522  item->owner()->redirectTo ( item, url );
523  item->setActiveUrl( url );
524  w->enqueue( item );
525  i = queue.erase(i);
526  found = true;
527  break;
528  }
529  }
530  if ( found )
531  continue;
532  }
533 
534  // we first start as many workers as we need before queueing more request to existing ones
535  if ( existingTypeWorkers < cpuLimit ) {
536 
537  MIL_PRV << "Free CPU slots, starting a new worker" << std::endl;
538 
539  // this is not really required because we are not doing redirect checks
540  item->owner()->redirectTo ( item, url );
541 
542  ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
543  if ( q->startup( scheme, _workDir / scheme ) ) {
544 
545  item->setActiveUrl(url);
546 
547  auto str = zypp::str::Format("%1%#%2%") % scheme % existingSchemeWorkers;
548  _workerQueues[str] = q;
549  q->enqueue( item );
550  i = queue.erase(i);
551  continue;
552  } else {
553  // CPU bound requests can not recover from this error
554  i = queue.erase(i);
555  if ( item->owner() )
556  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
557  continue;
558  }
559  }
560 
561  // we can not start more workers, all we can do now is fill up queues of existing ones
562  if ( possibleWorkers.size() ) {
563  MIL_PRV << "No free CPU slots, looking for the best existing worker" << std::endl;
564 
565  if( possibleWorkers.size () ) {
566  std::vector<ProvideQueue *>::iterator candidate = possibleWorkers.begin();
567  for ( auto i = candidate+1; i != possibleWorkers.end(); i++ ) {
568  if ( (*i)->activeRequests () < (*candidate)->activeRequests () )
569  candidate = i;
570  }
571 
572  // this is not really required because we are not doing redirect checks
573  item->owner()->redirectTo ( item, url );
574 
575  MIL_PRV << "Using existing worker to provide request" << std::endl;
576  item->setActiveUrl( url );
577  (*candidate)->enqueue( item );
578  i = queue.erase(i);
579  continue;
580  }
581  }
582 
583  // if we reach this place all we can now try is to decomission idle queues and use the new slot to start
584  // a new worker
585  if ( idleWorkers.size() ) {
586 
587  MIL_PRV << "No free CPU slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
588 
589  auto candidate = findLaziestWorker( _workerQueues, idleWorkers );
590  if ( candidate != _workerQueues.end() ) {
591 
592  _workerQueues.erase(candidate);
593 
594  // this is not really required because we are not doing redirect checks
595  item->owner()->redirectTo ( item, url );
596 
597  ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
598  if ( q->startup( scheme, _workDir / scheme ) ) {
599 
600  MIL_PRV << "Replaced worker, enqueing request" << std::endl;
601 
602  item->setActiveUrl(url);
603 
604  auto str = zypp::str::Format("%1%#%2%") % scheme % ( existingSchemeWorkers + 1 );
605  _workerQueues[str] = q;
606  q->enqueue( item );
607  i = queue.erase(i);
608  continue;
609  } else {
610  // CPU bound requests can not recover from this error
611  i = queue.erase(i);
612  if ( item->owner() )
613  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
614  continue;
615  }
616  }
617  } else {
618  MIL_PRV << "No idle workers and no free CPU spots, wait for the next schedule run" << std::endl;
619  break;
620  }
621 
622  // if we reach here we skip over the item and try to schedule it again later
623  MIL_PRV << "End of line, deferring request for next try." << std::endl;
624  i++;
625  }
626 
627  } else {
628  // either SingleInstance worker or Mounting/VolatileMounting
629 
630  for( auto i = queue.begin (); i != queue.end(); ) {
631 
632  // this is the only place where we remove elements from the queue when the scheduling flag is active
633  // other code just nulls out requests in the queue if during scheduling items need to be removed
634  while ( i != queue.end() && !(*i) ) {
635  i = queue.erase(i);
636  }
637 
638  if ( i == queue.end() )
639  break;
640 
641  // make a real reference so it does not dissapear when we remove it from the queue
642  ProvideRequestRef item = *i;
643  MIL_PRV << "Trying to schedule request: " << item->urls().front() << std::endl;
644 
645  zypp::Url url;
646 
647  //mounting queues do not spawn per mirrors, we use the first compatible URL
648  for ( const auto &tmpurl : item->urls() ) {
649  if ( effectiveScheme( tmpurl.getScheme() ) != scheme ) {
650  MIL << "Mirror URL " << tmpurl << " is incompatible with current scheme: " << scheme << ", ignoring." << std::endl;
651  continue;
652  }
653  url = tmpurl;
654  break;
655  }
656 
657  // at this point if the URL is empty the request needs to fail
658  if( !url.isValid() ) {
659  MIL << "Request has NO usable URLs" << std::endl;
660  if ( item->owner() )
661  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR(zypp::media::MediaException("No usable URLs in request spec.")) );
662  i = queue.erase(i);
663  continue;
664  }
665 
666 
667  ProvideQueue *qToUse = nullptr;
668  if ( !_workerQueues.count(scheme) ) {
669  ProvideQueueRef q = std::make_shared<ProvideQueue>( *this );
670  if ( !q->startup( scheme, _workDir / scheme ) ) {
671  ERR << "Worker startup failed!" << std::endl;
672  // mounting/single instance requests can not recover from this error
673  i = queue.erase(i);
674 
675  if ( item->owner() )
676  item->owner()->finishReq( nullptr, item, ZYPP_EXCPT_PTR( zypp::Exception("Unable to start worker for request.") ) );
677  continue;
678  }
679 
680  MIL_PRV << "Started worker, enqueing request" << std::endl;
681  qToUse = q.get();
682  _workerQueues[scheme] = q;
683  } else {
684  MIL_PRV << "Found worker, enqueing request" << std::endl;
685  qToUse = _workerQueues.at(scheme).get();
686  }
687 
688  // this is not really required because we are not doing redirect checks
689  item->owner()->redirectTo ( item, url );
690 
691  item->setActiveUrl(url);
692  qToUse->enqueue( item );
693  i = queue.erase(i);
694  }
695  }
696  }
697  }
698 
699  std::list<ProvideItemRef> &ProvidePrivate::items()
700  {
701  return _items;
702  }
703 
705  {
706  return _credManagerOptions;
707  }
708 
709  std::vector<zypp::Url> ProvidePrivate::sanitizeUrls(const std::vector<zypp::Url> &urls)
710  {
711  std::vector<zypp::Url> usableMirrs;
712  std::optional<ProvideQueue::Config> scheme;
713 
714  for ( auto mirrIt = urls.begin() ; mirrIt != urls.end(); mirrIt++ ) {
715  const auto &s = schemeConfig( effectiveScheme( mirrIt->getScheme() ) );
716  if ( !s ) {
717  WAR << "URL: " << *mirrIt << " is not supported, ignoring!" << std::endl;
718  continue;
719  }
720  if ( !scheme ) {
721  scheme = *s;
722  usableMirrs.push_back ( *mirrIt );
723  } else {
724  if ( scheme->worker_type () == s->worker_type () ) {
725  usableMirrs.push_back( *mirrIt );
726  } else {
727  WAR << "URL: " << *mirrIt << " has different worker type than the primary URL: "<< usableMirrs.front() <<", ignoring!" << std::endl;
728  }
729  }
730  }
731 
732  if ( !scheme || usableMirrs.empty() ) {
733  return {};
734  }
735 
736  return usableMirrs;
737  }
738 
739  std::vector<AttachedMediaInfo_Ptr> &ProvidePrivate::attachedMediaInfos()
740  {
741  return _attachedMediaInfos;
742  }
743 
745  {
746  if ( auto i = _schemeConfigs.find( scheme ); i != _schemeConfigs.end() ) {
747  return expected<ProvideQueue::Config>::success(i->second);
748  } else {
749  // we do not have the queue config yet, we need to start a worker to get one
750  ProvideQueue q( *this );
751  if ( !q.startup( scheme, _workDir / scheme ) ) {
752  return expected<ProvideQueue::Config>::error(ZYPP_EXCPT_PTR(zypp::media::MediaException("Failed to start worker to read scheme config.")));
753  }
754  auto newItem = _schemeConfigs.insert( std::make_pair( scheme, q.workerConfig() ));
755  return expected<ProvideQueue::Config>::success(newItem.first->second);
756  }
757  }
758 
759  std::optional<zypp::ManagedFile> ProvidePrivate::addToFileCache( const zypp::filesystem::Pathname &downloadedFile )
760  {
761  const auto &key = downloadedFile.asString();
762 
763  if ( !zypp::PathInfo(downloadedFile).isExist() ) {
764  _fileCache.erase ( key );
765  return {};
766  }
767 
768  auto i = _fileCache.insert( { key, FileCacheItem() } );
769  if ( !i.second ) {
770  // file did already exist in the cache, return the shared data
771  i.first->second._deathTimer.reset();
772  return i.first->second._file;
773  }
774 
775  i.first->second._file = zypp::ManagedFile( downloadedFile, zypp::filesystem::unlink );
776  return i.first->second._file;
777  }
778 
779  bool ProvidePrivate::isInCache ( const zypp::Pathname &downloadedFile ) const
780  {
781  const auto &key = downloadedFile.asString();
782  return (_fileCache.count(key) > 0);
783  }
784 
785  void ProvidePrivate::queueItem ( ProvideItemRef item )
786  {
787  _items.push_back( item );
789  }
790 
792  {
793  auto elem = std::find_if( _items.begin(), _items.end(), [item]( const auto &i){ return i.get() == item; } );
794  if ( elem != _items.end() ) {
795  if ( _isScheduling ) {
796  (*elem).reset();
797  } else {
798  _items.erase(elem);
799  }
800  }
801  }
802 
803  std::string ProvidePrivate::nextMediaId() const
804  {
805  zypp::AutoDispose rawStr( g_uuid_string_random (), g_free );
806  return zypp::str::asString ( rawStr.value() );
807  }
808 
809  AttachedMediaInfo_Ptr ProvidePrivate::addMedium( AttachedMediaInfo_Ptr &&medium )
810  {
811  assert( medium );
812  if ( !medium )
813  return nullptr;
814 
815  MIL_PRV << "Registered new media attachment with ID: " << medium->name() << " with mountPoint: (" << medium->_localMountPoint.value_or(zypp::Pathname()) << ")" << std::endl;
816  _attachedMediaInfos.push_back( std::move(medium) );
817 
818  return _attachedMediaInfos.back();
819  }
820 
821  bool ProvidePrivate::queueRequest ( ProvideRequestRef req )
822  {
823  const auto &schemeName = effectiveScheme( req->url().getScheme() );
824  auto existingQ = std::find_if( _queues.begin (), _queues.end(), [&schemeName]( const auto &qItem) {
825  return (qItem._schemeName == schemeName);
826  });
827  if ( existingQ != _queues.end() ) {
828  existingQ->_requests.push_back(req);
829  } else {
830  _queues.push_back( ProvidePrivate::QueueItem{ schemeName, {req} } );
831  }
832 
834  return true;
835  }
836 
837  bool ProvidePrivate::dequeueRequest(ProvideRequestRef req , std::exception_ptr error)
838  {
839  auto queue = req->currentQueue ();
840  if ( queue ) {
841  queue->cancel( req.get(), error );
842  return true;
843  } else {
844  // Request not started yet, search request queues
845  for ( auto &q : _queues ) {
846  auto elem = std::find( q._requests.begin(), q._requests.end(), req );
847  if ( elem != q._requests.end() ) {
848  q._requests.erase(elem);
849 
850  if ( req->owner() )
851  req->owner()->finishReq( nullptr, req, error );
852  return true;
853  }
854  }
855  }
856  return false;
857  }
858 
860  {
861  return _workerPath;
862  }
863 
864  const std::string ProvidePrivate::queueName( ProvideQueue &q ) const
865  {
866  for ( const auto &v : _workerQueues ) {
867  if ( v.second.get() == &q )
868  return v.first;
869  }
870  return {};
871  }
872 
874  {
875  return _isRunning;
876  }
877 
878  std::string ProvidePrivate::effectiveScheme(const std::string &scheme) const
879  {
880  const std::string &ss = zypp::str::stripSuffix( scheme, constants::ATTACHED_MEDIA_SUFFIX );
881  if ( auto it = _workerAlias.find ( ss ); it != _workerAlias.end () ) {
882  return it->second;
883  }
884  return ss;
885  }
886 
888  {
889  DBG_PRV << "Pulse timeout" << std::endl;
890 
891  auto now = std::chrono::steady_clock::now();
892 
893  if ( _log ) _log->pulse();
894 
895  // release old cache files
896  for ( auto i = _fileCache.begin (); i != _fileCache.end(); ) {
897  auto &cacheItem = i->second;
898  if ( cacheItem._file.unique() ) {
899  if ( cacheItem._deathTimer ) {
900  if ( now - *cacheItem._deathTimer < std::chrono::seconds(20) ) {
901  MIL << "Releasing file " << *i->second._file << " from cache, death timeout." << std::endl;
902  i = _fileCache.erase(i);
903  continue;
904  }
905  } else {
906  // start the death timeout
907  cacheItem._deathTimer = std::chrono::steady_clock::now();
908  }
909  }
910 
911  ++i;
912  }
913  }
914 
916  {
917  if ( !_items.empty() )
918  return;
919  for ( auto &[k,q] : _workerQueues ) {
920  if ( !q->empty() )
921  return;
922  }
923 
924  // all queues are empty
925  _sigIdle.emit();
926  }
927 
929  {
930  if ( item.state() == ProvideItem::Finished ) {
931  auto itemRef = item.shared_this<ProvideItem>();
932  auto i = std::find( _items.begin(), _items.end(), itemRef );
933  if ( i == _items.end() ) {
934  ERR << "State of unknown Item changed, ignoring" << std::endl;
935  return;
936  }
937  if ( _isScheduling )
938  i->reset();
939  else
940  _items.erase(i);
941  }
942  if ( _items.empty() )
943  onQueueIdle();
944  }
945 
947  {
948  //@TODO is it required to handle overflow?
949  return ++_nextRequestId;
950  }
951 
952  ProvideMediaHandle::ProvideMediaHandle(Provide &parent, AttachedMediaInfo_Ptr mediaInfoRef )
953  : _parent( parent.weak_this<Provide>() )
954  , _mediaRef( std::move(mediaInfoRef) )
955  {}
956 
957  std::shared_ptr<Provide> ProvideMediaHandle::parent() const
958  {
959  return _parent.lock();
960  }
961 
963  {
964  return ( _mediaRef.get() != nullptr );
965  }
966 
967  std::string ProvideMediaHandle::handle() const
968  {
969  if ( !_mediaRef )
970  return {};
971  return _mediaRef->_name;
972  }
973 
975  {
976  static zypp::Url invalidHandle;
977  if ( !_mediaRef || !_mediaRef->_mirrors.size() )
978  return invalidHandle;
979  return _mediaRef->_mirrors.at(0);
980  }
981 
982  const std::vector<zypp::Url> &ProvideMediaHandle::mirrors() const
983  {
984  static std::vector<zypp::Url> invalidHandle;
985  if ( !_mediaRef )
986  return invalidHandle;
987  return _mediaRef->_mirrors;
988  }
989 
990  const std::optional<zypp::Pathname> &ProvideMediaHandle::localPath() const
991  {
992  static std::optional<zypp::Pathname> invalidHandle;
993  if ( !_mediaRef )
994  return invalidHandle;
995  return _mediaRef->_localMountPoint;
996  }
997 
998  AttachedMediaInfo_constPtr ProvideMediaHandle::mediaInfo() const
999  {
1000  return _mediaRef;
1001  }
1002 
1003 
1004  Provide::Provide( const zypp::Pathname &workDir ) : Base( *new ProvidePrivate( zypp::Pathname(workDir), *this ) )
1005  {
1006  Z_D();
1007  connect( *d->_pulseTimer, &Timer::sigExpired, *d, &ProvidePrivate::onPulseTimeout );
1008  }
1009 
1010  ProvideRef Provide::create( const zypp::filesystem::Pathname &workDir )
1011  {
1012  return ProvideRef( new Provide(workDir) );
1013  }
1014 
1015  expected<Provide::LazyMediaHandle> Provide::prepareMedia(const std::vector<zypp::Url> &urls, const ProvideMediaSpec &request)
1016  {
1017  Z_D();
1018  // sanitize the mirrors to contain only URLs that have same worker types
1019  std::vector<zypp::Url> usableMirrs = d->sanitizeUrls( urls );
1020  if ( usableMirrs.empty() ) {
1022  }
1023  return expected<Provide::LazyMediaHandle>::success( shared_this<Provide>(), std::move(usableMirrs), request );
1024  }
1025 
1027  {
1028  return prepareMedia( std::vector<zypp::Url>{url}, request );
1029  }
1030 
1032  {
1033  using namespace zyppng::operators;
1034  if ( lazyHandle.attached() )
1035  return makeReadyResult( expected<MediaHandle>::success( *lazyHandle.handle() ) );
1036 
1037  MIL << "Attaching lazy medium with label: [" << lazyHandle.spec().label() << "]" << std::endl;
1038 
1039  return attachMedia( lazyHandle.urls(), lazyHandle.spec () )
1040  | and_then([lazyHandle]( MediaHandle handle ) {
1041  lazyHandle._sharedData->_mediaHandle = handle;
1042  return expected<MediaHandle>::success( std::move(handle) );
1043  });
1044  }
1045 
1047  {
1048  return attachMedia ( std::vector<zypp::Url>{url}, request );
1049  }
1050 
1051  AsyncOpRef<expected<Provide::MediaHandle>> Provide::attachMedia( const std::vector<zypp::Url> &urls, const ProvideMediaSpec &request )
1052  {
1053  Z_D();
1054 
1055  // sanitize the mirrors to contain only URLs that have same worker types
1056  std::vector<zypp::Url> usableMirrs = d->sanitizeUrls( urls );
1057  if ( usableMirrs.empty() ) {
1059  }
1060 
1061  // first check if there is a already attached medium we can use as well
1062  auto &attachedMedia = d->attachedMediaInfos ();
1063  for ( auto &medium : attachedMedia ) {
1064  if ( medium->isSameMedium ( usableMirrs, request ) ) {
1066  }
1067  }
1068 
1069  auto op = AttachMediaItem::create( usableMirrs, request, *d_func() );
1070  d->queueItem (op);
1071  return op->promise();
1072  }
1073 
1074  AsyncOpRef< expected<ProvideRes> > Provide::provide( const std::vector<zypp::Url> &urls, const ProvideFileSpec &request )
1075  {
1076  Z_D();
1077  auto op = ProvideFileItem::create( urls, request, *d );
1078  d->queueItem (op);
1079  return op->promise();
1080  }
1081 
1083  {
1084  return provide( std::vector<zypp::Url>{ url }, request );
1085  }
1086 
1087  AsyncOpRef< expected<ProvideRes> > Provide::provide( const MediaHandle &attachHandle, const zypp::Pathname &fileName, const ProvideFileSpec &request )
1088  {
1089  Z_D();
1090  const auto i = std::find( d->_attachedMediaInfos.begin(), d->_attachedMediaInfos.end(), attachHandle.mediaInfo() );
1091  if ( i == d->_attachedMediaInfos.end() ) {
1093  }
1094 
1095  std::vector<zypp::Url> urls;
1096 
1097  // real mount devices use a ID to reference a attached medium, for those we do not need to send the baseUrl as well since its already
1098  // part of the mount point, so if we mount host:/path/to/repo to the ID 1234 and look for the file /path/to/repo/file1 the request URL will look like: nfs-media://1234/file1
1099  if ( (*i)->_workerType == ProvideQueue::Config::SimpleMount || (*i)->_workerType == ProvideQueue::Config::VolatileMount ) {
1100  auto url = zypp::Url();
1101  // work around the zypp::Url requirements for certain Url schemes by attaching a suffix, that way we are always able to have a authority
1102  url.setScheme( (*i)->attachedUrl().getScheme() + std::string(constants::ATTACHED_MEDIA_SUFFIX) );
1103  url.setAuthority( (*i)->_name );
1104  url.setPathName("/");
1105  url.appendPathName( fileName );
1106  urls.push_back(url);
1107  } else {
1108 
1109  // for other items we need to make the baseUrl part of the request URL
1110  const auto &addUrl = [&]( const zypp::Url u ){
1111  zypp::Url url = u;
1112  url.appendPathName( fileName );
1113  urls.push_back( url );
1114  };
1115  std::for_each ( (*i)->_mirrors.begin(), (*i)->_mirrors.end(), addUrl );
1116  }
1117 
1118  auto op = ProvideFileItem::create( urls, request, *d );
1119  op->setMediaRef( MediaHandle( *this, (*i) ));
1120  d->queueItem (op);
1121 
1122  return op->promise();
1123  }
1124 
1125  AsyncOpRef<expected<ProvideRes> > Provide::provide( const LazyMediaHandle &attachHandle, const zypp::Pathname &fileName, const ProvideFileSpec &request )
1126  {
1127  using namespace zyppng::operators;
1128  return attachMediaIfNeeded ( attachHandle )
1129  | and_then([weakMe = weak_this<Provide>(), fName = fileName, req = request ]( MediaHandle handle ){
1130  auto me = weakMe.lock();
1131  if ( !me )
1132  return makeReadyResult(expected<ProvideRes>::error(ZYPP_EXCPT_PTR(zypp::Exception("Provide was released during a operation"))));
1133  return me->provide( handle, fName, req);
1134  });
1135  }
1136 
1138  {
1139  using namespace zyppng::operators;
1140 
1141  zypp::Url url("chksum:///");
1142  url.setPathName( p );
1143  auto fut = provide( url, zyppng::ProvideFileSpec().setCustomHeaderValue( "chksumType", algorithm ) )
1144  | and_then( [algorithm]( zyppng::ProvideRes &&chksumRes ) {
1145  if ( chksumRes.headers().contains(algorithm) ) {
1146  try {
1147  return expected<zypp::CheckSum>::success( zypp::CheckSum( algorithm, chksumRes.headers().value(algorithm).asString() ) );
1148  } catch ( ... ) {
1150  }
1151  }
1152  return expected<zypp::CheckSum>::error( ZYPP_EXCPT_PTR( zypp::FileCheckException("Invalid/Empty checksum returned from worker") ) );
1153  } );
1154  return fut;
1155  }
1156 
1158  {
1159  using namespace zyppng::operators;
1160 
1161  zypp::Url url("copy:///");
1162  url.setPathName( source );
1163  auto fut = provide( url, ProvideFileSpec().setDestFilenameHint( target ))
1164  | and_then( [&]( ProvideRes &&copyRes ) {
1165  return expected<zypp::ManagedFile>::success( copyRes.asManagedFile() );
1166  } );
1167  return fut;
1168  }
1169 
1171  {
1172  using namespace zyppng::operators;
1173 
1174  auto fName = source.file();
1175  return copyFile( fName, target )
1176  | [ resSave = std::move(source) ] ( auto &&result ) {
1177  // callback lambda to keep the ProvideRes reference around until the op is finished,
1178  // if the op fails the callback will be cleaned up and so the reference
1179  return result;
1180  };
1181  }
1182 
1184  {
1185  Z_D();
1186  d->_isRunning = true;
1187  d->_pulseTimer->start( 5000 );
1188  d->schedule( ProvidePrivate::ProvideStart );
1189  if ( d->_log ) d->_log->provideStart();
1190  }
1191 
1193  {
1194  d_func()->_workerPath = path;
1195  }
1196 
1197  bool Provide::ejectDevice(const std::string &queueRef, const std::string &device)
1198  {
1199  if ( !queueRef.empty() ) {
1201  }
1202  return false;
1203  }
1204 
1205  void Provide::setStatusTracker( ProvideStatusRef tracker )
1206  {
1207  d_func()->_log = tracker;
1208  }
1209 
1211  {
1212  return d_func()->_workDir;
1213  }
1214 
1216  {
1217  Z_D();
1218  return d->_credManagerOptions;
1219  }
1220 
1222  {
1223  d_func()->_credManagerOptions = opt;
1224  }
1225 
1227  {
1228  return d_func()->_sigIdle;
1229  }
1230 
1232  {
1233  return d_func()->_sigMediaChange;
1234  }
1235 
1236  SignalProxy< std::optional<zypp::media::AuthData> ( const zypp::Url &reqUrl, const std::string &triedUsername, const std::map<std::string, std::string> &extraValues ) > Provide::sigAuthRequired()
1237  {
1238  return d_func()->_sigAuthRequired;
1239  }
1240 
1242 
1244  : _provider( parent )
1245  { }
1246 
1248  {
1249  _stats = Stats();
1250  _stats._startTime = std::chrono::steady_clock::now();
1251  _stats._lastPulseTime = std::chrono::steady_clock::now();
1252  }
1253 
1255  {
1256  const auto &sTime = item.startTime();
1257  const auto &fTime = item.finishedTime();
1258  if ( sTime > sTime.min() && fTime >= sTime ) {
1259  auto duration = std::chrono::duration_cast<std::chrono::seconds>( item.finishedTime() - item.startTime() );
1260  if ( duration.count() )
1261  MIL << "Item finished after " << duration.count() << " seconds, with " << zypp::ByteCount( item.currentStats()->_bytesProvided.operator zypp::ByteCount::SizeType() / duration.count() ) << "/s" << std::endl;
1262  MIL << "Item finished after " << (item.finishedTime() - item.startTime()).count() << " ns" << std::endl;
1263  }
1264  pulse( );
1265  }
1266 
1268  {
1269  MIL << "Item failed" << std::endl;
1270  }
1271 
1273  {
1274  return _stats;
1275  }
1276 
1278  {
1279  auto prov = _provider.lock();
1280  if ( !prov )
1281  return;
1282 
1283  const auto lastFinishedBytes = _stats._finishedBytes;
1284  const auto lastPartialBytes = _stats._partialBytes;
1285  _stats._expectedBytes = _stats._finishedBytes; // finished bytes are expected too!
1286  zypp::ByteCount tmpPartialBytes (0); // bytes that are finished in staging, but not commited to cache yet
1287 
1288  for ( const auto &i : prov->d_func()->items() ) {
1289 
1290  if ( !i // maybe released during scheduling
1291  || i->state() == ProvideItem::Cancelling )
1292  continue;
1293 
1294  if ( i->state() == ProvideItem::Uninitialized
1295  || i->state() == ProvideItem::Pending ) {
1296  _stats._expectedBytes += i->bytesExpected();
1297  continue;
1298  }
1299 
1300  i->pulse();
1301 
1302  const auto & stats = i->currentStats();
1303  const auto & prevStats = i->previousStats();
1304  if ( !stats || !prevStats ) {
1305  ERR << "Bug! Stats should be initialized by now" << std::endl;
1306  continue;
1307  }
1308 
1309  if ( i->state() == ProvideItem::Downloading
1310  || i->state() == ProvideItem::Processing
1311  || i->state() == ProvideItem::Finalizing ) {
1312  _stats._expectedBytes += stats->_bytesExpected;
1313  tmpPartialBytes += stats->_bytesProvided;
1314  } else if ( i->state() == ProvideItem::Finished ) {
1315  _stats._finishedBytes += stats->_bytesProvided; // remember those bytes are finished in stats directly
1316  _stats._expectedBytes += stats->_bytesProvided;
1317  }
1318  }
1319 
1320  const auto now = std::chrono::steady_clock::now();
1321  const auto sinceLast = std::chrono::duration_cast<std::chrono::seconds>( now - _stats._lastPulseTime );
1322  const auto lastFinB = lastPartialBytes + lastFinishedBytes;
1323  const auto currFinB = tmpPartialBytes + _stats._finishedBytes;
1324 
1325  const auto diff = currFinB - lastFinB;
1326  _stats._lastPulseTime = now;
1327  _stats._partialBytes = tmpPartialBytes;
1328 
1329  if ( sinceLast >= std::chrono::seconds(1) )
1330  _stats._perSecondSinceLastPulse = ( diff / ( sinceLast.count() ) );
1331 
1332  auto sinceStart = std::chrono::duration_cast<std::chrono::seconds>( _stats._lastPulseTime - _stats._startTime );
1333  if ( sinceStart.count() ) {
1334  const size_t diff = _stats._finishedBytes + _stats._partialBytes;
1335  _stats._perSecond = zypp::ByteCount( diff / sinceStart.count() );
1336  }
1337  }
1338 }
std::string getScheme() const
Returns the scheme name of the URL.
Definition: Url.cc:551
constexpr auto DEFAULT_MAX_DYNAMIC_WORKERS
Definition: provide_p.h:36
#define MIL
Definition: Logger.h:100
SignalProxy< void()> sigIdle()
Definition: provide.cc:1226
const zypp::Pathname & providerWorkdir() const
Definition: provide.cc:1210
zypp::ByteCount _finishedBytes
Definition: provide.h:86
static ProvideFileItemRef create(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request, ProvidePrivate &parent)
Definition: provideitem.cc:565
std::optional< zypp::ManagedFile > addToFileCache(const zypp::Pathname &downloadedFile)
Definition: provide.cc:759
bool queueRequest(ProvideRequestRef req)
Definition: provide.cc:821
ProvideStatusRef _log
Definition: provide_p.h:145
Pathname realpath() const
Returns this path as the absolute canonical pathname.
Definition: Pathname.cc:231
AsyncOpRef< expected< ProvideRes > > provide(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request)
Definition: provide.cc:1074
const zypp::media::CredManagerOptions & credManangerOptions() const
Definition: provide.cc:1215
virtual void pulse()
Definition: provide.cc:1277
ProvideStatus(ProvideRef parent)
Definition: provide.cc:1243
std::list< ProvideItemRef > _items
Definition: provide_p.h:121
const std::vector< zypp::Url > & mirrors() const
Definition: provide.cc:982
void appendPathName(const Pathname &path_r, EEncoding eflag_r=zypp::url::E_DECODED)
Extend the path name.
Definition: Url.cc:804
Store and operate with byte count.
Definition: ByteCount.h:31
Signal< void()> _sigIdle
Definition: provide_p.h:146
A ProvideRes object is a reference counted ownership of a resource in the cache provided by a Provide...
Definition: provideres.h:35
const zypp::Pathname & workerPath() const
Definition: provide.cc:859
AsyncOpRef< expected< zypp::CheckSum > > checksumForFile(const zypp::Pathname &p, const std::string &algorithm)
Definition: provide.cc:1137
const ProvideMediaSpec & spec() const
AttachedMediaInfo_Ptr _mediaRef
Definition: provide.h:70
virtual void itemFailed(ProvideItem &item)
Definition: provide.cc:1267
std::string nextMediaId() const
Definition: provide.cc:803
std::string stripSuffix(const C_Str &str_r, const C_Str &suffix_r)
Strip a suffix_r from str_r and return the resulting string.
Definition: String.h:1119
String related utilities and Regular expression matching.
void enqueue(ProvideRequestRef request)
Definition: providequeue.cc:96
bool provideDebugEnabled()
Definition: providedbg_p.h:28
const std::string & asString(const std::string &t)
Global asString() that works with std::string too.
Definition: String.h:140
Definition: Arch.h:363
std::unordered_map< std::string, ProvideQueue::Config > _schemeConfigs
Definition: provide_p.h:134
uint32_t nextRequestId()
Definition: provide.cc:946
zypp::Pathname _workerPath
Definition: provide_p.h:142
const std::string & label() const
Definition: providespec.cc:100
std::chrono::time_point< std::chrono::steady_clock > TimePoint
static ProvideRef create(const zypp::Pathname &workDir="")
Definition: provide.cc:1010
Timer::Ptr _scheduleTrigger
Definition: provide_p.h:118
Convenient building of std::string with boost::format.
Definition: String.h:253
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
Definition: Logger.h:117
zypp::ByteCount _perSecond
Definition: provide.h:90
std::chrono::steady_clock::time_point _lastPulseTime
Definition: provide.h:83
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
Definition: Exception.h:463
const std::vector< zypp::Url > & urls() const
State state() const
Definition: provideitem.cc:495
void setAuthority(const std::string &authority)
Set the authority component in the URL.
Definition: Url.cc:716
zyppng::AttachedMediaInfo_constPtr mediaInfo() const
Definition: provide.cc:998
const std::optional< zypp::Pathname > & localPath() const
Definition: provide.cc:990
#define ERR
Definition: Logger.h:102
std::vector< AttachedMediaInfo_Ptr > & attachedMediaInfos()
Definition: provide.cc:739
SignalProxy< void(Timer &t)> sigExpired()
This signal is always emitted when the timer expires.
Definition: timer.cc:120
#define Z_D()
Definition: zyppglobal.h:105
AutoDispose< const Pathname > ManagedFile
A Pathname plus associated cleanup code to be executed when path is no longer needed.
Definition: ManagedFile.h:27
void setCredManagerOptions(const zypp::media::CredManagerOptions &opt)
Definition: provide.cc:1221
Assign a vaiable a certain value when going out of scope.
Definition: dtorreset.h:49
bool isInCache(const zypp::Pathname &downloadedFile) const
Definition: provide.cc:779
WeakPtr parent() const
Definition: base.cc:26
expected< LazyMediaHandle > prepareMedia(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request)
Definition: provide.cc:1015
bool empty() const
Test for an empty path.
Definition: Pathname.h:116
void setPathName(const std::string &path, EEncoding eflag=zypp::url::E_DECODED)
Set the path name.
Definition: Url.cc:782
expected< ProvideQueue::Config > schemeConfig(const std::string &scheme)
Definition: provide.cc:744
ProvidePrivate(zypp::Pathname &&workDir, Provide &pub)
Definition: provide.cc:21
std::deque< QueueItem > _queues
Definition: provide_p.h:128
void onItemStateChanged(ProvideItem &item)
Definition: provide.cc:928
std::list< ProvideItemRef > & items()
Definition: provide.cc:699
SignalProxy< MediaChangeAction(const std::string &queueRef, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc)> sigMediaChangeRequested()
Definition: provide.cc:1231
void setScheme(const std::string &scheme)
Set the scheme name in the URL.
Definition: Url.cc:686
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
Definition: base.h:142
const std::string & asString() const
String representation.
Definition: Pathname.h:93
const Config & workerConfig() const
Just inherits Exception to separate media exceptions.
zypp::ByteCount _expectedBytes
Definition: provide.h:87
virtual void itemDone(ProvideItem &item)
Definition: provide.cc:1254
bool ejectDevice(const std::string &queueRef, const std::string &device)
Definition: provide.cc:1197
void setWorkerPath(const zypp::Pathname &path)
Definition: provide.cc:1192
#define WAR
Definition: Logger.h:101
ProvideWeakRef _provider
Definition: provide.h:111
void onPulseTimeout(Timer &)
Definition: provide.cc:887
const std::optional< ItemStats > & currentStats() const
Definition: provideitem.cc:122
bool isRunning() const
Definition: provide.cc:873
The Timer class provides repetitive and single-shot timers.
Definition: timer.h:44
bool startsWith(const C_Str &str_r, const C_Str &prefix_r)
alias for hasPrefix
Definition: String.h:1156
ProvideMediaHandle MediaHandle
Definition: provide.h:123
std::vector< AttachedMediaInfo_Ptr > _attachedMediaInfos
Definition: provide_p.h:131
void schedule(ScheduleReason reason)
Definition: provide.cc:38
zypp::Pathname _workDir
Definition: provide_p.h:119
std::optional< MediaHandle > handle() const
virtual void provideStart()
Definition: provide.cc:1247
constexpr auto DEFAULT_CPU_WORKERS
Definition: provide_p.h:37
zypp::ByteCount _partialBytes
Definition: provide.h:88
bool isValid() const
Verifies the Url.
Definition: Url.cc:507
constexpr auto DEFAULT_ACTIVE_CONN_PER_HOST
Definition: provide_p.h:34
const std::string queueName(ProvideQueue &q) const
Definition: provide.cc:864
std::conditional_t< isAsync, AsyncOpRef< T >, T > makeReadyResult(T &&result)
Definition: asyncop.h:297
void dequeueItem(ProvideItem *item)
Definition: provide.cc:791
Provide(const zypp::Pathname &workDir)
Definition: provide.cc:1004
int unlink(const Pathname &path)
Like &#39;unlink&#39;.
Definition: PathInfo.cc:705
static expected success(ConsParams &&...params)
Definition: expected.h:115
#define MIL_PRV
Definition: providedbg_p.h:35
zypp::media::CredManagerOptions _credManagerOptions
Definition: provide_p.h:143
bool startup(const std::string &workerScheme, const zypp::Pathname &workDir, const std::string &hostname="")
Definition: providequeue.cc:61
std::shared_ptr< AsyncOp< T > > AsyncOpRef
Definition: asyncop.h:255
constexpr std::string_view DEFAULT_PROVIDE_WORKER_PATH
Definition: provide_p.h:32
std::unordered_map< std::string, FileCacheItem > _fileCache
Definition: provide_p.h:140
bool dequeueRequest(ProvideRequestRef req, std::exception_ptr error)
Definition: provide.cc:837
AsyncOpRef< expected< MediaHandle > > attachMedia(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request)
Definition: provide.cc:1051
Base class for Exception.
Definition: Exception.h:152
constexpr auto DEFAULT_ACTIVE_CONN
Definition: provide_p.h:35
static AttachMediaItemRef create(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request, ProvidePrivate &parent)
void queueItem(ProvideItemRef item)
Definition: provide.cc:785
reference value() const
Reference to the Tp object.
Definition: AutoDispose.h:138
const zypp::Url & baseUrl() const
Definition: provide.cc:974
#define DBG_PRV
Definition: providedbg_p.h:34
zypp::media::CredManagerOptions & credManagerOptions()
Definition: provide.cc:704
static expected< ProvideRequestRef > createDetach(const zypp::Url &url)
Definition: provideitem.cc:75
zypp::ByteCount _perSecondSinceLastPulse
Definition: provide.h:89
ZYPP_IMPL_PRIVATE(UnixSignalSource)
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
Definition: AutoDispose.h:94
ProvideWeakRef _parent
Definition: provide.h:69
void setStatusTracker(ProvideStatusRef tracker)
Definition: provide.cc:1205
std::unordered_map< std::string, std::string > _workerAlias
Definition: provide_p.h:103
std::chrono::steady_clock::time_point _startTime
Definition: provide.h:82
const Stats & stats() const
Definition: provide.cc:1272
AsyncOpRef< expected< zypp::ManagedFile > > copyFile(const zypp::Pathname &source, const zypp::Pathname &target)
Definition: provide.cc:1157
Wrapper class for ::stat/::lstat.
Definition: PathInfo.h:225
constexpr std::string_view device("device")
ResultType and_then(const expected< T, E > &exp, Function &&f)
Definition: expected.h:423
static bool openTray(const std::string &device_r)
Definition: cdtools.cc:33
std::string handle() const
Definition: provide.cc:967
#define ZYPP_FWD_CURRENT_EXCPT()
Drops a logline and returns the current Exception as a std::exception_ptr.
Definition: Exception.h:471
AttachedMediaInfo_Ptr addMedium(AttachedMediaInfo_Ptr &&medium)
Definition: provide.cc:809
Easy-to use interface to the ZYPP dependency resolver.
Definition: Application.cc:19
Unit::ValueType SizeType
Definition: ByteCount.h:38
void doSchedule(Timer &)
Definition: provide.cc:70
std::unordered_map< std::string, ProvideQueueRef > _workerQueues
Definition: provide_p.h:133
std::string effectiveScheme(const std::string &scheme) const
Definition: provide.cc:878
std::shared_ptr< T > shared_this() const
Definition: base.h:113
std::shared_ptr< Provide > parent() const
Definition: provide.cc:957
virtual std::chrono::steady_clock::time_point startTime() const
Definition: provideitem.cc:132
constexpr std::string_view ATTACHED_MEDIA_SUFFIX
Definition: provide_p.h:33
std::vector< zypp::Url > sanitizeUrls(const std::vector< zypp::Url > &urls)
Definition: provide.cc:709
Url manipulation class.
Definition: Url.h:92
virtual std::chrono::steady_clock::time_point finishedTime() const
Definition: provideitem.cc:137
#define DBG
Definition: Logger.h:99
AsyncOpRef< expected< MediaHandle > > attachMediaIfNeeded(LazyMediaHandle lazyHandle)
Definition: provide.cc:1031
SignalProxy< std::optional< zypp::media::AuthData > const zypp::Url &reqUrl, const std::string &triedUsername, const std::map< std::string, std::string > &extraValues) > sigAuthRequired()
Definition: provide.cc:1236