// PStreams
/* $Id: pstream.h,v 1.112 2010/03/20 14:50:47 redi Exp $
PStreams - POSIX Process I/O for C++
Copyright (C) 2001,2002,2003,2004,2005,2006,2007,2008 Jonathan Wakely

This file is part of PStreams.

PStreams is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.

PStreams is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public License
along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

/**
 * @file pstream.h
 * @brief Stream buffer and iostream classes that talk to a child process
 *        through pipes created with pipe(), fork() and exec().
 */

#ifndef REDI_PSTREAM_H_SEEN
#define REDI_PSTREAM_H_SEEN

#include <ios>
#include <streambuf>
#include <istream>
#include <ostream>
#include <string>
#include <vector>
#include <algorithm>    // for min()
#include <cerrno>       // for errno
#include <cstddef>      // for size_t
#include <cstdlib>      // for exit()
#include <sys/types.h>  // for pid_t
#include <sys/wait.h>   // for waitpid()
#include <sys/ioctl.h>  // for ioctl() and FIONREAD
#if defined(__sun)
# include <sys/filio.h> // for FIONREAD on Solaris 2.5
#endif
#include <unistd.h>     // for pipe() fork() exec() and filedes functions
#include <signal.h>     // for kill()
#include <fcntl.h>      // for fcntl()
#if REDI_EVISCERATE_PSTREAMS
# include <stdio.h>     // for FILE, fdopen()
#endif


/// The library version, encoded as hexadecimal digits.
#define PSTREAMS_VERSION 0x0070   // 0.7.0

/// All PStreams classes live in this namespace.
namespace redi
{
  /// Common base class providing the types and constants shared by all
  /// the stream and stream buffer classes.
  struct pstreams
  {
    /// Type used to specify how to connect to the process.
    typedef std::ios_base::openmode           pmode;

    /// Type used to hold the arguments for a command.
    typedef std::vector<std::string>          argv_type;

    /// Type used for file descriptors.
    typedef int                               fd_type;

    static const pmode pstdin  = std::ios_base::out; ///< Write to stdin
    static const pmode pstdout = std::ios_base::in;  ///< Read from stdout
    static const pmode pstderr = std::ios_base::app; ///< Read from stderr

  protected:
    enum { bufsz = 32 };  ///< Size of each internal character buffer.
    enum { pbsz  = 2 };   ///< Characters reserved for putback in a read buffer.
  };

  /// Stream buffer that reads from and writes to a child process.
  template <typename CharT, typename Traits = std::char_traits<CharT> >
    class basic_pstreambuf
    : public std::basic_streambuf<CharT, Traits>
    , public pstreams
    {
    public:
      // Type definitions for dependent types
      typedef CharT                             char_type;
      typedef Traits                            traits_type;
      typedef typename traits_type::int_type    int_type;
      typedef typename traits_type::off_type    off_type;
      typedef typename traits_type::pos_type    pos_type;
      /// Alternative name for the file descriptor type.
      typedef fd_type                           fd_t;

      /// Default constructor; no process is started.
      basic_pstreambuf();

      /// Constructor that starts a process by calling open(command, mode).
      basic_pstreambuf(const std::string& command, pmode mode);

      /// Constructor that starts a process by calling open(file, argv, mode).
      basic_pstreambuf( const std::string& file,
                        const argv_type& argv,
                        pmode mode );

      /// Destructor; calls close().
      ~basic_pstreambuf();

      /// Start a process running @a command through /bin/sh.
      basic_pstreambuf*
      open(const std::string& command, pmode mode);

      /// Start a process by passing @a file and @a argv to execvp().
      basic_pstreambuf*
      open(const std::string& file, const argv_type& argv, pmode mode);

      /// Close the pipes and wait for the process.
      basic_pstreambuf*
      close();

      /// Send a signal to the process.
      basic_pstreambuf*
      kill(int signal = SIGTERM);

      /// Flush pending output and close the pipe to the process' stdin.
      void
      peof();

      /// Select whether reads come from the stderr pipe or the stdout pipe.
      bool
      read_err(bool readerr = true);

      /// Report whether a process has been started and not yet reaped.
      bool
      is_open() const;

      /// Report whether the process has exited (non-blocking check).
      bool
      exited();

#if REDI_EVISCERATE_PSTREAMS
      /// Obtain FILE pointers for each of the open pipes.
      std::size_t
      fopen(FILE*& in, FILE*& out, FILE*& err);
#endif

      /// Return the status stored by the last successful wait().
      int
      status() const;

      /// Return the last stored error code.
      int
      error() const;

    protected:
      /// Flush the write buffer, then store @a c if it is not EOF.
      int_type
      overflow(int_type c);

      /// Return the next readable character, refilling the buffer if needed.
      int_type
      underflow();

      /// Step the get pointer back by one, optionally overwriting that char.
      int_type
      pbackfail(int_type c = traits_type::eof());

      /// Write any buffered characters to the process.
      int
      sync();

      /// Insert a sequence of characters into the write buffer.
      std::streamsize
      xsputn(const char_type* s, std::streamsize n);

      /// Write up to @a n characters directly to the stdin pipe.
      std::streamsize
      write(const char_type* s, std::streamsize n);

      /// Read up to @a n characters directly from the active read pipe.
      std::streamsize
      read(char_type* s, std::streamsize n);

      /// Report how many characters can be read without blocking.
      std::streamsize
      showmanyc();

    protected:
      /// Index of the pipe/buffer that read operations currently use.
      enum buf_read_src { rsrc_out = 0, rsrc_err = 1 };

      /// Create the requested pipes and fork a child process.
      pid_t
      fork(pmode mode);

      /// Call waitpid() on the child, optionally without blocking.
      int
      wait(bool nohang = false);

      /// Return the file descriptor of the pipe to the process' stdin.
      fd_type&
      wpipe();

      /// Return the file descriptor of the currently active read pipe.
      fd_type&
      rpipe();

      /// Return the file descriptor of the specified read pipe.
      fd_type&
      rpipe(buf_read_src which);

      /// Allocate the buffers selected by @a mode.
      void
      create_buffers(pmode mode);

      /// Release the buffers selected by @a mode.
      void
      destroy_buffers(pmode mode);

      /// Write the contents of the put area to the process.
      bool
      empty_buffer();

      /// Read from the active pipe into the active read buffer.
      bool
      fill_buffer(bool non_blocking = false);

      /// Return the currently active read buffer.
      char_type*
      rbuffer();

      /// Make @a src the active read source.
      buf_read_src
      switch_read_buffer(buf_read_src);

    private:
      // Not copyable: declared but never defined.
      basic_pstreambuf(const basic_pstreambuf&);
      basic_pstreambuf& operator=(const basic_pstreambuf&);

      /// Reset the read pipes, read buffers and saved get-area state.
      void
      init_rbuffers();

      pid_t         ppid_;        // pid of process
      fd_type       wpipe_;       // pipe used to write to process' stdin
      fd_type       rpipe_[2];    // two pipes to read from, stdout and stderr
      char_type*    wbuffer_;     // storage for the put area
      char_type*    rbuffer_[2];  // storage for the two get areas
      char_type*    rbufstate_[3];// eback/gptr/egptr of the inactive read buffer
      buf_read_src  rsrc_;        // which read pipe/buffer is active
      int           status_;      // hold exit status of child process
      int           error_;       // hold errno if fork() or exec() fails
    };

