1
# -*- encoding: utf-8 -*-
2
3
require 'socket'
4
require 'timeout'
5
require 'io/wait'
6
require 'digest/sha1'
7
8
module Stomp
9
10
  # Low level connection which maps commands and supports
11
  # synchronous receives
12
  class Connection
13
    attr_reader :connection_frame
14
    attr_reader :disconnect_receipt
15
    attr_reader :protocol
16
    attr_reader :session
17
    attr_reader :hb_received # Heartbeat received on time
18
    attr_reader :hb_sent # Heartbeat sent successfully
19
    #alias :obj_send :send
20
21
    def self.default_port(ssl)
22
      ssl ? 61612 : 61613
23
    end
24
    
25
    # A new Connection object accepts the following parameters:
26
    #
27
    #   login             (String,  default : '')
28
    #   passcode          (String,  default : '')
29
    #   host              (String,  default : 'localhost')
30
    #   port              (Integer, default : 61613)
31
    #   reliable          (Boolean, default : false)
32
    #   reconnect_delay   (Integer, default : 5)
33
    #
34
    #   e.g. c = Connection.new("username", "password", "localhost", 61613, true)
35
    #
36
    # Hash:
37
    #
38
    #   hash = {
39
    #     :hosts => [
40
    #       {:login => "login1", :passcode => "passcode1", :host => "localhost", :port => 61616, :ssl => false},
41
    #       {:login => "login2", :passcode => "passcode2", :host => "remotehost", :port => 61617, :ssl => false}
42
    #     ],
43
    #     :reliable => true,
44
    #     :initial_reconnect_delay => 0.01,
45
    #     :max_reconnect_delay => 30.0,
46
    #     :use_exponential_back_off => true,
47
    #     :back_off_multiplier => 2,
48
    #     :max_reconnect_attempts => 0,
49
    #     :randomize => false,
50
    #     :backup => false,
51
    #     :connect_timeout => 0,
52
    #     :connect_headers => {},
53
    #     :parse_timeout => 5,
54
    #     :logger => nil,
55
    #   }
56
    #
57
    #   e.g. c = Connection.new(hash)
58
    #
59
    # TODO
60
    # Stomp URL :
61
    #   A Stomp URL must begin with 'stomp://' and can be in one of the following forms:
62
    #
63
    #   stomp://host:port
64
    #   stomp://host.domain.tld:port
65
    #   stomp://user:pass@host:port
66
    #   stomp://user:pass@host.domain.tld:port
67
    #
