Skip to content

Commit

Permalink
Merge pull request #149 from localshred/abrandoned/inproc
Browse files Browse the repository at this point in the history
Abrandoned/inproc
  • Loading branch information
abrandoned committed Feb 11, 2014
2 parents 3f30006 + f1a97d3 commit 8fd0d55
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 4 deletions.
2 changes: 2 additions & 0 deletions lib/protobuf/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ class CLI < ::Thor
option :beacon_interval, :type => :numeric, :desc => 'Broadcast beacons every N seconds. (default: 5)'
option :beacon_port, :type => :numeric, :desc => 'Broadcast beacons to this port (default: value of ServiceDirectory.port)'
option :broadcast_beacons, :type => :boolean, :desc => 'Broadcast beacons for dynamic discovery (Currently only available with ZeroMQ).'
option :broadcast_busy, :type => :boolean, :default => false, :desc => 'Remove busy nodes from cluster when all workers are busy (Currently only available with ZeroMQ).'
option :debug, :type => :boolean, :default => false, :aliases => %w(-d), :desc => 'Debug Mode. Override log level to DEBUG.'
option :gc_pause_request, :type => :boolean, :default => false, :desc => 'Enable/Disable GC pause during request.'
option :print_deprecation_warnings, :type => :boolean, :default => nil, :desc => 'Cause use of deprecated fields to be printed or ignored.'
option :workers_only, :type => :boolean, :default => false, :desc => "Starts process with only workers (no broker/frontend is started) only relevant for Zmq Server"
option :worker_port, :type => :numeric, :default => nil, :desc => "Port for 'backend' where workers connect (defaults to port + 1)"
option :zmq_inproc, :type => :boolean, :default => true, :desc => 'Use inproc protocol for zmq Server/Broker/Worker'

def start(app_file)
debug_say('Configuring the rpc_server process')
Expand Down
10 changes: 9 additions & 1 deletion lib/protobuf/rpc/servers/zmq/broker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,15 @@ def init_poller
end

def init_zmq_context
@zmq_context = ZMQ::Context.new
if inproc?
@zmq_context = @server.zmq_context
else
@zmq_context = ZMQ::Context.new
end
end

def inproc?
!!@server.try(:inproc?)
end

def process_backend
Expand Down
14 changes: 12 additions & 2 deletions lib/protobuf/rpc/servers/zmq/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ class Server
DEFAULT_OPTIONS = {
:beacon_interval => 5,
:broadcast_beacons => false,
:broadcast_busy => false
:broadcast_busy => false,
:zmq_inproc => true,
}

attr_accessor :options, :workers
attr_reader :zmq_context

def initialize(options)
@options = DEFAULT_OPTIONS.merge(options)
Expand All @@ -43,7 +45,11 @@ def backend_port
end

def backend_uri
"tcp://#{backend_ip}:#{backend_port}"
if inproc?
"inproc://#{backend_ip}:#{backend_port}"
else
"tcp://#{backend_ip}:#{backend_port}"
end
end

def beacon_interval
Expand Down Expand Up @@ -121,6 +127,10 @@ def frontend_uri
"tcp://#{frontend_ip}:#{frontend_port}"
end

def inproc?
!!self.options[:zmq_inproc]
end

def maintenance_timeout
next_maintenance - Time.now.to_i
end
Expand Down
10 changes: 9 additions & 1 deletion lib/protobuf/rpc/servers/zmq/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,14 +76,22 @@ def send_data
private

def init_zmq_context
@zmq_context = ZMQ::Context.new
if inproc?
@zmq_context = @server.zmq_context
else
@zmq_context = ZMQ::Context.new
end
end

def init_backend_socket
@backend_socket = @zmq_context.socket(ZMQ::REQ)
zmq_error_check(@backend_socket.connect(@server.backend_uri))
end

def inproc?
!!@server.try(:inproc?)
end

def read_from_backend
frames = []
zmq_error_check(@backend_socket.recv_strings(frames))
Expand Down

0 comments on commit 8fd0d55

Please sign in to comment.