// STXXL 1.2.1
00001 /*************************************************************************** 00002 * include/stxxl/bits/algo/async_schedule.h 00003 * 00004 * Part of the STXXL. See http://stxxl.sourceforge.net 00005 * 00006 * Copyright (C) 2002 Roman Dementiev <dementiev@mpi-sb.mpg.de> 00007 * 00008 * Distributed under the Boost Software License, Version 1.0. 00009 * (See accompanying file LICENSE_1_0.txt or copy at 00010 * http://www.boost.org/LICENSE_1_0.txt) 00011 **************************************************************************/ 00012 00013 #ifndef STXXL_ASYNC_SCHEDULE_HEADER 00014 #define STXXL_ASYNC_SCHEDULE_HEADER 00015 00016 #include <queue> 00017 #include <algorithm> 00018 #include <functional> 00019 00020 #ifdef STXXL_BOOST_CONFIG 00021 #include <boost/config.hpp> 00022 #endif 00023 #include <stxxl/bits/io/iobase.h> 00024 00025 #include <stxxl/bits/compat_hash_map.h> 00026 #include <stxxl/bits/compat_hash_set.h> 00027 00028 00029 __STXXL_BEGIN_NAMESPACE 00030 00031 struct sim_event // only one type of event: WRITE COMPLETED 00032 { 00033 int_type timestamp; 00034 int_type iblock; 00035 inline sim_event(int_type t, int_type b) : timestamp(t), iblock(b) { } 00036 }; 00037 00038 struct sim_event_cmp : public std::binary_function<sim_event, sim_event, bool> 00039 { 00040 inline bool operator () (const sim_event & a, const sim_event & b) const 00041 { 00042 return a.timestamp > b.timestamp; 00043 } 00044 }; 00045 00046 int_type simulate_async_write( 00047 int * disks, 00048 const int_type L, 00049 const int_type m_init, 00050 const int_type D, 00051 std::pair<int_type, int_type> * o_time); 00052 00053 typedef std::pair<int_type, int_type> write_time_pair; 00054 struct write_time_cmp : public std::binary_function<write_time_pair, write_time_pair, bool> 00055 { 00056 inline bool operator () (const write_time_pair & a, const write_time_pair & b) const 00057 { 00058 return a.second > b.second; 00059 } 00060 }; 00061 00062 void compute_prefetch_schedule( 00063 
int_type * first, 00064 int_type * last, 00065 int_type * out_first, 00066 int_type m, 00067 int_type D); 00068 00069 template <typename run_type> 00070 void simulate_async_write( 00071 const run_type & input, 00072 const int_type m_init, 00073 const int_type D, 00074 std::pair<int_type, int_type> * o_time) 00075 { 00076 typedef std::priority_queue<sim_event, std::vector<sim_event>, sim_event_cmp> event_queue_type; 00077 typedef std::queue<int_type> disk_queue_type; 00078 00079 const int_type L = input.size(); 00080 assert(L >= D); 00081 disk_queue_type * disk_queues = new disk_queue_type[L]; 00082 event_queue_type event_queue; 00083 00084 int_type m = m_init; 00085 int_type i = L - 1; 00086 int_type oldtime = 0; 00087 bool * disk_busy = new bool[D]; 00088 00089 while (m && (i >= 0)) 00090 { 00091 int_type disk = input[i].bid.storage->get_id(); 00092 disk_queues[disk].push(i); 00093 i--; 00094 m--; 00095 } 00096 00097 for (int_type ii = 0; ii < D; ii++) 00098 if (!disk_queues[ii].empty()) 00099 { 00100 int_type j = disk_queues[ii].front(); 00101 disk_queues[ii].pop(); 00102 event_queue.push(sim_event(1, j)); 00103 } 00104 00105 while (!event_queue.empty()) 00106 { 00107 sim_event cur = event_queue.top(); 00108 event_queue.pop(); 00109 if (oldtime != cur.timestamp) 00110 { 00111 // clear disk_busy 00112 for (int_type i = 0; i < D; i++) 00113 disk_busy[i] = false; 00114 00115 00116 oldtime = cur.timestamp; 00117 } 00118 o_time[cur.iblock] = std::pair<int_type, int_type>(cur.iblock, cur.timestamp + 1); 00119 00120 m++; 00121 if (i >= 0) 00122 { 00123 m--; 00124 int_type disk = input[i].bid.storage->get_id(); 00125 if (disk_busy[disk]) 00126 { 00127 disk_queues[disk].push(i); 00128 } 00129 else 00130 { 00131 event_queue.push(sim_event(cur.timestamp + 1, i)); 00132 disk_busy[disk] = true; 00133 } 00134 00135 i--; 00136 } 00137 00138 // add next block to write 00139 int_type disk = input[cur.iblock].bid.storage->get_id(); 00140 if (!disk_busy[disk] && 
!disk_queues[disk].empty()) 00141 { 00142 event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front())); 00143 disk_queues[disk].pop(); 00144 disk_busy[disk] = true; 00145 } 00146 } 00147 00148 delete[] disk_busy; 00149 delete[] disk_queues; 00150 } 00151 00152 00153 template <typename run_type> 00154 void compute_prefetch_schedule( 00155 const run_type & input, 00156 int_type * out_first, 00157 int_type m, 00158 int_type D) 00159 { 00160 typedef std::pair<int_type, int_type> pair_type; 00161 const int_type L = input.size(); 00162 if (L <= D) 00163 { 00164 for (int_type i = 0; i < L; ++i) 00165 out_first[i] = i; 00166 00167 return; 00168 } 00169 pair_type * write_order = new pair_type[L]; 00170 00171 simulate_async_write(input, m, D, write_order); 00172 00173 std::stable_sort(write_order, write_order + L, write_time_cmp()); 00174 00175 for (int_type i = 0; i < L; i++) 00176 out_first[i] = write_order[i].first; 00177 00178 00179 delete[] write_order; 00180 } 00181 00182 00183 template <typename bid_iterator_type> 00184 void simulate_async_write( 00185 bid_iterator_type input, 00186 const int_type L, 00187 const int_type m_init, 00188 const int_type /*D*/, 00189 std::pair<int_type, int_type> * o_time) 00190 { 00191 typedef std::priority_queue<sim_event, std::vector<sim_event>, sim_event_cmp> event_queue_type; 00192 typedef std::queue<int_type> disk_queue_type; 00193 00194 //disk_queue_type * disk_queues = new disk_queue_type[L]; 00195 typedef typename compat_hash_map<int, disk_queue_type>::result disk_queues_type; 00196 disk_queues_type disk_queues; 00197 event_queue_type event_queue; 00198 00199 int_type m = m_init; 00200 int_type i = L - 1; 00201 int_type oldtime = 0; 00202 //bool * disk_busy = new bool [D]; 00203 compat_hash_set<int>::result disk_busy; 00204 00205 while (m && (i >= 0)) 00206 { 00207 int_type disk = (*(input + i)).storage->get_id(); 00208 disk_queues[disk].push(i); 00209 i--; 00210 m--; 00211 } 00212 00213 /* 00214 for(int_type 
i=0;i<D;i++) 00215 if(!disk_queues[i].empty()) 00216 { 00217 int_type j = disk_queues[i].front(); 00218 disk_queues[i].pop(); 00219 event_queue.push(sim_event(1,j)); 00220 } 00221 */ 00222 disk_queues_type::iterator it = disk_queues.begin(); 00223 for ( ; it != disk_queues.end(); ++it) 00224 { 00225 int_type j = (*it).second.front(); 00226 (*it).second.pop(); 00227 event_queue.push(sim_event(1, j)); 00228 } 00229 00230 while (!event_queue.empty()) 00231 { 00232 sim_event cur = event_queue.top(); 00233 event_queue.pop(); 00234 if (oldtime != cur.timestamp) 00235 { 00236 // clear disk_busy 00237 //for(int_type i=0;i<D;i++) 00238 // disk_busy[i] = false; 00239 disk_busy.clear(); 00240 00241 oldtime = cur.timestamp; 00242 } 00243 o_time[cur.iblock] = std::pair<int_type, int_type>(cur.iblock, cur.timestamp + 1); 00244 00245 m++; 00246 if (i >= 0) 00247 { 00248 m--; 00249 int_type disk = (*(input + i)).storage->get_id(); 00250 if (disk_busy.find(disk) != disk_busy.end()) 00251 { 00252 disk_queues[disk].push(i); 00253 } 00254 else 00255 { 00256 event_queue.push(sim_event(cur.timestamp + 1, i)); 00257 disk_busy.insert(disk); 00258 } 00259 00260 i--; 00261 } 00262 00263 // add next block to write 00264 int_type disk = (*(input + cur.iblock)).storage->get_id(); 00265 if (disk_busy.find(disk) == disk_busy.end() && !disk_queues[disk].empty()) 00266 { 00267 event_queue.push(sim_event(cur.timestamp + 1, disk_queues[disk].front())); 00268 disk_queues[disk].pop(); 00269 disk_busy.insert(disk); 00270 } 00271 } 00272 00273 //delete [] disk_busy; 00274 //delete [] disk_queues; 00275 } 00276 00277 00278 template <typename bid_iterator_type> 00279 void compute_prefetch_schedule( 00280 bid_iterator_type input_begin, 00281 bid_iterator_type input_end, 00282 int_type * out_first, 00283 int_type m, 00284 int_type D) 00285 { 00286 typedef std::pair<int_type, int_type> pair_type; 00287 const int_type L = input_end - input_begin; 00288 STXXL_VERBOSE1("compute_prefetch_schedule: sequence 
length=" << L << " disks=" << D); 00289 if (L <= D) 00290 { 00291 for (int_type i = 0; i < L; ++i) 00292 out_first[i] = i; 00293 00294 return; 00295 } 00296 pair_type * write_order = new pair_type[L]; 00297 00298 simulate_async_write(input_begin, L, m, D, write_order); 00299 00300 std::stable_sort(write_order, write_order + L, write_time_cmp()); 00301 00302 STXXL_VERBOSE1("Computed prefetch order for " << L << " blocks with " << 00303 D << " disks and " << m << " blocks"); 00304 for (int_type i = 0; i < L; i++) 00305 { 00306 out_first[i] = write_order[i].first; 00307 STXXL_VERBOSE1(write_order[i].first); 00308 } 00309 00310 delete[] write_order; 00311 } 00312 00313 __STXXL_END_NAMESPACE 00314 00315 #endif // !STXXL_ASYNC_SCHEDULE_HEADER