From f72852e4aabe7b015182451c35dda2ecc0ac17f2 Mon Sep 17 00:00:00 2001 From: Brandon Dewitt Date: Mon, 10 Feb 2014 18:55:58 -0700 Subject: [PATCH 1/4] add ability to use inproc instead of tcp for server-broker-worker which enables zero-copy messaging --- lib/protobuf/cli.rb | 1 + lib/protobuf/rpc/servers/zmq/broker.rb | 10 +++++++++- lib/protobuf/rpc/servers/zmq/server.rb | 24 +++++++++++++++++++++--- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/lib/protobuf/cli.rb b/lib/protobuf/cli.rb index 3af746d8..e7fb79a7 100644 --- a/lib/protobuf/cli.rb +++ b/lib/protobuf/cli.rb @@ -37,6 +37,7 @@ class CLI < ::Thor option :print_deprecation_warnings, :type => :boolean, :default => nil, :desc => 'Cause use of deprecated fields to be printed or ignored.' option :workers_only, :type => :boolean, :default => false, :desc => "Starts process with only workers (no broker/frontend is started) only relevant for Zmq Server" option :worker_port, :type => :numeric, :default => nil, :desc => "Port for 'backend' where workers connect (defaults to port + 1)" + option :zmq_inproc, :type => :boolean, :default => true, :desc => 'Use inproc protocol for zmq Server/Broker/Worker' def start(app_file) debug_say('Configuring the rpc_server process') diff --git a/lib/protobuf/rpc/servers/zmq/broker.rb b/lib/protobuf/rpc/servers/zmq/broker.rb index 492ef44d..36ea0c58 100644 --- a/lib/protobuf/rpc/servers/zmq/broker.rb +++ b/lib/protobuf/rpc/servers/zmq/broker.rb @@ -64,7 +64,15 @@ def init_poller end def init_zmq_context - @zmq_context = ZMQ::Context.new + if inproc? + @zmq_context = ::Protobuf::Rpc::Zmq::Server.zmq_context + else + @zmq_context = ZMQ::Context.new + end + end + + def inproc? + @server.try(:inproc?) end def process_backend diff --git a/lib/protobuf/rpc/servers/zmq/server.rb b/lib/protobuf/rpc/servers/zmq/server.rb index ddc71f59..db3df2f3 100644 --- a/lib/protobuf/rpc/servers/zmq/server.rb +++ b/lib/protobuf/rpc/servers/zmq/server.rb @@ -12,11 +12,17 @@ class Server DEFAULT_OPTIONS = { :beacon_interval => 5, - :broadcast_beacons => false + :broadcast_beacons => false, + :zmq_inproc => true } attr_accessor :options, :workers + # Share zmq_context when using inproc + def self.zmq_context + @zmq_context ||= ZMQ::Context.new + end + def initialize(options) @options = DEFAULT_OPTIONS.merge(options) @workers = [] @@ -38,7 +44,11 @@ def backend_port end def backend_uri - "tcp://#{backend_ip}:#{backend_port}" + if inproc? + "inproc://#{backend_ip}:#{backend_port}" + else + "tcp://#{backend_ip}:#{backend_port}" + end end def beacon_interval @@ -260,7 +270,15 @@ def init_shutdown_pipe end def init_zmq_context - @zmq_context = ZMQ::Context.new + if inproc? + @zmq_context = self.class.zmq_context + else + @zmq_context = ZMQ::Context.new + end + end + + def inproc? + !!self.options[:zmq_inproc] end def start_broker From 2b905a3d5aed0e9b821ddc79f11d96c3d11dd911 Mon Sep 17 00:00:00 2001 From: Brandon Dewitt Date: Mon, 10 Feb 2014 19:06:47 -0700 Subject: [PATCH 2/4] still use an instance context and make it avail to broker/worker and make inproc? public --- lib/protobuf/rpc/servers/zmq/broker.rb | 4 ++-- lib/protobuf/rpc/servers/zmq/server.rb | 20 ++++++-------------- lib/protobuf/rpc/servers/zmq/worker.rb | 10 +++++++++- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/lib/protobuf/rpc/servers/zmq/broker.rb b/lib/protobuf/rpc/servers/zmq/broker.rb index 36ea0c58..34ed5c1e 100644 --- a/lib/protobuf/rpc/servers/zmq/broker.rb +++ b/lib/protobuf/rpc/servers/zmq/broker.rb @@ -65,14 +65,14 @@ def init_poller def init_zmq_context if inproc? - @zmq_context = ::Protobuf::Rpc::Zmq::Server.zmq_context + @zmq_context = @server.zmq_context else @zmq_context = ZMQ::Context.new end end def inproc? - @server.try(:inproc?) + !!@server.try(:inproc?) end def process_backend diff --git a/lib/protobuf/rpc/servers/zmq/server.rb b/lib/protobuf/rpc/servers/zmq/server.rb index db3df2f3..2acd61f2 100644 --- a/lib/protobuf/rpc/servers/zmq/server.rb +++ b/lib/protobuf/rpc/servers/zmq/server.rb @@ -17,11 +17,7 @@ class Server } attr_accessor :options, :workers - - # Share zmq_context when using inproc - def self.zmq_context - @zmq_context ||= ZMQ::Context.new - end + attr_reader :zmq_context def initialize(options) @options = DEFAULT_OPTIONS.merge(options) @@ -122,6 +118,10 @@ def frontend_uri "tcp://#{frontend_ip}:#{frontend_port}" end + def inproc? + !!self.options[:zmq_inproc] + end + def maintenance_timeout next_maintenance - Time.now.to_i end @@ -270,15 +270,7 @@ def init_shutdown_pipe end def init_zmq_context - if inproc? - @zmq_context = self.class.zmq_context - else - @zmq_context = ZMQ::Context.new - end - end - - def inproc? - !!self.options[:zmq_inproc] + @zmq_context = ZMQ::Context.new end def start_broker diff --git a/lib/protobuf/rpc/servers/zmq/worker.rb b/lib/protobuf/rpc/servers/zmq/worker.rb index 964f823d..7729410f 100644 --- a/lib/protobuf/rpc/servers/zmq/worker.rb +++ b/lib/protobuf/rpc/servers/zmq/worker.rb @@ -74,7 +74,11 @@ def send_data private def init_zmq_context - @zmq_context = ZMQ::Context.new + if inproc? + @zmq_context = @server.zmq_context + else + @zmq_context = ZMQ::Context.new + end end def init_backend_socket @@ -82,6 +86,10 @@ def init_backend_socket zmq_error_check(@backend_socket.connect(@server.backend_uri)) end + def inproc? + !!@server.try(:inproc?) + end + def read_from_backend frames = [] zmq_error_check(@backend_socket.recv_strings(frames)) From 040aa4698b8e49f10d24c219fdf52b9f1f3fda8e Mon Sep 17 00:00:00 2001 From: Brandon Dewitt Date: Mon, 10 Feb 2014 20:40:06 -0700 Subject: [PATCH 3/4] default inproc to true --- lib/protobuf/rpc/servers/zmq/server.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/protobuf/rpc/servers/zmq/server.rb b/lib/protobuf/rpc/servers/zmq/server.rb index 2272b638..9e5cbd55 100644 --- a/lib/protobuf/rpc/servers/zmq/server.rb +++ b/lib/protobuf/rpc/servers/zmq/server.rb @@ -13,8 +13,8 @@ class Server DEFAULT_OPTIONS = { :beacon_interval => 5, :broadcast_beacons => false, - :zmq_inproc => true - :broadcast_busy => false + :broadcast_busy => false, + :zmq_inproc => true, } attr_accessor :options, :workers From f1a97d329b687132de5c471725b8c9bed9f71078 Mon Sep 17 00:00:00 2001 From: Brandon Dewitt Date: Mon, 10 Feb 2014 20:44:03 -0700 Subject: [PATCH 4/4] put broadcast_busy in cli options --- lib/protobuf/cli.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/protobuf/cli.rb b/lib/protobuf/cli.rb index e7fb79a7..f0ad47e8 100644 --- a/lib/protobuf/cli.rb +++ b/lib/protobuf/cli.rb @@ -32,6 +32,7 @@ class CLI < ::Thor option :beacon_interval, :type => :numeric, :desc => 'Broadcast beacons every N seconds. (default: 5)' option :beacon_port, :type => :numeric, :desc => 'Broadcast beacons to this port (default: value of ServiceDirectory.port)' option :broadcast_beacons, :type => :boolean, :desc => 'Broadcast beacons for dynamic discovery (Currently only available with ZeroMQ).' + option :broadcast_busy, :type => :boolean, :default => false, :desc => 'Remove busy nodes from cluster when all workers are busy (Currently only available with ZeroMQ).' option :debug, :type => :boolean, :default => false, :aliases => %w(-d), :desc => 'Debug Mode. Override log level to DEBUG.' option :gc_pause_request, :type => :boolean, :default => false, :desc => 'Enable/Disable GC pause during request.' option :print_deprecation_warnings, :type => :boolean, :default => nil, :desc => 'Cause use of deprecated fields to be printed or ignored.'