68
    def initialize(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
69
      @received_messages = []
70
      @protocol = Stomp::SPL_10 # Assumed at first
71
      @hb_received = true # Assumed at first
72
      @hb_sent = true # Assumed at first
73
      @hbs = @hbr = false # Sending/Receiving heartbeats. Assume no for now.
74
75
      if login.is_a?(Hash)
76
        hashed_initialize(login)
77
      else
78
        @host = host
79
        @port = port
80
        @login = login
81
        @passcode = passcode
82
        @reliable = reliable
83
        @reconnect_delay = reconnect_delay
84
        @connect_headers = connect_headers
85
        @ssl = false
86
        @parameters = nil
87
        @parse_timeout = 5		# To override, use hashed parameters
88
        @connect_timeout = 0	# To override, use hashed parameters
89
        @logger = nil     		# To override, use hashed parameters
90
      end
91
      
92
      # Use Mutexes:  only one lock per each thread
93
      # Revert to original implementation attempt
94
      @transmit_semaphore = Mutex.new
95
      @read_semaphore = Mutex.new
96
      @socket_semaphore = Mutex.new
97
      
98
      @subscriptions = {}
99
      @failure = nil
100
      @connection_attempts = 0
101
      
102
      socket
103
    end
104
    
105
    def hashed_initialize(params)
106
      
107
      @parameters = refine_params(params)
108
      @reliable =  @parameters[:reliable]
109
      @reconnect_delay = @parameters[:initial_reconnect_delay]
110
      @connect_headers = @parameters[:connect_headers]
111
      @parse_timeout =  @parameters[:parse_timeout]
112
      @connect_timeout =  @parameters[:connect_timeout]
113
      @logger =  @parameters[:logger]
114
      #sets the first host to connect
115
      change_host
116
      if @logger && @logger.respond_to?(:on_connecting)            
117
        @logger.on_connecting(log_params)
118
      end
119
    end
120
    
121
    # Syntactic sugar for 'Connection.new' See 'initialize' for usage.
122
    def Connection.open(login = '', passcode = '', host = 'localhost', port = 61613, reliable = false, reconnect_delay = 5, connect_headers = {})
123
      Connection.new(login, passcode, host, port, reliable, reconnect_delay, connect_headers)
124
    end
125
126
    def socket
127
      @socket_semaphore.synchronize do
128
        used_socket = @socket
129
        used_socket = nil if closed?
130
        
131
        while used_socket.nil? || !@failure.nil?
132
          @failure = nil
133
          begin
134
            used_socket = open_socket
135
            # Open complete
136
            
137
            connect(used_socket)
138
            if @logger && @logger.respond_to?(:on_connected)
139
              @logger.on_connected(log_params) 
140
            end
141
            @connection_attempts = 0
142
          rescue
143
            @failure = $!
144
            used_socket = nil
145
            raise unless @reliable
146
            if @logger && @logger.respond_to?(:on_connectfail)            
147
              @logger.on_connectfail(log_params) 
148
            else
149
              $stderr.print "connect to #{@host} failed: #{$!} will retry(##{@connection_attempts}) in #{@reconnect_delay}\n"
150
            end
151
            raise Stomp::Error::MaxReconnectAttempts if max_reconnect_attempts?
152
153
            sleep(@reconnect_delay)
154
            
155
            @connection_attempts += 1
156
            
157
            if @parameters
158
              change_host
159
              increase_reconnect_delay
160
            end
161
          end
162
        end
163
        @socket = used_socket
164
      end
165
    end
166
  
167
    def refine_params(params)
168
      params = params.uncamelize_and_symbolize_keys
169
      
170
      default_params = {
171
        :connect_headers => {},
172
        :reliable => true,
173
        # Failover parameters
174
        :initial_reconnect_delay => 0.01,
175
        :max_reconnect_delay => 30.0,
176
        :use_exponential_back_off => true,
177
        :back_off_multiplier => 2,
178
        :max_reconnect_attempts => 0,
179
        :randomize => false,
180
        :backup => false,
181
        :connect_timeout => 0,
182
        # Parse Timeout
183
        :parse_timeout => 5
184
      }
185
      
186
      default_params.merge(params)
187
        
188
    end
189
    
190
    def change_host
191
      @parameters[:hosts] = @parameters[:hosts].sort_by { rand } if @parameters[:randomize]
192
      
193
      # Set first as master and send it to the end of array
194
      current_host = @parameters[:hosts].shift
195
      @parameters[:hosts] << current_host
196
      
197
      @ssl = current_host[:ssl]
198
      @host = current_host[:host]
199
      @port = current_host[:port] || Connection::default_port(@ssl)
200
      @login = current_host[:login] || ""
201
      @passcode = current_host[:passcode] || ""
202
      
203
    end
204
    
205
    def max_reconnect_attempts?
206
      !(@parameters.nil? || @parameters[:max_reconnect_attempts].nil?) && @parameters[:max_reconnect_attempts] != 0 && @connection_attempts >= @parameters[:max_reconnect_attempts]
207
    end
208
    
209
    def increase_reconnect_delay
210
211
      @reconnect_delay *= @parameters[:back_off_multiplier] if @parameters[:use_exponential_back_off] 
212
      @reconnect_delay = @parameters[:max_reconnect_delay] if @reconnect_delay > @parameters[:max_reconnect_delay]
213
      
214
      @reconnect_delay
215
    end
216
217
    # Is this connection open?
218
    def open?
219
      !@closed
220
    end
221
222
    # Is this connection closed?
223
    def closed?
224
      @closed
225
    end
226
227
    # Begin a transaction, requires a name for the transaction
228
    def begin(name, headers = {})
229
      raise Stomp::Error::NoCurrentConnection if closed?
230
      headers = headers.symbolize_keys
231
      headers[:transaction] = name
232
      _headerCheck(headers)
233
      transmit(Stomp::CMD_BEGIN, headers)
234
    end
235
236
    # Acknowledge a message, used when a subscription has specified
237
    # client acknowledgement ( connection.subscribe "/queue/a", :ack => 'client'g
238
    #
239
    # Accepts a transaction header ( :transaction => 'some_transaction_id' )
240
    def ack(message_id, headers = {})
241
      raise Stomp::Error::NoCurrentConnection if closed?
242
      raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == ""
243
      headers = headers.symbolize_keys
244
      headers[:'message-id'] = message_id
245
      if @protocol >= Stomp::SPL_11
246
        raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
247
      end
248
      _headerCheck(headers)
249
      transmit(Stomp::CMD_ACK, headers)
250
    end
251
252
    # STOMP 1.1+ NACK
253
    def nack(message_id, headers = {})
254
      raise Stomp::Error::NoCurrentConnection if closed?
255
      raise Stomp::Error::UnsupportedProtocolError if @protocol == Stomp::SPL_10
256
      raise Stomp::Error::MessageIDRequiredError if message_id.nil? || message_id == ""
257
      headers = headers.symbolize_keys
258
      headers[:'message-id'] = message_id
259
      raise Stomp::Error::SubscriptionRequiredError unless headers[:subscription]
260
      _headerCheck(headers)
261
      transmit(Stomp::CMD_NACK, headers)
262
    end
263
264
    # Commit a transaction by name
265
    def commit(name, headers = {})
266
      raise Stomp::Error::NoCurrentConnection if closed?
267
      headers = headers.symbolize_keys
268
      headers[:transaction] = name
269
      _headerCheck(headers)
270
      transmit(Stomp::CMD_COMMIT, headers)
271
    end
272
273
    # Abort a transaction by name
274
    def abort(name, headers = {})
275
      raise Stomp::Error::NoCurrentConnection if closed?
276
      headers = headers.symbolize_keys
277
      headers[:transaction] = name
278
      _headerCheck(headers)
279
      transmit(Stomp::CMD_ABORT, headers)
280
    end
281
282
    # Subscribe to a destination, must specify a name
283
    def subscribe(name, headers = {}, subId = nil)
284
      raise Stomp::Error::NoCurrentConnection if closed?
285
      headers = headers.symbolize_keys
286
      headers[:destination] = name
287
      if @protocol >= Stomp::SPL_11
288
        raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
289
        headers[:id] = subId if headers[:id].nil?
290
      end
291
      _headerCheck(headers)
292
      if @logger && @logger.respond_to?(:on_subscribe)            
293
        @logger.on_subscribe(log_params, headers)
294
      end
295
296
      # Store the sub so that we can replay if we reconnect.
297
      if @reliable
298
        subId = name if subId.nil?
299
        raise Stomp::Error::DuplicateSubscription if @subscriptions[subId]
300
        @subscriptions[subId] = headers
301
      end
302
303
      transmit(Stomp::CMD_SUBSCRIBE, headers)
304
    end
305
306
    # Unsubscribe from a destination, which must be specified
307
    def unsubscribe(dest, headers = {}, subId = nil)
308
      raise Stomp::Error::NoCurrentConnection if closed?
309
      headers = headers.symbolize_keys
310
      headers[:destination] = dest
311
      if @protocol >= Stomp::SPL_11
312
        raise Stomp::Error::SubscriptionRequiredError if (headers[:id].nil? && subId.nil?)
313
      end
314
      _headerCheck(headers)
315
      transmit(Stomp::CMD_UNSUBSCRIBE, headers)
316
      if @reliable
317
        subId = dest if subId.nil?
318
        @subscriptions.delete(subId)
319
      end
320
    end
321
322
    # Publish message to destination
323
    #
324
    # To disable content length header ( :suppress_content_length => true )
325
    # Accepts a transaction header ( :transaction => 'some_transaction_id' )
326
    def publish(destination, message, headers = {})
327
      raise Stomp::Error::NoCurrentConnection if closed?
328
      headers = headers.symbolize_keys
329
      headers[:destination] = destination
330
      _headerCheck(headers)
331
      if @logger && @logger.respond_to?(:on_publish)            
332
        @logger.on_publish(log_params, message, headers)
333
      end
334
      transmit(Stomp::CMD_SEND, headers, message)
335
    end
336
    
337
    def obj_send(*args)
338
      __send__(*args)
339
    end
340
    
341
    def send(*args)
342
      warn("This method is deprecated and will be removed on the next release. Use 'publish' instead")
343
      publish(*args)
344
    end
345
    
346
    # Send a message back to the source or to the dead letter queue
347
    #
348
    # Accepts a dead letter queue option ( :dead_letter_queue => "/queue/DLQ" )
349
    # Accepts a limit number of redeliveries option ( :max_redeliveries => 6 )
350
    # Accepts a force client acknowledgement option (:force_client_ack => true)
351
    def unreceive(message, options = {})
352
      raise Stomp::Error::NoCurrentConnection if closed?
353
      options = { :dead_letter_queue => "/queue/DLQ", :max_redeliveries => 6 }.merge options
354
      # Lets make sure all keys are symbols
355
      message.headers = message.headers.symbolize_keys
356
      
357
      retry_count = message.headers[:retry_count].to_i || 0
358
      message.headers[:retry_count] = retry_count + 1
359
      transaction_id = "transaction-#{message.headers[:'message-id']}-#{retry_count}"
360
      message_id = message.headers.delete(:'message-id')
361
      
362
      begin
363
        self.begin transaction_id
364
        
365
        if client_ack?(message) || options[:force_client_ack]
366
          self.ack(message_id, :transaction => transaction_id)
367
        end
368
        
369
        if retry_count <= options[:max_redeliveries]
370
          self.publish(message.headers[:destination], message.body, message.headers.merge(:transaction => transaction_id))
371
        else
372
          # Poison ack, sending the message to the DLQ
373
          self.publish(options[:dead_letter_queue], message.body, message.headers.merge(:transaction => transaction_id, :original_destination => message.headers[:destination], :persistent => true))
374
        end
375
        self.commit transaction_id
376
      rescue Exception => exception
377
        self.abort transaction_id
378
        raise exception
379
      end
380
    end
381
    
382
    def client_ack?(message)
383
      headers = @subscriptions[message.headers[:destination]]
384
      !headers.nil? && headers[:ack] == "client"
385
    end
386
387
    # Close this connection
388
    def disconnect(headers = {})
389
      raise Stomp::Error::NoCurrentConnection if closed?
390
      headers = headers.symbolize_keys
391
      _headerCheck(headers)
392
      if @protocol >= Stomp::SPL_11
393
        @st.kill if @st # Kill ticker thread if any
394
        @rt.kill if @rt # Kill ticker thread if any
395
      end
396
      transmit(Stomp::CMD_DISCONNECT, headers)
397
      @disconnect_receipt = receive if headers[:receipt]
398
      if @logger && @logger.respond_to?(:on_disconnect)
399
        @logger.on_disconnect(log_params)
400
      end
401
      close_socket
402
    end
403
404
    # Return a pending message if one is available, otherwise
405
    # return nil
406
    def poll
407
      raise Stomp::Error::NoCurrentConnection if closed?
408
      # No need for a read lock here.  The receive method eventually fulfills
409
      # that requirement.
410
      return nil if @socket.nil? || !@socket.ready?
411
      receive
412
    end
413
414
    # Receive a frame, block until the frame is received
415
    def __old_receive
416
      # The receive may fail so we may need to retry.
417
      while TRUE
418
        begin
419
          used_socket = socket
420
          return _receive(used_socket)
421
        rescue
422
          @failure = $!
423
          raise unless @reliable
424
          errstr = "receive failed: #{$!}"
425
          if @logger && @logger.respond_to?(:on_miscerr)
426
            @logger.on_miscerr(log_params, errstr)
427
          else
428
            $stderr.print errstr
429
          end
430
        end
431
      end
432
    end
433
434
    def receive
435
      raise Stomp::Error::NoCurrentConnection if closed?
436
      super_result = __old_receive
437
      if super_result.nil? && @reliable && !closed?
438
        errstr = "connection.receive returning EOF as nil - resetting connection.\n"
439
        if @logger && @logger.respond_to?(:on_miscerr)
440
          @logger.on_miscerr(log_params, errstr)
441
        else
442
          $stderr.print errstr
443
        end
444
        @socket = nil
445
        super_result = __old_receive
446
      end
447
      #
448
      if @logger && @logger.respond_to?(:on_receive)            
449
        @logger.on_receive(log_params, super_result)
450
      end
451
      return super_result
452
    end
453
454
    # Convenience method
455
    def set_logger(logger)
456
      @logger = logger
457
    end
458
459
    # Convenience method
460
    def valid_utf8?(s)
461
      case RUBY_VERSION
462
        when /1\.8/
463
          rv = _valid_utf8?(s)
464
        else
465
          rv = s.encoding.name != Stomp::UTF8 ? false : s.valid_encoding?
466
      end
467
      rv
468
    end
469
470
    # Convenience method for clients, return a SHA1 digest for arbitrary data
471
    def sha1(data)
472
      Digest::SHA1.hexdigest(data)
473
    end
474
475
    # Convenience method for clients, return a type 4 UUID.
476
    def uuid()
477
      b = []
478
      0.upto(15) do |i|
479
        b << rand(255)
480
      end
481
	    b[6] = (b[6] & 0x0F) | 0x40
482
	    b[8] = (b[8] & 0xbf) | 0x80
483
      #             0  1  2  3   4   5  6  7   8  9  10 11 12 13 14 15
484
	    rs = sprintf("%02x%02x%02x%02x-%02x%02x-%02x%02x-%02x%02x%02x%02x%02x%02x%02x%02x", 
485
        b[0], b[1], b[2], b[3], b[4], b[5], b[6], b[7], b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15])