  /// Base class holding the stream buffer and command for all pstreams.
  template <typename CharT, typename Traits = std::char_traits<CharT> >
    class pstream_common
    : virtual public std::basic_ios<CharT, Traits>
    , virtual public pstreams
    {
    protected:
      typedef basic_pstreambuf<CharT, Traits>       streambuf_type;

      typedef pstreams::pmode                       pmode;
      typedef pstreams::argv_type                   argv_type;

      /// Default constructor; installs the buffer but starts no process.
      pstream_common();

      /// Constructor that calls do_open(command, mode).
      pstream_common(const std::string& command, pmode mode);

      /// Constructor that calls do_open(file, argv, mode).
      pstream_common(const std::string& file, const argv_type& argv, pmode mode);

      /// Pure virtual destructor, so this class cannot be instantiated.
      virtual
      ~pstream_common() = 0;

      /// Start a process; sets failbit if the buffer's open() fails.
      void
      do_open(const std::string& command, pmode mode);

      /// Start a process; sets failbit if the buffer's open() fails.
      void
      do_open(const std::string& file, const argv_type& argv, pmode mode);

    public:
      /// Close the buffer; sets failbit if the buffer's close() fails.
      void
      close();

      /// Report whether the buffer has an open process.
      bool
      is_open() const;

      /// Return the command (or file name) most recently passed to do_open().
      const std::string&
      command() const;

      /// Return a pointer to the stream buffer owned by this object.
      streambuf_type*
      rdbuf() const;

#if REDI_EVISCERATE_PSTREAMS
      /// Obtain FILE pointers for each of the process' pipes.
      std::size_t
      fopen(FILE*& in, FILE*& out, FILE*& err);
#endif

    protected:
      std::string       command_; ///< The command used to start the process.
      streambuf_type    buf_;     ///< The stream buffer.
    };


  /// Input stream that reads from a process' stdout and/or stderr.
  template <typename CharT, typename Traits = std::char_traits<CharT> >
    class basic_ipstream
    : public std::basic_istream<CharT, Traits>
    , public pstream_common<CharT, Traits>
    , virtual public pstreams
    {
      typedef std::basic_istream<CharT, Traits>     istream_type;
      typedef pstream_common<CharT, Traits>         pbase_type;

      using pbase_type::buf_;  // declare name in this scope

    public:
      // These typedefs are declared before readable() so that the name
      // 'pmode' has a single meaning throughout the class.

      /// Type used to specify how to connect to the process.
      typedef typename pbase_type::pmode            pmode;

      /// Type used to hold the arguments for a command.
      typedef typename pbase_type::argv_type        argv_type;

    private:
      /// Add pstdout to @a mode if it selects neither pstdout nor pstderr.
      pmode readable(pmode mode)
      {
        if (!(mode & (pstdout|pstderr)))
          mode |= pstdout;
        return mode;
      }

    public:
      /// Default constructor; no process is started.
      basic_ipstream()
      : istream_type(NULL), pbase_type()
      { }

      /// Start @a command, reading from at least one of its output pipes.
      basic_ipstream(const std::string& command, pmode mode = pstdout)
      : istream_type(NULL), pbase_type(command, readable(mode))
      { }

      /// Start @a file with @a argv, reading from at least one output pipe.
      basic_ipstream( const std::string& file,
                      const argv_type& argv,
                      pmode mode = pstdout )
      : istream_type(NULL), pbase_type(file, argv, readable(mode))
      { }

      /// Destructor; the owned buffer's destructor calls close().
      ~basic_ipstream()
      { }

      /// Start @a command, reading from at least one of its output pipes.
      void
      open(const std::string& command, pmode mode = pstdout)
      {
        this->do_open(command, readable(mode));
      }

      /// Start @a file with @a argv, reading from at least one output pipe.
      void
      open( const std::string& file,
            const argv_type& argv,
            pmode mode = pstdout )
      {
        this->do_open(file, argv, readable(mode));
      }

      /// Ask the buffer to read from the stdout pipe; returns *this.
      basic_ipstream&
      out()
      {
        this->buf_.read_err(false);
        return *this;
      }

      /// Ask the buffer to read from the stderr pipe; returns *this.
      basic_ipstream&
      err()
      {
        this->buf_.read_err(true);
        return *this;
      }
    };


  /// Output stream that writes to a process' stdin.
  template <typename CharT, typename Traits = std::char_traits<CharT> >
    class basic_opstream
    : public std::basic_ostream<CharT, Traits>
    , public pstream_common<CharT, Traits>
    , virtual public pstreams
    {
      typedef std::basic_ostream<CharT, Traits>     ostream_type;
      typedef pstream_common<CharT, Traits>         pbase_type;

      using pbase_type::buf_;  // declare name in this scope

    public:
      /// Type used to specify how to connect to the process.
      typedef typename pbase_type::pmode            pmode;

      /// Type used to hold the arguments for a command.
      typedef typename pbase_type::argv_type        argv_type;

      /// Default constructor; no process is started.
      basic_opstream()
      : ostream_type(NULL), pbase_type()
      { }

      /// Start @a command; pstdin is always added to @a mode.
      basic_opstream(const std::string& command, pmode mode = pstdin)
      : ostream_type(NULL), pbase_type(command, mode|pstdin)
      { }

      /// Start @a file with @a argv; pstdin is always added to @a mode.
      basic_opstream( const std::string& file,
                      const argv_type& argv,
                      pmode mode = pstdin )
      : ostream_type(NULL), pbase_type(file, argv, mode|pstdin)
      { }

      /// Destructor; the owned buffer's destructor calls close().
      ~basic_opstream() { }

      /// Start @a command; pstdin is always added to @a mode.
      void
      open(const std::string& command, pmode mode = pstdin)
      {
        this->do_open(command, mode|pstdin);
      }

      /// Start @a file with @a argv; pstdin is always added to @a mode.
      void
      open( const std::string& file,
            const argv_type& argv,
            pmode mode = pstdin)
      {
        this->do_open(file, argv, mode|pstdin);
      }
    };


