# Patch overview: lib/protobuf/rpc/connectors/base.rb and common.rb
#
# base.rb
#   - The default value listed for the :timeout option changes from 300 to nil,
#     and its comment drops the word "default".
#   - The :first_alive_load_balance line is removed and re-added with no visible
#     textual difference; presumably a whitespace/alignment-only change.
#
# common.rb
#   - Adds a `timeout` method: returns options[:timeout] when it is truthy,
#     otherwise 300 (marked "seconds" in the patch).
#   - timeout_wrap now passes `timeout` to ::Timeout.timeout and interpolates it
#     into the RPC_FAILED message instead of reading options[:timeout] directly,
#     so a nil/unset :timeout resolves to 300 in both places.
diff --git a/lib/protobuf/rpc/connectors/base.rb b/lib/protobuf/rpc/connectors/base.rb index 9f7735ab..c2a6b9d6 100644 --- a/lib/protobuf/rpc/connectors/base.rb +++ b/lib/protobuf/rpc/connectors/base.rb @@ -17,9 +17,9 @@ module Connectors :request => nil, # The request object sent by the client :request_type => nil, # The request type expected by the client :response_type => nil, # The response type expected by the client - :timeout => 300, # The default timeout for the request, also handled by client.rb + :timeout => nil, # The timeout for the request, also handled by client.rb :client_host => nil, # The hostname or address of this client - :first_alive_load_balance => false, # Do we want to use check_avail frames before request + :first_alive_load_balance => false, # Do we want to use check_avail frames before request } class Base diff --git a/lib/protobuf/rpc/connectors/common.rb b/lib/protobuf/rpc/connectors/common.rb index 8c7a84cd..2ef5b9a1 100644 --- a/lib/protobuf/rpc/connectors/common.rb +++ b/lib/protobuf/rpc/connectors/common.rb @@ -135,12 +135,20 @@ def succeed(response) complete end + def timeout + if options[:timeout] + options[:timeout] + else + 300 # seconds + end + end + # Wrap the given block in a timeout of the configured number of seconds. # def timeout_wrap(&block) - ::Timeout.timeout(options[:timeout], &block) + ::Timeout.timeout(timeout, &block) rescue ::Timeout::Error - fail(:RPC_FAILED, "The server took longer than #{options[:timeout]} seconds to respond") + fail(:RPC_FAILED, "The server took longer than #{timeout} seconds to respond") end def validate_request_type! 
# Patch overview: lib/protobuf/rpc/connectors/zmq.rb
#
#   - Adds the ZmqEagainError exception class (a RuntimeError subclass).
#   - Adds check_available_rcv_timeout / check_available_snd_timeout: memoized
#     values read from PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT /
#     PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT via to_i and floored at 200
#     (an unset variable yields nil.to_i == 0, so 200 is used).
#   - create_socket: inside the first_alive_load_balance? branch, sets
#     ::ZMQ::RCVTIMEO and ::ZMQ::SNDTIMEO to those values before the
#     CHECK_AVAILABLE send/recv, and sets both back to -1 in the `else` clause
#     that follows `rescue ZmqRecoverableError` (i.e. when nothing was raised).
#   - send_request_with_lazy_pirate no longer reads options[:timeout]; it calls
#     send_request_with_timeout(attempt).
#   - Adds rcv_timeout / snd_timeout (memoized): options[:timeout] when truthy,
#     else PB_ZMQ_CLIENT_RCV_TIMEOUT / PB_ZMQ_CLIENT_SND_TIMEOUT (to_i) when the
#     variable is present, else 300_000.
#   - send_request_with_timeout(attempt = 0): the ZMQ::Poller-based wait is
#     removed. RCVTIMEO/SNDTIMEO are set on the new socket from rcv_timeout /
#     snd_timeout, the send_string and recv_string result codes go through
#     zmq_eagain_error_check, and ZmqEagainError is rescued, logged as a timeout
#     and converted to RequestTimeout. The existing ensure block still closes
#     the socket.
#   - Adds zmq_eagain_error_check(return_code, source): when the result code
#     (nil is treated as -1) is not ok, raises ZmqEagainError if
#     ::ZMQ::Util.errno == ::ZMQ::EAGAIN, otherwise a plain `raise` with the
#     same message text (source, ::ZMQ::Util.error_string, caller backtrace).
#
# NOTE(review): timeout units look inconsistent; confirm before merging.
#   common.rb hands options[:timeout] to ::Timeout.timeout with a
#   "300 # seconds" fallback, and the code removed here polled for
#   `timeout * 1000`. rcv_timeout / snd_timeout now pass options[:timeout] to
#   RCVTIMEO/SNDTIMEO unscaled, next to a 300_000 default and a log line that
#   prints the value with an "ms" suffix.
# NOTE(review): the "Timed out waiting for response" debug message in the new
#   rescue is also reached when send_string fails with EAGAIN, not only
#   recv_string.
# NOTE(review): the two heredoc messages in zmq_eagain_error_check are
#   identical; consider building the message once.
diff --git a/lib/protobuf/rpc/connectors/zmq.rb b/lib/protobuf/rpc/connectors/zmq.rb index f4fd0c67..da7e9cd3 100644 --- a/lib/protobuf/rpc/connectors/zmq.rb +++ b/lib/protobuf/rpc/connectors/zmq.rb @@ -8,6 +8,7 @@ module Connectors class Zmq < Base RequestTimeout = Class.new(RuntimeError) ZmqRecoverableError = Class.new(RuntimeError) + ZmqEagainError = Class.new(RuntimeError) ## # Included Modules @@ -58,6 +59,13 @@ def log_signature ## # Private Instance methods # + def check_available_rcv_timeout + @check_available_rcv_timeout ||= [ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT"].to_i, 200].max + end + + def check_available_snd_timeout + @check_available_snd_timeout ||= [ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT"].to_i, 200].max + end def close_connection # The socket is automatically closed after every request. @@ -80,6 +88,8 @@ def create_socket if first_alive_load_balance? begin check_available_response = "" + socket.setsockopt(::ZMQ::RCVTIMEO, check_available_rcv_timeout) + socket.setsockopt(::ZMQ::SNDTIMEO, check_available_snd_timeout) zmq_recoverable_error_check(socket.send_string(::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE), :socket_send_string) zmq_recoverable_error_check(socket.recv_string(check_available_response), :socket_recv_string) @@ -88,6 +98,9 @@ def create_socket end rescue ZmqRecoverableError socket = nil # couldn't make a connection and need to try again + else + socket.setsockopt(::ZMQ::RCVTIMEO, -1) + socket.setsockopt(::ZMQ::SNDTIMEO, -1) end end end @@ -163,11 +176,10 @@ def ping_port_open?(host) # def send_request_with_lazy_pirate attempt = 0 - timeout = options[:timeout].to_f begin attempt += 1 - send_request_with_timeout(timeout, attempt) + send_request_with_timeout(attempt) parse_response rescue RequestTimeout retry if attempt < CLIENT_RETRIES @@ -175,23 +187,45 @@ def send_request_with_lazy_pirate end end - def send_request_with_timeout(timeout, attempt = 0) - socket = create_socket + def rcv_timeout + @rcv_timeout ||= begin + 
case + when options[:timeout] then + options[:timeout] + when ENV.has_key?("PB_ZMQ_CLIENT_RCV_TIMEOUT") then + ENV["PB_ZMQ_CLIENT_RCV_TIMEOUT"].to_i + else + 300_000 # 300 seconds + end + end + end - poller = ::ZMQ::Poller.new - poller.register_readable(socket) + def snd_timeout + @snd_timeout ||= begin + case + when options[:timeout] then + options[:timeout] + when ENV.has_key?("PB_ZMQ_CLIENT_SND_TIMEOUT") then + ENV["PB_ZMQ_CLIENT_SND_TIMEOUT"].to_i + else + 300_000 # 300 seconds + end + end + end + + def send_request_with_timeout(attempt = 0) + socket = create_socket + socket.setsockopt(::ZMQ::RCVTIMEO, rcv_timeout) + socket.setsockopt(::ZMQ::SNDTIMEO, snd_timeout) logger.debug { sign_message("Sending Request (attempt #{attempt}, #{socket})") } - zmq_error_check(socket.send_string(@request_data), :socket_send_string) - logger.debug { sign_message("Waiting #{timeout} seconds for response (attempt #{attempt}, #{socket})") } - - if poller.poll(timeout * 1000) == 1 - zmq_error_check(socket.recv_string(@response_data = ""), :socket_recv_string) - logger.debug { sign_message("Response received (attempt #{attempt}, #{socket})") } - else - logger.debug { sign_message("Timed out waiting for response (attempt #{attempt}, #{socket})") } - raise RequestTimeout - end + zmq_eagain_error_check(socket.send_string(@request_data), :socket_send_string) + logger.debug { sign_message("Waiting #{rcv_timeout}ms for response (attempt #{attempt}, #{socket})") } + zmq_eagain_error_check(socket.recv_string(@response_data = ""), :socket_recv_string) + logger.debug { sign_message("Response received (attempt #{attempt}, #{socket})") } + rescue ZmqEagainError + logger.debug { sign_message("Timed out waiting for response (attempt #{attempt}, #{socket})") } + raise RequestTimeout ensure logger.debug { sign_message("Closing Socket") } zmq_error_check(socket.close, :socket_close) if socket @@ -221,6 +255,24 @@ def zmq_context self.class.zmq_context end + def zmq_eagain_error_check(return_code, 
source) + unless ::ZMQ::Util.resultcode_ok?(return_code || -1) + if ::ZMQ::Util.errno == ::ZMQ::EAGAIN + raise ZmqEagainError, <<-ERROR + Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}". + + #{caller(1).join($/)} + ERROR + else + raise <<-ERROR + Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}". + + #{caller(1).join($/)} + ERROR + end + end + end + def zmq_error_check(return_code, source) unless ::ZMQ::Util.resultcode_ok?(return_code || -1) raise <<-ERROR