From f67bb91dea9287155972cb839a1a6e6ba0089ab0 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sat, 18 Feb 2023 14:34:00 +0100 Subject: [PATCH 01/10] WIP: Implement CoAP over TCP logic --- example/tcp_test.dart | 47 +++ lib/src/coap_message.dart | 13 +- lib/src/codec/tcp/message_decoder.dart | 427 +++++++++++++++++++++++++ lib/src/codec/tcp/message_encoder.dart | 183 +++++++++++ lib/src/network/coap_inetwork.dart | 14 + lib/src/network/coap_network_tcp.dart | 48 +-- lib/src/option/option.dart | 6 +- 7 files changed, 703 insertions(+), 35 deletions(-) create mode 100644 example/tcp_test.dart create mode 100644 lib/src/codec/tcp/message_decoder.dart create mode 100644 lib/src/codec/tcp/message_encoder.dart diff --git a/example/tcp_test.dart b/example/tcp_test.dart new file mode 100644 index 00000000..b801c181 --- /dev/null +++ b/example/tcp_test.dart @@ -0,0 +1,47 @@ +// ignore_for_file: avoid_print + +import 'dart:core'; +import 'dart:io'; + +import 'package:coap/coap.dart'; + +Future startServer() async { + final server = await ServerSocket.bind(InternetAddress.anyIPv4, 5683); + server.listen((final connection) async { + await connection.forEach((final frame) { + print(frame); + + const responseCode = (2 << 5) + 5; + + const tokenLength = 8; + const tokenOffset = 2; + final token = frame.sublist(tokenOffset, tokenOffset + tokenLength); + + final response = [tokenLength, responseCode, ...token]; + + connection.add(response); + }); + }); +} + +/// Tests the basic functionality of the TCP network. +/// Will be replaced with a "real" example later. +Future main() async { + await startServer(); + await connect(); +} + +Future connect() async { + final coapClient = CoapClient(Uri.parse('coap+tcp://127.0.0.1')); + + final response = await coapClient.get( + 'test', + options: [ContentFormatOption(40)], + ); + // TODO(JKRhb): Responses can't be matched at the moment, as the current + // implementation requires a message ID which is not defined in + // CoAP over TCP. + print(response); + + coapClient.close(); +} diff --git a/lib/src/coap_message.dart b/lib/src/coap_message.dart index f0dbb16a..589d83ef 100644 --- a/lib/src/coap_message.dart +++ b/lib/src/coap_message.dart @@ -17,6 +17,7 @@ import 'coap_code.dart'; import 'coap_media_type.dart'; import 'coap_message_type.dart'; import 'coap_response.dart'; +import 'codec/tcp/message_encoder.dart'; import 'codec/udp/message_decoder.dart'; import 'codec/udp/message_encoder.dart'; import 'event/coap_event_bus.dart'; @@ -635,18 +636,8 @@ abstract class CoapMessage { /// Is also used for DTLS. Uint8Buffer toUdpPayload() => serializeUdpMessage(this); - /// Serializes this CoAP message from the TCP message format. - /// - /// Is also used for TLS. - static CoapMessage? fromTcpPayload(final Uint8Buffer data) => - throw UnimplementedError( - 'TCP segment deserialization is not implemented yet.', - ); - /// Serializes this CoAP message into the TCP message format. /// /// Is also used for TLS. - Uint8Buffer toTcpPayload() => throw UnimplementedError( - 'TCP segment serialization is not implemented yet.', - ); + Uint8Buffer toTcpPayload() => serializeTcpMessage(this); } diff --git a/lib/src/codec/tcp/message_decoder.dart b/lib/src/codec/tcp/message_decoder.dart new file mode 100644 index 00000000..b7f38be0 --- /dev/null +++ b/lib/src/codec/tcp/message_decoder.dart @@ -0,0 +1,427 @@ +// SPDX-FileCopyrightText: © 2023 Jan Romann + +// SPDX-License-Identifier: MIT + +import 'dart:async'; +import 'dart:io'; +import 'dart:typed_data'; + +import 'package:typed_data/typed_data.dart'; + +import '../../coap_code.dart'; +import '../../coap_empty_message.dart'; +import '../../coap_message.dart'; +import '../../coap_message_type.dart'; +import '../../coap_request.dart'; +import '../../coap_response.dart'; +import '../../option/coap_option_type.dart'; +import '../../option/option.dart'; +import '../../option/uri_converters.dart'; +import '../udp/datagram_reader.dart'; +import '../udp/message_format.dart' as message_format; + +enum TcpState { + initialState, + extendedLength, + extendedTokenLength, + code, + token, + optionsAndPayload, +} + +final toByteStream = + StreamTransformer((final input, final cancelOnError) { + final controller = StreamController(); + + controller.onListen = () { + final subscription = input.listen( + (final bytes) => bytes.forEach(controller.add), + onDone: controller.close, + onError: controller.addError, + cancelOnError: cancelOnError, + ); + controller + ..onPause = subscription.pause + ..onResume = subscription.resume + ..onCancel = subscription.cancel; + }; + + return controller.stream.listen(null); +}); + +class RawCoapTcpMessage { + RawCoapTcpMessage({ + required this.code, + required this.optionsAndPayload, + required this.token, + }); + + final int code; + + final Uint8List optionsAndPayload; + + final Uint8List token; + + @override + String toString() => + 'Code: $code\nToken:$token\nOptions and Payload:$optionsAndPayload'; +} + +final toRawCoapTcpStream = StreamTransformer( + (final input, final cancelOnError) { + // TODO(JKRhb): Connections must be aborted on error + final controller = StreamController(); + + var state = TcpState.initialState; + var length = 0; + var extendedLengthBytes = 0; + final extendedLengthBuffer = Uint8Buffer(); + var tokenLength = 0; + final token = Uint8Buffer(); + var extendedTokenLengthBytes = 0; + final extendedTokenLengthBuffer = Uint8Buffer(); + var code = 0; + final optionsAndPayload = Uint8Buffer(); + + controller.onListen = () { + final subscription = input.listen( + (final byte) async { + switch (state) { + case TcpState.initialState: + token.clear(); + extendedLengthBuffer.clear(); + optionsAndPayload.clear(); + extendedTokenLengthBuffer.clear(); + + // TODO(JKRhb): Handle WebSockets case with length = 0 + length = (byte >> 4) & 15; + tokenLength = byte & 15; + + if (const [13, 14, 15].contains(length)) { + state = TcpState.extendedLength; + extendedLengthBytes = determineExtendedLength(length); + break; + } + + state = TcpState.code; + break; + case TcpState.extendedLength: + extendedLengthBuffer.add(byte); + if (extendedLengthBytes-- <= 0) { + length = _readExtendedMessageLength( + length, + DatagramReader(extendedLengthBuffer), + ); + state = TcpState.code; + break; + } + + break; + case TcpState.code: + code = byte; + if (const [13, 14].contains(tokenLength)) { + state = TcpState.extendedTokenLength; + extendedTokenLengthBytes = determineExtendedLength(length); + break; + } else if (tokenLength == 15) { + throw const FormatException(); + } + state = TcpState.token; + break; + case TcpState.extendedTokenLength: + extendedTokenLengthBuffer.add(byte); + extendedTokenLengthBytes--; + + if (extendedTokenLengthBytes < 1) { + length = _readExtendedMessageLength( + length, + DatagramReader(extendedLengthBuffer), + ); + + state = TcpState.code; + break; + } + + break; + case TcpState.token: + token.add(byte); + tokenLength--; + + if (tokenLength >= 1) { + break; + } + + // TODO(JKRhb): Refactor + if (length < 1) { + state = TcpState.initialState; + controller.add( + RawCoapTcpMessage( + code: code, + token: Uint8List.fromList(token.toList(growable: false)), + optionsAndPayload: Uint8List.fromList( + optionsAndPayload.toList(growable: false), + ), + ), + ); + } else { + state = TcpState.optionsAndPayload; + } + + break; + case TcpState.optionsAndPayload: + optionsAndPayload.add(byte); + length--; + + if (length < 1) { + state = TcpState.initialState; + controller.add( + RawCoapTcpMessage( + code: code, + token: Uint8List.fromList(token.toList(growable: false)), + optionsAndPayload: Uint8List.fromList( + optionsAndPayload.toList(growable: false), + ), + ), + ); + } + + break; + } + }, + onDone: controller.close, + onError: controller.addError, + cancelOnError: cancelOnError, + ); + controller + ..onPause = subscription.pause + ..onResume = subscription.resume + ..onCancel = subscription.cancel; + }; + + return controller.stream.listen(null); +}); + +int determineExtendedLength(final int length) { + switch (length) { + case 13: + return 1; + case 14: + return 2; + case 15: + return 4; + } + + throw const FormatException('message'); +} + +/// Transforms a [Stream] of [RawCoapTcpMessage]s into [CoapMessage]s. +/// +/// Returns the deserialized message, or `null` if the message can not be +/// decoded, i.e. the bytes do not represent a [CoapRequest], a [CoapResponse] +/// or a [CoapEmptyMessage]. +final deserializeTcpMessage = StreamTransformer( + (final input, final cancelOnError) { + final controller = StreamController(); + + controller.onListen = () { + final subscription = input.listen( + (final coapTcpMessage) { + final code = CoapCode.decode(coapTcpMessage.code); + + if (code == null) { + throw const FormatException('Encountered unknown CoapCode'); + } + + final token = coapTcpMessage.token; + + final reader = DatagramReader( + Uint8Buffer()..addAll(coapTcpMessage.optionsAndPayload), + ); + + try { + final options = readOptions(reader); + final payload = reader.readBytesLeft(); + final tokenBuffer = Uint8Buffer()..addAll(token); + final CoapMessage coapMessage; + + // TODO(JKRhb): Probably not really needed for TCP, since connections + // are simply closed on error + const hasUnknownCriticalOption = false; + const hasFormatError = false; + + if (code.isRequest) { + final method = RequestMethod.fromCoapCode(code); + if (method == null) { + return; + } + + final uri = optionsToUri( + options.where((final option) => option.isUriOption).toList(), + scheme: 'coap+tcp', // TODO(JKRhb): Replace + destinationAddress: + InternetAddress('127.0.0.1'), // TODO(JKRhb): Replace + ); + + coapMessage = CoapRequest.fromParsed( + uri, + method, + // id and type are not defined for CoAP over TCP + id: 0, + type: CoapMessageType.ack, + token: tokenBuffer, + options: options, + payload: payload, + hasUnknownCriticalOption: hasUnknownCriticalOption, + hasFormatError: hasFormatError, + ); + } else if (code.isResponse) { + final responseCode = ResponseCode.fromCoapCode(code); + if (responseCode == null) { + return; + } + + final location = optionsToUri( + options.where((final option) => option.isLocationOption).toList(), + ); + + coapMessage = CoapResponse.fromParsed( + responseCode, + id: 0, + type: CoapMessageType.ack, + token: tokenBuffer, + options: options, + payload: payload, + location: location, + hasUnknownCriticalOption: hasUnknownCriticalOption, + hasFormatError: hasFormatError, + ); + } else if (code.isEmpty) { + coapMessage = CoapEmptyMessage.fromParsed( + id: 0, + type: CoapMessageType.ack, + token: tokenBuffer, + options: options, + payload: payload, + hasUnknownCriticalOption: hasUnknownCriticalOption, + hasFormatError: hasFormatError, + ); + } else { + return; + } + + controller.add(coapMessage); + } on UnknownCriticalOptionException { + // Should something be done here? + return; + } on FormatException { + // Should something be done here? + return; + } + }, + onDone: controller.close, + onError: controller.addError, + cancelOnError: cancelOnError, + ); + controller + ..onPause = subscription.pause + ..onResume = subscription.resume + ..onCancel = subscription.cancel; + }; + + return controller.stream.listen(null); +}); + +List> readOptions(final DatagramReader reader) { + final options = >[]; + var currentOption = 0; + while (reader.bytesAvailable) { + final nextByte = reader.readNextByte(); + if (nextByte == message_format.payloadMarker) { + if (!reader.bytesAvailable) { + throw const FormatException('Illegal format'); + // The presence of a marker followed by a zero-length payload + // must be processed as a message format error + } + } else { + // The first 4 bits of the byte represent the option delta + final optionDeltaNibble = (0xF0 & nextByte) >> 4; + final deltaValue = _getValueFromOptionNibble( + optionDeltaNibble, + reader, + ); + + if (deltaValue == null) { + throw const FormatException('Illegal format'); + } + + currentOption += deltaValue; + + // The second 4 bits represent the option length + final optionLengthNibble = 0x0F & nextByte; + final optionLength = _getValueFromOptionNibble( + optionLengthNibble, + reader, + ); + + if (optionLength == null) { + throw const FormatException('Illegal format'); + } + + // Read option + try { + final optionType = OptionType.fromTypeNumber(currentOption); + var optionBytes = reader.readBytes(optionLength); + if (Endian.host == Endian.little && + optionType.optionFormat is OptionFormat) { + optionBytes = Uint8Buffer()..addAll(optionBytes.reversed); + } + final option = optionType.parse(optionBytes); + options.add(option); + } on UnknownElectiveOptionException catch (_) { + // Unknown elective options must be silently ignored + continue; + } + } + } + + return options; +} + +/// Calculates the value used in the extended option fields as specified +/// in RFC 7252, section 3.1. +int? _getValueFromOptionNibble( + final int nibble, + final DatagramReader datagram, +) => + _readExtendedLength(nibble, datagram); + +int? _readExtendedLength( + final int value, + final DatagramReader datagram, +) { + if (value < 13) { + return value; + } else if (value == 13) { + return datagram.read(8) + 13; + } else if (value == 14) { + return datagram.read(16) + 269; + } + + return null; +} + +int _readExtendedMessageLength( + final int value, + final DatagramReader datagramReader, +) { + switch (value) { + case 13: + return datagramReader.read(8) + 13; + case 14: + return datagramReader.read(16) + 269; + case 15: + return datagramReader.read(32) + 65805; + } + + throw StateError('Illegal value read'); +} diff --git a/lib/src/codec/tcp/message_encoder.dart b/lib/src/codec/tcp/message_encoder.dart new file mode 100644 index 00000000..804272f6 --- /dev/null +++ b/lib/src/codec/tcp/message_encoder.dart @@ -0,0 +1,183 @@ +// SPDX-FileCopyrightText: © 2023 Jan Romann + +// SPDX-License-Identifier: MIT + +import 'package:typed_data/typed_data.dart'; + +import '../../coap_code.dart'; +import '../../coap_message.dart'; +import '../../option/coap_option_type.dart'; +import '../udp/datagram_writer.dart'; +import '../udp/message_format.dart' as message_format; + +/// Encodes a CoAP TCP or WebSockets message into a bytes array. +/// Returns the encoded bytes, or null if the message can not be encoded, +/// i.e. the message is not a Request, a Response or an EmptyMessage. +Uint8Buffer serializeTcpMessage(final CoapMessage message) { + final writer = DatagramWriter(); + + final token = message.token; + final tokenLength = _getTokenLength(token); + final options = _serializeOptions(message); + + final payload = message.payload; + const payloadMarkerLength = 1; + final payloadLength = + payload.isNotEmpty ? payload.length + payloadMarkerLength : 0; + + final messageLength = options.lengthInBytes + payloadLength; + + // TODO(JKRhb): Refactor + final lengthField = _getOptionNibble(messageLength); + + writer + ..write(lengthField, 4) + ..write(tokenLength, message_format.tokenLengthBits); + + if (lengthField == 13) { + writer.write(messageLength - 13, 8); + } else if (lengthField == 14) { + writer.write(messageLength - 269, 16); + } + + writer.write(message.code.code, CoapCode.bitLength); + + if (token != null) { + _writeExtendedTokenLength(writer, tokenLength, token); + } + + // Write token, which may be 0 to 8 bytes or have an extended token length, + // given by token length and the extended token length field. + writer + ..writeBytes(token) + ..writeBytes(options); + + if (payload.isNotEmpty) { + // If payload is present and of non-zero length, it is prefixed by + // an one-byte Payload Marker (0xFF) which indicates the end of + // options and the start of the payload + writer.writeByte(message_format.payloadMarker); + } + // Write payload + writer.writeBytes(payload); + + return writer.toByteArray(); +} + +Uint8Buffer _serializeOptions(final CoapMessage message) { + final writer = DatagramWriter(); + + var lastOptionNumber = 0; + final options = message.getAllOptions()..sort(); + + for (final opt in options) { + if (opt.type == OptionType.uriHost || opt.type == OptionType.uriPort) { + continue; + } + + // Write 4-bit option delta + final optNum = opt.type.optionNumber; + final optionDelta = optNum - lastOptionNumber; + final optionDeltaNibble = _getOptionNibble(optionDelta); + writer.write(optionDeltaNibble, message_format.optionDeltaBits); + + // Write 4-bit option length + final optionLength = opt.length; + final optionLengthNibble = _getOptionNibble(optionLength); + writer.write(optionLengthNibble, message_format.optionLengthBits); + + // Write extended option delta field (0 - 2 bytes) + if (optionDeltaNibble == 13) { + writer.write(optionDelta - 13, 8); + } else if (optionDeltaNibble == 14) { + writer.write(optionDelta - 269, 16); + } + + // Write extended option length field (0 - 2 bytes) + if (optionLengthNibble == 13) { + writer.write(optionLength - 13, 8); + } else if (optionLengthNibble == 14) { + writer.write(optionLength - 269, 16); + } + + // Write option value, reverse byte order for numeric options + if (opt.type.optionFormat == OptionFormat.integer) { + final reversedBuffer = Uint8Buffer()..addAll(opt.byteValue.reversed); + writer.writeBytes(reversedBuffer); + } else { + writer.writeBytes(opt.byteValue); + } + + lastOptionNumber = optNum; + } + + return writer.toByteArray(); +} + +/// Determine the token length. +/// +/// The token length can either be of zero to eight bytes or be extended, +/// following [RFC 8974]. +/// +/// [RFC 8974]: https://datatracker.ietf.org/doc/html/rfc8974 +int _getTokenLength(final Uint8Buffer? token) { + final tokenLength = token?.length ?? 0; + if (tokenLength <= 12) { + return tokenLength; + } else if (tokenLength <= 255 + 13) { + return 13; + } else if (tokenLength <= 65535 + 269) { + return 14; + } else { + throw FormatException('Unsupported token length delta $tokenLength'); + } +} + +/// Write a potentially extended token length as specified in [RFC 8974]. +/// +/// [RFC 8974]: https://datatracker.ietf.org/doc/html/rfc8974 +void _writeExtendedTokenLength( + final DatagramWriter writer, + final int tokenLength, + final Uint8Buffer token, +) { + final extendedTokenLength = _getExtendedTokenLength(tokenLength, token); + + switch (tokenLength) { + case 13: + writer.write(extendedTokenLength, 8); + break; + case 14: + writer.write(extendedTokenLength, 16); + } +} + +/// Determine a potentially extended token length as specified in [RFC 8974]. +/// +/// [RFC 8974]: https://datatracker.ietf.org/doc/html/rfc8974 +int _getExtendedTokenLength( + final int tokenLength, + final Uint8Buffer token, +) { + switch (tokenLength) { + case 13: + return token.length - 13; + case 14: + return token.length - 269; + } + + return 0; +} + +/// Returns the 4-bit option header value. +int _getOptionNibble(final int optionValue) { + if (optionValue <= 12) { + return optionValue; + } else if (optionValue <= 255 + 13) { + return 13; + } else if (optionValue <= 65535 + 269) { + return 14; + } else { + throw FormatException('Unsupported option delta $optionValue'); + } +} diff --git a/lib/src/network/coap_inetwork.dart b/lib/src/network/coap_inetwork.dart index e1e203dd..07baa65b 100644 --- a/lib/src/network/coap_inetwork.dart +++ b/lib/src/network/coap_inetwork.dart @@ -11,6 +11,7 @@ import '../coap_config.dart'; import '../coap_constants.dart'; import '../coap_message.dart'; import 'coap_network_openssl.dart'; +import 'coap_network_tcp.dart'; import 'coap_network_udp.dart'; import 'credentials/psk_credentials.dart'; @@ -93,6 +94,19 @@ abstract class CoapINetwork { libSsl: config.libSslInstance, hostName: uri.host, ); + case 'coap+tcp': + return CoapNetworkTCP( + address, + port ?? config.defaultPort, + bindAddress ?? defaultBindAddress, + ); + case 'coaps+tcp': + return CoapNetworkTCP( + address, + port ?? config.defaultSecurePort, + bindAddress ?? defaultBindAddress, + isTls: true, + ); default: throw UnsupportedProtocolException(uri.scheme); } diff --git a/lib/src/network/coap_network_tcp.dart b/lib/src/network/coap_network_tcp.dart index 117ab723..143d5c0c 100644 --- a/lib/src/network/coap_network_tcp.dart +++ b/lib/src/network/coap_network_tcp.dart @@ -8,9 +8,8 @@ import 'dart:async'; import 'dart:io'; -import 'package:typed_data/typed_data.dart'; - import '../coap_message.dart'; +import '../codec/tcp/message_decoder.dart'; import '../event/coap_event_bus.dart'; import 'coap_inetwork.dart'; @@ -98,26 +97,29 @@ class CoapNetworkTCP implements CoapINetwork { } void _receive() { - socket?.listen( - (final data) { - final message = CoapMessage.fromTcpPayload(Uint8Buffer()..addAll(data)); - eventBus.fire(CoapMessageReceivedEvent(message, address)); - }, - // ignore: avoid_types_on_closure_parameters - onError: (final Object e, final StackTrace s) => - eventBus.fire(CoapSocketErrorEvent(e, s)), - // Socket stream is done and can no longer be listened to - onDone: () { - isClosed = true; - Timer.periodic(CoapINetwork.reinitPeriod, (final timer) async { - try { - await init(); - timer.cancel(); - } on Exception catch (_) { - // Ignore errors, retry until successful - } - }); - }, - ); + socket + ?.transform(toByteStream) + .transform(toRawCoapTcpStream) + .transform(deserializeTcpMessage) + .listen( + (final message) { + eventBus.fire(CoapMessageReceivedEvent(message, address)); + }, + // ignore: avoid_types_on_closure_parameters + onError: (final Object e, final StackTrace s) => + eventBus.fire(CoapSocketErrorEvent(e, s)), + // Socket stream is done and can no longer be listened to + onDone: () { + isClosed = true; + Timer.periodic(CoapINetwork.reinitPeriod, (final timer) async { + try { + await init(); + timer.cancel(); + } on Exception catch (_) { + // Ignore errors, retry until successful + } + }); + }, + ); } } diff --git a/lib/src/option/option.dart b/lib/src/option/option.dart index 87ef980a..9b77059a 100644 --- a/lib/src/option/option.dart +++ b/lib/src/option/option.dart @@ -8,7 +8,7 @@ import 'string_option.dart'; /// This class describes the options of the CoAP messages. @immutable -abstract class Option { +abstract class Option implements Comparable> { Option() { _validate(); } @@ -96,6 +96,10 @@ abstract class Option { bool get isLocationOption => this is LocationPathOption || this is LocationQueryOption; + + @override + int compareTo(final Option other) => + this.optionNumber - other.optionNumber; } /// Mixin for an Oscore class E option (encrypted and integrity protected). From 79bf9afc9730682ed741078d3cdba05cb2a16d12 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sat, 18 Feb 2023 15:32:35 +0100 Subject: [PATCH 02/10] fixup! WIP: Implement CoAP over TCP logic --- example/tcp_test.dart | 24 +++++++++++++++++------- lib/src/codec/tcp/message_decoder.dart | 1 + lib/src/network/coap_inetwork.dart | 2 ++ 3 files changed, 20 insertions(+), 7 deletions(-) diff --git a/example/tcp_test.dart b/example/tcp_test.dart index b801c181..572a06a5 100644 --- a/example/tcp_test.dart +++ b/example/tcp_test.dart @@ -1,14 +1,15 @@ // ignore_for_file: avoid_print +import 'dart:convert'; import 'dart:core'; import 'dart:io'; import 'package:coap/coap.dart'; -Future startServer() async { +Future startServer() async { final server = await ServerSocket.bind(InternetAddress.anyIPv4, 5683); server.listen((final connection) async { - await connection.forEach((final frame) { + await connection.forEach((final frame) async { print(frame); const responseCode = (2 << 5) + 5; @@ -16,19 +17,31 @@ Future startServer() async { const tokenLength = 8; const tokenOffset = 2; final token = frame.sublist(tokenOffset, tokenOffset + tokenLength); + final payload = utf8.encode('Hello'); - final response = [tokenLength, responseCode, ...token]; + final response = [ + ((payload.length + 1) << 4) | tokenLength, + responseCode, + ...token, + 255, + ...payload, + ]; connection.add(response); + await connection.close(); }); }); + + return server; } /// Tests the basic functionality of the TCP network. /// Will be replaced with a "real" example later. Future main() async { - await startServer(); + final server = await startServer(); await connect(); + + await server.close(); } Future connect() async { @@ -38,9 +51,6 @@ Future connect() async { 'test', options: [ContentFormatOption(40)], ); - // TODO(JKRhb): Responses can't be matched at the moment, as the current - // implementation requires a message ID which is not defined in - // CoAP over TCP. print(response); coapClient.close(); diff --git a/lib/src/codec/tcp/message_decoder.dart b/lib/src/codec/tcp/message_decoder.dart index b7f38be0..ea382218 100644 --- a/lib/src/codec/tcp/message_decoder.dart +++ b/lib/src/codec/tcp/message_decoder.dart @@ -342,6 +342,7 @@ List> readOptions(final DatagramReader reader) { // The presence of a marker followed by a zero-length payload // must be processed as a message format error } + break; } else { // The first 4 bits of the byte represent the option delta final optionDeltaNibble = (0xF0 & nextByte) >> 4; diff --git a/lib/src/network/coap_inetwork.dart b/lib/src/network/coap_inetwork.dart index 07baa65b..5c62301b 100644 --- a/lib/src/network/coap_inetwork.dart +++ b/lib/src/network/coap_inetwork.dart @@ -99,12 +99,14 @@ abstract class CoapINetwork { address, port ?? config.defaultPort, bindAddress ?? defaultBindAddress, + namespace: namespace, ); case 'coaps+tcp': return CoapNetworkTCP( address, port ?? config.defaultSecurePort, bindAddress ?? defaultBindAddress, + namespace: namespace, isTls: true, ); default: From a35ec07d6633330ffdaae73ce98a8041af26ba7a Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sat, 18 Feb 2023 16:11:37 +0100 Subject: [PATCH 03/10] fixup! WIP: Implement CoAP over TCP logic --- lib/src/coap_empty_message.dart | 4 ++-- lib/src/coap_message.dart | 16 +++------------- lib/src/coap_request.dart | 12 +++++++----- lib/src/coap_response.dart | 6 +++--- lib/src/codec/tcp/message_decoder.dart | 8 -------- lib/src/codec/udp/message_encoder.dart | 7 ++++++- lib/src/net/matcher.dart | 21 +++++++++++++++------ lib/src/stack/layers/reliability.dart | 4 +++- 8 files changed, 39 insertions(+), 39 deletions(-) diff --git a/lib/src/coap_empty_message.dart b/lib/src/coap_empty_message.dart index ee5f1b25..d6feee59 100644 --- a/lib/src/coap_empty_message.dart +++ b/lib/src/coap_empty_message.dart @@ -44,13 +44,13 @@ class CoapEmptyMessage extends CoapMessage { ..destination = message.source; CoapEmptyMessage.fromParsed({ - required final CoapMessageType type, - required final int id, required final Uint8Buffer token, required final List> options, required final Uint8Buffer? payload, required final bool hasUnknownCriticalOption, required final bool hasFormatError, + final int? id, + final CoapMessageType? type, }) : super.fromParsed( RequestMethod.empty.coapCode, type, diff --git a/lib/src/coap_message.dart b/lib/src/coap_message.dart index 589d83ef..8d433cc3 100644 --- a/lib/src/coap_message.dart +++ b/lib/src/coap_message.dart @@ -48,8 +48,7 @@ abstract class CoapMessage { CoapMessage.fromParsed( this.code, - this._type, { - required final int id, + this.type, { required final Uint8Buffer token, required final List> options, required final Uint8Buffer? payload, @@ -65,13 +64,8 @@ abstract class CoapMessage { bool hasFormatError = false; - CoapMessageType _type; - - @internal - set type(final CoapMessageType type) => _type = type; - /// The type of this CoAP message. - CoapMessageType get type => _type; + CoapMessageType? type; /// The code of this CoAP message. final CoapCode code; @@ -79,12 +73,8 @@ abstract class CoapMessage { /// The codestring String get codeString => code.toString(); - int? _id; - /// The ID of this CoAP message. - int? get id => _id; - @internal - set id(final int? val) => _id = val; + int? id; final List> _options = []; diff --git a/lib/src/coap_request.dart b/lib/src/coap_request.dart index 6638fb75..e1452b7f 100644 --- a/lib/src/coap_request.dart +++ b/lib/src/coap_request.dart @@ -42,7 +42,7 @@ class CoapRequest extends CoapMessage { final RequestMethod method; @override - CoapMessageType get type { + CoapMessageType? get type { if (super.type == CoapMessageType.con && isMulticast) { return CoapMessageType.non; } @@ -73,8 +73,10 @@ class CoapRequest extends CoapMessage { Endpoint? get endpoint => _endpoint; @internal set endpoint(final Endpoint? endpoint) { - super.id = endpoint!.nextMessageId; - super.destination = endpoint.destination; + if (['coap', 'coaps'].contains(uri.scheme)) { + super.id = endpoint!.nextMessageId; + } + super.destination = endpoint!.destination; _endpoint = endpoint; } @@ -199,13 +201,13 @@ class CoapRequest extends CoapMessage { CoapRequest.fromParsed( this.uri, this.method, { - required final CoapMessageType type, - required final int id, required final Uint8Buffer token, required final List> options, required final Uint8Buffer? payload, required final bool hasUnknownCriticalOption, required final bool hasFormatError, + final CoapMessageType? type, + final int? id, }) : super.fromParsed( method.coapCode, type, diff --git a/lib/src/coap_response.dart b/lib/src/coap_response.dart index e56c008a..07fe6edf 100644 --- a/lib/src/coap_response.dart +++ b/lib/src/coap_response.dart @@ -22,7 +22,7 @@ class CoapResponse extends CoapMessage { /// Initializes a response message. CoapResponse( this.responseCode, - final CoapMessageType type, { + final CoapMessageType? type, { final Uri? location, super.payload, }) : location = location ?? Uri(path: '/'), @@ -100,14 +100,14 @@ class CoapResponse extends CoapMessage { CoapResponse.fromParsed( this.responseCode, { - required final CoapMessageType type, - required final int id, required final Uint8Buffer token, required final List> options, required final Uint8Buffer? payload, required final bool hasUnknownCriticalOption, required final bool hasFormatError, final Uri? location, + final int? id, + final CoapMessageType? type, }) : location = location ?? Uri(path: '/'), super.fromParsed( responseCode.coapCode, diff --git a/lib/src/codec/tcp/message_decoder.dart b/lib/src/codec/tcp/message_decoder.dart index ea382218..78b81f20 100644 --- a/lib/src/codec/tcp/message_decoder.dart +++ b/lib/src/codec/tcp/message_decoder.dart @@ -11,7 +11,6 @@ import 'package:typed_data/typed_data.dart'; import '../../coap_code.dart'; import '../../coap_empty_message.dart'; import '../../coap_message.dart'; -import '../../coap_message_type.dart'; import '../../coap_request.dart'; import '../../coap_response.dart'; import '../../option/coap_option_type.dart'; @@ -265,9 +264,6 @@ final deserializeTcpMessage = StreamTransformer( coapMessage = CoapRequest.fromParsed( uri, method, - // id and type are not defined for CoAP over TCP - id: 0, - type: CoapMessageType.ack, token: tokenBuffer, options: options, payload: payload, @@ -286,8 +282,6 @@ final deserializeTcpMessage = StreamTransformer( coapMessage = CoapResponse.fromParsed( responseCode, - id: 0, - type: CoapMessageType.ack, token: tokenBuffer, options: options, payload: payload, @@ -297,8 +291,6 @@ final deserializeTcpMessage = StreamTransformer( ); } else if (code.isEmpty) { coapMessage = CoapEmptyMessage.fromParsed( - id: 0, - type: CoapMessageType.ack, token: tokenBuffer, options: options, payload: payload, diff --git a/lib/src/codec/udp/message_encoder.dart b/lib/src/codec/udp/message_encoder.dart index 97ce5cf2..eabc5e06 100644 --- a/lib/src/codec/udp/message_encoder.dart +++ b/lib/src/codec/udp/message_encoder.dart @@ -29,11 +29,16 @@ Uint8Buffer serializeUdpMessage(final CoapMessage message) { const version = message_format.Version.version1; const versionLength = message_format.Version.bitLength; + final type = message.type; + + if (type == null) { + throw FormatException('No type defind for CoAP Message:\n$message'); + } // Write fixed-size CoAP headers writer ..write(version.numericValue, versionLength) - ..write(message.type.code, CoapMessageType.bitLength); + ..write(type.code, CoapMessageType.bitLength); final token = message.token; final tokenLength = _getTokenLength(token); diff --git a/lib/src/net/matcher.dart b/lib/src/net/matcher.dart index fd360a23..bbecd271 100644 --- a/lib/src/net/matcher.dart +++ b/lib/src/net/matcher.dart @@ -35,6 +35,7 @@ class CoapMatcher { late StreamSubscription subscr; /// For all + // TODO(JKRhb): Check if this really works with TCP exchanges final Map _exchangesById = {}; /// For outgoing @@ -68,7 +69,9 @@ class CoapMatcher { // If this request goes lost, we do not get anything back. // The MID is from the local namespace -- use blank address - _exchangesById[request.id] = exchange; + if (_isUdp(exchange.request)) { + _exchangesById[request.id] = exchange; + } _exchangesByToken[request.tokenString] = exchange; } @@ -81,10 +84,14 @@ class CoapMatcher { // CON/NON request with same MID again. We then find the corresponding // exchange and the ReliabilityLayer resends this response. + final isUdp = _isUdp(exchange.request); + + final isConNotification = response.type == CoapMessageType.con || + response.type == CoapMessageType.ack; + // If this is a CON notification we now can forget all previous // NON notifications. - if (response.type == CoapMessageType.con || - response.type == CoapMessageType.ack) { + if (isUdp && isConNotification) { final relation = exchange.relation; if (relation != null) { _removeNotificatoinsOf(relation); @@ -107,8 +114,7 @@ class CoapMatcher { // Insert CON and NON to match ACKs and RSTs to the exchange // Do not insert ACKs and RSTs. - if (response.type == CoapMessageType.con || - response.type == CoapMessageType.non) { + if (isUdp && isConNotification) { _exchangesById[response.id] = exchange; } @@ -222,7 +228,7 @@ class CoapMatcher { final prev = _deduplicator.findPrevious(response.id, exchange); if (prev != null) { response.duplicate = true; - } else { + } else if (_isUdp(exchange.request)) { _exchangesById.remove(exchange.currentRequest.id); } @@ -292,3 +298,6 @@ class CoapMatcher { } } } + +bool _isUdp(final CoapRequest request) => + ['coap', 'coaps'].contains(request.uri.scheme); diff --git a/lib/src/stack/layers/reliability.dart b/lib/src/stack/layers/reliability.dart index c7f91d65..ee749661 100644 --- a/lib/src/stack/layers/reliability.dart +++ b/lib/src/stack/layers/reliability.dart @@ -35,7 +35,9 @@ class ReliabilityLayer extends BaseLayer { final CoapExchange exchange, final CoapRequest request, ) { - if (request.type == CoapMessageType.con) { + // TODO(JKRhb): Refactor + if ((['coap', 'coaps'].contains(request.uri.scheme)) && + request.type == CoapMessageType.con) { _prepareRetransmission( exchange, request, From d05818a8a61a14832427483790ae3d410dbe3918 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sat, 18 Feb 2023 18:02:33 +0100 Subject: [PATCH 04/10] fixup! WIP: Implement CoAP over TCP logic --- lib/src/codec/tcp/message_decoder.dart | 197 +++++++++++++------------ lib/src/codec/tcp/message_encoder.dart | 39 ++++- lib/src/network/coap_network_tcp.dart | 4 +- 3 files changed, 139 insertions(+), 101 deletions(-) diff --git a/lib/src/codec/tcp/message_decoder.dart b/lib/src/codec/tcp/message_decoder.dart index 78b81f20..6c727522 100644 --- a/lib/src/codec/tcp/message_decoder.dart +++ b/lib/src/codec/tcp/message_decoder.dart @@ -218,110 +218,115 @@ int determineExtendedLength(final int length) { /// Returns the deserialized message, or `null` if the message can not be /// decoded, i.e. the bytes do not represent a [CoapRequest], a [CoapResponse] /// or a [CoapEmptyMessage]. -final deserializeTcpMessage = StreamTransformer( - (final input, final cancelOnError) { - final controller = StreamController(); - - controller.onListen = () { - final subscription = input.listen( - (final coapTcpMessage) { - final code = CoapCode.decode(coapTcpMessage.code); - - if (code == null) { - throw const FormatException('Encountered unknown CoapCode'); - } - - final token = coapTcpMessage.token; - - final reader = DatagramReader( - Uint8Buffer()..addAll(coapTcpMessage.optionsAndPayload), - ); - - try { - final options = readOptions(reader); - final payload = reader.readBytesLeft(); - final tokenBuffer = Uint8Buffer()..addAll(token); - final CoapMessage coapMessage; +StreamTransformer deserializeTcpMessage( + final String uriScheme, + final InternetAddress destinationAddress, +) => + StreamTransformer( + (final input, final cancelOnError) { + final controller = StreamController(); - // TODO(JKRhb): Probably not really needed for TCP, since connections - // are simply closed on error - const hasUnknownCriticalOption = false; - const hasFormatError = false; + controller.onListen = () { + final subscription = input.listen( + (final coapTcpMessage) { + final code = CoapCode.decode(coapTcpMessage.code); - if (code.isRequest) { - final method = RequestMethod.fromCoapCode(code); - if (method == null) { - return; + if (code == null) { + throw const FormatException('Encountered unknown CoapCode'); } - final uri = optionsToUri( - options.where((final option) => option.isUriOption).toList(), - scheme: 'coap+tcp', // TODO(JKRhb): Replace - destinationAddress: - InternetAddress('127.0.0.1'), // TODO(JKRhb): Replace - ); + final token = coapTcpMessage.token; - coapMessage = CoapRequest.fromParsed( - uri, - method, - token: tokenBuffer, - options: options, - payload: payload, - hasUnknownCriticalOption: hasUnknownCriticalOption, - hasFormatError: hasFormatError, + final reader = DatagramReader( + Uint8Buffer()..addAll(coapTcpMessage.optionsAndPayload), ); - } else if (code.isResponse) { - final responseCode = ResponseCode.fromCoapCode(code); - if (responseCode == null) { + + try { + final options = readOptions(reader); + final payload = reader.readBytesLeft(); + final tokenBuffer = Uint8Buffer()..addAll(token); + final CoapMessage coapMessage; + + // TODO(JKRhb): Probably not really needed for TCP, since + // connections are simply closed on error + const hasUnknownCriticalOption = false; + const hasFormatError = false; + + if (code.isRequest) { + final method = RequestMethod.fromCoapCode(code); + if (method == null) { + return; + } + + final uri = optionsToUri( + options.where((final option) => option.isUriOption).toList(), + scheme: uriScheme, + destinationAddress: destinationAddress, + ); + + coapMessage = CoapRequest.fromParsed( + uri, + method, + token: tokenBuffer, + options: options, + payload: payload, + hasUnknownCriticalOption: hasUnknownCriticalOption, + hasFormatError: hasFormatError, + ); + } else if (code.isResponse) { + final responseCode = ResponseCode.fromCoapCode(code); + if (responseCode == null) { + return; + } + + final location = optionsToUri( + options + .where((final option) => option.isLocationOption) + .toList(), + ); + + coapMessage = CoapResponse.fromParsed( + responseCode, + token: tokenBuffer, + options: options, + payload: payload, + location: location, + hasUnknownCriticalOption: hasUnknownCriticalOption, + hasFormatError: hasFormatError, + ); + } else if (code.isEmpty) { + coapMessage = CoapEmptyMessage.fromParsed( + token: tokenBuffer, + options: options, + payload: payload, + hasUnknownCriticalOption: hasUnknownCriticalOption, + hasFormatError: hasFormatError, + ); + } else { + return; + } + + controller.add(coapMessage); + } on UnknownCriticalOptionException { + // Should something be done here? + return; + } on FormatException { + // Should something be done here? return; } - - final location = optionsToUri( - options.where((final option) => option.isLocationOption).toList(), - ); - - coapMessage = CoapResponse.fromParsed( - responseCode, - token: tokenBuffer, - options: options, - payload: payload, - location: location, - hasUnknownCriticalOption: hasUnknownCriticalOption, - hasFormatError: hasFormatError, - ); - } else if (code.isEmpty) { - coapMessage = CoapEmptyMessage.fromParsed( - token: tokenBuffer, - options: options, - payload: payload, - hasUnknownCriticalOption: hasUnknownCriticalOption, - hasFormatError: hasFormatError, - ); - } else { - return; - } - - controller.add(coapMessage); - } on UnknownCriticalOptionException { - // Should something be done here? - return; - } on FormatException { - // Should something be done here? - return; - } - }, - onDone: controller.close, - onError: controller.addError, - cancelOnError: cancelOnError, - ); - controller - ..onPause = subscription.pause - ..onResume = subscription.resume - ..onCancel = subscription.cancel; - }; - - return controller.stream.listen(null); -}); + }, + onDone: controller.close, + onError: controller.addError, + cancelOnError: cancelOnError, + ); + controller + ..onPause = subscription.pause + ..onResume = subscription.resume + ..onCancel = subscription.cancel; + }; + + return controller.stream.listen(null); + }); List> readOptions(final DatagramReader reader) { final options = >[]; diff --git a/lib/src/codec/tcp/message_encoder.dart b/lib/src/codec/tcp/message_encoder.dart index 804272f6..0e79c41b 100644 --- a/lib/src/codec/tcp/message_encoder.dart +++ b/lib/src/codec/tcp/message_encoder.dart @@ -2,18 +2,27 @@ // SPDX-License-Identifier: MIT +import 'dart:io'; + import 'package:typed_data/typed_data.dart'; import '../../coap_code.dart'; import '../../coap_message.dart'; +import '../../coap_request.dart'; import '../../option/coap_option_type.dart'; +import '../../option/integer_option.dart'; +import '../../option/option.dart'; +import '../../option/string_option.dart'; import '../udp/datagram_writer.dart'; import '../udp/message_format.dart' as message_format; /// Encodes a CoAP TCP or WebSockets message into a bytes array. /// Returns the encoded bytes, or null if the message can not be encoded, /// i.e. the message is not a Request, a Response or an EmptyMessage. -Uint8Buffer serializeTcpMessage(final CoapMessage message) { +Uint8Buffer serializeTcpMessage( + final CoapMessage message, { + final bool isWebSockets = false, +}) { final writer = DatagramWriter(); final token = message.token; @@ -27,8 +36,14 @@ Uint8Buffer serializeTcpMessage(final CoapMessage message) { final messageLength = options.lengthInBytes + payloadLength; - // TODO(JKRhb): Refactor - final lengthField = _getOptionNibble(messageLength); + final int lengthField; + + if (isWebSockets) { + lengthField = 0; + } else { + // TODO(JKRhb): Refactor + lengthField = _getOptionNibble(messageLength); + } writer ..write(lengthField, 4) @@ -71,7 +86,7 @@ Uint8Buffer _serializeOptions(final CoapMessage message) { final options = message.getAllOptions()..sort(); for (final opt in options) { - if (opt.type == OptionType.uriHost || opt.type == OptionType.uriPort) { + if (_shouldBeSkipped(opt, message)) { continue; } @@ -181,3 +196,19 @@ int _getOptionNibble(final int optionValue) { throw FormatException('Unsupported option delta $optionValue'); } } + +// TODO(JKRhb): Refactor +bool _shouldBeSkipped(final Option opt, final CoapMessage message) { + if (opt is UriHostOption) { + final hostAddress = InternetAddress.tryParse(opt.value); + + return hostAddress != null && hostAddress == message.destination; + } + + // TODO(JKRhb): Revisit port option + if (opt is UriPortOption && message is CoapRequest) { + return true; + } + + return false; +} diff --git a/lib/src/network/coap_network_tcp.dart b/lib/src/network/coap_network_tcp.dart index 143d5c0c..1b524192 100644 --- a/lib/src/network/coap_network_tcp.dart +++ b/lib/src/network/coap_network_tcp.dart @@ -41,6 +41,8 @@ class CoapNetworkTCP implements CoapINetwork { final bool isTls; + String get _scheme => isTls ? 'coap+tcp' : 'coaps+tcp'; + final SecurityContext? _tlsContext; Socket? _socket; @@ -100,7 +102,7 @@ class CoapNetworkTCP implements CoapINetwork { socket ?.transform(toByteStream) .transform(toRawCoapTcpStream) - .transform(deserializeTcpMessage) + .transform(deserializeTcpMessage(_scheme, address)) .listen( (final message) { eventBus.fire(CoapMessageReceivedEvent(message, address)); From bc5602ea75b908c942aa3639d7c8462270d957ae Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sat, 18 Feb 2023 18:16:54 +0100 Subject: [PATCH 05/10] fixup! WIP: Implement CoAP over TCP logic --- example/tcp_test.dart | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/example/tcp_test.dart b/example/tcp_test.dart index 572a06a5..0dacfa0a 100644 --- a/example/tcp_test.dart +++ b/example/tcp_test.dart @@ -6,7 +6,7 @@ import 'dart:io'; import 'package:coap/coap.dart'; -Future startServer() async { +Future startServer() async { final server = await ServerSocket.bind(InternetAddress.anyIPv4, 5683); server.listen((final connection) async { await connection.forEach((final frame) async { @@ -15,7 +15,7 @@ Future startServer() async { const responseCode = (2 << 5) + 5; const tokenLength = 8; - const tokenOffset = 2; + const tokenOffset = 3; final token = frame.sublist(tokenOffset, tokenOffset + tokenLength); final payload = utf8.encode('Hello'); @@ -29,27 +29,28 @@ Future startServer() async { connection.add(response); await connection.close(); + await server.close(); }); }); - - return server; } /// Tests the basic functionality of the TCP network. /// Will be replaced with a "real" example later. Future main() async { - final server = await startServer(); + await startServer(); await connect(); - - await server.close(); } Future connect() async { final coapClient = CoapClient(Uri.parse('coap+tcp://127.0.0.1')); - final response = await coapClient.get( + final response = await coapClient.post( 'test', - options: [ContentFormatOption(40)], + options: [ + ContentFormatOption(50), + AcceptOption(60), + ], + payload: 'Hello?', ); print(response); From 243e630520f47a50c79b4a0aca410c58272f52f0 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sun, 2 Jul 2023 00:40:01 +0200 Subject: [PATCH 06/10] fixup! WIP: Implement CoAP over TCP logic --- lib/src/coap_message.dart | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/src/coap_message.dart b/lib/src/coap_message.dart index 8d433cc3..375b5229 100644 --- a/lib/src/coap_message.dart +++ b/lib/src/coap_message.dart @@ -39,7 +39,7 @@ typedef HookFunction = void Function(); abstract class CoapMessage { CoapMessage( this.code, - this._type, { + this.type, { final Iterable? payload, final CoapMediaType? contentFormat, }) : payload = Uint8Buffer()..addAll(payload ?? []) { @@ -54,8 +54,8 @@ abstract class CoapMessage { required final Uint8Buffer? payload, required this.hasUnknownCriticalOption, required this.hasFormatError, + this.id, }) : payload = payload ?? Uint8Buffer() { - this.id = id; this.token = token; setOptions(options); } From d8603a5a3c67ec7e2fa7c3eeff67a8fa30fe6f3a Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sun, 2 Jul 2023 00:53:11 +0200 Subject: [PATCH 07/10] fixup! WIP: Implement CoAP over TCP logic --- example/tcp_test.dart | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/example/tcp_test.dart b/example/tcp_test.dart index 0dacfa0a..0eca68d3 100644 --- a/example/tcp_test.dart +++ b/example/tcp_test.dart @@ -42,14 +42,13 @@ Future main() async { } Future connect() async { - final coapClient = CoapClient(Uri.parse('coap+tcp://127.0.0.1')); + final coapClient = + CoapClient(Uri.parse('coap+tcp://californium.eclipseprojects.io:5683')); final response = await coapClient.post( - 'test', - options: [ - ContentFormatOption(50), - AcceptOption(60), - ], + Uri(path: 'test'), + format: CoapMediaType.applicationJson, + accept: CoapMediaType.applicationCbor, payload: 'Hello?', ); print(response); From b6c2170279defbc700f985fdc2ae74adc7468650 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sun, 2 Jul 2023 01:48:00 +0200 Subject: [PATCH 08/10] fixup! WIP: Implement CoAP over TCP logic --- example/tcp_test.dart | 6 +++--- lib/src/coap_response.dart | 6 ++++-- lib/src/codec/tcp/message_decoder.dart | 11 ++++++++--- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/example/tcp_test.dart b/example/tcp_test.dart index 0eca68d3..3505ce51 100644 --- a/example/tcp_test.dart +++ b/example/tcp_test.dart @@ -45,11 +45,11 @@ Future connect() async { final coapClient = CoapClient(Uri.parse('coap+tcp://californium.eclipseprojects.io:5683')); - final response = await coapClient.post( + final response = await coapClient.get( Uri(path: 'test'), - format: CoapMediaType.applicationJson, + // format: CoapMediaType.applicationJson, accept: CoapMediaType.applicationCbor, - payload: 'Hello?', + // payload: 'Hello?', ); print(response); diff --git a/lib/src/coap_response.dart b/lib/src/coap_response.dart index 07fe6edf..82908d92 100644 --- a/lib/src/coap_response.dart +++ b/lib/src/coap_response.dart @@ -43,8 +43,10 @@ class CoapResponse extends CoapMessage { final Uri location; @override - List> getAllOptions() => - locationToOptions(location)..addAll(super.getAllOptions()); + List> getAllOptions() => locationToOptions(location) + ..addAll( + super.getAllOptions().where((element) => !element.isLocationOption), + ); /// Status code as a string String get statusCodeString => code.toString(); diff --git a/lib/src/codec/tcp/message_decoder.dart b/lib/src/codec/tcp/message_decoder.dart index 6c727522..1e38924a 100644 --- a/lib/src/codec/tcp/message_decoder.dart +++ b/lib/src/codec/tcp/message_decoder.dart @@ -6,6 +6,7 @@ import 'dart:async'; import 'dart:io'; import 'dart:typed_data'; +import 'package:convert/convert.dart'; import 'package:typed_data/typed_data.dart'; import '../../coap_code.dart'; @@ -34,7 +35,9 @@ final toByteStream = controller.onListen = () { final subscription = input.listen( - (final bytes) => bytes.forEach(controller.add), + (final bytes) { + bytes.forEach(controller.add); + }, onDone: controller.close, onError: controller.addError, cancelOnError: cancelOnError, @@ -91,6 +94,7 @@ final toRawCoapTcpStream = StreamTransformer( extendedLengthBuffer.clear(); optionsAndPayload.clear(); extendedTokenLengthBuffer.clear(); + print(hex.encode([byte])); // TODO(JKRhb): Handle WebSockets case with length = 0 length = (byte >> 4) & 15; @@ -105,8 +109,9 @@ final toRawCoapTcpStream = StreamTransformer( state = TcpState.code; break; case TcpState.extendedLength: + print(hex.encode([byte])); extendedLengthBuffer.add(byte); - if (extendedLengthBytes-- <= 0) { + if (--extendedLengthBytes <= 0) { length = _readExtendedMessageLength( length, DatagramReader(extendedLengthBuffer), @@ -168,8 +173,8 @@ final toRawCoapTcpStream = StreamTransformer( break; case TcpState.optionsAndPayload: - optionsAndPayload.add(byte); length--; + optionsAndPayload.add(byte); if (length < 1) { state = TcpState.initialState; From c1e4cde056092b1add692b95695afbddf99742c2 Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sun, 2 Jul 2023 02:04:24 +0200 Subject: [PATCH 09/10] fixup! WIP: Implement CoAP over TCP logic --- example/tcp_test.dart | 2 +- lib/src/coap_response.dart | 2 +- lib/src/codec/tcp/message_decoder.dart | 265 +++++++++++++------------ lib/src/network/coap_network_tcp.dart | 2 +- 4 files changed, 138 insertions(+), 133 deletions(-) diff --git a/example/tcp_test.dart b/example/tcp_test.dart index 3505ce51..99be104b 100644 --- a/example/tcp_test.dart +++ b/example/tcp_test.dart @@ -43,7 +43,7 @@ Future main() async { Future connect() async { final coapClient = - CoapClient(Uri.parse('coap+tcp://californium.eclipseprojects.io:5683')); + CoapClient(Uri.parse('coap+tcp://californium.eclipseprojects.io')); final response = await coapClient.get( Uri(path: 'test'), diff --git a/lib/src/coap_response.dart b/lib/src/coap_response.dart index 82908d92..e40b3c15 100644 --- a/lib/src/coap_response.dart +++ b/lib/src/coap_response.dart @@ -45,7 +45,7 @@ class CoapResponse extends CoapMessage { @override List> getAllOptions() => locationToOptions(location) ..addAll( - super.getAllOptions().where((element) => !element.isLocationOption), + super.getAllOptions().where((final element) => !element.isLocationOption), ); /// Status code as a string diff --git a/lib/src/codec/tcp/message_decoder.dart b/lib/src/codec/tcp/message_decoder.dart index 1e38924a..94ff1353 100644 --- a/lib/src/codec/tcp/message_decoder.dart +++ b/lib/src/codec/tcp/message_decoder.dart @@ -6,7 +6,6 @@ import 'dart:async'; import 'dart:io'; import 'dart:typed_data'; -import 'package:convert/convert.dart'; import 'package:typed_data/typed_data.dart'; import '../../coap_code.dart'; @@ -35,9 +34,7 @@ final toByteStream = controller.onListen = () { final subscription = input.listen( - (final bytes) { - bytes.forEach(controller.add); - }, + (final bytes) => bytes.forEach(controller.add), onDone: controller.close, onError: controller.addError, cancelOnError: cancelOnError, @@ -69,141 +66,149 @@ class RawCoapTcpMessage { 'Code: $code\nToken:$token\nOptions and Payload:$optionsAndPayload'; } -final toRawCoapTcpStream = StreamTransformer( - (final input, final cancelOnError) { - // TODO(JKRhb): Connections must be aborted on error - final controller = StreamController(); - - var state = TcpState.initialState; - var length = 0; - var extendedLengthBytes = 0; - final extendedLengthBuffer = Uint8Buffer(); - var tokenLength = 0; - final token = Uint8Buffer(); - var extendedTokenLengthBytes = 0; - final extendedTokenLengthBuffer = Uint8Buffer(); - var code = 0; - final optionsAndPayload = Uint8Buffer(); +StreamTransformer toRawCoapTcpStream({ + final bool usesWebSockets = false, +}) => + StreamTransformer( + (final input, final cancelOnError) { + // TODO(JKRhb): Connections must be aborted on error + final controller = StreamController(); + + var state = TcpState.initialState; + var length = 0; + var extendedLengthBytes = 0; + final extendedLengthBuffer = Uint8Buffer(); + var tokenLength = 0; + final token = Uint8Buffer(); + var extendedTokenLengthBytes = 0; + final extendedTokenLengthBuffer = Uint8Buffer(); + var code = 0; + final optionsAndPayload = Uint8Buffer(); - controller.onListen = () { - final subscription = input.listen( - (final byte) async { - switch (state) { - case TcpState.initialState: - token.clear(); - extendedLengthBuffer.clear(); - optionsAndPayload.clear(); - extendedTokenLengthBuffer.clear(); - print(hex.encode([byte])); - - // TODO(JKRhb): Handle WebSockets case with length = 0 - length = (byte >> 4) & 15; - tokenLength = byte & 15; - - if (const [13, 14, 15].contains(length)) { - state = TcpState.extendedLength; - extendedLengthBytes = determineExtendedLength(length); - break; - } + controller.onListen = () { + final subscription = input.listen( + (final byte) async { + switch (state) { + case TcpState.initialState: + token.clear(); + extendedLengthBuffer.clear(); + optionsAndPayload.clear(); + extendedTokenLengthBuffer.clear(); + + length = (byte >> 4) & 15; + + if (usesWebSockets && length != 0) { + // TODO(JKRhb): Properly support WebSockets + controller.addError(const FormatException()); + return; + } - state = TcpState.code; - break; - case TcpState.extendedLength: - print(hex.encode([byte])); - extendedLengthBuffer.add(byte); - if (--extendedLengthBytes <= 0) { - length = _readExtendedMessageLength( - length, - DatagramReader(extendedLengthBuffer), - ); - state = TcpState.code; - break; - } + tokenLength = byte & 15; - break; - case TcpState.code: - code = byte; - if (const [13, 14].contains(tokenLength)) { - state = TcpState.extendedTokenLength; - extendedTokenLengthBytes = determineExtendedLength(length); - break; - } else if (tokenLength == 15) { - throw const FormatException(); - } - state = TcpState.token; - break; - case TcpState.extendedTokenLength: - extendedTokenLengthBuffer.add(byte); - extendedTokenLengthBytes--; - - if (extendedTokenLengthBytes < 1) { - length = _readExtendedMessageLength( - length, - DatagramReader(extendedLengthBuffer), - ); - - state = TcpState.code; - break; - } + if (const [13, 14, 15].contains(length)) { + state = TcpState.extendedLength; + extendedLengthBytes = determineExtendedLength(length); + break; + } - break; - case TcpState.token: - token.add(byte); - tokenLength--; + state = TcpState.code; + break; + case TcpState.extendedLength: + extendedLengthBuffer.add(byte); + if (--extendedLengthBytes <= 0) { + length = _readExtendedMessageLength( + length, + DatagramReader(extendedLengthBuffer), + ); + state = TcpState.code; + break; + } - if (tokenLength >= 1) { - break; - } + break; + case TcpState.code: + code = byte; + if (const [13, 14].contains(tokenLength)) { + state = TcpState.extendedTokenLength; + extendedTokenLengthBytes = determineExtendedLength(length); + break; + } else if (tokenLength == 15) { + controller.addError(const FormatException()); + return; + } + state = TcpState.token; + break; + case TcpState.extendedTokenLength: + extendedTokenLengthBuffer.add(byte); + extendedTokenLengthBytes--; + + if (extendedTokenLengthBytes < 1) { + length = _readExtendedMessageLength( + length, + DatagramReader(extendedLengthBuffer), + ); + + state = TcpState.code; + break; + } - // TODO(JKRhb): Refactor - if (length < 1) { - state = TcpState.initialState; - controller.add( - RawCoapTcpMessage( - code: code, - token: Uint8List.fromList(token.toList(growable: false)), - optionsAndPayload: Uint8List.fromList( - optionsAndPayload.toList(growable: false), - ), - ), - ); - } else { - state = TcpState.optionsAndPayload; - } + break; + case TcpState.token: + token.add(byte); + tokenLength--; - break; - case TcpState.optionsAndPayload: - length--; - optionsAndPayload.add(byte); - - if (length < 1) { - state = TcpState.initialState; - controller.add( - RawCoapTcpMessage( - code: code, - token: Uint8List.fromList(token.toList(growable: false)), - optionsAndPayload: Uint8List.fromList( - optionsAndPayload.toList(growable: false), - ), - ), - ); - } + if (tokenLength >= 1) { + break; + } - break; - } - }, - onDone: controller.close, - onError: controller.addError, - cancelOnError: cancelOnError, - ); - controller - ..onPause = subscription.pause - ..onResume = subscription.resume - ..onCancel = subscription.cancel; - }; + // TODO(JKRhb): Refactor + if (length < 1) { + state = TcpState.initialState; + controller.add( + RawCoapTcpMessage( + code: code, + token: Uint8List.fromList(token.toList(growable: false)), + optionsAndPayload: Uint8List.fromList( + optionsAndPayload.toList(growable: false), + ), + ), + ); + } else { + state = TcpState.optionsAndPayload; + } - return controller.stream.listen(null); -}); + break; + case TcpState.optionsAndPayload: + length--; + optionsAndPayload.add(byte); + + if (length < 1) { + state = TcpState.initialState; + controller.add( + RawCoapTcpMessage( + code: code, + token: Uint8List.fromList(token.toList(growable: false)), + optionsAndPayload: Uint8List.fromList( + optionsAndPayload.toList(growable: false), + ), + ), + ); + } + + break; + } + }, + onDone: controller.close, + onError: controller.addError, + cancelOnError: cancelOnError, + ); + controller + ..onPause = subscription.pause + ..onResume = subscription.resume + ..onCancel = subscription.cancel; + }; + + return controller.stream.listen(null); + }); int determineExtendedLength(final int length) { switch (length) { diff --git a/lib/src/network/coap_network_tcp.dart b/lib/src/network/coap_network_tcp.dart index 1b524192..8bc7ee4a 100644 --- a/lib/src/network/coap_network_tcp.dart +++ b/lib/src/network/coap_network_tcp.dart @@ -101,7 +101,7 @@ class CoapNetworkTCP implements CoapINetwork { void _receive() { socket ?.transform(toByteStream) - .transform(toRawCoapTcpStream) + .transform(toRawCoapTcpStream()) .transform(deserializeTcpMessage(_scheme, address)) .listen( (final message) { From b33f1eb9a328f9ac0b6f88e2efea0743dcdffc2c Mon Sep 17 00:00:00 2001 From: Jan Romann Date: Sun, 2 Jul 2023 02:41:10 +0200 Subject: [PATCH 10/10] fixup! WIP: Implement CoAP over TCP logic --- example/tcp_test.dart | 37 +++---------------------------------- 1 file changed, 3 insertions(+), 34 deletions(-) diff --git a/example/tcp_test.dart b/example/tcp_test.dart index 99be104b..39411e70 100644 --- a/example/tcp_test.dart +++ b/example/tcp_test.dart @@ -1,43 +1,12 @@ // ignore_for_file: avoid_print -import 'dart:convert'; import 'dart:core'; -import 'dart:io'; import 'package:coap/coap.dart'; -Future startServer() async { - final server = await ServerSocket.bind(InternetAddress.anyIPv4, 5683); - server.listen((final connection) async { - await connection.forEach((final frame) async { - print(frame); - - const responseCode = (2 << 5) + 5; - - const tokenLength = 8; - const tokenOffset = 3; - final token = frame.sublist(tokenOffset, tokenOffset + tokenLength); - final payload = utf8.encode('Hello'); - - final response = [ - ((payload.length + 1) << 4) | tokenLength, - responseCode, - ...token, - 255, - ...payload, - ]; - - connection.add(response); - await connection.close(); - await server.close(); - }); - }); -} - /// Tests the basic functionality of the TCP network. /// Will be replaced with a "real" example later. Future main() async { - await startServer(); await connect(); } @@ -45,11 +14,11 @@ Future connect() async { final coapClient = CoapClient(Uri.parse('coap+tcp://californium.eclipseprojects.io')); - final response = await coapClient.get( + final response = await coapClient.post( Uri(path: 'test'), - // format: CoapMediaType.applicationJson, + format: CoapMediaType.applicationJson, accept: CoapMediaType.applicationCbor, - // payload: 'Hello?', + payload: 'Hello?', ); print(response);