Class Stomp::Client
In: lib/stomp/client.rb
Parent: Object

Typical Stomp client class. A dedicated listener thread receives frames from the server; any thread can send.

All receives happen on that single listener thread, so keep the work done in subscription callbacks short when message volume is high.
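
One way to keep the listener thread light is to have the callback only enqueue each message and let a separate worker thread do the slow processing. The sketch below is self-contained and does not open a Stomp connection: the listener thread is simulated, and the names inbox, worker, and on_message are illustrative assumptions, not part of Stomp::Client.

```ruby
require 'thread'

# Hand-off queue between the (simulated) listener thread and a worker.
inbox = Queue.new
processed = []

# Worker thread: does the slow work away from the listener thread.
worker = Thread.new do
  # Queue#pop blocks until an item is available; :done is our stop marker.
  while (msg = inbox.pop) != :done
    processed << msg.upcase # stand-in for expensive processing
  end
end

# Callback as it might be passed to subscribe: it only enqueues.
on_message = lambda { |msg| inbox << msg }

# Simulated listener thread delivering three message bodies.
listener = Thread.new do
  %w[a b c].each { |body| on_message.call(body) }
  inbox << :done
end

listener.join
worker.join
```

With a real client, the body of the subscribe block would play the role of on_message here.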

Methods

abort   acknowledge   begin   close   closed?   commit   join   new   open   open?   send   subscribe   unsubscribe  

Attributes

host  [R] 
login  [R] 
passcode  [R] 
port  [R] 
reliable  [R] 
running  [R] 

Public Class methods

A new Client object can be initialized in one of two forms:

Standard positional parameters:

  login     (String,  default : '')
  passcode  (String,  default : '')
  host      (String,  default : 'localhost')
  port      (Integer, default : 61613)
  reliable  (Boolean, default : false)

  e.g. c = Client.new('login', 'passcode', 'localhost', 61613, true)

Stomp URL:

  A Stomp URL must begin with 'stomp://' and can be in one of the following forms (reliable is always false for a client created this way):

  stomp://host:port
  stomp://host.domain.tld:port
  stomp://login:passcode@host:port
  stomp://login:passcode@host.domain.tld:port
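
The four URL forms can be recognised with a single pattern. The sketch below is a hypothetical helper, not the library's own parser: parse_stomp_url and STOMP_URL are names invented for illustration, mq.example.com is a placeholder host, and the pattern is anchored at both ends, which is stricter than the unanchored patterns in the constructor source.

```ruby
# Hypothetical helper, not part of the library: parses the documented
# Stomp URL forms into a hash of connection settings.
STOMP_URL = %r{\Astomp://(?:([\w.]+):(\w+)@)?([\w.]+):(\d+)\z}

def parse_stomp_url(url)
  match = STOMP_URL.match(url)
  raise ArgumentError, "not a Stomp URL: #{url.inspect}" unless match

  {
    login:    match[1] || '', # absent in the stomp://host:port form
    passcode: match[2] || '',
    host:     match[3],
    port:     match[4].to_i
  }
end

plain  = parse_stomp_url('stomp://localhost:61613')
authed = parse_stomp_url('stomp://user:secret@mq.example.com:61613')
```

Here plain carries empty login and passcode strings, while authed carries the credentials taken from the URL.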

[Source]

    # File lib/stomp/client.rb, line 31
    def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)

      # Parse stomp:// URL's or set positional params
      case login
      when /stomp:\/\/([\w\.]+):(\d+)/ # e.g. stomp://host:port
        # grabs the matching positions out of the regex which are stored as
        # $1 (host), $2 (port), etc
        @login = ''
        @passcode = ''
        @host = $1
        @port = $2.to_i
        @reliable = false
      when /stomp:\/\/([\w\.]+):(\w+)@([\w\.]+):(\d+)/ # e.g. stomp://login:passcode@host:port
        @login = $1
        @passcode = $2
        @host = $3
        @port = $4.to_i
        @reliable = false
      else
        @login = login
        @passcode = passcode
        @host = host
        @port = port.to_i
        @reliable = reliable
      end

      raise ArgumentError if @host.nil? || @host.empty?
      raise ArgumentError if @port.nil? || @port == '' || @port < 1 || @port > 65535
      raise ArgumentError unless @reliable.is_a?(TrueClass) || @reliable.is_a?(FalseClass)

      @id_mutex = Mutex.new
      @ids = 1
      @connection = Connection.new(@login, @passcode, @host, @port, @reliable)
      @listeners = {}
      @receipt_listeners = {}
      @running = true
      @replay_messages_by_txn = {}

      @listener_thread = Thread.start do
        while @running
          message = @connection.receive
          case
          when message.nil?
            break
          when message.command == 'MESSAGE'
            if listener = @listeners[message.headers['destination']]
              listener.call(message)
            end
          when message.command == 'RECEIPT'
            if listener = @receipt_listeners[message.headers['receipt-id']]
              listener.call(message)
            end
          end
        end
      end

    end

Syntactic sugar for ‘Client.new’. See ‘initialize’ for usage.

[Source]

    # File lib/stomp/client.rb, line 90
    def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
      Client.new(login, passcode, host, port, reliable)
    end

Public Instance methods

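Abort a transaction by name. Any messages acknowledged within the transaction are replayed to the listeners for their destinations.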

[Source]

    # File lib/stomp/client.rb, line 106
    def abort(name, headers = {})
      @connection.abort(name, headers)

      # lets replay any ack'd messages in this transaction
      replay_list = @replay_messages_by_txn[name]
      if replay_list
        replay_list.each do |message|
          if listener = @listeners[message.headers['destination']]
            listener.call(message)
          end
        end
      end
    end

Acknowledge a message. Used when a subscription has specified client acknowledgement ( connection.subscribe "/queue/a", :ack => ‘client’ ).

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )

[Source]

    # File lib/stomp/client.rb, line 147
    def acknowledge(message, headers = {})
      txn_id = headers[:transaction]
      if txn_id
        # lets keep around messages ack'd in this transaction in case we rollback
        replay_list = @replay_messages_by_txn[txn_id]
        if replay_list.nil?
          replay_list = []
          @replay_messages_by_txn[txn_id] = replay_list
        end
        replay_list << message
      end
      if block_given?
        headers['receipt'] = register_receipt_listener lambda {|r| yield r}
      end
      @connection.ack message.headers['message-id'], headers
    end
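
The interplay between acknowledge and abort (remember messages acknowledged inside a transaction, hand them back to their listeners on abort) can be illustrated without a broker. The sketch below is a simplified stand-in, not the library's code: ReplayTracker is a hypothetical class, and messages are plain hashes rather than Stomp frames.

```ruby
# Stand-in for the replay bookkeeping; no network connection is involved.
class ReplayTracker
  def initialize
    @listeners = {}              # destination => callback
    @replay_messages_by_txn = {} # transaction name => acknowledged messages
  end

  def subscribe(destination, &block)
    @listeners[destination] = block
  end

  # Remember a message acknowledged inside a transaction.
  def acknowledge(message, txn_id)
    (@replay_messages_by_txn[txn_id] ||= []) << message
  end

  # On abort, hand every remembered message back to its listener.
  def abort(txn_id)
    (@replay_messages_by_txn[txn_id] || []).each do |message|
      listener = @listeners[message[:destination]]
      listener.call(message) if listener
    end
  end
end

seen = []
tracker = ReplayTracker.new
tracker.subscribe('/queue/a') { |m| seen << m[:body] }
tracker.acknowledge({ destination: '/queue/a', body: 'one' }, 'tx1')
tracker.acknowledge({ destination: '/queue/a', body: 'two' }, 'tx1')
tracker.abort('tx1') # both messages are delivered to the listener again
```

Because replayed messages go through the ordinary listener, a callback should tolerate seeing the same message more than once.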

Begin a transaction by name

[Source]

    # File lib/stomp/client.rb, line 101
    def begin(name, headers = {})
      @connection.begin(name, headers)
    end

Close out resources in use by this client

[Source]

    # File lib/stomp/client.rb, line 188
    def close
      @connection.disconnect
      @running = false
    end

Is this client closed?

[Source]

    # File lib/stomp/client.rb, line 183
    def closed?
      @connection.closed?
    end

Commit a transaction by name

[Source]

    # File lib/stomp/client.rb, line 121
    def commit(name, headers = {})
      txn_id = headers[:transaction]
      @replay_messages_by_txn.delete(txn_id)
      @connection.commit(name, headers)
    end

Join the listener thread for this client, generally used to wait for a quit signal

[Source]

    # File lib/stomp/client.rb, line 96
    def join
      @listener_thread.join
    end

Is this client open?

[Source]

    # File lib/stomp/client.rb, line 178
    def open?
      @connection.open?
    end

Send message to destination

If a block is given, a receipt is requested from the server and passed to the block when it arrives.

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )

[Source]

    # File lib/stomp/client.rb, line 170
    def send(destination, message, headers = {})
      if block_given?
        headers['receipt'] = register_receipt_listener lambda {|r| yield r}
      end
      @connection.send(destination, message, headers)
    end
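
The source of register_receipt_listener is not shown on this page. As a rough illustration of the idea only (allocate a unique receipt id, store the callback under it, invoke it when a RECEIPT frame with that id arrives), here is a minimal sketch. ReceiptRegistry, register, and dispatch are hypothetical names, and the real method may behave differently.

```ruby
require 'thread'

# Hypothetical stand-in: shows how a receipt id could be tied to a callback.
class ReceiptRegistry
  def initialize
    @id_mutex = Mutex.new
    @ids = 1
    @receipt_listeners = {}
  end

  # Allocate a unique receipt id and remember the callback under it.
  def register(&callback)
    id = @id_mutex.synchronize do
      current = @ids.to_s
      @ids += 1
      current
    end
    @receipt_listeners[id] = callback
    id
  end

  # Called when a RECEIPT frame carrying receipt_id arrives.
  def dispatch(receipt_id)
    listener = @receipt_listeners[receipt_id]
    listener.call(receipt_id) if listener
  end
end

confirmed = []
registry = ReceiptRegistry.new
headers = { 'receipt' => registry.register { |id| confirmed << id } }
registry.dispatch(headers['receipt']) # simulate the server's RECEIPT frame
```

The mutex matters because any thread may send, so ids can be requested concurrently.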

Subscribe to a destination. A block must be given; it is used as the callback listener for messages arriving on that destination.

Accepts a transaction header ( :transaction => ‘some_transaction_id’ )

[Source]

    # File lib/stomp/client.rb, line 131
    def subscribe(destination, headers = {})
      raise "No listener given" unless block_given?
      @listeners[destination] = lambda {|msg| yield msg}
      @connection.subscribe(destination, headers)
    end

Unsubscribe from a channel

[Source]

    # File lib/stomp/client.rb, line 138
    def unsubscribe(name, headers = {})
      @connection.unsubscribe(name, headers)
      @listeners[name] = nil
    end
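
Both subscribe and unsubscribe operate on a table keyed by destination, so a destination has at most one listener: subscribing again replaces the earlier block, and unsubscribing clears the entry. The sketch below mimics that table in isolation. Dispatcher and deliver are hypothetical names, and no broker traffic is involved.

```ruby
# Stand-in dispatcher mirroring a destination-keyed listener table.
class Dispatcher
  def initialize
    @listeners = {}
  end

  def subscribe(destination, &block)
    raise 'No listener given' unless block
    @listeners[destination] = block
  end

  def unsubscribe(destination)
    @listeners[destination] = nil
  end

  # Deliver a message body to the listener for its destination, if any.
  def deliver(destination, body)
    listener = @listeners[destination]
    listener.call(body) if listener
  end
end

log = []
dispatcher = Dispatcher.new
dispatcher.subscribe('/queue/a') { |body| log << "first: #{body}" }
dispatcher.subscribe('/queue/a') { |body| log << "second: #{body}" } # replaces the first
dispatcher.deliver('/queue/a', 'hello')
dispatcher.unsubscribe('/queue/a')
dispatcher.deliver('/queue/a', 'ignored') # no listener left, nothing happens
```

If several handlers are needed for one destination, they have to be combined inside a single block.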
