Skip to content

Commit

Permalink
Merge pull request #220 from localshred/abrandoned/set_timeout_on_wor…
Browse files Browse the repository at this point in the history
…kers_avail

Abrandoned/set timeout on workers avail
  • Loading branch information
abrandoned committed Sep 16, 2014
2 parents 7fd71e3 + 68bac99 commit 98fd72f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 20 deletions.
4 changes: 2 additions & 2 deletions lib/protobuf/rpc/connectors/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ module Connectors
:request => nil, # The request object sent by the client
:request_type => nil, # The request type expected by the client
:response_type => nil, # The response type expected by the client
:timeout => 300, # The default timeout for the request, also handled by client.rb
:timeout => nil, # The timeout for the request, also handled by client.rb
:client_host => nil, # The hostname or address of this client
:first_alive_load_balance => false, # Do we want to use check_avail frames before request
:first_alive_load_balance => false, # Do we want to use check_avail frames before request
}

class Base
Expand Down
12 changes: 10 additions & 2 deletions lib/protobuf/rpc/connectors/common.rb
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,20 @@ def succeed(response)
complete
end

def timeout
if options[:timeout]
options[:timeout]
else
300 # seconds
end
end

# Wrap the given block in a timeout of the configured number of seconds.
#
def timeout_wrap(&block)
::Timeout.timeout(options[:timeout], &block)
::Timeout.timeout(timeout, &block)
rescue ::Timeout::Error
fail(:RPC_FAILED, "The server took longer than #{options[:timeout]} seconds to respond")
fail(:RPC_FAILED, "The server took longer than #{timeout} seconds to respond")
end

def validate_request_type!
Expand Down
84 changes: 68 additions & 16 deletions lib/protobuf/rpc/connectors/zmq.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ module Connectors
class Zmq < Base
RequestTimeout = Class.new(RuntimeError)
ZmqRecoverableError = Class.new(RuntimeError)
ZmqEagainError = Class.new(RuntimeError)

##
# Included Modules
Expand Down Expand Up @@ -58,6 +59,13 @@ def log_signature
##
# Private Instance methods
#
def check_available_rcv_timeout
@check_available_rcv_timeout ||= [ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT"].to_i, 200].max
end

def check_available_snd_timeout
@check_available_snd_timeout ||= [ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT"].to_i, 200].max
end

def close_connection
# The socket is automatically closed after every request.
Expand All @@ -80,6 +88,8 @@ def create_socket
if first_alive_load_balance?
begin
check_available_response = ""
socket.setsockopt(::ZMQ::RCVTIMEO, check_available_rcv_timeout)
socket.setsockopt(::ZMQ::SNDTIMEO, check_available_snd_timeout)
zmq_recoverable_error_check(socket.send_string(::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE), :socket_send_string)
zmq_recoverable_error_check(socket.recv_string(check_available_response), :socket_recv_string)

Expand All @@ -88,6 +98,9 @@ def create_socket
end
rescue ZmqRecoverableError
socket = nil # couldn't make a connection and need to try again
else
socket.setsockopt(::ZMQ::RCVTIMEO, -1)
socket.setsockopt(::ZMQ::SNDTIMEO, -1)
end
end
end
Expand Down Expand Up @@ -163,35 +176,56 @@ def ping_port_open?(host)
#
def send_request_with_lazy_pirate
attempt = 0
timeout = options[:timeout].to_f

begin
attempt += 1
send_request_with_timeout(timeout, attempt)
send_request_with_timeout(attempt)
parse_response
rescue RequestTimeout
retry if attempt < CLIENT_RETRIES
fail(:RPC_FAILED, "The server repeatedly failed to respond within #{timeout} seconds")
end
end

def send_request_with_timeout(timeout, attempt = 0)
socket = create_socket
def rcv_timeout
@rcv_timeout ||= begin
case
when options[:timeout] then
options[:timeout]
when ENV.has_key?("PB_ZMQ_CLIENT_RCV_TIMEOUT") then
ENV["PB_ZMQ_CLIENT_RCV_TIMEOUT"].to_i
else
300_000 # 300 seconds
end
end
end

poller = ::ZMQ::Poller.new
poller.register_readable(socket)
def snd_timeout
@snd_timeout ||= begin
case
when options[:timeout] then
options[:timeout]
when ENV.has_key?("PB_ZMQ_CLIENT_SND_TIMEOUT") then
ENV["PB_ZMQ_CLIENT_SND_TIMEOUT"].to_i
else
300_000 # 300 seconds
end
end
end

def send_request_with_timeout(attempt = 0)
socket = create_socket
socket.setsockopt(::ZMQ::RCVTIMEO, rcv_timeout)
socket.setsockopt(::ZMQ::SNDTIMEO, snd_timeout)

logger.debug { sign_message("Sending Request (attempt #{attempt}, #{socket})") }
zmq_error_check(socket.send_string(@request_data), :socket_send_string)
logger.debug { sign_message("Waiting #{timeout} seconds for response (attempt #{attempt}, #{socket})") }

if poller.poll(timeout * 1000) == 1
zmq_error_check(socket.recv_string(@response_data = ""), :socket_recv_string)
logger.debug { sign_message("Response received (attempt #{attempt}, #{socket})") }
else
logger.debug { sign_message("Timed out waiting for response (attempt #{attempt}, #{socket})") }
raise RequestTimeout
end
zmq_eagain_error_check(socket.send_string(@request_data), :socket_send_string)
logger.debug { sign_message("Waiting #{rcv_timeout}ms for response (attempt #{attempt}, #{socket})") }
zmq_eagain_error_check(socket.recv_string(@response_data = ""), :socket_recv_string)
logger.debug { sign_message("Response received (attempt #{attempt}, #{socket})") }
rescue ZmqEagainError
logger.debug { sign_message("Timed out waiting for response (attempt #{attempt}, #{socket})") }
raise RequestTimeout
ensure
logger.debug { sign_message("Closing Socket") }
zmq_error_check(socket.close, :socket_close) if socket
Expand Down Expand Up @@ -221,6 +255,24 @@ def zmq_context
self.class.zmq_context
end

def zmq_eagain_error_check(return_code, source)
unless ::ZMQ::Util.resultcode_ok?(return_code || -1)
if ::ZMQ::Util.errno == ::ZMQ::EAGAIN
raise ZmqEagainError, <<-ERROR
Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".
#{caller(1).join($/)}
ERROR
else
raise <<-ERROR
Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".
#{caller(1).join($/)}
ERROR
end
end
end

def zmq_error_check(return_code, source)
unless ::ZMQ::Util.resultcode_ok?(return_code || -1)
raise <<-ERROR
Expand Down

0 comments on commit 98fd72f

Please sign in to comment.