From a477ae4637e2bf2a5b8198bc6cf5c58243eab920 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Marronnier?= Date: Sat, 30 Dec 2023 22:07:48 +0100 Subject: [PATCH 01/10] New methods + using existing helper --- src/cable/connection.cr | 6 +++++- src/cable/server.cr | 10 +++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/src/cable/connection.cr b/src/cable/connection.cr index d077ab2..f8acde7 100644 --- a/src/cable/connection.cr +++ b/src/cable/connection.cr @@ -52,6 +52,10 @@ module Cable @connection_rejected = true end + def channels : Array(Channel) + Connection::CHANNELS[connection_identifier] + end + def closed? : Bool socket.closed? end @@ -75,7 +79,7 @@ module Cable end def send_message(message : String) - return if socket.closed? + return if closed? socket.send(message) end diff --git a/src/cable/server.cr b/src/cable/server.cr index 55767d1..ea2c512 100644 --- a/src/cable/server.cr +++ b/src/cable/server.cr @@ -70,6 +70,14 @@ module Cable connections.delete(connection_id).try(&.close) end + def active_connections_for(token : String) : Array(Connection) + connections.values.select { |connection| connection.token == token && !connection.closed? } + end + + def subscribed_channels_for(token : String) : Array(Channel) + active_connections_for(token).map { |connection| connection.channels } + end + def subscribe_channel(channel : Channel, identifier : String) @channel_mutex.synchronize do if !@channels.has_key?(identifier) @@ -112,7 +120,7 @@ module Cable @channels[channel_identifier].each do |channel| # TODO: would be nice to have a test where we open two connections # close one, and make sure the other one receives the message - if channel.connection.socket.closed? + if channel.connection.closed? channel.close else Cable::Logger.info { "#{channel.class} transmitting #{parsed_message} (via streamed from #{channel.stream_identifier})" } From 48e026a7d42f6dd754df6557b80f518d472b77be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Marronnier?= Date: Sat, 30 Dec 2023 22:30:51 +0100 Subject: [PATCH 02/10] DRY suggestions --- src/cable/channel.cr | 51 +++++--------------------------------------- 1 file changed, 5 insertions(+), 46 deletions(-) diff --git a/src/cable/channel.cr b/src/cable/channel.cr index 10c4426..4150e86 100644 --- a/src/cable/channel.cr +++ b/src/cable/channel.cr @@ -59,7 +59,7 @@ module Cable @stream_identifier = stream_identifier.to_s end - def self.broadcast_to(channel : String, message : JSON::Any) + def self.broadcast_to(channel : String, message : JSON::Any | Hash(String, String)) Cable::Logger.info { "[ActionCable] Broadcasting to #{channel}: #{message}" } Cable.server.publish(channel, message.to_json) end @@ -70,60 +70,19 @@ module Cable Cable.server.publish(channel, message) end - def self.broadcast_to(channel : String, message : Hash(String, String)) - Cable::Logger.info { "[ActionCable] Broadcasting to #{channel}: #{message}" } - Cable.server.publish(channel, message.to_json) - end - - def broadcast(message : String) - if stream_id = stream_identifier.presence - Cable::Logger.info { "[ActionCable] Broadcasting to #{self.class}: #{message}" } - Cable.server.send_to_channels(stream_id, message) - else - Cable::Logger.error { "#{self.class}.transmit(message : String) with #{message} without already using stream_from(stream_identifier)" } - end - end - - def broadcast(message : JSON::Any) + def broadcast(message : String | JSON::Any | Hash(String, String)) if stream_id = stream_identifier.presence Cable::Logger.info { "[ActionCable] Broadcasting to #{self.class}: #{message}" } Cable.server.send_to_channels(stream_id, message) else - Cable::Logger.error { "#{self.class}.transmit(message : JSON::Any) with #{message} without already using stream_from(stream_identifier)" } + Cable::Logger.error { "#{self.class}.transmit(message : #{message.class}) with #{message} without already using stream_from(stream_identifier)" } end end - def broadcast(message : Hash(String, String)) - if stream_id = stream_identifier.presence - Cable::Logger.info { "[ActionCable] Broadcasting to #{self.class}: #{message}" } - Cable.server.send_to_channels(stream_id, message.to_json) - else - Cable::Logger.error { "#{self.class}.transmit(message : Hash(String, String)) with #{message} without already using stream_from(stream_identifier)" } - end - end - - # broadcast single message to single connection for this channel - def transmit(message : String) - Cable::Logger.info { "[ActionCable] transmitting to #{self.class}: #{message}" } - connection.socket.send({ - identifier: identifier, - message: Cable.server.safe_decode_message(message), - }.to_json) - end - - # broadcast single message to single connection for this channel - def transmit(message : JSON::Any) - Cable::Logger.info { "[ActionCable] transmitting to #{self.class}: #{message}" } - connection.socket.send({ - identifier: identifier, - message: Cable.server.safe_decode_message(message), - }.to_json) - end - # broadcast single message to single connection for this channel - def transmit(message : Hash(String, String)) + def transmit(message : String | JSON::Any | Hash(String, String)) Cable::Logger.info { "[ActionCable] transmitting to #{self.class}: #{message}" } - connection.socket.send({ + connection.send_message({ identifier: identifier, message: Cable.server.safe_decode_message(message), }.to_json) From b6f1b763ef4e6e699e9902d711963414808e2314 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Marronnier?= Date: Sat, 30 Dec 2023 23:32:26 +0100 Subject: [PATCH 03/10] Minimal specs --- spec/cable/handler_spec.cr | 4 ++++ src/cable/connection.cr | 3 ++- src/cable/server.cr | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/spec/cable/handler_spec.cr b/spec/cable/handler_spec.cr index 160e254..e1a9832 100644 --- a/spec/cable/handler_spec.cr +++ b/spec/cable/handler_spec.cr @@ -50,6 +50,8 @@ describe Cable::Handler do ws2 = HTTP::WebSocket.new("ws://#{listen_address}/updates?test_token=1") Cable.server.connections.size.should eq(1) + Cable.server.active_connections_for("1").size.should eq(1) + Cable.server.subscribed_channels_for("1").size.should eq(0) messages = [ {type: "welcome"}.to_json, @@ -69,6 +71,8 @@ describe Cable::Handler do ws2.run Cable.server.connections.size.should eq(1) + Cable.server.active_connections_for("1").size.should eq(1) + Cable.server.subscribed_channels_for("1").size.should eq(1) end it "malformed data from client" do diff --git a/src/cable/connection.cr b/src/cable/connection.cr index f8acde7..95b0e7d 100644 --- a/src/cable/connection.cr +++ b/src/cable/connection.cr @@ -53,7 +53,8 @@ module Cable end def channels : Array(Channel) - Connection::CHANNELS[connection_identifier] + return Array(Channel).new unless Connection::CHANNELS.has_key?(connection_identifier) + Connection::CHANNELS.[connection_identifier].values end def closed? : Bool diff --git a/src/cable/server.cr b/src/cable/server.cr index ea2c512..6e70ff6 100644 --- a/src/cable/server.cr +++ b/src/cable/server.cr @@ -75,7 +75,7 @@ module Cable end def subscribed_channels_for(token : String) : Array(Channel) - active_connections_for(token).map { |connection| connection.channels } + active_connections_for(token).sum { |connection| connection.channels } end def subscribe_channel(channel : Channel, identifier : String) From 905c07af76a4f4f0b5717dbcd3f86f0582c8f1c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Marronnier?= Date: Wed, 3 Jan 2024 13:59:17 +0100 Subject: [PATCH 04/10] Typo --- src/cable/connection.cr | 6 +++--- src/cable/handler.cr | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/cable/connection.cr b/src/cable/connection.cr index 95b0e7d..0a3f29e 100644 --- a/src/cable/connection.cr +++ b/src/cable/connection.cr @@ -2,7 +2,7 @@ require "uuid" module Cable abstract class Connection - class UnathorizedConnectionException < Exception; end + class UnauthorizedConnectionException < Exception; end property internal_identifier : String = "0" property connection_identifier : String = "" @@ -38,7 +38,7 @@ module Cable # gather connection_identifier after the connection has gathered the id from identified_by(field) self.connection_identifier = "#{internal_identifier}-#{UUID.random}" subscribe_to_internal_channel - rescue e : UnathorizedConnectionException + rescue e : UnauthorizedConnectionException reject_connection! unsubscribe_from_internal_channel socket.close(HTTP::WebSocket::CloseCode::NormalClosure, "Farewell") @@ -86,7 +86,7 @@ module Cable end def reject_unauthorized_connection - raise UnathorizedConnectionException.new + raise UnauthorizedConnectionException.new end # Convert the `message` to a proper `Payload`. diff --git a/src/cable/handler.cr b/src/cable/handler.cr index fb0d21d..f4af6c8 100644 --- a/src/cable/handler.cr +++ b/src/cable/handler.cr @@ -49,7 +49,7 @@ module Cable socket.close(HTTP::WebSocket::CloseCode::InvalidFramePayloadData, "Invalid message") Cable.server.remove_connection(connection_id) Cable.settings.on_error.call(e, "Cable::Handler#socket.on_message") - rescue e : Cable::Connection::UnathorizedConnectionException + rescue e : Cable::Connection::UnauthorizedConnectionException # handle unauthorized connections # no need to log them ws_pinger.stop From 1e4cb8aaa264df340aca7f7c20202981e13436f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Marronnier?= Date: Wed, 3 Jan 2024 15:20:45 +0100 Subject: [PATCH 05/10] connection#close fix for channel-less connections --- spec/cable/connection_spec.cr | 20 +++++++++++++++----- src/cable/connection.cr | 22 ++++++++++++---------- 2 files changed, 27 insertions(+), 15 deletions(-) diff --git a/spec/cable/connection_spec.cr b/spec/cable/connection_spec.cr index a801847..0dbc6ed 100644 --- a/spec/cable/connection_spec.cr +++ b/spec/cable/connection_spec.cr @@ -3,12 +3,22 @@ require "../spec_helper" include RequestHelpers describe Cable::Connection do - it "removes the connection channel on close" do - connect do |connection, _socket| - connection.receive({"command" => "subscribe", "identifier" => {channel: "ChatChannel", room: "1"}.to_json}.to_json) - ConnectionTest::CHANNELS.keys.size.should eq(1) + describe "#close" do + it "closes the connection socket even without channel subscriptions" do + connect do |connection, _socket| + connection.closed?.should eq(false) + connection.close + connection.closed?.should eq(true) + end + end + it "removes the connection channel on close" do + connect do |connection, _socket| + connection.receive({"command" => "subscribe", "identifier" => {channel: "ChatChannel", room: "1"}.to_json}.to_json) + ConnectionTest::CHANNELS.keys.size.should eq(1) + connection.close + ConnectionTest::CHANNELS.keys.size.should eq(0) + end end - ConnectionTest::CHANNELS.keys.size.should eq(0) end describe "#receive" do diff --git a/src/cable/connection.cr b/src/cable/connection.cr index 0a3f29e..bd748f2 100644 --- a/src/cable/connection.cr +++ b/src/cable/connection.cr @@ -62,20 +62,22 @@ module Cable end def close - return true unless Connection::CHANNELS.has_key?(connection_identifier) + if Connection::CHANNELS.has_key?(connection_identifier) + Connection::CHANNELS[connection_identifier].each do |identifier, channel| + # the ordering here is important + Connection::CHANNELS[connection_identifier].delete(identifier) + channel.close + rescue e : IO::Error + Cable.settings.on_error.call(e, "IO::Error: #{e.message} -> #{self.class.name}#close") + end - Connection::CHANNELS[connection_identifier].each do |identifier, channel| - # the ordering here is important - Connection::CHANNELS[connection_identifier].delete(identifier) - channel.close - rescue e : IO::Error - Cable.settings.on_error.call(e, "IO::Error: #{e.message} -> #{self.class.name}#close") + Connection::CHANNELS.delete(connection_identifier) + unsubscribe_from_internal_channel end - Connection::CHANNELS.delete(connection_identifier) - unsubscribe_from_internal_channel - Cable::Logger.info { "Terminating connection #{connection_identifier}" } + return true if closed? + Cable::Logger.info { "Terminating connection #{connection_identifier}" } socket.close end From 630915611ffb28aa9c35601f08f2183b3a2c1f9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Marronnier?= Date: Wed, 3 Jan 2024 15:49:41 +0100 Subject: [PATCH 06/10] server#active_connections_for specs --- spec/cable/server_spec.cr | 33 +++++++++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/spec/cable/server_spec.cr b/spec/cable/server_spec.cr index 956ed0e..dc45b8f 100644 --- a/spec/cable/server_spec.cr +++ b/spec/cable/server_spec.cr @@ -19,4 +19,37 @@ describe Cable::Server do end end end + + describe "#active_connections_for" do + it "accurately returns active connections for a specificic token" do + Cable.reset_server + Cable.temp_config(backend_class: Cable::DevBackend) do + Cable.server.active_connections_for("abc123").size.should eq(0) + Cable.server.active_connections_for("def456").size.should eq(0) + + socket = DummySocket.new(IO::Memory.new) + request = builds_request("abc123") + connection = ApplicationCable::Connection.new(request, socket) + Cable.server.add_connection(connection) + + Cable.server.active_connections_for("abc123").size.should eq(1) + + other_socket = DummySocket.new(IO::Memory.new) + other_request = builds_request("def456") + other_connection = ApplicationCable::Connection.new(other_request, other_socket) + Cable.server.add_connection(other_connection) + + Cable.server.active_connections_for("def456").size.should eq(1) + + connection.close + + Cable.server.active_connections_for("abc123").size.should eq(0) + Cable.server.active_connections_for("def456").size.should eq(1) + + other_connection.close + + Cable.server.active_connections_for("def456").size.should eq(0) + end + end + end end From 6e53b117c6ad098a393efbdacb28bd5ced2ff5f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Marronnier?= Date: Wed, 3 Jan 2024 18:21:38 +0100 Subject: [PATCH 07/10] Warning comments for methods --- src/cable/server.cr | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/cable/server.cr b/src/cable/server.cr index 6e70ff6..c5d5726 100644 --- a/src/cable/server.cr +++ b/src/cable/server.cr @@ -70,10 +70,15 @@ module Cable connections.delete(connection_id).try(&.close) end + # You shouldn't rely on these two methods for an exhaustive array of connections + # if your application can spawn more than one Cable.server instance. + + # Only returns connections opened on this instance. def active_connections_for(token : String) : Array(Connection) connections.values.select { |connection| connection.token == token && !connection.closed? } end + # Only returns channel subscriptions opened on this instance. def subscribed_channels_for(token : String) : Array(Channel) active_connections_for(token).sum { |connection| connection.channels } end From 75d595754c0f9258db5181ec71c889b422b5dca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Marronnier?= Date: Sat, 6 Jan 2024 12:51:16 +0100 Subject: [PATCH 08/10] last server specs + helpers --- spec/cable/server_spec.cr | 62 +++++++++++++++++++++++++++++++++------ src/cable/server.cr | 3 +- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/spec/cable/server_spec.cr b/spec/cable/server_spec.cr index dc45b8f..a505ec0 100644 --- a/spec/cable/server_spec.cr +++ b/spec/cable/server_spec.cr @@ -7,9 +7,7 @@ describe Cable::Server do it "finds the connection and disconnects it" do Cable.reset_server Cable.temp_config(backend_class: Cable::DevBackend) do - socket = DummySocket.new(IO::Memory.new) - request = builds_request("abc123") - connection = ApplicationCable::Connection.new(request, socket) + connection = creates_new_connection("abc123") Cable.server.add_connection(connection) connection.connection_identifier.should contain("abc123") @@ -27,16 +25,12 @@ describe Cable::Server do Cable.server.active_connections_for("abc123").size.should eq(0) Cable.server.active_connections_for("def456").size.should eq(0) - socket = DummySocket.new(IO::Memory.new) - request = builds_request("abc123") - connection = ApplicationCable::Connection.new(request, socket) + connection = creates_new_connection("abc123") Cable.server.add_connection(connection) Cable.server.active_connections_for("abc123").size.should eq(1) - other_socket = DummySocket.new(IO::Memory.new) - other_request = builds_request("def456") - other_connection = ApplicationCable::Connection.new(other_request, other_socket) + other_connection = creates_new_connection("def456") Cable.server.add_connection(other_connection) Cable.server.active_connections_for("def456").size.should eq(1) @@ -52,4 +46,54 @@ describe Cable::Server do end end end + + describe "#subscribed_channels_for" do + it "accurately returns active channel subscriptions for a specificic token" do + Cable.reset_server + Cable.temp_config(backend_class: Cable::DevBackend) do + connection_1 = creates_new_connection("aa") + connection_2 = creates_new_connection("bb") + + Cable.server.add_connection(connection_1) + Cable.server.add_connection(connection_2) + + Cable.server.subscribed_channels_for("aa").size.should eq(0) + Cable.server.subscribed_channels_for("bb").size.should eq(0) + + connection_1.subscribe(subscribe_payload("room_a")) + + Cable.server.subscribed_channels_for("aa").size.should eq(1) + Cable.server.subscribed_channels_for("bb").size.should eq(0) + + connection_1.subscribe(subscribe_payload("room_b")) + + Cable.server.subscribed_channels_for("aa").size.should eq(2) + Cable.server.subscribed_channels_for("bb").size.should eq(0) + + connection_2.subscribe(subscribe_payload("room_a")) + + Cable.server.subscribed_channels_for("aa").size.should eq(2) + Cable.server.subscribed_channels_for("bb").size.should eq(1) + + connection_1.close + connection_2.close + end + Cable.reset_server + end + end +end + +def creates_new_connection(token : String | Nil) : ApplicationCable::Connection + ApplicationCable::Connection.new(builds_request(token: token), DummySocket.new(IO::Memory.new)) +end + +def subscribe_payload(room : String) : Cable::Payload + payload_json = { + command: "subscribe", + identifier: { + channel: "ChatChannel", + room: room, + }.to_json, + }.to_json + Cable::Payload.from_json(payload_json) end diff --git a/src/cable/server.cr b/src/cable/server.cr index c5d5726..a4abf4d 100644 --- a/src/cable/server.cr +++ b/src/cable/server.cr @@ -70,7 +70,8 @@ module Cable connections.delete(connection_id).try(&.close) end - # You shouldn't rely on these two methods for an exhaustive array of connections + # You shouldn't rely on these following two methods + # for an exhaustive array of connections and channels # if your application can spawn more than one Cable.server instance. # Only returns connections opened on this instance. From 7ad31ac94aed846ee9a72798dff950315d097b99 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Marronnier?= Date: Tue, 9 Jan 2024 12:47:18 +0100 Subject: [PATCH 09/10] More explicit method return type --- src/cable/connection.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cable/connection.cr b/src/cable/connection.cr index bd748f2..cdac347 100644 --- a/src/cable/connection.cr +++ b/src/cable/connection.cr @@ -52,8 +52,8 @@ module Cable @connection_rejected = true end - def channels : Array(Channel) - return Array(Channel).new unless Connection::CHANNELS.has_key?(connection_identifier) + def channels : Array(Cable::Channel) + return Array(Cable::Channel).new unless Connection::CHANNELS.has_key?(connection_identifier) Connection::CHANNELS.[connection_identifier].values end From 4a9dacff6471b7784cadbfbcb1fc5dcf66f7584d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Marronnier?= Date: Tue, 9 Jan 2024 18:28:06 +0100 Subject: [PATCH 10/10] Ameba correction --- src/cable/server.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cable/server.cr b/src/cable/server.cr index a4abf4d..7cd9ebe 100644 --- a/src/cable/server.cr +++ b/src/cable/server.cr @@ -81,7 +81,7 @@ module Cable # Only returns channel subscriptions opened on this instance. def subscribed_channels_for(token : String) : Array(Channel) - active_connections_for(token).sum { |connection| connection.channels } + active_connections_for(token).sum(&.channels) end def subscribe_channel(channel : Channel, identifier : String)