GNU Radio 3.2.2 C++ API
|
00001 /* -*- c++ -*- */ 00002 /* 00003 * Copyright 2006 Free Software Foundation, Inc. 00004 * 00005 * This file is part of GNU Radio. 00006 * 00007 * GNU Radio is free software; you can redistribute it and/or modify 00008 * it under the terms of the GNU General Public License as published by 00009 * the Free Software Foundation; either version 3, or (at your option) 00010 * any later version. 00011 * 00012 * GNU Radio is distributed in the hope that it will be useful, 00013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00015 * GNU General Public License for more details. 00016 * 00017 * You should have received a copy of the GNU General Public License 00018 * along with GNU Radio; see the file COPYING. If not, write to 00019 * the Free Software Foundation, Inc., 51 Franklin Street, 00020 * Boston, MA 02110-1301, USA. 00021 */ 00022 00023 #ifndef _CIRCULAR_BUFFER_H_ 00024 #define _CIRCULAR_BUFFER_H_ 00025 00026 #include "mld_threads.h" 00027 #include <stdexcept> 00028 00029 #define DO_DEBUG 0 00030 00031 #if DO_DEBUG 00032 #define DEBUG(X) do{X} while(0); 00033 #else 00034 #define DEBUG(X) do{} while(0); 00035 #endif 00036 00037 template <class T> class circular_buffer 00038 { 00039 private: 00040 // the buffer to use 00041 T* d_buffer; 00042 00043 // the following are in Items (type T) 00044 UInt32 d_bufLen_I, d_readNdx_I, d_writeNdx_I; 00045 UInt32 d_n_avail_write_I, d_n_avail_read_I; 00046 00047 // stuff to control access to class internals 00048 mld_mutex_ptr d_internal; 00049 mld_condition_ptr d_readBlock, d_writeBlock; 00050 00051 // booleans to decide how to control reading, writing, and aborting 00052 bool d_doWriteBlock, d_doFullRead, d_doAbort; 00053 00054 void delete_mutex_cond () { 00055 if (d_internal) { 00056 delete d_internal; 00057 d_internal = NULL; 00058 } 00059 if (d_readBlock) { 00060 delete d_readBlock; 00061 d_readBlock = NULL; 00062 } 00063 if (d_writeBlock) { 00064 delete d_writeBlock; 00065 d_writeBlock = NULL; 00066 } 00067 }; 00068 00069 public: 00070 circular_buffer (UInt32 bufLen_I, 00071 bool doWriteBlock = true, bool doFullRead = false) { 00072 if (bufLen_I == 0) 00073 throw std::runtime_error ("circular_buffer(): " 00074 "Number of items to buffer must be > 0.\n"); 00075 d_bufLen_I = bufLen_I; 00076 d_buffer = (T*) new T[d_bufLen_I]; 00077 d_doWriteBlock = doWriteBlock; 00078 d_doFullRead = doFullRead; 00079 d_internal = NULL; 00080 d_readBlock = d_writeBlock = NULL; 00081 reset (); 00082 DEBUG (fprintf (stderr, "c_b(): buf len (items) = %ld, " 00083 "doWriteBlock = %s, doFullRead = %s\n", d_bufLen_I, 00084 (d_doWriteBlock ? "true" : "false"), 00085 (d_doFullRead ? "true" : "false"))); 00086 }; 00087 00088 ~circular_buffer () { 00089 delete_mutex_cond (); 00090 delete [] d_buffer; 00091 }; 00092 00093 inline UInt32 n_avail_write_items () { 00094 d_internal->lock (); 00095 UInt32 retVal = d_n_avail_write_I; 00096 d_internal->unlock (); 00097 return (retVal); 00098 }; 00099 00100 inline UInt32 n_avail_read_items () { 00101 d_internal->lock (); 00102 UInt32 retVal = d_n_avail_read_I; 00103 d_internal->unlock (); 00104 return (retVal); 00105 }; 00106 00107 inline UInt32 buffer_length_items () {return (d_bufLen_I);}; 00108 inline bool do_write_block () {return (d_doWriteBlock);}; 00109 inline bool do_full_read () {return (d_doFullRead);}; 00110 00111 void reset () { 00112 d_doAbort = false; 00113 bzero (d_buffer, d_bufLen_I * sizeof (T)); 00114 d_readNdx_I = d_writeNdx_I = d_n_avail_read_I = 0; 00115 d_n_avail_write_I = d_bufLen_I; 00116 delete_mutex_cond (); 00117 // create a mutex to handle contention of shared resources; 00118 // any routine needed access to shared resources uses lock() 00119 // before doing anything, then unlock() when finished. 00120 d_internal = new mld_mutex (); 00121 // link the internal mutex to the read and write conditions; 00122 // when wait() is called, the internal mutex will automatically 00123 // be unlock()'ed. Upon return (from a signal() to the condition), 00124 // the internal mutex will be lock()'ed. 00125 d_readBlock = new mld_condition (d_internal); 00126 d_writeBlock = new mld_condition (d_internal); 00127 }; 00128 00129 /* 00130 * enqueue: add the given buffer of item-length to the queue, 00131 * first-in-first-out (FIFO). 00132 * 00133 * inputs: 00134 * buf: a pointer to the buffer holding the data 00135 * 00136 * bufLen_I: the buffer length in items (of the instantiated type) 00137 * 00138 * returns: 00139 * -1: on overflow (write is not blocking, and data is being 00140 * written faster than it is being read) 00141 * 0: if nothing to do (0 length buffer) 00142 * 1: if success 00143 * 2: in the process of aborting, do doing nothing 00144 * 00145 * will throw runtime errors if inputs are improper: 00146 * buffer pointer is NULL 00147 * buffer length is larger than the instantiated buffer length 00148 */ 00149 00150 int enqueue (T* buf, UInt32 bufLen_I) { 00151 DEBUG (fprintf (stderr, "enqueue: buf = %X, bufLen = %ld, #av_wr = %ld, " 00152 "#av_rd = %ld.\n", (unsigned int)buf, bufLen_I, 00153 d_n_avail_write_I, d_n_avail_read_I)); 00154 if (bufLen_I > d_bufLen_I) { 00155 fprintf (stderr, "cannot add buffer longer (%ld" 00156 ") than instantiated length (%ld" 00157 ").\n", bufLen_I, d_bufLen_I); 00158 throw std::runtime_error ("circular_buffer::enqueue()"); 00159 } 00160 00161 if (bufLen_I == 0) 00162 return (0); 00163 if (!buf) 00164 throw std::runtime_error ("circular_buffer::enqueue(): " 00165 "input buffer is NULL.\n"); 00166 d_internal->lock (); 00167 if (d_doAbort) { 00168 d_internal->unlock (); 00169 return (2); 00170 } 00171 // set the return value to 1: success; change if needed 00172 int retval = 1; 00173 if (bufLen_I > d_n_avail_write_I) { 00174 if (d_doWriteBlock) { 00175 while (bufLen_I > d_n_avail_write_I) { 00176 DEBUG (fprintf (stderr, "enqueue: #len > #a, waiting.\n")); 00177 // wait will automatically unlock() the internal mutex 00178 d_writeBlock->wait (); 00179 // and lock() it here. 00180 if (d_doAbort) { 00181 d_internal->unlock (); 00182 DEBUG (fprintf (stderr, "enqueue: #len > #a, aborting.\n")); 00183 return (2); 00184 } 00185 DEBUG (fprintf (stderr, "enqueue: #len > #a, done waiting.\n")); 00186 } 00187 } else { 00188 d_n_avail_read_I = d_bufLen_I - bufLen_I; 00189 d_n_avail_write_I = bufLen_I; 00190 DEBUG (fprintf (stderr, "circular_buffer::enqueue: overflow\n")); 00191 retval = -1; 00192 } 00193 } 00194 UInt32 n_now_I = d_bufLen_I - d_writeNdx_I, n_start_I = 0; 00195 if (n_now_I > bufLen_I) 00196 n_now_I = bufLen_I; 00197 else if (n_now_I < bufLen_I) 00198 n_start_I = bufLen_I - n_now_I; 00199 bcopy (buf, &(d_buffer[d_writeNdx_I]), n_now_I * sizeof (T)); 00200 if (n_start_I) { 00201 bcopy (&(buf[n_now_I]), d_buffer, n_start_I * sizeof (T)); 00202 d_writeNdx_I = n_start_I; 00203 } else 00204 d_writeNdx_I += n_now_I; 00205 d_n_avail_read_I += bufLen_I; 00206 d_n_avail_write_I -= bufLen_I; 00207 d_readBlock->signal (); 00208 d_internal->unlock (); 00209 return (retval); 00210 }; 00211 00212 /* 00213 * dequeue: removes from the queue the number of items requested, or 00214 * available, into the given buffer on a FIFO basis. 00215 * 00216 * inputs: 00217 * buf: a pointer to the buffer into which to copy the data 00218 * 00219 * bufLen_I: pointer to the number of items to remove in items 00220 * (of the instantiated type) 00221 * 00222 * returns: 00223 * 0: if nothing to do (0 length buffer) 00224 * 1: if success 00225 * 2: in the process of aborting, do doing nothing 00226 * 00227 * will throw runtime errors if inputs are improper: 00228 * buffer pointer is NULL 00229 * buffer length pointer is NULL 00230 * buffer length is larger than the instantiated buffer length 00231 */ 00232 00233 int dequeue (T* buf, UInt32* bufLen_I) { 00234 DEBUG (fprintf (stderr, "dequeue: buf = %X, *bufLen = %ld, #av_wr = %ld, " 00235 "#av_rd = %ld.\n", (unsigned int)buf, *bufLen_I, 00236 d_n_avail_write_I, d_n_avail_read_I)); 00237 if (!bufLen_I) 00238 throw std::runtime_error ("circular_buffer::dequeue(): " 00239 "input bufLen pointer is NULL.\n"); 00240 if (!buf) 00241 throw std::runtime_error ("circular_buffer::dequeue(): " 00242 "input buffer pointer is NULL.\n"); 00243 UInt32 l_bufLen_I = *bufLen_I; 00244 if (l_bufLen_I == 0) 00245 return (0); 00246 if (l_bufLen_I > d_bufLen_I) { 00247 fprintf (stderr, "cannot remove buffer longer (%ld" 00248 ") than instantiated length (%ld" 00249 ").\n", l_bufLen_I, d_bufLen_I); 00250 throw std::runtime_error ("circular_buffer::dequeue()"); 00251 } 00252 00253 d_internal->lock (); 00254 if (d_doAbort) { 00255 d_internal->unlock (); 00256 return (2); 00257 } 00258 if (d_doFullRead) { 00259 while (d_n_avail_read_I < l_bufLen_I) { 00260 DEBUG (fprintf (stderr, "dequeue: #a < #len, waiting.\n")); 00261 // wait will automatically unlock() the internal mutex 00262 d_readBlock->wait (); 00263 // and lock() it here. 00264 if (d_doAbort) { 00265 d_internal->unlock (); 00266 DEBUG (fprintf (stderr, "dequeue: #a < #len, aborting.\n")); 00267 return (2); 00268 } 00269 DEBUG (fprintf (stderr, "dequeue: #a < #len, done waiting.\n")); 00270 } 00271 } else { 00272 while (d_n_avail_read_I == 0) { 00273 DEBUG (fprintf (stderr, "dequeue: #a == 0, waiting.\n")); 00274 // wait will automatically unlock() the internal mutex 00275 d_readBlock->wait (); 00276 // and lock() it here. 00277 if (d_doAbort) { 00278 d_internal->unlock (); 00279 DEBUG (fprintf (stderr, "dequeue: #a == 0, aborting.\n")); 00280 return (2); 00281 } 00282 DEBUG (fprintf (stderr, "dequeue: #a == 0, done waiting.\n")); 00283 } 00284 } 00285 if (l_bufLen_I > d_n_avail_read_I) 00286 l_bufLen_I = d_n_avail_read_I; 00287 UInt32 n_now_I = d_bufLen_I - d_readNdx_I, n_start_I = 0; 00288 if (n_now_I > l_bufLen_I) 00289 n_now_I = l_bufLen_I; 00290 else if (n_now_I < l_bufLen_I) 00291 n_start_I = l_bufLen_I - n_now_I; 00292 bcopy (&(d_buffer[d_readNdx_I]), buf, n_now_I * sizeof (T)); 00293 if (n_start_I) { 00294 bcopy (d_buffer, &(buf[n_now_I]), n_start_I * sizeof (T)); 00295 d_readNdx_I = n_start_I; 00296 } else 00297 d_readNdx_I += n_now_I; 00298 *bufLen_I = l_bufLen_I; 00299 d_n_avail_read_I -= l_bufLen_I; 00300 d_n_avail_write_I += l_bufLen_I; 00301 d_writeBlock->signal (); 00302 d_internal->unlock (); 00303 return (1); 00304 }; 00305 00306 void abort () { 00307 d_internal->lock (); 00308 d_doAbort = true; 00309 d_writeBlock->signal (); 00310 d_readBlock->signal (); 00311 d_internal->unlock (); 00312 }; 00313 }; 00314 00315 #endif /* _CIRCULAR_BUFFER_H_ */