Class | Stomp::Client |
In: |
lib/stomp/client.rb
|
Parent: | Object |
host | [R] | |
login | [R] | |
passcode | [R] | |
port | [R] | |
reliable | [R] | |
running | [R] |
A new Client object can be initialized using two forms:
Standard positional parameters:
login (String, default : ''), passcode (String, default : ''), host (String, default : 'localhost'), port (Integer, default : 61613), reliable (Boolean, default : false). E.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)
Stomp URL :
A Stomp URL must begin with 'stomp://' and can be in one of the following forms: stomp://host:port, stomp://host.domain.tld:port, stomp://login:passcode@host:port, or stomp://login:passcode@host.domain.tld:port
# File lib/stomp/client.rb, line 31
#
# Build a client, open its connection and start the listener thread.
#
# Two calling forms are accepted:
#
# * positional parameters: login, passcode, host, port, reliable
# * a Stomp URL passed as the first argument, either
#   stomp://host:port or stomp://login:passcode@host:port.
#   In URL form the remaining positional arguments are ignored and
#   reliable is always false.
#
# Raises ArgumentError when the host is nil/empty, the port is outside
# 1..65535, or reliable is not exactly true or false.
def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)

  # Parse stomp:// URL's or set positional params.
  #
  # The credentialed form is tested FIRST. The host-only pattern also matches
  # "stomp://login:1234@host:port" whenever the passcode starts with digits
  # (it would yield host = login, port = 1234), so it must only be tried once
  # the credentialed form has been ruled out. Both patterns are anchored with
  # \A because a Stomp URL must begin with 'stomp://'.
  case login
  when /\Astomp:\/\/([\w\.]+):(\w+)@([\w\.]+):(\d+)/ # e.g. stomp://login:passcode@host:port
    # the matching positions of the regex are available as
    # $1 (login), $2 (passcode), $3 (host), $4 (port)
    @login = $1
    @passcode = $2
    @host = $3
    @port = $4.to_i
    @reliable = false
  when /\Astomp:\/\/([\w\.]+):(\d+)/ # e.g. stomp://host:port
    @login = ''
    @passcode = ''
    @host = $1
    @port = $2.to_i
    @reliable = false
  else
    @login = login
    @passcode = passcode
    @host = host
    @port = port.to_i
    @reliable = reliable
  end

  raise ArgumentError, "host must not be empty" if @host.nil? || @host.empty?
  # @port is always an Integer here (result of to_i), so only the range matters
  raise ArgumentError, "port must be between 1 and 65535" if @port < 1 || @port > 65535
  unless @reliable.is_a?(TrueClass) || @reliable.is_a?(FalseClass)
    raise ArgumentError, "reliable must be true or false"
  end

  @id_mutex = Mutex.new
  @ids = 1
  @connection = Connection.new(@login, @passcode, @host, @port, @reliable)
  @listeners = {}
  @receipt_listeners = {}
  @running = true
  @replay_messages_by_txn = {}

  # Dispatch loop: MESSAGE frames go to the listener registered for their
  # destination, RECEIPT frames to the listener registered for their
  # receipt-id. A nil frame ends the loop.
  @listener_thread = Thread.start do
    while @running
      message = @connection.receive
      break if message.nil?

      if message.command == 'MESSAGE'
        listener = @listeners[message.headers['destination']]
        listener.call(message) if listener
      elsif message.command == 'RECEIPT'
        listener = @receipt_listeners[message.headers['receipt-id']]
        listener.call(message) if listener
      end
    end
  end

end
Syntactic sugar for ‘Client.new’. See ‘initialize’ for usage.
# File lib/stomp/client.rb, line 90
#
# Convenience constructor: forwards the five positional arguments, unchanged,
# to Client.new and returns the resulting client.
def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
  constructor_args = [login, passcode, host, port, reliable]
  Client.new(*constructor_args)