486
      rs
487
    end
488
489
    private
490
491
      def _receive( read_socket )
492
        @read_semaphore.synchronize do
493
          # Throw away leading newlines, which are perhaps trailing
494
          # newlines from the preceding message, or alterantely a 1.1+ server
495
          # heartbeat.
496
          begin
497
            last_char = read_socket.getc
498
            return nil if last_char.nil?
499
            if @protocol >= Stomp::SPL_11
500
              plc = parse_char(last_char)
501
              if plc == "\n" # Server Heartbeat
502
                @lr = Time.now.to_f if @hbr
503
              end
504
            end
505
          end until parse_char(last_char) != "\n"
506
          read_socket.ungetc(last_char)
507
508
          # If the reading hangs for more than X seconds, abort the parsing process.
509
          # X defaults to 5.  Override allowed in connection hash parameters.
510
          Timeout::timeout(@parse_timeout, Stomp::Error::PacketParsingTimeout) do
511
            # Reads the beginning of the message until it runs into a empty line
512
            message_header = ''
513
            line = ''
514
            begin
515
              message_header << line
516
              line = read_socket.gets
517
              return nil if line.nil?
518
            end until line =~ /^\s?\n$/
519
520
            # Checks if it includes content_length header
521
            content_length = message_header.match /content-length\s?:\s?(\d+)\s?\n/