  /// Bidirectional stream: writes to stdin, reads from stdout/stderr.
  template <typename CharT, typename Traits = std::char_traits<CharT> >
    class basic_pstream
    : public std::basic_iostream<CharT, Traits>
    , public pstream_common<CharT, Traits>
    , virtual public pstreams
    {
      typedef std::basic_iostream<CharT, Traits>    iostream_type;
      typedef pstream_common<CharT, Traits>         pbase_type;

      using pbase_type::buf_;  // declare name in this scope

    public:
      /// Type used to specify how to connect to the process.
      typedef typename pbase_type::pmode            pmode;

      /// Type used to hold the arguments for a command.
      typedef typename pbase_type::argv_type        argv_type;

      /// Default constructor; no process is started.
      basic_pstream()
      : iostream_type(NULL), pbase_type()
      { }

      /// Start @a command with exactly the pipes selected by @a mode.
      basic_pstream(const std::string& command, pmode mode = pstdout|pstdin)
      : iostream_type(NULL), pbase_type(command, mode)
      { }

      /// Start @a file with @a argv and the pipes selected by @a mode.
      basic_pstream( const std::string& file,
                     const argv_type& argv,
                     pmode mode = pstdout|pstdin )
      : iostream_type(NULL), pbase_type(file, argv, mode)
      { }

      /// Destructor; the owned buffer's destructor calls close().
      ~basic_pstream() { }

      /// Start @a command with exactly the pipes selected by @a mode.
      void
      open(const std::string& command, pmode mode = pstdout|pstdin)
      {
        this->do_open(command, mode);
      }

      /// Start @a file with @a argv and the pipes selected by @a mode.
      void
      open( const std::string& file,
            const argv_type& argv,
            pmode mode = pstdout|pstdin )
      {
        this->do_open(file, argv, mode);
      }

      /// Ask the buffer to read from the stdout pipe; returns *this.
      basic_pstream&
      out()
      {
        this->buf_.read_err(false);
        return *this;
      }

      /// Ask the buffer to read from the stderr pipe; returns *this.
      basic_pstream&
      err()
      {
        this->buf_.read_err(true);
        return *this;
      }
    };


  /// Restricted bidirectional stream: publicly an ostream only; the
  /// istream interface is reachable solely through out() and err().
  template <typename CharT, typename Traits = std::char_traits<CharT> >
    class basic_rpstream
    : public std::basic_ostream<CharT, Traits>
    , private std::basic_istream<CharT, Traits>
    , private pstream_common<CharT, Traits>
    , virtual public pstreams
    {
      typedef std::basic_ostream<CharT, Traits>     ostream_type;
      typedef std::basic_istream<CharT, Traits>     istream_type;
      typedef pstream_common<CharT, Traits>         pbase_type;

      using pbase_type::buf_;  // declare name in this scope

    public:
      /// Type used to specify how to connect to the process.
      typedef typename pbase_type::pmode            pmode;

      /// Type used to hold the arguments for a command.
      typedef typename pbase_type::argv_type        argv_type;

      /// Default constructor; no process is started.
      basic_rpstream()
      : ostream_type(NULL), istream_type(NULL), pbase_type()
      { }

      /// Start @a command with exactly the pipes selected by @a mode.
      basic_rpstream(const std::string& command, pmode mode = pstdout|pstdin)
      : ostream_type(NULL) , istream_type(NULL) , pbase_type(command, mode)
      { }

      /// Start @a file with @a argv and the pipes selected by @a mode.
      basic_rpstream( const std::string& file,
                      const argv_type& argv,
                      pmode mode = pstdout|pstdin )
      : ostream_type(NULL), istream_type(NULL), pbase_type(file, argv, mode)
      { }

      /// Destructor; the owned buffer's destructor calls close().
      ~basic_rpstream() { }

      /// Start @a command with exactly the pipes selected by @a mode.
      void
      open(const std::string& command, pmode mode = pstdout|pstdin)
      {
        this->do_open(command, mode);
      }

      /// Start @a file with @a argv and the pipes selected by @a mode.
      void
      open( const std::string& file,
            const argv_type& argv,
            pmode mode = pstdout|pstdin )
      {
        this->do_open(file, argv, mode);
      }

      /// Select the stdout pipe and return this object's istream interface.
      istream_type&
      out()
      {
        this->buf_.read_err(false);
        return *this;
      }

      /// Select the stderr pipe and return this object's istream interface.
      istream_type&
      err()
      {
        this->buf_.read_err(true);
        return *this;
      }
    };


  /// Type definition for common template specialisation.
  typedef basic_pstreambuf<char> pstreambuf;
  /// Type definition for common template specialisation.
  typedef basic_ipstream<char> ipstream;
  /// Type definition for common template specialisation.
  typedef basic_opstream<char> opstream;
  /// Type definition for common template specialisation.
  typedef basic_pstream<char> pstream;
  /// Type definition for common template specialisation.
  typedef basic_rpstream<char> rpstream;


  /**
   * Manipulator that closes the pipe to the process' stdin.
   *
   * If the stream's buffer is a basic_pstreambuf its peof() member is
   * called; for any other buffer type the manipulator does nothing.
   *
   * @param s  An output stream.
   * @return   The stream @a s.
   */
  template <typename C, typename T>
    inline std::basic_ostream<C,T>&
    peof(std::basic_ostream<C,T>& s)
    {
      typedef basic_pstreambuf<C,T> pstreambuf;
      if (pstreambuf* p = dynamic_cast<pstreambuf*>(s.rdbuf()))
        p->peof();
      return s;
    }


  /*
   * member definitions for pstreambuf
   */


