diff --git a/src/Evento.Repository/EventStoreDomainRepository.cs b/src/Evento.Repository/EventStoreDomainRepository.cs index aca2013..a11ec66 100644 --- a/src/Evento.Repository/EventStoreDomainRepository.cs +++ b/src/Evento.Repository/EventStoreDomainRepository.cs @@ -7,218 +7,281 @@ using Newtonsoft.Json; using Newtonsoft.Json.Linq; -namespace Evento.Repository +namespace Evento.Repository; + +public class EventStoreDomainRepository : DomainRepositoryBase { - public class EventStoreDomainRepository : DomainRepositoryBase + public readonly string Category; + private readonly IEventStoreConnection _connection; + private readonly IKeyReader? _keyReader; + private readonly ICryptoService? _cryptoService; + + private Dictionary? _mapping = new(); + + public static string EventClrTypeHeader = "EventClrTypeName"; + + public EventStoreDomainRepository( + string category, IEventStoreConnection connection, Dictionary? mapping = null) { - public readonly string Category; - private readonly IEventStoreConnection _connection; - private readonly IKeyReader _keyReader; - private readonly ICryptoService _cryptoService; + Category = category; + _connection = connection; + _mapping = mapping; + } - public EventStoreDomainRepository(string category, IEventStoreConnection connection) - { - Category = category; - _connection = connection; - } + public EventStoreDomainRepository( + string category, IEventStoreConnection connection, IKeyReader keyReader, ICryptoService cryptoService) + { + Category = category; + _connection = connection; + _keyReader = keyReader; + _cryptoService = cryptoService; + } - public EventStoreDomainRepository(string category, IEventStoreConnection connection, IKeyReader keyReader, - ICryptoService cryptoService) - { - Category = category; - _connection = connection; - _keyReader = keyReader; - _cryptoService = cryptoService; - } + private string AggregateToStreamName(Type type, string id) + { + return $"{Category}-{type.Name}-{id}"; + } - private string AggregateToStreamName(Type type, string id) + private async Task SaveAsyncInternal(string streamName, long expectedVersion, EventData[] eventData) + { + try { - return $"{Category}-{type.Name}-{id}"; + return await _connection.AppendToStreamAsync(streamName, expectedVersion, eventData); } - - public override TResult GetById(string id, int eventsToLoad) + catch (AggregateException) { - var streamName = AggregateToStreamName(typeof(TResult), id); - var eventsSlice = _connection.ReadStreamEventsForwardAsync(streamName, 0, eventsToLoad, false); - if (eventsSlice.Result.Status == SliceReadStatus.StreamNotFound) - throw new AggregateNotFoundException("Could not found aggregate of type " + typeof(TResult) + - " and id " + id); - var deserializedEvents = eventsSlice.Result.Events.Select(e => - DeserializeObject(e.OriginalEvent.Data, e.OriginalEvent.Metadata) as Event); - return BuildAggregate(deserializedEvents); + // Try to save using ExpectedVersion.Any to swallow silently WrongExpectedVersion error + return await _connection.AppendToStreamAsync(streamName, ExpectedVersion.Any, eventData); } + } - public override void DeleteAggregate(string correlationId, bool hard) + public override IEnumerable Save(TAggregate aggregate) + { + // Synchronous save operation + var streamName = AggregateToStreamName(aggregate.GetType(), aggregate.AggregateId); + var events = aggregate.UncommitedEvents().ToList(); + var originalVersion = CalculateExpectedVersion(aggregate, events); + var expectedVersion = originalVersion == 0 ? ExpectedVersion.NoStream : originalVersion - 1; + var eventData = events.Select(CreateEventData).ToArray(); + try { - var streamName = AggregateToStreamName(typeof(TAggregate), correlationId); - _connection.DeleteStreamAsync(streamName, ExpectedVersion.Any, hard).Wait(); + if (events.Count > 0) + _connection.AppendToStreamAsync(streamName, expectedVersion, eventData).Wait(); } - - public override TResult GetById(string correlationId) + catch (AggregateException) { - var streamName = AggregateToStreamName(typeof(TResult), correlationId); - var streamEvents = new List(); - StreamEventsSlice currentSlice; - var nextSliceStart = StreamPosition.Start; - do - { - currentSlice = _connection.ReadStreamEventsForwardAsync(streamName, nextSliceStart, 200, false).Result; - if (currentSlice.Status == SliceReadStatus.StreamNotFound) - throw new AggregateNotFoundException("Could not found aggregate of type " + typeof(TResult) + - " and id " + correlationId); - nextSliceStart = (int)currentSlice.NextEventNumber; - streamEvents.AddRange(currentSlice.Events); - } while (!currentSlice.IsEndOfStream); - var deserializedEvents = streamEvents.Select(e => - DeserializeObject(e.Event.Data, e.Event.Metadata) as Event); - return BuildAggregate(deserializedEvents); + // Try to save using ExpectedVersion.Any to swallow silently WrongExpectedVersion error + _connection.AppendToStreamAsync(streamName, ExpectedVersion.Any, eventData).Wait(); } + aggregate.ClearUncommitedEvents(); + return events; + } + + public override async Task> SaveAsync(TAggregate aggregate) + { + var streamName = AggregateToStreamName(aggregate.GetType(), aggregate.AggregateId); + var events = aggregate.UncommitedEvents().ToList(); + var originalVersion = CalculateExpectedVersion(aggregate, events); + var expectedVersion = originalVersion == 0 ? ExpectedVersion.NoStream : originalVersion - 1; + var eventData = events.Select(CreateEventData).ToArray(); + await SaveAsyncInternal(streamName, expectedVersion, eventData); + aggregate.ClearUncommitedEvents(); + return events; + } - private EventData CreateEventData(object @event) + public override TResult GetById(string id) + { + return GetByIdInternal(id, null); + } + + public override TResult GetById(string id, int eventsToLoad) + { + return GetByIdInternal(id, eventsToLoad); + } + + private TResult GetByIdInternal(string id, int? eventsToLoad) where TResult : IAggregate, new() + { + var streamName = AggregateToStreamName(typeof(TResult), id); + var streamEvents = LoadEvents(streamName, eventsToLoad); + + var deserializedEvents = streamEvents.Select(e => DeserializeObject(e) as Event); + + return BuildAggregate(deserializedEvents); + } + + private List LoadEvents(string streamName, int? maxEventsToLoad) + { + const int batchSize = 200; + var allEvents = new List(); + var nextSliceStart = StreamPosition.Start; + var eventsLoaded = 0; + + while (true) { - IDictionary metadata; - var originalEventType = @event.GetType().Name; - if (((Event)@event).Metadata != null) - { - metadata = ((Event)@event).Metadata; - if (!metadata.ContainsKey("$correlationId")) - throw new Exception("The event metadata must contains a $correlationId"); - if (!metadata.ContainsKey(EventClrTypeHeader)) - metadata.Add(EventClrTypeHeader, @event.GetType().AssemblyQualifiedName); - else - metadata[EventClrTypeHeader] = @event.GetType().AssemblyQualifiedName; - // Remove the metadata from the event body - var tmp = (IDictionary)@event.ToDynamic(); - tmp.Remove("Metadata"); - @event = tmp; - } - else - throw new Exception("The event must have a $correlationId present in its metadata"); - var eventDataHeaders = SerializeObject(metadata); - byte[] data; - if (_cryptoService != null && _keyReader != null) - { - if (string.IsNullOrWhiteSpace(metadata["encrypt"])) - throw new Exception("id not found in the encrypt metadata field"); - var encryptionKey = _keyReader.Get(metadata["encrypt"]); - if (string.IsNullOrWhiteSpace(encryptionKey)) - throw new Exception("key not found with the given id"); - data = SerializeAndEncryptObject(@event, encryptionKey); - } - else - data = SerializeObject(@event); + var eventsToFetch = CalculateBatchSize(maxEventsToLoad, eventsLoaded, batchSize); + var currentSlice = _connection.ReadStreamEventsForwardAsync(streamName, nextSliceStart, eventsToFetch, false).Result; + + if (currentSlice.Status == SliceReadStatus.StreamNotFound) + throw new AggregateNotFoundException($"Could not find aggregate with id {streamName}"); - var eventData = new EventData(Guid.NewGuid(), originalEventType, true, data, eventDataHeaders); - return eventData; + allEvents.AddRange(currentSlice.Events); + eventsLoaded += currentSlice.Events.Length; + nextSliceStart = (int)currentSlice.NextEventNumber; + + if (IsEndOfStream(currentSlice, maxEventsToLoad, eventsLoaded)) + break; } - private byte[] SerializeAndEncryptObject(object obj, string key) + return allEvents; + } + + private static int CalculateBatchSize(int? total, int loaded, int batchSize) + { + return total.HasValue ? Math.Min(batchSize, total.Value - loaded) : batchSize; + } + + private static bool IsEndOfStream(StreamEventsSlice currentSlice, int? maxEventsToLoad, int eventsLoaded) + { + return currentSlice.IsEndOfStream || eventsLoaded >= maxEventsToLoad; + } + + public override void DeleteAggregate(string correlationId, bool hard) + { + var streamName = AggregateToStreamName(typeof(TAggregate), correlationId); + _connection.DeleteStreamAsync(streamName, ExpectedVersion.Any, hard).Wait(); + } + + private EventData CreateEventData(object @event) + { + IDictionary metadata; + var originalEventType = @event.GetType().Name; + if (((Event)@event).Metadata != null) { - var jsonString = JsonConvert.SerializeObject(obj); - var data = Encoding.UTF8.GetBytes(Convert.ToBase64String(_cryptoService.Encrypt(jsonString, key))); - return data; + metadata = ((Event)@event).Metadata; + if (!metadata.ContainsKey("$correlationId")) + throw new Exception("The event metadata must contains a $correlationId"); + if (!metadata.ContainsKey(EventClrTypeHeader)) + metadata.Add(EventClrTypeHeader, @event.GetType().AssemblyQualifiedName); + else + metadata[EventClrTypeHeader] = @event.GetType().AssemblyQualifiedName; + // Remove the metadata from the event body + var tmp = (IDictionary)@event.ToDynamic(); + tmp.Remove("Metadata"); + @event = tmp; } - - private static byte[] SerializeObject(object obj) + else + throw new Exception("The event must have a $correlationId present in its metadata"); + var eventDataHeaders = SerializeObject(metadata); + byte[] data; + if (_cryptoService != null && _keyReader != null) { - var jsonObj = JsonConvert.SerializeObject(obj); - var data = Encoding.UTF8.GetBytes(jsonObj); - return data; + if (string.IsNullOrWhiteSpace(metadata["encrypt"])) + throw new Exception("id not found in the encrypt metadata field"); + var encryptionKey = _keyReader.Get(metadata["encrypt"]); + if (string.IsNullOrWhiteSpace(encryptionKey)) + throw new Exception("key not found with the given id"); + data = SerializeAndEncryptObject(@event, encryptionKey); } + else + data = SerializeObject(@event); + + var eventData = new EventData(Guid.NewGuid(), originalEventType, true, data, eventDataHeaders); + return eventData; + } + + private byte[] SerializeAndEncryptObject(object obj, string key) + { + var jsonString = JsonConvert.SerializeObject(obj); + var data = Encoding.UTF8.GetBytes(Convert.ToBase64String(_cryptoService.Encrypt(jsonString, key))); + return data; + } + + private static byte[] SerializeObject(object obj) + { + var jsonObj = JsonConvert.SerializeObject(obj); + var data = Encoding.UTF8.GetBytes(jsonObj); + return data; + } + + + private static T? DeserializeObject(byte[] data) where T : class + { + var aqn = typeof(T).AssemblyQualifiedName; + + if (aqn is null) + return null; + + return (T?)DeserializeObject(data, aqn); + } - public override IEnumerable Save(TAggregate aggregate) + private static object? DeserializeObject(byte[] data, string typeName) + { + try { - // Synchronous save operation - var streamName = AggregateToStreamName(aggregate.GetType(), aggregate.AggregateId); - var events = aggregate.UncommitedEvents().ToList(); - var originalVersion = CalculateExpectedVersion(aggregate, events); - var expectedVersion = originalVersion == 0 ? ExpectedVersion.NoStream : originalVersion - 1; - var eventData = events.Select(CreateEventData).ToArray(); - try - { - if (events.Count > 0) - _connection.AppendToStreamAsync(streamName, expectedVersion, eventData).Wait(); - } - catch (AggregateException) - { - // Try to save using ExpectedVersion.Any to swallow silently WrongExpectedVersion error - _connection.AppendToStreamAsync(streamName, ExpectedVersion.Any, eventData).Wait(); - } - aggregate.ClearUncommitedEvents(); - return events; + var jsonString = Encoding.UTF8.GetString(data); + var t = Type.GetType(typeName); + return t is not null + ? JsonConvert.DeserializeObject(jsonString, t) + : throw new Exception(""); } - - public override async Task> SaveAsync(TAggregate aggregate) + catch (Exception) { - var streamName = AggregateToStreamName(aggregate.GetType(), aggregate.AggregateId); - var events = aggregate.UncommitedEvents().ToList(); - var originalVersion = CalculateExpectedVersion(aggregate, events); - var expectedVersion = originalVersion == 0 ? ExpectedVersion.NoStream : originalVersion - 1; - var eventData = events.Select(CreateEventData).ToArray(); - await SaveAsyncInternal(streamName, expectedVersion, eventData); - aggregate.ClearUncommitedEvents(); - return events; + return null; } + } - private async Task SaveAsyncInternal(string streamName, long expectedVersion, EventData[] eventData) + private object? DeserializeObject(ResolvedEvent re) + { + try { - try - { - return await _connection.AppendToStreamAsync(streamName, expectedVersion, eventData); - } - catch (AggregateException) - { - // Try to save using ExpectedVersion.Any to swallow silently WrongExpectedVersion error - return await _connection.AppendToStreamAsync(streamName, ExpectedVersion.Any, eventData); - } - } + var metadataDict = DeserializeObject>(re.Event.Metadata); + if (metadataDict is null || !metadataDict.ContainsKey("$correlationId")) + throw new Exception("The metadata must contain a $correlationId"); - public static string EventClrTypeHeader = "EventClrTypeName"; + var bodyString = Encoding.UTF8.GetString(re.Event.Data); + bodyString = DecryptIfNecessary(bodyString, metadataDict); - public static T DeserializeObject(byte[] data) - { - return (T)(DeserializeObject(data, typeof(T).AssemblyQualifiedName)); - } + var eventObject = JObject.Parse(bodyString); + var metadataObject = JObject.Parse(JsonConvert.SerializeObject(new { metadata = metadataDict })); + eventObject.Merge(metadataObject, new JsonMergeSettings { MergeArrayHandling = MergeArrayHandling.Union }); - public static object DeserializeObject(byte[] data, string typeName) - { - try - { - var jsonString = Encoding.UTF8.GetString(data); - return JsonConvert.DeserializeObject(jsonString, Type.GetType(typeName)); - } - catch (Exception) - { - return null; - } + var targetType = GetEventType(re.Event.EventType, metadataDict); + if (targetType is null) + throw new Exception( + $"Could not determine type to deserialize to: " + + $"EventType {re.Event.EventType} / Header {metadataDict[EventClrTypeHeader]}"); + + return JsonConvert.DeserializeObject(eventObject.ToString(), targetType); } - - private object DeserializeObject(byte[] data, byte[] metadata) + catch (Exception) { - try - { - var dict = DeserializeObject>(metadata); - if (!dict.ContainsKey("$correlationId")) - throw new Exception("The metadata must contains a $correlationId"); - var bodyString = Encoding.UTF8.GetString(data); - if (dict.ContainsKey("encrypt")) - { - if (string.IsNullOrWhiteSpace(dict["encrypt"])) - throw new Exception("id not found in the encrypt metadata field"); - var encryptionKey = _keyReader.Get(dict["encrypt"]); - if (string.IsNullOrWhiteSpace(encryptionKey)) - throw new Exception("key not found with the given id"); - bodyString = _cryptoService.Decrypt(Convert.FromBase64String(bodyString), encryptionKey); - } - var o1 = JObject.Parse(bodyString); - var o2 = JObject.Parse(JsonConvert.SerializeObject(new { metadata = dict })); - o1.Merge(o2, new JsonMergeSettings { MergeArrayHandling = MergeArrayHandling.Union }); - return JsonConvert.DeserializeObject(o1.ToString(), Type.GetType(dict[EventClrTypeHeader])); - } - catch (Exception) - { - return null; - } + return null; } } + + private string DecryptIfNecessary(string bodyString, Dictionary metadataDict) + { + if (_keyReader is null || _cryptoService is null) + throw new Exception("Cryptography services are null."); + + if (!metadataDict.TryGetValue("encrypt", out var encrypt)) + return bodyString; + + if (string.IsNullOrWhiteSpace(encrypt)) + throw new Exception("ID not found in the encrypt metadata field"); + + var encryptionKey = _keyReader.Get(encrypt); + if (string.IsNullOrWhiteSpace(encryptionKey)) + throw new Exception("Key not found with the given ID"); + + return _cryptoService.Decrypt(Convert.FromBase64String(bodyString), encryptionKey); + } + + private Type? GetEventType(string eventType, Dictionary metadataDict) + { + if (_mapping == null || !_mapping.TryGetValue(eventType, out var targetType)) + targetType = Type.GetType(metadataDict[EventClrTypeHeader]); + + return targetType; + } } \ No newline at end of file diff --git a/src/Evento.Repository/Evento.Repository.csproj b/src/Evento.Repository/Evento.Repository.csproj index cb8f499..78548e0 100644 --- a/src/Evento.Repository/Evento.Repository.csproj +++ b/src/Evento.Repository/Evento.Repository.csproj @@ -13,6 +13,8 @@ https://github.com/riccardone/Evento.Repository github EventStore, Event Sourcing, DomainRepository, Repository, DDD + enable + 10 diff --git a/src/Evento.TestClient/Evento.TestClient.csproj b/src/Evento.TestClient/Evento.TestClient.csproj index 2f4d1dd..8a478f6 100644 --- a/src/Evento.TestClient/Evento.TestClient.csproj +++ b/src/Evento.TestClient/Evento.TestClient.csproj @@ -3,10 +3,12 @@ Exe netcoreapp2.2 + enable + 9 - - + + diff --git a/src/Evento.Tests/DomainRepositoryTests.cs b/src/Evento.Tests/DomainRepositoryTests.cs index 337c6e4..4a38c10 100644 --- a/src/Evento.Tests/DomainRepositoryTests.cs +++ b/src/Evento.Tests/DomainRepositoryTests.cs @@ -6,26 +6,61 @@ using Moq; using NUnit.Framework; -namespace Evento.Tests +namespace Evento.Tests; + +[TestFixture] +public class DomainRepositoryTests { - [TestFixture] - public class DomainRepositoryTests + [Test] + public void When_I_save_an_aggregate_I_should_receive_the_saved_events() { - [Test] - public void When_I_save_an_aggregate_I_should_receive_the_saved_events() - { - // Assign - const string correlationId = "correlationidexample-123"; - const string testString = "test"; - var cmd = new CreateFakeCommand(testString, new Dictionary{{"$correlationId", correlationId}}); - var mockConnection = new Mock(); - var repository = new EventStoreDomainRepository("domain", mockConnection.Object); + // Assign + const string correlationId = "correlationidexample-123"; + const string testString = "test"; + var cmd = new CreateFakeCommand(testString, new Dictionary{{"$correlationId", correlationId}}); + var mockConnection = new Mock(); + var repository = new EventStoreDomainRepository("domain", mockConnection.Object); - // Act - var results = repository.Save(new FakeHandler().Handle(cmd)); + // Act + var results = repository.Save(new FakeHandler().Handle(cmd)); - // Assert - Assert.IsTrue(((FakeAggregateCreated)results.Single()).Metadata["$correlationId"].Equals(correlationId)); - } + // Assert + Assert.IsTrue(((FakeAggregateCreated)results.Single()).Metadata["$correlationId"].Equals(correlationId)); + } + + [Test] + public void CanUseCustomMapping() + { + // // Assign + // const string eventTypeName = "CustomEventType"; + // const string correlationId = "correlationidexample-123"; + // var metadata = new Dictionary + // { + // { "$correlationId", correlationId }, + // { EventStoreDomainRepository.EventClrTypeHeader, typeof(CustomEvent).AssemblyQualifiedName! }, + // { "type", eventTypeName }, + // { "created", DateTime.UtcNow.Ticks.ToString() }, + // { "content-type", "application/json" } + // }; + // var resolvedEvent = CreateResolvedEvent(eventTypeName, metadata); + // var mockConnection = new Mock(); + // mockConnection + // .Setup( + // x => x.ReadStreamEventsForwardAsync( + // It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny(), It.IsAny())) + // .ReturnsAsync( + // CreateStreamEventsSlice( + // SliceReadStatus.Success, "stream", StreamPosition.Start, ReadDirection.Forward, + // new[] { resolvedEvent }, StreamPosition.Start + 1, StreamPosition.Start + 1, false)); + // + // var customMapping = new Dictionary { { eventTypeName, typeof(CustomEvent) } }; + // var repository = new EventStoreDomainRepository("domain", mockConnection.Object, customMapping); + // + // // Act + // var result = repository.GetById("test-id"); + // + // // Assert + // Assert.IsNotNull(result); + // Assert.IsTrue(result.Events.OfType().Any()); } -} +} \ No newline at end of file diff --git a/src/Evento.Tests/Evento.Tests.csproj b/src/Evento.Tests/Evento.Tests.csproj index b8f6bbd..8f622d0 100644 --- a/src/Evento.Tests/Evento.Tests.csproj +++ b/src/Evento.Tests/Evento.Tests.csproj @@ -2,8 +2,9 @@ net6.0 - false + enable + 10 diff --git a/src/Evento/DomainRepositoryBase.cs b/src/Evento/DomainRepositoryBase.cs index 05ebb2d..153cb9a 100644 --- a/src/Evento/DomainRepositoryBase.cs +++ b/src/Evento/DomainRepositoryBase.cs @@ -1,29 +1,29 @@ using System.Collections.Generic; using System.Threading.Tasks; -namespace Evento +namespace Evento; + +public abstract class DomainRepositoryBase : IDomainRepository { - public abstract class DomainRepositoryBase : IDomainRepository - { - public abstract IEnumerable Save(TAggregate aggregate) where TAggregate : IAggregate; - public abstract Task> SaveAsync(TAggregate aggregate) where TAggregate : IAggregate; - public abstract TResult GetById(string id) where TResult : IAggregate, new(); - public abstract TResult GetById(string id, int eventsToLoad) where TResult : IAggregate, new(); - public abstract void DeleteAggregate(string correlationId, bool hard); + public abstract IEnumerable Save(TAggregate aggregate) where TAggregate : IAggregate; + public abstract Task> SaveAsync(TAggregate aggregate) where TAggregate : IAggregate; + public abstract TResult GetById(string id) where TResult : IAggregate, new(); + public abstract TResult GetById(string id, int eventsToLoad) where TResult : IAggregate, new(); + public abstract void DeleteAggregate(string correlationId, bool hard); - protected int CalculateExpectedVersion(IAggregate aggregate, List events) - { - var expectedVersion = aggregate.Version - events.Count; - return expectedVersion; - } - protected TResult BuildAggregate(IEnumerable events) where TResult : IAggregate, new() + protected int CalculateExpectedVersion(IAggregate aggregate, List events) + { + var expectedVersion = aggregate.Version - events.Count; + return expectedVersion; + } + + protected TResult BuildAggregate(IEnumerable events) where TResult : IAggregate, new() + { + var result = new TResult(); + foreach (var @event in events) { - var result = new TResult(); - foreach (var @event in events) - { - result.ApplyEvent(@event); - } - return result; + result.ApplyEvent(@event); } + return result; } } \ No newline at end of file diff --git a/src/Evento/Evento.csproj b/src/Evento/Evento.csproj index 3a46239..96bf9aa 100644 --- a/src/Evento/Evento.csproj +++ b/src/Evento/Evento.csproj @@ -13,5 +13,7 @@ git EventStore, DomainRepository, Aggregate, Command Handlers, Event Sourcing 4.3.0.0 + enable + 10 \ No newline at end of file