Skip to content

Commit

Permalink
Merge pull request #138 from nats-io/add-consumer-filters
Browse files Browse the repository at this point in the history
Add support for consumer multiple filters and streams/consumers metadata
  • Loading branch information
wallyqs authored Nov 22, 2023
2 parents 15ac9d4 + 545fccf commit 1fb937b
Show file tree
Hide file tree
Showing 10 changed files with 408 additions and 27 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ jobs:
include:
- name: "Ruby: 3.3.0"
rvm: "3.3.0-preview1"
- name: "Ruby: 3.2.0 (nats-server@dev)"
- name: "Ruby: 3.2.0 (nats-server@v2.9)"
rvm: "3.2.0"
env:
- NATS_SERVER_VERSION=dev
- NATS_SERVER_VERSION=v2.9.24
- name: "Ruby: 3.2.0 (nats-server@main)"
rvm: "3.2.0"
env:
- NATS_SERVER_VERSION=main
allow_failures:
- name: "Ruby: 3.3.0"
- name: "Ruby: 3.2.0 (nats-server@dev)"
- name: "Ruby: 3.2.0 (nats-server@main)"
9 changes: 6 additions & 3 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ group :test do
gem 'rake'
gem 'rspec'
gem 'benchmark-ips'
gem 'rails', require: false
gem 'activerecord', require: false
gem 'sqlite3', require: false
gem 'websocket'
end

Expand All @@ -19,3 +16,9 @@ end
group :development do
gem 'ruby-progressbar'
end

group :rails do
gem 'rails', require: false
gem 'activerecord', require: false
gem 'sqlite3', require: false
end
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
nats-pure (2.3.1)
nats-pure (2.4.0)
concurrent-ruby (~> 1.0)

GEM
Expand Down
89 changes: 82 additions & 7 deletions lib/nats/io/jetstream.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,50 @@ def publish(subject, payload="", **params)

# subscribe binds or creates a push subscription to a JetStream pull consumer.
#
# @param subject [String] Subject from which the messages will be fetched.
# @param subject [String, Array] Subject(s) from which the messages will be fetched.
# @param params [Hash] Options to customize the PushSubscription.
# @option params [String] :stream Name of the Stream to which the consumer belongs.
# @option params [String] :consumer Name of the Consumer to which the PushSubscription will be bound.
# @option params [String] :name Name of the Consumer to which the PushSubscription will be bound.
# @option params [String] :durable Consumer durable name from where the messages will be fetched.
# @option params [Hash] :config Configuration for the consumer.
# @return [NATS::JetStream::PushSubscription]
def subscribe(subject, params={}, &cb)
params[:consumer] ||= params[:durable]
stream = params[:stream].nil? ? find_stream_name_by_subject(subject) : params[:stream]
params[:consumer] ||= params[:name]
multi_filter = case
when (subject.is_a?(Array) and subject.size == 1)
subject = subject.first
false
when (subject.is_a?(Array) and subject.size > 1)
true
end

#
stream = if params[:stream].nil?
if multi_filter
# Use the first subject to try to find the stream.
streams = subject.map do |s|
begin
find_stream_name_by_subject(s)
rescue NATS::JetStream::Error::NotFound
raise NATS::JetStream::Error.new("nats: could not find stream matching filter subject '#{s}'")
end
end

# Ensure that the filter subjects are not ambiguous.
streams.uniq!
if streams.count > 1
raise NATS::JetStream::Error.new("nats: multiple streams matched filter subjects: #{streams}")
end

streams.first
else
find_stream_name_by_subject(subject)
end
else
params[:stream]
end

queue = params[:queue]
durable = params[:durable]
Expand Down Expand Up @@ -183,7 +217,11 @@ def subscribe(subject, params={}, &cb)
config.deliver_subject = deliver

# Auto created consumers use the filter subject.
config.filter_subject = subject
if multi_filter
config[:filter_subjects] ||= subject
else
config[:filter_subject] ||= subject
end

# Heartbeats / FlowControl
config.flow_control = flow_control
Expand Down Expand Up @@ -219,25 +257,57 @@ def subscribe(subject, params={}, &cb)

# pull_subscribe binds or creates a subscription to a JetStream pull consumer.
#
# @param subject [String] Subject from which the messages will be fetched.
# @param subject [String, Array] Subject or subjects from which the messages will be fetched.
# @param durable [String] Consumer durable name from where the messages will be fetched.
# @param params [Hash] Options to customize the PullSubscription.
# @option params [String] :stream Name of the Stream to which the consumer belongs.
# @option params [String] :consumer Name of the Consumer to which the PullSubscription will be bound.
# @option params [String] :name Name of the Consumer to which the PullSubscription will be bound.
# @option params [Hash] :config Configuration for the consumer.
# @return [NATS::JetStream::PullSubscription]
def pull_subscribe(subject, durable, params={})
if durable.empty? && !params[:consumer]
if (!durable or durable.empty?) && !(params[:consumer] or params[:name])
raise JetStream::Error::InvalidDurableName.new("nats: invalid durable name")
end
multi_filter = case
when (subject.is_a?(Array) and subject.size == 1)
subject = subject.first
false
when (subject.is_a?(Array) and subject.size > 1)
true
end