  /// Creates a buffer with no process, no pipes and no character buffers.
  template <typename C, typename T>
    inline
    basic_pstreambuf<C,T>::basic_pstreambuf()
    : ppid_(-1)   // initialise to -1 to indicate no process run yet.
    , wpipe_(-1)
    , wbuffer_(NULL)
    , rsrc_(rsrc_out)
    , status_(-1)
    , error_(0)
    {
      init_rbuffers();
    }

  /**
   * Initialises the buffer and then calls open(command, mode).
   *
   * @param command  a string containing a shell command.
   * @param mode     the I/O mode to use when opening the pipes.
   */
  template <typename C, typename T>
    inline
    basic_pstreambuf<C,T>::basic_pstreambuf(const std::string& command, pmode mode)
    : ppid_(-1)   // initialise to -1 to indicate no process run yet.
    , wpipe_(-1)
    , wbuffer_(NULL)
    , rsrc_(rsrc_out)
    , status_(-1)
    , error_(0)
    {
      init_rbuffers();
      open(command, mode);
    }

  /**
   * Initialises the buffer and then calls open(file, argv, mode).
   *
   * @param file  a string containing the name of a program to execute.
   * @param argv  a vector of argument strings passed to the new program.
   * @param mode  the I/O mode to use when opening the pipes.
   */
  template <typename C, typename T>
    inline
    basic_pstreambuf<C,T>::basic_pstreambuf( const std::string& file,
                                             const argv_type& argv,
                                             pmode mode )
    : ppid_(-1)   // initialise to -1 to indicate no process run yet.
    , wpipe_(-1)
    , wbuffer_(NULL)
    , rsrc_(rsrc_out)
    , status_(-1)
    , error_(0)
    {
      init_rbuffers();
      open(file, argv, mode);
    }

  /// Closes the stream by calling close().
  template <typename C, typename T>
    inline
    basic_pstreambuf<C,T>::~basic_pstreambuf()
    {
      close();
    }

  /**
   * Starts a new process by running "/bin/sh -c command" in a forked child.
   *
   * Does nothing and returns NULL if is_open() is already true. If exec
   * fails, the child terminates with _exit(errno); the parent is not told
   * directly and still returns @c this.
   *
   * @param command  a string containing a shell command.
   * @param mode     a bitwise OR of pstdin, pstdout and/or pstderr.
   * @return  NULL if already open or if fork() failed, otherwise @c this.
   */
  template <typename C, typename T>
    basic_pstreambuf<C,T>*
    basic_pstreambuf<C,T>::open(const std::string& command, pmode mode)
    {
      const char * shell_path = "/bin/sh";
      basic_pstreambuf<C,T>* ret = NULL;

      if (!is_open())
      {
        switch(fork(mode))
        {
        case 0 :
          // this is the new process, exec command
          ::execl(shell_path, "sh", "-c", command.c_str(),
                  static_cast<char*>(NULL));

          // can only reach this point if exec() failed

          // parent can get exit code from waitpid()
          ::_exit(errno);
          // using std::exit() would make static dtors run twice

        case -1 :
          // couldn't fork, error already handled in pstreambuf::fork()
          break;

        default :
          // this is the parent process
          // activate buffers
          create_buffers(mode);
          ret = this;
        }
      }
      return ret;
    }

  /**
   * Closes a file descriptor if it is valid.
   *
   * @param fd  a file descriptor; set to -1 only when ::close() succeeds,
   *            so a failed close leaves the value unchanged.
   */
  inline void
  close_fd(pstreams::fd_type& fd)
  {
    if (fd >= 0 && ::close(fd) == 0)
      fd = -1;
  }

  /**
   * Calls close_fd() on every element of an array of file descriptors.
   *
   * @param fds  an array of file descriptors.
   */
  template <std::size_t N>
    inline void
    close_fd_array(pstreams::fd_type (&fds)[N])
    {
      for (std::size_t i = 0; i < N; ++i)
        close_fd(fds[i]);
    }

  /**
   * Starts a new process by passing @a file and @a argv to execvp().
   *
   * An extra close-on-exec pipe reports exec() failure: a successful exec
   * closes it, so the parent reads end-of-file; a failed exec makes the
   * child write its errno to the pipe before calling _exit().
   *
   * Does nothing and returns NULL if is_open() is already true.
   *
   * @param file  a string containing the pathname of a program to execute.
   * @param argv  a vector of argument strings passed to the new program.
   * @param mode  a bitwise OR of pstdin, pstdout and/or pstderr.
   * @return  @c this if the child called exec() successfully, else NULL
   *          with the error code available from error().
   */
  template <typename C, typename T>
    basic_pstreambuf<C,T>*
    basic_pstreambuf<C,T>::open( const std::string& file,
                                 const argv_type& argv,
                                 pmode mode )
    {
      basic_pstreambuf<C,T>* ret = NULL;

      if (!is_open())
      {
        // constants for read/write ends of pipe
        enum { RD, WR };

        // open another pipe and set close-on-exec
        fd_type ck_exec[] = { -1, -1 };
        if (-1 == ::pipe(ck_exec)
            || -1 == ::fcntl(ck_exec[RD], F_SETFD, FD_CLOEXEC)
            || -1 == ::fcntl(ck_exec[WR], F_SETFD, FD_CLOEXEC))
        {
          error_ = errno;
          close_fd_array(ck_exec);
        }
        else
        {
          switch(fork(mode))
          {
          case 0 :
            // this is the new process, exec command
            {
              // build a NULL-terminated array of writable C strings
              char** arg_v = new char*[argv.size()+1];
              for (std::size_t i = 0; i < argv.size(); ++i)
              {
                const std::string& src = argv[i];
                char*& dest = arg_v[i];
                dest = new char[src.size()+1];
                dest[ src.copy(dest, src.size()) ] = '\0';
              }
              arg_v[argv.size()] = NULL;

              ::execvp(file.c_str(), arg_v);

              // can only reach this point if exec() failed

              // parent can get error code from ck_exec pipe
              error_ = errno;

              // nothing more can be done here if the write itself fails
              const ssize_t ignored
                = ::write(ck_exec[WR], &error_, sizeof(error_));
              static_cast<void>(ignored);
              ::close(ck_exec[WR]);
              ::close(ck_exec[RD]);

              ::_exit(error_);
              // using std::exit() would make static dtors run twice
            }

          case -1 :
            // couldn't fork, error already handled in pstreambuf::fork()
            close_fd_array(ck_exec);
            break;

          default :
            // this is the parent process
            {
              // check child called exec() successfully
              ::close(ck_exec[WR]);

              // A signal arriving while we block here must not be mistaken
              // for a failure to start the child, so retry on EINTR.
              ssize_t nread;
              do
              {
                nread = ::read(ck_exec[RD], &error_, sizeof(error_));
              } while (nread == -1 && errno == EINTR);

              switch (nread)
              {
              case 0:
                // activate buffers
                create_buffers(mode);
                ret = this;
                break;
              case -1:
                error_ = errno;
                break;
              default:
                // error_ contains error code from child
                // call wait() to clean up and set ppid_ to 0
                this->wait();
                break;
              }
              ::close(ck_exec[RD]);
            }
          }
        }
      }
      return ret;
    }

