Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!(realtime_client): Introduce type safe realtime methods #725

Merged
merged 17 commits into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 21 additions & 8 deletions packages/realtime_client/example/main.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,27 @@ Future<void> main() async {
);

final channel = socket.channel('realtime:public');
channel.on(RealtimeListenTypes.postgresChanges,
ChannelFilter(event: 'DELETE', schema: 'public'), (payload, [ref]) {
print('channel delete payload: $payload');
});
channel.on(RealtimeListenTypes.postgresChanges,
ChannelFilter(event: 'INSERT', schema: 'public'), (payload, [ref]) {
print('channel insert payload: $payload');
});
channel.onPostgresChanges(
event: PostgresChangeEvent.all,
filter: PostgresChangeFilter(
type: PostgresChangeFilterType.eq,
column: 'column',
value: 'value',
),
callback: (payload) {},
);
channel.onPostgresChanges(
event: PostgresChangeEvent.delete,
schema: 'public',
callback: (payload) {
print('channel delete payload: ${payload.toString()}');
});
channel.onPostgresChanges(
event: PostgresChangeEvent.insert,
schema: 'public',
callback: (payload) {
print('channel insert payload: ${payload.toString()}');
});

socket.onMessage((message) => print('MESSAGE $message'));

Expand Down
3 changes: 2 additions & 1 deletion packages/realtime_client/lib/realtime_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ export 'src/realtime_channel.dart';
export 'src/realtime_client.dart';
export 'src/realtime_presence.dart';
export 'src/transformers.dart' hide getEnrichedPayload, getPayloadRecords;
export 'src/types.dart' hide ToType;
export 'src/types.dart'
hide ToType, PostgresChangeEventMethods, ChannelFilter, RealtimeListenTypes;
1 change: 1 addition & 0 deletions packages/realtime_client/lib/src/push.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import 'dart:async';
import 'package:realtime_client/realtime_client.dart';
import 'package:realtime_client/src/constants.dart';
import 'package:realtime_client/src/message.dart';
import 'package:realtime_client/src/types.dart';

typedef Callback = void Function(dynamic response);

Expand Down
60 changes: 54 additions & 6 deletions packages/realtime_client/lib/src/realtime_channel.dart
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,47 @@ class RealtimeChannel {
(reason, [ref]) => callback(reason?.toString()));
}

