Issue #98, client fail-over fails.
[stomp:mainline.git] / lib / stomp / client.rb
1 # -*- encoding: utf-8 -*-
2
3 require 'thread'
4 require 'digest/sha1'
5 require 'timeout'
6 require 'forwardable'
7
8 module Stomp
9
10   # Typical Stomp client class. Uses a listener thread to receive frames
11   # from the server, any thread can send.
12   #
13   # Receives all happen in one thread, so consider not doing much processing
14   # in that thread if you have much message volume.
15   class Client
16     extend Forwardable
17
18     # Parameters hash
19     attr_reader :parameters
20
21     def_delegators :@connection, :login, :passcode, :port, :host, :ssl
22     def_delegator :@parameters, :reliable
23
24     # A new Client object can be initialized using three forms:
25     #
26     # Hash (this is the recommended Client initialization method):
27     #
28     #   hash = {
29     #     :hosts => [
30     #       {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
31     #       {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
32     #     ],
33     #     :reliable => true,
34     #     :initial_reconnect_delay => 0.01,
35     #     :max_reconnect_delay => 30.0,
36     #     :use_exponential_back_off => true,
37     #     :back_off_multiplier => 2,
38     #     :max_reconnect_attempts => 0,
39     #     :randomize => false,
40     #     :connect_timeout => 0,
41     #     :connect_headers => {},
42     #     :parse_timeout => 5,
43     #     :logger => nil,
44     #     :dmh => false,
45     #     :closed_check => true,
46     #     :hbser => false,
47     #     :stompconn => false,
48     #     :usecrlf => false,
49     #     :max_hbread_fails => 0,
50     #     :max_hbrlck_fails => 0,
51     #     :fast_hbs_adjust => 0.0,
52     #     :connread_timeout => 0,
53     #     :tcp_nodelay => true,
54     #     :start_timeout => 10,
55     #   }
56     #
57     #   e.g. c = Stomp::Client.new(hash)
58     #
59     # Positional parameters:
60     #   login     (String,  default : '')
61     #   passcode  (String,  default : '')
62     #   host      (String,  default : 'localhost')
63     #   port      (Integer, default : 61613)
64     #   reliable  (Boolean, default : false)
65     #
66     #   e.g. c = Stomp::Client.new('login', 'passcode', 'localhost', 61613, true)
67     #
68     # Stomp URL :
69     #   A Stomp URL must begin with 'stomp://' and can be in one of the following forms:
70     #
71     #   stomp://host:port
72     #   stomp://host.domain.tld:port
73     #   stomp://login:passcode@host:port
74     #   stomp://login:passcode@host.domain.tld:port
75     #
76     #   e.g. c = Stomp::Client.new(urlstring)
77     #
78     def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, autoflush = false)
79       parse_hash_params(login) ||
80         parse_stomp_url(login) ||
81         parse_failover_url(login) ||
82         parse_positional_params(login, passcode, host, port, reliable)
83
84       @logger = @parameters[:logger] ||= Stomp::NullLogger.new
85       @start_timeout = @parameters[:start_timeout] || 10.0
86       check_arguments!()
87
88       begin
89         timeout(@start_timeout) {
90           create_error_handler
91           create_connection(autoflush)
92           start_listeners()
93         }
94       rescue TimeoutError
95         ex = Stomp::Error::StartTimeoutException.new(@start_timeout)
96         raise ex
97       end
98     end
99
100     def create_error_handler
101       client_thread = Thread.current
102
103       @error_listener = lambda do |error|
104         exception = case error.body
105                       when /ResourceAllocationException/i
106                         Stomp::Error::ProducerFlowControlException.new(error)
107                       when /ProtocolException/i
108                         Stomp::Error::ProtocolException.new(error)
109                       else
110                         Stomp::Error::BrokerException.new(error)
111                     end
112
113         client_thread.raise exception
114       end
115     end
116
117     def create_connection(autoflush)
118       @connection = Connection.new(@parameters)
119       @connection.autoflush = autoflush
120     end
121     private :create_connection
122
123     # open is syntactic sugar for 'Client.new', see 'initialize' for usage.
124     def self.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false)
125       Client.new(login, passcode, host, port, reliable)
126     end
127
128     # join the listener thread for this client,
129     # generally used to wait for a quit signal.
130     def join(limit = nil)
131       @listener_thread.join(limit)
132     end
133
134     # Begin starts work in a a transaction by name.
135     def begin(name, headers = {})
136       @connection.begin(name, headers)
137     end
138
139     # Abort aborts work in a transaction by name.
140     def abort(name, headers = {})
141       @connection.abort(name, headers)
142
143       # replay any ack'd messages in this transaction
144       replay_list = @replay_messages_by_txn[name]
145       if replay_list
146         replay_list.each do |message|
147           find_listener(message) # find_listener also calls the listener
148         end
149       end
150     end
151
152     # Commit commits work in a transaction by name.
153     def commit(name, headers = {})
154       txn_id = headers[:transaction]
155       @replay_messages_by_txn.delete(txn_id)
156       @connection.commit(name, headers)
157     end
158
159     # Subscribe to a destination, must be passed a block
160     # which will be used as a callback listener.
161     # Accepts a transaction header ( :transaction => 'some_transaction_id' ).
162     def subscribe(destination, headers = {})
163       raise "No listener given" unless block_given?
164       # use subscription id to correlate messages to subscription. As described in
165       # the SUBSCRIPTION section of the protocol: http://stomp.github.com/.
166       # If no subscription id is provided, generate one.
167       set_subscription_id_if_missing(destination, headers)
168       if @listeners[headers[:id]]
169         raise "attempting to subscribe to a queue with a previous subscription"
170       end
171       @listeners[headers[:id]] = lambda {|msg| yield msg}
172       @connection.subscribe(destination, headers)
173     end
174
175     # Unsubscribe from a subscription by name.
176     def unsubscribe(name, headers = {})
177       set_subscription_id_if_missing(name, headers)
178       @connection.unsubscribe(name, headers)
179       @listeners[headers[:id]] = nil
180     end
181
182     # Acknowledge a message, used when a subscription has specified
183     # client acknowledgement ( connection.subscribe("/queue/a",{:ack => 'client'}).
184     # Accepts a transaction header ( :transaction => 'some_transaction_id' ).
185     def ack(message, headers = {})
186       txn_id = headers[:transaction]
187       if txn_id
188         # lets keep around messages ack'd in this transaction in case we rollback
189         replay_list = @replay_messages_by_txn[txn_id]
190         if replay_list.nil?
191           replay_list = []
192           @replay_messages_by_txn[txn_id] = replay_list
193         end
194         replay_list << message
195       end
196       if block_given?
197         headers['receipt'] = register_receipt_listener lambda {|r| yield r}
198       end
199       context = ack_context_for(message, headers)
200       @connection.ack context[:message_id], context[:headers]
201     end
202
203     # For posterity, we alias:
204     alias acknowledge ack
205
206     # Stomp 1.1+ NACK.
207     def nack(message, headers = {})
208       context = ack_context_for(message, headers)
209       @connection.nack context[:message_id], context[:headers]
210     end
211
212     #
213     def ack_context_for(message, headers)
214       id = case protocol
215         when Stomp::SPL_12
216          'ack'
217         when Stomp::SPL_11
218          headers.merge!(:subscription => message.headers['subscription'])
219          'message-id'
220         else
221          'message-id'
222       end
223       {:message_id => message.headers[id], :headers => headers}
224     end
225
226     # Unreceive a message, sending it back to its queue or to the DLQ.
227     def unreceive(message, options = {})
228       @connection.unreceive(message, options)
229     end
230
231     # Publishes message to destination.
232     # If a block is given a receipt will be requested and passed to the
233     # block on receipt.
234     # Accepts a transaction header ( :transaction => 'some_transaction_id' ).
235     def publish(destination, message, headers = {})
236       if block_given?
237         headers['receipt'] = register_receipt_listener lambda {|r| yield r}
238       end
239       @connection.publish(destination, message, headers)
240     end
241
242     # Return the broker's CONNECTED frame to the client.  Misnamed.
243     def connection_frame()
244       @connection.connection_frame
245     end
246
247     # Return any RECEIPT frame received by DISCONNECT.
248     def disconnect_receipt()
249       @connection.disconnect_receipt
250     end
251
252     # open? tests if this client connection is open.
253     def open?
254       @connection.open?()
255     end
256
257     # close? tests if this client connection is closed.
258     def closed?()
259       @connection.closed?()
260     end
261
262     # jruby? tests if the connection has detcted a JRuby environment
263     def jruby?()
264       @connection.jruby
265     end
266
267     # close frees resources in use by this client.  The listener thread is
268     # terminated, and disconnect on the connection is called.
269     def close(headers={})
270       @listener_thread.exit
271       @connection.disconnect(headers)
272     end
273
274     # running checks if the thread was created and is not dead.
275     def running()
276       @listener_thread && !!@listener_thread.status
277     end
278
279     # set_logger identifies a new callback logger.
280     def set_logger(logger)
281       @logger = logger
282       @connection.set_logger(logger)
283     end
284
285     # protocol returns the current client's protocol level.
286     def protocol()
287       @connection.protocol()
288     end
289
290     # valid_utf8? validates any given string for UTF8 compliance.
291     def valid_utf8?(s)
292       @connection.valid_utf8?(s)
293     end
294
295     # sha1 returns a SHA1 sum of a given string.
296     def sha1(data)
297       @connection.sha1(data)
298     end
299
300     # uuid returns a type 4 UUID.
301     def uuid()
302       @connection.uuid()
303     end
304
305     # hbsend_interval returns the connection's heartbeat send interval.
306     def hbsend_interval()
307       @connection.hbsend_interval()
308     end
309
310     # hbrecv_interval returns the connection's heartbeat receive interval.
311     def hbrecv_interval()
312       @connection.hbrecv_interval()
313     end
314
315     # hbsend_count returns the current connection's heartbeat send count.
316     def hbsend_count()
317       @connection.hbsend_count()
318     end
319
320     # hbrecv_count returns the current connection's heartbeat receive count.
321     def hbrecv_count()
322       @connection.hbrecv_count()
323     end
324
325     # Poll for asynchronous messages issued by broker.
326     # Return nil of no message available, else the message
327     def poll()
328       @connection.poll()
329     end
330
331     # autoflush= sets the current connection's autoflush setting.
332     def autoflush=(af)
333       @connection.autoflush = af
334     end
335
336     # autoflush returns the current connection's autoflush setting.
337     def autoflush()
338       @connection.autoflush()
339     end
340
341   end # Class
342
343 end # Module
344