  /**
   * Creates the pipes selected by @a mode and forks a child process.
   *
   * In the child, each created pipe is attached to the corresponding
   * standard stream with dup2(). In the parent, the child's pid and the
   * parent's end of each pipe are stored. No pipe is created and no fork
   * is attempted if error() is already non-zero on entry.
   *
   * @param mode  a bitwise OR of pstdin, pstdout and/or pstderr.
   * @return  0 in the child, the child's pid in the parent, or -1 on
   *          failure (with the error code stored for error()).
   */
  template <typename C, typename T>
    pid_t
    basic_pstreambuf<C,T>::fork(pmode mode)
    {
      pid_t pid = -1;

      // Three pairs of file descriptors, for pipes connected to the
      // process' stdin, stdout and stderr
      // (stored in a single array so close_fd_array() can close all at once)
      fd_type fd[] = { -1, -1, -1, -1, -1, -1 };
      fd_type* const pin = fd;
      fd_type* const pout = fd+2;
      fd_type* const perr = fd+4;

      // constants for read/write ends of pipe
      enum { RD, WR };

      // N.B.
      // For the pstreambuf pin is an output stream and
      // pout and perr are input streams.

      if (!error_ && mode&pstdin && ::pipe(pin))
        error_ = errno;

      if (!error_ && mode&pstdout && ::pipe(pout))
        error_ = errno;

      if (!error_ && mode&pstderr && ::pipe(perr))
        error_ = errno;

      if (!error_)
      {
        pid = ::fork();
        switch (pid)
        {
          case 0 :
          {
            // this is the new process

            // for each open pipe close one end and redirect the
            // respective standard stream to the other end

            if (*pin >= 0)
            {
              ::close(pin[WR]);
              ::dup2(pin[RD], STDIN_FILENO);
              ::close(pin[RD]);
            }
            if (*pout >= 0)
            {
              ::close(pout[RD]);
              ::dup2(pout[WR], STDOUT_FILENO);
              ::close(pout[WR]);
            }
            if (*perr >= 0)
            {
              ::close(perr[RD]);
              ::dup2(perr[WR], STDERR_FILENO);
              ::close(perr[WR]);
            }
            break;
          }
          case -1 :
          {
            // couldn't fork for some reason
            error_ = errno;
            // close any open pipes
            close_fd_array(fd);
            break;
          }
          default :
          {
            // this is the parent process, store process' pid
            ppid_ = pid;

            // store one end of open pipes and close other end
            if (*pin >= 0)
            {
              wpipe_ = pin[WR];
              ::close(pin[RD]);
            }
            if (*pout >= 0)
            {
              rpipe_[rsrc_out] = pout[RD];
              ::close(pout[WR]);
            }
            if (*perr >= 0)
            {
              rpipe_[rsrc_err] = perr[RD];
              ::close(perr[WR]);
            }
          }
        }
      }
      else
      {
        // close any pipes we opened before failure
        close_fd_array(fd);
      }
      return pid;
    }

  /**
   * Flushes pending output, frees all buffers, closes all pipes and then
   * waits for the child, repeating the wait while it fails with EINTR.
   *
   * @return  @c this if a process was open when called, otherwise NULL.
   */
  template <typename C, typename T>
    basic_pstreambuf<C,T>*
    basic_pstreambuf<C,T>::close()
    {
      const bool running = is_open();

      sync(); // this might call wait() and reap the child process

      // rather than trying to work out whether or not we need to clean up
      // just do it anyway, all cleanup functions are safe to call twice.

      destroy_buffers(pstdin|pstdout|pstderr);

      // close pipes before wait() so child gets EOF/SIGPIPE
      close_fd(wpipe_);
      close_fd_array(rpipe_);

      do
      {
        error_ = 0;
      } while (wait() == -1 && error() == EINTR);

      return running ? this : NULL;
    }

  /// Marks both read pipes as closed and clears all read buffer pointers.
  template <typename C, typename T>
    inline void
    basic_pstreambuf<C,T>::init_rbuffers()
    {
      rpipe_[rsrc_out] = rpipe_[rsrc_err] = -1;
      rbuffer_[rsrc_out] = rbuffer_[rsrc_err] = NULL;
      rbufstate_[0] = rbufstate_[1] = rbufstate_[2] = NULL;
    }

  /**
   * Allocates a fresh buffer of bufsz characters for each stream in
   * @a mode, replacing any existing one. Each get area starts out empty,
   * positioned pbsz characters into its buffer. When both pstdout and
   * pstderr are requested the stdout buffer is made the active one.
   */
  template <typename C, typename T>
    void
    basic_pstreambuf<C,T>::create_buffers(pmode mode)
    {
      if (mode & pstdin)
      {
        delete[] wbuffer_;
        wbuffer_ = new char_type[bufsz];
        this->setp(wbuffer_, wbuffer_ + bufsz);
      }
      if (mode & pstdout)
      {
        delete[] rbuffer_[rsrc_out];
        rbuffer_[rsrc_out] = new char_type[bufsz];
        rsrc_ = rsrc_out;
        this->setg(rbuffer_[rsrc_out] + pbsz, rbuffer_[rsrc_out] + pbsz,
            rbuffer_[rsrc_out] + pbsz);
      }
      if (mode & pstderr)
      {
        delete[] rbuffer_[rsrc_err];
        rbuffer_[rsrc_err] = new char_type[bufsz];
        if (!(mode & pstdout))
        {
          rsrc_ = rsrc_err;
          this->setg(rbuffer_[rsrc_err] + pbsz, rbuffer_[rsrc_err] + pbsz,
              rbuffer_[rsrc_err] + pbsz);
        }
      }
    }