RealtimeChannel on(
RealtimeListenTypes type,
ChannelFilter filter,
BindingCallback callback,
) {
return onEvents(type.toType(), filter, callback);
RealtimeChannel onPostgresChanges({
required PostgresChangeEvent event,
String? schema,
String? table,
PostgresChangeFilter? filter,
required void Function(PostgresChangePayload payload) callback,
}) {
return onEvents(
'postgres_changes',
ChannelFilter(
event: event.toRealtimeEvent(),
schema: schema,
table: table,
filter: filter?.toString(),
),
(payload, [ref]) => callback(PostgresChangePayload.fromPayload(payload)),
Vinzent03 marked this conversation as resolved.
Show resolved Hide resolved
);
}

RealtimeChannel onBroadcast({
required String event,
required void Function(Map<String, dynamic> payload) callback,
}) {
return onEvents(
'broadcast',
ChannelFilter(event: event),
(payload, [ref]) => callback(Map<String, dynamic>.from(payload)),
);
}

RealtimeChannel onPresence({
required PresenceEvent event,
required void Function(Map<String, dynamic> payload) callback,
}) {
return onEvents(
'presence',
ChannelFilter(
event: event.name,
),
(payload, [ref]) => callback(Map<String, dynamic>.from(payload)),
);
}

@internal
Expand Down Expand Up @@ -308,6 +343,19 @@ class RealtimeChannel {
return pushEvent;
}

/// Sends a realtime broadcast message.
Future<ChannelResponse> sendBroadcastMessage({
required String event,
required Map<String, dynamic> payload,
}) {
return send(
type: RealtimeListenTypes.broadcast,
event: event,
payload: payload,
);
}

@internal
Future<ChannelResponse> send({
required RealtimeListenTypes type,
String? event,
Expand Down
1 change: 1 addition & 0 deletions packages/realtime_client/lib/src/realtime_presence.dart
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import 'package:realtime_client/realtime_client.dart';
import 'package:realtime_client/src/types.dart';

class Presence {
final String presenceRef;
Expand Down
150 changes: 150 additions & 0 deletions packages/realtime_client/lib/src/types.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import 'package:collection/collection.dart';

typedef BindingCallback = void Function(dynamic payload, [dynamic ref]);

class Binding {
Expand Down Expand Up @@ -28,6 +30,43 @@ class Binding {
}
}

enum PostgresChangeEvent {
/// Listen to all insert, update, and delete events.
all,

/// Listen to insert events.
insert,

/// Listen to update events.
update,

/// Listen to delete events.
delete,
}

extension PostgresChangeEventMethods on PostgresChangeEvent {
String toRealtimeEvent() {
if (this == PostgresChangeEvent.all) {
return '*';
} else {
return name.toUpperCase();
}
}

static PostgresChangeEvent fromString(String event) {
switch (event) {
case 'INSERT':
return PostgresChangeEvent.insert;
case 'UPDATE':
return PostgresChangeEvent.update;
case 'DELETE':
return PostgresChangeEvent.delete;
}
throw ArgumentError(
'Only "INSERT", "UPDATE", or "DELETE" can be can be passed to `fromString()` method.');
}
}

class ChannelFilter {
/// For [RealtimeListenTypes.postgresChanges] it's one of: `INSERT`, `UPDATE`, `DELETE`
///
Expand Down Expand Up @@ -64,6 +103,8 @@ enum ChannelResponse { ok, timedOut, rateLimited, error }

enum RealtimeListenTypes { postgresChanges, broadcast, presence }

enum PresenceEvent { sync, join, leave }

enum RealtimeSubscribeStatus { subscribed, channelError, closed, timedOut }

extension ToType on RealtimeListenTypes {
Expand Down Expand Up @@ -106,3 +147,112 @@ class RealtimeChannelConfig {
};
}
}

/// Data class that contains the Postgres change event payload.
class PostgresChangePayload {
final String schema;
final String table;
final DateTime commitTimestamp;
final PostgresChangeEvent eventType;
final Map<String, dynamic> newRecord;
final Map<String, dynamic> oldRecord;
final dynamic errors;
PostgresChangePayload({
required this.schema,
required this.table,
required this.commitTimestamp,
required this.eventType,
required this.newRecord,
required this.oldRecord,
required this.errors,
});

/// Creates a PostgresChangePayload instance from the enriched postgres change payload
PostgresChangePayload.fromPayload(Map<String, dynamic> payload)
: schema = payload['schema'],
table = payload['table'],
commitTimestamp =
DateTime.parse(payload['commit_timestamp'] ?? '19700101'),
eventType = PostgresChangeEventMethods.fromString(payload['eventType']),
newRecord = Map<String, dynamic>.from(payload['new']),
oldRecord = Map<String, dynamic>.from(payload['old']),
errors = payload['errors'];

@override
String toString() {
return 'PostgresChangePayload(schema: $schema, table: $table, commitTimestamp: $commitTimestamp, eventType: $eventType, newRow: $newRecord, oldRow: $oldRecord, errors: $errors)';
}

@override
bool operator ==(covariant PostgresChangePayload other) {
if (identical(this, other)) return true;
final mapEquals = const DeepCollectionEquality().equals;

return other.schema == schema &&
other.table == table &&
other.commitTimestamp == commitTimestamp &&
other.eventType == eventType &&
mapEquals(other.newRecord, newRecord) &&
mapEquals(other.oldRecord, oldRecord) &&
other.errors == errors;
}

@override
int get hashCode {
return schema.hashCode ^
table.hashCode ^
commitTimestamp.hashCode ^
eventType.hashCode ^
newRecord.hashCode ^
oldRecord.hashCode ^
errors.hashCode;
}
}

/// Specifies the type of filter to be applied on realtime Postgres Change listener.
enum PostgresChangeFilterType {
/// Listens to changes where a column's value in a table equals a client-specified value.
eq,

/// Listens to changes where a column's value in a table does not equal a value specified.
neq,

/// Listen to changes where a column's value in a table is less than a value specified.
lt,

/// Listens to changes where a column's value in a table is less than or equal to a value specified.
lte,

/// Listens to changes where a column's value in a table is greater than a value specified.
gt,

/// Listens to changes where a column's value in a table is greater than or equal to a value specified.
gte,

/// Listen to changes when a column's value in a table equals any of the values specified.
inFilter;
}

class PostgresChangeFilter {
final PostgresChangeFilterType type;
final String column;
final dynamic value;

PostgresChangeFilter({
required this.type,
required this.column,
required this.value,
});

@override
String toString() {
if (type == PostgresChangeFilterType.inFilter) {
if (value is List<String>) {
return '$column=in.(${value.map((s) => '"$s"').join(',')})';
} else {
return '$column=in.(${value.map((s) => '"$s"').join(',')})';
}
}
return '$column=${type.name}.$value';
}
}
1 change: 1 addition & 0 deletions packages/realtime_client/test/channel_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:io';

import 'package:realtime_client/realtime_client.dart';
import 'package:realtime_client/src/constants.dart';
import 'package:realtime_client/src/types.dart';
import 'package:test/test.dart';

void main() {
Expand Down
Loading
Loading