Class | Stomp::Connection |
In: |
lib/stomp/connection.rb
|
Parent: | Object |
Low level connection which maps commands and supports synchronous receives
A new Connection object accepts the following parameters:
login (String, default : '') passcode (String, default : '') host (String, default : 'localhost') port (Integer, default : 61613) reliable (Boolean, default : false) reconnect_delay (Integer, default : 5) connect_headers (Hash, default : {}) e.g. c = Connection.new("username", "password", "localhost", 61613, true)
TODO Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms: stomp://host:port stomp://host.domain.tld:port stomp://user:pass@host:port stomp://user:pass@host.domain.tld:port
# File lib/stomp/connection.rb, line 28
# Records the connection settings, creates the three mutexes and the
# bookkeeping state, then calls +socket+ as its final step.
#
# login, passcode - credentials kept in @login / @passcode
# host, port      - endpoint kept in @host / @port
# reliable        - kept in @reliable
# reconnect_delay - kept in @reconnect_delay
# connect_headers - kept in @connect_headers
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  # Credentials and endpoint.
  @login = login
  @passcode = passcode
  @host = host
  @port = port
  @connect_headers = connect_headers

  # Reconnect policy.
  @reliable = reliable
  @reconnect_delay = reconnect_delay

  # One mutex each for transmitting, reading and the socket.
  @transmit_semaphore = Mutex.new
  @read_semaphore = Mutex.new
  @socket_semaphore = Mutex.new

  # Bookkeeping: nothing closed, nothing subscribed, no failure recorded.
  @closed = false
  @subscriptions = {}
  @failure = nil

  # Must come last: every setting above has to be in place first.
  socket
end
Syntactic sugar for ‘Connection.new’ See ‘initialize’ for usage.
# File lib/stomp/connection.rb, line 46
# Convenience constructor: passes every argument, unchanged and in the same
# order, to Connection.new and returns the resulting object.
def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
  Connection.new(
    login, passcode, host, port,
    reliable, reconnect_delay, connect_headers
  )
end
Receive a frame, block until the frame is received
# File lib/stomp/connection.rb, line 161
# Fetches the current socket via +socket+ and returns whatever +_receive+
# yields for it.
#
# If either call raises a StandardError, the error is stored in @failure.
# When @reliable is false the error is re-raised; otherwise a note is printed
# to $stderr and the attempt is repeated.
def __old_receive
  # The receive may fail, so we may need to retry.
  while true
    begin
      s = socket
      return _receive(s)
    rescue => e
      @failure = e
      raise unless @reliable
      # Interpolate rather than `String + Exception`: an exception is not
      # implicitly convertible to String, so concatenation raises TypeError.
      $stderr.print "receive failed: #{e}"
    end
  end
end
Abort a transaction by name
# File lib/stomp/connection.rb, line 110
# Sends an "ABORT" frame through +transmit+.
#
# name    - transaction name, sent as the :transaction header
# headers - extra headers; the caller's hash is left untouched
def abort(name, headers = {})
  # merge builds a new hash, so the :transaction key never leaks back into
  # the hash the caller passed in.
  transmit("ABORT", headers.merge(:transaction => name))
end
Acknowledge a message, used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client' )
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/connection.rb, line 98
# Sends an "ACK" frame through +transmit+.
#
# message_id - sent as the 'message-id' header (string key)
# headers    - extra headers; the caller's hash is left untouched
def ack(message_id, headers = {})
  # merge builds a new hash, so 'message-id' never leaks back into the hash
  # the caller passed in.
  transmit("ACK", headers.merge('message-id' => message_id))
end
Begin a transaction, requires a name for the transaction
# File lib/stomp/connection.rb, line 89
# Sends a "BEGIN" frame through +transmit+.
#
# name    - transaction name, sent as the :transaction header
# headers - extra headers; the caller's hash is left untouched
def begin(name, headers = {})
  # merge builds a new hash, so the :transaction key never leaks back into
  # the hash the caller passed in.
  transmit("BEGIN", headers.merge(:transaction => name))
end
Commit a transaction by name
# File lib/stomp/connection.rb, line 104
# Sends a "COMMIT" frame through +transmit+.
#
# name    - transaction name, sent as the :transaction header
# headers - extra headers; the caller's hash is left untouched
def commit(name, headers = {})
  # merge builds a new hash, so the :transaction key never leaks back into
  # the hash the caller passed in.
  transmit("COMMIT", headers.merge(:transaction => name))
end
Close this connection
# File lib/stomp/connection.rb, line 146
# Sends a "DISCONNECT" frame through +transmit+ with the given headers, then
# flags this connection as closed by setting @closed to true.
def disconnect(headers = {})
  transmit("DISCONNECT", headers)

  # Only reached when transmit did not raise.
  @closed = true