end
Abort a transaction by name
# File lib/stomp/client.rb, line 106
#
# Abort the transaction +name+ on the connection, then re-deliver every
# message that was acknowledged inside that transaction to the listener
# currently registered for the message's destination.
#
# name    - transaction name
# headers - extra headers handed to the connection's abort
#
# Returns the list of replayed messages, or nil when nothing was recorded
# for the transaction.
def abort(name, headers = {})
  @connection.abort(name, headers)

  # Detach the replay list BEFORE iterating. Leaving it in the table would
  # (a) leak it and replay stale messages on a later abort that reuses the
  # name, and (b) let a listener that acknowledges again under the same
  # transaction append to the very array being walked.
  replay_list = @replay_messages_by_txn.delete(name)
  return unless replay_list

  replay_list.each do |message|
    listener = @listeners[message.headers['destination']]
    listener.call(message) if listener
  end
end
Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client’ ).
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/client.rb, line 147
#
# Acknowledge +message+ by sending an ack for its 'message-id' header.
#
# message - the received message; its headers['message-id'] is acked
# headers - extra headers for the ack. When headers[:transaction] is set the
#           message is remembered under that transaction id so it can be
#           replayed if the transaction is aborted.
#
# If a block is given, a receipt listener wrapping the block is registered
# and its id is sent as the 'receipt' header. The caller's +headers+ hash is
# never modified; the receipt id is added to a copy.
def acknowledge(message, headers = {})
  txn_id = headers[:transaction]
  if txn_id
    # lets keep around messages ack'd in this transaction in case we rollback
    (@replay_messages_by_txn[txn_id] ||= []) << message
  end

  if block_given?
    receipt_id = register_receipt_listener(lambda { |r| yield r })
    # merge into a copy so a headers hash reused by the caller does not carry
    # a stale receipt id into later calls
    headers = headers.merge('receipt' => receipt_id)
  end

  @connection.ack message.headers['message-id'], headers
end
Begin a transaction by name
# File lib/stomp/client.rb, line 101
#
# Start the transaction +name+ by delegating to the underlying connection.
# +headers+ is passed through untouched.
def begin(name, headers = {})
  connection = @connection
  connection.begin(name, headers)
end
Close out resources in use by this client
# File lib/stomp/client.rb, line 188
#
# Release this client's resources: disconnect the underlying connection,
# then clear the @running flag.
def close
  connection = @connection
  connection.disconnect
  @running = false
end
Commit a transaction by name
# File lib/stomp/client.rb, line 121
#
# Commit the transaction +name+ on the connection and forget the messages
# that were recorded for replay under it.
#
# name    - transaction name
# headers - extra headers handed to the connection's commit
#
# Returns whatever the connection's commit returns.
def commit(name, headers = {})
  # Replay lists are stored under the transaction id by acknowledge and
  # looked up under the transaction name by abort, so the committed
  # transaction's list lives under +name+. Deleting only
  # headers[:transaction] left that list behind.
  @replay_messages_by_txn.delete(name)
  # kept for backward compatibility with callers that pass the id as a header
  @replay_messages_by_txn.delete(headers[:transaction])
  @connection.commit(name, headers)
end
Join the listener thread for this client, generally used to wait for a quit signal
# File lib/stomp/client.rb, line 96
#
# Block the caller until this client's listener thread finishes.
def join
  listener = @listener_thread
  listener.join
end
Send message to destination
If a block is given a receipt will be requested and passed to the block on receipt
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/client.rb, line 170
#
# Send +message+ to +destination+ through the underlying connection.
#
# destination - destination name
# message     - message body
# headers     - extra headers for the frame
#
# If a block is given, a receipt listener wrapping the block is registered
# and its id is sent as the 'receipt' header. The caller's +headers+ hash is
# never modified; the receipt id is added to a copy.
def send(destination, message, headers = {})
  if block_given?
    receipt_id = register_receipt_listener(lambda { |r| yield r })
    # merge into a copy so a headers hash reused by the caller does not carry
    # a stale receipt id into later sends
    headers = headers.merge('receipt' => receipt_id)
  end
  @connection.send(destination, message, headers)
end
Subscribe to a destination. A block must be passed; it will be used as the callback listener.
Accepts a transaction header ( :transaction => ‘some_transaction_id’ )
# File lib/stomp/client.rb, line 131
#
# Register the given block as the listener for +destination+ (stored in
# @listeners under that key) and subscribe on the underlying connection.
#
# Raises RuntimeError ("No listener given") when called without a block.
def subscribe(destination, headers = {})
  unless block_given?
    raise "No listener given"
  end

  callback = lambda { |msg| yield msg }
  @listeners[destination] = callback
  @connection.subscribe(destination, headers)
end