diff --git a/.github/workflows/base.yml b/.github/workflows/base.yml
index e121c28bf..c57a9ce3d 100644
--- a/.github/workflows/base.yml
+++ b/.github/workflows/base.yml
@@ -25,7 +25,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- framework: [ net8.0 ]
+ framework: [ net9.0 ]
os: [ ubuntu-latest ]
configuration: [ release ]
runs-on: ${{ matrix.os }}
@@ -53,7 +53,7 @@ jobs:
uses: actions/setup-dotnet@v3
with:
dotnet-version: |
- 8.0.x
+ 9.0.x
- name: Run Tests
shell: bash
env:
@@ -64,7 +64,7 @@ jobs:
dotnet test --configuration ${{ matrix.configuration }} --blame \
--logger:"GitHubActions;report-warnings=false" --logger:"console;verbosity=normal" \
--framework ${{ matrix.framework }} \
- test/EventStore.Client.Tests
+ test/Kurrent.Client.Tests
# run: |
# sudo ./gencert.sh
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index c1ab57e76..1f408ae3c 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -14,7 +14,8 @@ jobs:
strategy:
fail-fast: false
matrix:
- docker-tag: [ ci, lts, previous-lts ]
+# docker-tag: [ ci, lts, previous-lts ]
+ docker-tag: [ ci ]
# test: [ Streams, PersistentSubscriptions, Operations, UserManagement, ProjectionManagement ]
name: Test CE (${{ matrix.docker-tag }})
with:
diff --git a/EventStore.Client.sln b/Kurrent.Client.sln
similarity index 66%
rename from EventStore.Client.sln
rename to Kurrent.Client.sln
index 4b4791ec9..63cdbc278 100644
--- a/EventStore.Client.sln
+++ b/Kurrent.Client.sln
@@ -9,12 +9,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client", "src\Ev
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "test", "test", "{C51F2C69-45A9-4D0D-A708-4FC319D5D340}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Tests", "test\EventStore.Client.Tests\EventStore.Client.Tests.csproj", "{FC829F1B-43AD-4C96-9002-23D04BBA3AF3}"
-EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Tests.Common", "test\EventStore.Client.Tests.Common\EventStore.Client.Tests.Common.csproj", "{E326832D-DE52-4DE4-9E54-C800908B75F3}"
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests", "test\Kurrent.Client.Tests\Kurrent.Client.Tests.csproj", "{FC829F1B-43AD-4C96-9002-23D04BBA3AF3}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventStore.Client.Extensions.OpenTelemetry", "src\EventStore.Client.Extensions.OpenTelemetry\EventStore.Client.Extensions.OpenTelemetry.csproj", "{F6A7B391-36F1-4838-AD08-E0EE0F2FE57E}"
EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client", "src\Kurrent.Client\Kurrent.Client.csproj", "{762EECAA-122E-4B0C-BC50-5AA4F72CA4E0}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kurrent.Client.Tests.Common", "test\Kurrent.Client.Tests.Common\Kurrent.Client.Tests.Common.csproj", "{47BF715B-A0BF-4044-B335-717E56422550}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|x64 = Debug|x64
@@ -32,19 +34,24 @@ Global
{FC829F1B-43AD-4C96-9002-23D04BBA3AF3}.Debug|x64.Build.0 = Debug|Any CPU
{FC829F1B-43AD-4C96-9002-23D04BBA3AF3}.Release|x64.ActiveCfg = Release|Any CPU
{FC829F1B-43AD-4C96-9002-23D04BBA3AF3}.Release|x64.Build.0 = Release|Any CPU
- {E326832D-DE52-4DE4-9E54-C800908B75F3}.Debug|x64.ActiveCfg = Debug|Any CPU
- {E326832D-DE52-4DE4-9E54-C800908B75F3}.Debug|x64.Build.0 = Debug|Any CPU
- {E326832D-DE52-4DE4-9E54-C800908B75F3}.Release|x64.ActiveCfg = Release|Any CPU
- {E326832D-DE52-4DE4-9E54-C800908B75F3}.Release|x64.Build.0 = Release|Any CPU
{F6A7B391-36F1-4838-AD08-E0EE0F2FE57E}.Debug|x64.ActiveCfg = Debug|Any CPU
{F6A7B391-36F1-4838-AD08-E0EE0F2FE57E}.Debug|x64.Build.0 = Debug|Any CPU
{F6A7B391-36F1-4838-AD08-E0EE0F2FE57E}.Release|x64.ActiveCfg = Release|Any CPU
{F6A7B391-36F1-4838-AD08-E0EE0F2FE57E}.Release|x64.Build.0 = Release|Any CPU
+ {762EECAA-122E-4B0C-BC50-5AA4F72CA4E0}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {762EECAA-122E-4B0C-BC50-5AA4F72CA4E0}.Debug|x64.Build.0 = Debug|Any CPU
+ {762EECAA-122E-4B0C-BC50-5AA4F72CA4E0}.Release|x64.ActiveCfg = Release|Any CPU
+ {762EECAA-122E-4B0C-BC50-5AA4F72CA4E0}.Release|x64.Build.0 = Release|Any CPU
+ {47BF715B-A0BF-4044-B335-717E56422550}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {47BF715B-A0BF-4044-B335-717E56422550}.Debug|x64.Build.0 = Debug|Any CPU
+ {47BF715B-A0BF-4044-B335-717E56422550}.Release|x64.ActiveCfg = Release|Any CPU
+ {47BF715B-A0BF-4044-B335-717E56422550}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{8853D875-4A8E-450B-A1BE-9CEF8BEDABC3} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8}
{FC829F1B-43AD-4C96-9002-23D04BBA3AF3} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
- {E326832D-DE52-4DE4-9E54-C800908B75F3} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
{F6A7B391-36F1-4838-AD08-E0EE0F2FE57E} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8}
+ {762EECAA-122E-4B0C-BC50-5AA4F72CA4E0} = {EA59C1CB-16DA-4F68-AF8A-642A969B4CF8}
+ {47BF715B-A0BF-4044-B335-717E56422550} = {C51F2C69-45A9-4D0D-A708-4FC319D5D340}
EndGlobalSection
EndGlobal
diff --git a/EventStore.Client.sln.DotSettings b/Kurrent.Client.sln.DotSettings
similarity index 100%
rename from EventStore.Client.sln.DotSettings
rename to Kurrent.Client.sln.DotSettings
diff --git a/src/Directory.Build.props b/src/Directory.Build.props
index 9f5159807..38e546298 100644
--- a/src/Directory.Build.props
+++ b/src/Directory.Build.props
@@ -3,8 +3,8 @@
true
- EventStore.Client
- EventStore.Client
+ Kurrent.Client
+ Kurrent.Client
@@ -12,8 +12,8 @@
LICENSE.md
https://kurrent.io
false
- https://eventstore.com/blog/
- kurrent eventstore client grpc
+ https://kurrent.io/blog/
+ kurrent client grpc
Kurrent Ltd
Copyright 2012-$([System.DateTime]::Today.Year.ToString()) Kurrent Ltd
v
diff --git a/src/Kurrent.Client/Core/Certificates/X509Certificates.cs b/src/Kurrent.Client/Core/Certificates/X509Certificates.cs
new file mode 100644
index 000000000..3fe1006f5
--- /dev/null
+++ b/src/Kurrent.Client/Core/Certificates/X509Certificates.cs
@@ -0,0 +1,114 @@
+#pragma warning disable CS1591 // Missing XML comment for publicly visible type or member
+
+using System.Security.Cryptography;
+using System.Security.Cryptography.X509Certificates;
+
+#if NET48
+using Org.BouncyCastle.Crypto;
+using Org.BouncyCastle.Crypto.Parameters;
+using Org.BouncyCastle.OpenSsl;
+using Org.BouncyCastle.Security;
+#endif
+
+namespace EventStore.Client;
+
+static class X509Certificates {
+ // TODO SS: Use .NET 8 X509Certificate2.CreateFromPemFile(certPemFilePath, keyPemFilePath) once the Windows32Exception issue is resolved
+ public static X509Certificate2 CreateFromPemFile(string certPemFilePath, string keyPemFilePath) {
+ try {
+#if NET9_0_OR_GREATER
+ using var publicCert = X509CertificateLoader.LoadCertificateFromFile(certPemFilePath);
+#else
+ using var publicCert = new X509Certificate2(certPemFilePath);
+#endif
+ using var privateKey = RSA.Create().ImportPrivateKeyFromFile(keyPemFilePath);
+ using var certificate = publicCert.CopyWithPrivateKey(privateKey);
+
+#if NET48
+ return new(certificate.Export(X509ContentType.Pfx));
+#else
+ return X509Certificate2.CreateFromPemFile(certPemFilePath, keyPemFilePath);
+#endif
+ } catch (Exception ex) {
+ throw new CryptographicException($"Failed to load private key: {ex.Message}");
+ }
+
+ // Notes:
+ // using X509Certificate2.CreateFromPemFile(certPemFilePath, keyPemFilePath) would be the ideal choice here,
+ // but it's currently causing a Win32Exception specifically on Windows. Alternative implementation is used until the issue is resolved.
+ //
+ // Error: The SSL connection could not be established, see inner exception. AuthenticationException: Authentication failed because the platform
+ // does not support ephemeral keys. Win32Exception: No credentials are available in the security package
+ //
+ // public static X509Certificate2 CreateFromPemFile(string certPemFilePath, string keyPemFilePath) =>
+ // X509Certificate2.CreateFromPemFile(certPemFilePath, keyPemFilePath);
+ }
+}
+
+public static class RsaExtensions {
+#if NET48
+ public static RSA ImportPrivateKeyFromFile(this RSA rsa, string privateKeyPath) {
+ var (content, label) = LoadPemKeyFile(privateKeyPath);
+
+ using var reader = new PemReader(new StringReader(string.Join(Environment.NewLine, content)));
+
+ var keyParameters = reader.ReadObject() switch {
+ RsaPrivateCrtKeyParameters parameters => parameters,
+ AsymmetricCipherKeyPair keyPair => keyPair.Private as RsaPrivateCrtKeyParameters,
+ _ => throw new NotSupportedException($"Invalid private key format: {label}")
+ };
+
+ rsa.ImportParameters(DotNetUtilities.ToRSAParameters(keyParameters));
+
+ return rsa;
+ }
+#else
+ public static RSA ImportPrivateKeyFromFile(this RSA rsa, string privateKeyPath) {
+ var (content, label) = LoadPemKeyFile(privateKeyPath);
+
+ var privateKey = string.Join(string.Empty, content[1..^1]);
+ var privateKeyBytes = Convert.FromBase64String(privateKey);
+
+ if (label == RsaPemLabels.Pkcs8PrivateKey)
+ rsa.ImportPkcs8PrivateKey(privateKeyBytes, out _);
+ else if (label == RsaPemLabels.RSAPrivateKey)
+ rsa.ImportRSAPrivateKey(privateKeyBytes, out _);
+
+ return rsa;
+ }
+#endif
+
+ static (string[] Content, string Label) LoadPemKeyFile(string privateKeyPath) {
+ var content = File.ReadAllLines(privateKeyPath);
+ var label = RsaPemLabels.ParseKeyLabel(content[0]);
+
+ if (RsaPemLabels.IsEncryptedPrivateKey(label))
+ throw new NotSupportedException("Encrypted private keys are not supported");
+
+ return (content, label);
+ }
+}
+
+static class RsaPemLabels {
+ public const string RSAPrivateKey = "RSA PRIVATE KEY";
+ public const string Pkcs8PrivateKey = "PRIVATE KEY";
+ public const string EncryptedPkcs8PrivateKey = "ENCRYPTED PRIVATE KEY";
+
+ public static readonly string[] PrivateKeyLabels = [RSAPrivateKey, Pkcs8PrivateKey, EncryptedPkcs8PrivateKey];
+
+ public static bool IsPrivateKey(string label) => Array.IndexOf(PrivateKeyLabels, label) != -1;
+
+ public static bool IsEncryptedPrivateKey(string label) => label == EncryptedPkcs8PrivateKey;
+
+ const string LabelPrefix = "-----BEGIN ";
+ const string LabelSuffix = "-----";
+
+ public static string ParseKeyLabel(string pemFileHeader) {
+ var label = pemFileHeader.Replace(LabelPrefix, string.Empty).Replace(LabelSuffix, string.Empty);
+
+ if (!IsPrivateKey(label))
+ throw new CryptographicException($"Unknown private key label: {label}");
+
+ return label;
+ }
+}
diff --git a/src/Kurrent.Client/Core/ChannelBaseExtensions.cs b/src/Kurrent.Client/Core/ChannelBaseExtensions.cs
new file mode 100644
index 000000000..9c44addef
--- /dev/null
+++ b/src/Kurrent.Client/Core/ChannelBaseExtensions.cs
@@ -0,0 +1,10 @@
+using Grpc.Core;
+
+namespace EventStore.Client;
+
+static class ChannelBaseExtensions {
+ public static async ValueTask DisposeAsync(this ChannelBase channel) {
+ await channel.ShutdownAsync().ConfigureAwait(false);
+ (channel as IDisposable)?.Dispose();
+ }
+}
diff --git a/src/Kurrent.Client/Core/ChannelCache.cs b/src/Kurrent.Client/Core/ChannelCache.cs
new file mode 100644
index 000000000..09f7c2b86
--- /dev/null
+++ b/src/Kurrent.Client/Core/ChannelCache.cs
@@ -0,0 +1,146 @@
+using System.Net;
+using TChannel = Grpc.Net.Client.GrpcChannel;
+
+namespace EventStore.Client {
+ // Maintains Channels keyed by DnsEndPoint so the channels can be reused.
+ // Deals with the disposal difference between grpc.net and grpc.core
+ // Thread safe.
+ internal class ChannelCache :
+ IAsyncDisposable {
+
+ private readonly KurrentClientSettings _settings;
+ private readonly Random _random;
+ private readonly Dictionary _channels;
+ private readonly object _lock = new();
+ private bool _disposed;
+
+ public ChannelCache(KurrentClientSettings settings) {
+ _settings = settings;
+ _random = new Random(0);
+ _channels = new Dictionary(
+ DnsEndPointEqualityComparer.Instance);
+ }
+
+ public TChannel GetChannelInfo(DnsEndPoint endPoint) {
+ lock (_lock) {
+ ThrowIfDisposed();
+
+ if (!_channels.TryGetValue(endPoint, out var channel)) {
+ channel = ChannelFactory.CreateChannel(
+ settings: _settings,
+ endPoint: endPoint);
+ _channels[endPoint] = channel;
+ }
+
+ return channel;
+ }
+ }
+
+ public KeyValuePair[] GetRandomOrderSnapshot() {
+ lock (_lock) {
+ ThrowIfDisposed();
+
+ return _channels
+ .OrderBy(_ => _random.Next())
+ .ToArray();
+ }
+ }
+
+ // Update the cache to contain channels for exactly these endpoints
+ public void UpdateCache(IEnumerable endPoints) {
+ lock (_lock) {
+ ThrowIfDisposed();
+
+ // remove
+ var endPointsToDiscard = _channels.Keys
+ .Except(endPoints, DnsEndPointEqualityComparer.Instance)
+ .ToArray();
+
+ var channelsToDispose = new List(endPointsToDiscard.Length);
+
+ foreach (var endPoint in endPointsToDiscard) {
+ if (!_channels.TryGetValue(endPoint, out var channel))
+ continue;
+
+ _channels.Remove(endPoint);
+ channelsToDispose.Add(channel);
+ }
+
+ _ = DisposeChannelsAsync(channelsToDispose);
+
+ // add
+ foreach (var endPoint in endPoints) {
+ GetChannelInfo(endPoint);
+ }
+ }
+ }
+
+ public void Dispose() {
+ lock (_lock) {
+ if (_disposed)
+ return;
+
+ _disposed = true;
+
+ foreach (var channel in _channels.Values) {
+ channel.Dispose();
+ }
+
+ _channels.Clear();
+ }
+ }
+
+ public async ValueTask DisposeAsync() {
+ var channelsToDispose = Array.Empty();
+
+ lock (_lock) {
+ if (_disposed)
+ return;
+ _disposed = true;
+
+ channelsToDispose = _channels.Values.ToArray();
+ _channels.Clear();
+ }
+
+ await DisposeChannelsAsync(channelsToDispose).ConfigureAwait(false);
+ }
+
+ private void ThrowIfDisposed() {
+ lock (_lock) {
+ if (_disposed) {
+ throw new ObjectDisposedException(GetType().ToString());
+ }
+ }
+ }
+
+ private static async Task DisposeChannelsAsync(IEnumerable channels) {
+ foreach (var channel in channels)
+ await channel.DisposeAsync().ConfigureAwait(false);
+ }
+
+ private class DnsEndPointEqualityComparer : IEqualityComparer {
+ public static readonly DnsEndPointEqualityComparer Instance = new();
+
+ public bool Equals(DnsEndPoint? x, DnsEndPoint? y) {
+ if (ReferenceEquals(x, y))
+ return true;
+ if (x is null)
+ return false;
+ if (y is null)
+ return false;
+ if (x.GetType() != y.GetType())
+ return false;
+ return
+ string.Equals(x.Host, y.Host, StringComparison.OrdinalIgnoreCase) &&
+ x.Port == y.Port;
+ }
+
+ public int GetHashCode(DnsEndPoint obj) {
+ unchecked {
+ return (StringComparer.OrdinalIgnoreCase.GetHashCode(obj.Host) * 397) ^
+ obj.Port;
+ }
+ }
+ }
+ }
+}
diff --git a/src/Kurrent.Client/Core/ChannelFactory.cs b/src/Kurrent.Client/Core/ChannelFactory.cs
new file mode 100644
index 000000000..c63605bb4
--- /dev/null
+++ b/src/Kurrent.Client/Core/ChannelFactory.cs
@@ -0,0 +1,106 @@
+using System.Net.Http;
+using System.Security.Cryptography.X509Certificates;
+using Grpc.Net.Client;
+using EndPoint = System.Net.EndPoint;
+using TChannel = Grpc.Net.Client.GrpcChannel;
+
+namespace EventStore.Client {
+
+ internal static class ChannelFactory {
+ private const int MaxReceiveMessageLength = 17 * 1024 * 1024;
+
+ public static TChannel CreateChannel(KurrentClientSettings settings, EndPoint endPoint) {
+ var address = endPoint.ToUri(!settings.ConnectivitySettings.Insecure);
+
+ if (settings.ConnectivitySettings.Insecure) {
+ //this must be switched on before creation of the HttpMessageHandler
+ AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
+ }
+
+ return TChannel.ForAddress(
+ address,
+ new GrpcChannelOptions {
+#if NET48
+ HttpHandler = CreateHandler(settings),
+#else
+ HttpClient = new HttpClient(CreateHandler(settings), true) {
+ Timeout = System.Threading.Timeout.InfiniteTimeSpan,
+ DefaultRequestVersion = new Version(2, 0)
+ },
+#endif
+ LoggerFactory = settings.LoggerFactory,
+ Credentials = settings.ChannelCredentials,
+ DisposeHttpClient = true,
+ MaxReceiveMessageSize = MaxReceiveMessageLength
+ }
+ );
+
+
+#if NET48
+ static HttpMessageHandler CreateHandler(KurrentClientSettings settings) {
+ if (settings.CreateHttpMessageHandler is not null)
+ return settings.CreateHttpMessageHandler.Invoke();
+
+ var handler = new WinHttpHandler {
+ TcpKeepAliveEnabled = true,
+ TcpKeepAliveTime = settings.ConnectivitySettings.KeepAliveTimeout,
+ TcpKeepAliveInterval = settings.ConnectivitySettings.KeepAliveInterval,
+ EnableMultipleHttp2Connections = true
+ };
+
+ if (settings.ConnectivitySettings.Insecure) return handler;
+
+ if (settings.ConnectivitySettings.ClientCertificate is not null)
+ handler.ClientCertificates.Add(settings.ConnectivitySettings.ClientCertificate);
+
+ handler.ServerCertificateValidationCallback = settings.ConnectivitySettings.TlsVerifyCert switch {
+ false => delegate { return true; },
+ true when settings.ConnectivitySettings.TlsCaFile is not null => (sender, certificate, chain, errors) => {
+ if (chain is null) return false;
+
+ chain.ChainPolicy.ExtraStore.Add(settings.ConnectivitySettings.TlsCaFile);
+ return chain.Build(certificate);
+ },
+ _ => null
+ };
+
+ return handler;
+ }
+#else
+ static HttpMessageHandler CreateHandler(KurrentClientSettings settings) {
+ if (settings.CreateHttpMessageHandler is not null)
+ return settings.CreateHttpMessageHandler.Invoke();
+
+ var handler = new SocketsHttpHandler {
+ KeepAlivePingDelay = settings.ConnectivitySettings.KeepAliveInterval,
+ KeepAlivePingTimeout = settings.ConnectivitySettings.KeepAliveTimeout,
+ EnableMultipleHttp2Connections = true
+ };
+
+ if (settings.ConnectivitySettings.Insecure)
+ return handler;
+
+ if (settings.ConnectivitySettings.ClientCertificate is not null) {
+ handler.SslOptions.ClientCertificates = new X509CertificateCollection {
+ settings.ConnectivitySettings.ClientCertificate
+ };
+ }
+
+ handler.SslOptions.RemoteCertificateValidationCallback = settings.ConnectivitySettings.TlsVerifyCert switch {
+ false => delegate { return true; },
+ true when settings.ConnectivitySettings.TlsCaFile is not null => (sender, certificate, chain, errors) => {
+ if (certificate is not X509Certificate2 peerCertificate || chain is null) return false;
+
+ chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
+ chain.ChainPolicy.CustomTrustStore.Add(settings.ConnectivitySettings.TlsCaFile);
+ return chain.Build(peerCertificate);
+ },
+ _ => null
+ };
+
+ return handler;
+ }
+#endif
+ }
+ }
+}
diff --git a/src/Kurrent.Client/Core/ChannelInfo.cs b/src/Kurrent.Client/Core/ChannelInfo.cs
new file mode 100644
index 000000000..87c80b644
--- /dev/null
+++ b/src/Kurrent.Client/Core/ChannelInfo.cs
@@ -0,0 +1,9 @@
+using Grpc.Core;
+
+namespace EventStore.Client {
+#pragma warning disable 1591
+ public record ChannelInfo(
+ ChannelBase Channel,
+ ServerCapabilities ServerCapabilities,
+ CallInvoker CallInvoker);
+}
diff --git a/src/Kurrent.Client/Core/ChannelSelector.cs b/src/Kurrent.Client/Core/ChannelSelector.cs
new file mode 100644
index 000000000..af0fa3031
--- /dev/null
+++ b/src/Kurrent.Client/Core/ChannelSelector.cs
@@ -0,0 +1,24 @@
+using System.Net;
+using System.Threading;
+using System.Threading.Tasks;
+using Grpc.Core;
+
+namespace EventStore.Client {
+ internal class ChannelSelector : IChannelSelector {
+ private readonly IChannelSelector _inner;
+
+ public ChannelSelector(
+ KurrentClientSettings settings,
+ ChannelCache channelCache) {
+ _inner = settings.ConnectivitySettings.IsSingleNode
+ ? new SingleNodeChannelSelector(settings, channelCache)
+ : new GossipChannelSelector(settings, channelCache, new GrpcGossipClient(settings));
+ }
+
+ public Task SelectChannelAsync(CancellationToken cancellationToken) =>
+ _inner.SelectChannelAsync(cancellationToken);
+
+ public ChannelBase SelectChannel(DnsEndPoint endPoint) =>
+ _inner.SelectChannel(endPoint);
+ }
+}
diff --git a/src/Kurrent.Client/Core/ClusterMessage.cs b/src/Kurrent.Client/Core/ClusterMessage.cs
new file mode 100644
index 000000000..64701781e
--- /dev/null
+++ b/src/Kurrent.Client/Core/ClusterMessage.cs
@@ -0,0 +1,28 @@
+using System.Net;
+
+namespace EventStore.Client {
+ internal static class ClusterMessages {
+ public record ClusterInfo(MemberInfo[] Members);
+
+ public record MemberInfo(Uuid InstanceId, VNodeState State, bool IsAlive, DnsEndPoint EndPoint);
+
+ public enum VNodeState {
+ Initializing = 0,
+ DiscoverLeader = 1,
+ Unknown = 2,
+ PreReplica = 3,
+ CatchingUp = 4,
+ Clone = 5,
+ Follower = 6,
+ PreLeader = 7,
+ Leader = 8,
+ Manager = 9,
+ ShuttingDown = 10,
+ Shutdown = 11,
+ ReadOnlyLeaderless = 12,
+ PreReadOnlyReplica = 13,
+ ReadOnlyReplica = 14,
+ ResigningLeader = 15
+ }
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/AsyncStreamReaderExtensions.cs b/src/Kurrent.Client/Core/Common/AsyncStreamReaderExtensions.cs
new file mode 100644
index 000000000..98f9de54d
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/AsyncStreamReaderExtensions.cs
@@ -0,0 +1,29 @@
+using System.Threading.Channels;
+using System.Runtime.CompilerServices;
+using Grpc.Core;
+
+namespace EventStore.Client;
+
+static class AsyncStreamReaderExtensions {
+ public static async IAsyncEnumerable ReadAllAsync(
+ this IAsyncStreamReader reader,
+ [EnumeratorCancellation]
+ CancellationToken cancellationToken = default
+ ) {
+ while (await reader.MoveNext(cancellationToken).ConfigureAwait(false))
+ yield return reader.Current;
+ }
+
+ public static async IAsyncEnumerable ReadAllAsync(this ChannelReader reader, [EnumeratorCancellation] CancellationToken cancellationToken = default) {
+#if NET
+ await foreach (var item in reader.ReadAllAsync(cancellationToken))
+ yield return item;
+#else
+ while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) {
+ while (reader.TryRead(out T? item)) {
+ yield return item;
+ }
+ }
+#endif
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Constants.cs b/src/Kurrent.Client/Core/Common/Constants.cs
new file mode 100644
index 000000000..2ed9d7c82
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Constants.cs
@@ -0,0 +1,62 @@
+namespace EventStore.Client;
+
+static class Constants {
+ public static class Exceptions {
+ public const string ExceptionKey = "exception";
+
+ public const string AccessDenied = "access-denied";
+ public const string InvalidTransaction = "invalid-transaction";
+ public const string StreamDeleted = "stream-deleted";
+ public const string WrongExpectedVersion = "wrong-expected-version";
+ public const string StreamNotFound = "stream-not-found";
+ public const string MaximumAppendSizeExceeded = "maximum-append-size-exceeded";
+ public const string MissingRequiredMetadataProperty = "missing-required-metadata-property";
+ public const string NotLeader = "not-leader";
+
+ public const string PersistentSubscriptionFailed = "persistent-subscription-failed";
+ public const string PersistentSubscriptionDoesNotExist = "persistent-subscription-does-not-exist";
+ public const string PersistentSubscriptionExists = "persistent-subscription-exists";
+ public const string MaximumSubscribersReached = "maximum-subscribers-reached";
+ public const string PersistentSubscriptionDropped = "persistent-subscription-dropped";
+
+ public const string UserNotFound = "user-not-found";
+ public const string UserConflict = "user-conflict";
+
+ public const string ScavengeNotFound = "scavenge-not-found";
+
+ public const string ExpectedVersion = "expected-version";
+ public const string ActualVersion = "actual-version";
+ public const string StreamName = "stream-name";
+ public const string GroupName = "group-name";
+ public const string Reason = "reason";
+ public const string MaximumAppendSize = "maximum-append-size";
+ public const string RequiredMetadataProperties = "required-metadata-properties";
+ public const string ScavengeId = "scavenge-id";
+ public const string LeaderEndpointHost = "leader-endpoint-host";
+ public const string LeaderEndpointPort = "leader-endpoint-port";
+
+ public const string LoginName = "login-name";
+ }
+
+ public static class Metadata {
+ public const string Type = "type";
+ public const string Created = "created";
+ public const string ContentType = "content-type";
+
+ public static readonly string[] RequiredMetadata = [Type, ContentType];
+
+ public static class ContentTypes {
+ public const string ApplicationJson = "application/json";
+ public const string ApplicationOctetStream = "application/octet-stream";
+ }
+ }
+
+ public static class Headers {
+ public const string Authorization = "authorization";
+ public const string BasicScheme = "Basic";
+ public const string BearerScheme = "Bearer";
+
+ public const string ConnectionName = "connection-name";
+ public const string RequiresLeader = "requires-leader";
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/ActivitySourceExtensions.cs b/src/Kurrent.Client/Core/Common/Diagnostics/ActivitySourceExtensions.cs
new file mode 100644
index 000000000..ab914d8c9
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/ActivitySourceExtensions.cs
@@ -0,0 +1,85 @@
+// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
+
+using System.Diagnostics;
+using Kurrent.Diagnostics;
+using Kurrent.Diagnostics.Telemetry;
+using Kurrent.Diagnostics.Tracing;
+
+namespace EventStore.Client.Diagnostics;
+
+static class ActivitySourceExtensions {
+ public static async ValueTask TraceClientOperation(
+ this ActivitySource source,
+ Func> tracedOperation,
+ string operationName,
+ ActivityTagsCollection? tags = null
+ ) {
+ using var activity = StartActivity(source, operationName, ActivityKind.Client, tags, Activity.Current?.Context);
+
+ try {
+ var res = await tracedOperation().ConfigureAwait(false);
+ activity?.StatusOk();
+ return res;
+ } catch (Exception ex) {
+ activity?.StatusError(ex);
+ throw;
+ }
+ }
+
+ public static void TraceSubscriptionEvent(
+ this ActivitySource source,
+ string? subscriptionId,
+ ResolvedEvent resolvedEvent,
+ ChannelInfo channelInfo,
+ KurrentClientSettings settings,
+ UserCredentials? userCredentials
+ ) {
+ if (source.HasNoActiveListeners() || resolvedEvent.Event is null)
+ return;
+
+ var parentContext = resolvedEvent.Event.Metadata.ExtractPropagationContext();
+
+ if (parentContext == default(ActivityContext)) return;
+
+ var tags = new ActivityTagsCollection()
+ .WithRequiredTag(TelemetryTags.Kurrent.Stream, resolvedEvent.OriginalEvent.EventStreamId)
+ .WithOptionalTag(TelemetryTags.Kurrent.SubscriptionId, subscriptionId)
+ .WithRequiredTag(TelemetryTags.Kurrent.EventId, resolvedEvent.OriginalEvent.EventId.ToString())
+ .WithRequiredTag(TelemetryTags.Kurrent.EventType, resolvedEvent.OriginalEvent.EventType)
+ // Ensure consistent server.address attribute when connecting to cluster via dns discovery
+ .WithGrpcChannelServerTags(channelInfo)
+ .WithClientSettingsServerTags(settings)
+ .WithOptionalTag(
+ TelemetryTags.Database.User,
+ userCredentials?.Username ?? settings.DefaultCredentials?.Username
+ );
+
+ StartActivity(source, TracingConstants.Operations.Subscribe, ActivityKind.Consumer, tags, parentContext)
+ ?.Dispose();
+ }
+
+ static Activity? StartActivity(
+ this ActivitySource source,
+ string operationName, ActivityKind activityKind, ActivityTagsCollection? tags = null,
+ ActivityContext? parentContext = null
+ ) {
+ if (source.HasNoActiveListeners())
+ return null;
+
+ (tags ??= new ActivityTagsCollection())
+ .WithRequiredTag(TelemetryTags.Database.System, "kurrent")
+ .WithRequiredTag(TelemetryTags.Database.Operation, operationName);
+
+ return source
+ .CreateActivity(
+ operationName,
+ activityKind,
+ parentContext ?? default,
+ tags,
+ idFormat: ActivityIdFormat.W3C
+ )
+ ?.Start();
+ }
+
+ static bool HasNoActiveListeners(this ActivitySource source) => !source.HasListeners();
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/ActivityTagsCollectionExtensions.cs b/src/Kurrent.Client/Core/Common/Diagnostics/ActivityTagsCollectionExtensions.cs
new file mode 100644
index 000000000..fd6ad661a
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/ActivityTagsCollectionExtensions.cs
@@ -0,0 +1,32 @@
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using Kurrent.Diagnostics;
+using Kurrent.Diagnostics.Telemetry;
+
+namespace EventStore.Client.Diagnostics;
+
+static class ActivityTagsCollectionExtensions {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ActivityTagsCollection WithGrpcChannelServerTags(this ActivityTagsCollection tags, ChannelInfo? channelInfo) {
+ if (channelInfo is null)
+ return tags;
+
+ var authorityParts = channelInfo.Channel.Target.Split(':');
+
+ return tags
+ .WithRequiredTag(TelemetryTags.Server.Address, authorityParts[0])
+ .WithRequiredTag(TelemetryTags.Server.Port, int.Parse(authorityParts[1]));
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ActivityTagsCollection WithClientSettingsServerTags(this ActivityTagsCollection source, KurrentClientSettings settings) {
+ if (settings.ConnectivitySettings.DnsGossipSeeds?.Length != 1)
+ return source;
+
+ var gossipSeed = settings.ConnectivitySettings.DnsGossipSeeds[0];
+
+ return source
+ .WithRequiredTag(TelemetryTags.Server.Address, gossipSeed.Host)
+ .WithRequiredTag(TelemetryTags.Server.Port, gossipSeed.Port);
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityExtensions.cs b/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityExtensions.cs
new file mode 100644
index 000000000..4b6404f60
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityExtensions.cs
@@ -0,0 +1,52 @@
+// ReSharper disable CheckNamespace
+
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using Kurrent.Diagnostics.Telemetry;
+using Kurrent.Diagnostics.Tracing;
+
+namespace Kurrent.Diagnostics;
+
+static class ActivityExtensions {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static TracingMetadata GetTracingMetadata(this Activity activity) =>
+ new(activity.TraceId.ToString(), activity.SpanId.ToString());
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static Activity StatusOk(this Activity activity, string? description = null) =>
+ activity.SetActivityStatus(ActivityStatus.Ok(description));
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static Activity StatusError(this Activity activity, Exception exception) =>
+ activity.SetActivityStatus(ActivityStatus.Error(exception));
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static Activity RecordException(this Activity activity, Exception? exception) {
+ if (exception is null) return activity;
+
+ var ex = exception is AggregateException aex ? aex.Flatten() : exception;
+
+ var tags = new ActivityTagsCollection {
+ { TelemetryTags.Exception.Type, ex.GetType().FullName },
+ { TelemetryTags.Exception.Stacktrace, ex.ToInvariantString() }
+ };
+
+ if (!string.IsNullOrWhiteSpace(exception.Message))
+ tags.Add(TelemetryTags.Exception.Message, ex.Message);
+
+ activity.AddEvent(new ActivityEvent(TelemetryTags.Exception.EventName, default, tags));
+
+ return activity;
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static Activity SetActivityStatus(this Activity activity, ActivityStatus status) {
+ var statusCode = ActivityStatusCodeHelper.GetTagValueForStatusCode(status.StatusCode);
+
+ activity.SetStatus(status.StatusCode, status.Description);
+ activity.SetTag(TelemetryTags.Otel.StatusCode, statusCode);
+ activity.SetTag(TelemetryTags.Otel.StatusDescription, status.Description);
+
+ return activity.IsAllDataRequested ? activity.RecordException(status.Exception) : activity;
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityStatus.cs b/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityStatus.cs
new file mode 100644
index 000000000..bab790b6c
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityStatus.cs
@@ -0,0 +1,13 @@
+// ReSharper disable CheckNamespace
+
+using System.Diagnostics;
+
+namespace Kurrent.Diagnostics;
+
+record ActivityStatus(ActivityStatusCode StatusCode, string? Description, Exception? Exception) {
+ public static ActivityStatus Ok(string? description = null) =>
+ new(ActivityStatusCode.Ok, description, null);
+
+ public static ActivityStatus Error(Exception exception, string? description = null) =>
+ new(ActivityStatusCode.Error, description ?? exception.Message, exception);
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityStatusCodeHelper.cs b/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityStatusCodeHelper.cs
new file mode 100644
index 000000000..592501d11
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityStatusCodeHelper.cs
@@ -0,0 +1,24 @@
+// ReSharper disable CheckNamespace
+
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+
+using static System.Diagnostics.ActivityStatusCode;
+using static System.StringComparison;
+
+namespace Kurrent.Diagnostics;
+
+static class ActivityStatusCodeHelper {
+ public const string UnsetStatusCodeTagValue = "UNSET";
+ public const string OkStatusCodeTagValue = "OK";
+ public const string ErrorStatusCodeTagValue = "ERROR";
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static string? GetTagValueForStatusCode(ActivityStatusCode statusCode) =>
+ statusCode switch {
+ Unset => UnsetStatusCodeTagValue,
+ Error => ErrorStatusCodeTagValue,
+ Ok => OkStatusCodeTagValue,
+ _ => null
+ };
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityTagsCollectionExtensions.cs b/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityTagsCollectionExtensions.cs
new file mode 100644
index 000000000..2c8c8b291
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/Core/ActivityTagsCollectionExtensions.cs
@@ -0,0 +1,25 @@
+// ReSharper disable CheckNamespace
+
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+
+namespace Kurrent.Diagnostics;
+
+static class ActivityTagsCollectionExtensions {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ActivityTagsCollection WithRequiredTag(this ActivityTagsCollection source, string key, object? value) {
+ source[key] = value ?? throw new ArgumentNullException(key);
+ return source;
+ }
+
+ ///
+ /// - If the key previously existed in the collection and the value is , the collection item matching the key will get removed from the collection.
+ /// - If the key previously existed in the collection and the value is not , the value will replace the old value stored in the collection.
+ /// - Otherwise, a new item will get added to the collection.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ActivityTagsCollection WithOptionalTag(this ActivityTagsCollection source, string key, object? value) {
+ source[key] = value;
+ return source;
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/Core/ExceptionExtensions.cs b/src/Kurrent.Client/Core/Common/Diagnostics/Core/ExceptionExtensions.cs
new file mode 100644
index 000000000..7eb397251
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/Core/ExceptionExtensions.cs
@@ -0,0 +1,25 @@
+// ReSharper disable CheckNamespace
+
+using System.Globalization;
+
+namespace Kurrent.Diagnostics;
+
+static class ExceptionExtensions {
+ ///
+ /// Returns a culture-independent string representation of the given object,
+ /// appropriate for diagnostics tracing.
+ ///
+ /// Exception to convert to string.
+ /// Exception as string with no culture.
+ public static string ToInvariantString(this Exception exception) {
+ var originalUiCulture = Thread.CurrentThread.CurrentUICulture;
+
+ try {
+ Thread.CurrentThread.CurrentUICulture = CultureInfo.InvariantCulture;
+ return exception.ToString();
+ }
+ finally {
+ Thread.CurrentThread.CurrentUICulture = originalUiCulture;
+ }
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/Core/Telemetry/TelemetryTags.cs b/src/Kurrent.Client/Core/Common/Diagnostics/Core/Telemetry/TelemetryTags.cs
new file mode 100644
index 000000000..54487e3bd
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/Core/Telemetry/TelemetryTags.cs
@@ -0,0 +1,35 @@
+// ReSharper disable CheckNamespace
+
+namespace Kurrent.Diagnostics.Telemetry;
+
+// The attributes below match the specification of v1.24.0 of the Open Telemetry semantic conventions.
+// Some attributes are ignored where not required or relevant.
+// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/general/trace.md
+// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/database/database-spans.md
+// https://github.com/open-telemetry/semantic-conventions/blob/v1.24.0/docs/exceptions/exceptions-spans.md
+
+static partial class TelemetryTags {
+ public static class Database {
+ public const string User = "db.user";
+ public const string System = "db.system";
+ public const string Operation = "db.operation";
+ }
+
+ public static class Server {
+ public const string Address = "server.address";
+ public const string Port = "server.port";
+ public const string SocketAddress = "server.socket.address"; // replaces: "net.peer.ip" (AttributeNetPeerIp)
+ }
+
+ public static class Exception {
+ public const string EventName = "exception";
+ public const string Type = "exception.type";
+ public const string Message = "exception.message";
+ public const string Stacktrace = "exception.stacktrace";
+ }
+
+ public static class Otel {
+ public const string StatusCode = "otel.status_code";
+ public const string StatusDescription = "otel.status_description";
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/Core/Tracing/TracingConstants.cs b/src/Kurrent.Client/Core/Common/Diagnostics/Core/Tracing/TracingConstants.cs
new file mode 100644
index 000000000..1e94c9ef0
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/Core/Tracing/TracingConstants.cs
@@ -0,0 +1,10 @@
+// ReSharper disable CheckNamespace
+
+namespace Kurrent.Diagnostics.Tracing;
+
+static partial class TracingConstants {
+ public static class Metadata {
+ public const string TraceId = "$traceId";
+ public const string SpanId = "$spanId";
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/Core/Tracing/TracingMetadata.cs b/src/Kurrent.Client/Core/Common/Diagnostics/Core/Tracing/TracingMetadata.cs
new file mode 100644
index 000000000..7580753a7
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/Core/Tracing/TracingMetadata.cs
@@ -0,0 +1,32 @@
+// ReSharper disable CheckNamespace
+
+using System.Diagnostics;
+using System.Text.Json.Serialization;
+
+namespace Kurrent.Diagnostics.Tracing;
+
+readonly record struct TracingMetadata(
+ [property: JsonPropertyName(TracingConstants.Metadata.TraceId)]
+ string? TraceId,
+ [property: JsonPropertyName(TracingConstants.Metadata.SpanId)]
+ string? SpanId
+) {
+ public static readonly TracingMetadata None = new(null, null);
+
+ [JsonIgnore] public bool IsValid => TraceId != null && SpanId != null;
+
+ public ActivityContext? ToActivityContext(bool isRemote = true) {
+ try {
+ return IsValid
+ ? new ActivityContext(
+ ActivityTraceId.CreateFromString(new ReadOnlySpan(TraceId!.ToCharArray())),
+ ActivitySpanId.CreateFromString(new ReadOnlySpan(SpanId!.ToCharArray())),
+ ActivityTraceFlags.Recorded,
+ isRemote: isRemote
+ )
+ : default;
+ } catch (Exception) {
+ return default;
+ }
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs b/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs
new file mode 100644
index 000000000..d45b53156
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/EventMetadataExtensions.cs
@@ -0,0 +1,84 @@
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using System.Text.Json;
+using Kurrent.Diagnostics;
+using Kurrent.Diagnostics.Tracing;
+
+namespace EventStore.Client.Diagnostics;
+
+static class EventMetadataExtensions {
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ReadOnlySpan InjectTracingContext(
+ this ReadOnlyMemory eventMetadata, Activity? activity
+ ) =>
+ eventMetadata.InjectTracingMetadata(activity?.GetTracingMetadata() ?? TracingMetadata.None);
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static ActivityContext? ExtractPropagationContext(this ReadOnlyMemory eventMetadata) =>
+ eventMetadata.ExtractTracingMetadata().ToActivityContext(isRemote: true);
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static TracingMetadata ExtractTracingMetadata(this ReadOnlyMemory eventMetadata) {
+ if (eventMetadata.IsEmpty)
+ return TracingMetadata.None;
+
+ var reader = new Utf8JsonReader(eventMetadata.Span);
+ try {
+ if (!JsonDocument.TryParseValue(ref reader, out var doc))
+ return TracingMetadata.None;
+
+ using (doc) {
+ if (!doc.RootElement.TryGetProperty(TracingConstants.Metadata.TraceId, out var traceId)
+ || !doc.RootElement.TryGetProperty(TracingConstants.Metadata.SpanId, out var spanId))
+ return TracingMetadata.None;
+
+ return new TracingMetadata(traceId.GetString(), spanId.GetString());
+ }
+ } catch (Exception) {
+ return TracingMetadata.None;
+ }
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static ReadOnlySpan InjectTracingMetadata(
+ this ReadOnlyMemory eventMetadata, TracingMetadata tracingMetadata
+ ) {
+ if (tracingMetadata == TracingMetadata.None || !tracingMetadata.IsValid)
+ return eventMetadata.Span;
+
+ return eventMetadata.IsEmpty
+ ? JsonSerializer.SerializeToUtf8Bytes(tracingMetadata)
+ : TryInjectTracingMetadata(eventMetadata, tracingMetadata).ToArray();
+ }
+
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ static ReadOnlyMemory TryInjectTracingMetadata(
+ this ReadOnlyMemory utf8Json, TracingMetadata tracingMetadata
+ ) {
+ try {
+ using var doc = JsonDocument.Parse(utf8Json);
+ using var stream = new MemoryStream();
+ using var writer = new Utf8JsonWriter(stream);
+
+ writer.WriteStartObject();
+
+ if (doc.RootElement.ValueKind != JsonValueKind.Object)
+ return utf8Json;
+
+ foreach (var prop in doc.RootElement.EnumerateObject())
+ prop.WriteTo(writer);
+
+ writer.WritePropertyName(TracingConstants.Metadata.TraceId);
+ writer.WriteStringValue(tracingMetadata.TraceId);
+ writer.WritePropertyName(TracingConstants.Metadata.SpanId);
+ writer.WriteStringValue(tracingMetadata.SpanId);
+
+ writer.WriteEndObject();
+ writer.Flush();
+
+ return stream.ToArray();
+ } catch (Exception) {
+ return utf8Json;
+ }
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/KurrentClientDiagnostics.cs b/src/Kurrent.Client/Core/Common/Diagnostics/KurrentClientDiagnostics.cs
new file mode 100644
index 000000000..48e437463
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/KurrentClientDiagnostics.cs
@@ -0,0 +1,8 @@
+using System.Diagnostics;
+
+namespace EventStore.Client.Diagnostics;
+
+public static class KurrentClientDiagnostics {
+ public const string InstrumentationName = "kurrent";
+ public static readonly ActivitySource ActivitySource = new(InstrumentationName);
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/Telemetry/TelemetryTags.cs b/src/Kurrent.Client/Core/Common/Diagnostics/Telemetry/TelemetryTags.cs
new file mode 100644
index 000000000..e4e0b11f9
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/Telemetry/TelemetryTags.cs
@@ -0,0 +1,12 @@
+// ReSharper disable CheckNamespace
+
+namespace Kurrent.Diagnostics.Telemetry;
+
+static partial class TelemetryTags {
+ public static class Kurrent {
+ public const string Stream = "db.kurrent.stream";
+ public const string SubscriptionId = "db.kurrent.subscription.id";
+ public const string EventId = "db.kurrent.event.id";
+ public const string EventType = "db.kurrent.event.type";
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/Diagnostics/Tracing/TracingConstants.cs b/src/Kurrent.Client/Core/Common/Diagnostics/Tracing/TracingConstants.cs
new file mode 100644
index 000000000..357654570
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Diagnostics/Tracing/TracingConstants.cs
@@ -0,0 +1,10 @@
+// ReSharper disable CheckNamespace
+
+namespace Kurrent.Diagnostics.Tracing;
+
+static partial class TracingConstants {
+ public static class Operations {
+ public const string Append = "streams.append";
+ public const string Subscribe = "streams.subscribe";
+ }
+}
diff --git a/src/Kurrent.Client/Core/Common/EnumerableTaskExtensions.cs b/src/Kurrent.Client/Core/Common/EnumerableTaskExtensions.cs
new file mode 100644
index 000000000..5be066ab5
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/EnumerableTaskExtensions.cs
@@ -0,0 +1,11 @@
+using System.Diagnostics;
+
+namespace EventStore.Client;
+
+static class EnumerableTaskExtensions {
+ [DebuggerStepThrough]
+ public static Task WhenAll(this IEnumerable source) => Task.WhenAll(source);
+
+ [DebuggerStepThrough]
+ public static Task WhenAll(this IEnumerable> source) => Task.WhenAll(source);
+}
diff --git a/src/Kurrent.Client/Core/Common/EpochExtensions.cs b/src/Kurrent.Client/Core/Common/EpochExtensions.cs
new file mode 100644
index 000000000..0643bcecb
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/EpochExtensions.cs
@@ -0,0 +1,25 @@
+namespace EventStore.Client;
+
+static class EpochExtensions {
+#if NET
+ static readonly DateTime UnixEpoch = DateTime.UnixEpoch;
+#else
+ const long TicksPerMillisecond = 10000;
+ const long TicksPerSecond = TicksPerMillisecond * 1000;
+ const long TicksPerMinute = TicksPerSecond * 60;
+ const long TicksPerHour = TicksPerMinute * 60;
+ const long TicksPerDay = TicksPerHour * 24;
+ const int DaysPerYear = 365;
+ const int DaysPer4Years = DaysPerYear * 4 + 1;
+ const int DaysPer100Years = DaysPer4Years * 25 - 1;
+ const int DaysPer400Years = DaysPer100Years * 4 + 1;
+ const int DaysTo1970 = DaysPer400Years * 4 + DaysPer100Years * 3 + DaysPer4Years * 17 + DaysPerYear;
+ const long UnixEpochTicks = DaysTo1970 * TicksPerDay;
+
+ static readonly DateTime UnixEpoch = new(UnixEpochTicks, DateTimeKind.Utc);
+#endif
+
+ public static DateTime FromTicksSinceEpoch(this long value) => new(UnixEpoch.Ticks + value, DateTimeKind.Utc);
+
+ public static long ToTicksSinceEpoch(this DateTime value) => (value - UnixEpoch).Ticks;
+}
diff --git a/src/Kurrent.Client/Core/Common/KurrentCallOptions.cs b/src/Kurrent.Client/Core/Common/KurrentCallOptions.cs
new file mode 100644
index 000000000..0644cffea
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/KurrentCallOptions.cs
@@ -0,0 +1,74 @@
+using Grpc.Core;
+using static System.Threading.Timeout;
+
+namespace EventStore.Client;
+
+static class KurrentCallOptions {
+ // deadline falls back to infinity
+ public static CallOptions CreateStreaming(
+ KurrentClientSettings settings,
+ TimeSpan? deadline = null, UserCredentials? userCredentials = null,
+ CancellationToken cancellationToken = default
+ ) =>
+ Create(settings, deadline, userCredentials, cancellationToken);
+
+ // deadline falls back to connection DefaultDeadline
+ public static CallOptions CreateNonStreaming(
+ KurrentClientSettings settings,
+ CancellationToken cancellationToken
+ ) =>
+ Create(
+ settings,
+ settings.DefaultDeadline,
+ settings.DefaultCredentials,
+ cancellationToken
+ );
+
+ public static CallOptions CreateNonStreaming(
+ KurrentClientSettings settings, TimeSpan? deadline,
+ UserCredentials? userCredentials, CancellationToken cancellationToken
+ ) =>
+ Create(
+ settings,
+ deadline ?? settings.DefaultDeadline,
+ userCredentials,
+ cancellationToken
+ );
+
+ static CallOptions Create(
+ KurrentClientSettings settings, TimeSpan? deadline,
+ UserCredentials? userCredentials, CancellationToken cancellationToken
+ ) =>
+ new(
+ cancellationToken: cancellationToken,
+ deadline: DeadlineAfter(deadline),
+ headers: new() {
+ {
+ Constants.Headers.RequiresLeader,
+ settings.ConnectivitySettings.NodePreference == NodePreference.Leader
+ ? bool.TrueString
+ : bool.FalseString
+ }
+ },
+ credentials: (userCredentials ?? settings.DefaultCredentials) == null
+ ? null
+ : CallCredentials.FromInterceptor(
+ async (_, metadata) => {
+ var credentials = userCredentials ?? settings.DefaultCredentials;
+
+ var authorizationHeader = await settings.OperationOptions
+ .GetAuthenticationHeaderValue(credentials!, CancellationToken.None)
+ .ConfigureAwait(false);
+
+ metadata.Add(Constants.Headers.Authorization, authorizationHeader);
+ }
+ )
+ );
+
+ static DateTime? DeadlineAfter(TimeSpan? timeoutAfter) =>
+ !timeoutAfter.HasValue
+ ? new DateTime?()
+ : timeoutAfter.Value == TimeSpan.MaxValue || timeoutAfter.Value == InfiniteTimeSpan
+ ? DateTime.MaxValue
+ : DateTime.UtcNow.Add(timeoutAfter.Value);
+}
diff --git a/src/Kurrent.Client/Core/Common/MetadataExtensions.cs b/src/Kurrent.Client/Core/Common/MetadataExtensions.cs
new file mode 100644
index 000000000..e7311c37f
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/MetadataExtensions.cs
@@ -0,0 +1,29 @@
+using Grpc.Core;
+
+namespace EventStore.Client;
+
+static class MetadataExtensions {
+ public static bool TryGetValue(this Metadata metadata, string key, out string? value) {
+ value = default;
+
+ foreach (var entry in metadata) {
+ if (entry.Key != key)
+ continue;
+
+ value = entry.Value;
+ return true;
+ }
+
+ return false;
+ }
+
+ public static StreamRevision GetStreamRevision(this Metadata metadata, string key) =>
+ metadata.TryGetValue(key, out var s) && ulong.TryParse(s, out var value)
+ ? new(value)
+ : StreamRevision.None;
+
+ public static int GetIntValueOrDefault(this Metadata metadata, string key) =>
+ metadata.TryGetValue(key, out var s) && int.TryParse(s, out var value)
+ ? value
+ : default;
+}
diff --git a/src/Kurrent.Client/Core/Common/Shims/Index.cs b/src/Kurrent.Client/Core/Common/Shims/Index.cs
new file mode 100644
index 000000000..357bbd34d
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Shims/Index.cs
@@ -0,0 +1,110 @@
+#if !NET
+using System.Runtime.CompilerServices;
+
+namespace System;
+
+/// Represent a type can be used to index a collection either from the start or the end.
+///
+/// Index is used by the C# compiler to support the new index syntax
+///
+/// int[] someArray = new int[5] { 1, 2, 3, 4, 5 } ;
+/// int lastElement = someArray[^1]; // lastElement = 5
+///
+///
+readonly struct Index : IEquatable {
+ readonly int _value;
+
+ /// Construct an Index using a value and indicating if the index is from the start or from the end.
+ /// The index value. it has to be zero or positive number.
+ /// Indicating if the index is from the start or from the end.
+ ///
+ /// If the Index constructed from the end, index value 1 means pointing at the last element and index value 0 means pointing at beyond last element.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public Index(int value, bool fromEnd = false) {
+ if (value < 0) throw new ArgumentOutOfRangeException(nameof(value), "value must be non-negative");
+
+ if (fromEnd)
+ _value = ~value;
+ else
+ _value = value;
+ }
+
+ // The following private constructors mainly created for perf reason to avoid the checks
+ Index(int value) => _value = value;
+
+ /// Create an Index pointing at first element.
+ public static Index Start => new(0);
+
+ /// Create an Index pointing at beyond last element.
+ public static Index End => new(~0);
+
+ /// Create an Index from the start at the position indicated by the value.
+ /// The index value from the start.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static Index FromStart(int value) {
+ if (value < 0) throw new ArgumentOutOfRangeException(nameof(value), "value must be non-negative");
+
+ return new Index(value);
+ }
+
+ /// Create an Index from the end at the position indicated by the value.
+ /// The index value from the end.
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public static Index FromEnd(int value) {
+ if (value < 0) throw new ArgumentOutOfRangeException(nameof(value), "value must be non-negative");
+
+ return new Index(~value);
+ }
+
+ /// Returns the index value.
+ public int Value {
+ get {
+ if (_value < 0)
+ return ~_value;
+ else
+ return _value;
+ }
+ }
+
+ /// Indicates whether the index is from the start or the end.
+ public bool IsFromEnd => _value < 0;
+
+ /// Calculate the offset from the start using the giving collection length.
+ /// The length of the collection that the Index will be used with. length has to be a positive value
+ ///
+ /// For performance reason, we don't validate the input length parameter and the returned offset value against negative values.
+ /// we don't validate either the returned offset is greater than the input length.
+ /// It is expected Index will be used with collections which always have non negative length/count. If the returned offset is negative and
+ /// then used to index a collection will get out of range exception which will be same affect as the validation.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public int GetOffset(int length) {
+ var offset = _value;
+ if (IsFromEnd)
+ // offset = length - (~value)
+ // offset = length + (~(~value) + 1)
+ // offset = length + value + 1
+ offset += length + 1;
+
+ return offset;
+ }
+
+ /// Indicates whether the current Index object is equal to another object of the same type.
+ /// An object to compare with this object
+ public override bool Equals(object? value) => value is Index && _value == ((Index)value)._value;
+
+ /// Indicates whether the current Index object is equal to another Index object.
+ /// An object to compare with this object
+ public bool Equals(Index other) => _value == other._value;
+
+ /// Returns the hash code for this instance.
+ public override int GetHashCode() => _value;
+
+ /// Converts integer number to an Index.
+ public static implicit operator Index(int value) => FromStart(value);
+
+ /// Converts the value of the current Index object to its equivalent string representation.
+ public override string ToString() => IsFromEnd ? $"^{(uint)Value}" : ((uint)Value).ToString();
+}
+#endif
\ No newline at end of file
diff --git a/src/Kurrent.Client/Core/Common/Shims/IsExternalInit.cs b/src/Kurrent.Client/Core/Common/Shims/IsExternalInit.cs
new file mode 100644
index 000000000..7dc4fea3d
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Shims/IsExternalInit.cs
@@ -0,0 +1,10 @@
+#if !NET
+
+using System.ComponentModel;
+
+// ReSharper disable once CheckNamespace
+namespace System.Runtime.CompilerServices;
+
+[EditorBrowsable(EditorBrowsableState.Never)]
+class IsExternalInit{}
+#endif
diff --git a/src/Kurrent.Client/Core/Common/Shims/Range.cs b/src/Kurrent.Client/Core/Common/Shims/Range.cs
new file mode 100644
index 000000000..3a0b34fde
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Shims/Range.cs
@@ -0,0 +1,75 @@
+#if !NET
+// ReSharper disable CheckNamespace
+
+using System.Runtime.CompilerServices;
+
+namespace System;
+
+/// Represent a range has start and end indexes.
+///
+/// Range is used by the C# compiler to support the range syntax.
+///
+/// int[] someArray = new int[5] { 1, 2, 3, 4, 5 };
+/// int[] subArray1 = someArray[0..2]; // { 1, 2 }
+/// int[] subArray2 = someArray[1..^0]; // { 2, 3, 4, 5 }
+///
+///
+readonly struct Range : IEquatable {
+ /// Represent the inclusive start index of the Range.
+ public Index Start { get; }
+
+ /// Represent the exclusive end index of the Range.
+ public Index End { get; }
+
+ /// Construct a Range object using the start and end indexes.
+ /// Represent the inclusive start index of the range.
+ /// Represent the exclusive end index of the range.
+ public Range(Index start, Index end) {
+ Start = start;
+ End = end;
+ }
+
+ /// Indicates whether the current Range object is equal to another object of the same type.
+ /// An object to compare with this object
+ public override bool Equals(object? value) =>
+ value is Range r &&
+ r.Start.Equals(Start) &&
+ r.End.Equals(End);
+
+ /// Indicates whether the current Range object is equal to another Range object.
+ /// An object to compare with this object
+ public bool Equals(Range other) => other.Start.Equals(Start) && other.End.Equals(End);
+
+ /// Returns the hash code for this instance.
+ public override int GetHashCode() => Start.GetHashCode() * 31 + End.GetHashCode();
+
+ /// Converts the value of the current Range object to its equivalent string representation.
+ public override string ToString() => $"{Start}..{End}";
+
+ /// Create a Range object starting from start index to the end of the collection.
+ public static Range StartAt(Index start) => new(start, Index.End);
+
+ /// Create a Range object starting from first element in the collection to the end Index.
+ public static Range EndAt(Index end) => new(Index.Start, end);
+
+ /// Create a Range object starting from first element to the end.
+ public static Range All => new(Index.Start, Index.End);
+
+ /// Calculate the start offset and length of range object using a collection length.
+ /// The length of the collection that the range will be used with. length has to be a positive value.
+ ///
+ /// For performance reason, we don't validate the input length parameter against negative values.
+ /// It is expected Range will be used with collections which always have non negative length/count.
+ /// We validate the range is inside the length scope though.
+ ///
+ [MethodImpl(MethodImplOptions.AggressiveInlining)]
+ public (int Offset, int Length) GetOffsetAndLength(int length) {
+ var start = Start.GetOffset(length);
+ var end = End.GetOffset(length);
+
+ if ((uint)end > (uint)length || (uint)start > (uint)end) throw new ArgumentOutOfRangeException(nameof(length));
+
+ return (start, end - start);
+ }
+}
+#endif
\ No newline at end of file
diff --git a/src/Kurrent.Client/Core/Common/Shims/TaskCompletionSource.cs b/src/Kurrent.Client/Core/Common/Shims/TaskCompletionSource.cs
new file mode 100644
index 000000000..ad6573c4a
--- /dev/null
+++ b/src/Kurrent.Client/Core/Common/Shims/TaskCompletionSource.cs
@@ -0,0 +1,11 @@
+#if !NET
+// ReSharper disable CheckNamespace
+
+namespace System.Threading.Tasks;
+
+class TaskCompletionSource : TaskCompletionSource