522
            message_body = ''
523
524
            # If it does, reads the specified amount of bytes
525
            char = ''
526
            if content_length
527
              message_body = read_socket.read content_length[1].to_i
528
              raise Stomp::Error::InvalidMessageLength unless parse_char(read_socket.getc) == "\0"
529
            # Else reads, the rest of the message until the first \0
530
            else
531
              message_body << char while (char = parse_char(read_socket.getc)) != "\0"
532
            end
533
534
            if @protocol >= Stomp::SPL_11
535
              @lr = Time.now.to_f if @hbr
536
            end
537
538
            # Adds the excluded \n and \0 and tries to create a new message with it
539
            msg = Message.new(message_header + "\n" + message_body + "\0", @protocol >= Stomp::SPL_11)
540
            #
541
            if @protocol >= Stomp::SPL_11 && msg.command != Stomp::CMD_CONNECTED
542
              msg.headers = _decodeHeaders(msg.headers)
543
            end
544
            msg
545
          end
546
        end
547
      end
548
549
      def parse_char(char)
550
        RUBY_VERSION > '1.9' ? char : char.chr
551
      end
552
553
      def transmit(command, headers = {}, body = '')
554
        # The transmit may fail so we may need to retry.
555
        while TRUE
556
          begin
557
            used_socket = socket
558
            _transmit(used_socket, command, headers, body)
559
            return
560
          rescue Stomp::Error::MaxReconnectAttempts => e
561
              raise
562
          rescue
563
            @failure = $!
564
            raise unless @reliable
565
            errstr = "transmit to #{@host} failed: #{$!}\n"
566
            if @logger && @logger.respond_to?(:on_miscerr)
567
              @logger.on_miscerr(log_params, errstr)
568
            else
569
              $stderr.print errstr
570
            end
571
          end
572
        end
573
      end
574
575
      def _transmit(used_socket, command, headers = {}, body = '')
576
        if @protocol >= Stomp::SPL_11 && command != Stomp::CMD_CONNECT