  /**
   * Frees the buffer of each stream in @a mode. The put area is always
   * cleared for pstdin; the get area is cleared only when the buffer
   * being freed is the active read buffer.
   */
  template <typename C, typename T>
    void
    basic_pstreambuf<C,T>::destroy_buffers(pmode mode)
    {
      if (mode & pstdin)
      {
        this->setp(NULL, NULL);
        delete[] wbuffer_;
        wbuffer_ = NULL;
      }
      if (mode & pstdout)
      {
        if (rsrc_ == rsrc_out)
          this->setg(NULL, NULL, NULL);
        delete[] rbuffer_[rsrc_out];
        rbuffer_[rsrc_out] = NULL;
      }
      if (mode & pstderr)
      {
        if (rsrc_ == rsrc_err)
          this->setg(NULL, NULL, NULL);
        delete[] rbuffer_[rsrc_err];
        rbuffer_[rsrc_err] = NULL;
      }
    }

  /**
   * Makes @a src the active read source. If it differs from the current
   * one, the current get-area pointers are exchanged with the saved
   * pointers of the other buffer.
   *
   * @return  the read source that is active after the call.
   */
  template <typename C, typename T>
    typename basic_pstreambuf<C,T>::buf_read_src
    basic_pstreambuf<C,T>::switch_read_buffer(buf_read_src src)
    {
      if (rsrc_ != src)
      {
        char_type* tmpbufstate[] = {this->eback(), this->gptr(), this->egptr()};
        this->setg(rbufstate_[0], rbufstate_[1], rbufstate_[2]);
        for (std::size_t i = 0; i < 3; ++i)
          rbufstate_[i] = tmpbufstate[i];
        rsrc_ = src;
      }
      return rsrc_;
    }

  /**
   * Calls waitpid() on the child if a process is open.
   *
   * When the child is reaped its status is stored, the pid is reset to 0,
   * and the write buffer and stdin pipe are released. Read buffers and
   * read pipes are left for close() or the destructor.
   *
   * @param nohang  true to return immediately if the child is still running.
   * @return  1 if the child was reaped, 0 if @a nohang was true and it is
   *          still running, -1 if no process is open or waitpid() failed.
   */
  template <typename C, typename T>
    int
    basic_pstreambuf<C,T>::wait(bool nohang)
    {
      int exited = -1;
      if (is_open())
      {
        int status;
        switch(::waitpid(ppid_, &status, nohang ? WNOHANG : 0))
        {
          case 0 :
            // nohang was true and process has not exited
            exited = 0;
            break;
          case -1 :
            error_ = errno;
            break;
          default :
            // process has exited
            ppid_ = 0;
            status_ = status;
            exited = 1;
            // Close wpipe, would get SIGPIPE if we used it.
            destroy_buffers(pstdin);
            close_fd(wpipe_);
            // Must free read buffers and pipes on destruction
            // or next call to open()/close()
            break;
        }
      }
      return exited;
    }

  /**
   * Sends @a signal to the child process with ::kill().
   *
   * @param signal  the signal to send.
   * @return  @c this if a process is open and the signal was sent,
   *          otherwise NULL (with errno stored if ::kill() failed).
   */
  template <typename C, typename T>
    inline basic_pstreambuf<C,T>*
    basic_pstreambuf<C,T>::kill(int signal)
    {
      basic_pstreambuf<C,T>* ret = NULL;
      if (is_open())
      {
        if (::kill(ppid_, signal))
          error_ = errno;
        else
          ret = this;   // reaping the child is left to exited()/close()
      }
      return ret;
    }

  /**
   * Checks whether the child has exited, without blocking.
   *
   * @return  true if the child was already reaped or is reaped by this
   *          call; false otherwise, including when no process was started.
   */
  template <typename C, typename T>
    inline bool
    basic_pstreambuf<C,T>::exited()
    {
      return ppid_ == 0 || wait(true)==1;
    }


  /// @return  the status stored by wait(), or -1 if none has been stored.
  template <typename C, typename T>
    inline int
    basic_pstreambuf<C,T>::status() const
    {
      return status_;
    }

  /// @return  the most recently stored error code, or 0 if there is none.
  template <typename C, typename T>
    inline int
    basic_pstreambuf<C,T>::error() const
    {
      return error_;
    }

  /// Flushes the write buffer, frees it and closes the stdin pipe.
  template <typename C, typename T>
    inline void
    basic_pstreambuf<C,T>::peof()
    {
      sync();
      destroy_buffers(pstdin);
      close_fd(wpipe_);
    }

  /**
   * @return  true if a child was forked and has not yet been reaped.
   *          This only tests the stored pid; it does not poll the child.
   */
  template <typename C, typename T>
    inline bool
    basic_pstreambuf<C,T>::is_open() const
    {
      return ppid_ > 0;
    }

  /**
   * Selects which pipe subsequent reads use.
   *
   * @param readerr  true to read from stderr, false to read from stdout.
   * @return  true if the requested pipe is open and is now active,
   *          false (with no change) if that pipe is not open.
   */
  template <typename C, typename T>
    inline bool
    basic_pstreambuf<C,T>::read_err(bool readerr)
    {
      buf_read_src src = readerr ? rsrc_err : rsrc_out;
      if (rpipe_[src]>=0)
      {
        switch_read_buffer(src);
        return true;
      }
      return false;
    }

  /**
   * Called when the put area is full: writes it to the process and then
   * stores @a c in the freed space.
   *
   * @return  EOF if nothing could be written; otherwise the result of
   *          sputc(c), or a non-EOF value when @a c itself is EOF.
   */
  template <typename C, typename T>
    typename basic_pstreambuf<C,T>::int_type
    basic_pstreambuf<C,T>::overflow(int_type c)
    {
      if (!empty_buffer())
        return traits_type::eof();
      else if (!traits_type::eq_int_type(c, traits_type::eof()))
        return this->sputc(c);
      else
        return traits_type::not_eof(c);
    }


  /**
   * @return  0 if the child has not exited and empty_buffer() wrote some
   *          characters, otherwise -1.
   */
  template <typename C, typename T>
    int
    basic_pstreambuf<C,T>::sync()
    {
      return !exited() && empty_buffer() ? 0 : -1;
    }

