Skip to content

Commit

Permalink
Ability to dispatch to list of selected nodes.
Browse files Browse the repository at this point in the history
  • Loading branch information
alfetahe committed Jan 23, 2024
1 parent 74ec496 commit fec6108
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ v0.1.1 - XXXX-XX-XX
Added
- New dispatch option `atomic_priority_set` to set priority and dispatch event in one atomic operation.
- New dispatch option for `members` called `external` to dispatch event to nodes except the local node.
- New dispatch option for `members` to dispatch event to list of selected nodes.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ By default `blockade` will dispatch events to all subscribers across the cluster
- `members` - define which members should receive the event. available options are:
- `global` - dispatch event to all members within the cluster.
- `local` - dispatch event to local handlers only.
- `external` - dispatch event to nodes except the local node.

- `external` - dispatch event to all nodes except the local node.
- `[node()]` - dispatch event to a specific nodes.
Default is `global`.

- `priority` - set priority level for the event. Default is `0`. The if current priority on the event queue
Expand Down
2 changes: 1 addition & 1 deletion src/blockade.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
#{name => event_manager(), priority => priority(), discard_events => event_discard()}.
%% Start up options which can be passed to the start_link function.
-type dispatch_opts() ::
#{priority => priority(), members => local | global | external,
#{priority => priority(), members => local | global | external | [node()],
discard_event => event_discard(), atomic_priority_set => priority()}.
%% Dispatch options.
-type queued_event() :: {event(), event_payload(), dispatch_opts()}. %% Queued event.
Expand Down
3 changes: 3 additions & 0 deletions src/blockade_service.erl
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ member_pids(Scope, Event, MemberType) when MemberType == global ->
member_pids(Scope, Event, MemberType) when MemberType == external ->
LocalNode = node(),
lists:filter(fun(Pid) -> node(Pid) =/= LocalNode end, pg:get_members(Scope, Event));
member_pids(Scope, Event, MemberType) when is_list(MemberType) ->
lists:filter(fun(Pid) -> lists:member(node(Pid), MemberType) end,
pg:get_members(Scope, Event));
member_pids(_Scope, _Event, _MemberType) ->
throw({error, invalid_members_option}).

Expand Down
25 changes: 22 additions & 3 deletions test/blockade_dist_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
end_per_testcase/2, test_get_handlers_dist/1, test_get_events_dist/1]).
-export([test_add_handler_dist/1, test_remove_handler_dist/1, test_dispatch_sync_dist/1,
test_dispatch_dist/1, test_dispatch_dist_prio/1, test_dispatch_dist_memb_local/1,
test_dispatch_dist_memb_global/1, test_dispatch_dist_memb_external/1,
test_get_set_priority_dist/1, test_get_event_queue_dist/1,
test_prune_event_queue_dist/1]).
test_dispatch_dist_memb_selection/1, test_dispatch_dist_memb_global/1,
test_dispatch_dist_memb_external/1, test_get_set_priority_dist/1,
test_get_event_queue_dist/1, test_prune_event_queue_dist/1]).

all() ->
[test_add_handler_dist,
Expand All @@ -26,6 +26,7 @@ all() ->
test_dispatch_dist_prio,
test_dispatch_dist_memb_local,
test_dispatch_dist_memb_global,
test_dispatch_dist_memb_selection,
test_dispatch_dist_memb_external,
test_get_set_priority_dist,
test_get_event_queue_dist,
Expand Down Expand Up @@ -194,6 +195,24 @@ test_dispatch_dist_memb_external(Config) ->
Total = length(AllMessages),
true = lists:all(fun(Resp) -> Resp =:= memb_external end, AllMessages).

test_dispatch_dist_memb_selection(Config) ->
Nodes = ?config(nodes, Config),
% Take the first 5 nodes from the list.
SelectedNodes = lists:map(fun({ok, _Pid, Node}) -> Node end, lists:sublist(Nodes, 5)),
blockade_test_helper:add_handler_nodes(test_dispatch_dist_memb_selection,
memb_selection,
Nodes),
blockade:dispatch(test_dispatch_dist_memb_selection,
memb_selection,
{memb_selection, self()},
#{members => SelectedNodes}),

% Need to do one test sync call to make sure all selected nodes have handled the event.
blockade_test_helper:test_sync_msg(test_dispatch_dist_memb_selection, Nodes),
AllMessages = blockade_test_helper:get_all_messages([]),
5 = length(AllMessages),
true = lists:all(fun(Resp) -> Resp =:= memb_selection end, AllMessages).

test_dispatch_dist_memb_global(Config) ->
Nodes = ?config(nodes, Config),
blockade_test_helper:add_handler_nodes(test_dispatch_dist_memb_global,
Expand Down
5 changes: 5 additions & 0 deletions test/blockade_service_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ test_member_pids(Config) ->
ExternalMembers = blockade_service:member_pids(Scope, test_event, external),
false = lists:member(LocalPid, ExternalMembers),
GlobalMembers = blockade_service:member_pids(Scope, test_event, global),

Nodes = ?config(nodes, Config),
SelectedNodes = lists:map(fun({ok, _Pid, Node}) -> Node end, lists:sublist(Nodes, 2)),
SelectedPids = blockade_service:member_pids(Scope, test_event, SelectedNodes),
true = lists:all(fun(Pid) -> lists:member(node(Pid), SelectedNodes) end, SelectedPids),
?NR_OF_NODES + 1 = length(GlobalMembers),
true = lists:all(fun(Pid) -> is_pid(Pid) end, GlobalMembers).

Expand Down

0 comments on commit fec6108

Please sign in to comment.