577
          headers = _encodeHeaders(headers)
578
        end
579
        @transmit_semaphore.synchronize do
580
          # Handle nil body
581
          body = '' if body.nil?
582
          # The content-length should be expressed in bytes.
583
          # Ruby 1.8: String#length => # of bytes; Ruby 1.9: String#length => # of characters
584
          # With Unicode strings, # of bytes != # of characters.  So, use String#bytesize when available.
585
          body_length_bytes = body.respond_to?(:bytesize) ? body.bytesize : body.length
586
 
587
          # ActiveMQ interprets every message as a BinaryMessage 
588
          # if content_length header is included. 
589
          # Using :suppress_content_length => true will suppress this behaviour
590
          # and ActiveMQ will interpret the message as a TextMessage.
591
          # For more information refer to http://juretta.com/log/2009/05/24/activemq-jms-stomp/
592
          # Lets send this header in the message, so it can maintain state when using unreceive
593
          headers['content-length'] = "#{body_length_bytes}" unless headers[:suppress_content_length]
594
          headers['content-type'] = "text/plain; charset=UTF-8" unless headers['content-type']
595
          used_socket.puts command  
596
          headers.each do |k,v|
597
            if v.is_a?(Array)
598
              v.each do |e|
599
                used_socket.puts "#{k}:#{e}"
600
              end
601
            else
602
              used_socket.puts "#{k}:#{v}"
603
            end
604
          end
605
          used_socket.puts
606
          used_socket.write body
607
          used_socket.write "\0"
608
609
          if @protocol >= Stomp::SPL_11
610
            @ls = Time.now.to_f if @hbs
611
          end
612
613
        end
614
      end
615
      
616
      def open_tcp_socket
617
      	tcp_socket = nil
618
      	Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do
619
        	tcp_socket = TCPSocket.open @host, @port
620
      	end
621
622
        tcp_socket
623
      end
624
625
      def open_ssl_socket
626
        require 'openssl' unless defined?(OpenSSL)
627
        ctx = OpenSSL::SSL::SSLContext.new
628
629
        # For client certificate authentication:
630
        # key_path = ENV["STOMP_KEY_PATH"] || "~/stomp_keys"
631
        # ctx.cert = OpenSSL::X509::Certificate.new("#{key_path}/client.cer")
632
        # ctx.key = OpenSSL::PKey::RSA.new("#{key_path}/client.keystore")
633
634
        # For server certificate authentication:
635
        # truststores = OpenSSL::X509::Store.new
636
        # truststores.add_file("#{key_path}/client.ts")
637
        # ctx.verify_mode = OpenSSL::SSL::VERIFY_PEER
638
        # ctx.cert_store = truststores
639
640
        ctx.verify_mode = OpenSSL::SSL::VERIFY_NONE  
641
      	ssl = nil
642
      	Timeout::timeout(@connect_timeout, Stomp::Error::SocketOpenTimeout) do
643
        	ssl = OpenSSL::SSL::SSLSocket.new(open_tcp_socket, ctx)
644
      	end
645
        def ssl.ready?
646
          ! @rbuffer.empty? || @io.ready?
647
        end
648
        ssl.connect
649
        ssl
650
      end
651
652
      def close_socket
653
        begin
654
          # Need to set @closed = true before closing the socket
655
          # within the @read_semaphore thread
656
          @closed = true
657
          @read_semaphore.synchronize do
658
            @socket.close
659
          end
660
        rescue
661
          #Ignoring if already closed
662
        end
663
        @closed
664
      end
665
666
      def open_socket
667
        used_socket = @ssl ? open_ssl_socket : open_tcp_socket
668
        # try to close the old connection if any
669
        close_socket
670
        
671
        @closed = false
672
        # Use keepalive
673
        used_socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, true)
674
        used_socket
675
      end
676
      
677
      def connect(used_socket)
678
        @connect_headers = {} unless @connect_headers # Caller said nil/false
679
        headers = @connect_headers.clone
680
        headers[:login] = @login
681
        headers[:passcode] = @passcode
682
        _pre_connect
683
        _transmit(used_socket, "CONNECT", headers)
684
        @connection_frame = _receive(used_socket)
685
        _post_connect
686
        @disconnect_receipt = nil
687
        @session = @connection_frame.headers["session"] if @connection_frame
688
        # replay any subscriptions.
689
        @subscriptions.each { |k,v| _transmit(used_socket, Stomp::CMD_SUBSCRIBE, v) }
690
      end
691
692
      def log_params
693
        lparms = @parameters.clone if @parameters
694
        lparms = {} unless lparms
695
        lparms[:cur_host] = @host
696
        lparms[:cur_port] = @port
697
        lparms[:cur_login] = @login
698
        lparms[:cur_passcode] = @passcode
699
        lparms[:cur_ssl] = @ssl
700
        lparms[:cur_recondelay] = @reconnect_delay
701
        lparms[:cur_parseto] = @parse_timeout
702
        lparms[:cur_conattempts] = @connection_attempts
703
        #
704
        lparms
705
      end
706
707
      def _pre_connect
708
        @connect_headers = @connect_headers.symbolize_keys
709
        raise Stomp::Error::ProtocolErrorConnect if (@connect_headers[:"accept-version"] && !@connect_headers[:host])
710
        raise Stomp::Error::ProtocolErrorConnect if (!@connect_headers[:"accept-version"] && @connect_headers[:host])
