ksort.h (STXXL 1.2.1)
1 /***************************************************************************
2  * include/stxxl/bits/algo/ksort.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7  * Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 #ifndef STXXL_KSORT_HEADER
15 #define STXXL_KSORT_HEADER
16 
17 #include <list>
18 
19 #include <stxxl/bits/mng/mng.h>
20 #include <stxxl/bits/common/rand.h>
21 #include <stxxl/bits/mng/adaptor.h>
22 #include <stxxl/bits/common/simple_vector.h>
23 #include <stxxl/bits/common/switch.h>
24 #include <stxxl/bits/mng/block_alloc_interleaved.h>
25 #include <stxxl/bits/algo/intksort.h>
26 #include <stxxl/bits/algo/adaptor.h>
27 #include <stxxl/bits/algo/async_schedule.h>
28 #include <stxxl/bits/mng/block_prefetcher.h>
29 #include <stxxl/bits/mng/buf_writer.h>
30 #include <stxxl/bits/algo/run_cursor.h>
31 #include <stxxl/bits/algo/losertree.h>
32 #include <stxxl/bits/algo/inmemsort.h>
33 
34 
35 //#define SORT_OPTIMAL_PREFETCHING
36 //#define INTERLEAVED_ALLOC
37 
38 #define OPT_MERGING
39 
40 __STXXL_BEGIN_NAMESPACE
41 
44 
49 
52 namespace ksort_local
53 {
54  template <typename _BIDTp, typename _KeyTp>
55  struct trigger_entry
56  {
57  typedef _BIDTp bid_type;
58  typedef _KeyTp key_type;
59 
60  bid_type bid;
61  key_type key;
62 
63  operator bid_type ()
64  {
65  return bid;
66  }
67  };
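 // Note: a trigger_entry pairs a block's BID with the key of the block's first
 // element. A run is a sequence of trigger entries ordered by this key; the
 // conversion operator to bid_type above lets iterators over a run be used
 // wherever a plain sequence of BIDs is expected (e.g. by the prefetcher).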
68 
69 
70  template <typename _BIDTp, typename _KeyTp>
71  inline bool operator < (const trigger_entry<_BIDTp, _KeyTp> & a,
72  const trigger_entry<_BIDTp, _KeyTp> & b)
73  {
74  return (a.key < b.key);
75  }
76 
77  template <typename _BIDTp, typename _KeyTp>
78  inline bool operator > (const trigger_entry<_BIDTp, _KeyTp> & a,
79  const trigger_entry<_BIDTp, _KeyTp> & b)
80  {
81  return (a.key > b.key);
82  }
83 
84  template <typename type, typename key_type1>
85  struct type_key
86  {
87  typedef key_type1 key_type;
88  key_type key;
89  type * ptr;
90 
91  type_key() { }
92  type_key(key_type k, type * p) : key(k), ptr(p)
93  { }
94  };
95 
96  template <typename type, typename key1>
97  bool operator < (const type_key<type, key1> & a, const type_key<type, key1> & b)
98  {
99  return a.key < b.key;
100  }
101 
102  template <typename type, typename key1>
103  bool operator > (const type_key<type, key1> & a, const type_key<type, key1> & b)
104  {
105  return a.key > b.key;
106  }
107 
108 
109  template <typename block_type, typename bid_type>
110  struct write_completion_handler
111  {
112  block_type * block;
113  bid_type bid;
114  request_ptr * req;
115  void operator () (request * /*completed_req*/)
116  {
117  * req = block->read(bid);
118  }
119  };
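 // The handler above is passed to block write requests during run creation:
 // once the write of a block of the current run completes, it immediately
 // issues the read of the corresponding input block of the next run into the
 // same buffer, overlapping output of one run with input of the next.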
120 
121  template <typename type_key_,
122  typename block_type,
123  typename run_type,
124  typename input_bid_iterator,
125  typename key_extractor>
126  inline void write_out(
127  type_key_ * begin,
128  type_key_ * end,
129  block_type * & cur_blk,
130  const block_type * end_blk,
131  int_type & out_block,
132  int_type & out_pos,
133  run_type & run,
134  write_completion_handler<block_type, typename block_type::bid_type> * & next_read,
135  typename block_type::bid_type * & bids,
136  request_ptr * write_reqs,
137  request_ptr * read_reqs,
138  input_bid_iterator & it,
139  key_extractor keyobj)
140  {
141  typedef typename block_type::bid_type bid_type;
142  typedef typename block_type::type type;
143 
144  type * elem = cur_blk->elem;
145  for (type_key_ * p = begin; p < end; p++)
146  {
147  elem[out_pos++] = *(p->ptr);
148 
149  if (out_pos >= block_type::size)
150  {
151  run[out_block].key = keyobj(*(cur_blk->elem));
152 
153  if (cur_blk < end_blk)
154  {
155  next_read->block = cur_blk;
156  next_read->req = read_reqs + out_block;
157  read_reqs[out_block] = NULL;
158  bids[out_block] = next_read->bid = *(it++);
159 
160  write_reqs[out_block] = cur_blk->write(
161  run[out_block].bid,
162  // postpone read of block from next run
163  // after write of block from this run
164  *(next_read++));
165  }
166  else
167  {
168  write_reqs[out_block] = cur_blk->write(run[out_block].bid);
169  }
170 
171  cur_blk++;
172  elem = cur_blk->elem;
173  out_block++;
174  out_pos = 0;
175  }
176  }
177  }
178 
179  template <
180  typename block_type,
181  typename run_type,
182  typename input_bid_iterator,
183  typename key_extractor>
184  void
185  create_runs(
186  input_bid_iterator it,
187  run_type ** runs,
188  const unsigned_type nruns,
189  const unsigned_type m2,
190  key_extractor keyobj)
191  {
192  typedef typename block_type::value_type type;
193  typedef typename block_type::bid_type bid_type;
194  typedef typename key_extractor::key_type key_type;
195  typedef type_key<type, key_type> type_key_;
196 
197  block_manager * bm = block_manager::get_instance();
198  block_type * Blocks1 = new block_type[m2];
199  block_type * Blocks2 = new block_type[m2];
200  bid_type * bids = new bid_type[m2];
201  type_key_ * refs1 = new type_key_[m2 * Blocks1->size];
202  type_key_ * refs2 = new type_key_[m2 * Blocks1->size];
203  request_ptr * read_reqs = new request_ptr[m2];
204  request_ptr * write_reqs = new request_ptr[m2];
205  write_completion_handler<block_type, bid_type> * next_run_reads =
206  new write_completion_handler<block_type, bid_type>[m2];
207 
208  run_type * run;
209  run = *runs;
210  int_type run_size = (*runs)->size();
211  key_type offset = 0;
212  const int log_k1 =
213  static_cast<int>(ceil(log2((m2 * block_type::size * sizeof(type_key_) / STXXL_L2_SIZE) ?
214  (double(m2 * block_type::size * sizeof(type_key_) / STXXL_L2_SIZE)) : 2.)));
215  const int log_k2 = int(log2(double(m2 * Blocks1->size))) - log_k1 - 1;
216  STXXL_VERBOSE("log_k1: " << log_k1 << " log_k2:" << log_k2);
217  const int_type k1 = 1 << log_k1;
218  const int_type k2 = 1 << log_k2;
219  int_type * bucket1 = new int_type[k1];
220  int_type * bucket2 = new int_type[k2];
221  int_type i;
222 
223  disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
224 
225  for (i = 0; i < run_size; i++)
226  {
227  bids[i] = *(it++);
228  read_reqs[i] = Blocks1[i].read(bids[i]);
229  }
230 
231  unsigned_type k = 0;
232  const int shift1 = sizeof(key_type) * 8 - log_k1;
233  const int shift2 = shift1 - log_k2;
234  STXXL_VERBOSE("shift1: " << shift1 << " shift2:" << shift2);
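 // The key bits are consumed from the most significant end: the top log_k1
 // bits (selected by shift1) choose one of the k1 first-level buckets, the
 // next log_k2 bits (selected by shift2) choose one of the k2 second-level
 // buckets inside l1sort(). Illustrative numbers (not taken from the code):
 // for a 32-bit key_type with log_k1 = 10 and log_k2 = 8, shift1 = 32 - 10 = 22
 // and shift2 = 22 - 8 = 14.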
235 
236  for ( ; k < nruns; k++)
237  {
238  run = runs[k];
239  run_size = run->size();
240 
241  std::fill(bucket1, bucket1 + k1, 0);
242 
243  type_key_ * ref_ptr = refs1;
244  for (i = 0; i < run_size; i++)
245  {
246  if (k)
247  write_reqs[i]->wait();
248 
249  read_reqs[i]->wait();
250  bm->delete_block(bids[i]);
251 
252  classify_block(Blocks1[i].begin(), Blocks1[i].end(), ref_ptr, bucket1, offset, shift1, keyobj);
253  }
254 
255  exclusive_prefix_sum(bucket1, k1);
256  classify(refs1, refs1 + run_size * Blocks1->size, refs2, bucket1,
257  offset, shift1);
258 
259  int_type out_block = 0;
260  int_type out_pos = 0;
261  unsigned_type next_run_size = (k < nruns - 1) ? (runs[k + 1]->size()) : 0;
262 
263  // recurse on each bucket
264  type_key_ * c = refs2;
265  type_key_ * d = refs1;
266  block_type * cur_blk = Blocks2;
267  block_type * end_blk = Blocks2 + next_run_size;
268  write_completion_handler<block_type, bid_type> * next_read = next_run_reads;
269 
270  for (i = 0; i < k1; i++)
271  {
272  type_key_ * cEnd = refs2 + bucket1[i];
273  type_key_ * dEnd = refs1 + bucket1[i];
274 
275  l1sort(c, cEnd, d, bucket2, k2,
276  offset + (key_type(1) << key_type(shift1)) * key_type(i), shift2); // key_type,key_type,... paranoia
277 
278  write_out(
279  d, dEnd, cur_blk, end_blk,
280  out_block, out_pos, *run, next_read, bids,
281  write_reqs, read_reqs, it, keyobj);
282 
283  c = cEnd;
284  d = dEnd;
285  }
286 
287  std::swap(Blocks1, Blocks2);
288  }
289 
290  wait_all(write_reqs, m2);
291 
292  delete[] bucket1;
293  delete[] bucket2;
294  delete[] refs1;
295  delete[] refs2;
296  delete[] Blocks1;
297  delete[] Blocks2;
298  delete[] bids;
299  delete[] next_run_reads;
300  delete[] read_reqs;
301  delete[] write_reqs;
302  }
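 // Summary: create_runs() keeps two sets of m2 blocks (Blocks1/Blocks2). For
 // each run it waits for the input blocks, sorts their elements in internal
 // memory by a two-level bucket sort on the keys (classify_block / l1sort),
 // and writes the result back as a sorted run, recording the key of the first
 // (smallest) element of every output block in the run's trigger entries.
 // Reads of the next run are overlapped with these writes via
 // write_completion_handler.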
303 
304  template <typename block_type,
305  typename prefetcher_type,
306  typename key_extractor>
307  struct run_cursor2_cmp
308  {
309  typedef run_cursor2<block_type, prefetcher_type> cursor_type;
310  key_extractor keyobj;
311  run_cursor2_cmp(key_extractor keyobj_)
312  {
313  keyobj = keyobj_;
314  }
315  inline bool operator () (const cursor_type & a, const cursor_type & b)
316  {
317  if (UNLIKELY(b.empty()))
318  return true;
319  // sentinel emulation
320  if (UNLIKELY(a.empty()))
321  return false;
322  //sentinel emulation
323 
324  return (keyobj(a.current()) < keyobj(b.current()));
325  }
326 
327  private:
328  run_cursor2_cmp() { }
329  };
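 // Sentinel emulation above: an exhausted (empty) cursor compares as greater
 // than any non-empty cursor, so finished runs sink in the loser tree without
 // the need for an artificial +infinity key.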
330 
331 
332  template <typename record_type, typename key_extractor>
333  class key_comparison : public std::binary_function<record_type, record_type, bool>
334  {
335  key_extractor ke;
336 
337  public:
338  key_comparison() { }
339  key_comparison(key_extractor ke_) : ke(ke_) { }
340  bool operator () (const record_type & a, const record_type & b) const
341  {
342  return ke(a) < ke(b);
343  }
344  };
345 
346 
347  template <typename block_type, typename run_type, typename key_ext_>
348  bool check_ksorted_runs(run_type ** runs,
349  unsigned_type nruns,
350  unsigned_type m,
351  key_ext_ keyext)
352  {
353  typedef typename block_type::value_type value_type;
354 
355  //STXXL_VERBOSE1("check_sorted_runs Runs: "<<nruns);
356  STXXL_MSG("check_sorted_runs Runs: " << nruns);
357  unsigned_type irun = 0;
358  for (irun = 0; irun < nruns; ++irun)
359  {
360  const unsigned_type nblocks_per_run = runs[irun]->size();
361  unsigned_type blocks_left = nblocks_per_run;
362  block_type * blocks = new block_type[m];
363  request_ptr * reqs = new request_ptr[m];
364  value_type last = keyext.min_value();
365 
366  for (unsigned_type off = 0; off < nblocks_per_run; off += m)
367  {
368  const unsigned_type nblocks = STXXL_MIN(blocks_left, m);
369  const unsigned_type nelements = nblocks * block_type::size;
370  blocks_left -= nblocks;
371 
372  for (unsigned_type j = 0; j < nblocks; ++j)
373  {
374  reqs[j] = blocks[j].read((*runs[irun])[off + j].bid);
375  }
376  wait_all(reqs, reqs + nblocks);
377 
378  if (off && (keyext(blocks[0][0]) < keyext(last)))
379  {
380  STXXL_MSG("check_sorted_runs wrong first value in the run " << irun);
 381  STXXL_MSG(" first value: " << blocks[0][0] << " with key: " << keyext(blocks[0][0]));
 382  STXXL_MSG(" last value: " << last << " with key: " << keyext(last));
383  for (unsigned_type k = 0; k < block_type::size; ++k)
384  STXXL_MSG("Element " << k << " in the block is :" << blocks[0][k] << " key: " << keyext(blocks[0][k]));
385  return false;
386  }
387 
388  for (unsigned_type j = 0; j < nblocks; ++j)
389  {
390  if (keyext(blocks[j][0]) != (*runs[irun])[off + j].key)
391  {
392  STXXL_MSG("check_sorted_runs wrong trigger in the run " << irun << " block " << (off + j));
393  STXXL_MSG(" trigger value: " << (*runs[irun])[off + j].key);
394  STXXL_MSG("Data in the block:");
395  for (unsigned_type k = 0; k < block_type::size; ++k)
396  STXXL_MSG("Element " << k << " in the block is :" << blocks[j][k] << " with key: " << keyext(blocks[j][k]));
397 
398  STXXL_MSG("BIDS:");
399  for (unsigned_type k = 0; k < nblocks; ++k)
400  {
401  if (k == j)
402  STXXL_MSG("Bad one comes next.");
403  STXXL_MSG("BID " << (off + k) << " is: " << ((*runs[irun])[off + k].bid));
404  }
405 
406  return false;
407  }
408  }
409  if (!stxxl::is_sorted(
410 #if 1
411  ArrayOfSequencesIterator<
412  block_type, typename block_type::value_type, block_type::size
413  >(blocks, 0),
414  ArrayOfSequencesIterator<
415  block_type, typename block_type::value_type, block_type::size
416  >(blocks, nelements),
417 #else
418  TwoToOneDimArrayRowAdaptor<
419  block_type, value_type, block_type::size
420  >(blocks, 0),
421  TwoToOneDimArrayRowAdaptor<
422  block_type, value_type, block_type::size
423  >(blocks, nelements),
424 #endif
425  key_comparison<value_type, key_ext_>()))
426  {
427  STXXL_MSG("check_sorted_runs wrong order in the run " << irun);
428  STXXL_MSG("Data in blocks:");
429  for (unsigned_type j = 0; j < nblocks; ++j)
430  {
431  for (unsigned_type k = 0; k < block_type::size; ++k)
432  STXXL_MSG(" Element " << k << " in block " << (off + j) << " is :" << blocks[j][k] << " with key: " << keyext(blocks[j][k]));
433  }
434  STXXL_MSG("BIDS:");
435  for (unsigned_type k = 0; k < nblocks; ++k)
436  {
437  STXXL_MSG("BID " << (k + off) << " is: " << ((*runs[irun])[k + off].bid));
438  }
439 
440  return false;
441  }
442  last = blocks[nblocks - 1][block_type::size - 1];
443  }
444 
445  assert(blocks_left == 0);
446  delete[] reqs;
447  delete[] blocks;
448  }
449 
450  return true;
451  }
452 
453 
454  template <typename block_type, typename run_type, typename key_extractor>
455  void merge_runs(run_type ** in_runs, unsigned_type nruns, run_type * out_run, unsigned_type _m, key_extractor keyobj)
456  {
 457  typedef block_prefetcher<block_type, typename run_type::iterator> prefetcher_type;
 458  typedef run_cursor2<block_type, prefetcher_type> run_cursor_type;
459 
460  unsigned_type i;
461  run_type consume_seq(out_run->size());
462 
463  int_type * prefetch_seq = new int_type[out_run->size()];
464 
465  typename run_type::iterator copy_start = consume_seq.begin();
466  for (i = 0; i < nruns; i++)
467  {
468  // TODO: try to avoid copy
469  copy_start = std::copy(
470  in_runs[i]->begin(),
471  in_runs[i]->end(),
472  copy_start);
473  }
474  std::stable_sort(consume_seq.begin(), consume_seq.end());
475 
476  unsigned disks_number = config::get_instance()->disks_number();
477 
478 #ifdef PLAY_WITH_OPT_PREF
479  const int_type n_write_buffers = 4 * disks_number;
480 #else
481  const int_type n_prefetch_buffers = STXXL_MAX(int_type(2 * disks_number), (3 * (int_type(_m) - int_type(nruns)) / 4));
482  STXXL_VERBOSE("Prefetch buffers " << n_prefetch_buffers);
483  const int_type n_write_buffers = STXXL_MAX(int_type(2 * disks_number), int_type(_m) - int_type(nruns) - int_type(n_prefetch_buffers));
484  STXXL_VERBOSE("Write buffers " << n_write_buffers);
485  // heuristic
486  const int_type n_opt_prefetch_buffers = 2 * int_type(disks_number) + (3 * (int_type(n_prefetch_buffers) - int_type(2 * disks_number))) / 10;
487  STXXL_VERBOSE("Prefetch buffers " << n_opt_prefetch_buffers);
488 #endif
489 
490 #ifdef SORT_OPTIMAL_PREFETCHING
491  compute_prefetch_schedule(
492  consume_seq,
493  prefetch_seq,
494  n_opt_prefetch_buffers,
495  disks_number);
496 #else
497  for (i = 0; i < out_run->size(); i++)
498  prefetch_seq[i] = i;
499 
500 #endif
501 
502 
503  prefetcher_type prefetcher(consume_seq.begin(),
504  consume_seq.end(),
505  prefetch_seq,
506  nruns + n_prefetch_buffers);
507 
508  buffered_writer<block_type> writer(n_write_buffers, n_write_buffers / 2);
509 
510  unsigned_type out_run_size = out_run->size();
511 
512  run_cursor2_cmp<block_type, prefetcher_type, key_extractor> cmp(keyobj);
513  loser_tree<
514  run_cursor_type,
515  run_cursor2_cmp<block_type, prefetcher_type, key_extractor>,
516  block_type::size> losers(&prefetcher, nruns, cmp);
517 
518 
519  block_type * out_buffer = writer.get_free_block();
520 
521  for (i = 0; i < out_run_size; i++)
522  {
523  losers.multi_merge(out_buffer->elem);
524  (*out_run)[i].key = keyobj(out_buffer->elem[0]);
525  out_buffer = writer.write(out_buffer, (*out_run)[i].bid);
526  }
527 
528  delete[] prefetch_seq;
529 
530  block_manager * bm = block_manager::get_instance();
531  for (i = 0; i < nruns; i++)
532  {
533  unsigned_type sz = in_runs[i]->size();
534  for (unsigned_type j = 0; j < sz; j++)
535  bm->delete_block((*in_runs[i])[j].bid);
536 
537  delete in_runs[i];
538  }
539  }
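 // Summary: merge_runs() concatenates the trigger entries of all input runs
 // and sorts them by key to obtain the order in which blocks will be consumed,
 // lets a block_prefetcher fetch blocks in (roughly) that order, and merges
 // the runs block by block with a loser_tree over run_cursor2 cursors, writing
 // the output through a buffered_writer. The key of the first element of each
 // output block is stored in out_run; the input runs and their blocks are
 // freed at the end.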
540 
541 
542  template <typename block_type,
543  typename alloc_strategy,
544  typename input_bid_iterator,
545  typename key_extractor>
546 
547  simple_vector<trigger_entry<typename block_type::bid_type, typename key_extractor::key_type> > *
548  ksort_blocks(input_bid_iterator input_bids, unsigned_type _n, unsigned_type _m, key_extractor keyobj)
549  {
550  typedef typename block_type::value_type type;
551  typedef typename key_extractor::key_type key_type;
552  typedef typename block_type::bid_type bid_type;
553  typedef trigger_entry<bid_type, typename key_extractor::key_type> trigger_entry_type;
554  typedef simple_vector<trigger_entry_type> run_type;
555  typedef typename interleaved_alloc_traits<alloc_strategy>::strategy interleaved_alloc_strategy;
556 
557  unsigned_type m2 = div_and_round_up(_m, 2);
558  const unsigned_type m2_rf = m2 * block_type::raw_size /
559  (block_type::raw_size + block_type::size * sizeof(type_key<type, key_type>));
560  STXXL_VERBOSE("Reducing number of blocks in a run from " << m2 << " to " <<
561  m2_rf << " due to key size: " << sizeof(typename key_extractor::key_type) << " bytes");
562  m2 = m2_rf;
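 // m2 starts as _m / 2 because run creation double-buffers (Blocks1 and
 // Blocks2 in create_runs()), and is reduced further because every element of
 // a run additionally needs a type_key<> entry (refs1/refs2) in internal
 // memory, as reflected by the m2_rf formula above.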
563  unsigned_type full_runs = _n / m2;
564  unsigned_type partial_runs = ((_n % m2) ? 1 : 0);
565  unsigned_type nruns = full_runs + partial_runs;
566  unsigned_type i;
567 
568  config * cfg = config::get_instance();
569  block_manager * mng = block_manager::get_instance();
570  int ndisks = cfg->disks_number();
571 
572  STXXL_VERBOSE("n=" << _n << " nruns=" << nruns << "=" << full_runs << "+" << partial_runs);
573 
574  double begin = timestamp(), after_runs_creation, end;
575 
576  run_type ** runs = new run_type *[nruns];
577 
578  for (i = 0; i < full_runs; i++)
579  runs[i] = new run_type(m2);
580 
581 
582 #ifdef INTERLEAVED_ALLOC
583  if (partial_runs)
584  {
585  unsigned_type last_run_size = _n - full_runs * m2;
586  runs[i] = new run_type(last_run_size);
587 
588  mng->new_blocks(interleaved_alloc_strategy(nruns, 0, ndisks),
589  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>
590  (runs, 0, nruns, last_run_size),
591  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>
592  (runs, _n, nruns, last_run_size));
593  }
594  else
595  mng->new_blocks(interleaved_alloc_strategy(nruns, 0, ndisks),
596  RunsToBIDArrayAdaptor<block_type::raw_size, run_type>
597  (runs, 0, nruns),
598  RunsToBIDArrayAdaptor<block_type::raw_size, run_type>
599  (runs, _n, nruns));
600 
601 #else
602  if (partial_runs)
603  runs[i] = new run_type(_n - full_runs * m2);
604 
605 
606  for (i = 0; i < nruns; i++)
607  {
608  mng->new_blocks(alloc_strategy(0, ndisks),
609  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->begin()),
610  trigger_entry_iterator<typename run_type::iterator, block_type::raw_size>(runs[i]->end()));
611  }
612 #endif
613 
614  create_runs<block_type,
615  run_type,
616  input_bid_iterator,
617  key_extractor>(input_bids, runs, nruns, m2, keyobj);
618 
619  after_runs_creation = timestamp();
620 
621  double io_wait_after_rf = stats::get_instance()->get_io_wait_time();
622 
623  disk_queues::get_instance()->set_priority_op(disk_queue::WRITE);
624 
625  // Optimal merging: merge r = pow(nruns,1/ceil(log(nruns)/log(m))) at once
626 
627  const int_type merge_factor = static_cast<int_type>(ceil(pow(nruns, 1. / ceil(log(double(nruns)) / log(double(_m))))));
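 // Illustrative example (numbers not from the code): with nruns = 100 and
 // _m = 10, ceil(log(100) / log(10)) = 2 merge passes are needed, hence
 // merge_factor = ceil(pow(100, 1/2.)) = 10 runs are merged at a time.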
628  run_type ** new_runs;
629 
630  while (nruns > 1)
631  {
632  int_type new_nruns = div_and_round_up(nruns, merge_factor);
633  STXXL_VERBOSE("Starting new merge phase: nruns: " << nruns <<
634  " opt_merge_factor: " << merge_factor << " m:" << _m << " new_nruns: " << new_nruns);
635 
636  new_runs = new run_type *[new_nruns];
637 
638  int_type runs_left = nruns;
639  int_type cur_out_run = 0;
640  int_type blocks_in_new_run = 0;
641 
642  while (runs_left > 0)
643  {
644  int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
645  blocks_in_new_run = 0;
646  for (unsigned_type i = nruns - runs_left; i < (nruns - runs_left + runs2merge); i++)
647  blocks_in_new_run += runs[i]->size();
648 
649  // allocate run
650  new_runs[cur_out_run++] = new run_type(blocks_in_new_run);
651  runs_left -= runs2merge;
652  }
653  // allocate blocks in the new runs
654  if (cur_out_run == 1 && blocks_in_new_run == int_type(_n) && (input_bids->storage->get_id() == -1))
655  {
656  // if we sort a file we can reuse the input bids for the output
657  input_bid_iterator cur = input_bids;
658  for (int_type i = 0; cur != (input_bids + _n); ++cur)
659  {
660  (*new_runs[0])[i++].bid = *cur;
661  }
662 
663  bid_type & firstBID = (*new_runs[0])[0].bid;
664  if (firstBID.storage->get_id() != -1)
665  {
666  // the first block does not belong to the file
667  // need to reallocate it
668  mng->new_blocks(FR(), &firstBID, (&firstBID) + 1);
669  }
670  bid_type & lastBID = (*new_runs[0])[_n - 1].bid;
671  if (lastBID.storage->get_id() != -1)
672  {
 673  // the last block does not belong to the file
674  // need to reallocate it
675  mng->new_blocks(FR(), &lastBID, (&lastBID) + 1);
676  }
677  }
678  else
679  {
680  mng->new_blocks(interleaved_alloc_strategy(new_nruns, 0, ndisks),
681  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, 0, new_nruns, blocks_in_new_run),
682  RunsToBIDArrayAdaptor2<block_type::raw_size, run_type>(new_runs, _n, new_nruns, blocks_in_new_run));
683  }
684 
685 
686  // merge all
687  runs_left = nruns;
688  cur_out_run = 0;
689  while (runs_left > 0)
690  {
691  int_type runs2merge = STXXL_MIN(runs_left, merge_factor);
692 #ifdef STXXL_CHECK_ORDER_IN_SORTS
693  assert((check_ksorted_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left, runs2merge, m2, keyobj)));
694 #endif
695  STXXL_VERBOSE("Merging " << runs2merge << " runs");
696  merge_runs<block_type, run_type, key_extractor>(runs + nruns - runs_left,
697  runs2merge, *(new_runs + (cur_out_run++)), _m, keyobj);
698  runs_left -= runs2merge;
699  }
700 
701  nruns = new_nruns;
702  delete[] runs;
703  runs = new_runs;
704  }
705 
706  run_type * result = *runs;
707  delete[] runs;
708 
709  end = timestamp();
710 
711  STXXL_VERBOSE("Elapsed time : " << end - begin << " s. Run creation time: " <<
712  after_runs_creation - begin << " s");
713  STXXL_VERBOSE("Time in I/O wait(rf): " << io_wait_after_rf << " s");
714  STXXL_VERBOSE(*stats::get_instance());
715  UNUSED(begin + io_wait_after_rf);
716 
717  return result;
718  }
719 }
720 
721 
757 
758 
759 
760 
761 
762 
763 
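//! \brief Sorts records by key in external memory
//! \param first_ iterator to the first element of the range to be sorted
//! \param last_ iterator past the last element of the range to be sorted
//! \param keyobj key extractor object
//! \param M__ amount of memory (in bytes) the algorithm is allowed to use
//! \remark the key extractor must provide \c key_type, \c operator(), \c min_value() and \c max_value()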
764 template <typename ExtIterator_, typename KeyExtractor_>
765 void ksort(ExtIterator_ first_, ExtIterator_ last_, KeyExtractor_ keyobj, unsigned_type M__)
766 {
767  typedef simple_vector<ksort_local::trigger_entry<typename ExtIterator_::bid_type,
768  typename KeyExtractor_::key_type> > run_type;
769  typedef typename ExtIterator_::vector_type::value_type value_type;
770  typedef typename ExtIterator_::block_type block_type;
771 
772 
773  unsigned_type n = 0;
774  block_manager * mng = block_manager::get_instance();
775 
776  first_.flush();
777 
778  if ((last_ - first_) * sizeof(value_type) < M__)
779  {
780  stl_in_memory_sort(first_, last_, ksort_local::key_comparison<value_type, KeyExtractor_>(keyobj));
781  }
782  else
783  {
784  assert(2 * block_type::raw_size <= M__);
785 
786  if (first_.block_offset())
787  {
788  if (last_.block_offset()) // first and last element reside
789  // not in the beginning of the block
790  {
791  typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
792  typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
793  typename ExtIterator_::bid_type first_bid, last_bid;
794  request_ptr req;
795 
796  req = first_block->read(*first_.bid());
797  mng->new_blocks(FR(), &first_bid, (&first_bid) + 1); // try to overlap
798  mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
799  req->wait();
800 
801 
802  req = last_block->read(*last_.bid());
803 
804  unsigned_type i = 0;
805  for ( ; i < first_.block_offset(); i++)
806  {
807  first_block->elem[i] = keyobj.min_value();
808  }
809 
810  req->wait();
811 
812  req = first_block->write(first_bid);
813  for (i = last_.block_offset(); i < block_type::size; i++)
814  {
815  last_block->elem[i] = keyobj.max_value();
816  }
817 
818  req->wait();
819 
820  req = last_block->write(last_bid);
821 
822  n = last_.bid() - first_.bid() + 1;
823 
824  std::swap(first_bid, *first_.bid());
825  std::swap(last_bid, *last_.bid());
826 
827  req->wait();
828 
829  delete first_block;
830  delete last_block;
831 
832  run_type * out =
833  ksort_local::ksort_blocks<
834  typename ExtIterator_::block_type,
835  typename ExtIterator_::vector_type::alloc_strategy,
836  typename ExtIterator_::bids_container_iterator,
837  KeyExtractor_>
838  (first_.bid(), n, M__ / block_type::raw_size, keyobj);
839 
840 
841  first_block = new typename ExtIterator_::block_type;
842  last_block = new typename ExtIterator_::block_type;
843  typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
844  typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
845  request_ptr * reqs = new request_ptr[2];
846 
847  reqs[0] = first_block->read(first_bid);
848  reqs[1] = sorted_first_block->read((*(out->begin())).bid);
849  wait_all(reqs, 2);
850 
851  reqs[0] = last_block->read(last_bid);
852  reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
853 
854  for (i = first_.block_offset(); i < block_type::size; i++)
855  {
856  first_block->elem[i] = sorted_first_block->elem[i];
857  }
858  wait_all(reqs, 2);
859 
860  req = first_block->write(first_bid);
861 
862  for (i = 0; i < last_.block_offset(); i++)
863  {
864  last_block->elem[i] = sorted_last_block->elem[i];
865  }
866 
867  req->wait();
868 
869 
870  req = last_block->write(last_bid);
871 
872  mng->delete_block(out->begin()->bid);
873  mng->delete_block((*out)[out->size() - 1].bid);
874 
875  *first_.bid() = first_bid;
876  *last_.bid() = last_bid;
877 
878  typename run_type::iterator it = out->begin();
879  it++;
880  typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
881  cur_bid++;
882 
883  for ( ; cur_bid != last_.bid(); cur_bid++, it++)
884  {
885  *cur_bid = (*it).bid;
886  }
887 
888  delete first_block;
889  delete sorted_first_block;
890  delete sorted_last_block;
891  delete[] reqs;
892  delete out;
893 
894  req->wait();
895 
896  delete last_block;
897  }
898  else
899  {
900  // first element resides
901  // not in the beginning of the block
902 
903  typename ExtIterator_::block_type * first_block = new typename ExtIterator_::block_type;
904  typename ExtIterator_::bid_type first_bid;
905  request_ptr req;
906 
907  req = first_block->read(*first_.bid());
908  mng->new_blocks(FR(), &first_bid, (&first_bid) + 1); // try to overlap
909  req->wait();
910 
911 
912  unsigned_type i = 0;
913  for ( ; i < first_.block_offset(); i++)
914  {
915  first_block->elem[i] = keyobj.min_value();
916  }
917 
918  req = first_block->write(first_bid);
919 
920  n = last_.bid() - first_.bid();
921 
922  std::swap(first_bid, *first_.bid());
923 
924  req->wait();
925 
926  delete first_block;
927 
928  run_type * out =
929  ksort_local::ksort_blocks<
930  typename ExtIterator_::block_type,
931  typename ExtIterator_::vector_type::alloc_strategy,
932  typename ExtIterator_::bids_container_iterator,
933  KeyExtractor_>
934  (first_.bid(), n, M__ / block_type::raw_size, keyobj);
935 
936 
937  first_block = new typename ExtIterator_::block_type;
938 
939  typename ExtIterator_::block_type * sorted_first_block = new typename ExtIterator_::block_type;
940 
941  request_ptr * reqs = new request_ptr[2];
942 
943  reqs[0] = first_block->read(first_bid);
944  reqs[1] = sorted_first_block->read((*(out->begin())).bid);
945  wait_all(reqs, 2);
946 
947  for (i = first_.block_offset(); i < block_type::size; i++)
948  {
949  first_block->elem[i] = sorted_first_block->elem[i];
950  }
951 
952  req = first_block->write(first_bid);
953 
954  mng->delete_block(out->begin()->bid);
955 
956  *first_.bid() = first_bid;
957 
958  typename run_type::iterator it = out->begin();
959  it++;
960  typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
961  cur_bid++;
962 
963  for ( ; cur_bid != last_.bid(); cur_bid++, it++)
964  {
965  *cur_bid = (*it).bid;
966  }
967 
968  *cur_bid = (*it).bid;
969 
970  delete sorted_first_block;
971  delete[] reqs;
972  delete out;
973 
974  req->wait();
975 
976  delete first_block;
977  }
978  }
979  else
980  {
981  if (last_.block_offset()) // last element resides
982  // not in the beginning of the block
983  {
984  typename ExtIterator_::block_type * last_block = new typename ExtIterator_::block_type;
985  typename ExtIterator_::bid_type last_bid;
986  request_ptr req;
987  unsigned_type i;
988 
989  req = last_block->read(*last_.bid());
990  mng->new_blocks(FR(), &last_bid, (&last_bid) + 1);
991  req->wait();
992 
993  for (i = last_.block_offset(); i < block_type::size; i++)
994  {
995  last_block->elem[i] = keyobj.max_value();
996  }
997 
998  req = last_block->write(last_bid);
999 
1000  n = last_.bid() - first_.bid() + 1;
1001 
1002  std::swap(last_bid, *last_.bid());
1003 
1004  req->wait();
1005 
1006  delete last_block;
1007 
1008  run_type * out =
1009  ksort_local::ksort_blocks<
1010  typename ExtIterator_::block_type,
1011  typename ExtIterator_::vector_type::alloc_strategy,
1012  typename ExtIterator_::bids_container_iterator,
1013  KeyExtractor_>
1014  (first_.bid(), n, M__ / block_type::raw_size, keyobj);
1015 
1016 
1017  last_block = new typename ExtIterator_::block_type;
1018  typename ExtIterator_::block_type * sorted_last_block = new typename ExtIterator_::block_type;
1019  request_ptr * reqs = new request_ptr[2];
1020 
1021  reqs[0] = last_block->read(last_bid);
1022  reqs[1] = sorted_last_block->read(((*out)[out->size() - 1]).bid);
1023  wait_all(reqs, 2);
1024 
1025  for (i = 0; i < last_.block_offset(); i++)
1026  {
1027  last_block->elem[i] = sorted_last_block->elem[i];
1028  }
1029 
1030  req = last_block->write(last_bid);
1031 
1032  mng->delete_block((*out)[out->size() - 1].bid);
1033 
1034  *last_.bid() = last_bid;
1035 
1036  typename run_type::iterator it = out->begin();
1037  typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
1038 
1039  for ( ; cur_bid != last_.bid(); cur_bid++, it++)
1040  {
1041  *cur_bid = (*it).bid;
1042  }
1043 
1044  delete sorted_last_block;
1045  delete[] reqs;
1046  delete out;
1047 
1048  req->wait();
1049 
1050  delete last_block;
1051  }
1052  else
1053  {
1054  // first and last element reside in the beginning of blocks
1055  n = last_.bid() - first_.bid();
1056 
1057  run_type * out =
1058  ksort_local::ksort_blocks<
1059  typename ExtIterator_::block_type,
1060  typename ExtIterator_::vector_type::alloc_strategy,
1061  typename ExtIterator_::bids_container_iterator,
1062  KeyExtractor_>
1063  (first_.bid(), n, M__ / block_type::raw_size, keyobj);
1064 
1065  typename run_type::iterator it = out->begin();
1066  typename ExtIterator_::bids_container_iterator cur_bid = first_.bid();
1067 
1068  for ( ; cur_bid != last_.bid(); cur_bid++, it++)
1069  {
1070  *cur_bid = (*it).bid;
1071  }
1072 
1073  delete out;
1074  }
1075  }
1076  }
1077 
1078 #ifdef STXXL_CHECK_ORDER_IN_SORTS
1079  assert(stxxl::is_sorted(first_, last_, ksort_local::key_comparison<value_type, KeyExtractor_>()));
1080 #endif
1081 }
1082 
1083 template <typename record_type>
1084 struct ksort_defaultkey
1085 {
1086  typedef typename record_type::key_type key_type;
1087  key_type operator () (const record_type & obj) const
1088  {
1089  return obj.key();
1090  }
1091  record_type max_value() const
1092  {
1093  return record_type::max_value();
1094  }
1095  record_type min_value() const
1096  {
1097  return record_type::min_value();
1098  }
1099 };
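// Any object with the same interface as ksort_defaultkey (a key_type typedef,
// operator() extracting the key, and min_value()/max_value() returning records
// with the smallest/largest possible key) can be used as a key extractor.
// Minimal sketch with purely illustrative names (my_record and id_extractor
// are not part of the library); <limits> would be needed for numeric_limits:
//
//   struct my_record { unsigned long long id; char payload[24]; };
//
//   struct id_extractor
//   {
//       typedef unsigned long long key_type;
//       key_type operator () (const my_record & r) const { return r.id; }
//       my_record min_value() const { my_record r = my_record(); return r; }
//       my_record max_value() const
//       {
//           my_record r = my_record();
//           r.id = std::numeric_limits<unsigned long long>::max();
//           return r;
//       }
//   };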
1100 
1101 
1108 
1117 template <typename ExtIterator_>
1118 void ksort(ExtIterator_ first_, ExtIterator_ last_, unsigned_type M__)
1119 {
1120  ksort(first_, last_,
1121  ksort_defaultkey<typename ExtIterator_::vector_type::value_type>(), M__);
1122 }
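// Hypothetical usage sketch (the names and the 512 MiB memory budget are
// illustrative, see the id_extractor sketch above):
//
//   stxxl::vector<my_record> v;
//   // ... fill v ...
//   stxxl::ksort(v.begin(), v.end(), id_extractor(), 512 * 1024 * 1024);
//
// The overload directly above can be used instead when the record type itself
// provides a key_type typedef, a key() member and static min_value() /
// max_value() members (the requirements of ksort_defaultkey).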
1123 
1124 
1126 
1127 __STXXL_END_NAMESPACE
1128 
1129 #endif // !STXXL_KSORT_HEADER
1130 // vim: et:ts=4:sw=4