Universal Software Radio Peripheral
|
00001 /* -*- c++ -*- */ 00002 /* 00003 * Copyright 2006 Free Software Foundation, Inc. 00004 * 00005 * This file is part of GNU Radio. 00006 * 00007 * GNU Radio is free software; you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation; either version 3, or (at your option) 00010 * any later version. 00011 * 00012 * GNU Radio is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with GNU Radio; see the file COPYING. If not, write to 00019 * the Free Software Foundation, Inc., 51 Franklin Street, 00020 * Boston, MA 02110-1301, USA. 00021 */ 00022 00023 #ifndef _CIRCULAR_BUFFER_H_ 00024 #define _CIRCULAR_BUFFER_H_ 00025 00026 #include "mld_threads.h" 00027 #include <stdexcept> 00028 00029 #ifndef DO_DEBUG 00030 #define DO_DEBUG 0 00031 #endif 00032 00033 #if DO_DEBUG 00034 #define DEBUG(X) do{X} while(0); 00035 #else 00036 #define DEBUG(X) do{} while(0); 00037 #endif 00038 00039 template <class T> class circular_buffer 00040 { 00041 private: 00042 // the buffer to use 00043 T* d_buffer; 00044 00045 // the following are in Items (type T) 00046 UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I; 00047 UInt32 d_n_avail_write_I, d_n_avail_read_I; 00048 00049 // stuff to control access to class internals 00050 mld_mutex_ptr d_internal; 00051 mld_condition_ptr d_readBlock, d_writeBlock; 00052 00053 // booleans to decide how to control reading, writing, and aborting 00054 bool d_doWriteBlock, d_doFullRead, d_doAbort; 00055 00056 void delete_mutex_cond () { 00057 if (d_internal) { 00058 delete d_internal; 00059 d_internal = NULL; 00060 } 00061 if (d_readBlock) { 00062 delete d_readBlock; 00063 d_readBlock = NULL; 00064 } 00065 if (d_writeBlock) { 00066 delete d_writeBlock; 00067 d_writeBlock = NULL; 00068 } 00069 }; 00070 00071 public: 00072 circular_buffer (UInt32 bufLen_I, 00073 bool doWriteBlock = true, bool doFullRead = false) { 00074 if (bufLen_I == 0) 00075 throw std::runtime_error ("circular_buffer(): " 00076 "Number of items to buffer must be > 0.\n"); 00077 d_bufLen_I = bufLen_I; 00078 d_buffer = (T*) new T[d_bufLen_I]; 00079 d_doWriteBlock = doWriteBlock; 00080 d_doFullRead = doFullRead; 00081 d_internal = NULL; 00082 d_readBlock = d_writeBlock = NULL; 00083 reset (); 00084 DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, " 00085 "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I, 00086 (d_doWriteBlock ? "true" : "false"), 00087 (d_doFullRead ? "true" : "false"));); 00088 }; 00089 00090 ~circular_buffer () { 00091 delete_mutex_cond (); 00092 delete [] d_buffer; 00093 }; 00094 00095 inline UInt32 n_avail_write_items () { 00096 d_internal->lock (); 00097 UInt32 retVal = d_n_avail_write_I; 00098 d_internal->unlock (); 00099 return (retVal); 00100 }; 00101 00102 inline UInt32 n_avail_read_items () { 00103 d_internal->lock (); 00104 UInt32 retVal = d_n_avail_read_I; 00105 d_internal->unlock (); 00106 return (retVal); 00107 }; 00108 00109 inline UInt32 buffer_length_items () {return (d_bufLen_I);}; 00110 inline bool do_write_block () {return (d_doWriteBlock);}; 00111 inline bool do_full_read () {return (d_doFullRead);}; 00112 00113 void reset () { 00114 d_doAbort = false; 00115 bzero (d_buffer, d_bufLen_I * sizeof (T)); 00116 d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0; 00117 d_n_avail_write_I = d_bufLen_I; 00118 delete_mutex_cond (); 00119 // create a mutex to handle contention of shared resources; 00120 // any routine needed access to shared resources uses lock() 00121 // before doing anything, then unlock() when finished. 00122 d_internal = new mld_mutex (); 00123 // link the internal mutex to the read and write conditions; 00124 // when wait() is called, the internal mutex will automatically 00125 // be unlock()'ed. Upon return (from a signal() to the condition), 00126 // the internal mutex will be lock()'ed. 00127 d_readBlock = new mld_condition (d_internal); 00128 d_writeBlock = new mld_condition (d_internal); 00129 }; 00130 00131 /* 00132 * enqueue: add the given buffer of item-length to the queue, 00133 * first-in-first-out (FIFO). 00134 * 00135 * inputs: 00136 * buf: a pointer to the buffer holding the data 00137 * 00138 * bufLen_I: the buffer length in items (of the instantiated type) 00139 * 00140 * returns: 00141 * -1: on overflow (write is not blocking, and data is being 00142 * written faster than it is being read) 00143 * 0: if nothing to do (0 length buffer) 00144 * 1: if success 00145 * 2: in the process of aborting, do doing nothing 00146 * 00147 * will throw runtime errors if inputs are improper: 00148 * buffer pointer is NULL 00149 * buffer length is larger than the instantiated buffer length 00150 */ 00151 00152 int enqueue (T* buf, UInt32 bufLen_I) { 00153 DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, " 00154 "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I, 00155 d_n_avail_write_I, d_n_avail_read_I);); 00156 if (bufLen_I > d_bufLen_I) { 00157 fprintf (stderr, "cannot add buffer longer (%ld" 00158 ") than instantiated length (%ld" 00159 ").\n", bufLen_I, d_bufLen_I); 00160 throw std::runtime_error ("circular_buffer::enqueue()"); 00161 } 00162 00163 if (bufLen_I == 0) 00164 return (0); 00165 if (!buf) 00166 throw std::runtime_error ("circular_buffer::enqueue(): " 00167 "input buffer is NULL.\n"); 00168 d_internal->lock (); 00169 if (d_doAbort) { 00170 d_internal->unlock (); 00171 return (2); 00172 } 00173 // set the return value to 1: success; change if needed 00174 int retval = 1; 00175 if (bufLen_I > d_n_avail_write_I) { 00176 if (d_doWriteBlock) { 00177 while (bufLen_I > d_n_avail_write_I) { 00178 DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n");); 00179 // wait will automatically unlock() the internal mutex 00180 d_writeBlock->wait (); 00181 // and lock() it here. 00182 if (d_doAbort) { 00183 d_internal->unlock (); 00184 DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n");); 00185 return (2); 00186 } 00187 DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n");); 00188 } 00189 } else { 00190 d_n_avail_read_I = d_bufLen_I - bufLen_I; 00191 d_n_avail_write_I = bufLen_I; 00192 DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n");); 00193 retval = -1; 00194 } 00195 } 00196 UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0; 00197 if (n_now_I > bufLen_I) 00198 n_now_I = bufLen_I; 00199 else if (n_now_I < bufLen_I) 00200 n_start_I = bufLen_I - n_now_I; 00201 bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T)); 00202 if (n_start_I) { 00203 bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T)); 00204 d_writeNdx_I = n_start_I; 00205 } else 00206 d_writeNdx_I += n_now_I; 00207 d_n_avail_read_I += bufLen_I; 00208 d_n_avail_write_I -= bufLen_I; 00209 d_readBlock->signal (); 00210 d_internal->unlock (); 00211 return (retval); 00212 }; 00213 00214 /* 00215 * dequeue: removes from the queue the number of items requested, or 00216 * available, into the given buffer on a FIFO basis. 00217 * 00218 * inputs: 00219 * buf: a pointer to the buffer into which to copy the data 00220 * 00221 * bufLen_I: pointer to the number of items to remove in items 00222 * (of the instantiated type) 00223 * 00224 * returns: 00225 * 0: if nothing to do (0 length buffer) 00226 * 1: if success 00227 * 2: in the process of aborting, do doing nothing 00228 * 00229 * will throw runtime errors if inputs are improper: 00230 * buffer pointer is NULL 00231 * buffer length pointer is NULL 00232 * buffer length is larger than the instantiated buffer length 00233 */ 00234 00235 int dequeue (T* buf, UInt32* bufLen_I) { 00236 DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, " 00237 "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I, 00238 d_n_avail_write_I, d_n_avail_read_I);); 00239 if (!bufLen_I) 00240 throw std::runtime_error ("circular_buffer::dequeue(): " 00241 "input bufLen pointer is NULL.\n"); 00242 if (!buf) 00243 throw std::runtime_error ("circular_buffer::dequeue(): " 00244 "input buffer pointer is NULL.\n"); 00245 UInt32 l_bufLen_I = *bufLen_I; 00246 if (l_bufLen_I == 0) 00247 return (0); 00248 if (l_bufLen_I > d_bufLen_I) { 00249 fprintf (stderr, "cannot remove buffer longer (%ld" 00250 ") than instantiated length (%ld" 00251 ").\n", l_bufLen_I, d_bufLen_I); 00252 throw std::runtime_error ("circular_buffer::dequeue()"); 00253 } 00254 00255 d_internal->lock (); 00256 if (d_doAbort) { 00257 d_internal->unlock (); 00258 return (2); 00259 } 00260 if (d_doFullRead) { 00261 while (d_n_avail_read_I < l_bufLen_I) { 00262 DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n");); 00263 // wait will automatically unlock() the internal mutex 00264 d_readBlock->wait (); 00265 // and lock() it here. 00266 if (d_doAbort) { 00267 d_internal->unlock (); 00268 DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n");); 00269 return (2); 00270 } 00271 DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n");); 00272 } 00273 } else { 00274 while (d_n_avail_read_I == 0) { 00275 DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n");); 00276 // wait will automatically unlock() the internal mutex 00277 d_readBlock->wait (); 00278 // and lock() it here. 00279 if (d_doAbort) { 00280 d_internal->unlock (); 00281 DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n");); 00282 return (2); 00283 } 00284 DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n");); 00285 } 00286 } 00287 if (l_bufLen_I > d_n_avail_read_I) 00288 l_bufLen_I = d_n_avail_read_I; 00289 UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0; 00290 if (n_now_I > l_bufLen_I) 00291 n_now_I = l_bufLen_I; 00292 else if (n_now_I < l_bufLen_I) 00293 n_start_I = l_bufLen_I - n_now_I; 00294 bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T)); 00295 if (n_start_I) { 00296 bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T)); 00297 d_readNdx_I = n_start_I; 00298 } else 00299 d_readNdx_I += n_now_I; 00300 *bufLen_I = l_bufLen_I; 00301 d_n_avail_read_I -= l_bufLen_I; 00302 d_n_avail_write_I += l_bufLen_I; 00303 d_writeBlock->signal (); 00304 d_internal->unlock (); 00305 return (1); 00306 }; 00307 00308 void abort () { 00309 d_internal->lock (); 00310 d_doAbort = true; 00311 d_writeBlock->signal (); 00312 d_readBlock->signal (); 00313 d_internal->unlock (); 00314 }; 00315 }; 00316 00317 #endif /* _CIRCULAR_BUFFER_H_ */