pion-net  4.0.9
TCPStream.hpp
1 // ------------------------------------------------------------------
2 // pion-net: a C++ framework for building lightweight HTTP interfaces
3 // ------------------------------------------------------------------
4 // Copyright (C) 2007-2008 Atomic Labs, Inc. (http://www.atomiclabs.com)
5 //
6 // Distributed under the Boost Software License, Version 1.0.
7 // See http://www.boost.org/LICENSE_1_0.txt
8 //
9 
10 #ifndef __PION_TCPSTREAM_HEADER__
11 #define __PION_TCPSTREAM_HEADER__
12 
13 #include <cstring>
14 #include <istream>
15 #include <streambuf>
16 #include <boost/bind.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/condition.hpp>
19 #include <pion/PionConfig.hpp>
20 #include <pion/net/TCPConnection.hpp>
21 
22 
23 namespace pion { // begin namespace pion
24 namespace net { // begin namespace net (Pion Network Library)
25 
26 
33  : public std::basic_streambuf<char, std::char_traits<char> >
34 {
35 public:
36 
37  // data type definitions required for iostream compatibility
38  typedef char char_type;
39  typedef std::char_traits<char>::int_type int_type;
40  typedef std::char_traits<char>::off_type off_type;
41  typedef std::char_traits<char>::pos_type pos_type;
42  typedef std::char_traits<char> traits_type;
43 
44  // integer constants that size the get (read) and put (write) areas of TCPStreamBuffer
45  enum {
46  PUT_BACK_MAX = 10, ///< number of bytes kept in front of the read position so they can be put back into the read buffer
47  WRITE_BUFFER_SIZE = 8192 ///< size of the write buffer in bytes; setupBuffers() leaves the last byte outside the put area for overflow()
48  };
49 
50 
56  explicit TCPStreamBuffer(TCPConnectionPtr& conn_ptr)
57  : m_conn_ptr(conn_ptr), m_read_buf(m_conn_ptr->getReadBuffer().c_array())
58  {
59  setupBuffers();
60  }
61 
68  explicit TCPStreamBuffer(boost::asio::io_service& io_service,
69  const bool ssl_flag = false)
70  : m_conn_ptr(new TCPConnection(io_service, ssl_flag)),
71  m_read_buf(m_conn_ptr->getReadBuffer().c_array())
72  {
73  setupBuffers();
74  }
75 
82  TCPStreamBuffer(boost::asio::io_service& io_service,
83  TCPConnection::SSLContext& ssl_context)
84  : m_conn_ptr(new TCPConnection(io_service, ssl_context)),
85  m_read_buf(m_conn_ptr->getReadBuffer().c_array())
86  {
87  setupBuffers();
88  }
89 
91  virtual ~TCPStreamBuffer() { sync(); }
92 
94  TCPConnection& getConnection(void) { return *m_conn_ptr; }
95 
97  const TCPConnection& getConnection(void) const { return *m_conn_ptr; }
98 
99 
100 protected:
101 
103  inline void setupBuffers(void) {
104  // use the TCP connection's read buffer and allow for bytes to be put back
105  setg(m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX, m_read_buf+PUT_BACK_MAX);
106  // set write buffer size-1 so that we have an extra char avail for overflow
107  setp(m_write_buf, m_write_buf+(WRITE_BUFFER_SIZE-1));
108  }
109 
115  inline int_type flushOutput(void) {
116  const std::streamsize bytes_to_send = std::streamsize(pptr() - pbase());
117  int_type bytes_sent = 0;
118  if (bytes_to_send > 0) {
119  boost::mutex::scoped_lock async_lock(m_async_mutex);
120  m_bytes_transferred = 0;
121  m_conn_ptr->async_write(boost::asio::buffer(pbase(), bytes_to_send),
122  boost::bind(&TCPStreamBuffer::operationFinished, this,
123  boost::asio::placeholders::error,
124  boost::asio::placeholders::bytes_transferred));
125  m_async_done.wait(async_lock);
126  bytes_sent = m_bytes_transferred;
127  pbump(-bytes_sent);
128  if (m_async_error)
129  bytes_sent = traits_type::eof();
130  }
131  return bytes_sent;
132  }
133 
139  virtual int_type underflow(void) {
140  // first check if we still have bytes available in the read buffer
141  if (gptr() < egptr())
142  return traits_type::to_int_type(*gptr());
143 
144  // calculate the number of bytes we will allow to be put back
145  std::streamsize put_back_num = std::streamsize(gptr() - eback());
146  if (put_back_num > PUT_BACK_MAX)
147  put_back_num = PUT_BACK_MAX;
148 
149  // copy the last bytes read to the beginning of the buffer (for put back)
150  if (put_back_num > 0)
151  memmove(m_read_buf+(PUT_BACK_MAX-put_back_num), gptr()-put_back_num, put_back_num);
152 
153  // read data from the TCP connection
154  // note that this has to be an ansynchronous call; otherwise, it cannot
155  // be cancelled by other threads and will block forever (such as during shutdown)
156  boost::mutex::scoped_lock async_lock(m_async_mutex);
157  m_bytes_transferred = 0;
158  m_conn_ptr->async_read_some(boost::asio::buffer(m_read_buf+PUT_BACK_MAX,
159  TCPConnection::READ_BUFFER_SIZE-PUT_BACK_MAX),
160  boost::bind(&TCPStreamBuffer::operationFinished, this,
161  boost::asio::placeholders::error,
162  boost::asio::placeholders::bytes_transferred));
163  m_async_done.wait(async_lock);
164  if (m_async_error)
165  return traits_type::eof();
166 
167  // reset buffer pointers now that data is available
168  setg(m_read_buf+(PUT_BACK_MAX-put_back_num), //< beginning of putback bytes
169  m_read_buf+PUT_BACK_MAX, //< read position
170  m_read_buf+PUT_BACK_MAX+m_bytes_transferred); //< end of buffer
171 
172  // return next character available
173  return traits_type::to_int_type(*gptr());
174  }
175 
182  virtual int_type overflow(int_type c) {
183  if (! traits_type::eq_int_type(c, traits_type::eof())) {
184  // character is not eof -> add it to the end of the write buffer
185  // we can push this to the back of the write buffer because we set
186  // the size of the write buffer to 1 less than the actual size using setp()
187  *pptr() = c;
188  pbump(1);
189  }
190  // flush data in the write buffer by sending it to the TCP connection
191  return ((flushOutput() == traits_type::eof())
192  ? traits_type::eof() : traits_type::not_eof(c));
193  }
194 
203  virtual std::streamsize xsputn(const char_type *s, std::streamsize n) {
204  const std::streamsize bytes_available = std::streamsize(epptr() - pptr());
205  std::streamsize bytes_sent = 0;
206  if (bytes_available >= n) {
207  // there is enough room in the buffer -> just put it in there
208  memcpy(pptr(), s, n);
209  pbump(n);
210  bytes_sent = n;
211  } else {
212  // there is not enough room left in the buffer
213  if (bytes_available > 0) {
214  // fill up the buffer
215  memcpy(pptr(), s, bytes_available);
216  pbump(bytes_available);
217  }
218  // flush data in the write buffer by sending it to the TCP connection
219  if (flushOutput() == traits_type::eof())
220  return 0;
221  if ((n-bytes_available) >= (WRITE_BUFFER_SIZE-1)) {
222  // the remaining data to send is larger than the buffer available
223  // send it all now rather than buffering
224  boost::mutex::scoped_lock async_lock(m_async_mutex);
225  m_bytes_transferred = 0;
226  m_conn_ptr->async_write(boost::asio::buffer(s+bytes_available,
227  n-bytes_available),
228  boost::bind(&TCPStreamBuffer::operationFinished, this,
229  boost::asio::placeholders::error,
230  boost::asio::placeholders::bytes_transferred));
231  m_async_done.wait(async_lock);
232  bytes_sent = bytes_available + m_bytes_transferred;
233  } else {
234  // the buffer is larger than the remaining data
235  // put remaining data to the beginning of the output buffer
236  memcpy(pbase(), s+bytes_available, n-bytes_available);
237  pbump(n-bytes_available);
238  bytes_sent = n;
239  }
240  }
241  return bytes_sent;
242  }
243 
252  virtual std::streamsize xsgetn(char_type *s, std::streamsize n) {
253  std::streamsize bytes_remaining = n;
254  while (bytes_remaining > 0) {
255  const std::streamsize bytes_available = std::streamsize(egptr() - gptr());
256  const std::streamsize bytes_next_read = ((bytes_available >= bytes_remaining)
257  ? bytes_remaining : bytes_available);
258  // copy available input data from buffer
259  if (bytes_next_read > 0) {
260  memcpy(s, gptr(), bytes_next_read);
261  gbump(bytes_next_read);
262  bytes_remaining -= bytes_next_read;
263  s += bytes_next_read;
264  }
265  if (bytes_remaining > 0) {
266  // call underflow() to read more data
267  if (traits_type::eq_int_type(underflow(), traits_type::eof()))
268  break;
269  }
270  }
271  return(n-bytes_remaining);
272  }
273 
279  virtual int_type sync(void) {
280  return ((flushOutput() == traits_type::eof()) ? -1 : 0);
281  }
282 
283 
284 private:
285 
287  inline void operationFinished(const boost::system::error_code& error_code,
288  std::size_t bytes_transferred)
289  {
290  boost::mutex::scoped_lock async_lock(m_async_mutex);
291  m_async_error = error_code;
292  m_bytes_transferred = bytes_transferred;
293  m_async_done.notify_one();
294  }
295 
296 
/// pointer to the TCP connection that all reads and writes are issued on
/// (declared first: the constructors read it while initializing m_read_buf)
298  TCPConnectionPtr m_conn_ptr;
299 
/// mutex locked while an asynchronous operation is started and awaited,
/// and by operationFinished() while it stores the result
301  boost::mutex m_async_mutex;
302 
/// condition waited on after an asynchronous operation has been started;
/// notified by operationFinished()
304  boost::condition m_async_done;
305 
/// error status stored by operationFinished() for the last operation
307  boost::system::error_code m_async_error;
308 
/// byte count stored by operationFinished() for the last operation
/// (reset to 0 before each operation is started)
310  std::size_t m_bytes_transferred;
311 
/// pointer to the start of the TCP connection's read buffer; the first
/// PUT_BACK_MAX bytes are used for put-back data
313  char_type * m_read_buf;
314 
/// storage for the write buffer (put area)
316  char_type m_write_buf[WRITE_BUFFER_SIZE];
317 };
318 
319 
324  : public std::basic_iostream<char, std::char_traits<char> >
325 {
326 public:
327 
328  // data type definitions required for iostream compatibility
329  typedef char char_type;
330  typedef std::char_traits<char>::int_type int_type;
331  typedef std::char_traits<char>::off_type off_type;
332  typedef std::char_traits<char>::pos_type pos_type;
333  typedef std::char_traits<char> traits_type;
334 
335 
341  explicit TCPStream(TCPConnectionPtr& conn_ptr)
342  : m_tcp_buf(conn_ptr)
343 #ifdef _MSC_VER
344  , std::basic_iostream<char, std::char_traits<char> >(NULL)
345 #endif
346  {
347  // initialize basic_iostream with pointer to the stream buffer
348  std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
349  }
350 
357  explicit TCPStream(boost::asio::io_service& io_service,
358  const bool ssl_flag = false)
359  : m_tcp_buf(io_service, ssl_flag)
360 #ifdef _MSC_VER
361  , std::basic_iostream<char, std::char_traits<char> >(NULL)
362 #endif
363  {
364  // initialize basic_iostream with pointer to the stream buffer
365  std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
366  }
367 
374  TCPStream(boost::asio::io_service& io_service,
375  TCPConnection::SSLContext& ssl_context)
376  : m_tcp_buf(io_service, ssl_context)
377 #ifdef _MSC_VER
378  , std::basic_iostream<char, std::char_traits<char> >(NULL)
379 #endif
380  {
381  // initialize basic_iostream with pointer to the stream buffer
382  std::basic_ios<char,std::char_traits<char> >::init(&m_tcp_buf);
383  }
384 
393  inline boost::system::error_code accept(boost::asio::ip::tcp::acceptor& tcp_acceptor)
394  {
395  boost::system::error_code ec = m_tcp_buf.getConnection().accept(tcp_acceptor);
396  if (! ec && getSSLFlag()) ec = m_tcp_buf.getConnection().handshake_server();
397  return ec;
398  }
399 
408  inline boost::system::error_code connect(boost::asio::ip::tcp::endpoint& tcp_endpoint)
409  {
410  boost::system::error_code ec = m_tcp_buf.getConnection().connect(tcp_endpoint);
411  if (! ec && getSSLFlag()) ec = m_tcp_buf.getConnection().handshake_client();
412  return ec;
413  }
414 
424  inline boost::system::error_code connect(const boost::asio::ip::address& remote_addr,
425  const unsigned int remote_port)
426  {
427  boost::asio::ip::tcp::endpoint tcp_endpoint(remote_addr, remote_port);
428  boost::system::error_code ec = m_tcp_buf.getConnection().connect(tcp_endpoint);
429  if (! ec && getSSLFlag()) ec = m_tcp_buf.getConnection().handshake_client();
430  return ec;
431  }
432 
434  inline void close(void) { m_tcp_buf.getConnection().close(); }
435 
436  /*
437  Use close instead; basic_socket::cancel is deprecated for Windows XP.
438 
440  inline void cancel(void) { m_tcp_buf.getConnection().cancel(); }
441  */
442 
444  inline bool is_open(void) const { return m_tcp_buf.getConnection().is_open(); }
445 
447  inline bool getSSLFlag(void) const { return m_tcp_buf.getConnection().getSSLFlag(); }
448 
450  inline boost::asio::ip::address getRemoteIp(void) const {
451  return m_tcp_buf.getConnection().getRemoteIp();
452  }
453 
455  TCPStreamBuffer *rdbuf(void) { return &m_tcp_buf; }
456 
457 
458 private:
459 
/// the stream buffer this stream is bound to; it holds the TCP connection
461  TCPStreamBuffer m_tcp_buf;
462 };
463 
464 
465 } // end namespace net
466 } // end namespace pion
467 
468 #endif
TCPStream(boost::asio::io_service &io_service, const bool ssl_flag=false)
Definition: TCPStream.hpp:357
boost::system::error_code connect(boost::asio::ip::tcp::endpoint &tcp_endpoint)
virtual int_type underflow(void)
Definition: TCPStream.hpp:139
TCPStreamBuffer(TCPConnectionPtr &conn_ptr)
Definition: TCPStream.hpp:56
boost::system::error_code handshake_client(void)
virtual int_type overflow(int_type c)
Definition: TCPStream.hpp:182
boost::asio::ip::address getRemoteIp(void) const
returns the client's IP address
bool is_open(void) const
returns true if the connection is currently open
Definition: TCPStream.hpp:444
TCPStreamBuffer * rdbuf(void)
returns a pointer to the stream buffer in use
Definition: TCPStream.hpp:455
virtual std::streamsize xsputn(const char_type *s, std::streamsize n)
Definition: TCPStream.hpp:203
TCPStream(TCPConnectionPtr &conn_ptr)
Definition: TCPStream.hpp:341
boost::asio::ip::address getRemoteIp(void) const
returns the client's IP address
Definition: TCPStream.hpp:450
const TCPConnection & getConnection(void) const
returns a const reference to the current TCP connection
Definition: TCPStream.hpp:97
boost::system::error_code connect(boost::asio::ip::tcp::endpoint &tcp_endpoint)
Definition: TCPStream.hpp:408
bool getSSLFlag(void) const
returns true if the connection is encrypted using SSL
void setupBuffers(void)
sets up the read and write buffers for input and output
Definition: TCPStream.hpp:103
TCPStreamBuffer(boost::asio::io_service &io_service, TCPConnection::SSLContext &ssl_context)
Definition: TCPStream.hpp:82
boost::system::error_code accept(boost::asio::ip::tcp::acceptor &tcp_acceptor)
Definition: TCPStream.hpp:393
virtual std::streamsize xsgetn(char_type *s, std::streamsize n)
Definition: TCPStream.hpp:252
boost::system::error_code accept(boost::asio::ip::tcp::acceptor &tcp_acceptor)
virtual int_type sync(void)
Definition: TCPStream.hpp:279
int_type flushOutput(void)
Definition: TCPStream.hpp:115
void close(void)
closes the tcp socket and cancels any pending asynchronous operations
TCPStream(boost::asio::io_service &io_service, TCPConnection::SSLContext &ssl_context)
Definition: TCPStream.hpp:374
boost::system::error_code handshake_server(void)
bool getSSLFlag(void) const
returns true if the connection is encrypted using SSL
Definition: TCPStream.hpp:447
void close(void)
closes the tcp connection
Definition: TCPStream.hpp:434
TCPStreamBuffer(boost::asio::io_service &io_service, const bool ssl_flag=false)
Definition: TCPStream.hpp:68
bool is_open(void) const
returns true if the connection is currently open
TCPConnection & getConnection(void)
returns a reference to the current TCP connection
Definition: TCPStream.hpp:94
virtual ~TCPStreamBuffer()
virtual destructor flushes the write buffer
Definition: TCPStream.hpp:91
boost::system::error_code connect(const boost::asio::ip::address &remote_addr, const unsigned int remote_port)
Definition: TCPStream.hpp:424