00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef Rdma_Acceptor_h
00022 #define Rdma_Acceptor_h
00023
00024 #include "rdma_wrap.h"
00025
00026 #include "qpid/sys/AtomicValue.h"
00027 #include "qpid/sys/Dispatcher.h"
00028 #include "qpid/sys/Mutex.h"
00029
00030 #include <netinet/in.h>
00031
00032 #include <boost/function.hpp>
00033 #include <boost/ptr_container/ptr_deque.hpp>
00034 #include <deque>
00035
00036 namespace Rdma {
00037
// Forward declaration of Rdma::Connection.
// NOTE(review): this header also uses Connection::intrusive_ptr, which needs
// the complete type - presumably provided by "rdma_wrap.h"; confirm.
class Connection;
00039
00040 class AsynchIO
00041 {
00042 typedef boost::function1<void, AsynchIO&> ErrorCallback;
00043 typedef boost::function2<void, AsynchIO&, Buffer*> ReadCallback;
00044 typedef boost::function1<void, AsynchIO&> IdleCallback;
00045 typedef boost::function2<void, AsynchIO&, Buffer*> FullCallback;
00046
00047 QueuePair::intrusive_ptr qp;
00048 qpid::sys::DispatchHandleRef dataHandle;
00049 int bufferSize;
00050 int recvCredit;
00051 int xmitCredit;
00052 int recvBufferCount;
00053 int xmitBufferCount;
00054 int outstandingWrites;
00055 bool closed;
00056 bool deleting;
00057 enum State { IDLE, DATA, PENDING_DATA, NOTIFY_WRITE, PENDING_NOTIFY, DELETED };
00058 qpid::sys::AtomicValue<State> state;
00059
00060 std::deque<Buffer*> bufferQueue;
00061 qpid::sys::Mutex bufferQueueLock;
00062 boost::ptr_deque<Buffer> buffers;
00063
00064 ReadCallback readCallback;
00065 IdleCallback idleCallback;
00066 FullCallback fullCallback;
00067 ErrorCallback errorCallback;
00068
00069 public:
00070
00071
00072
00073 AsynchIO(
00074 QueuePair::intrusive_ptr q,
00075 int size,
00076 int xCredit,
00077 int rCount,
00078 ReadCallback rc,
00079 IdleCallback ic,
00080 FullCallback fc,
00081 ErrorCallback ec
00082 );
00083
00084 void start(qpid::sys::Poller::shared_ptr poller);
00085 bool writable() const;
00086 bool bufferAvailable() const;
00087 void queueWrite(Buffer* buff);
00088 void notifyPendingWrite();
00089 void queueWriteClose();
00090 void deferDelete();
00091 int incompletedWrites() const;
00092 Buffer* getBuffer();
00093 void returnBuffer(Buffer*);
00094
00095 private:
00096
00097 ~AsynchIO();
00098
00099
00100
00101
00102 const static int FlagsMask = 0x10000000;
00103 const static int IgnoreData = 0x10000000;
00104
00105 void dataEvent(qpid::sys::DispatchHandle& handle);
00106 void processCompletions();
00107 void doWriteCallback();
00108 };
00109
00110 inline bool AsynchIO::writable() const {
00111 return (!closed && outstandingWrites < xmitBufferCount && xmitCredit > 0);
00112 }
00113
00114 inline int AsynchIO::incompletedWrites() const {
00115 return outstandingWrites;
00116 }
00117
00118 inline bool AsynchIO::bufferAvailable() const {
00119 return !bufferQueue.empty();
00120 }
00121
00122
00123
/**
 * Plain holder for two integer connection parameters.
 *
 * The constructor stores its arguments unchanged; no validation is performed.
 * NOTE(review): units (presumably bytes for the buffer size) are not stated
 * in this header - confirm.
 */
struct ConnectionParams {
    int maxRecvBufferSize;
    int initialXmitCredit;

    /**
     * @param s value stored in maxRecvBufferSize
     * @param c value stored in initialXmitCredit
     */
    ConnectionParams(int s, int c) :
        maxRecvBufferSize(s),
        initialXmitCredit(c)
    {}
};
00133
/**
 * Error category passed as the second argument of the namespace-level
 * ErrorCallback. Enumerators take the default values 0..4 in declaration order.
 * NOTE(review): the precise condition that produces each value is decided by
 * the implementation, not visible in this header.
 */
enum ErrorType {
    ADDR_ERROR,
    ROUTE_ERROR,
    CONNECT_ERROR,
    UNREACHABLE,
    UNKNOWN
};
00141
00142 typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, ErrorType> ErrorCallback;
00143 typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> DisconnectedCallback;
00144
00145 class ConnectionManager {
00146 Connection::intrusive_ptr ci;
00147 qpid::sys::DispatchHandle handle;
00148
00149 protected:
00150 ErrorCallback errorCallback;
00151 DisconnectedCallback disconnectedCallback;
00152
00153 public:
00154 ConnectionManager(
00155 ErrorCallback errc,
00156 DisconnectedCallback dc
00157 );
00158
00159 virtual ~ConnectionManager() {}
00160
00161 void start(qpid::sys::Poller::shared_ptr poller);
00162
00163 private:
00164 void event(qpid::sys::DispatchHandle& handle);
00165
00166 virtual void startConnection(Connection::intrusive_ptr ci) = 0;
00167 virtual void connectionEvent(Connection::intrusive_ptr ci) = 0;
00168 };
00169
00170 typedef boost::function2<bool, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> ConnectionRequestCallback;
00171 typedef boost::function1<void, Rdma::Connection::intrusive_ptr&> EstablishedCallback;
00172
00173 class Listener : public ConnectionManager
00174 {
00175 sockaddr src_addr;
00176 ConnectionParams checkConnectionParams;
00177 ConnectionRequestCallback connectionRequestCallback;
00178 EstablishedCallback establishedCallback;
00179
00180 public:
00181 Listener(
00182 const sockaddr& src,
00183 const ConnectionParams& cp,
00184 EstablishedCallback ec,
00185 ErrorCallback errc,
00186 DisconnectedCallback dc,
00187 ConnectionRequestCallback crc = 0
00188 );
00189
00190 private:
00191 void startConnection(Connection::intrusive_ptr ci);
00192 void connectionEvent(Connection::intrusive_ptr ci);
00193 };
00194
00195 typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> RejectedCallback;
00196 typedef boost::function2<void, Rdma::Connection::intrusive_ptr&, const ConnectionParams&> ConnectedCallback;
00197
00198 class Connector : public ConnectionManager
00199 {
00200 sockaddr dst_addr;
00201 ConnectionParams connectionParams;
00202 RejectedCallback rejectedCallback;
00203 ConnectedCallback connectedCallback;
00204
00205 public:
00206 Connector(
00207 const sockaddr& dst,
00208 const ConnectionParams& cp,
00209 ConnectedCallback cc,
00210 ErrorCallback errc,
00211 DisconnectedCallback dc,
00212 RejectedCallback rc = 0
00213 );
00214
00215 private:
00216 void startConnection(Connection::intrusive_ptr ci);
00217 void connectionEvent(Connection::intrusive_ptr ci);
00218 };
00219 }
00220
00221 #endif // Rdma_Acceptor_h