Stxxl  1.2.1
iobase.h
1 /***************************************************************************
2  * include/stxxl/bits/io/iobase.h
3  *
4  * Part of the STXXL. See http://stxxl.sourceforge.net
5  *
6  * Copyright (C) 2002 Roman Dementiev <dementiev@mpi-sb.mpg.de>
7  * Copyright (C) 2008 Andreas Beckmann <beckmann@cs.uni-frankfurt.de>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 #ifndef STXXL_IOBASE_HEADER
15 #define STXXL_IOBASE_HEADER
16 
17 #ifdef STXXL_BOOST_CONFIG
18  #include <boost/config.hpp>
19 #endif
20 
21 #if defined (__linux__)
22  #define STXXL_CHECK_BLOCK_ALIGNING
23 #endif
24 
25 //#ifdef __sun__
26 //#define O_DIRECT 0
27 //#endif
28 
29 
30 #include <cstdlib>
31 #include <cstdio>
32 #include <cstring>
33 #include <cerrno>
34 
35 #include <fcntl.h>
36 #include <sys/types.h>
37 #include <sys/stat.h>
38 
39 #include <iostream>
40 #include <algorithm>
41 #include <string>
42 #include <queue>
43 #include <map>
44 #include <set>
45 
46 #ifdef BOOST_MSVC
47 // this is not stxxl/bits/io/io.h !
48  #include <io.h>
49 #else
50  #include <unistd.h>
51  #include <sys/resource.h>
52  #include <sys/wait.h>
53 #endif
54 
55 #ifdef STXXL_BOOST_THREADS // Use Portable Boost threads
56 // Boost.Threads headers
57  #include <boost/thread/thread.hpp>
58  #include <boost/thread/mutex.hpp>
59  #include <boost/bind.hpp>
60 #else
61  #include <pthread.h>
62 #endif
63 
64 
65 #ifndef O_SYNC
66  #define O_SYNC 0
67 #endif
68 #ifndef O_RSYNC
69  #define O_RSYNC 0
70 #endif
71 #ifndef O_DSYNC
72  #define O_DSYNC 0
73 #endif
74 
75 
76 #if defined (__linux__)
77 //#include <asm/fcntl.h>
78  #if !defined (O_DIRECT) && (defined (__alpha__) || defined (__i386__))
79  #define O_DIRECT 040000 /* direct disk access */
80  #endif
81 #endif
82 
83 
84 #ifndef O_DIRECT
85  #define O_DIRECT O_SYNC
86 #endif
87 
88 
89 #include <stxxl/bits/namespace.h>
90 #include <stxxl/bits/io/iostats.h>
91 #include <stxxl/bits/common/semaphore.h>
92 #include <stxxl/bits/common/mutex.h>
93 #include <stxxl/bits/common/switch.h>
94 #include <stxxl/bits/common/state.h>
95 #include <stxxl/bits/common/exceptions.h>
96 #include <stxxl/bits/io/completion_handler.h>
97 
98 
99 __STXXL_BEGIN_NAMESPACE
100 
105 
106 #define BLOCK_ALIGN 4096
107 
108 typedef void * (*thread_function_t)(void *);
109 typedef stxxl::int64 DISKID;
110 
111 class request;
112 class request_ptr;
113 
115 
117 {
119  void operator () (request *) { }
120 };
121 
123 
126 class file : private noncopyable
127 {
128 protected:
129  int id;
130 
134  file(int _id) : id(_id) { }
135 
136 public:
138 
142  {
143  RDONLY = 1,
144  WRONLY = 2,
145  RDWR = 4,
146  CREAT = 8,
147  DIRECT = 16,
148  TRUNC = 32
149  };
150 
157  virtual request_ptr aread(void * buffer, stxxl::int64 pos, size_t bytes,
158  completion_handler on_cmpl) = 0;
165  virtual request_ptr awrite(void * buffer, stxxl::int64 pos, size_t bytes,
166  completion_handler on_cmpl) = 0;
167 
170  virtual void set_size(stxxl::int64 newsize) = 0;
173  virtual stxxl::int64 size() = 0;
175  __STXXL_DEPRECATED(int get_disk_number())
176  {
177  return id;
178  }
182  int get_id()
183  {
184  return id;
185  }
186 
188  virtual void lock() { }
189 
191  virtual void delete_region(int64 offset, unsigned_type size)
192  {
193  UNUSED(offset);
194  UNUSED(size);
195  }
196 
197  virtual ~file() { }
198 };
199 
// Forward declarations; request names disk_queue and disk_queues as friends
// before their definitions appear further down in this header.
class mc;
class disk_queue;
class disk_queues;
203 
205 
209 class request : private noncopyable
210 {
211  friend int wait_any(request_ptr req_array[], int count);
212  template <class request_iterator_>
213  friend
214  request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end);
215  friend class file;
216  friend class disk_queue;
217  friend class disk_queues;
218  friend class request_ptr;
219 
220 protected:
221  virtual bool add_waiter(onoff_switch * sw) = 0;
222  virtual void delete_waiter(onoff_switch * sw) = 0;
223  //virtual void enqueue () = 0;
224  virtual void serve() = 0;
225  //virtual unsigned size() const;
226 
227  completion_handler on_complete;
228  int ref_cnt;
229  std::auto_ptr<stxxl::io_error> error;
230 
231  mutex ref_cnt_mutex;
232 
233 public:
234  enum request_type { READ, WRITE };
235 
236 protected:
237  file * file_;
238  void * buffer;
239  stxxl::int64 offset;
240  size_t bytes;
241  request_type type;
242 
243  void completed()
244  {
245  on_complete(this);
246  }
247 
248  // returns number of references
249  int nref()
250  {
251  scoped_mutex_lock Lock(ref_cnt_mutex);
252  return ref_cnt;
253  }
254 
255 public:
256  request(completion_handler on_compl,
257  file * file__,
258  void * buffer_,
259  stxxl::int64 offset_,
260  size_t bytes_,
261  request_type type_) :
262  on_complete(on_compl), ref_cnt(0),
263  file_(file__),
264  buffer(buffer_),
265  offset(offset_),
266  bytes(bytes_),
267  type(type_)
268  {
269  STXXL_VERBOSE3("request " << static_cast<void *>(this) << ": creation, cnt: " << ref_cnt);
270  }
272  virtual void wait() = 0;
275  virtual bool poll() = 0;
278  virtual const char * io_type()
279  {
280  return "none";
281  }
282  virtual ~request()
283  {
284  STXXL_VERBOSE3("request " << static_cast<void *>(this) << ": deletion, cnt: " << ref_cnt);
285  }
286  file * get_file() const { return file_; }
287  void * get_buffer() const { return buffer; }
288  stxxl::int64 get_offset() const { return offset; }
289  size_t get_size() const { return bytes; }
290  size_t size() const { return bytes; }
291  request_type get_type() const { return type; }
292 
293  virtual std::ostream & print(std::ostream & out) const
294  {
295  out << "File object address: " << (void *)get_file();
296  out << " Buffer address: " << (void *)get_buffer();
297  out << " File offset: " << get_offset();
298  out << " Transfer size: " << get_size() << " bytes";
299  out << " Type of transfer: " << ((get_type() == READ) ? "READ" : "WRITE");
300  return out;
301  }
302 
305  void error_occured(const char * msg)
306  {
307  error.reset(new stxxl::io_error(msg));
308  }
309 
312  void error_occured(const std::string & msg)
313  {
314  error.reset(new stxxl::io_error(msg));
315  }
316 
318  void check_errors() throw (stxxl::io_error)
319  {
320  if (error.get())
321  throw * (error.get());
322  }
323 
324 private:
325  void add_ref()
326  {
327  scoped_mutex_lock Lock(ref_cnt_mutex);
328  ref_cnt++;
329  STXXL_VERBOSE3("request add_ref() " << static_cast<void *>(this) << ": adding reference, cnt: " << ref_cnt);
330  }
331 
332  bool sub_ref()
333  {
334  int val;
335  {
336  scoped_mutex_lock Lock(ref_cnt_mutex);
337  val = --ref_cnt;
338  STXXL_VERBOSE3("request sub_ref() " << static_cast<void *>(this) << ": subtracting reference cnt: " << ref_cnt);
339  }
340  assert(val >= 0);
341  return (val == 0);
342  }
343 };
344 
345 inline std::ostream & operator << (std::ostream & out, const request & req)
346 {
347  return req.print(out);
348 }
349 
351 
354 {
355  request * ptr;
356  void add_ref()
357  {
358  if (ptr)
359  {
360  ptr->add_ref();
361  }
362  }
363  void sub_ref()
364  {
365  if (ptr)
366  {
367  if (ptr->sub_ref())
368  {
369  STXXL_VERBOSE3("the last copy " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
370  delete ptr;
371  ptr = NULL;
372  }
373  else
374  {
375  STXXL_VERBOSE3("more copies " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
376  }
377  }
378  }
379 
380 public:
382  request_ptr(request * ptr_ = NULL) : ptr(ptr_)
383  {
384  STXXL_VERBOSE3("create constructor (request =" << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this));
385  add_ref();
386  }
388  request_ptr(const request_ptr & p) : ptr(p.ptr)
389  {
390  STXXL_VERBOSE3("copy constructor (copying " << static_cast<void *>(ptr) << ") this=" << static_cast<void *>(this));
391  add_ref();
392  }
395  {
396  STXXL_VERBOSE3("Destructor of a request_ptr pointing to " << static_cast<void *>(ptr) << " this=" << static_cast<void *>(this));
397  sub_ref();
398  }
402  {
403  // assert(p.ptr);
404  return (*this = p.ptr);
405  }
409  {
410  STXXL_VERBOSE3("assign operator begin (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this));
411  if (p != ptr)
412  {
413  sub_ref();
414  ptr = p;
415  add_ref();
416  }
417  STXXL_VERBOSE3("assign operator end (assigning " << static_cast<void *>(p) << ") this=" << static_cast<void *>(this));
418  return *this;
419  }
423  {
424  assert(ptr);
425  return *ptr;
426  }
430  {
431  assert(ptr);
432  return ptr;
433  }
438  request * get() const { return ptr; }
439 
441  bool valid() const { return ptr; }
442 
444  bool empty() const { return !ptr; }
445 };
446 
448 
453 inline int wait_any(request_ptr req_array[], int count);
457 inline void wait_all(request_ptr req_array[], int count);
463 inline bool poll_any(request_ptr req_array[], int count, int & index);
464 
465 
466 void wait_all(request_ptr req_array[], int count)
467 {
468  for (int i = 0; i < count; i++)
469  {
470  req_array[i]->wait();
471  }
472 }
473 
//! Calls wait() on every request in the range [reqs_begin, reqs_end).
//! \param reqs_begin start of the range; dereferencing must yield something
//!        a request_ptr can be constructed from
//! \param reqs_end end of the range
template <class request_iterator_>
void wait_all(request_iterator_ reqs_begin, request_iterator_ reqs_end)
{
    for ( ; reqs_begin != reqs_end; ++reqs_begin)
    {
        // The temporary handle keeps the request referenced during the call.
        (request_ptr(*reqs_begin))->wait();
    }
}
483 
484 bool poll_any(request_ptr req_array[], int count, int & index)
485 {
486  index = -1;
487  for (int i = 0; i < count; i++)
488  {
489  if (req_array[i]->poll())
490  {
491  index = i;
492  return true;
493  }
494  }
495  return false;
496 }
497 
//! Polls the requests of [reqs_begin, reqs_end) in order.
//! \param reqs_begin start of the range; dereferencing must yield something
//!        a request_ptr can be constructed from
//! \param reqs_end end of the range
//! \return iterator to the first request whose poll() returned \c true, or
//!         \c reqs_end if there is none
template <class request_iterator_>
request_iterator_ poll_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
{
    for ( ; reqs_begin != reqs_end; ++reqs_begin)
    {
        if ((request_ptr(*reqs_begin))->poll())
            return reqs_begin;
    }
    return reqs_end;
}
510 
511 
512 int wait_any(request_ptr req_array[], int count)
513 {
514  stats::scoped_wait_timer wait_timer;
515 
516  onoff_switch sw;
517  int i = 0, index = -1;
518 
519  for ( ; i < count; i++)
520  {
521  if (req_array[i]->add_waiter(&sw))
522  {
523  // already done
524  index = i;
525 
526  while (--i >= 0)
527  req_array[i]->delete_waiter(&sw);
528 
529  req_array[index]->check_errors();
530 
531  return index;
532  }
533  }
534 
535  sw.wait_for_on();
536 
537  for (i = 0; i < count; i++)
538  {
539  req_array[i]->delete_waiter(&sw);
540  if (index < 0 && req_array[i]->poll())
541  index = i;
542  }
543 
544  return index;
545 }
546 
547 template <class request_iterator_>
548 request_iterator_ wait_any(request_iterator_ reqs_begin, request_iterator_ reqs_end)
549 {
550  stats::scoped_wait_timer wait_timer;
551 
552  onoff_switch sw;
553 
554  request_iterator_ cur = reqs_begin, result = reqs_end;
555 
556  for ( ; cur != reqs_end; cur++)
557  {
558  if ((request_ptr(*cur))->add_waiter(&sw))
559  {
560  // already done
561  result = cur;
562 
563  if (cur != reqs_begin)
564  {
565  while (--cur != reqs_begin)
566  (request_ptr(*cur))->delete_waiter(&sw);
567 
568  (request_ptr(*cur))->delete_waiter(&sw);
569  }
570 
571  (request_ptr(*result))->check_errors();
572 
573  return result;
574  }
575  }
576 
577  sw.wait_for_on();
578 
579  for (cur = reqs_begin; cur != reqs_end; cur++)
580  {
581  (request_ptr(*cur))->delete_waiter(&sw);
582  if (result == reqs_end && (request_ptr(*cur))->poll())
583  result = cur;
584  }
585 
586  return result;
587 }
588 
589 class disk_queue : private noncopyable
590 {
591 public:
592  enum priority_op { READ, WRITE, NONE };
593 
594 private:
595  mutex write_mutex;
596  mutex read_mutex;
597  std::queue<request_ptr> write_queue;
598  std::queue<request_ptr> read_queue;
599 
600  semaphore sem;
601 
602  priority_op _priority_op;
603 
604 #ifdef STXXL_BOOST_THREADS
605  boost::thread thread;
606 #else
607  pthread_t thread;
608 #endif
609 
610 
611  static void * worker(void * arg);
612 
613 public:
614  disk_queue(int n = 1); // max number of requests simultaneously submitted to disk
615 
616  void set_priority_op(priority_op op)
617  {
618  _priority_op = op;
619  }
620  void add_readreq(request_ptr & req);
621  void add_writereq(request_ptr & req);
622  ~disk_queue();
623 };
624 
627 class disk_queues : public singleton<disk_queues>
628 {
629  friend class singleton<disk_queues>;
630 
631 protected:
632  std::map<DISKID, disk_queue *> queues;
633  disk_queues() { }
634 
635 public:
636  void add_readreq(request_ptr & req, DISKID disk)
637  {
638  if (queues.find(disk) == queues.end())
639  {
640  // create new disk queue
641  queues[disk] = new disk_queue();
642  }
643  queues[disk]->add_readreq(req);
644  }
645  void add_writereq(request_ptr & req, DISKID disk)
646  {
647  if (queues.find(disk) == queues.end())
648  {
649  // create new disk queue
650  queues[disk] = new disk_queue();
651  }
652  queues[disk]->add_writereq(req);
653  }
654  ~disk_queues()
655  {
656  // deallocate all queues
657  for (std::map<DISKID, disk_queue *>::iterator i =
658  queues.begin(); i != queues.end(); i++)
659  delete (*i).second;
660  }
666  void set_priority_op(disk_queue::priority_op op)
667  {
668  for (std::map<DISKID, disk_queue *>::iterator i =
669  queues.begin(); i != queues.end(); i++)
670  i->second->set_priority_op(op);
671  }
672 };
673 
675 
676 __STXXL_END_NAMESPACE
677 
678 #endif // !STXXL_IOBASE_HEADER