From 48e7b22eb4f941a73780ea422ff5c85abf3f3155 Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Tue, 21 Nov 2023 21:42:22 +0900 Subject: [PATCH 01/15] Create three different methods for each realtime methods --- packages/realtime_client/example/main.dart | 4 +- .../realtime_client/lib/realtime_client.dart | 2 +- packages/realtime_client/lib/src/push.dart | 1 + .../lib/src/realtime_channel.dart | 54 ++++++++++++++---- .../lib/src/realtime_presence.dart | 1 + packages/realtime_client/lib/src/types.dart | 55 ++++++++++++++++--- .../realtime_client/test/channel_test.dart | 5 +- packages/realtime_client/test/mock_test.dart | 11 ++-- packages/supabase/example/main.dart | 2 +- .../lib/src/supabase_stream_builder.dart | 6 +- packages/supabase/test/mock_test.dart | 2 +- packages/supabase/test/realtime_test.dart | 2 +- 12 files changed, 112 insertions(+), 33 deletions(-) diff --git a/packages/realtime_client/example/main.dart b/packages/realtime_client/example/main.dart index 632b0a20..b94b1569 100644 --- a/packages/realtime_client/example/main.dart +++ b/packages/realtime_client/example/main.dart @@ -10,11 +10,11 @@ Future main() async { ); final channel = socket.channel('realtime:public'); - channel.on(RealtimeListenTypes.postgresChanges, + channel.on(RealtimeListenType.postgresChanges, ChannelFilter(event: 'DELETE', schema: 'public'), (payload, [ref]) { print('channel delete payload: $payload'); }); - channel.on(RealtimeListenTypes.postgresChanges, + channel.on(RealtimeListenType.postgresChanges, ChannelFilter(event: 'INSERT', schema: 'public'), (payload, [ref]) { print('channel insert payload: $payload'); }); diff --git a/packages/realtime_client/lib/realtime_client.dart b/packages/realtime_client/lib/realtime_client.dart index b11e15cc..439e3917 100644 --- a/packages/realtime_client/lib/realtime_client.dart +++ b/packages/realtime_client/lib/realtime_client.dart @@ -3,4 +3,4 @@ export 'src/realtime_channel.dart'; export 'src/realtime_client.dart'; export 'src/realtime_presence.dart'; export 'src/transformers.dart' hide getEnrichedPayload, getPayloadRecords; -export 'src/types.dart' hide ToType; +export 'src/types.dart' hide ToType, ToRealtimeEvent, ChannelFilter; diff --git a/packages/realtime_client/lib/src/push.dart b/packages/realtime_client/lib/src/push.dart index dc26403a..d997ab18 100644 --- a/packages/realtime_client/lib/src/push.dart +++ b/packages/realtime_client/lib/src/push.dart @@ -3,6 +3,7 @@ import 'dart:async'; import 'package:realtime_client/realtime_client.dart'; import 'package:realtime_client/src/constants.dart'; import 'package:realtime_client/src/message.dart'; +import 'package:realtime_client/src/types.dart'; typedef Callback = void Function(dynamic response); diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 26c11bb0..d8290516 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -215,7 +215,7 @@ class RealtimeChannel { Future track(Map payload, [Map opts = const {}]) { return send( - type: RealtimeListenTypes.presence, + type: RealtimeListenType.presence, payload: { 'event': 'track', 'payload': payload, @@ -228,7 +228,7 @@ class RealtimeChannel { Map opts = const {}, ]) { return send( - type: RealtimeListenTypes.presence, + type: RealtimeListenType.presence, payload: { 'event': 'untrack', }, @@ -248,14 +248,48 @@ class RealtimeChannel { (reason, [ref]) => callback(reason?.toString())); } - RealtimeChannel on( - RealtimeListenTypes type, - ChannelFilter filter, - BindingCallback callback, - ) { - return onEvents(type.toType(), filter, callback); + RealtimeChannel onPostgresChanges({ + required PostgresChangeEvent event, + String? schema, + String? table, + String? filter, + required void Function(PostgresChangePayload payload) callback, + }) { + return onEvents( + 'postgres_changes', + ChannelFilter( + event: event.toRealtimeEvent(), + schema: schema, + table: table, + filter: filter, + ), + ((payload, [ref]) => + callback(PostgresChangePayload.fromPayload(payload))), + ); + } + + RealtimeChannel onBroadcast({ + required String event, + required void Function(dynamic payload) callback, + }) { + return onEvents('broadcast', ChannelFilter(event: event), + (payload, [ref]) => callback(payload)); } + RealtimeChannel onPresence({ + required PresenceEvent event, + required void Function(dynamic payload) callback, + }) { + return onEvents( + 'presence', + ChannelFilter( + event: event.name, + ), + (payload, [ref]) => callback(payload), + ); + } + + @internal RealtimeChannel onEvents( String type, ChannelFilter filter, BindingCallback callback) { final typeLower = type.toLowerCase(); @@ -306,7 +340,7 @@ class RealtimeChannel { } Future send({ - required RealtimeListenTypes type, + required RealtimeListenType type, String? event, required Map payload, Map opts = const {}, @@ -318,7 +352,7 @@ class RealtimeChannel { payload['event'] = event; } - if (!canPush && type == RealtimeListenTypes.broadcast) { + if (!canPush && type == RealtimeListenType.broadcast) { final headers = { 'Content-Type': 'application/json', 'apikey': socket.accessToken ?? '', diff --git a/packages/realtime_client/lib/src/realtime_presence.dart b/packages/realtime_client/lib/src/realtime_presence.dart index 8dbea020..d8237258 100644 --- a/packages/realtime_client/lib/src/realtime_presence.dart +++ b/packages/realtime_client/lib/src/realtime_presence.dart @@ -1,4 +1,5 @@ import 'package:realtime_client/realtime_client.dart'; +import 'package:realtime_client/src/types.dart'; class Presence { final String presenceRef; diff --git a/packages/realtime_client/lib/src/types.dart b/packages/realtime_client/lib/src/types.dart index 90d3356a..361237b3 100644 --- a/packages/realtime_client/lib/src/types.dart +++ b/packages/realtime_client/lib/src/types.dart @@ -28,17 +28,34 @@ class Binding { } } +enum PostgresChangeEvent { + all, + insert, + update, + delete, +} + +extension ToRealtimeEvent on PostgresChangeEvent { + String toRealtimeEvent() { + if (this == PostgresChangeEvent.all) { + return '*'; + } else { + return name.toUpperCase(); + } + } +} + class ChannelFilter { - /// For [RealtimeListenTypes.postgresChanges] it's one of: `INSERT`, `UPDATE`, `DELETE` + /// For [RealtimeListenType.postgresChanges] it's one of: `INSERT`, `UPDATE`, `DELETE` /// - /// For [RealtimeListenTypes.presence] it's one of: `sync`, `join`, `leave` + /// For [RealtimeListenType.presence] it's one of: `sync`, `join`, `leave` /// - /// For [RealtimeListenTypes.broadcast] it can be any string + /// For [RealtimeListenType.broadcast] it can be any string final String? event; final String? schema; final String? table; - /// For [RealtimeListenTypes.postgresChanges] it's of the format `column=filter.value` with `filter` being one of `eq, neq, lt, lte, gt, gte, in` + /// For [RealtimeListenType.postgresChanges] it's of the format `column=filter.value` with `filter` being one of `eq, neq, lt, lte, gt, gte, in` /// /// Only one filter can be applied final String? filter; @@ -62,13 +79,15 @@ class ChannelFilter { enum ChannelResponse { ok, timedOut, rateLimited, error } -enum RealtimeListenTypes { postgresChanges, broadcast, presence } +enum RealtimeListenType { postgresChanges, broadcast, presence } + +enum PresenceEvent { sync, join, leave } enum RealtimeSubscribeStatus { subscribed, channelError, closed, timedOut } -extension ToType on RealtimeListenTypes { +extension ToType on RealtimeListenType { String toType() { - if (this == RealtimeListenTypes.postgresChanges) { + if (this == RealtimeListenType.postgresChanges) { return 'postgres_changes'; } else { return name; @@ -106,3 +125,25 @@ class RealtimeChannelConfig { }; } } + +/// Data class that contains the Postgres change event payload. +class PostgresChangePayload { + final String schema; + final String table; + final DateTime commitTimestamp; + final String eventType; + final Map newRow; + final Map oldRow; + final dynamic errors; + + /// Creates a PostgresChangePayload instance from the enriched postgres + /// change payload + PostgresChangePayload.fromPayload(Map map) + : schema = map['schema'], + table = map['table'], + commitTimestamp = DateTime.parse(map['commit_timestamp'] ?? '19700101'), + eventType = map['eventType'], + newRow = map['new'], + oldRow = map['old'], + errors = map['errors']; +} diff --git a/packages/realtime_client/test/channel_test.dart b/packages/realtime_client/test/channel_test.dart index f4f43ffa..cbe472d7 100644 --- a/packages/realtime_client/test/channel_test.dart +++ b/packages/realtime_client/test/channel_test.dart @@ -4,6 +4,7 @@ import 'dart:io'; import 'package:realtime_client/realtime_client.dart'; import 'package:realtime_client/src/constants.dart'; +import 'package:realtime_client/src/types.dart'; import 'package:test/test.dart'; void main() { @@ -269,7 +270,7 @@ void main() { if (status == RealtimeSubscribeStatus.subscribed) { final completer = Completer(); channel.send( - type: RealtimeListenTypes.broadcast, + type: RealtimeListenType.broadcast, payload: { 'myKey': 'myValue', }, @@ -293,7 +294,7 @@ void main() { () async { final completer = Completer(); channel.send( - type: RealtimeListenTypes.broadcast, + type: RealtimeListenType.broadcast, payload: { 'myKey': 'myValue', }, diff --git a/packages/realtime_client/test/mock_test.dart b/packages/realtime_client/test/mock_test.dart index ac91be20..5f80fbac 100644 --- a/packages/realtime_client/test/mock_test.dart +++ b/packages/realtime_client/test/mock_test.dart @@ -4,6 +4,7 @@ import 'dart:io'; import 'package:realtime_client/realtime_client.dart'; import 'package:realtime_client/src/constants.dart'; +import 'package:realtime_client/src/types.dart'; import 'package:test/test.dart'; void main() { @@ -210,7 +211,7 @@ void main() { test('.on()', () { final streamController = StreamController>(); - client.channel('public:todoos').on(RealtimeListenTypes.postgresChanges, + client.channel('public:todoos').on(RealtimeListenType.postgresChanges, ChannelFilter(event: '*', schema: 'public', table: 'todos'), (payload, [ref]) { streamController.add(payload); @@ -254,7 +255,7 @@ void main() { final streamController = StreamController>(); client.channel('public:todoos').on( - RealtimeListenTypes.postgresChanges, + RealtimeListenType.postgresChanges, ChannelFilter( event: '*', schema: 'public', @@ -299,7 +300,7 @@ void main() { }, count: 2); final channel = client.channel('public:todoos').on( - RealtimeListenTypes.postgresChanges, + RealtimeListenType.postgresChanges, ChannelFilter( event: '*', schema: 'public', @@ -444,7 +445,7 @@ void main() { test('.on()', () { final streamController = StreamController>(); - client.channel('public:todoos').on(RealtimeListenTypes.postgresChanges, + client.channel('public:todoos').on(RealtimeListenType.postgresChanges, ChannelFilter(event: '*', schema: 'public', table: 'todos'), (payload, [ref]) { streamController.add(payload); @@ -488,7 +489,7 @@ void main() { final streamController = StreamController>(); client.channel('public:todoos').on( - RealtimeListenTypes.postgresChanges, + RealtimeListenType.postgresChanges, ChannelFilter( event: '*', schema: 'public', diff --git a/packages/supabase/example/main.dart b/packages/supabase/example/main.dart index e39fc605..b65e5b24 100644 --- a/packages/supabase/example/main.dart +++ b/packages/supabase/example/main.dart @@ -28,7 +28,7 @@ Future main() async { final realtimeChannel = supabase.channel('my_channel'); realtimeChannel .on( - RealtimeListenTypes.postgresChanges, + RealtimeListenType.postgresChanges, ChannelFilter(event: '*', schema: 'public', table: 'countries'), (payload, [ref]) {}) .subscribe(); diff --git a/packages/supabase/lib/src/supabase_stream_builder.dart b/packages/supabase/lib/src/supabase_stream_builder.dart index b6457667..9cf81a49 100644 --- a/packages/supabase/lib/src/supabase_stream_builder.dart +++ b/packages/supabase/lib/src/supabase_stream_builder.dart @@ -160,7 +160,7 @@ class SupabaseStreamBuilder extends Stream { _channel = _realtimeClient.channel(_realtimeTopic); _channel!.on( - RealtimeListenTypes.postgresChanges, + RealtimeListenType.postgresChanges, ChannelFilter( event: 'INSERT', schema: _schema, @@ -171,7 +171,7 @@ class SupabaseStreamBuilder extends Stream { _streamData.add(newRecord); _addStream(); }).on( - RealtimeListenTypes.postgresChanges, + RealtimeListenType.postgresChanges, ChannelFilter( event: 'UPDATE', schema: _schema, @@ -190,7 +190,7 @@ class SupabaseStreamBuilder extends Stream { } _addStream(); }).on( - RealtimeListenTypes.postgresChanges, + RealtimeListenType.postgresChanges, ChannelFilter( event: 'DELETE', schema: _schema, diff --git a/packages/supabase/test/mock_test.dart b/packages/supabase/test/mock_test.dart index 65a5a04b..a3af508c 100644 --- a/packages/supabase/test/mock_test.dart +++ b/packages/supabase/test/mock_test.dart @@ -621,7 +621,7 @@ void main() { /// Constructing Supabase query within a realtime callback caused exception /// https://github.com/supabase-community/supabase-flutter/issues/81 test('Calling Postgrest within realtime callback', () async { - client.channel('todos').on(RealtimeListenTypes.postgresChanges, + client.channel('todos').on(RealtimeListenType.postgresChanges, ChannelFilter(event: '*', schema: 'public', table: 'todos'), (event, [_]) async { client.from('todos'); diff --git a/packages/supabase/test/realtime_test.dart b/packages/supabase/test/realtime_test.dart index 00be9193..de5abb4e 100644 --- a/packages/supabase/test/realtime_test.dart +++ b/packages/supabase/test/realtime_test.dart @@ -51,7 +51,7 @@ void main() { test('subscribe on existing subscription fail', () { channel .on( - RealtimeListenTypes.postgresChanges, + RealtimeListenType.postgresChanges, ChannelFilter( event: 'INSERT', schema: 'public', From ec71096906c21915f96b586aaefd696828989c49 Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Wed, 22 Nov 2023 17:54:56 +0900 Subject: [PATCH 02/15] refine the public API for realtime --- .../realtime_client/lib/src/realtime_channel.dart | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index d8290516..4c80c627 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -270,22 +270,25 @@ class RealtimeChannel { RealtimeChannel onBroadcast({ required String event, - required void Function(dynamic payload) callback, + required void Function(Map payload) callback, }) { - return onEvents('broadcast', ChannelFilter(event: event), - (payload, [ref]) => callback(payload)); + return onEvents( + 'broadcast', + ChannelFilter(event: event), + (payload, [ref]) => callback(Map.from(payload)), + ); } RealtimeChannel onPresence({ required PresenceEvent event, - required void Function(dynamic payload) callback, + required void Function(Map payload) callback, }) { return onEvents( 'presence', ChannelFilter( event: event.name, ), - (payload, [ref]) => callback(payload), + (payload, [ref]) => callback(Map.from(payload)), ); } From 16d3956890a6882c3731f7a74b1029c0e08dbcaa Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Mon, 27 Nov 2023 11:50:18 +0900 Subject: [PATCH 03/15] updated filter syntax for postgres change event --- packages/realtime_client/example/main.dart | 29 +++- .../realtime_client/lib/realtime_client.dart | 3 +- .../lib/src/realtime_channel.dart | 4 +- packages/realtime_client/lib/src/types.dart | 95 +++++++++++- packages/realtime_client/test/mock_test.dart | 88 ++++++----- packages/supabase/example/main.dart | 9 +- .../lib/src/supabase_stream_builder.dart | 144 +++++++++--------- .../src/supabase_stream_filter_builder.dart | 14 +- packages/supabase/test/mock_test.dart | 15 +- packages/supabase/test/realtime_test.dart | 13 +- 10 files changed, 263 insertions(+), 151 deletions(-) diff --git a/packages/realtime_client/example/main.dart b/packages/realtime_client/example/main.dart index b94b1569..7b2c707c 100644 --- a/packages/realtime_client/example/main.dart +++ b/packages/realtime_client/example/main.dart @@ -10,14 +10,27 @@ Future main() async { ); final channel = socket.channel('realtime:public'); - channel.on(RealtimeListenType.postgresChanges, - ChannelFilter(event: 'DELETE', schema: 'public'), (payload, [ref]) { - print('channel delete payload: $payload'); - }); - channel.on(RealtimeListenType.postgresChanges, - ChannelFilter(event: 'INSERT', schema: 'public'), (payload, [ref]) { - print('channel insert payload: $payload'); - }); + channel.onPostgresChanges( + event: PostgresChangeEvent.all, + filter: PostgresChangeFilter( + type: PostgresChangeFilterType.eq, + column: 'column', + value: 'value', + ), + callback: (payload) {}, + ); + channel.onPostgresChanges( + event: PostgresChangeEvent.delete, + schema: 'public', + callback: (payload) { + print('channel delete payload: ${payload.toString()}'); + }); + channel.onPostgresChanges( + event: PostgresChangeEvent.insert, + schema: 'public', + callback: (payload) { + print('channel insert payload: ${payload.toString()}'); + }); socket.onMessage((message) => print('MESSAGE $message')); diff --git a/packages/realtime_client/lib/realtime_client.dart b/packages/realtime_client/lib/realtime_client.dart index 439e3917..63c95601 100644 --- a/packages/realtime_client/lib/realtime_client.dart +++ b/packages/realtime_client/lib/realtime_client.dart @@ -3,4 +3,5 @@ export 'src/realtime_channel.dart'; export 'src/realtime_client.dart'; export 'src/realtime_presence.dart'; export 'src/transformers.dart' hide getEnrichedPayload, getPayloadRecords; -export 'src/types.dart' hide ToType, ToRealtimeEvent, ChannelFilter; +export 'src/types.dart' + hide ToType, PostgresChangeEventMethods, ChannelFilter, RealtimeListenType; diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 251d8cd2..8b56a7ed 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -252,7 +252,7 @@ class RealtimeChannel { required PostgresChangeEvent event, String? schema, String? table, - String? filter, + PostgresChangeFilter? filter, required void Function(PostgresChangePayload payload) callback, }) { return onEvents( @@ -261,7 +261,7 @@ class RealtimeChannel { event: event.toRealtimeEvent(), schema: schema, table: table, - filter: filter, + filter: filter?.toString(), ), ((payload, [ref]) => callback(PostgresChangePayload.fromPayload(payload))), diff --git a/packages/realtime_client/lib/src/types.dart b/packages/realtime_client/lib/src/types.dart index 361237b3..d2997c1a 100644 --- a/packages/realtime_client/lib/src/types.dart +++ b/packages/realtime_client/lib/src/types.dart @@ -29,13 +29,20 @@ class Binding { } enum PostgresChangeEvent { + /// Listen to all insert, update, and delete events. all, + + /// Listen to insert events. insert, + + /// Listen to update events. update, + + /// Listen to delete events. delete, } -extension ToRealtimeEvent on PostgresChangeEvent { +extension PostgresChangeEventMethods on PostgresChangeEvent { String toRealtimeEvent() { if (this == PostgresChangeEvent.all) { return '*'; @@ -43,6 +50,19 @@ extension ToRealtimeEvent on PostgresChangeEvent { return name.toUpperCase(); } } + + static PostgresChangeEvent fromString(String event) { + switch (event) { + case 'INSERT': + return PostgresChangeEvent.insert; + case 'UPDATE': + return PostgresChangeEvent.update; + case 'DELETE': + return PostgresChangeEvent.delete; + } + throw ArgumentError( + 'Only "INSERT", "UPDATE", or "DELETE" can be can be passed to `fromString()` method.'); + } } class ChannelFilter { @@ -131,19 +151,82 @@ class PostgresChangePayload { final String schema; final String table; final DateTime commitTimestamp; - final String eventType; + final PostgresChangeEvent eventType; final Map newRow; final Map oldRow; final dynamic errors; + PostgresChangePayload({ + required this.schema, + required this.table, + required this.commitTimestamp, + required this.eventType, + required this.newRow, + required this.oldRow, + required this.errors, + }); - /// Creates a PostgresChangePayload instance from the enriched postgres - /// change payload + /// Creates a PostgresChangePayload instance from the enriched postgres change payload PostgresChangePayload.fromPayload(Map map) : schema = map['schema'], table = map['table'], commitTimestamp = DateTime.parse(map['commit_timestamp'] ?? '19700101'), eventType = map['eventType'], - newRow = map['new'], - oldRow = map['old'], + newRow = + Map.from((map['new'] as Map)), + oldRow = + Map.from((map['old'] as Map)), errors = map['errors']; + + @override + String toString() { + return 'PostgresChangePayload(schema: $schema, table: $table, commitTimestamp: $commitTimestamp, eventType: $eventType, newRow: $newRow, oldRow: $oldRow, errors: $errors)'; + } +} + +/// Specifies the type of filter to be applied on realtime Postgres Change listener. +enum PostgresChangeFilterType { + /// Listens to changes where a column's value in a table equals a client-specified value. + eq, + + /// Listens to changes where a column's value in a table does not equal a value specified. + neq, + + /// Listen to changes where a column's value in a table is less than a value specified. + lt, + + /// Listens to changes where a column's value in a table is less than or equal to a value specified. + lte, + + /// Listens to changes where a column's value in a table is greater than a value specified. + gt, + + /// Listens to changes where a column's value in a table is greater than or equal to a value specified. + gte, + + /// Listen to changes when a column's value in a table equals any of the values specified. + inFilter; +} + +class PostgresChangeFilter { + final PostgresChangeFilterType type; + final String column; + final dynamic value; + + PostgresChangeFilter({ + required this.type, + required this.column, + required this.value, + }); + + @override + String toString() { + if (type == PostgresChangeFilterType.inFilter) { + if (value is List) { + return '$column=in.(${value.map((s) => '"$s"').join(',')})'; + } else { + return '$column=in.(${value.map((s) => '"$s"').join(',')})'; + } + } + return '$column=${type.name}.$value'; + } } diff --git a/packages/realtime_client/test/mock_test.dart b/packages/realtime_client/test/mock_test.dart index 5f80fbac..b69c7422 100644 --- a/packages/realtime_client/test/mock_test.dart +++ b/packages/realtime_client/test/mock_test.dart @@ -4,7 +4,6 @@ import 'dart:io'; import 'package:realtime_client/realtime_client.dart'; import 'package:realtime_client/src/constants.dart'; -import 'package:realtime_client/src/types.dart'; import 'package:test/test.dart'; void main() { @@ -209,13 +208,18 @@ void main() { }); test('.on()', () { - final streamController = StreamController>(); + final streamController = StreamController(); - client.channel('public:todoos').on(RealtimeListenType.postgresChanges, - ChannelFilter(event: '*', schema: 'public', table: 'todos'), (payload, - [ref]) { - streamController.add(payload); - }).subscribe(); + client + .channel('public:todos') + .onPostgresChanges( + event: PostgresChangeEvent.all, + schema: 'public', + table: 'todos', + callback: (payload) { + streamController.add(payload); + }) + .subscribe(); expect( streamController.stream, @@ -252,17 +256,20 @@ void main() { }); test('.on() with filter', () { - final streamController = StreamController>(); + final streamController = StreamController(); - client.channel('public:todoos').on( - RealtimeListenType.postgresChanges, - ChannelFilter( - event: '*', + client + .channel('public:todoos') + .onPostgresChanges( + event: PostgresChangeEvent.all, schema: 'public', table: 'todos', - filter: 'id=eq.2'), (payload, [ref]) { - streamController.add(payload); - }).subscribe(); + filter: PostgresChangeFilter( + type: PostgresChangeFilterType.eq, column: 'id', value: 2), + callback: (payload) { + streamController.add(payload); + }) + .subscribe(); expect( streamController.stream, @@ -299,14 +306,13 @@ void main() { } }, count: 2); - final channel = client.channel('public:todoos').on( - RealtimeListenType.postgresChanges, - ChannelFilter( - event: '*', - schema: 'public', - table: 'todos', - filter: 'id=eq.2'), - (_, [__]) {}, + final channel = client.channel('public:todoos').onPostgresChanges( + event: PostgresChangeEvent.all, + schema: 'public', + table: 'todos', + filter: PostgresChangeFilter( + type: PostgresChangeFilterType.eq, column: 'id', value: 2), + callback: (payload) {}, ); channel.subscribe(subscribeCallback); @@ -443,13 +449,18 @@ void main() { }); test('.on()', () { - final streamController = StreamController>(); + final streamController = StreamController(); - client.channel('public:todoos').on(RealtimeListenType.postgresChanges, - ChannelFilter(event: '*', schema: 'public', table: 'todos'), (payload, - [ref]) { - streamController.add(payload); - }).subscribe(); + client + .channel('public:todoos') + .onPostgresChanges( + event: PostgresChangeEvent.all, + schema: 'public', + table: 'todos', + callback: (payload) { + streamController.add(payload); + }) + .subscribe(); expect( streamController.stream, @@ -486,17 +497,20 @@ void main() { }); test('.on() with filter', () { - final streamController = StreamController>(); + final streamController = StreamController(); - client.channel('public:todoos').on( - RealtimeListenType.postgresChanges, - ChannelFilter( - event: '*', + client + .channel('public:todoos') + .onPostgresChanges( + event: PostgresChangeEvent.all, schema: 'public', table: 'todos', - filter: 'id=eq.2'), (payload, [ref]) { - streamController.add(payload); - }).subscribe(); + filter: PostgresChangeFilter( + type: PostgresChangeFilterType.eq, column: 'id', value: 2), + callback: (payload) { + streamController.add(payload); + }) + .subscribe(); expect( streamController.stream, diff --git a/packages/supabase/example/main.dart b/packages/supabase/example/main.dart index b65e5b24..45cda3ad 100644 --- a/packages/supabase/example/main.dart +++ b/packages/supabase/example/main.dart @@ -27,10 +27,11 @@ Future main() async { // realtime final realtimeChannel = supabase.channel('my_channel'); realtimeChannel - .on( - RealtimeListenType.postgresChanges, - ChannelFilter(event: '*', schema: 'public', table: 'countries'), - (payload, [ref]) {}) + .onPostgresChanges( + event: PostgresChangeEvent.all, + schema: 'public', + table: 'countries', + callback: (payload) {}) .subscribe(); // remember to remove channel when no longer needed diff --git a/packages/supabase/lib/src/supabase_stream_builder.dart b/packages/supabase/lib/src/supabase_stream_builder.dart index 9cf81a49..bcc0b4b4 100644 --- a/packages/supabase/lib/src/supabase_stream_builder.dart +++ b/packages/supabase/lib/src/supabase_stream_builder.dart @@ -5,8 +5,6 @@ import 'package:supabase/supabase.dart'; part 'supabase_stream_filter_builder.dart'; -enum _FilterType { eq, neq, lt, lte, gt, gte, inFilter } - class _StreamPostgrestFilter { _StreamPostgrestFilter({ required this.column, @@ -21,7 +19,7 @@ class _StreamPostgrestFilter { final dynamic value; /// Type of the filer being applied - final _FilterType type; + final PostgresChangeFilterType type; } class _Order { @@ -141,71 +139,71 @@ class SupabaseStreamBuilder extends Stream { Future _getStreamData() async { final currentStreamFilter = _streamFilter; _streamData = []; - String? realtimeFilter; + PostgresChangeFilter? realtimeFilter; if (currentStreamFilter != null) { - if (currentStreamFilter.type == _FilterType.inFilter) { + if (currentStreamFilter.type == PostgresChangeFilterType.inFilter) { final value = currentStreamFilter.value; - if (value is List) { - realtimeFilter = - '${currentStreamFilter.column}=in.(${value.map((s) => '"$s"').join(',')})'; - } else { - realtimeFilter = - '${currentStreamFilter.column}=in.(${value.join(',')})'; - } + realtimeFilter = PostgresChangeFilter( + type: PostgresChangeFilterType.inFilter, + column: currentStreamFilter.column, + value: value, + ); } else { - realtimeFilter = - '${currentStreamFilter.column}=${currentStreamFilter.type.name}.${currentStreamFilter.value}'; + realtimeFilter = PostgresChangeFilter( + type: currentStreamFilter.type, + column: currentStreamFilter.column, + value: currentStreamFilter.value, + ); } } _channel = _realtimeClient.channel(_realtimeTopic); - _channel!.on( - RealtimeListenType.postgresChanges, - ChannelFilter( - event: 'INSERT', - schema: _schema, - table: _table, - filter: realtimeFilter, - ), (payload, [ref]) { - final newRecord = Map.from(payload['new']!); - _streamData.add(newRecord); - _addStream(); - }).on( - RealtimeListenType.postgresChanges, - ChannelFilter( - event: 'UPDATE', - schema: _schema, - table: _table, - filter: realtimeFilter, - ), (payload, [ref]) { - final updatedIndex = _streamData.indexWhere( - (element) => _isTargetRecord(record: element, payload: payload), - ); - - final updatedRecord = Map.from(payload['new']!); - if (updatedIndex >= 0) { - _streamData[updatedIndex] = updatedRecord; - } else { - _streamData.add(updatedRecord); - } - _addStream(); - }).on( - RealtimeListenType.postgresChanges, - ChannelFilter( - event: 'DELETE', - schema: _schema, - table: _table, - filter: realtimeFilter, - ), (payload, [ref]) { - final deletedIndex = _streamData.indexWhere( - (element) => _isTargetRecord(record: element, payload: payload), - ); - if (deletedIndex >= 0) { - /// Delete the data from in memory cache if it was found - _streamData.removeAt(deletedIndex); - _addStream(); - } - }).subscribe((status, [error]) { + + _channel! + .onPostgresChanges( + event: PostgresChangeEvent.insert, + schema: _schema, + table: _table, + filter: realtimeFilter, + callback: (payload) { + final newRecord = payload.newRow; + _streamData.add(newRecord); + _addStream(); + }) + .onPostgresChanges( + event: PostgresChangeEvent.update, + schema: _schema, + table: _table, + filter: realtimeFilter, + callback: (payload) { + final updatedIndex = _streamData.indexWhere( + (element) => _isTargetRecord(record: element, payload: payload), + ); + + final updatedRecord = payload.newRow; + if (updatedIndex >= 0) { + _streamData[updatedIndex] = updatedRecord; + } else { + _streamData.add(updatedRecord); + } + _addStream(); + }) + .onPostgresChanges( + event: PostgresChangeEvent.delete, + schema: _schema, + table: _table, + filter: realtimeFilter, + callback: (payload) { + final deletedIndex = _streamData.indexWhere( + (element) => _isTargetRecord(record: element, payload: payload), + ); + if (deletedIndex >= 0) { + /// Delete the data from in memory cache if it was found + _streamData.removeAt(deletedIndex); + _addStream(); + } + }) + .subscribe((status, [error]) { if (error != null) { _addException(error); } @@ -214,25 +212,25 @@ class SupabaseStreamBuilder extends Stream { PostgrestFilterBuilder query = _queryBuilder.select(); if (_streamFilter != null) { switch (_streamFilter!.type) { - case _FilterType.eq: + case PostgresChangeFilterType.eq: query = query.eq(_streamFilter!.column, _streamFilter!.value); break; - case _FilterType.neq: + case PostgresChangeFilterType.neq: query = query.neq(_streamFilter!.column, _streamFilter!.value); break; - case _FilterType.lt: + case PostgresChangeFilterType.lt: query = query.lt(_streamFilter!.column, _streamFilter!.value); break; - case _FilterType.lte: + case PostgresChangeFilterType.lte: query = query.lte(_streamFilter!.column, _streamFilter!.value); break; - case _FilterType.gt: + case PostgresChangeFilterType.gt: query = query.gt(_streamFilter!.column, _streamFilter!.value); break; - case _FilterType.gte: + case PostgresChangeFilterType.gte: query = query.gte(_streamFilter!.column, _streamFilter!.value); break; - case _FilterType.inFilter: + case PostgresChangeFilterType.inFilter: query = query.inFilter(_streamFilter!.column, _streamFilter!.value); break; } @@ -258,13 +256,13 @@ class SupabaseStreamBuilder extends Stream { bool _isTargetRecord({ required Map record, - required Map payload, + required PostgresChangePayload payload, }) { late final Map targetRecord; - if (payload['eventType'] == 'UPDATE') { - targetRecord = payload['new']!; - } else if (payload['eventType'] == 'DELETE') { - targetRecord = payload['old']!; + if (payload.eventType == PostgresChangeEvent.update) { + targetRecord = payload.newRow; + } else if (payload.eventType == PostgresChangeEvent.delete) { + targetRecord = payload.oldRow; } return _uniqueColumns .every((column) => record[column] == targetRecord[column]); diff --git a/packages/supabase/lib/src/supabase_stream_filter_builder.dart b/packages/supabase/lib/src/supabase_stream_filter_builder.dart index 53ccd4c6..5cf09bc6 100644 --- a/packages/supabase/lib/src/supabase_stream_filter_builder.dart +++ b/packages/supabase/lib/src/supabase_stream_filter_builder.dart @@ -19,7 +19,7 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder { /// ``` SupabaseStreamBuilder eq(String column, Object value) { _streamFilter = _StreamPostgrestFilter( - type: _FilterType.eq, + type: PostgresChangeFilterType.eq, column: column, value: value, ); @@ -35,7 +35,7 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder { /// ``` SupabaseStreamBuilder neq(String column, Object value) { _streamFilter = _StreamPostgrestFilter( - type: _FilterType.neq, + type: PostgresChangeFilterType.neq, column: column, value: value, ); @@ -51,7 +51,7 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder { /// ``` SupabaseStreamBuilder lt(String column, Object value) { _streamFilter = _StreamPostgrestFilter( - type: _FilterType.lt, + type: PostgresChangeFilterType.lt, column: column, value: value, ); @@ -67,7 +67,7 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder { /// ``` SupabaseStreamBuilder lte(String column, Object value) { _streamFilter = _StreamPostgrestFilter( - type: _FilterType.lte, + type: PostgresChangeFilterType.lte, column: column, value: value, ); @@ -83,7 +83,7 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder { /// ``` SupabaseStreamBuilder gt(String column, Object value) { _streamFilter = _StreamPostgrestFilter( - type: _FilterType.gt, + type: PostgresChangeFilterType.gt, column: column, value: value, ); @@ -99,7 +99,7 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder { /// ``` SupabaseStreamBuilder gte(String column, Object value) { _streamFilter = _StreamPostgrestFilter( - type: _FilterType.gte, + type: PostgresChangeFilterType.gte, column: column, value: value, ); @@ -115,7 +115,7 @@ class SupabaseStreamFilterBuilder extends SupabaseStreamBuilder { /// ``` SupabaseStreamBuilder inFilter(String column, List values) { _streamFilter = _StreamPostgrestFilter( - type: _FilterType.inFilter, + type: PostgresChangeFilterType.inFilter, column: column, value: values, ); diff --git a/packages/supabase/test/mock_test.dart b/packages/supabase/test/mock_test.dart index a3af508c..a70f2e65 100644 --- a/packages/supabase/test/mock_test.dart +++ b/packages/supabase/test/mock_test.dart @@ -621,11 +621,16 @@ void main() { /// Constructing Supabase query within a realtime callback caused exception /// https://github.com/supabase-community/supabase-flutter/issues/81 test('Calling Postgrest within realtime callback', () async { - client.channel('todos').on(RealtimeListenType.postgresChanges, - ChannelFilter(event: '*', schema: 'public', table: 'todos'), (event, - [_]) async { - client.from('todos'); - }).subscribe(); + client + .channel('todos') + .onPostgresChanges( + event: PostgresChangeEvent.all, + schema: 'public', + table: 'todos', + callback: (payload) async { + client.from('todos'); + }) + .subscribe(); await Future.delayed(const Duration(milliseconds: 700)); diff --git a/packages/supabase/test/realtime_test.dart b/packages/supabase/test/realtime_test.dart index de5abb4e..929a4e4d 100644 --- a/packages/supabase/test/realtime_test.dart +++ b/packages/supabase/test/realtime_test.dart @@ -50,14 +50,11 @@ void main() { /// - error test('subscribe on existing subscription fail', () { channel - .on( - RealtimeListenType.postgresChanges, - ChannelFilter( - event: 'INSERT', - schema: 'public', - table: 'countries', - ), - (payload, [ref]) {}) + .onPostgresChanges( + event: PostgresChangeEvent.insert, + schema: 'public', + table: 'countries', + callback: (payload) {}) .subscribe( (event, [errorMsg]) {}, ); From c866cc9ca16c20cd64ecff10a345f7cb964d434a Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Mon, 27 Nov 2023 11:54:52 +0900 Subject: [PATCH 04/15] update mock test --- packages/realtime_client/lib/src/types.dart | 45 ++++++++++++++++---- packages/realtime_client/test/mock_test.dart | 40 ++++++++--------- 2 files changed, 57 insertions(+), 28 deletions(-) diff --git a/packages/realtime_client/lib/src/types.dart b/packages/realtime_client/lib/src/types.dart index d2997c1a..d14cadc5 100644 --- a/packages/realtime_client/lib/src/types.dart +++ b/packages/realtime_client/lib/src/types.dart @@ -1,3 +1,6 @@ +// ignore_for_file: public_member_api_docs, sort_constructors_first +import 'package:collection/collection.dart'; + typedef BindingCallback = void Function(dynamic payload, [dynamic ref]); class Binding { @@ -166,21 +169,47 @@ class PostgresChangePayload { }); /// Creates a PostgresChangePayload instance from the enriched postgres change payload - PostgresChangePayload.fromPayload(Map map) - : schema = map['schema'], - table = map['table'], - commitTimestamp = DateTime.parse(map['commit_timestamp'] ?? '19700101'), - eventType = map['eventType'], + PostgresChangePayload.fromPayload(Map payload) + : schema = payload['schema'], + table = payload['table'], + commitTimestamp = + DateTime.parse(payload['commit_timestamp'] ?? '19700101'), + eventType = payload['eventType'], newRow = - Map.from((map['new'] as Map)), + Map.from((payload['new'] as Map)), oldRow = - Map.from((map['old'] as Map)), - errors = map['errors']; + Map.from((payload['old'] as Map)), + errors = payload['errors']; @override String toString() { return 'PostgresChangePayload(schema: $schema, table: $table, commitTimestamp: $commitTimestamp, eventType: $eventType, newRow: $newRow, oldRow: $oldRow, errors: $errors)'; } + + @override + bool operator ==(covariant PostgresChangePayload other) { + if (identical(this, other)) return true; + final mapEquals = const DeepCollectionEquality().equals; + + return other.schema == schema && + other.table == table && + other.commitTimestamp == commitTimestamp && + other.eventType == eventType && + mapEquals(other.newRow, newRow) && + mapEquals(other.oldRow, oldRow) && + other.errors == errors; + } + + @override + int get hashCode { + return schema.hashCode ^ + table.hashCode ^ + commitTimestamp.hashCode ^ + eventType.hashCode ^ + newRow.hashCode ^ + oldRow.hashCode ^ + errors.hashCode; + } } /// Specifies the type of filter to be applied on realtime Postgres Change listener. diff --git a/packages/realtime_client/test/mock_test.dart b/packages/realtime_client/test/mock_test.dart index b69c7422..d9d2ee8b 100644 --- a/packages/realtime_client/test/mock_test.dart +++ b/packages/realtime_client/test/mock_test.dart @@ -224,7 +224,7 @@ void main() { expect( streamController.stream, emitsInOrder([ - { + PostgresChangePayload.fromPayload({ 'schema': 'public', 'table': 'todos', 'commit_timestamp': '2021-08-01T08:00:20Z', @@ -232,8 +232,8 @@ void main() { 'new': {'id': 3, 'task': 'task 3', 'status': true}, 'old': {}, 'errors': null - }, - { + }), + PostgresChangePayload.fromPayload({ 'schema': 'public', 'table': 'todos', 'commit_timestamp': '2021-08-01T08:00:30Z', @@ -241,8 +241,8 @@ void main() { 'new': {'id': 2, 'task': 'task 2 updated', 'status': false}, 'old': {'id': 2}, 'errors': null - }, - { + }), + PostgresChangePayload.fromPayload({ 'schema': 'public', 'table': 'todos', 'commit_timestamp': '2022-09-14T02:12:52Z', @@ -250,7 +250,7 @@ void main() { 'new': {}, 'old': {'id': 2}, 'errors': null - }, + }), ]), ); }); @@ -274,7 +274,7 @@ void main() { expect( streamController.stream, emitsInOrder([ - { + PostgresChangePayload.fromPayload({ 'schema': 'public', 'table': 'todos', 'commit_timestamp': '2021-08-01T08:00:30Z', @@ -282,8 +282,8 @@ void main() { 'new': {'id': 2, 'task': 'task 2 updated', 'status': false}, 'old': {'id': 2}, 'errors': null - }, - { + }), + PostgresChangePayload.fromPayload({ 'schema': 'public', 'table': 'todos', 'commit_timestamp': '2022-09-14T02:12:52Z', @@ -291,7 +291,7 @@ void main() { 'new': {}, 'old': {'id': 2}, 'errors': null - }, + }) ]), ); }); @@ -465,7 +465,7 @@ void main() { expect( streamController.stream, emitsInOrder([ - { + PostgresChangePayload.fromPayload({ 'schema': 'public', 'table': 'todos', 'commit_timestamp': '2022-09-24T05:42:01.303668+00:00', @@ -473,8 +473,8 @@ void main() { 'new': {'id': 1, 'task': 'task 1', 'status': true}, 'old': {}, 'errors': null - }, - { + }), + PostgresChangePayload.fromPayload({ 'schema': 'public', 'table': 'todos', 'commit_timestamp': '2022-09-24T05:42:01.303668+00:00', @@ -482,8 +482,8 @@ void main() { 'new': {'id': 2, 'task': 'task 2 updated', 'status': false}, 'old': {'id': 2}, 'errors': null - }, - { + }), + PostgresChangePayload.fromPayload({ 'schema': 'public', 'table': 'todos', 'commit_timestamp': '2022-09-24T05:42:01.303668+00:00', @@ -491,7 +491,7 @@ void main() { 'new': {}, 'old': {'id': 2}, 'errors': null - }, + }), ]), ); }); @@ -515,7 +515,7 @@ void main() { expect( streamController.stream, emitsInOrder([ - { + PostgresChangePayload.fromPayload({ 'schema': 'public', 'table': 'todos', 'commit_timestamp': '2022-09-24T05:42:01.303668+00:00', @@ -523,8 +523,8 @@ void main() { 'new': {'id': 2, 'task': 'task 2 updated', 'status': false}, 'old': {'id': 2}, 'errors': null - }, - { + }), + PostgresChangePayload.fromPayload({ 'schema': 'public', 'table': 'todos', 'commit_timestamp': '2022-09-24T05:42:01.303668+00:00', @@ -532,7 +532,7 @@ void main() { 'new': {}, 'old': {'id': 2}, 'errors': null - }, + }), ]), ); }); From 68a907387005b4f1056e6a8129ed3eac48a52669 Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Mon, 27 Nov 2023 12:10:45 +0900 Subject: [PATCH 05/15] update fromPayload constructor --- packages/realtime_client/lib/src/types.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/realtime_client/lib/src/types.dart b/packages/realtime_client/lib/src/types.dart index d14cadc5..6f544550 100644 --- a/packages/realtime_client/lib/src/types.dart +++ b/packages/realtime_client/lib/src/types.dart @@ -174,7 +174,7 @@ class PostgresChangePayload { table = payload['table'], commitTimestamp = DateTime.parse(payload['commit_timestamp'] ?? '19700101'), - eventType = payload['eventType'], + eventType = PostgresChangeEventMethods.fromString(payload['eventType']), newRow = Map.from((payload['new'] as Map)), oldRow = From d7e48f43fb05a5a492af682a452516511bf04abc Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Mon, 27 Nov 2023 12:20:20 +0900 Subject: [PATCH 06/15] fix type cast --- packages/realtime_client/lib/src/types.dart | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/packages/realtime_client/lib/src/types.dart b/packages/realtime_client/lib/src/types.dart index 6f544550..3074743b 100644 --- a/packages/realtime_client/lib/src/types.dart +++ b/packages/realtime_client/lib/src/types.dart @@ -175,10 +175,8 @@ class PostgresChangePayload { commitTimestamp = DateTime.parse(payload['commit_timestamp'] ?? '19700101'), eventType = PostgresChangeEventMethods.fromString(payload['eventType']), - newRow = - Map.from((payload['new'] as Map)), - oldRow = - Map.from((payload['old'] as Map)), + newRow = Map.from(payload['new']), + oldRow = Map.from(payload['old']), errors = payload['errors']; @override From 143411125fa29ff3370d198f047f4d0440227f90 Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Mon, 27 Nov 2023 15:29:32 +0900 Subject: [PATCH 07/15] revert renaming some symbols --- packages/realtime_client/lib/realtime_client.dart | 2 +- .../realtime_client/lib/src/realtime_channel.dart | 11 +++++------ packages/realtime_client/lib/src/types.dart | 14 +++++++------- packages/realtime_client/test/channel_test.dart | 4 ++-- packages/realtime_client/test/mock_test.dart | 1 - 5 files changed, 15 insertions(+), 17 deletions(-) diff --git a/packages/realtime_client/lib/realtime_client.dart b/packages/realtime_client/lib/realtime_client.dart index 63c95601..691b6b07 100644 --- a/packages/realtime_client/lib/realtime_client.dart +++ b/packages/realtime_client/lib/realtime_client.dart @@ -4,4 +4,4 @@ export 'src/realtime_client.dart'; export 'src/realtime_presence.dart'; export 'src/transformers.dart' hide getEnrichedPayload, getPayloadRecords; export 'src/types.dart' - hide ToType, PostgresChangeEventMethods, ChannelFilter, RealtimeListenType; + hide ToType, PostgresChangeEventMethods, ChannelFilter, RealtimeListenTypes; diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 8b56a7ed..45b2249f 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -215,7 +215,7 @@ class RealtimeChannel { Future track(Map payload, [Map opts = const {}]) { return send( - type: RealtimeListenType.presence, + type: RealtimeListenTypes.presence, payload: { 'event': 'track', 'payload': payload, @@ -228,7 +228,7 @@ class RealtimeChannel { Map opts = const {}, ]) { return send( - type: RealtimeListenType.presence, + type: RealtimeListenTypes.presence, payload: { 'event': 'untrack', }, @@ -263,8 +263,7 @@ class RealtimeChannel { table: table, filter: filter?.toString(), ), - ((payload, [ref]) => - callback(PostgresChangePayload.fromPayload(payload))), + (payload, [ref]) => callback(PostgresChangePayload.fromPayload(payload)), ); } @@ -345,7 +344,7 @@ class RealtimeChannel { } Future send({ - required RealtimeListenType type, + required RealtimeListenTypes type, String? event, required Map payload, Map opts = const {}, @@ -357,7 +356,7 @@ class RealtimeChannel { payload['event'] = event; } - if (!canPush && type == RealtimeListenType.broadcast) { + if (!canPush && type == RealtimeListenTypes.broadcast) { final headers = { 'Content-Type': 'application/json', 'apikey': socket.accessToken ?? '', diff --git a/packages/realtime_client/lib/src/types.dart b/packages/realtime_client/lib/src/types.dart index 3074743b..c7beab4d 100644 --- a/packages/realtime_client/lib/src/types.dart +++ b/packages/realtime_client/lib/src/types.dart @@ -69,16 +69,16 @@ extension PostgresChangeEventMethods on PostgresChangeEvent { } class ChannelFilter { - /// For [RealtimeListenType.postgresChanges] it's one of: `INSERT`, `UPDATE`, `DELETE` + /// For [RealtimeListenTypes.postgresChanges] it's one of: `INSERT`, `UPDATE`, `DELETE` /// - /// For [RealtimeListenType.presence] it's one of: `sync`, `join`, `leave` + /// For [RealtimeListenTypes.presence] it's one of: `sync`, `join`, `leave` /// - /// For [RealtimeListenType.broadcast] it can be any string + /// For [RealtimeListenTypes.broadcast] it can be any string final String? event; final String? schema; final String? table; - /// For [RealtimeListenType.postgresChanges] it's of the format `column=filter.value` with `filter` being one of `eq, neq, lt, lte, gt, gte, in` + /// For [RealtimeListenTypes.postgresChanges] it's of the format `column=filter.value` with `filter` being one of `eq, neq, lt, lte, gt, gte, in` /// /// Only one filter can be applied final String? filter; @@ -102,15 +102,15 @@ class ChannelFilter { enum ChannelResponse { ok, timedOut, rateLimited, error } -enum RealtimeListenType { postgresChanges, broadcast, presence } +enum RealtimeListenTypes { postgresChanges, broadcast, presence } enum PresenceEvent { sync, join, leave } enum RealtimeSubscribeStatus { subscribed, channelError, closed, timedOut } -extension ToType on RealtimeListenType { +extension ToType on RealtimeListenTypes { String toType() { - if (this == RealtimeListenType.postgresChanges) { + if (this == RealtimeListenTypes.postgresChanges) { return 'postgres_changes'; } else { return name; diff --git a/packages/realtime_client/test/channel_test.dart b/packages/realtime_client/test/channel_test.dart index cbe472d7..1e876084 100644 --- a/packages/realtime_client/test/channel_test.dart +++ b/packages/realtime_client/test/channel_test.dart @@ -270,7 +270,7 @@ void main() { if (status == RealtimeSubscribeStatus.subscribed) { final completer = Completer(); channel.send( - type: RealtimeListenType.broadcast, + type: RealtimeListenTypes.broadcast, payload: { 'myKey': 'myValue', }, @@ -294,7 +294,7 @@ void main() { () async { final completer = Completer(); channel.send( - type: RealtimeListenType.broadcast, + type: RealtimeListenTypes.broadcast, payload: { 'myKey': 'myValue', }, diff --git a/packages/realtime_client/test/mock_test.dart b/packages/realtime_client/test/mock_test.dart index d9d2ee8b..87ab2a79 100644 --- a/packages/realtime_client/test/mock_test.dart +++ b/packages/realtime_client/test/mock_test.dart @@ -90,7 +90,6 @@ void main() { 'schema': 'public', 'table': 'todos', 'type': 'INSERT', - if (postgresFilter != null) 'filter': postgresFilter, 'columns': [ { 'name': 'id', From e2332a67969b06032d63e0a7ed2821301ff45622 Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Mon, 27 Nov 2023 17:38:31 +0900 Subject: [PATCH 08/15] fix: typo in test --- packages/realtime_client/lib/src/types.dart | 23 +++++++++---------- packages/realtime_client/test/mock_test.dart | 14 +++++++---- .../lib/src/supabase_stream_builder.dart | 8 +++---- 3 files changed, 24 insertions(+), 21 deletions(-) diff --git a/packages/realtime_client/lib/src/types.dart b/packages/realtime_client/lib/src/types.dart index c7beab4d..457be026 100644 --- a/packages/realtime_client/lib/src/types.dart +++ b/packages/realtime_client/lib/src/types.dart @@ -1,4 +1,3 @@ -// ignore_for_file: public_member_api_docs, sort_constructors_first import 'package:collection/collection.dart'; typedef BindingCallback = void Function(dynamic payload, [dynamic ref]); @@ -155,16 +154,16 @@ class PostgresChangePayload { final String table; final DateTime commitTimestamp; final PostgresChangeEvent eventType; - final Map newRow; - final Map oldRow; + final Map newRecord; + final Map oldRecord; final dynamic errors; PostgresChangePayload({ required this.schema, required this.table, required this.commitTimestamp, required this.eventType, - required this.newRow, - required this.oldRow, + required this.newRecord, + required this.oldRecord, required this.errors, }); @@ -175,13 +174,13 @@ class PostgresChangePayload { commitTimestamp = DateTime.parse(payload['commit_timestamp'] ?? '19700101'), eventType = PostgresChangeEventMethods.fromString(payload['eventType']), - newRow = Map.from(payload['new']), - oldRow = Map.from(payload['old']), + newRecord = Map.from(payload['new']), + oldRecord = Map.from(payload['old']), errors = payload['errors']; @override String toString() { - return 'PostgresChangePayload(schema: $schema, table: $table, commitTimestamp: $commitTimestamp, eventType: $eventType, newRow: $newRow, oldRow: $oldRow, errors: $errors)'; + return 'PostgresChangePayload(schema: $schema, table: $table, commitTimestamp: $commitTimestamp, eventType: $eventType, newRow: $newRecord, oldRow: $oldRecord, errors: $errors)'; } @override @@ -193,8 +192,8 @@ class PostgresChangePayload { other.table == table && other.commitTimestamp == commitTimestamp && other.eventType == eventType && - mapEquals(other.newRow, newRow) && - mapEquals(other.oldRow, oldRow) && + mapEquals(other.newRecord, newRecord) && + mapEquals(other.oldRecord, oldRecord) && other.errors == errors; } @@ -204,8 +203,8 @@ class PostgresChangePayload { table.hashCode ^ commitTimestamp.hashCode ^ eventType.hashCode ^ - newRow.hashCode ^ - oldRow.hashCode ^ + newRecord.hashCode ^ + oldRecord.hashCode ^ errors.hashCode; } } diff --git a/packages/realtime_client/test/mock_test.dart b/packages/realtime_client/test/mock_test.dart index 87ab2a79..1716ec17 100644 --- a/packages/realtime_client/test/mock_test.dart +++ b/packages/realtime_client/test/mock_test.dart @@ -90,6 +90,7 @@ void main() { 'schema': 'public', 'table': 'todos', 'type': 'INSERT', + if (postgresFilter != null) 'filter': postgresFilter, 'columns': [ { 'name': 'id', @@ -258,13 +259,16 @@ void main() { final streamController = StreamController(); client - .channel('public:todoos') + .channel('public:todos') .onPostgresChanges( event: PostgresChangeEvent.all, schema: 'public', table: 'todos', filter: PostgresChangeFilter( - type: PostgresChangeFilterType.eq, column: 'id', value: 2), + type: PostgresChangeFilterType.eq, + column: 'id', + value: 2, + ), callback: (payload) { streamController.add(payload); }) @@ -305,7 +309,7 @@ void main() { } }, count: 2); - final channel = client.channel('public:todoos').onPostgresChanges( + final channel = client.channel('public:todos').onPostgresChanges( event: PostgresChangeEvent.all, schema: 'public', table: 'todos', @@ -451,7 +455,7 @@ void main() { final streamController = StreamController(); client - .channel('public:todoos') + .channel('public:todos') .onPostgresChanges( event: PostgresChangeEvent.all, schema: 'public', @@ -499,7 +503,7 @@ void main() { final streamController = StreamController(); client - .channel('public:todoos') + .channel('public:todos') .onPostgresChanges( event: PostgresChangeEvent.all, schema: 'public', diff --git a/packages/supabase/lib/src/supabase_stream_builder.dart b/packages/supabase/lib/src/supabase_stream_builder.dart index bcc0b4b4..11d0e9c1 100644 --- a/packages/supabase/lib/src/supabase_stream_builder.dart +++ b/packages/supabase/lib/src/supabase_stream_builder.dart @@ -166,7 +166,7 @@ class SupabaseStreamBuilder extends Stream { table: _table, filter: realtimeFilter, callback: (payload) { - final newRecord = payload.newRow; + final newRecord = payload.newRecord; _streamData.add(newRecord); _addStream(); }) @@ -180,7 +180,7 @@ class SupabaseStreamBuilder extends Stream { (element) => _isTargetRecord(record: element, payload: payload), ); - final updatedRecord = payload.newRow; + final updatedRecord = payload.newRecord; if (updatedIndex >= 0) { _streamData[updatedIndex] = updatedRecord; } else { @@ -260,9 +260,9 @@ class SupabaseStreamBuilder extends Stream { }) { late final Map targetRecord; if (payload.eventType == PostgresChangeEvent.update) { - targetRecord = payload.newRow; + targetRecord = payload.newRecord; } else if (payload.eventType == PostgresChangeEvent.delete) { - targetRecord = payload.oldRow; + targetRecord = payload.oldRecord; } return _uniqueColumns .every((column) => record[column] == targetRecord[column]); From e2d80db014b72bb3c88af1c6904ff3e46c1d314a Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Mon, 27 Nov 2023 18:01:24 +0900 Subject: [PATCH 09/15] remove unnecessary reply message --- packages/realtime_client/test/mock_test.dart | 35 -------------------- 1 file changed, 35 deletions(-) diff --git a/packages/realtime_client/test/mock_test.dart b/packages/realtime_client/test/mock_test.dart index 1716ec17..ccd77089 100644 --- a/packages/realtime_client/test/mock_test.dart +++ b/packages/realtime_client/test/mock_test.dart @@ -38,41 +38,6 @@ void main() { ['postgres_changes'] .first['filter']; - final replyString = jsonEncode({ - 'event': 'phx_reply', - 'payload': { - 'response': { - 'postgres_changes': [ - { - 'id': 77086988, - 'event': 'INSERT', - 'schema': 'public', - 'table': 'todos', - if (postgresFilter != null) 'filter': postgresFilter, - }, - { - 'id': 25993878, - 'event': 'UPDATE', - 'schema': 'public', - 'table': 'todos', - if (postgresFilter != null) 'filter': postgresFilter, - }, - { - 'id': 48673474, - 'event': 'DELETE', - 'schema': 'public', - 'table': 'todos', - if (postgresFilter != null) 'filter': postgresFilter, - } - ] - }, - 'status': 'ok' - }, - 'ref': '1', - 'topic': 'realtime:public:todos' - }); - webSocket!.add(replyString); - final topic = (jsonDecode(request as String) as Map)['topic']; // Send an insert event From 94ca58f8941f8ce3989c47a0ff21b542203a68cc Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Mon, 27 Nov 2023 21:44:27 +0900 Subject: [PATCH 10/15] update readme --- .../lib/src/realtime_channel.dart | 13 ++++ packages/supabase_flutter/README.md | 64 ++++++++++--------- 2 files changed, 48 insertions(+), 29 deletions(-) diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 45b2249f..afe68903 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -343,6 +343,19 @@ class RealtimeChannel { return pushEvent; } + /// Sends a realtime broadcast message. + Future sendBroadcast({ + required String event, + required Map payload, + }) { + return send( + type: RealtimeListenTypes.broadcast, + event: event, + payload: payload, + ); + } + + @internal Future send({ required RealtimeListenTypes type, String? event, diff --git a/packages/supabase_flutter/README.md b/packages/supabase_flutter/README.md index 2b995be3..e64fa3e0 100644 --- a/packages/supabase_flutter/README.md +++ b/packages/supabase_flutter/README.md @@ -245,15 +245,16 @@ You can get notified whenever there is a change in your Supabase tables. ```dart final myChannel = supabase.channel('my_channel'); -myChannel.on( - RealtimeListenTypes.postgresChanges, - ChannelFilter( - event: '*', +myChannel + .onPostgresChanges( + event: PostgresChangeEvent.all, schema: 'public', table: 'countries', - ), (payload, [ref]) { - // Do something fun or interesting when there is an change on the database -}).subscribe(); + callback: (payload) { + // Do something fun or interesting when there is an change on the database + }, + ) + .subscribe(); ``` #### [Broadcast](https://supabase.com/docs/guides/realtime#broadcast) @@ -264,17 +265,16 @@ Broadcast lets you send and receive low latency messages between connected clien final myChannel = supabase.channel('my_channel'); // Subscribe to `cursor-pos` broadcast event -myChannel.on( - RealtimeListenTypes.broadcast, - ChannelFilter(event: 'cursor-pos'), - (payload, [ref]) { - // Do something fun or interesting when there is an change on the database - }, -).subscribe(); +final myChannel = supabase.channel('my_channel'); + +myChannel + .onBroadcast(event: 'cursor-pos', callback: (payload) {} + // Do something fun or interesting when there is an change on the database + ) + .subscribe(); // Send a broadcast message to other connected clients -await myChannel.send( - type: RealtimeListenTypes.broadcast, +await myChannel.sendBroadcast( event: 'cursor-pos', payload: {'x': 30, 'y': 50}, ); @@ -288,19 +288,25 @@ Presence let's you easily create "I'm online" feature. final myChannel = supabase.channel('my_channel'); // Subscribe to presence events -myChannel.on( - RealtimeListenTypes.presence, ChannelFilter(event: 'sync'), - (payload, [ref]) { - final onlineUsers = myChannel.presenceState(); - // handle sync event -}).on(RealtimeListenTypes.presence, ChannelFilter(event: 'join'), - (payload, [ref]) { - // New users have joined -}).on(RealtimeListenTypes.presence, ChannelFilter(event: 'leave'), - (payload, [ref]) { - // Users have left -}).subscribe(((status, [_]) async { - if (status == 'SUBSCRIBED') { +myChannel + .onPresence( + event: PresenceEvent.sync, + callback: (payload) { + final onlineUsers = myChannel.presenceState(); + // handle sync event + }) + .onPresence( + event: PresenceEvent.join, + callback: (payload) { + // New users have joined + }) + .onPresence( + event: PresenceEvent.leave, + callback: (payload) { + // Users have left + }) + .subscribe(((status, [_]) async { + if (status == RealtimeSubscribeStatus.subscribed) { // Send the current user's state upon subscribing final status = await myChannel .track({'online_at': DateTime.now().toIso8601String()}); From 651d1a172419b6f61a80c333a7e7d3eb727ab37d Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Mon, 27 Nov 2023 21:46:25 +0900 Subject: [PATCH 11/15] update send broadcast message method name --- packages/realtime_client/lib/src/realtime_channel.dart | 2 +- packages/supabase_flutter/README.md | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index afe68903..31f65eb5 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -344,7 +344,7 @@ class RealtimeChannel { } /// Sends a realtime broadcast message. - Future sendBroadcast({ + Future sendBroadcastMessage({ required String event, required Map payload, }) { diff --git a/packages/supabase_flutter/README.md b/packages/supabase_flutter/README.md index e64fa3e0..d4c763d2 100644 --- a/packages/supabase_flutter/README.md +++ b/packages/supabase_flutter/README.md @@ -274,7 +274,7 @@ myChannel .subscribe(); // Send a broadcast message to other connected clients -await myChannel.sendBroadcast( +await myChannel.sendBroadcastMessage( event: 'cursor-pos', payload: {'x': 30, 'y': 50}, ); From 4577dc802a7e1605d4ca304596fedc7493f99bf4 Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Thu, 30 Nov 2023 08:55:31 +0900 Subject: [PATCH 12/15] add internal label to subTopic --- packages/realtime_client/lib/src/realtime_channel.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 31f65eb5..7e4b8440 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -20,6 +20,7 @@ class RealtimeChannel { List _pushBuffer = []; late RealtimePresence presence; late final String broadcastEndpointURL; + @internal final String subTopic; final String topic; Map params; From c6af96980361d955cde98ff1a83b8612213dc685 Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Thu, 30 Nov 2023 09:38:37 +0900 Subject: [PATCH 13/15] remove unnecessary filtering logic on supabase stream builder --- .../lib/src/supabase_stream_builder.dart | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/packages/supabase/lib/src/supabase_stream_builder.dart b/packages/supabase/lib/src/supabase_stream_builder.dart index 11d0e9c1..b43944e7 100644 --- a/packages/supabase/lib/src/supabase_stream_builder.dart +++ b/packages/supabase/lib/src/supabase_stream_builder.dart @@ -141,20 +141,11 @@ class SupabaseStreamBuilder extends Stream { _streamData = []; PostgresChangeFilter? realtimeFilter; if (currentStreamFilter != null) { - if (currentStreamFilter.type == PostgresChangeFilterType.inFilter) { - final value = currentStreamFilter.value; - realtimeFilter = PostgresChangeFilter( - type: PostgresChangeFilterType.inFilter, - column: currentStreamFilter.column, - value: value, - ); - } else { - realtimeFilter = PostgresChangeFilter( - type: currentStreamFilter.type, - column: currentStreamFilter.column, - value: currentStreamFilter.value, - ); - } + realtimeFilter = PostgresChangeFilter( + type: currentStreamFilter.type, + column: currentStreamFilter.column, + value: currentStreamFilter.value, + ); } _channel = _realtimeClient.channel(_realtimeTopic); From 931803ed6053d99dd82ad8236d741f6050859ac0 Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Thu, 30 Nov 2023 10:20:25 +0900 Subject: [PATCH 14/15] add comment docs --- .../lib/src/realtime_channel.dart | 67 +++++++++++++++++++ packages/realtime_client/lib/src/types.dart | 9 +++ 2 files changed, 76 insertions(+) diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 7e4b8440..85567871 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -14,16 +14,22 @@ class RealtimeChannel { final Map> _bindings = {}; final Duration _timeout; ChannelStates _state = ChannelStates.closed; + @internal bool joinedOnce = false; + @internal late Push joinPush; late RetryTimer _rejoinTimer; List _pushBuffer = []; late RealtimePresence presence; + @internal late final String broadcastEndpointURL; @internal final String subTopic; + @internal final String topic; + @internal Map params; + @internal final RealtimeClient socket; RealtimeChannel( @@ -249,6 +255,32 @@ class RealtimeChannel { (reason, [ref]) => callback(reason?.toString())); } + /// Sets up a listener on your Supabase database. + /// + /// [event] determines whether you listen to `insert`, `update`, `delete`, or all of the events. + /// + /// [schema] is the schema of the database on which to set up the listener. + /// The listener will return all changes from every listenable schema if omitted. + /// + /// [table] is the table of the database on which to setup the listener. + /// The listener will return all changes from every listenable table if omitted. + /// + /// [filter] can be used to further control which rows to listen to within the given [schema] and [table]. + /// + /// ```dart + /// supabase.channel('my_channel').onPostgresChanges( + /// event: PostgresChangeEvent.all, + /// schema: 'public', + /// table: 'messages', + /// filter: PostgresChangeFilter( + /// type: PostgresChangeFilterType.eq, + /// column: 'room_id', + /// value: 200, + /// ), + /// callback: (payload) { + /// print(payload); + /// }).subscribe(); + /// ``` RealtimeChannel onPostgresChanges({ required PostgresChangeEvent event, String? schema, @@ -268,6 +300,17 @@ class RealtimeChannel { ); } + /// Sets up a listener for realtime broadcast messages. + /// + /// [event] is the broadcast event name to which you want to listen. + /// + /// ```dart + /// supabase.channel('my_channel').onBroadcast( + /// event: 'position', + /// callback: (payload) { + /// print(payload); + /// }).subscribe(); + /// ``` RealtimeChannel onBroadcast({ required String event, required void Function(Map payload) callback, @@ -279,6 +322,30 @@ class RealtimeChannel { ); } + /// Sets up a listen for realtime presence. + /// + /// [event] sets which presence event to listen to. + /// + /// ```dart + /// final channel = supabase.channel('my_channel'); + /// channel + /// .onPresence( + /// event: PresenceEvent.sync, + /// callback: (payload) { + /// print('Synced presence state: ${channel.presenceState()}'); + /// }) + /// .onPresence( + /// event: PresenceEvent.join, + /// callback: (payload) { + /// print('Newly joined presences $payload'); + /// }) + /// .onPresence( + /// event: PresenceEvent.leave, + /// callback: (payload) { + /// print('Newly left presences: $payload'); + /// }) + /// .subscribe(); + /// ``` RealtimeChannel onPresence({ required PresenceEvent event, required void Function(Map payload) callback, diff --git a/packages/realtime_client/lib/src/types.dart b/packages/realtime_client/lib/src/types.dart index 457be026..ed9f30d2 100644 --- a/packages/realtime_client/lib/src/types.dart +++ b/packages/realtime_client/lib/src/types.dart @@ -233,11 +233,20 @@ enum PostgresChangeFilterType { inFilter; } +/// {@template postgres_change_filter} +/// Creates a filter for realtime postgres change listener. +/// {@endtemplate} class PostgresChangeFilter { + /// The type of the filter to set. final PostgresChangeFilterType type; + + /// The column to set the filter on. final String column; + + /// The value to perform the filter on. final dynamic value; + /// {@macro postgres_change_filter} PostgresChangeFilter({ required this.type, required this.column, From 86853d80959f6fcf3508669ecdb2da1b83f3181a Mon Sep 17 00:00:00 2001 From: dshukertjr <18113850+dshukertjr@users.noreply.github.com> Date: Thu, 30 Nov 2023 19:22:37 +0900 Subject: [PATCH 15/15] subscribe now returns the channel --- packages/realtime_client/lib/src/realtime_channel.dart | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/realtime_client/lib/src/realtime_channel.dart b/packages/realtime_client/lib/src/realtime_channel.dart index 85567871..85c2c5e5 100644 --- a/packages/realtime_client/lib/src/realtime_channel.dart +++ b/packages/realtime_client/lib/src/realtime_channel.dart @@ -105,7 +105,7 @@ class RealtimeChannel { /// Pass a [callback] to react to different status changes. /// /// [timeout] parameter can be used to override the default timeout set on [RealtimeClient]. - void subscribe([ + RealtimeChannel subscribe([ void Function(RealtimeSubscribeStatus status, Object? error)? callback, Duration? timeout, ]) { @@ -213,6 +213,7 @@ class RealtimeChannel { return; }); } + return this; } Map presenceState() {