Class Stomp::Connection
In: lib/stomp/connection.rb
Parent: Object

Low-level connection which maps STOMP commands to methods and supports synchronous receives.

Methods

__old_receive   abort   ack   begin   closed?   commit   disconnect   new   open   open?   poll   receive   send   socket   subscribe   unsubscribe  

Public Class methods

A new Connection object accepts the following parameters:

  login             (String,  default : '')
  passcode          (String,  default : '')
  host              (String,  default : 'localhost')
  port              (Integer, default : 61613)
  reliable          (Boolean, default : false)
  reconnect_delay   (Integer, default : 5)
  connect_headers   (Hash,    default : {})

  e.g. c = Connection.new("username", "password", "localhost", 61613, true)

TODO Stomp URL :

  A Stomp URL must begin with 'stomp://' and can be in one of the following forms:

  stomp://host:port
  stomp://host.domain.tld:port
  stomp://user:pass@host:port
  stomp://user:pass@host.domain.tld:port

[Source]

    # File lib/stomp/connection.rb, line 28
    def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
      @host = host
      @port = port
      @login = login
      @passcode = passcode
      @transmit_semaphore = Mutex.new
      @read_semaphore = Mutex.new
      @socket_semaphore = Mutex.new
      @reliable = reliable
      @reconnect_delay = reconnect_delay
      @connect_headers = connect_headers
      @closed = false
      @subscriptions = {}
      @failure = nil
      socket
    end
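
The Stomp URL forms listed in the TODO above are not handled by the constructor shown here. As a minimal sketch of how such a URL could be split into the positional arguments that the constructor accepts, the following uses a hypothetical helper, parse_stomp_url, which is not part of Stomp::Connection; the regular expression and the returned hash keys are assumptions made for illustration.

```ruby
# Hypothetical helper (not part of Stomp::Connection): split a Stomp URL
# into login, passcode, host and port. Only the four forms listed in the
# TODO are accepted; anything else raises ArgumentError.
def parse_stomp_url(url)
  match = %r{\Astomp://(?:([^:@/]+):([^@/]*)@)?([^:@/]+):(\d+)\z}.match(url)
  raise ArgumentError, "not a valid Stomp URL: #{url.inspect}" unless match

  {
    login:    match[1] || '',   # defaults mirror the constructor defaults
    passcode: match[2] || '',
    host:     match[3],
    port:     match[4].to_i
  }
end

parts = parse_stomp_url('stomp://user:pass@host.domain.tld:61613')
puts parts.inspect
```

The resulting values could then be passed on as Connection.new(parts[:login], parts[:passcode], parts[:host], parts[:port]).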

Syntactic sugar for ‘Connection.new’. See ‘initialize’ for usage.

[Source]

    # File lib/stomp/connection.rb, line 46
    def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
      Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers)
    end

Public Instance methods

Receive a frame, blocking until one arrives. On a reliable connection a failed receive is logged and retried; otherwise the error is raised.

[Source]

    # File lib/stomp/connection.rb, line 161
    def __old_receive
      # The receive may fail, so we may need to retry.
      while true
        begin
          s = socket
          return _receive(s)
        rescue
          @failure = $!
          raise unless @reliable
          # Interpolate the exception; String + Exception raises TypeError.
          $stderr.print "receive failed: #{$!}\n"
        end
      end
    end

Abort a transaction by name

[Source]

    # File lib/stomp/connection.rb, line 110
    def abort(name, headers = {})
      headers[:transaction] = name
      transmit("ABORT", headers)
    end

Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' ).

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )

[Source]

    # File lib/stomp/connection.rb, line 98
    def ack(message_id, headers = {})
      headers['message-id'] = message_id
      transmit("ACK", headers)
    end
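
The client-acknowledgement flow described above can be sketched as follows. This is an illustration only: FakeMessage and FakeAckConnection are hypothetical stand-ins so the example runs without a broker, and the assumption that a received message exposes headers['message-id'] and body is not confirmed by this page.

```ruby
# Hypothetical stand-in for a received message (assumed shape).
FakeMessage = Struct.new(:headers, :body)

# Hypothetical stand-in mirroring the subscribe/receive/ack signatures
# documented on this page; it hands out queued messages instead of
# talking to a broker.
class FakeAckConnection
  attr_reader :acked

  def initialize(messages)
    @pending = messages.dup
    @acked = []
  end

  def subscribe(name, headers = {})
    @subscription = [name, headers]
  end

  def receive
    @pending.shift
  end

  def ack(message_id, headers = {})
    @acked << message_id
  end
end

# Subscribe with client acknowledgement, then acknowledge each message
# only after its body has been handled.
def consume(connection, destination, count)
  connection.subscribe(destination, :ack => 'client')
  bodies = []
  count.times do
    message = connection.receive
    break if message.nil?
    bodies << message.body
    connection.ack(message.headers['message-id'])
  end
  bodies
end

conn = FakeAckConnection.new([
  FakeMessage.new({ 'message-id' => 'm1' }, 'first'),
  FakeMessage.new({ 'message-id' => 'm2' }, 'second')
])
bodies = consume(conn, '/queue/a', 2)
```

Acknowledging after the body is handled means a message that was never processed is not acknowledged, which is the usual reason to choose client acknowledgement.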

Begin a transaction, requires a name for the transaction

[Source]

    # File lib/stomp/connection.rb, line 89
    def begin(name, headers = {})
      headers[:transaction] = name
      transmit("BEGIN", headers)
    end

Is this connection closed?

[Source]

    # File lib/stomp/connection.rb, line 84
    def closed?
      @closed
    end

Commit a transaction by name

[Source]

    # File lib/stomp/connection.rb, line 104
    def commit(name, headers = {})
      headers[:transaction] = name
      transmit("COMMIT", headers)
    end
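
The begin, commit and abort methods are typically used together. A minimal sketch of that pairing, assuming a hypothetical helper with_transaction (not part of this class) and a hypothetical RecordingConnection that only records calls so the example runs without a broker:

```ruby
# Hypothetical stand-in that records calls using the same method
# signatures as documented on this page.
class RecordingConnection
  attr_reader :frames

  def initialize
    @frames = []
  end

  def begin(name, headers = {})
    @frames << ['BEGIN', name]
  end

  def commit(name, headers = {})
    @frames << ['COMMIT', name]
  end

  def abort(name, headers = {})
    @frames << ['ABORT', name]
  end

  def send(destination, message, headers = {})
    @frames << ['SEND', destination, message, headers]
  end
end

# Hypothetical helper: begin a named transaction, run the block, commit
# on success, abort and re-raise if the block fails.
def with_transaction(connection, name)
  connection.begin(name)
  begin
    yield
  rescue StandardError
    connection.abort(name)
    raise
  end
  connection.commit(name)
end

conn = RecordingConnection.new
with_transaction(conn, 'tx-1') do
  # The :transaction header ties the SEND to the open transaction.
  conn.send('/queue/a', 'hello', :transaction => 'tx-1')
end
```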

Close this connection

[Source]

    # File lib/stomp/connection.rb, line 146
    def disconnect(headers = {})
      transmit("DISCONNECT", headers)
      @closed = true
    end

Is this connection open?

[Source]

    # File lib/stomp/connection.rb, line 79
    def open?
      !@closed
    end

Return a pending message if one is available, otherwise return nil

[Source]

    # File lib/stomp/connection.rb, line 153
    def poll
      @read_semaphore.synchronize do
        return nil if @socket.nil? || !@socket.ready?
        return receive
      end
    end
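
Because poll returns nil when nothing is pending, it can be used to collect whatever has already arrived without blocking. A small sketch, assuming a hypothetical QueuedConnection stand-in whose poll behaves as described above, and a hypothetical helper drain:

```ruby
# Hypothetical stand-in: poll hands back the next queued message,
# or nil when nothing is pending.
class QueuedConnection
  def initialize(messages)
    @pending = messages.dup
  end

  def poll
    @pending.shift
  end
end

# Hypothetical helper: collect pending messages until poll returns nil.
def drain(connection)
  messages = []
  while (message = connection.poll)
    messages << message
  end
  messages
end

drained = drain(QueuedConnection.new(%w[first second]))
```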

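Receive a frame, blocking until one is available. On a reliable connection, a nil result (EOF) resets the socket and the receive is attempted again on a fresh connection.

[Source]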

    # File lib/stomp/connection.rb, line 175
    def receive
      super_result = __old_receive()
      if super_result.nil? && @reliable
        $stderr.print "connection.receive returning EOF as nil - resetting connection.\n"
        @socket = nil
        super_result = __old_receive()
      end
      return super_result
    end

Send message to destination

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )

[Source]

    # File lib/stomp/connection.rb, line 140
    def send(destination, message, headers = {})
      headers[:destination] = destination
      transmit("SEND", headers, message)
    end
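
The private transmit method is not shown on this page. As a rough illustration of what a SEND call puts on the wire, the sketch below builds a frame in the STOMP 1.0 layout (command line, header lines, a blank line, the body, then a NUL byte). build_frame is a hypothetical helper, and the library's own transmit may differ, for example by adding further headers.

```ruby
# Hypothetical helper: assemble a frame in the STOMP 1.0 wire layout.
def build_frame(command, headers = {}, body = '')
  header_lines = headers.map { |key, value| "#{key}:#{value}\n" }.join
  "#{command}\n#{header_lines}\n#{body}\0"
end

frame = build_frame('SEND', { :destination => '/queue/a' }, 'hello')
puts frame.inspect
```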

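Return the socket for this connection, opening it if necessary. A reliable connection retries a failed connect every reconnect_delay seconds and replays stored subscriptions once connected; otherwise the error is raised.

[Source]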

    # File lib/stomp/connection.rb, line 50
    def socket
      # Need to look into why the following synchronize does not work.
      #@read_semaphore.synchronize do
        s = @socket
        while s.nil? || !@failure.nil?
          @failure = nil
          begin
            s = TCPSocket.open @host, @port
            headers = @connect_headers.clone
            headers[:login] = @login
            headers[:passcode] = @passcode
            _transmit(s, "CONNECT", headers)
            @connect = _receive(s)
            # replay any subscriptions.
            @subscriptions.each { |k,v| _transmit(s, "SUBSCRIBE", v) }
          rescue
            @failure = $!
            s = nil
            raise unless @reliable
            # Interpolate the exception; String + Exception raises TypeError.
            $stderr.print "connect failed: #{$!} will retry in #{@reconnect_delay}\n"
            sleep(@reconnect_delay)
          end
        end
        @socket = s
        return s
      #end
    end
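
The reconnect behaviour in socket is a retry loop with a fixed delay. Isolated from the socket handling, the pattern can be sketched as below; connect_with_retry and its sleeper parameter are hypothetical names introduced for illustration, and the block stands in for the connect-and-handshake steps.

```ruby
# Hypothetical helper: keep calling the block until it succeeds.
# When reliable is false the first failure is re-raised, as in socket.
def connect_with_retry(reliable:, reconnect_delay:, sleeper: Kernel.method(:sleep))
  loop do
    begin
      return yield
    rescue StandardError => e
      raise unless reliable
      $stderr.puts "connect failed: #{e.message} will retry in #{reconnect_delay}"
      sleeper.call(reconnect_delay)
    end
  end
end

attempts = 0
result = connect_with_retry(reliable: true, reconnect_delay: 0) do
  attempts += 1
  # Simulate a broker that is unavailable for the first two attempts.
  raise IOError, 'broker unavailable' if attempts < 3
  :connected
end
```

Passing the sleep call in as a parameter is only a convenience for exercising the loop without waiting; the original method calls sleep directly.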

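Subscribe to a destination; the destination name is required. On a reliable connection the subscription is stored (keyed by subId, or by name if subId is nil) so that it can be replayed after a reconnect.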

[Source]

    # File lib/stomp/connection.rb, line 116
    def subscribe(name, headers = {}, subId = nil)
      headers[:destination] = name
      transmit("SUBSCRIBE", headers)

      # Store the sub so that we can replay if we reconnect.
      if @reliable
        subId = name if subId.nil?
        @subscriptions[subId] = headers
      end
    end

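Unsubscribe from a destination; the destination name is required. On a reliable connection the stored subscription (keyed by subId, or by name if subId is nil) is removed so that it is not replayed after a reconnect.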

[Source]

    # File lib/stomp/connection.rb, line 128
    def unsubscribe(name, headers = {}, subId = nil)
      headers[:destination] = name
      transmit("UNSUBSCRIBE", headers)
      if @reliable
        subId = name if subId.nil?
        @subscriptions.delete(subId)
      end
    end
