Skip to content

Commit

Permalink
feat!(realtime_client): Introduce type safe realtime methods (#725)
Browse files Browse the repository at this point in the history
* Create three different methods for each realtime methods

* refine the public API for realtime

* updated filter syntax for postgres change event

* update mock test

* update fromPayload constructor

* fix type cast

* revert renaming some symbols

* fix: typo in test

* remove unnecessary reply message

* update readme

* update send broadcast message method name

* add internal label to subTopic

* remove unnecessary filtering logic on supabase stream builder

* add comment docs

* subscribe now returns the channel
  • Loading branch information
dshukertjr authored Dec 1, 2023
1 parent a4897d0 commit 182f7c9
Show file tree
Hide file tree
Showing 14 changed files with 509 additions and 235 deletions.
29 changes: 21 additions & 8 deletions packages/realtime_client/example/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,27 @@ Future<void> main() async {
);

final channel = socket.channel('realtime:public');
channel.on(RealtimeListenTypes.postgresChanges,
ChannelFilter(event: 'DELETE', schema: 'public'), (payload, [ref]) {
print('channel delete payload: $payload');
});
channel.on(RealtimeListenTypes.postgresChanges,
ChannelFilter(event: 'INSERT', schema: 'public'), (payload, [ref]) {
print('channel insert payload: $payload');
});
channel.onPostgresChanges(
event: PostgresChangeEvent.all,
filter: PostgresChangeFilter(
type: PostgresChangeFilterType.eq,
column: 'column',
value: 'value',
),
callback: (payload) {},
);
channel.onPostgresChanges(
event: PostgresChangeEvent.delete,
schema: 'public',
callback: (payload) {
print('channel delete payload: ${payload.toString()}');
});
channel.onPostgresChanges(
event: PostgresChangeEvent.insert,
schema: 'public',
callback: (payload) {
print('channel insert payload: ${payload.toString()}');
});

socket.onMessage((message) => print('MESSAGE $message'));

Expand Down
3 changes: 2 additions & 1 deletion packages/realtime_client/lib/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export 'src/realtime_channel.dart';
export 'src/realtime_client.dart';
export 'src/realtime_presence.dart';
export 'src/transformers.dart' hide getEnrichedPayload, getPayloadRecords;
export 'src/types.dart' hide ToType;
export 'src/types.dart'
hide ToType, PostgresChangeEventMethods, ChannelFilter, RealtimeListenTypes;
1 change: 1 addition & 0 deletions packages/realtime_client/lib/src/push.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:async';
import 'package:realtime_client/realtime_client.dart';
import 'package:realtime_client/src/constants.dart';
import 'package:realtime_client/src/message.dart';
import 'package:realtime_client/src/types.dart';

typedef Callback = void Function(dynamic response);

Expand Down
131 changes: 124 additions & 7 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@ class RealtimeChannel {
final Map<String, List<Binding>> _bindings = {};
final Duration _timeout;
ChannelStates _state = ChannelStates.closed;
@internal
bool joinedOnce = false;
@internal
late Push joinPush;
late RetryTimer _rejoinTimer;
List<Push> _pushBuffer = [];
late RealtimePresence presence;
@internal
late final String broadcastEndpointURL;
@internal
final String subTopic;
@internal
final String topic;
@internal
Map<String, dynamic> params;
@internal
final RealtimeClient socket;

RealtimeChannel(
Expand Down Expand Up @@ -98,7 +105,7 @@ class RealtimeChannel {
/// Pass a [callback] to react to different status changes.
///
/// [timeout] parameter can be used to override the default timeout set on [RealtimeClient].
void subscribe([
RealtimeChannel subscribe([
void Function(RealtimeSubscribeStatus status, Object? error)? callback,
Duration? timeout,
]) {
Expand Down Expand Up @@ -206,6 +213,7 @@ class RealtimeChannel {
return;
});
}
return this;
}

Map<String, dynamic> presenceState() {
Expand Down Expand Up @@ -248,12 +256,108 @@ class RealtimeChannel {
(reason, [ref]) => callback(reason?.toString()));
}

RealtimeChannel on(
RealtimeListenTypes type,
ChannelFilter filter,
BindingCallback callback,
) {
return onEvents(type.toType(), filter, callback);
/// Sets up a listener on your Supabase database.
///
/// [event] determines whether you listen to `insert`, `update`, `delete`, or all of the events.
///
/// [schema] is the schema of the database on which to set up the listener.
/// The listener will return all changes from every listenable schema if omitted.
///
/// [table] is the table of the database on which to setup the listener.
/// The listener will return all changes from every listenable table if omitted.
///
/// [filter] can be used to further control which rows to listen to within the given [schema] and [table].
///
/// ```dart
/// supabase.channel('my_channel').onPostgresChanges(
/// event: PostgresChangeEvent.all,
/// schema: 'public',
/// table: 'messages',
/// filter: PostgresChangeFilter(
/// type: PostgresChangeFilterType.eq,
/// column: 'room_id',
/// value: 200,
/// ),
/// callback: (payload) {
/// print(payload);
/// }).subscribe();
/// ```
RealtimeChannel onPostgresChanges({
required PostgresChangeEvent event,
String? schema,
String? table,
PostgresChangeFilter? filter,
required void Function(PostgresChangePayload payload) callback,
}) {
return onEvents(
'postgres_changes',
ChannelFilter(
event: event.toRealtimeEvent(),
schema: schema,
table: table,
filter: filter?.toString(),
),
(payload, [ref]) => callback(PostgresChangePayload.fromPayload(payload)),
);
}

/// Sets up a listener for realtime broadcast messages.
///
/// [event] is the broadcast event name to which you want to listen.
///
/// ```dart
/// supabase.channel('my_channel').onBroadcast(
/// event: 'position',
/// callback: (payload) {
/// print(payload);
/// }).subscribe();
/// ```
RealtimeChannel onBroadcast({
required String event,
required void Function(Map<String, dynamic> payload) callback,
}) {
return onEvents(
'broadcast',
ChannelFilter(event: event),
(payload, [ref]) => callback(Map<String, dynamic>.from(payload)),
);
}

/// Sets up a listen for realtime presence.
///
/// [event] sets which presence event to listen to.
///
/// ```dart
/// final channel = supabase.channel('my_channel');
/// channel
/// .onPresence(
/// event: PresenceEvent.sync,
/// callback: (payload) {
/// print('Synced presence state: ${channel.presenceState()}');
/// })
/// .onPresence(
/// event: PresenceEvent.join,
/// callback: (payload) {
/// print('Newly joined presences $payload');
/// })
/// .onPresence(
/// event: PresenceEvent.leave,
/// callback: (payload) {
/// print('Newly left presences: $payload');
/// })
/// .subscribe();
/// ```
RealtimeChannel onPresence({
required PresenceEvent event,
required void Function(Map<String, dynamic> payload) callback,
}) {
return onEvents(
'presence',
ChannelFilter(
event: event.name,
),
(payload, [ref]) => callback(Map<String, dynamic>.from(payload)),
);
}

@internal
Expand Down Expand Up @@ -308,6 +412,19 @@ class RealtimeChannel {
return pushEvent;
}

/// Sends a realtime broadcast message.
Future<ChannelResponse> sendBroadcastMessage({
required String event,
required Map<String, dynamic> payload,
}) {
return send(
type: RealtimeListenTypes.broadcast,
event: event,
payload: payload,
);
}

@internal
Future<ChannelResponse> send({
required RealtimeListenTypes type,
String? event,
Expand Down
1 change: 1 addition & 0 deletions packages/realtime_client/lib/src/realtime_presence.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'package:realtime_client/realtime_client.dart';
import 'package:realtime_client/src/types.dart';

class Presence {
final String presenceRef;
Expand Down
Loading

0 comments on commit 182f7c9

Please sign in to comment.