711
        return unless (@connect_headers[:"accept-version"] && @connect_headers[:host]) # 1.0
712
        # Try 1.1 or greater
713
        okvers = []
714
        avers = @connect_headers[:"accept-version"].split(",")
715
        avers.each do |nver|
716
          if Stomp::SUPPORTED.index(nver)
717
            okvers << nver
718
          end
719
        end
720
        raise Stomp::Error::UnsupportedProtocolError if okvers == []
721
        @connect_headers[:"accept-version"] = okvers.join(",") # This goes to server
722
        # Heartbeats - pre connect
723
        return unless @connect_headers[:"heart-beat"]
724
        _validate_hbheader()
725
      end
726
727
      def _post_connect
728
        return unless (@connect_headers[:"accept-version"] && @connect_headers[:host])
729
        return if @connection_frame.command == Stomp::CMD_ERROR
730
        cfh = @connection_frame.headers.symbolize_keys
731
        @protocol = cfh[:version]
732
        # Should not happen, but check anyway
733
        raise Stomp::Error::UnsupportedProtocolError unless Stomp::SUPPORTED.index(@protocol)
734
        # Heartbeats
735
        return unless @connect_headers[:"heart-beat"]
736
        _init_heartbeats()
737
      end
738
739
      def _validate_hbheader()
740
        return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats.  OK.
741
        parts = @connect_headers[:"heart-beat"].split(",")
742
        if (parts.size != 2) || (parts[0] != parts[0].to_i.to_s) || (parts[1] != parts[1].to_i.to_s)
743
          raise Stomp::Error::InvalidHeartBeatHeaderError
744
        end
745
      end
746
747
      def _init_heartbeats()
748
        return if @connect_headers[:"heart-beat"] == "0,0" # Caller does not want heartbeats.  OK.
749
        #
750
        @cx = @cy = @sx = @sy = 0, # Variable names as in spec
751
        #
752
        @sti = @rti = 0.0 # Send/Receive ticker interval.
753
        #
754
        @ls = @lr = -1.0 # Last send/receive time (from Time.now.to_f)
755
        #
756
        @st = @rt = nil # Send/receive ticker thread
757
        #
758
        cfh = @connection_frame.headers.symbolize_keys
759
        return if cfh[:"heart-beat"] == "0,0" # Server does not want heartbeats
760
        #
761
        parts = @connect_headers[:"heart-beat"].split(",")
762
        @cx = parts[0].to_i
763
        @cy = parts[1].to_i
764
        #
765
        parts = cfh[:"heart-beat"].split(",")
766
        @sx = parts[0].to_i
767
        @sy = parts[1].to_i
768
        # Catch odd situations like someone has used => heart-beat:000,00000
769
        return if (@cx == 0 && @cy == 0) || (@sx == 0 && @sy == 0)
770
        #
771
        @hbs = @hbr = true # Sending/Receiving heartbeats. Assume yes at first.
772
        # Check for sending
773
        @hbs = false if @cx == 0 || @sy == 0
774
        # Check for receiving
775
        @hbr = false if @sx == 0 || @cy == 0
776
        # Should not do heartbeats at all
777
        return if (!@hbs && !@hbr)
778
        # If sending
779
        if @hbs
780
          sm = @cx >= @sy ? @cx : @sy # ticker interval, ms
781
          @sti = 1000.0 * sm # ticker interval, μs
782
          @ls = Time.now.to_f # best guess at start
783
          _start_send_ticker
784
        end
785
786
        # If receiving
787
        if @hbr
788
          rm = @sx >= @cy ? @sx : @cy # ticker interval, ms
789
          @rti = 1000.0 * rm # ticker interval, μs
790
          @lr = Time.now.to_f # best guess at start
791
          _start_receive_ticker
792
        end
793
794
      end
795
796
      def _start_send_ticker
797
        sleeptime = @sti / 1000000.0 # Sleep time secs
798
        @st = Thread.new {
799
          while true do
800
            sleep sleeptime
801
            curt = Time.now.to_f
802
            if @logger && @logger.respond_to?(:on_hbfire)
803
              @logger.on_hbfire(log_params, "send_fire", curt)
804
            end
805
            delta = curt - @ls
806
            if delta > (@sti - (@sti/5.0)) / 1000000.0 # Be tolerant (minus)
807
              if @logger && @logger.respond_to?(:on_hbfire)
808
                @logger.on_hbfire(log_params, "send_heartbeat", curt)
809
              end
810
              # Send a heartbeat
811
              @transmit_semaphore.synchronize do
812
                begin
813
                  @socket.puts
814
                  @ls = curt # Update last send
815
                  @hb_sent = true # Reset if necessary
816
                rescue Exception => sendex
817
                  @hb_sent = false # Set the warning flag
818
                  if @logger && @logger.respond_to?(:on_hbwrite_fail)
819
                    @logger.on_hbwrite_fail(log_params, {"ticker_interval" => @sti,
820
                      "exception" => sendex}) 
821
                  end
822
                  raise # Re-raise.  What else could be done here?
823
                end
824
              end
825
            end
826
            Thread.pass
827
          end
828
        }
829
      end
830
831
      def _start_receive_ticker
832
        sleeptime = @rti / 1000000.0 # Sleep time secs
