Stxxl  1.2.1
sort.h
1 /***************************************************************************
2  * include/stxxl/bits/algo/sort.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002-2003 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7  * Copyright (C) 2006 Johannes Singler <singler@ira.uka.de>
8  * Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
9  *
10  * Distributed under the Boost Software License, Version 1.0.
11  * (See accompanying file LICENSE_1_0.txt or copy at
12  * http://www.boost.org/LICENSE_1_0.txt)
13  **************************************************************************/
14 
15 #ifndef STXXL_SORT_HEADER
16 #define STXXL_SORT_HEADER
17 
18 #include <list>
19 #include <functional>
20 
21 #include <stxxl/bits/mng/mng.h>
22 #include <stxxl/bits/common/rand.h>
23 #include <stxxl/bits/mng/adaptor.h>
24 #include <stxxl/bits/common/simple_vector.h>
25 #include <stxxl/bits/common/switch.h>
27 #include <stxxl/bits/mng/block_alloc_interleaved.h>
28 #include <stxxl/bits/algo/intksort.h>
29 #include <stxxl/bits/algo/adaptor.h>
30 #include <stxxl/bits/algo/async_schedule.h>
31 #include <stxxl/bits/mng/block_prefetcher.h>
32 #include <stxxl/bits/mng/buf_writer.h>
33 #include <stxxl/bits/algo/run_cursor.h>
34 #include <stxxl/bits/algo/losertree.h>
35 #include <stxxl/bits/algo/inmemsort.h>
36 #include <stxxl/bits/parallel.h>
37 
38 
39 //#define SORT_OPTIMAL_PREFETCHING
40 //#define INTERLEAVED_ALLOC
41 //#define STXXL_CHECK_ORDER_IN_SORTS
42 
43 __STXXL_BEGIN_NAMESPACE
44 
47 
48 
51 namespace sort_local
52 {
53  template <typename BIDTp_, typename ValTp_>
54  struct trigger_entry
55  {
56  typedef BIDTp_ bid_type;
57  typedef ValTp_ value_type;
58 
59  bid_type bid;
60  value_type value;
61 
62  operator bid_type ()
63  {
64  return bid;
65  }
66  };
67 
68  template <typename BIDTp_, typename ValTp_, typename ValueCmp_>
69  struct trigger_entry_cmp : public std::binary_function<trigger_entry<BIDTp_, ValTp_>, trigger_entry<BIDTp_, ValTp_>, bool>
70  {
71  typedef trigger_entry<BIDTp_, ValTp_> trigger_entry_type;
72  ValueCmp_ cmp;
73  trigger_entry_cmp(ValueCmp_ c) : cmp(c) { }
74  trigger_entry_cmp(const trigger_entry_cmp & a) : cmp(a.cmp) { }
75  bool operator () (const trigger_entry_type & a, const trigger_entry_type & b) const
76  {
77  return cmp(a.value, b.value);
78  }
79  };
80 
81  template <typename block_type,
82  typename prefetcher_type,
83  typename value_cmp>
84  struct run_cursor2_cmp
85  {
86  typedef run_cursor2<block_type, prefetcher_type> cursor_type;
87  value_cmp cmp;
88 
89  run_cursor2_cmp(value_cmp c) : cmp(c) { }
90  run_cursor2_cmp(const run_cursor2_cmp & a) : cmp(a.cmp) { }
91  inline bool operator () (const cursor_type & a, const cursor_type & b)
92  {
93  if (UNLIKELY(b.empty()))
94  return true;
95  // sentinel emulation
96  if (UNLIKELY(a.empty()))
97  return false;
98  //sentinel emulation
99 
100  return (cmp(a.current(), b.current()));
101  }
102  };
103 
104  template <typename block_type, typename bid_type>
105  struct read_next_after_write_completed
106  {
107  block_type * block;
108  bid_type bid;
109  request_ptr * req;
110  void operator () (request * /*completed_req*/)
111  {
112  * req = block->read(bid);
113  }
114  };
115 
116 
117  template <
118  typename block_type,
119  typename run_type,
120  typename input_bid_iterator,
121  typename value_cmp>
122  void
123  create_runs(
124  input_bid_iterator it,
125  run_type ** runs,
126  int_type nruns,
127  int_type _m,
128  value_cmp cmp)
129  {
130  typedef typename block_type::value_type type;
131  typedef typename block_type::bid_type bid_type;
132  STXXL_VERBOSE1("stxxl::create_runs nruns=" << nruns << " m=" << _m);
133 
134  int_type m2 = _m / 2;
135  block_manager * bm = block_manager::get_instance();
136  block_type * Blocks1 = new block_type[m2];
137  block_type * Blocks2 = new block_type[m2];
138  bid_type * bids1 = new bid_type[m2];
139  bid_type * bids2 = new bid_type[m2];
140  request_ptr * read_reqs1 = new request_ptr[m2];
141  request_ptr * read_reqs2 = new request_ptr[m2];
142  request_ptr * write_reqs = new request_ptr[m2];
143  read_next_after_write_completed<block_type, bid_type> * next_run_reads =
144  new read_next_after_write_completed<block_type, bid_type>[m2];
145 
146  disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
147 
148  int_type i;
149  int_type run_size = 0, next_run_size = 0;
150 
151  assert(nruns >= 2);
152 
153  run_size = runs[0]->size();
154  assert(run_size == m2);
155 
156  for (i = 0; i < run_size; ++i)
157  {
158  STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks1[i].elem));
159  bids1[i] = *(it++);
160  read_reqs1[i] = Blocks1[i].read(bids1[i]);
161  }
162 
163  run_size = runs[1]->size();
164 
165  for (i = 0; i < run_size; ++i)
166  {
167  STXXL_VERBOSE1("stxxl::create_runs posting read " << long(Blocks2[i].elem));
168  bids2[i] = *(it++);
169  read_reqs2[i] = Blocks2[i].read(bids2[i]);
170  }
171 
172  for (int_type k = 0; k < nruns - 1; ++k)
173  {
174  run_type * run = runs[k];
175  run_size = run->size();
176  assert(run_size == m2);
177  next_run_size = runs[k + 1]->size();
178  assert((next_run_size == m2) || (next_run_size <= m2 && k == nruns - 2));
179 
180  STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
181  wait_all(read_reqs1, run_size);
182  STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
183  for (i = 0; i < run_size; ++i)
184  bm->delete_block(bids1[i]);
185 
186  if (block_type::has_filler)
187  std::sort(
188  ArrayOfSequencesIterator<
189  block_type, typename block_type::value_type, block_type::size
190  >(Blocks1, 0),
191  ArrayOfSequencesIterator<
192  block_type, typename block_type::value_type, block_type::size
193  >(Blocks1, run_size * block_type::size),
194  cmp);
195  else
196  std::sort(Blocks1[0].elem, Blocks1[run_size].elem, cmp);
197 
198 
199  STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
200  if (k > 0)
201  wait_all(write_reqs, m2);
202  STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
203 
204  int_type runplus2size = (k < nruns - 2) ? runs[k + 2]->size() : 0;
205  for (i = 0; i < m2; ++i)
206  {
207  STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
208  (*run)[i].value = Blocks1[i][0];
209  if (i >= runplus2size) {
210  write_reqs[i] = Blocks1[i].write((*run)[i].bid);
211  }
212  else
213  {
214  next_run_reads[i].block = Blocks1 + i;
215  next_run_reads[i].req = read_reqs1 + i;
216  bids1[i] = next_run_reads[i].bid = *(it++);
217  write_reqs[i] = Blocks1[i].write((*run)[i].bid, next_run_reads[i]);
218  }
219  }
220  std::swap(Blocks1, Blocks2);
221  std::swap(bids1, bids2);
222  std::swap(read_reqs1, read_reqs2);
223  }
224 
225  run_type * run = runs[nruns - 1];
226  run_size = run->size();
227  STXXL_VERBOSE1("stxxl::create_runs start waiting read_reqs1");
228  wait_all(read_reqs1, run_size);
229  STXXL_VERBOSE1("stxxl::create_runs finish waiting read_reqs1");
230  for (i = 0; i < run_size; ++i)
231  bm->delete_block(bids1[i]);
232 
233  if (block_type::has_filler) {
234  std::sort(
235  ArrayOfSequencesIterator<
236  block_type, typename block_type::value_type, block_type::size
237  >(Blocks1, 0),
238  ArrayOfSequencesIterator<
239  block_type, typename block_type::value_type, block_type::size
240  >(Blocks1, run_size * block_type::size),
241  cmp);
242  } else {
243  std::sort(Blocks1[0].elem, Blocks1[run_size].elem, cmp);
244  }
245 
246  STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
247  wait_all(write_reqs, m2);
248  STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
249 
250  for (i = 0; i < run_size; ++i)
251  {
252  STXXL_VERBOSE1("stxxl::create_runs posting write " << long(Blocks1[i].elem));
253  (*run)[i].value = Blocks1[i][0];
254  write_reqs[i] = Blocks1[i].write((*run)[i].bid);
255  }
256 
257  STXXL_VERBOSE1("stxxl::create_runs start waiting write_reqs");
258  wait_all(write_reqs, run_size);
259  STXXL_VERBOSE1("stxxl::create_runs finish waiting write_reqs");
260 
261  delete[] Blocks1;
262  delete[] Blocks2;
263  delete[] bids1;
264  delete[] bids2;
265  delete[] read_reqs1;
266  delete[] read_reqs2;
267  delete[] write_reqs;
268  delete[] next_run_reads;
269  }
270 
271 
272  template <typename block_type, typename run_type, typename value_cmp>
273  bool check_sorted_runs(run_type ** runs,
274  unsigned_type nruns,
275  unsigned_type m,
276  value_cmp cmp)
277  {
278  typedef typename block_type::value_type value_type;
279 
280  //STXXL_VERBOSE1("check_sorted_runs Runs: "<<nruns);
281  STXXL_MSG("check_sorted_runs Runs: " << nruns);
282  unsigned_type irun = 0;
283  for (irun = 0; irun < nruns; ++irun)
284  {
285  const unsigned_type nblocks_per_run = runs[irun]->size();
286  unsigned_type blocks_left = nblocks_per_run;
287  block_type * blocks = new block_type[m];
288  request_ptr * reqs = new request_ptr[m];
289  value_type last = cmp.min_value();
290 
291  for (unsigned_type off = 0; off < nblocks_per_run; off += m)
292  {
293  const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
294  const unsigned_type nelements = nblocks * block_type::size;
295  blocks_left -= nblocks;
296 
297  for (unsigned_type j = 0; j < nblocks; ++j)
298  {
299  reqs[j] = blocks[j].read((*runs[irun])[j + off].bid);
300  }
301  wait_all(reqs, reqs + nblocks);
302 
303  if (off && cmp(blocks[0][0], last))
304  {
305  STXXL_MSG("check_sorted_runs wrong first value in the run " << irun);
306  STXXL_MSG(" first value: " << blocks[0][0]);
307  STXXL_MSG(" last value: " << last);
308  for (unsigned_type k = 0; k < block_type::size; ++k)
309  STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k]);
310 
311  return false;
312  }
313 
314  for (unsigned_type j = 0; j < nblocks; ++j)
315  {
316  if (!(blocks[j][0] == (*runs[irun])[j + off].value))
317  {
318  STXXL_MSG("check_sorted_runs wrong trigger in the run " << irun << " block " << (j + off));
319  STXXL_MSG(" trigger value: " << (*runs[irun])[j + off].value);
320  STXXL_MSG("Data in the block:");
321  for (unsigned_type k = 0; k < block_type::size; ++k)
322  STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k]);
323 
324  STXXL_MSG("BIDS:");
325  for (unsigned_type k = 0; k < nblocks; ++k)
326  {
327  if (k == j)
328  STXXL_MSG("Bad one comes next.");
329  STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
330  }
331 
332  return false;
333  }
334  }
335  if (!stxxl::is_sorted(
336  ArrayOfSequencesIterator<
337  block_type, typename block_type::value_type, block_type::size
338  >(blocks, 0),
339  ArrayOfSequencesIterator<
340  block_type, typename block_type::value_type, block_type::size
341  >(blocks, nelements),
342  cmp))
343  {
344  STXXL_MSG("check_sorted_runs wrong order in the run " << irun);
345  STXXL_MSG("Data in blocks:");
346  for (unsigned_type j = 0; j < nblocks; ++j)
347  {
348  for (unsigned_type k = 0; k < block_type::size; ++k)
349  STXXL_MSG(" Element " << k << " in block " << (j + off) << " is :" << blocks[j][k]);
350  }
351  STXXL_MSG("BIDS:");
352  for (unsigned_type k = 0; k < nblocks; ++k)
353  {
354  STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
355  }
356 
357  return false;
358  }
359 
360  last = blocks[nblocks - 1][block_type::size - 1];
361  }
362 
363  assert(blocks_left == 0);
364  delete[] reqs;
365  delete[] blocks;
366  }
367 
368  return true;
369  }
370 
371 
372  template <typename block_type, typename run_type, typename value_cmp>
373  void merge_runs(run_type ** in_runs, int_type nruns, run_type * out_run, unsigned_type _m, value_cmp cmp
374  )
375  {
376  typedef typename block_type::bid_type bid_type;
377  typedef typename block_type::value_type value_type;
379  typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
380  typedef run_cursor2_cmp<block_type, prefetcher_type, value_cmp> run_cursor2_cmp_type;
381 
382  int_type i;
383  run_type consume_seq(out_run->size());
384 
385  int_type * prefetch_seq = new int_type[out_run->size()];
386 
387  typename run_type::iterator copy_start = consume_seq.begin();
388  for (i = 0; i < nruns; i++)
389  {
390  // TODO: try to avoid copy
391  copy_start = std::copy(
392  in_runs[i]->begin(),
393  in_runs[i]->end(),
394  copy_start);
395  }
396 
397  std::stable_sort(consume_seq.begin(), consume_seq.end(),
398  trigger_entry_cmp<bid_type, value_type, value_cmp>(cmp));
399 
400  int_type disks_number = config::get_instance()->disks_number();
401 
402 #ifdef PLAY_WITH_OPT_PREF
403  const int_type n_write_buffers = 4 * disks_number;
404 #else
405  const int_type n_prefetch_buffers = STXXL_MAX(2 * disks_number, (3 * (int_type(_m) - nruns) / 4));
406  const int_type n_write_buffers = STXXL_MAX(2 * disks_number, int_type(_m) - nruns - n_prefetch_buffers);
407  #ifdef SORT_OPTIMAL_PREFETCHING
408  // heuristic
409  const int_type n_opt_prefetch_buffers = 2 * disks_number + (3 * (n_prefetch_buffers - 2 * disks_number)) / 10;
410  #endif
411 #endif
412 
413 #ifdef SORT_OPTIMAL_PREFETCHING
414  compute_prefetch_schedule(
415  consume_seq,
416  prefetch_seq,
417  n_opt_prefetch_buffers,
418  disks_number);
419 #else
420  for (i = 0; i < out_run->size(); i++)
421  prefetch_seq[i] = i;
422 
423 #endif
424 
425  prefetcher_type prefetcher(consume_seq.begin(),
426  consume_seq.end(),
427  prefetch_seq,
428  nruns + n_prefetch_buffers);
429 
430  buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
431 
432  int_type out_run_size = out_run->size();
433 
434  block_type * out_buffer = writer.get_free_block();
435 
436 //If parallelism is activated, one can still fall back to the
437 //native merge routine by setting stxxl::SETTINGS::native_merge= true, //otherwise, it is used anyway.
438 
439  if (do_parallel_merge())
440  {
441 #if STXXL_PARALLEL_MULTIWAY_MERGE
442 
443 // begin of STL-style merging
444 
445  typedef stxxl::int64 diff_type;
446  typedef std::pair<typename block_type::iterator, typename block_type::iterator> sequence;
447  typedef typename std::vector<sequence>::size_type seqs_size_type;
448  std::vector<sequence> seqs(nruns);
449  std::vector<block_type *> buffers(nruns);
450 
451  for (int_type i = 0; i < nruns; i++) //initialize sequences
452  {
453  buffers[i] = prefetcher.pull_block(); //get first block of each run
454  seqs[i] = std::make_pair(buffers[i]->begin(), buffers[i]->end());
455  //this memory location stays the same, only the data is exchanged
456  }
457 
458  #ifdef STXXL_CHECK_ORDER_IN_SORTS
459  value_type last_elem = cmp.min_value();
460  #endif
461 
462  for (int_type j = 0; j < out_run_size; ++j) //for the whole output run, out_run_size is in blocks
463  {
464  diff_type rest = block_type::size; //elements still to merge for this output block
465 
466  STXXL_VERBOSE1("output block " << j);
467  do {
468  value_type * min_last_element = NULL; //no element found yet
469  diff_type total_size = 0;
470 
471  for (seqs_size_type i = 0; i < seqs.size(); i++)
472  {
473  if (seqs[i].first == seqs[i].second)
474  continue; //run empty
475 
476  if (min_last_element == NULL)
477  min_last_element = &(*(seqs[i].second - 1));
478  else
479  min_last_element = cmp(*min_last_element, *(seqs[i].second - 1)) ? min_last_element : &(*(seqs[i].second - 1));
480 
481  total_size += seqs[i].second - seqs[i].first;
482  STXXL_VERBOSE1("last " << *(seqs[i].second - 1) << " block size " << (seqs[i].second - seqs[i].first));
483  }
484 
485  assert(min_last_element != NULL); //there must be some element
486 
487  STXXL_VERBOSE1("min_last_element " << min_last_element << " total size " << total_size + (block_type::size - rest));
488 
489  diff_type less_equal_than_min_last = 0;
490  //locate this element in all sequences
491  for (seqs_size_type i = 0; i < seqs.size(); i++)
492  {
493  if (seqs[i].first == seqs[i].second)
494  continue; //empty subsequence
495 
496  typename block_type::iterator position = std::upper_bound(seqs[i].first, seqs[i].second, *min_last_element, cmp);
497  STXXL_VERBOSE1("greater equal than " << position - seqs[i].first);
498  less_equal_than_min_last += position - seqs[i].first;
499  }
500 
501  STXXL_VERBOSE1("finished loop");
502 
503  ptrdiff_t output_size = (std::min)(less_equal_than_min_last, rest); //at most rest elements
504 
505  STXXL_VERBOSE1("before merge" << output_size);
506 
507  stxxl::parallel::multiway_merge(seqs.begin(), seqs.end(), out_buffer->end() - rest, cmp, output_size);
508  //sequence iterators are progressed appropriately
509 
510  STXXL_VERBOSE1("after merge");
511 
512  (*out_run)[j].value = (*out_buffer)[0]; //save smallest value
513 
514  rest -= output_size;
515 
516  STXXL_VERBOSE1("so long");
517 
518  for (seqs_size_type i = 0; i < seqs.size(); i++)
519  {
520  if (seqs[i].first == seqs[i].second) //run empty
521  {
522  if (prefetcher.block_consumed(buffers[i]))
523  {
524  seqs[i].first = buffers[i]->begin(); //reset iterator
525  seqs[i].second = buffers[i]->end();
526  STXXL_VERBOSE1("block ran empty " << i);
527  }
528  else
529  {
530  seqs.erase(seqs.begin() + i); //remove this sequence
531  buffers.erase(buffers.begin() + i);
532  STXXL_VERBOSE1("seq removed " << i);
533  }
534  }
535  }
536  } while (rest > 0 && seqs.size() > 0);
537 
538  #ifdef STXXL_CHECK_ORDER_IN_SORTS
539  if (!stxxl::is_sorted(out_buffer->begin(), out_buffer->end(), cmp))
540  {
541  for (value_type * i = out_buffer->begin() + 1; i != out_buffer->end(); i++)
542  if (cmp(*i, *(i - 1)))
543  {
544  STXXL_VERBOSE1("Error at position " << (i - out_buffer->begin()));
545  }
546  assert(false);
547  }
548 
549  if (j > 0) //do not check in first iteration
550  assert(cmp((*out_buffer)[0], last_elem) == false);
551 
552  last_elem = (*out_buffer)[block_type::size - 1];
553  #endif
554 
555 
556  out_buffer = writer.write(out_buffer, (*out_run)[j].bid);
557  }
558 
559 // end of STL-style merging
560 
561 #else
562  assert(false);
563 #endif
564  }
565  else
566  {
567 // begin of native merging procedure
568 
569  loser_tree<run_cursor_type, run_cursor2_cmp_type, block_type::size>
570  losers(&prefetcher, nruns, run_cursor2_cmp_type(cmp));
571 
572 #ifdef STXXL_CHECK_ORDER_IN_SORTS
573  value_type last_elem = cmp.min_value();
574 #endif
575 
576  for (i = 0; i < out_run_size; ++i)
577  {
578  losers.multi_merge(out_buffer->elem);
579  (*out_run)[i].value = *(out_buffer->elem);
580 
581 #ifdef STXXL_CHECK_ORDER_IN_SORTS
582  assert(stxxl::is_sorted(
583  out_buffer->begin(),
584  out_buffer->end(),
585  cmp));
586 
587  if (i)
588  assert(cmp(*(out_buffer->elem), last_elem) == false);
589 
590  last_elem = (*out_buffer).elem[block_type::size - 1];
591 #endif
592 
593  out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
594  }
595 
596 // end of native merging procedure
597  }
598 
599  delete[] prefetch_seq;
600 
601  block_manager * bm = block_manager::get_instance();
602  for (i = 0; i < nruns; ++i)
603  {
604  unsigned_type sz = in_runs[i]->size();
605  for (unsigned_type j = 0; j < sz; ++j)
606  bm->delete_block((*in_runs[i])[j].bid);
607 
608 
609  delete in_runs[i];
610  }
611  }
612 
613 
614  template <typename block_type,
615  typename alloc_strategy,
616  typename input_bid_iterator,
617  typename value_cmp>
618  simple_vector<trigger_entry<typename block_type::bid_type, typename block_type::value_type> > *
619  sort_blocks(input_bid_iterator input_bids,
620  unsigned_type _n,
621  unsigned_type _m,
622  value_cmp cmp
623  )
624  {
625  typedef typename block_type::value_type type;
626  typedef typename block_type::bid_type bid_type;
627  typedef trigger_entry<bid_type, type> trigger_entry_type;
628  typedef simple_vector<trigger_entry_type> run_type;
629  typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
630 
631  unsigned_type m2 = _m / 2;
632  unsigned_type full_runs = _n / m2;
633  unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
634  unsigned_type nruns = full_runs + partial_runs;
635  unsigned_type i;
636 
637  config * cfg = config::get_instance();
638  block_manager * mng = block_manager::get_instance();
639  const unsigned_type ndisks = cfg->disks_number();
640 
641  //STXXL_VERBOSE ("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs);
642 
643  double begin = timestamp(), after_runs_creation, end;
644 
645  run_type ** runs = new run_type *[nruns];
646 
647  for (i = 0; i < full_runs; i++)
648  runs[i] = new run_type(m2);
649 
650 
651  if (partial_runs)
652  runs[i] = new run_type(_n - full_runs * m2);
653 
654 
655  for (i = 0; i < nruns; ++i)
656  {
657  // FIXME: why has an alloc_strategy to take two arguments disk_index.begin(), disk_index.end() ???
658  mng->new_blocks(alloc_strategy(0, ndisks),
659  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->begin()),
660  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->end()));
661  }
662 
663  sort_local::create_runs<block_type,
664  run_type,
665  input_bid_iterator,
666  value_cmp>(input_bids, runs, nruns, _m, cmp);
667 
668  after_runs_creation = timestamp();
669 
670  double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
671 
672  disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
673 
674  // Optimal merging: merge r = pow(nruns,1/ceil(log(nruns)/log(m))) runs at once
675 
676  const int_type merge_factor = static_cast<int_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) /
677  log(double(_m))))));
678  run_type ** new_runs;
679 
680  while (nruns > 1)
681  {
682  int_type new_nruns = div_and_round_up(nruns, merge_factor);
683  STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
684  " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
685 
686  new_runs = new run_type *[new_nruns];
687 
688  int_type runs_left = nruns;
689  int_type cur_out_run = 0;
690  int_type blocks_in_new_run = 0;
691 
692  while (runs_left > 0)
693  {
694  int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
695  blocks_in_new_run = 0;
696  for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
697  blocks_in_new_run += runs[i]->size();
698 
699  // allocate run
700  new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
701  runs_left -= runs2merge;
702  }
703  // allocate blocks for the new runs
704  if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && (input_bids->storage->get_id() == -1))
705  {
706  // if we sort a file we can reuse the input bids for the output
707  input_bid_iterator cur = input_bids;
708  for (int_type i = 0; cur != (input_bids + _n); ++cur)
709  {
710  (*new_runs[0])[i++].bid = *cur;
711  }
712 
713  bid_type & firstBID = (*new_runs[0])[0].bid;
714  if (firstBID.storage->get_id() != -1)
715  {
716  // the first block does not belong to the file
717  // need to reallocate it
718  mng->new_blocks(FR(), &firstBID, (&firstBID) + 1);
719  }
720  bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
721  if (lastBID.storage->get_id() != -1)
722  {
723  // the first block does not belong to the file
724  // need to reallocate it
725  mng->new_blocks(FR(), &lastBID, (&lastBID) + 1);
726  }
727  }
728  else
729  {
730  mng->new_blocks(interleaved_alloc_strategy(new_nruns, 0, ndisks),
731  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
732  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
733  }
734  // merge all
735  runs_left = nruns;
736  cur_out_run = 0;
737  while (runs_left > 0)
738  {
739  int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
740 #ifdef STXXL_CHECK_ORDER_IN_SORTS
741  assert((check_sorted_runs<block_type, run_type, value_cmp>(runs + nruns - runs_left, runs2merge, m2, cmp)));
742 #endif
743  STXXL_VERBOSE("Merging " << runs2merge << " runs");
744  merge_runs<block_type, run_type>(runs + nruns - runs_left,
745  runs2merge, *(new_runs + (cur_out_run++)), _m, cmp
746  );
747  runs_left -= runs2merge;
748  }
749 
750  nruns = new_nruns;
751  delete[] runs;
752  runs = new_runs;
753  }
754 
755  run_type * result = *runs;
756  delete[] runs;
757 
758  end = timestamp();
759 
760  STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Run creation time: " <<
761  after_runs_creation - begin << " s");
762  STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
763  STXXL_VERBOSE(*stats::get_instance());
764  UNUSED(begin + io_wait_after_rf);
765 
766  return result;
767  }
768 }
769 
770 
778 template <typename ExtIterator_, typename StrictWeakOrdering_>
779 void sort(ExtIterator_ first, ExtIterator_ last, StrictWeakOrdering_ cmp, unsigned_type M)
780 {
781  typedef simple_vector<sort_local::trigger_entry<typename ExtIterator_::bid_type,
782  typename ExtIterator_::vector_type::value_type> > run_type;
783 
784  typedef typename ExtIterator_::vector_type::value_type value_type;
785  typedef typename ExtIterator_::block_type block_type;
786 
787  // verify strict weak ordering of the sentinels
788  assert(!cmp(cmp.min_value(), cmp.min_value()));
789  assert(cmp(cmp.min_value(), cmp.max_value()));
790  assert(!cmp(cmp.max_value(), cmp.max_value()));
791 
792  unsigned_type n = 0;
793 
794  block_manager * mng = block_manager::get_instance();
795 
796  first.flush();
797 
798  if ((last - first) * sizeof(value_type) * sort_memory_usage_factor() < M)
799  {
800  stl_in_memory_sort(first, last, cmp);
801  }
802  else
803  {
804  assert(2 * block_type::raw_size * sort_memory_usage_factor() <= M);
805 
806  if (first.block_offset())
807  {
808  if (last.block_offset()) // first and last element are
809  // not the first elements of their block
810  {
811  typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
812  typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
813  typename ExtIterator_::bid_type first_bid, last_bid;
814  request_ptr req;
815 
816  req = first_block->read(*first.bid());
817  mng->new_blocks(FR(), &first_bid, (&first_bid) + 1); // try to overlap
818  mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
819  req->wait();
820 
821 
822  req = last_block->read(*last.bid());
823 
824  unsigned_type i = 0;
825  for ( ; i < first.block_offset(); ++i)
826  {
827  first_block->elem[i] = cmp.min_value();
828  }
829 
830  req->wait();
831 
832 
833  req = first_block->write(first_bid);
834  for (i = last.block_offset(); i < block_type::size; ++i)
835  {
836  last_block->elem[i] = cmp.max_value();
837  }
838 
839  req->wait();
840 
841 
842  req = last_block->write(last_bid);
843 
844  n = last.bid() - first.bid() + 1;
845 
846  std::swap(first_bid, *first.bid());
847  std::swap(last_bid, *last.bid());
848 
849  req->wait();
850 
851 
852  delete first_block;
853  delete last_block;
854 
855  run_type * out =
856  sort_local::sort_blocks<
857  typename ExtIterator_::block_type,
858  typename ExtIterator_::vector_type::alloc_strategy,
859  typename ExtIterator_::bids_container_iterator>
860  (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
861 
862 
863  first_block = new typename ExtIterator_::block_type;
864  last_block = new typename ExtIterator_::block_type;
865  typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
866  typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
867  request_ptr * reqs = new request_ptr[2];
868 
869  reqs[0] = first_block->read(first_bid);
870  reqs[1] = sorted_first_block->read((*(out->begin())).bid);
871 
872  reqs[0]->wait();
873  reqs[1]->wait();
874 
875  reqs[0] = last_block->read(last_bid);
876  reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
877 
878  for (i = first.block_offset(); i < block_type::size; i++)
879  {
880  first_block->elem[i] = sorted_first_block->elem[i];
881  }
882 
883  reqs[0]->wait();
884  reqs[1]->wait();
885 
886  req = first_block->write(first_bid);
887 
888  for (i = 0; i < last.block_offset(); ++i)
889  {
890  last_block->elem[i] = sorted_last_block->elem[i];
891  }
892 
893  req->wait();
894 
895  req = last_block->write(last_bid);
896 
897  mng->delete_block(out->begin()->bid);
898  mng->delete_block((*out)[out->size() - 1].bid);
899 
900  *first.bid() = first_bid;
901  *last.bid() = last_bid;
902 
903  typename run_type::iterator it = out->begin();
904  ++it;
905  typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
906  ++cur_bid;
907 
908  for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
909  {
910  *cur_bid = (*it).bid;
911  }
912 
913  delete first_block;
914  delete sorted_first_block;
915  delete sorted_last_block;
916  delete[] reqs;
917  delete out;
918 
919  req->wait();
920 
921 
922  delete last_block;
923  }
924  else
925  {
926  // first element is
927  // not the first element of its block
928  typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
929  typename ExtIterator_::bid_type first_bid;
930  request_ptr req;
931 
932  req = first_block->read(*first.bid());
933  mng->new_blocks(FR(), &first_bid, (&first_bid) + 1); // try to overlap
934  req->wait();
935 
936 
937  unsigned_type i = 0;
938  for ( ; i < first.block_offset(); ++i)
939  {
940  first_block->elem[i] = cmp.min_value();
941  }
942 
943  req = first_block->write(first_bid);
944 
945  n = last.bid() - first.bid();
946 
947  std::swap(first_bid, *first.bid());
948 
949  req->wait();
950 
951 
952  delete first_block;
953 
954  run_type * out =
955  sort_local::sort_blocks<
956  typename ExtIterator_::block_type,
957  typename ExtIterator_::vector_type::alloc_strategy,
958  typename ExtIterator_::bids_container_iterator>
959  (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
960 
961 
962  first_block = new typename ExtIterator_::block_type;
963 
964  typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
965 
966  request_ptr * reqs = new request_ptr[2];
967 
968  reqs[0] = first_block->read(first_bid);
969  reqs[1] = sorted_first_block->read((*(out->begin())).bid);
970 
971  reqs[0]->wait();
972  reqs[1]->wait();
973 
974  for (i = first.block_offset(); i < block_type::size; ++i)
975  {
976  first_block->elem[i] = sorted_first_block->elem[i];
977  }
978 
979  req = first_block->write(first_bid);
980 
981  mng->delete_block(out->begin()->bid);
982 
983  *first.bid() = first_bid;
984 
985  typename run_type::iterator it = out->begin();
986  ++it;
987  typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
988  ++cur_bid;
989 
990  for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
991  {
992  *cur_bid = (*it).bid;
993  }
994 
995  *cur_bid = (*it).bid;
996 
997  delete sorted_first_block;
998  delete[] reqs;
999  delete out;
1000 
1001  req->wait();
1002 
1003  delete first_block;
1004  }
1005  }
1006  else
1007  {
1008  if (last.block_offset()) // last is
1009  // not the first element of its block
1010  {
1011  typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
1012  typename ExtIterator_::bid_type last_bid;
1013  request_ptr req;
1014  unsigned_type i;
1015 
1016  req = last_block->read(*last.bid());
1017  mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
1018  req->wait();
1019 
1020 
1021  for (i = last.block_offset(); i < block_type::size; ++i)
1022  {
1023  last_block->elem[i] = cmp.max_value();
1024  }
1025 
1026  req = last_block->write(last_bid);
1027 
1028  n = last.bid() - first.bid() + 1;
1029 
1030  std::swap(last_bid, *last.bid());
1031 
1032  req->wait();
1033 
1034 
1035  delete last_block;
1036 
1037  run_type * out =
1038  sort_local::sort_blocks<
1039  typename ExtIterator_::block_type,
1040  typename ExtIterator_::vector_type::alloc_strategy,
1041  typename ExtIterator_::bids_container_iterator>
1042  (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
1043 
1044 
1045  last_block = new typename ExtIterator_::block_type;
1046  typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
1047  request_ptr * reqs = new request_ptr[2];
1048 
1049  reqs[0] = last_block->read(last_bid);
1050  reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
1051 
1052  reqs[0]->wait();
1053  reqs[1]->wait();
1054 
1055  for (i = 0; i < last.block_offset(); ++i)
1056  {
1057  last_block->elem[i] = sorted_last_block->elem[i];
1058  }
1059 
1060  req = last_block->write(last_bid);
1061 
1062  mng->delete_block((*out)[out->size() - 1].bid);
1063 
1064  *last.bid() = last_bid;
1065 
1066  typename run_type::iterator it = out->begin();
1067  typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
1068 
1069  for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
1070  {
1071  *cur_bid = (*it).bid;
1072  }
1073 
1074  delete sorted_last_block;
1075  delete[] reqs;
1076  delete out;
1077 
1078  req->wait();
1079 
1080  delete last_block;
1081  }
1082  else
1083  {
1084  // first and last element are first elements of their of blocks
1085  n = last.bid() - first.bid();
1086 
1087  run_type * out =
1088  sort_local::sort_blocks<typename ExtIterator_::block_type,
1089  typename ExtIterator_::vector_type::alloc_strategy,
1090  typename ExtIterator_::bids_container_iterator>
1091  (first.bid(), n, M / sort_memory_usage_factor() / block_type::raw_size, cmp);
1092 
1093  typename run_type::iterator it = out->begin();
1094  typename ExtIterator_::bids_container_iterator cur_bid = first.bid();
1095 
1096  for ( ; cur_bid != last.bid(); ++cur_bid, ++it)
1097  {
1098  *cur_bid = (*it).bid;
1099  }
1100 
1101  delete out;
1102  }
1103  }
1104  }
1105 
1106 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1107  assert(stxxl::is_sorted(first, last, cmp));
1108 #endif
1109 }
1110 
1112 
1113 __STXXL_END_NAMESPACE
1114 
1115 #endif // !STXXL_SORT_HEADER
1116 // vim: et:ts=4:sw=4