end
Return a pending message if one is available, otherwise return nil
# File lib/stomp/connection.rb, line 153
# Non-blocking read attempt, performed while holding @read_semaphore.
#
# Returns nil when @socket is nil or reports that it is not ready;
# otherwise returns the result of +receive+.
def poll
  @read_semaphore.synchronize do
    if @socket.nil? || !@socket.ready?
      nil
    else
      receive
    end
  end
end
# File lib/stomp/connection.rb, line 175
# Returns the result of +__old_receive+.
#
# When that result is nil and @reliable is set, a notice is printed to
# $stderr, @socket is cleared, and +__old_receive+ is called a second time;
# the second result is returned as-is.
def receive
  frame = __old_receive
  return frame unless frame.nil? && @reliable

  $stderr.print "connection.receive returning EOF as nil - resetting connection.\n"
  # Clear the cached socket before the second attempt.
  @socket = nil
  __old_receive
end
Send message to destination
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/connection.rb, line 140
# Sends a "SEND" frame through +transmit+ with +message+ as the body.
#
# destination - sent as the :destination header
# message     - frame body
# headers     - extra headers; the caller's hash is left untouched
def send(destination, message, headers = {})
  # merge builds a new hash, so the :destination key never leaks back into
  # the hash the caller passed in.
  transmit("SEND", headers.merge(:destination => destination), message)
end
# File lib/stomp/connection.rb, line 50
# Returns the current socket, (re)connecting first when @socket is nil or
# @failure is set.
#
# A connect attempt opens a TCPSocket to @host/@port, sends a "CONNECT" frame
# carrying a copy of @connect_headers plus :login and :passcode, stores the
# reply in @connect, and re-sends a "SUBSCRIBE" frame for every entry in
# @subscriptions.
#
# If an attempt raises a StandardError, the error is stored in @failure and
# any socket opened during that attempt is closed. When @reliable is false
# the error is re-raised; otherwise a note is printed to $stderr and the
# attempt is repeated after sleeping @reconnect_delay.
def socket
  # Need to look into why the following synchronize does not work.
  #@read_semaphore.synchronize do
  s = @socket
  while s.nil? || !@failure.nil?
    @failure = nil
    opened = nil
    begin
      opened = TCPSocket.open(@host, @port)
      headers = @connect_headers.clone
      headers[:login] = @login
      headers[:passcode] = @passcode
      _transmit(opened, "CONNECT", headers)
      @connect = _receive(opened)
      # replay any subscriptions.
      @subscriptions.each { |_k, v| _transmit(opened, "SUBSCRIBE", v) }
      s = opened
    rescue => e
      @failure = e
      s = nil
      # Close a socket whose handshake failed so it is not leaked.
      unless opened.nil?
        begin
          opened.close
        rescue
          # The socket is already unusable; nothing more to do.
        end
      end
      raise e unless @reliable
      # Interpolate rather than `String + Exception`: an exception is not
      # implicitly convertible to String, so concatenation raises TypeError.
      $stderr.print "connect failed: #{e} will retry in #{@reconnect_delay}\n"
      sleep(@reconnect_delay)
    end
  end
  @socket = s
  s
  #end
end
Subscribe to a destination, must specify a name
# File lib/stomp/connection.rb, line 116
# Sends a "SUBSCRIBE" frame through +transmit+.
#
# name    - destination, sent as the :destination header
# headers - extra headers; the caller's hash is left untouched
# subId   - key under which the subscription is remembered; defaults to name
#
# When @reliable is set, the headers that were sent are stored in
# @subscriptions under subId.
def subscribe(name, headers = {}, subId = nil)
  # Work on a private copy: the caller's hash stays as it was, and the stored
  # entry cannot be changed later through the caller's reference.
  headers = headers.merge(:destination => name)
  transmit("SUBSCRIBE", headers)

  # Store the sub so that we can replay if we reconnect.
  if @reliable
    subId = name if subId.nil?
    @subscriptions[subId] = headers
  end
end
Unsubscribe from a destination, must specify a name
# File lib/stomp/connection.rb, line 128
# Sends an "UNSUBSCRIBE" frame through +transmit+.
#
# name    - destination, sent as the :destination header
# headers - extra headers; the caller's hash is left untouched
# subId   - key of the remembered subscription; defaults to name
#
# When @reliable is set, the entry stored under subId is removed from
# @subscriptions.
def unsubscribe(name, headers = {}, subId = nil)
  # merge builds a new hash, so the :destination key never leaks back into
  # the hash the caller passed in.
  transmit("UNSUBSCRIBE", headers.merge(:destination => name))
  if @reliable
    subId = name if subId.nil?
    @subscriptions.delete(subId)
  end
end