833
        @rt = Thread.new {
834
          while true do
835
            sleep sleeptime
836
            curt = Time.now.to_f
837
            if @logger && @logger.respond_to?(:on_hbfire)
838
              @logger.on_hbfire(log_params, "receive_fire", curt)
839
            end
840
            delta = curt - @lr
841
            if delta > ((@rti + (@rti/5.0)) / 1000000.0) # Be tolerant (plus)
842
              if @logger && @logger.respond_to?(:on_hbfire)
843
                @logger.on_hbfire(log_params, "receive_heartbeat", curt)
844
              end
845
              # Client code could be off doing something else (that is, no reading of
846
              # the socket has been requested by the caller).  Try to  handle that case.
847
              lock = @read_semaphore.try_lock
848
              if lock
849
                last_char = @socket.getc
850
                plc = parse_char(last_char)
851
                if plc == "\n" # Server Heartbeat
852
                  @lr = Time.now.to_f
853
                else
854
                  @socket.ungetc(last_char)
855
                end
856
                @read_semaphore.unlock
857
              else
858
                # Shrug.  Have not received one.  Just set warning flag.
859
                @hb_received = false
860
                if @logger && @logger.respond_to?(:on_hbread_fail)
861
                  @logger.on_hbread_fail(log_params, {"ticker_interval" => @rti}) 
862
                end
863
              end
864
            else
865
              @hb_received = true # Reset if necessary
866
            end
867
            Thread.pass
868
          end
869
        }
870
      end
871
872
    # Ref:
873
    # http://unicode.org/mail-arch/unicode-ml/y2003-m02/att-0467/01-The_Algorithm_to_Valide_an_UTF-8_String
874
    #
875
    def _valid_utf8?(string)
876
      case RUBY_VERSION
877
        when /1\.8\.[56]/
878
          bytes = []
879
          0.upto(string.length-1) {|i|
880
            bytes << string[i]
881
          }
882
        else
883
          bytes = string.bytes
884
      end
885
886
      #
887
      valid = true
888
      index = -1
889
      nb_hex = nil
890
      ni_hex = nil
891
      state = "start"
892
      next_byte_save = nil
893
      #
894
      bytes.each do |next_byte|
895
        index += 1
896
        next_byte_save = next_byte
897
        ni_hex = sprintf "%x", index
898
        nb_hex = sprintf "%x", next_byte
899
        # puts "Top: #{next_byte}(0x#{nb_hex}), index: #{index}(0x#{ni_hex})" if DEBUG
900
        case state
901
902
          # State: 'start'
903
          # The 'start' state:
904
          # * handles all occurrences of valid single byte characters i.e., the ASCII character set
905
          # * provides state transition logic for start bytes of valid characters with 2-4 bytes
906
          # * signals a validation failure for all other single bytes
907
          # 
908
          when "start"
909
            # puts "state: start" if DEBUG
910
            case next_byte
911
912
              # ASCII
913
              # * Input = 0x00-0x7F : change state to START
914
              when (0x00..0x7f)
915
                # puts "state: start 1" if DEBUG
916
                state = "start"
917
918
              # Start byte of two byte characters
919
              # * Input = 0xC2-0xDF: change state to A
920
              when (0xc2..0xdf)
921
                # puts "state: start 2" if DEBUG
922
                state = "a"
923
924
              # Start byte of some three byte characters
925
              # * Input = 0xE1-0xEC, 0xEE-0xEF: change state to B
926
              when (0xe1..0xec)
927
                # puts "state: start 3" if DEBUG
928
                state = "b"
929
              when (0xee..0xef)
930
                # puts "state: start 4" if DEBUG
931
                state = "b"
932
933
              # Start byte of special three byte characters
934
              # * Input = 0xE0: change state to C
935
              when 0xe0
936
                # puts "state: start 5" if DEBUG
937
                state = "c"
938
939
              # Start byte of the remaining three byte characters
940
              # * Input = 0xED: change state to D
941
              when 0xed
942
                # puts "state: start 6" if DEBUG
943
                state = "d"
944
945
              # Start byte of some four byte characters
946
              # * Input = 0xF1-0xF3:change state to E
947
              when (0xf1..0xf3)
948
                # puts "state: start 7" if DEBUG
949
                state = "e"
950
951
              # Start byte of special four byte characters
952
              # * Input = 0xF0: change state to F
953
              when 0xf0
954
                # puts "state: start 8" if DEBUG
955
                state = "f"
956
957
              # Start byte of very special four byte characters
958
              # * Input = 0xF4: change state to G
959
              when 0xf4
960
                # puts "state: start 9" if DEBUG
961
                state = "g"
962
963
              # All other single characters are invalid
964
              # * Input = Others (0x80-0xBF,0xC0-0xC1, 0xF5-0xFF): ERROR
965
              else
966
                valid = false
967
                break
968
            end # of the inner case, the 'start' state
969
970
          # The last continuation byte of a 2, 3, or 4 byte character
971
          # State: 'a'
972
          #  o Input = 0x80-0xBF: change state to START
973
          #  o Others: ERROR
974
          when "a"
975
            # puts "state: a" if DEBUG
976
            if (0x80..0xbf) === next_byte
977
              state = "start"
978
            else
979
              valid = false
980
              break
981
            end
982
983
          # The first continuation byte for most 3 byte characters
984
          # (those with start bytes in: 0xe1-0xec or 0xee-0xef)
985
          # State: 'b'
986
          # o Input = 0x80-0xBF: change state to A
987
          # o Others: ERROR
988
          when "b"
