00001 #ifndef QPID_CLUSTER_CONNECTION_H
00002 #define QPID_CLUSTER_CONNECTION_H
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025 #include "types.h"
00026 #include "WriteEstimate.h"
00027 #include "OutputInterceptor.h"
00028 #include "NoOpConnectionOutputHandler.h"
00029
00030 #include "qpid/broker/Connection.h"
00031 #include "qpid/amqp_0_10/Connection.h"
00032 #include "qpid/sys/ConnectionInputHandler.h"
00033 #include "qpid/sys/ConnectionOutputHandler.h"
00034 #include "qpid/framing/FrameDecoder.h"
00035 #include "qpid/framing/SequenceNumber.h"
00036
00037 #include <iosfwd>
00038
00039 namespace qpid {
00040
00041 namespace framing { class AMQFrame; }
00042
00043 namespace cluster {
00044 class Cluster;
00045
00047 class Connection :
00048 public RefCounted,
00049 public sys::ConnectionInputHandler,
00050 public framing::AMQP_AllOperations::ClusterConnectionHandler
00051
00052 {
00053 public:
00055 Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp);
00057 Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, ConnectionId);
00058 ~Connection();
00059
00060 ConnectionId getId() const { return self; }
00061 broker::Connection& getBrokerConnection() { return connection; }
00062
00064 bool isLocal() const;
00065
00067 bool isShadow() const;
00068
00070 bool isCatchUp() const { return catchUp; }
00071
00073 bool isDumped() const;
00074
00075 Cluster& getCluster() { return cluster; }
00076
00077
00078 void received(framing::AMQFrame&);
00079 void closed();
00080 bool doOutput();
00081 bool hasOutput() { return connection.hasOutput(); }
00082 void idleOut() { connection.idleOut(); }
00083 void idleIn() { connection.idleIn(); }
00084
00085
00086 size_t decode(const char* buffer, size_t size);
00087
00088
00089 void deliverBuffer(framing::Buffer&);
00090 void delivered(framing::AMQFrame&);
00091
00092
00093
00094
00095 void sessionState(const SequenceNumber& replayStart,
00096 const SequenceNumber& sendCommandPoint,
00097 const SequenceSet& sentIncomplete,
00098 const SequenceNumber& expected,
00099 const SequenceNumber& received,
00100 const SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
00101
00102 void shadowReady(uint64_t memberId, uint64_t connectionId);
00103
00104 void membership(const framing::FieldTable&, const framing::FieldTable&);
00105
00106 private:
00107 bool catcUp;
00108
00109 void deliverClose();
00110 void deliverDoOutput(uint32_t requested);
00111 void sendDoOutput();
00112
00113 static NoOpConnectionOutputHandler discardHandler;
00114
00115 Cluster& cluster;
00116 ConnectionId self;
00117 bool catchUp;
00118 WriteEstimate writeEstimate;
00119 OutputInterceptor output;
00120 framing::FrameDecoder localDecoder;
00121 framing::FrameDecoder mcastDecoder;
00122 broker::Connection connection;
00123 framing::SequenceNumber mcastSeq;
00124 framing::SequenceNumber deliverSeq;
00125 framing::ChannelId currentChannel;
00126
00127 friend std::ostream& operator<<(std::ostream&, const Connection&);
00128 };
00129
00130 }}
00131
00132 #endif