Epetra Package Browser (Single Doxygen Collection) Development
Epetra_MpiDistributor.cpp
1//@HEADER
2// ************************************************************************
3//
4// Epetra: Linear Algebra Services Package
5// Copyright 2011 Sandia Corporation
6//
7// Under the terms of Contract DE-AC04-94AL85000 with Sandia Corporation,
8// the U.S. Government retains certain rights in this software.
9//
10// Redistribution and use in source and binary forms, with or without
11// modification, are permitted provided that the following conditions are
12// met:
13//
14// 1. Redistributions of source code must retain the above copyright
15// notice, this list of conditions and the following disclaimer.
16//
17// 2. Redistributions in binary form must reproduce the above copyright
18// notice, this list of conditions and the following disclaimer in the
19// documentation and/or other materials provided with the distribution.
20//
21// 3. Neither the name of the Corporation nor the names of the
22// contributors may be used to endorse or promote products derived from
23// this software without specific prior written permission.
24//
25// THIS SOFTWARE IS PROVIDED BY SANDIA CORPORATION "AS IS" AND ANY
26// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
27// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
28// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL SANDIA CORPORATION OR THE
29// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
30// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
31// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
32// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
33// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
34// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
35// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36//
37// Questions? Contact Michael A. Heroux (maherou@sandia.gov)
38//
39// ************************************************************************
40//@HEADER
41
42#include "Epetra_MpiDistributor.h"
43#include "Epetra_MpiComm.h"
44
45#include <stdexcept>
46#include <vector>
47
48//==============================================================================
49Epetra_MpiDistributor::Epetra_MpiDistributor(const Epetra_MpiComm & Comm):
50 Epetra_Object("Epetra::MpiDistributor"),
51 lengths_to_(0),
52 procs_to_(0),
53 indices_to_(0),
54 size_indices_to_(0),
55 lengths_from_(0),
56 procs_from_(0),
57 indices_from_(0),
58 size_indices_from_(0),
59 resized_(false),
60 sizes_(0),
61 sizes_to_(0),
62 starts_to_(0),
63 starts_to_ptr_(0),
64 indices_to_ptr_(0),
65 sizes_from_(0),
66 starts_from_(0),
67 starts_from_ptr_(0),
68 indices_from_ptr_(0),
69 nrecvs_(0),
70 nsends_(0),
71 nexports_(0),
72 self_msg_(0),
73 max_send_length_(0),
74 total_recv_length_(0),
75 tag_(Comm.GetMpiTag()),
76 epComm_(&Comm),
77 comm_(Comm.GetMpiComm()),
78 request_(0),
79 status_(0),
80 no_delete_(false),
81 send_array_(0),
82 send_array_size_(0),
83 comm_plan_reverse_(0)
84{
85}
86
87//==============================================================================
88Epetra_MpiDistributor::Epetra_MpiDistributor(const Epetra_MpiDistributor & Distributor):
89 Epetra_Object("Epetra::MpiDistributor"),
90 lengths_to_(0),
91 procs_to_(0),
92 indices_to_(0),
93 size_indices_to_(Distributor.size_indices_to_),
94 lengths_from_(0),
95 procs_from_(0),
96 indices_from_(0),
97 size_indices_from_(Distributor.size_indices_from_),
98 resized_(false),
99 sizes_(0),
100 sizes_to_(0),
101 starts_to_(0),
102 starts_to_ptr_(0),
103 indices_to_ptr_(0),
104 sizes_from_(0),
105 starts_from_(0),
106 starts_from_ptr_(0),
107 indices_from_ptr_(0),
108 nrecvs_(Distributor.nrecvs_),
109 nsends_(Distributor.nsends_),
110 nexports_(Distributor.nexports_),
111 self_msg_(Distributor.self_msg_),
112 max_send_length_(Distributor.max_send_length_),
113 total_recv_length_(Distributor.total_recv_length_),
114 tag_(Distributor.tag_),
115 epComm_(Distributor.epComm_),
116 comm_(Distributor.comm_),
117 request_(0),
118 status_(0),
119 no_delete_(0),// NOTE: Stuff is being copied... you can always delete it
120 send_array_(0),
121 send_array_size_(0),
122 comm_plan_reverse_(0)
123{
124 int i;
125 if (nsends_>0) {
126 lengths_to_ = new int[nsends_];
127 procs_to_ = new int[nsends_];
128 starts_to_ = new int[nsends_];
129 for (i=0; i<nsends_; i++) {
130 lengths_to_[i] = Distributor.lengths_to_[i];
131 procs_to_[i] = Distributor.procs_to_[i];
132 starts_to_[i] = Distributor.starts_to_[i];
133 }
134 }
135 if (size_indices_to_>0) {
136 indices_to_ = new int[size_indices_to_];
137 for (i=0; i<size_indices_to_; i++) {
138 indices_to_[i] = Distributor.indices_to_[i];
139 }
140 indices_to_ptr_ = new int[nexports_];
141 for (i=0; i<nexports_; i++) {
142 indices_to_ptr_[i] = Distributor.indices_to_ptr_[i];
143 }
144 }
145
146 if( nsends_+self_msg_ > 0 ) {
147 if(Distributor.sizes_to_) sizes_to_ = new int[nsends_+self_msg_];
148 if(Distributor.starts_to_ptr_) starts_to_ptr_ = new int[nsends_+self_msg_];
149 for(i=0;i<nsends_+self_msg_; i++) {
150 if(Distributor.sizes_to_) sizes_to_[i] = Distributor.sizes_to_[i];
151 if(Distributor.starts_to_ptr_) starts_to_ptr_[i] = Distributor.starts_to_ptr_[i];
152 }
153 }
154
155 if (nrecvs_>0) {
156 lengths_from_ = new int[nrecvs_];
157 procs_from_ = new int[nrecvs_];
158 starts_from_ = new int[nrecvs_];
159 request_ = new MPI_Request[ nrecvs_ ];
160 status_ = new MPI_Status[ nrecvs_ ];
161 for (i=0; i<nrecvs_; i++) {
162 lengths_from_[i] = Distributor.lengths_from_[i];
163 procs_from_[i] = Distributor.procs_from_[i];
164 starts_from_[i] = Distributor.starts_from_[i];
165 }
166 }
167
168 if (size_indices_from_>0) {
169 indices_from_ = new int[size_indices_from_];
170 for (i=0; i<size_indices_from_; i++) {
171 indices_from_[i] = Distributor.indices_from_[i];
172 }
173 }
174
175 if( nrecvs_+self_msg_ > 0 ) {
176 if(Distributor.sizes_from_) sizes_from_ = new int [nrecvs_+self_msg_];
177 if(Distributor.starts_from_ptr_) starts_from_ptr_ = new int [nrecvs_+self_msg_];
178 for(i=0;i<nrecvs_+self_msg_; i++) {
179 if(Distributor.sizes_from_) sizes_from_[i] = Distributor.sizes_from_[i];
180 if(Distributor.starts_from_ptr_) starts_from_ptr_[i] = Distributor.starts_from_ptr_[i];
181 }
182 }
183
184 // Note: indices_from_ptr_ is always length zero...
185
186}
187
188//==============================================================================
189Epetra_MpiDistributor::~Epetra_MpiDistributor()
190{
191 if( !no_delete_ )
192 {
193 if( lengths_to_ != 0 ) delete [] lengths_to_;
194 if( procs_to_ != 0 ) delete [] procs_to_;
195 if( indices_to_ != 0 ) delete [] indices_to_;
196 if( lengths_from_ != 0 ) delete [] lengths_from_;
197 if( procs_from_ != 0 ) delete [] procs_from_;
198 if( indices_from_ != 0 ) delete [] indices_from_;
199 if( starts_to_ != 0 ) delete [] starts_to_;
200 if( starts_from_ != 0 ) delete [] starts_from_;
201 }
202
203 if( sizes_ != 0 ) delete [] sizes_;
204 if( sizes_to_ != 0 ) delete [] sizes_to_;
205 if( sizes_from_ != 0 ) delete [] sizes_from_;
206 if( starts_to_ptr_ != 0 ) delete [] starts_to_ptr_;
207 if( starts_from_ptr_ != 0 ) delete [] starts_from_ptr_;
208 if( indices_to_ptr_ != 0 ) delete [] indices_to_ptr_;
209 if( indices_from_ptr_ != 0 ) delete [] indices_from_ptr_;
210
211 if( request_ != 0 ) delete [] request_;
212 if( status_ != 0 ) delete [] status_;
213
214 if( send_array_size_ != 0 ) { delete [] send_array_; send_array_size_ = 0; }
215
216 if( comm_plan_reverse_ != 0 ) delete comm_plan_reverse_;
217}
218
219
220//==============================================================================
221int Epetra_MpiDistributor::CreateFromSends( const int & NumExportIDs,
222 const int * ExportPIDs,
223 bool Deterministic,
224 int & NumRemoteIDs )
225{
226 (void)Deterministic; // Prevent compiler warnings for unused argument.
227 int my_proc;
228 MPI_Comm_rank( comm_, &my_proc );
229
230 int nprocs;
231 MPI_Comm_size( comm_, &nprocs );
232
233 // Do the forward map component
234 CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
235
236 //Invert map to see what msgs are received and what length
237 EPETRA_CHK_ERR( ComputeRecvs_( my_proc, nprocs ) );
238
239 if (nrecvs_>0) {
240 if( !request_ ) {
241 request_ = new MPI_Request[ nrecvs_ ];
242 status_ = new MPI_Status[ nrecvs_ ];
243 }
244 }
245
246 NumRemoteIDs = total_recv_length_;
247
248 return 0;
249}
250
251//==============================================================================
252int Epetra_MpiDistributor::CreateFromRecvs( const int & NumRemoteIDs,
253 const int * RemoteGIDs,
254 const int * RemotePIDs,
255 bool Deterministic,
256 int & NumExportIDs,
257 int *& ExportGIDs,
258 int *& ExportPIDs )
259{
260 int my_proc;
261 MPI_Comm_rank( comm_, &my_proc );
262
263 int nprocs;
264 MPI_Comm_size( comm_, &nprocs );
265
266 EPETRA_CHK_ERR( ComputeSends_( NumRemoteIDs, RemoteGIDs, RemotePIDs, NumExportIDs,
267 ExportGIDs, ExportPIDs, my_proc) );
268
269 int testNumRemoteIDs;
270 EPETRA_CHK_ERR( CreateFromSends( NumExportIDs, ExportPIDs,
271 Deterministic, testNumRemoteIDs ) );
272
273 return(0);
274}
275
276//==============================================================================
277//---------------------------------------------------------------------------
278//CreateFromRecvs Method
279// - create communication plan given a known list of procs to recv from
280//---------------------------------------------------------------------------
281#ifndef EPETRA_NO_64BIT_GLOBAL_INDICES
282int Epetra_MpiDistributor::CreateFromRecvs( const int & NumRemoteIDs,
283 const long long * RemoteGIDs,
284 const int * RemotePIDs,
285 bool Deterministic,
286 int & NumExportIDs,
287 long long *& ExportGIDs,
288 int *& ExportPIDs )
289{
290 int my_proc;
291 MPI_Comm_rank( comm_, &my_proc );
292
293 int nprocs;
294 MPI_Comm_size( comm_, &nprocs );
295
296 EPETRA_CHK_ERR( ComputeSends_( NumRemoteIDs, RemoteGIDs, RemotePIDs, NumExportIDs,
297 ExportGIDs, ExportPIDs, my_proc) );
298
299 int testNumRemoteIDs;
300 EPETRA_CHK_ERR( CreateFromSends( NumExportIDs, ExportPIDs,
301 Deterministic, testNumRemoteIDs ) );
302
303 return(0);
304}
305#endif
306
307
308
309//==============================================================================
310//---------------------------------------------------------------------------
311//CreateFromSendsAndRecvs Method
312//---------------------------------------------------------------------------
313int Epetra_MpiDistributor::CreateFromSendsAndRecvs( const int & NumExportIDs,
314 const int * ExportPIDs,
315 const int & NumRemoteIDs,
316 const int * RemoteGIDs,
317 const int * RemotePIDs,
318 bool Deterministic)
319{
320 (void)RemoteGIDs;
321 (void)Deterministic; // Prevent compiler warnings for unused argument.
322 nexports_ = NumExportIDs;
323
324 int my_proc;
325 MPI_Comm_rank( comm_, &my_proc );
326 int nprocs;
327 MPI_Comm_size( comm_, &nprocs );
328
329 // Do the forward map component
330 CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
331
332 // Do the reverse map component
333 CreateRecvStructures_(NumRemoteIDs,RemotePIDs);
334
335 return 0;
336}
337#ifndef EPETRA_NO_64BIT_GLOBAL_INDICES
338int Epetra_MpiDistributor::CreateFromSendsAndRecvs( const int & NumExportIDs,
339 const int * ExportPIDs,
340 const int & NumRemoteIDs,
341 const long long * RemoteGIDs,
342 const int * RemotePIDs,
343 bool Deterministic)
344{
345 (void)RemoteGIDs;
346 (void)Deterministic; // Prevent compiler warnings for unused argument.
347 nexports_ = NumExportIDs;
348
349 int my_proc;
350 MPI_Comm_rank( comm_, &my_proc );
351 int nprocs;
352 MPI_Comm_size( comm_, &nprocs );
353
354 // Do the forward map component
355 CreateSendStructures_(my_proc,nprocs,NumExportIDs,ExportPIDs);
356
357 // Do the reverse map component
358 CreateRecvStructures_(NumRemoteIDs,RemotePIDs);
359
360 return 0;
361
362
363}
364#endif
365
366
367
368//==============================================================================
369int Epetra_MpiDistributor::CreateSendStructures_(int my_proc,
370 int nprocs,
371 const int & NumExportIDs,
372 const int * ExportPIDs)
373{
374 nexports_ = NumExportIDs;
375
376 int i;
377
378 // Check to see if items are grouped by processor w/o gaps
379 // If so, indices_to -> 0
380
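// For illustration (hypothetical data): ExportPIDs = {0,0,1,1,3} is already
// grouped by processor, so no send buffer is built and indices_to_ stays 0;
// ExportPIDs = {1,0,3,1,0} is not grouped, so the indices_to_ / send_array_
// path below is used instead.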
381 // Setup data structures for quick traversal of arrays
382 int * starts = new int[ nprocs + 1 ];
383 for( i = 0; i < nprocs; i++ )
384 starts[i] = 0;
385
386 int nactive = 0;
387 bool no_send_buff = true;
388 int numDeadIndices = 0; // In some cases the GIDs will not be owned by any processors and the PID will be -1
389
390 for( i = 0; i < NumExportIDs; i++ )
391 {
392 if( no_send_buff && i && (ExportPIDs[i] < ExportPIDs[i-1]) )
393 no_send_buff = false;
394 if( ExportPIDs[i] >= 0 )
395 {
396 ++starts[ ExportPIDs[i] ];
397 ++nactive;
398 }
399 else numDeadIndices++; // Increase the number of dead indices. Used below to leave these out of the analysis
400 }
401
402 self_msg_ = ( starts[my_proc] != 0 ) ? 1 : 0;
403
404 nsends_ = 0;
405
406 if( no_send_buff ) //grouped by processor, no send buffer or indices_to_ needed
407 {
408 for( i = 0; i < nprocs; ++i )
409 if( starts[i] ) ++nsends_;
410
411 if( nsends_ )
412 {
413 procs_to_ = new int[nsends_];
414 starts_to_ = new int[nsends_];
415 lengths_to_ = new int[nsends_];
416 }
417
418 int index = numDeadIndices; // Leave off the dead indices (PID = -1)
419 int proc;
420 for( i = 0; i < nsends_; ++i )
421 {
422 starts_to_[i] = index;
423 proc = ExportPIDs[index];
424 procs_to_[i] = proc;
425 index += starts[proc];
426 }
427
428 if( nsends_ )
429 Sort_ints_( procs_to_, starts_to_, nsends_ );
430
431 max_send_length_ = 0;
432
433 for( i = 0; i < nsends_; ++i )
434 {
435 proc = procs_to_[i];
436 lengths_to_[i] = starts[proc];
437 if( (proc != my_proc) && (lengths_to_[i] > max_send_length_) )
438 max_send_length_ = lengths_to_[i];
439 }
440 }
441 else //not grouped by processor, need send buffer and indices_to_
442 {
443 if( starts[0] != 0 ) nsends_ = 1;
444
445 for( i = 1; i < nprocs; i++ )
446 {
447 if( starts[i] != 0 ) ++nsends_;
448 starts[i] += starts[i-1];
449 }
450
451 for( i = nprocs-1; i != 0; i-- )
452 starts[i] = starts[i-1];
453
454 starts[0] = 0;
455
456 if (nactive>0) {
457 indices_to_ = new int[ nactive ];
458 size_indices_to_ = nactive;
459 }
460
461 for( i = 0; i < NumExportIDs; i++ )
462 if( ExportPIDs[i] >= 0 )
463 {
464 indices_to_[ starts[ ExportPIDs[i] ] ] = i;
465 ++starts[ ExportPIDs[i] ];
466 }
467
468 //Reconstruct starts array to index into indices_to.
469
470 for( i = nprocs-1; i != 0; i-- )
471 starts[i] = starts[i-1];
472 starts[0] = 0;
473 starts[nprocs] = nactive;
474
475 if (nsends_>0) {
476 lengths_to_ = new int[ nsends_ ];
477 procs_to_ = new int[ nsends_ ];
478 starts_to_ = new int[ nsends_ ];
479 }
480
481 int j = 0;
482 max_send_length_ = 0;
483
484 for( i = 0; i < nprocs; i++ )
485 if( starts[i+1] != starts[i] )
486 {
487 lengths_to_[j] = starts[i+1] - starts[i];
488 starts_to_[j] = starts[i];
489 if( ( i != my_proc ) && ( lengths_to_[j] > max_send_length_ ) )
490 max_send_length_ = lengths_to_[j];
491 procs_to_[j] = i;
492 j++;
493 }
494 }
495
496 delete [] starts;
497
499
500 return 0;
501}
502
503//==============================================================================
504int Epetra_MpiDistributor::CreateRecvStructures_(const int & NumRemoteIDs,
505 const int * RemotePIDs)
506{
507 int i, j;
508
509 // Since the RemotePIDs should be sorted, counting the total number of recvs should be easy...
510 // use nsends as an initial guess for space.
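// For illustration (hypothetical data): RemotePIDs = {0,0,2,2,2,5} gives
// recv_list = {0,2,5}, so procs_from_ = {0,2,5}, starts_from_ = {0,2,5},
// lengths_from_ = {2,3,1}, and total_recv_length_ = 6.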
511 std::vector<int> recv_list;
512 recv_list.reserve(nsends_);
513
514 int last_pid=-2;
515 for(i=0; i<NumRemoteIDs; i++) {
516 if(RemotePIDs[i]>last_pid) {
517 recv_list.push_back(RemotePIDs[i]);
518 last_pid = RemotePIDs[i];
519 }
520 else if (RemotePIDs[i]<last_pid)
521 throw std::runtime_error("Epetra_MpiDistributor::CreateRecvStructures_ expected RemotePIDs to be in sorted order");
522 }
523 nrecvs_=recv_list.size();
524
525 if (nrecvs_>0) {
526 starts_from_ = new int[nrecvs_];
527 procs_from_ = new int[nrecvs_];
528 lengths_from_ = new int[nrecvs_];
529 request_ = new MPI_Request[ nrecvs_ ];
530 status_ = new MPI_Status[ nrecvs_ ];
531 }
532
533 for(i=0,j=0; i<nrecvs_; ++i) {
534 int jlast=j;
535 procs_from_[i] = recv_list[i];
536 starts_from_[i] = j;
537 for( ; j<NumRemoteIDs && RemotePIDs[jlast]==RemotePIDs[j] ; j++){;}
538 lengths_from_[i]=j-jlast;
539 }
540 total_recv_length_=NumRemoteIDs;
541
542 nrecvs_ -= self_msg_;
543
544 return 0;
545}
546
547
548//==============================================================================
549//---------------------------------------------------------------------------
550//ComputeRecvs Method
551//---------------------------------------------------------------------------
552int Epetra_MpiDistributor::ComputeRecvs_( int my_proc,
553 int nprocs )
554{
555 int * msg_count = new int[ nprocs ];
556 int * counts = new int[ nprocs ];
557
558 int i;
559 MPI_Status status;
560
561 for( i = 0; i < nprocs; i++ )
562 {
563 msg_count[i] = 0;
564 counts[i] = 1;
565 }
566
567 for( i = 0; i < nsends_+self_msg_; i++ )
568 msg_count[ procs_to_[i] ] = 1;
569
570#if defined(REDUCE_SCATTER_BUG)
571// the bug is found in mpich on linux platforms
572 MPI_Reduce(msg_count, counts, nprocs, MPI_INT, MPI_SUM, 0, comm_);
573 MPI_Scatter(counts, 1, MPI_INT, &nrecvs_, 1, MPI_INT, 0, comm_);
574#else
575 MPI_Reduce_scatter( msg_count, &nrecvs_, counts, MPI_INT, MPI_SUM, comm_ );
576#endif
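// For illustration (hypothetical data), with 3 ranks where rank 0 sends to
// {1}, rank 1 sends to {0,1}, and rank 2 sends to {1}, the msg_count vectors
// are {0,1,0}, {1,1,0}, {0,1,0}; summing element-wise and scattering one
// entry per rank leaves nrecvs_ = 1 on rank 0, 3 on rank 1, 0 on rank 2
// (a self message is counted here; nrecvs_ is adjusted by self_msg_ further down).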
577
578 delete [] msg_count;
579 delete [] counts;
580
581 if (nrecvs_>0) {
582 lengths_from_ = new int[nrecvs_];
583 procs_from_ = new int[nrecvs_];
584 for(i=0; i<nrecvs_; ++i) {
585 lengths_from_[i] = 0;
586 procs_from_[i] = 0;
587 }
588 }
589
590#ifndef NEW_COMM_PATTERN
591 for( i = 0; i < (nsends_+self_msg_); i++ )
592 if( procs_to_[i] != my_proc ) {
593 MPI_Send( &(lengths_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_ );
594 }
595 else
596 {
597 //set self_msg_ to end block of recv arrays
598 lengths_from_[nrecvs_-1] = lengths_to_[i];
599 procs_from_[nrecvs_-1] = my_proc;
600 }
601
602 for( i = 0; i < (nrecvs_-self_msg_); i++ )
603 {
604 MPI_Recv( &(lengths_from_[i]), 1, MPI_INT, MPI_ANY_SOURCE, tag_, comm_, &status );
605 procs_from_[i] = status.MPI_SOURCE;
606 }
607
608 MPI_Barrier( comm_ );
609#else
610 if (nrecvs_>0) {
611 if( !request_ ) {
612 request_ = new MPI_Request[nrecvs_-self_msg_];
613 status_ = new MPI_Status[nrecvs_-self_msg_];
614 }
615 }
616
617 for( i = 0; i < (nrecvs_-self_msg_); i++ )
618 MPI_Irecv( &(lengths_from_[i]), 1, MPI_INT, MPI_ANY_SOURCE, tag_, comm_, &(request_[i]) );
619
620 MPI_Barrier( comm_ );
621
622 for( i = 0; i < (nsends_+self_msg_); i++ )
623 if( procs_to_[i] != my_proc ) {
624 MPI_Rsend( &(lengths_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_ );
625 }
626 else
627 {
628 //set self_msg_ to end block of recv arrays
629 lengths_from_[nrecvs_-1] = lengths_to_[i];
630 procs_from_[nrecvs_-1] = my_proc;
631 }
632
633 if( (nrecvs_-self_msg_) > 0 ) MPI_Waitall( (nrecvs_-self_msg_), request_, status_ );
634
635 for( i = 0; i < (nrecvs_-self_msg_); i++ )
636 procs_from_[i] = status_[i].MPI_SOURCE;
637#endif
638
639 Sort_ints_( procs_from_, lengths_from_, nrecvs_ );
640
641 // Compute indices_from_
642 // Seems to break some rvs communication
643/* Not necessary since rvs communication is always blocked
644 size_indices_from_ = 0;
645 if( nrecvs_ > 0 )
646 {
647 for( i = 0; i < nrecvs_; i++ ) size_indices_from_ += lengths_from_[i];
648 indices_from_ = new int[ size_indices_from_ ];
649
650 for (i=0; i<size_indices_from_; i++) indices_from_[i] = i;
651 }
652*/
653
654 if (nrecvs_>0) starts_from_ = new int[nrecvs_];
655 int j = 0;
656 for( i=0; i<nrecvs_; ++i )
657 {
658 starts_from_[i] = j;
659 j += lengths_from_[i];
660 }
661
662 total_recv_length_ = 0;
663 for( i = 0; i < nrecvs_; i++ )
664 total_recv_length_ += lengths_from_[i];
665
666 nrecvs_ -= self_msg_;
667
668 MPI_Barrier( comm_ );
669
670 return false;
671}
672
673//==============================================================================
674//---------------------------------------------------------------------------
675//ComputeSends Method
676//---------------------------------------------------------------------------
677template<typename id_type>
678int Epetra_MpiDistributor::ComputeSends_( int num_imports,
679 const id_type *& import_ids,
680 const int *& import_procs,
681 int & num_exports,
682 id_type *& export_ids,
683 int *& export_procs,
684 int my_proc ) {
685
686 Epetra_MpiDistributor tmp_plan(*epComm_);
687 int i;
688
689 int * proc_list = 0;
690 int * import_objs = 0;
691 char * c_export_objs = 0;
692 const int pack_size = (1 + sizeof(id_type)/sizeof(int));
693
694 if( num_imports > 0 )
695 {
696 proc_list = new int[ num_imports ];
697 import_objs = new int[ num_imports * pack_size];
698
699 for( i = 0; i < num_imports; i++ )
700 {
701 proc_list[i] = import_procs[i];
702
703 *(id_type*)(import_objs + pack_size*i) = import_ids[i];
704 *(import_objs + pack_size*i + (pack_size-1)) = my_proc;
705 }
706 }
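// Layout of import_objs (per import, pack_size ints): the GID in the first
// sizeof(id_type)/sizeof(int) slots, then my_proc in the last slot. With a
// 4-byte int this gives pack_size = 2 for int GIDs and 3 for long long GIDs.
// Routing this buffer through tmp_plan below tells each owning process which
// GIDs it must export and to which ranks.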
707
708 EPETRA_CHK_ERR(tmp_plan.CreateFromSends( num_imports, proc_list,
709 true, num_exports) );
710 if( num_exports > 0 )
711 {
712 //export_objs = new int[ 2 * num_exports ];
713 export_ids = new id_type[ num_exports ];
714 export_procs = new int[ num_exports ];
715 }
716 else
717 {
718 export_ids = 0;
719 export_procs = 0;
720 }
721
722 int len_c_export_objs = 0;
723 EPETRA_CHK_ERR( tmp_plan.Do(reinterpret_cast<char *> (import_objs),
724 pack_size * (int)sizeof( int ),
725 len_c_export_objs,
726 c_export_objs) );
727 int * export_objs = reinterpret_cast<int *>(c_export_objs);
728
729 for( i = 0; i < num_exports; i++ ) {
730 export_ids[i] = *(id_type*)(export_objs + pack_size*i);
731 export_procs[i] = *(export_objs + pack_size*i + (pack_size-1));
732 }
733
734 if( proc_list != 0 ) delete [] proc_list;
735 if( import_objs != 0 ) delete [] import_objs;
736 if( len_c_export_objs != 0 ) delete [] c_export_objs;
737
738 return(0);
739
740}
741
742//==============================================================================
743int Epetra_MpiDistributor::Do( char * export_objs,
744 int obj_size,
745 int & len_import_objs,
746 char *& import_objs )
747{
748 EPETRA_CHK_ERR( DoPosts(export_objs, obj_size, len_import_objs, import_objs) );
749 EPETRA_CHK_ERR( DoWaits() );
750
751
752 lastRoundBytesSend_ = 0;
753 for(int i=0; i<NumSends(); i++)
754 lastRoundBytesSend_ += lengths_to_[i]*obj_size;
755 lastRoundBytesRecv_ =len_import_objs;
756
757 return(0);
758}
759
760//==============================================================================
761int Epetra_MpiDistributor::DoReverse( char * export_objs,
762 int obj_size,
763 int & len_import_objs,
764 char *& import_objs )
765{
766 EPETRA_CHK_ERR( DoReversePosts(export_objs, obj_size,
767 len_import_objs, import_objs) );
768 EPETRA_CHK_ERR( DoReverseWaits() );
769
770 // This is slightly weird, since DoReversePosts() actually calls DoPosts() on
771 // a reversed distributor
772 lastRoundBytesRecv_ = 0;
773 for(int i=0; i<NumReceives(); i++)
774 lastRoundBytesRecv_ += lengths_from_[i]*obj_size;
775 lastRoundBytesSend_ =len_import_objs;
776
777 return(0);
778}
779
780//==============================================================================
781int Epetra_MpiDistributor::DoPosts( char * export_objs,
782 int obj_size,
783 int & len_import_objs,
784 char *& import_objs )
785{
786 int i, j, k;
787
788 int my_proc = 0;
789 int self_recv_address = 0;
790
791 MPI_Comm_rank( comm_, &my_proc );
792
793 if( len_import_objs < (total_recv_length_*obj_size) )
794 {
795 if( import_objs!=0 ) {delete [] import_objs; import_objs = 0;}
796 len_import_objs = total_recv_length_*obj_size;
797 if (len_import_objs>0) import_objs = new char[len_import_objs];
798 for( i=0; i<len_import_objs; ++i ) import_objs[i]=0;
799 }
800
801 k = 0;
802
803 j = 0;
804 for( i = 0; i < (nrecvs_+self_msg_); i++ )
805 {
806 if( procs_from_[i] != my_proc )
807 {
808 MPI_Irecv( &(import_objs[j]),
809 lengths_from_[i] * obj_size,
810 MPI_CHAR, procs_from_[i],
811 tag_, comm_,
812 &(request_[k]) );
813 k++;
814 }
815 else
816 self_recv_address = j;
817
818 j += lengths_from_[i] * obj_size;
819 }
820
821#ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
822 // NOTE (mfh 19 Mar 2012):
823 //
824 // The ready-sends below require that each ready-send's matching
825 // receive (see above) has already been posted. We ensure this with
826 // a barrier. (Otherwise, some process that doesn't need to post
827 // receives might post its ready-send before the receiving process
828 // gets to post its receive.) If you want to remove the barrier,
829 // you'll have to replace the ready-sends below with standard sends
830 // or Isends.
831 MPI_Barrier( comm_ );
832#endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
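// Minimal sketch of the ordering this barrier enforces (assuming a
// communicator comm, a matching tag, and one peer in each direction):
//
//   MPI_Request req;
//   MPI_Irecv( rbuf, n, MPI_CHAR, src, tag, comm, &req ); // post receive first
//   MPI_Barrier( comm );                    // every rank has posted its receives
//   MPI_Rsend( sbuf, n, MPI_CHAR, dst, tag, comm );       // ready-send now legal
//   MPI_Wait( &req, MPI_STATUS_IGNORE );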
833
834 //setup scan through procs_to list starting w/ higher numbered procs
835 //Should help balance msg traffic
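// For illustration (hypothetical data): with procs_to_ = {0,1,3,5} and
// my_proc = 2, proc_index becomes 2 and the sends are issued in the order
// 3, 5, 0, 1 rather than always starting at rank 0.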
836 int nblocks = nsends_ + self_msg_;
837 int proc_index = 0;
838 while( proc_index < nblocks && procs_to_[proc_index] < my_proc )
839 ++proc_index;
840 if( proc_index == nblocks ) proc_index = 0;
841
842 int self_num = 0, self_index = 0;
843 int p;
844
845 if( !indices_to_ ) //data already blocked by processor
846 {
847 for( i = 0; i < nblocks; ++i )
848 {
849 p = i + proc_index;
850 if( p > (nblocks-1) ) p -= nblocks;
851
852 if( procs_to_[p] != my_proc ) {
853
854#ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
855 MPI_Rsend( &export_objs[starts_to_[p]*obj_size],
856 lengths_to_[p]*obj_size,
857 MPI_CHAR,
858 procs_to_[p],
859 tag_,
860 comm_ );
861#else
862 MPI_Send( &export_objs[starts_to_[p]*obj_size],
863 lengths_to_[p]*obj_size,
864 MPI_CHAR,
865 procs_to_[p],
866 tag_,
867 comm_ );
868#endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
869 }
870 else {
871 self_num = p;
872 }
873 }
874
875 if( self_msg_ )
876 memcpy( &import_objs[self_recv_address],
877 &export_objs[starts_to_[self_num]*obj_size],
878 lengths_to_[self_num]*obj_size );
879 }
880 else //data not blocked by proc, use send buffer
881 {
882 if( send_array_size_ < (max_send_length_*obj_size) )
883 {
884 if( send_array_!=0 ) {delete [] send_array_; send_array_ = 0;}
885 send_array_size_ = max_send_length_*obj_size;
886 if (send_array_size_>0) send_array_ = new char[send_array_size_];
887 }
888
889 j = 0;
890 for( i = 0; i < nblocks; i++ )
891 {
892 p = i + proc_index;
893 if( p > (nblocks-1) ) p -= nblocks;
894 if( procs_to_[p] != my_proc )
895 {
896 int offset = 0;
897 j = starts_to_[p];
898 for( k = 0; k < lengths_to_[p]; k++ )
899 {
900 memcpy( &(send_array_[offset]),
901 &(export_objs[indices_to_[j]*obj_size]),
902 obj_size );
903 ++j;
904 offset += obj_size;
905 }
906#ifndef EPETRA_NO_READY_SEND_IN_DO_POSTS
907 MPI_Rsend( send_array_,
908 lengths_to_[p] * obj_size,
909 MPI_CHAR,
910 procs_to_[p],
911 tag_, comm_ );
912#else
913 MPI_Send( send_array_,
914 lengths_to_[p] * obj_size,
915 MPI_CHAR,
916 procs_to_[p],
917 tag_, comm_ );
918#endif // EPETRA_NO_READY_SEND_IN_DO_POSTS
919 }
920 else
921 {
922 self_num = p;
923 self_index = starts_to_[p];
924 }
925 }
926
927 if( self_msg_ )
928 for( k = 0; k < lengths_to_[self_num]; k++ )
929 {
930 memcpy( &(import_objs[self_recv_address]),
931 &(export_objs[indices_to_[self_index]*obj_size]),
932 obj_size );
933 self_index++;
934 self_recv_address += obj_size;
935 }
936 }
937 return(0);
938}
939
940//==============================================================================
941int Epetra_MpiDistributor::DoWaits()
942{
943 if( nrecvs_ > 0 ) MPI_Waitall( nrecvs_, request_, status_ );
944
945 return(0);
946}
947
948//==============================================================================
949int Epetra_MpiDistributor::DoReversePosts( char * export_objs,
950 int obj_size,
951 int & len_import_objs,
952 char *& import_objs )
953{
954 assert(indices_to_==0); //Can only do reverse comm when original data
955 // is blocked by processor
956
957 // If we don't have a reverse distributor, make one.
958 if( comm_plan_reverse_ == 0 )
959 CreateReverseDistributor();
960
961 int comm_flag = comm_plan_reverse_->DoPosts(export_objs, obj_size, len_import_objs, import_objs);
962
963 return(comm_flag);
964}
965
966//==============================================================================
967int Epetra_MpiDistributor::DoReverseWaits()
968{
969 if( comm_plan_reverse_ == 0 ) return (-1);
970
971 int comm_flag = comm_plan_reverse_->DoWaits();
972
973 return(comm_flag);
974}
975
976//==============================================================================
977//---------------------------------------------------------------------------
978//Resize Method (Heaphy)
979//---------------------------------------------------------------------------
980int Epetra_MpiDistributor::Resize_( int * sizes )
981{
982 int i, j, k; // loop counters
983 int sum;
984
985 //if (sizes == 0) return 0;
986
987 int my_proc;
988 MPI_Comm_rank (comm_, &my_proc);
989 int nprocs;
990 MPI_Comm_size( comm_, &nprocs );
991
992 if( resized_ )
993 {
994 //test and see if we are already setup for these sizes
995 bool match = true;
996 for( i = 0; i < nexports_; ++i )
997 match = match && (sizes_[i]==sizes[i]);
998 int matched = match?1:0;
999 int match_count = 0;
1000 MPI_Allreduce( &matched, &match_count, 1, MPI_INT, MPI_SUM, comm_ );
1001 if( match_count == nprocs )
1002 return 0;
1003 else //reset existing sizing arrays
1004 max_send_length_ = 0;
1005 }
1006
1007 if( !sizes_ && nexports_ ) sizes_ = new int[nexports_];
1008 for (i = 0; i < nexports_; i++)
1009 sizes_[i] = sizes[i];
1010
1011 if( !sizes_to_ && (nsends_+self_msg_) ) sizes_to_ = new int[nsends_+self_msg_];
1012 for (i = 0; i < (nsends_+self_msg_); ++i)
1013 sizes_to_[i] = 0;
1014
1014
1015 if( !starts_to_ptr_ && (nsends_+self_msg_) ) starts_to_ptr_ = new int[nsends_+self_msg_];
1016
1017 if( !indices_to_ ) //blocked sends
1018 {
1019
1020 int * index = 0;
1021 int * sort_val = 0;
1022 if (nsends_+self_msg_>0) {
1023 index = new int[nsends_+self_msg_];
1024 sort_val = new int[nsends_+self_msg_];
1025 }
1026 for( i = 0; i < (nsends_+self_msg_); ++i )
1027 {
1028 j = starts_to_[i];
1029 for( k = 0; k < lengths_to_[i]; ++k )
1030 sizes_to_[i] += sizes[j++];
1031 if( (sizes_to_[i] > max_send_length_) && (procs_to_[i] != my_proc) )
1032 max_send_length_ = sizes_to_[i];
1033 }
1034
1035 for( i = 0; i < (nsends_+self_msg_); ++i )
1036 {
1037 sort_val[i] = starts_to_[i];
1038 index[i] = i;
1039 }
1040
1041 if( nsends_+self_msg_ )
1042 Sort_ints_( sort_val, index, (nsends_+self_msg_) );
1043
1044 sum = 0;
1045 for( i = 0; i < (nsends_+self_msg_); ++i )
1046 {
1047 starts_to_ptr_[ index[i] ] = sum;
1048 sum += sizes_to_[ index[i] ];
1049 }
1050
1051 if (index!=0) {delete [] index; index = 0;}
1052 if (sort_val!=0) {delete [] sort_val; sort_val = 0;}
1053 }
1054 else //Sends not blocked, so have to do more work
1055 {
1056 if( !indices_to_ptr_ && nexports_ ) indices_to_ptr_ = new int[nexports_];
1057 int * offset = 0;
1058 if( nexports_ ) offset = new int[nexports_];
1059
1060 //Compute address for every item in send array
1061 sum = 0;
1062 for( i = 0; i < nexports_; ++i )
1063 {
1064 offset[i] = sum;
1065 sum += sizes_[i];
1066 }
1067
1068 sum = 0;
1069 max_send_length_ = 0;
1070 for( i = 0; i < (nsends_+self_msg_); ++i )
1071 {
1072 starts_to_ptr_[i] = sum;
1073 for( j = starts_to_[i]; j < (starts_to_[i]+lengths_to_[i]); ++j )
1074 {
1075 indices_to_ptr_[j] = offset[ indices_to_[j] ];
1076 sizes_to_[i] += sizes_[ indices_to_[j] ];
1077 }
1078 if( sizes_to_[i] > max_send_length_ && procs_to_[i] != my_proc )
1079 max_send_length_ = sizes_to_[i];
1080 sum += sizes_to_[i];
1081 }
1082
1083 if (offset!=0) {delete [] offset; offset = 0;}
1084 }
1085
1086 // Exchange sizes routine inserted here:
1087 int self_index_to = -1;
1088 total_recv_length_ = 0;
1089 if( !sizes_from_ && (nrecvs_+self_msg_) ) sizes_from_ = new int [nrecvs_+self_msg_];
1090
1091#ifndef EPETRA_NEW_COMM_PATTERN
1092 for (i = 0; i < (nsends_+self_msg_); i++)
1093 {
1094 if(procs_to_[i] != my_proc)
1095 MPI_Send ((void *) &(sizes_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_);
1096 else
1097 self_index_to = i;
1098 }
1099
1100 MPI_Status status;
1101 for (i = 0; i < (nrecvs_+self_msg_); ++i)
1102 {
1103 sizes_from_[i] = 0;
1104 if (procs_from_[i] != my_proc)
1105 MPI_Recv((void *) &(sizes_from_[i]), 1, MPI_INT, procs_from_[i], tag_, comm_, &status);
1106 else
1107 sizes_from_[i] = sizes_to_[self_index_to];
1108 total_recv_length_ += sizes_from_[i];
1109 }
1110#else
1111 if (nrecvs_>0 && !request_) {
1112 request_ = new MPI_Request[ nrecvs_-self_msg_ ];
1113 status_ = new MPI_Status[ nrecvs_-self_msg_ ];
1114 }
1115
1116 for (i = 0; i < (nsends_+self_msg_); i++)
1117 {
1118 if(procs_to_[i] == my_proc)
1119 self_index_to = i;
1120 }
1121
1122 for (i = 0; i < (nrecvs_+self_msg_); ++i)
1123 {
1124 sizes_from_[i] = 0;
1125 if (procs_from_[i] != my_proc)
1126 MPI_Irecv((void *) &(sizes_from_[i]), 1, MPI_INT, procs_from_[i], tag_, comm_, &(request_[i]));
1127 else
1128 {
1129 sizes_from_[i] = sizes_to_[self_index_to];
1130 total_recv_length_ += sizes_from_[i];
1131 }
1132 }
1133
1134 MPI_Barrier( comm_ );
1135
1136 for (i = 0; i < (nsends_+self_msg_); i++)
1137 {
1138 if(procs_to_[i] != my_proc)
1139 MPI_Rsend ((void *) &(sizes_to_[i]), 1, MPI_INT, procs_to_[i], tag_, comm_);
1140 }
1141
1142 if( nrecvs_ > 0 ) MPI_Waitall( nrecvs_, request_, status_ );
1143
1144 for (i = 0; i < (nrecvs_+self_msg_); ++i)
1145 {
1146 if (procs_from_[i] != my_proc)
1147 total_recv_length_ += sizes_from_[i];
1148 }
1149#endif
1150 // end of exchanges sizes insert
1151
1152 sum = 0;
1153 if( !starts_from_ptr_ ) starts_from_ptr_ = new int[nrecvs_+self_msg_];
1154 for (i = 0; i < (nrecvs_+self_msg_); ++i)
1155 {
1156 starts_from_ptr_[i] = sum;
1157 sum += sizes_from_[i];
1158 }
1159
1160 resized_ = true;
1161
1162 return 0;
1163}
1164
1165//==============================================================================
1166int Epetra_MpiDistributor::Do( char * export_objs,
1167 int obj_size,
1168 int *& sizes,
1169 int & len_import_objs,
1170 char *& import_objs )
1171{
1172 EPETRA_CHK_ERR( DoPosts(export_objs, obj_size, sizes,
1173 len_import_objs, import_objs) );
1174 EPETRA_CHK_ERR( DoWaits() );
1175
1176 return(0);
1177}
1178
1179//==============================================================================
1180int Epetra_MpiDistributor::DoReverse( char * export_objs,
1181 int obj_size,
1182 int *& sizes,
1183 int & len_import_objs,
1184 char *& import_objs )
1185{
1186 EPETRA_CHK_ERR( DoReversePosts(export_objs, obj_size, sizes,
1187 len_import_objs, import_objs) );
1188 EPETRA_CHK_ERR( DoReverseWaits() );
1189
1190 return(0);
1191}
1192
1193//==============================================================================
1194int Epetra_MpiDistributor::DoPosts( char * export_objs,
1195 int obj_size,
1196 int *& sizes,
1197 int & len_import_objs,
1198 char *& import_objs )
1199{
1200 int ierr = Resize_(sizes);
1201 if (ierr != 0) {
1202 return(ierr);
1203 }
1204
1205 MPI_Barrier( comm_ );
1206
1207 int i, j, k;
1208
1209 int my_proc = 0;
1210 int self_recv_address = 0;
1211
1212 MPI_Comm_rank( comm_, &my_proc );
1213
1214 if( len_import_objs < (total_recv_length_*obj_size) )
1215 {
1216 if( import_objs!=0 ) {delete [] import_objs; import_objs = 0;}
1217 len_import_objs = total_recv_length_*obj_size;
1218 if (len_import_objs>0) import_objs = new char[len_import_objs];
1219 }
1220
1221 k = 0;
1222
1223 for( i = 0; i < (nrecvs_+self_msg_); ++i )
1224 {
1225 if( procs_from_[i] != my_proc )
1226 {
1227 MPI_Irecv( &(import_objs[starts_from_ptr_[i] * obj_size]),
1228 sizes_from_[i] * obj_size,
1229 MPI_CHAR, procs_from_[i], tag_, comm_, &(request_[k]) );
1230 k++;
1231 }
1232 else
1233 self_recv_address = starts_from_ptr_[i] * obj_size;
1234 }
1235
1236 MPI_Barrier( comm_ );
1237
1238 //setup scan through procs_to list starting w/ higher numbered procs
1239 //Should help balance msg traffic
1240 int nblocks = nsends_ + self_msg_;
1241 int proc_index = 0;
1242 while( proc_index < nblocks && procs_to_[proc_index] < my_proc )
1243 ++proc_index;
1244 if( proc_index == nblocks ) proc_index = 0;
1245
1246 int self_num = 0;
1247 int p;
1248
1249 if( !indices_to_ ) //data already blocked by processor
1250 {
1251 for( i = 0; i < nblocks; ++i )
1252 {
1253 p = i + proc_index;
1254 if( p > (nblocks-1) ) p -= nblocks;
1255
1256 if( procs_to_[p] != my_proc )
1257 MPI_Rsend( &export_objs[starts_to_ptr_[p]*obj_size],
1258 sizes_to_[p]*obj_size,
1259 MPI_CHAR,
1260 procs_to_[p],
1261 tag_,
1262 comm_ );
1263 else
1264 self_num = p;
1265 }
1266
1267 if( self_msg_ )
1268 memcpy( &import_objs[self_recv_address],
1269 &export_objs[starts_to_ptr_[self_num]*obj_size],
1270 sizes_to_[self_num]*obj_size );
1271 }
1272 else //data not blocked by proc, need to copy to buffer
1273 {
1274 if( send_array_size_ && send_array_size_ < (max_send_length_*obj_size) )
1275 {
1276 if (send_array_!=0) {delete [] send_array_; send_array_ = 0;}
1277 send_array_ = 0;
1278 send_array_size_ = 0;
1279 }
1280 if( !send_array_size_ )
1281 {
1282 send_array_size_ = max_send_length_*obj_size;
1283 if (send_array_size_>0) send_array_ = new char[send_array_size_];
1284 }
1285
1286 for( i=0; i<nblocks; ++i )
1287 {
1288 p = i + proc_index;
1289 if( p > (nblocks-1) ) p -= nblocks;
1290
1291 if( procs_to_[p] != my_proc )
1292 {
1293 int offset = 0;
1294 j = starts_to_[p];
1295 for( k=0; k<lengths_to_[p]; ++k )
1296 {
1297 memcpy( &send_array_[offset],
1298 &export_objs[indices_to_ptr_[j]*obj_size],
1299 sizes_[indices_to_[j]]*obj_size );
1300 offset += sizes_[indices_to_[j]]*obj_size;
1301 ++j;
1302 }
1303 MPI_Rsend( send_array_, sizes_to_[p]*obj_size,
1304 MPI_CHAR, procs_to_[p], tag_, comm_ );
1305 }
1306 else
1307 self_num = p;
1308 }
1309
1310 if( self_msg_ )
1311 {
1312 int jj;
1313 j = starts_to_[self_num];
1314 for( k=0; k<lengths_to_[self_num]; ++k )
1315 {
1316 jj = indices_to_ptr_[j];
1317 memcpy( &import_objs[self_recv_address],
1318 &export_objs[jj*obj_size],
1319 sizes_[indices_to_[j]]*obj_size );
1320 self_recv_address += (obj_size*sizes_[indices_to_[j]]);
1321 ++j;
1322 }
1323 }
1324 }
1325
1326 return(0);
1327}
1328
1329//==============================================================================
1330int Epetra_MpiDistributor::DoReversePosts( char * export_objs,
1331 int obj_size,
1332 int *& sizes,
1333 int & len_import_objs,
1334 char *& import_objs )
1335{
1336 assert(indices_to_==0); //Can only do reverse comm when original data
1337 // is blocked by processor
1338
1339 // If we don't have a reverse distributor, make one.
1340 if( comm_plan_reverse_ == 0 )
1341 CreateReverseDistributor();
1342
1343 int comm_flag = comm_plan_reverse_->DoPosts(export_objs, obj_size, sizes, len_import_objs, import_objs);
1344
1345 return(comm_flag);
1346}
1347
1348//==============================================================================
1349void Epetra_MpiDistributor::Print(std::ostream & os) const
1350{
1351 using std::endl;
1352
1353 int myRank = 0, numProcs = 1;
1354 MPI_Comm_rank (comm_, &myRank);
1355 MPI_Comm_size (comm_, &numProcs);
1356
1357 if (myRank == 0) {
1358 os << "Epetra_MpiDistributor (implements Epetra_Distributor)" << std::endl;
1359 }
1360 // Let each MPI process print its data. We assume that all
1361 // processes can print to the given output stream, and execute
1362 // barriers to make it more likely that the output will be in the
1363 // right order.
1364 for (int p = 0; p < numProcs; ++p) {
1365 if (myRank == p) {
1366 os << "[Node " << p << " of " << numProcs << "]" << std::endl;
1367 os << " selfMessage: " << self_msg_ << std::endl;
1368 os << " numSends: " << nsends_ << std::endl;
1369
1370 os << " imagesTo: [";
1371 for (int i = 0; i < nsends_; ++i) {
1372 os << procs_to_[i];
1373 if (i < nsends_ - 1) {
1374 os << " ";
1375 }
1376 }
1377 os << "]" << std::endl;
1378
1379 os << " lengthsTo: [";
1380 for (int i = 0; i < nsends_; ++i) {
1381 os << lengths_to_[i];
1382 if (i < nsends_ - 1) {
1383 os << " ";
1384 }
1385 }
1386 os << "]" << std::endl;
1387
1388 os << " maxSendLength: " << max_send_length_ << std::endl;
1389
1390 os << " startsTo: ";
1391 if (starts_to_ == NULL) {
1392 os << "(NULL)" << std::endl;
1393 } else {
1394 os << "[";
1395 for (int i = 0; i < nsends_; ++i) {
1396 os << starts_to_[i];
1397 if (i < nsends_ - 1) {
1398 os << " ";
1399 }
1400 }
1401 os << "]" << std::endl;
1402 }
1403
1404 os << " indicesTo: ";
1405 if (indices_to_ == NULL) {
1406 os << "(NULL)" << std::endl;
1407 } else {
1408 os << "[";
1409 int k = 0;
1410 for (int i = 0; i < nsends_; ++i) {
1411 for (int j = 0; j < lengths_to_[i]; ++j) {
1412 os << " " << indices_to_[j+k];
1413 }
1414 k += lengths_to_[i];
1415 }
1416 os << "]" << std::endl;
1417 }
1418
1419 os << " numReceives: " << nrecvs_ << std::endl;
1420 os << " totalReceiveLength: " << total_recv_length_ << std::endl;
1421
1422 os << " lengthsFrom: [";
1423 for (int i = 0; i < nrecvs_; ++i) {
1424 os << lengths_from_[i];
1425 if (i < nrecvs_ - 1) {
1426 os << " ";
1427 }
1428 }
1429 os << "]" << std::endl;
1430
1431 os << " startsFrom: [";
1432 for (int i = 0; i < nrecvs_; ++i) {
1433 os << starts_from_[i];
1434 if (i < nrecvs_ - 1) {
1435 os << " ";
1436 }
1437 }
1438 os << "]" << std::endl;
1439
1440 os << " imagesFrom: [";
1441 for (int i = 0; i < nrecvs_; ++i) {
1442 os << procs_from_[i];
1443 if (i < nrecvs_ - 1) {
1444 os << " ";
1445 }
1446 }
1447 os << "]" << std::endl;
1448
1449 // mfh 16 Dec 2011: I found this commented out here; not sure if
1450 // we want to print this, so I'm leaving it commented out.
1451 /*
1452 os << "indices_from: ";
1453 k = 0;
1454 for( i = 0; i < nrecvs_; i++ )
1455 {
1456 for( j = 0; j < lengths_from_[i]; j++ )
1457 os << " " << indices_from_[j+k];
1458 k += lengths_from_[i];
1459 }
1460 */
1461
1462 // Last output is a flush; it leaves a space and also
1463 // helps synchronize output.
1464 os << std::flush;
1465 } // if it's my process' turn to print
1466
1467 // Execute barriers to give output time to synchronize.
1468 // One barrier generally isn't enough.
1469 MPI_Barrier (comm_);
1470 MPI_Barrier (comm_);
1471 MPI_Barrier (comm_);
1472 }
1473}
1474
1475//---------------------------------------------------------------------------
1476int Epetra_MpiDistributor::Sort_ints_(
1477 int *vals_sort, // values to be sorted
1478 int *vals_other, // other array to be reordered with sort
1479 int nvals) // length of these two arrays
1480{
1481// It is primarily used to sort messages to improve communication flow.
1482// This routine will also insure that the ordering produced by the invert_map
1483// routines is deterministic. This should make bugs more reproducible. This
1484// is accomplished by sorting the message lists by processor ID.
1485// This is a distribution count sort algorithm (see Knuth)
1486// This version assumes non negative integers.
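// Worked example (hypothetical data): vals_sort = {3,1,3,0} with
// vals_other = {10,11,12,13} sorts to vals_sort = {0,1,3,3} and
// vals_other = {13,11,10,12}; equal keys keep their original relative order.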
1487
1488 if (nvals <= 1) return 0;
1489
1490 int i; // loop counter
1491
1492 // find largest int, n, to size sorting array, then allocate and clear it
1493 int n = 0;
1494 for (i = 0; i < nvals; i++)
1495 if (n < vals_sort[i]) n = vals_sort[i];
1496 int *pos = new int [n+2];
1497 for (i = 0; i < n+2; i++) pos[i] = 0;
1498
1499 // copy input arrays into temporary copies to allow sorting original arrays
1500 int *copy_sort = new int [nvals];
1501 int *copy_other = new int [nvals];
1502 for (i = 0; i < nvals; i++)
1503 {
1504 copy_sort[i] = vals_sort[i];
1505 copy_other[i] = vals_other[i];
1506 }
1507
1508 // count the occurrences of integers ("distribution count")
1509 int *p = pos+1;
1510 for (i = 0; i < nvals; i++) p[copy_sort[i]]++;
1511
1512 // create the partial sum of distribution counts
1513 for (i = 1; i < n; i++) p[i] += p[i-1];
1514
1515 // the shifted partial sum is the index to store the data in sort order
1516 p = pos;
1517 for (i = 0; i < nvals; i++)
1518 {
1519 vals_sort [p[copy_sort [i]]] = copy_sort[i];
1520 vals_other [p[copy_sort [i]]++] = copy_other[i];
1521 }
1522
1523 delete [] copy_sort;
1524 delete [] copy_other;
1525 delete [] pos;
1526
1527 return 0;
1528}
1529
1530//-------------------------------------------------------------------------
1531Epetra_MpiDistributor& Epetra_MpiDistributor::operator=(const Epetra_MpiDistributor & src)
1532{
1533 (void)src;
1534 //not currently supported
1535 bool throw_error = true;
1536 if (throw_error) {
1537 throw ReportError("Epetra_MpiDistributor::operator= not supported.",-1);
1538 }
1539 return( *this );
1540}
1541
1542
1543//-------------------------------------------------------------------------
1544void Epetra_MpiDistributor::CreateReverseDistributor() {
1545 int i;
1546 int my_proc = 0;
1547
1548 MPI_Comm_rank( comm_, &my_proc );
1549
1550 if( comm_plan_reverse_ == 0 ) {
1551 int total_send_length = 0;
1552 for( i = 0; i < nsends_+self_msg_; i++ )
1553 total_send_length += lengths_to_[i];
1554
1555 int max_recv_length = 0;
1556 for( i = 0; i < nrecvs_; i++ )
1557 if( procs_from_[i] != my_proc )
1558 if( lengths_from_[i] > max_recv_length )
1559 max_recv_length = lengths_from_[i];
1560
1561 comm_plan_reverse_ = new Epetra_MpiDistributor(*epComm_);
1562
1563 comm_plan_reverse_->lengths_to_ = lengths_from_;
1564 comm_plan_reverse_->procs_to_ = procs_from_;
1565 comm_plan_reverse_->indices_to_ = indices_from_;
1566 comm_plan_reverse_->starts_to_ = starts_from_;
1567
1568 comm_plan_reverse_->lengths_from_ = lengths_to_;
1569 comm_plan_reverse_->procs_from_ = procs_to_;
1570 comm_plan_reverse_->indices_from_ = indices_to_;
1571 comm_plan_reverse_->starts_from_ = starts_to_;
1572
1573 comm_plan_reverse_->nsends_ = nrecvs_;
1574 comm_plan_reverse_->nrecvs_ = nsends_;
1575 comm_plan_reverse_->self_msg_ = self_msg_;
1576
1577 comm_plan_reverse_->max_send_length_ = max_recv_length;
1578 comm_plan_reverse_->total_recv_length_ = total_send_length;
1579
1580 comm_plan_reverse_->request_ = new MPI_Request[ comm_plan_reverse_->nrecvs_ ];
1581 comm_plan_reverse_->status_ = new MPI_Status[ comm_plan_reverse_->nrecvs_ ];
1582
1583 comm_plan_reverse_->no_delete_ = true;
1584 }
1585
1586}
1587
1588//-------------------------------------------------------------------------
1589Epetra_Distributor * Epetra_MpiDistributor::ReverseClone() {
1590 if(comm_plan_reverse_==0)
1591 CreateReverseDistributor();
1592
1593 return(dynamic_cast<Epetra_Distributor *>(new Epetra_MpiDistributor(*comm_plan_reverse_)));
1594}