989
            # puts "state: b" if DEBUG
990
            if (0x80..0xbf) === next_byte
991
              state = "a"
992
            else
993
              valid = false
994
              break
995
            end
996
997
          # The first continuation byte for some special 3 byte characters
998
          # (those with start byte 0xe0)
999
          # State: 'c'
1000
          # o Input = 0xA0-0xBF: change state to A
1001
          # o Others: ERROR
1002
          when "c"
1003
            # puts "state: c" if DEBUG
1004
            if (0xa0..0xbf) === next_byte
1005
              state = "a"
1006
            else
1007
              valid = false
1008
              break
1009
            end
1010
1011
          # The first continuation byte for the remaining 3 byte characters
1012
          # (those with start byte 0xed)
1013
          # State: 'd'
1014
          # o Input = 0x80-0x9F: change state to A
1015
          # o Others: ERROR
1016
          when "d"
1017
            # puts "state: d" if DEBUG
1018
            if (0x80..0x9f) === next_byte
1019
              state = "a"
1020
            else
1021
              valid = false
1022
              break
1023
            end
1024
1025
          # The first continuation byte for some 4 byte characters
1026
          # (those with start bytes in: 0xf1-0xf3)
1027
          # State: 'e'
1028
          # o Input = 0x80-0xBF: change state to B
1029
          # o Others: ERROR
1030
          when "e"
1031
            # puts "state: e" if DEBUG
1032
            if (0x80..0xbf) === next_byte
1033
              state = "b"
1034
            else
1035
              valid = false
1036
              break
1037
            end
1038
1039
          # The first continuation byte for some special 4 byte characters
1040
          # (those with start byte 0xf0)
1041
          # State: 'f'
1042
          # o Input = 0x90-0xBF: change state to B
1043
          # o Others: ERROR
1044
          when "f"
1045
            # puts "state: f" if DEBUG
1046
            if (0x90..0xbf) === next_byte
1047
              state = "b"
1048
            else
1049
              valid = false
1050
              break
1051
            end
1052
1053
          # The first continuation byte for the remaining 4 byte characters
1054
          # (those with start byte 0xf4)
1055
          # State: 'g'
1056
          # o Input = 0x80-0x8F: change state to B
1057
          # o Others: ERROR
1058
          when "g"
1059
            # puts "state: g" if DEBUG
1060
            if (0x80..0x8f) === next_byte
1061
              state = "b"
1062
            else
1063
              valid = false
1064
              break
1065
            end
1066
1067
          #
1068
          else
1069
            raise RuntimeError, "state: default"
1070
        end
1071
      end
1072
      #
1073
      # puts "State at end: #{state}" if DEBUG
1074
      # Catch truncation at end of string
1075
      if valid and state != 'start'
1076
        # puts "Resetting valid value" if DEBUG
1077
        valid = false
1078
      end
1079
      #
1080
      valid
1081
    end # of _valid_utf8?
1082
1083
    def _headerCheck(h)
1084
      return if @protocol == Stomp::SPL_10 # Do nothing for this environment
1085
      #
1086
      h.each_pair do |k,v|
1087
        # Keys here are symbolized
1088
        ks = k.to_s
1089
        ks.force_encoding(Stomp::UTF8) if ks.respond_to?(:force_encoding)
1090
        raise Stomp::Error::UTF8ValidationError unless valid_utf8?(ks)
1091
        #
1092
        if v.is_a?(Array)
1093
          v.each do |e|
1094
            e.force_encoding(Stomp::UTF8) if e.respond_to?(:force_encoding)
1095
            raise Stomp::Error::UTF8ValidationError unless valid_utf8?(e)
1096
          end
1097
        else
1098
          vs = v.to_s # Values are usually Strings, but could be TrueClass or Symbol
1099
          vs.force_encoding(Stomp::UTF8) if vs.respond_to?(:force_encoding)
1100
          raise Stomp::Error::UTF8ValidationError unless valid_utf8?(vs)
1101
        end
1102
      end
1103
    end
1104
1105
    #
1106
    def _encodeHeaders(h)
1107
      eh = {}
1108
      h.each_pair do |k,v|
1109
        # Keys are symbolized
1110
        ks = k.to_s
1111
        if v.is_a?(Array)
1112
          kenc = Stomp::HeaderCodec::encode(ks)
1113
          eh[kenc] = []
1114
          v.each do |e|
1115
            eh[kenc] << Stomp::HeaderCodec::encode(e)
1116
          end
1117
        else
1118
          vs = v.to_s
1119
          eh[Stomp::HeaderCodec::encode(ks)] = Stomp::HeaderCodec::encode(vs)
1120
        end
1121
      end
1122
      eh
1123
    end
1124
1125
    #
1126
    def _decodeHeaders(h)
1127
      dh = {}
1128
      h.each_pair do |k,v|
1129
        # Keys here are NOT! symbolized
1130
        if v.is_a?(Array)
1131
          kdec = Stomp::HeaderCodec::decode(k)
1132
          dh[kdec] = []
1133
          v.each do |e|
1134
            dh[kdec] << Stomp::HeaderCodec::decode(e)
1135
          end
1136
        else
1137
          vs = v.to_s
1138
          dh[Stomp::HeaderCodec::decode(k)] = Stomp::HeaderCodec::decode(vs)
1139
        end
1140
      end
1141
      dh
1142
    end
1143
1144
  end # class
1145
1146
end # module