Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!(realtime_client): Introduce type safe realtime methods #725

Merged
merged 17 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions packages/realtime_client/example/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,27 @@ Future<void> main() async {
);

final channel = socket.channel('realtime:public');
channel.on(RealtimeListenTypes.postgresChanges,
ChannelFilter(event: 'DELETE', schema: 'public'), (payload, [ref]) {
print('channel delete payload: $payload');
});
channel.on(RealtimeListenTypes.postgresChanges,
ChannelFilter(event: 'INSERT', schema: 'public'), (payload, [ref]) {
print('channel insert payload: $payload');
});
channel.onPostgresChanges(
event: PostgresChangeEvent.all,
filter: PostgresChangeFilter(
type: PostgresChangeFilterType.eq,
column: 'column',
value: 'value',
),
callback: (payload) {},
);
channel.onPostgresChanges(
event: PostgresChangeEvent.delete,
schema: 'public',
callback: (payload) {
print('channel delete payload: ${payload.toString()}');
});
channel.onPostgresChanges(
event: PostgresChangeEvent.insert,
schema: 'public',
callback: (payload) {
print('channel insert payload: ${payload.toString()}');
});

socket.onMessage((message) => print('MESSAGE $message'));

Expand Down
3 changes: 2 additions & 1 deletion packages/realtime_client/lib/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export 'src/realtime_channel.dart';
export 'src/realtime_client.dart';
export 'src/realtime_presence.dart';
export 'src/transformers.dart' hide getEnrichedPayload, getPayloadRecords;
export 'src/types.dart' hide ToType;
export 'src/types.dart'
hide ToType, PostgresChangeEventMethods, ChannelFilter, RealtimeListenTypes;
1 change: 1 addition & 0 deletions packages/realtime_client/lib/src/push.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:async';
import 'package:realtime_client/realtime_client.dart';
import 'package:realtime_client/src/constants.dart';
import 'package:realtime_client/src/message.dart';
import 'package:realtime_client/src/types.dart';

typedef Callback = void Function(dynamic response);

Expand Down
131 changes: 124 additions & 7 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,22 @@ class RealtimeChannel {
final Map<String, List<Binding>> _bindings = {};
final Duration _timeout;
ChannelStates _state = ChannelStates.closed;
@internal
bool joinedOnce = false;
@internal
late Push joinPush;
late RetryTimer _rejoinTimer;
List<Push> _pushBuffer = [];
late RealtimePresence presence;
@internal
late final String broadcastEndpointURL;
@internal
final String subTopic;
@internal
final String topic;
@internal
Map<String, dynamic> params;
@internal
final RealtimeClient socket;

RealtimeChannel(
Expand Down Expand Up @@ -98,7 +105,7 @@ class RealtimeChannel {
/// Pass a [callback] to react to different status changes.
///
/// [timeout] parameter can be used to override the default timeout set on [RealtimeClient].
void subscribe([
RealtimeChannel subscribe([
void Function(RealtimeSubscribeStatus status, Object? error)? callback,
Duration? timeout,
]) {
Expand Down Expand Up @@ -206,6 +213,7 @@ class RealtimeChannel {
return;
});
}
return this;
}

Map<String, dynamic> presenceState() {
Expand Down Expand Up @@ -248,12 +256,108 @@ class RealtimeChannel {
(reason, [ref]) => callback(reason?.toString()));
}

RealtimeChannel on(
RealtimeListenTypes type,
ChannelFilter filter,
BindingCallback callback,
) {
return onEvents(type.toType(), filter, callback);
/// Sets up a listener on your Supabase database.
///
/// [event] determines whether you listen to `insert`, `update`, `delete`, or all of the events.
///
/// [schema] is the schema of the database on which to set up the listener.
/// The listener will return all changes from every listenable schema if omitted.
///
/// [table] is the table of the database on which to setup the listener.
/// The listener will return all changes from every listenable table if omitted.
///
/// [filter] can be used to further control which rows to listen to within the given [schema] and [table].
///
/// ```dart
/// supabase.channel('my_channel').onPostgresChanges(
/// event: PostgresChangeEvent.all,
/// schema: 'public',
/// table: 'messages',
/// filter: PostgresChangeFilter(
/// type: PostgresChangeFilterType.eq,
/// column: 'room_id',
/// value: 200,
/// ),
/// callback: (payload) {
/// print(payload);
/// }).subscribe();
/// ```
RealtimeChannel onPostgresChanges({
required PostgresChangeEvent event,
String? schema,
String? table,
PostgresChangeFilter? filter,
required void Function(PostgresChangePayload payload) callback,
}) {
return onEvents(
'postgres_changes',
ChannelFilter(
event: event.toRealtimeEvent(),
schema: schema,
table: table,
filter: filter?.toString(),
),
(payload, [ref]) => callback(PostgresChangePayload.fromPayload(payload)),
Vinzent03 marked this conversation as resolved.
Show resolved Hide resolved
);
}

/// Sets up a listener for realtime broadcast messages.
///
/// [event] is the broadcast event name to which you want to listen.
///
/// ```dart
/// supabase.channel('my_channel').onBroadcast(
/// event: 'position',
/// callback: (payload) {
/// print(payload);
/// }).subscribe();
/// ```
RealtimeChannel onBroadcast({
required String event,
required void Function(Map<String, dynamic> payload) callback,
}) {
return onEvents(
'broadcast',
ChannelFilter(event: event),
(payload, [ref]) => callback(Map<String, dynamic>.from(payload)),
);
}

/// Sets up a listen for realtime presence.
///
/// [event] sets which presence event to listen to.
///
/// ```dart
/// final channel = supabase.channel('my_channel');
/// channel
/// .onPresence(
/// event: PresenceEvent.sync,
/// callback: (payload) {
/// print('Synced presence state: ${channel.presenceState()}');
/// })
/// .onPresence(
/// event: PresenceEvent.join,
/// callback: (payload) {
/// print('Newly joined presences $payload');
/// })
/// .onPresence(
/// event: PresenceEvent.leave,
/// callback: (payload) {
/// print('Newly left presences: $payload');
/// })
/// .subscribe();
/// ```
RealtimeChannel onPresence({
required PresenceEvent event,
required void Function(Map<String, dynamic> payload) callback,
}) {
return onEvents(
'presence',
ChannelFilter(
event: event.name,
),
(payload, [ref]) => callback(Map<String, dynamic>.from(payload)),
);
}

@internal
Expand Down Expand Up @@ -308,6 +412,19 @@ class RealtimeChannel {
return pushEvent;
}

/// Sends a realtime broadcast message.
Future<ChannelResponse> sendBroadcastMessage({
required String event,
required Map<String, dynamic> payload,
}) {
return send(
type: RealtimeListenTypes.broadcast,
event: event,
payload: payload,
);
}

@internal
Future<ChannelResponse> send({
required RealtimeListenTypes type,
String? event,
Expand Down
1 change: 1 addition & 0 deletions packages/realtime_client/lib/src/realtime_presence.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'package:realtime_client/realtime_client.dart';
import 'package:realtime_client/src/types.dart';

class Presence {
final String presenceRef;
Expand Down
Loading
Loading