params[:consumer] ||= durable
stream = params[:stream].nil? ? find_stream_name_by_subject(subject) : params[:stream]
params[:consumer] ||= params[:name]
stream = if params[:stream].nil?
if multi_filter
# Use the first subject to try to find the stream.
streams = subject.map do |s|
begin
find_stream_name_by_subject(s)
rescue NATS::JetStream::Error::NotFound
raise NATS::JetStream::Error.new("nats: could not find stream matching filter subject '#{s}'")
end
end

# Ensure that the filter subjects are not ambiguous.
streams.uniq!
if streams.count > 1
raise NATS::JetStream::Error.new("nats: multiple streams matched filter subjects: #{streams}")
end

streams.first
else
find_stream_name_by_subject(subject)
end
else
params[:stream]
end
begin
consumer_info(stream, params[:consumer])
rescue NATS::JetStream::Error::NotFound => e
# If attempting to bind, then this is a hard error.
raise e if params[:stream]
raise e if params[:stream] and !multi_filter

config = if not params[:config]
JetStream::API::ConsumerConfig.new
Expand All @@ -248,6 +318,11 @@ def pull_subscribe(subject, durable, params={})
end
config[:durable_name] = durable
config[:ack_policy] ||= JS::Config::AckExplicit
if multi_filter
config[:filter_subjects] ||= subject
else
config[:filter_subject] ||= subject
end
add_consumer(stream, config)
end

Expand Down
4 changes: 4 additions & 0 deletions lib/nats/io/jetstream/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ def initialize(opts={})
:num_replicas,
# Force memory storage
:mem_storage,

# NATS v2.10 features
:metadata, :filter_subjects, :max_bytes,
keyword_init: true) do
def initialize(opts={})
# Filter unrecognized fields just in case.
Expand Down Expand Up @@ -192,6 +195,7 @@ def initialize(opts={})
:republish,
:allow_direct,
:mirror_direct,
:metadata,
keyword_init: true) do
def initialize(opts={})
# Filter unrecognized fields just in case.
Expand Down
23 changes: 21 additions & 2 deletions lib/nats/io/jetstream/manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,37 @@ def add_consumer(stream, config, params={})
else
config
end

config[:name] ||= config[:durable_name]
req_subject = case
when config[:name]
# NOTE: Only supported after nats-server v2.9.0
###############################################################################
# #
# Using names is the supported way of creating consumers (NATS +v2.9.0. #
# #
###############################################################################
if config[:filter_subject] && config[:filter_subject] != ">"
"#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}.#{config[:filter_subject]}"
else
##############################################################################
# #
# Endpoint to support creating ANY consumer with multi-filters (NATS +v2.10) #
# #
##############################################################################
"#{@prefix}.CONSUMER.CREATE.#{stream}.#{config[:name]}"
end
when config[:durable_name]
###############################################################################
# #
# Endpoint to support creating DURABLES before NATS v2.9.0. #
# #
###############################################################################
"#{@prefix}.CONSUMER.DURABLE.CREATE.#{stream}.#{config[:durable_name]}"
else
###############################################################################
# #
# Endpoint to support creating EPHEMERALS before NATS v2.9.0. #
# #
###############################################################################
"#{@prefix}.CONSUMER.CREATE.#{stream}"
end

Expand Down
2 changes: 1 addition & 1 deletion lib/nats/io/version.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
module NATS
module IO
# VERSION is the version of the client announced on CONNECT to the server.
VERSION = "2.3.1".freeze
VERSION = "2.4.0".freeze

# LANG is the lang runtime of the client announced on CONNECT to the server.
LANG = "#{RUBY_ENGINE}#{RUBY_VERSION}".freeze
Expand Down
18 changes: 11 additions & 7 deletions spec/integration/rails_spec.rb
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
require 'rails'
require 'nats/io/rails'
require 'rails/application'
require 'active_record'
require 'active_record/railtie'

begin
require 'rails'
require 'nats/io/rails'
require 'rails/application'
require 'active_record'
require 'active_record/railtie'
rescue LoadError
end

require 'spec_helper'

describe 'Rails integration' do

before(:all) do
skip 'rails not installed' if not defined?(Rails)
@serverctl = NatsServerControl.new.tap { |s| s.start_server(true) }
end

after(:all) do
@serverctl.kill_server
@serverctl.kill_server if @serverctl
end

around(:each) do |example|
Expand Down
Loading

0 comments on commit 1fb937b

Please sign in to comment.