  /**
   * Copies @a n characters in one step when they fit in the free part of
   * the put area; otherwise inserts them one at a time with sputc().
   *
   * @return  the number of characters stored.
   */
  template <typename C, typename T>
    std::streamsize
    basic_pstreambuf<C,T>::xsputn(const char_type* s, std::streamsize n)
    {
      if (n < this->epptr() - this->pptr())
      {
        traits_type::copy(this->pptr(), s, n);
        // n is smaller than the put area here, so it fits in an int
        this->pbump(static_cast<int>(n));
        return n;
      }
      else
      {
        for (std::streamsize i = 0; i < n; ++i)
        {
          if (traits_type::eq_int_type(this->sputc(s[i]), traits_type::eof()))
            return i;
        }
        return n;
      }
    }

  /**
   * Writes the put area to the process. After a partial write the
   * unwritten characters are moved to the start of the buffer.
   *
   * @return  true if at least one character was written, false if the
   *          put area was empty or the write failed.
   */
  template <typename C, typename T>
    bool
    basic_pstreambuf<C,T>::empty_buffer()
    {
      const std::streamsize count = this->pptr() - this->pbase();
      if (count > 0)
      {
        const std::streamsize written = this->write(this->wbuffer_, count);
        if (written > 0)
        {
          if (const std::streamsize unwritten = count - written)
            traits_type::move(this->pbase(), this->pbase()+written, unwritten);
          // written never exceeds the size of the put area, so it fits
          this->pbump(-static_cast<int>(written));
          return true;
        }
      }
      return false;
    }

  /**
   * @return  the character at the get pointer, refilling the buffer from
   *          the pipe first if it is empty; EOF if the refill fails.
   */
  template <typename C, typename T>
    typename basic_pstreambuf<C,T>::int_type
    basic_pstreambuf<C,T>::underflow()
    {
      if (this->gptr() < this->egptr() || fill_buffer())
        return traits_type::to_int_type(*this->gptr());
      else
        return traits_type::eof();
    }

  /**
   * Moves the get pointer back by one and, if @a c is not EOF, stores
   * @a c at that position.
   *
   * @return  EOF if the get pointer is already at the start of the get
   *          area, otherwise a value other than EOF.
   */
  template <typename C, typename T>
    typename basic_pstreambuf<C,T>::int_type
    basic_pstreambuf<C,T>::pbackfail(int_type c)
    {
      if (this->gptr() != this->eback())
      {
        this->gbump(-1);
        if (!traits_type::eq_int_type(c, traits_type::eof()))
          *this->gptr() = traits_type::to_char_type(c);
        return traits_type::not_eof(c);
      }
      else
        return traits_type::eof();
    }

  /**
   * For single-byte characters this performs a non-blocking buffer fill
   * and reports how many characters it obtained. For wider characters it
   * queries the pipe with FIONREAD, where that is available.
   *
   * @return  the number of characters available, or -1 on failure.
   */
  template <typename C, typename T>
    std::streamsize
    basic_pstreambuf<C,T>::showmanyc()
    {
      int avail = 0;
      if (sizeof(char_type) == 1)
        avail = fill_buffer(true) ? this->egptr() - this->gptr() : -1;
#ifdef FIONREAD
      else
      {
        if (::ioctl(rpipe(), FIONREAD, &avail) == -1)
          avail = -1;
        else if (avail)
          avail /= sizeof(char_type);
      }
#endif
      return std::streamsize(avail);
    }

  /**
   * Refills the active read buffer from the active pipe, first moving up
   * to pbsz already-read characters into the putback region.
   *
   * @param non_blocking  true to read with O_NONBLOCK temporarily set, so
   *                      that "nothing available yet" counts as success
   *                      with an empty get area.
   * @return  true if the get area was set up, false on EOF or error (the
   *          get area is then cleared).
   */
  template <typename C, typename T>
    bool
    basic_pstreambuf<C,T>::fill_buffer(bool non_blocking)
    {
      const std::streamsize pb1 = this->gptr() - this->eback();
      const std::streamsize pb2 = pbsz;
      const std::streamsize npb = std::min(pb1, pb2);

      char_type* const rbuf = rbuffer();

      traits_type::move(rbuf + pbsz - npb, this->gptr() - npb, npb);

      std::streamsize rc = -1;

      if (non_blocking)
      {
        const int flags = ::fcntl(rpipe(), F_GETFL);
        if (flags != -1)
        {
          const bool blocking = !(flags & O_NONBLOCK);
          if (blocking)
            ::fcntl(rpipe(), F_SETFL, flags | O_NONBLOCK);  // set non-blocking

          error_ = 0;
          rc = read(rbuf + pbsz, bufsz - pbsz);

          if (rc == -1 && error_ == EAGAIN)  // nothing available
            rc = 0;
          else if (rc == 0)  // EOF
            rc = -1;

          if (blocking)
            ::fcntl(rpipe(), F_SETFL, flags); // restore
        }
      }
      else
        rc = read(rbuf + pbsz, bufsz - pbsz);

      if (rc > 0 || (rc == 0 && non_blocking))
      {
        this->setg( rbuf + pbsz - npb,
                    rbuf + pbsz,
                    rbuf + pbsz + rc );
        return true;
      }
      else
      {
        this->setg(NULL, NULL, NULL);
        return false;
      }
    }

  /**
   * Writes up to @a n characters to the stdin pipe with ::write().
   *
   * @return  the number of characters written, 0 if the pipe is not
   *          open, or -1 on error (errno is stored for error()).
   */
  template <typename C, typename T>
    inline std::streamsize
    basic_pstreambuf<C,T>::write(const char_type* s, std::streamsize n)
    {
      std::streamsize nwritten = 0;
      if (wpipe() >= 0)
      {
        nwritten = ::write(wpipe(), s, n * sizeof(char_type));
        if (nwritten == -1)
          error_ = errno;
        else
          nwritten /= sizeof(char_type);
      }
      return nwritten;
    }

  /**
   * Reads up to @a n characters from the active read pipe with ::read().
   *
   * @return  the number of characters read, 0 at EOF or if the pipe is
   *          not open, or -1 on error (errno is stored for error()).
   */
  template <typename C, typename T>
    inline std::streamsize
    basic_pstreambuf<C,T>::read(char_type* s, std::streamsize n)
    {
      std::streamsize nread = 0;
      if (rpipe() >= 0)
      {
        nread = ::read(rpipe(), s, n * sizeof(char_type));
        if (nread == -1)
          error_ = errno;
        else
          nread /= sizeof(char_type);
      }
      return nread;
    }

  /// @return  a reference to the stdin pipe's file descriptor.
  template <typename C, typename T>
    inline typename basic_pstreambuf<C,T>::fd_type&
    basic_pstreambuf<C,T>::wpipe()
    {
      return wpipe_;
    }

  /// @return  a reference to the active read pipe's file descriptor.
  template <typename C, typename T>
    inline typename basic_pstreambuf<C,T>::fd_type&
    basic_pstreambuf<C,T>::rpipe()
    {
      return rpipe_[rsrc_];
    }

  /// @return  a reference to the file descriptor of read pipe @a which.
  template <typename C, typename T>
    inline typename basic_pstreambuf<C,T>::fd_type&
    basic_pstreambuf<C,T>::rpipe(buf_read_src which)
    {
      return rpipe_[which];
    }

  /// @return  a pointer to the start of the active read buffer.
  template <typename C, typename T>
    inline typename basic_pstreambuf<C,T>::char_type*
    basic_pstreambuf<C,T>::rbuffer()
    {
      return rbuffer_[rsrc_];
    }


  /*
   * member definitions for pstream_common
   */

  /// Creates an object with an empty command and installs its buffer.
  template <typename C, typename T>
    inline
    pstream_common<C,T>::pstream_common()
    : std::basic_ios<C,T>(NULL)
    , command_()
    , buf_()
    {
      this->init(&buf_);
    }

  /**
   * Installs the buffer and then calls do_open(command, mode).
   *
   * @param command  a string containing a shell command.
   * @param mode     the I/O mode to use when opening the pipes.
   */
  template <typename C, typename T>
    inline
    pstream_common<C,T>::pstream_common(const std::string& command, pmode mode)
    : std::basic_ios<C,T>(NULL)
    , command_(command)
    , buf_()
    {
      this->init(&buf_);
      do_open(command, mode);
    }

  /**
   * Installs the buffer and then calls do_open(file, argv, mode).
   *
   * @param file  a string containing the pathname of a program to execute.
   * @param argv  a vector of argument strings passed to the new program.
   * @param mode  the I/O mode to use when opening the pipes.
   */
  template <typename C, typename T>
    inline
    pstream_common<C,T>::pstream_common( const std::string& file,
                                         const argv_type& argv,
                                         pmode mode )
    : std::basic_ios<C,T>(NULL)
    , command_(file)
    , buf_()
    {
      this->init(&buf_);
      do_open(file, argv, mode);
    }

  /// Defined even though pure virtual, because derived destructors call it.
  template <typename C, typename T>
    inline
    pstream_common<C,T>::~pstream_common()
    {
    }

  /**
   * Stores @a command and passes it to the buffer's open(); sets
   * failbit on this stream if that returns NULL.
   */
  template <typename C, typename T>
    inline void
    pstream_common<C,T>::do_open(const std::string& command, pmode mode)
    {
      if (!buf_.open((command_=command), mode))
        this->setstate(std::ios_base::failbit);
    }

  /**
   * Stores @a file as the command and passes it with @a argv to the
   * buffer's open(); sets failbit on this stream if that returns NULL.
   */
  template <typename C, typename T>
    inline void
    pstream_common<C,T>::do_open( const std::string& file,
                                  const argv_type& argv,
                                  pmode mode )
    {
      if (!buf_.open((command_=file), argv, mode))
        this->setstate(std::ios_base::failbit);
    }

  /// Calls the buffer's close(); sets failbit if that returns NULL.
  template <typename C, typename T>
    inline void
    pstream_common<C,T>::close()
    {
      if (!buf_.close())
        this->setstate(std::ios_base::failbit);
    }

  /// @return  the result of the buffer's is_open().
  template <typename C, typename T>
    inline bool
    pstream_common<C,T>::is_open() const
    {
      return buf_.is_open();
    }

  /// @return  the string most recently stored by a constructor or do_open().
  template <typename C, typename T>
    inline const std::string&
    pstream_common<C,T>::command() const
    {
      return command_;
    }

  /// @return  a pointer to the buffer member of this object.
  // TODO  document behaviour if buffer replaced.
  template <typename C, typename T>
    inline typename pstream_common<C,T>::streambuf_type*
    pstream_common<C,T>::rdbuf() const
    {
      return const_cast<streambuf_type*>(&buf_);
    }


#if REDI_EVISCERATE_PSTREAMS

  /**
   * Calls fdopen() on each open pipe and hands back the FILE pointers.
   * All three output parameters are first set to NULL.
   *
   * @param in   set to a stream writing to the process' stdin, if open.
   * @param out  set to a stream reading the process' stdout, if open.
   * @param err  set to a stream reading the process' stderr, if open.
   * @return  a bitwise OR of pstdin, pstdout and pstderr, one bit for
   *          each pipe that fdopen() opened successfully.
   */
  template <typename C, typename T>
    std::size_t
    basic_pstreambuf<C,T>::fopen(FILE*& in, FILE*& out, FILE*& err)
    {
      in = out = err = NULL;
      std::size_t open_files = 0;
      if (wpipe() > -1)
      {
        if ((in = ::fdopen(wpipe(), "w")))
        {
            open_files |= pstdin;
        }
      }
      if (rpipe(rsrc_out) > -1)
      {
        if ((out = ::fdopen(rpipe(rsrc_out), "r")))
        {
            open_files |= pstdout;
        }
      }
      if (rpipe(rsrc_err) > -1)
      {
        if ((err = ::fdopen(rpipe(rsrc_err), "r")))
        {
            open_files |= pstderr;
        }
      }
      return open_files;
    }

  /// Forwards to the buffer's fopen() and returns its result.
  template <typename C, typename T>
    inline std::size_t
    pstream_common<C,T>::fopen(FILE*& fin, FILE*& fout, FILE*& ferr)
    {
      return buf_.fopen(fin, fout, ferr);
    }

#endif // REDI_EVISCERATE_PSTREAMS


} // namespace redi

#endif  // REDI_PSTREAM_H_SEEN

// vim: ts=2 sw=2 expandtab