Skip to content

Commit

Permalink
Outbox
Browse files Browse the repository at this point in the history
  • Loading branch information
phongnguyend committed Jan 24, 2025
1 parent 19fb792 commit 4d402bf
Show file tree
Hide file tree
Showing 30 changed files with 114 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ public class PublishingOutBoxEvent
public string EventSource { get; set; }

public string Payload { get; set; }

public string ActivityId { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class MetaData

public string MessageVersion { get; set; }

public string CorrelationId { get; set; }
public string ActivityId { get; set; }

public DateTimeOffset? CreationDateTime { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using ClassifiedAds.Services.Product.Entities;
using MediatR;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace ClassifiedAds.Services.Product.Commands;
Expand Down Expand Up @@ -50,8 +51,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = auditLog.CreatedDateTime,
ObjectId = auditLog.Id.ToString(),
Message = auditLog.AsJsonString(),
Published = false,
Payload = auditLog.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public async Task Handle(PublishEventsCommand command, CancellationToken cancell
Id = eventLog.Id.ToString(),
EventType = eventLog.EventType,
EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name,
Payload = eventLog.Message,
Payload = eventLog.Payload,
ActivityId = eventLog.ActivityId
};

await _messageBus.SendAsync(outbox, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public abstract class OutboxEventBase : Entity<Guid>

public string ObjectId { get; set; }

public string Message { get; set; }
public string Payload { get; set; }

public bool Published { get; set; }

public string ActivityId { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using ClassifiedAds.Services.Product.Entities;
using MediatR;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -47,8 +48,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = domainEvent.EventDateTime,
ObjectId = domainEvent.Entity.Id.ToString(),
Message = domainEvent.Entity.AsJsonString(),
Published = false,
Payload = domainEvent.Entity.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using ClassifiedAds.Services.Product.Entities;
using MediatR;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -47,8 +48,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = domainEvent.EventDateTime,
ObjectId = domainEvent.Entity.Id.ToString(),
Message = domainEvent.Entity.AsJsonString(),
Published = false,
Payload = domainEvent.Entity.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using ClassifiedAds.Services.Product.Entities;
using MediatR;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -47,8 +48,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = domainEvent.EventDateTime,
ObjectId = domainEvent.Entity.Id.ToString(),
Message = domainEvent.Entity.AsJsonString(),
Published = false,
Payload = domainEvent.Entity.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using ClassifiedAds.Services.Storage.Constants;
using ClassifiedAds.Services.Storage.Entities;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -50,8 +51,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = auditLog.CreatedDateTime,
ObjectId = auditLog.Id.ToString(),
Message = auditLog.AsJsonString(),
Published = false,
Payload = auditLog.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public async Task HandleAsync(PublishEventsCommand command, CancellationToken ca
Id = eventLog.Id.ToString(),
EventType = eventLog.EventType,
EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name,
Payload = eventLog.Message,
Payload = eventLog.Payload,
ActivityId = eventLog.ActivityId
};

await _messageBus.SendAsync(outbox, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public abstract class OutboxEventBase : Entity<Guid>

public string ObjectId { get; set; }

public string Message { get; set; }
public string Payload { get; set; }

public bool Published { get; set; }

public string ActivityId { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using ClassifiedAds.Services.Storage.Constants;
using ClassifiedAds.Services.Storage.Entities;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -47,8 +48,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = domainEvent.EventDateTime,
ObjectId = domainEvent.Entity.Id.ToString(),
Message = domainEvent.Entity.AsJsonString(),
Published = false,
Payload = domainEvent.Entity.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using ClassifiedAds.Services.Storage.Constants;
using ClassifiedAds.Services.Storage.Entities;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -47,8 +48,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = domainEvent.EventDateTime,
ObjectId = domainEvent.Entity.Id.ToString(),
Message = domainEvent.Entity.AsJsonString(),
Published = false,
Payload = domainEvent.Entity.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using ClassifiedAds.Services.Storage.Constants;
using ClassifiedAds.Services.Storage.Entities;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -47,8 +48,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = domainEvent.EventDateTime,
ObjectId = domainEvent.Entity.Id.ToString(),
Message = domainEvent.Entity.AsJsonString(),
Published = false,
Payload = domainEvent.Entity.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
using ClassifiedAds.CrossCuttingConcerns.Logging;
using ClassifiedAds.Domain.Infrastructure.MessageBrokers;
using ClassifiedAds.Services.Storage.DTOs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -27,12 +28,14 @@ public WebhookConsumer(ILogger<WebhookConsumer> logger,

public async Task HandleAsync(FileUploadedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
{
using var activity = ActivityExtensions.StartNew("WebhookConsumer", metaData?.ActivityId);
var url = _configuration["Webhooks:FileUploadedEvent:PayloadUrl"];
await _httpClient.PostAsJsonAsync(url, data.FileEntry, cancellationToken: cancellationToken);
}

public async Task HandleAsync(FileDeletedEvent data, MetaData metaData, CancellationToken cancellationToken = default)
{
using var activity = ActivityExtensions.StartNew("WebhookConsumer", metaData?.ActivityId);
var url = _configuration["Webhooks:FileDeletedEvent:PayloadUrl"];
await _httpClient.PostAsJsonAsync(url, data.FileEntry, cancellationToken: cancellationToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class FileEntryOutBoxEventPublisher : IOutBoxEventPublisher

public static string[] CanHandleEventTypes()
{
return new string[] { EventTypeConstants.FileEntryCreated, EventTypeConstants.FileEntryDeleted };
return [EventTypeConstants.FileEntryCreated, EventTypeConstants.FileEntryDeleted];
}

public static string CanHandleEventSource()
Expand All @@ -32,11 +32,21 @@ public async Task HandleAsync(PublishingOutBoxEvent outbox, CancellationToken ca
{
if (outbox.EventType == EventTypeConstants.FileEntryCreated)
{
await _messageBus.SendAsync(new FileUploadedEvent { FileEntry = JsonSerializer.Deserialize<FileEntry>(outbox.Payload) }, cancellationToken: cancellationToken);
await _messageBus.SendAsync(new FileUploadedEvent
{
FileEntry = JsonSerializer.Deserialize<FileEntry>(outbox.Payload)
},
metaData: new MetaData { ActivityId = outbox.ActivityId, MessageId = outbox.Id },
cancellationToken: cancellationToken);
}
else if (outbox.EventType == EventTypeConstants.FileEntryDeleted)
{
await _messageBus.SendAsync(new FileDeletedEvent { FileEntry = JsonSerializer.Deserialize<FileEntry>(outbox.Payload) }, cancellationToken: cancellationToken);
await _messageBus.SendAsync(new FileDeletedEvent
{
FileEntry = JsonSerializer.Deserialize<FileEntry>(outbox.Payload)
},
metaData: new MetaData { ActivityId = outbox.ActivityId, MessageId = outbox.Id },
cancellationToken: cancellationToken);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ public class PublishingOutBoxEvent
public string EventSource { get; set; }

public string Payload { get; set; }

public string ActivityId { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class MetaData

public string MessageVersion { get; set; }

public string CorrelationId { get; set; }
public string ActivityId { get; set; }

public DateTimeOffset? CreationDateTime { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public async Task HandleAsync(PublishEventsCommand command, CancellationToken ca
Id = eventLog.Id.ToString(),
EventType = eventLog.EventType,
EventSource = typeof(PublishEventsCommand).Assembly.GetName().Name,
Payload = eventLog.Message,
Payload = eventLog.Payload,
ActivityId = eventLog.ActivityId
};

await _messageBus.SendAsync(outbox, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public abstract class OutboxEventBase : Entity<Guid>

public string ObjectId { get; set; }

public string Message { get; set; }
public string Payload { get; set; }

public bool Published { get; set; }

public string ActivityId { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using ClassifiedAds.Modules.Product.Constants;
using ClassifiedAds.Modules.Product.Entities;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -45,8 +46,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = auditLog.CreatedDateTime,
ObjectId = auditLog.Id.ToString(),
Message = auditLog.AsJsonString(),
Published = false,
Payload = auditLog.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
Expand All @@ -55,8 +56,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = domainEvent.EventDateTime,
ObjectId = domainEvent.Entity.Id.ToString(),
Message = domainEvent.Entity.AsJsonString(),
Published = false,
Payload = domainEvent.Entity.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using ClassifiedAds.Modules.Product.Constants;
using ClassifiedAds.Modules.Product.Entities;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -45,8 +46,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = auditLog.CreatedDateTime,
ObjectId = auditLog.Id.ToString(),
Message = auditLog.AsJsonString(),
Published = false,
Payload = auditLog.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
Expand All @@ -55,8 +56,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = domainEvent.EventDateTime,
ObjectId = domainEvent.Entity.Id.ToString(),
Message = domainEvent.Entity.AsJsonString(),
Published = false,
Payload = domainEvent.Entity.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using ClassifiedAds.Modules.Product.Constants;
using ClassifiedAds.Modules.Product.Entities;
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

Expand Down Expand Up @@ -45,8 +46,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = auditLog.CreatedDateTime,
ObjectId = auditLog.Id.ToString(),
Message = auditLog.AsJsonString(),
Published = false,
Payload = auditLog.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
Expand All @@ -55,8 +56,8 @@ await _outboxEventRepository.AddOrUpdateAsync(new OutboxEvent
TriggeredById = _currentUser.UserId,
CreatedDateTime = domainEvent.EventDateTime,
ObjectId = domainEvent.Entity.Id.ToString(),
Message = domainEvent.Entity.AsJsonString(),
Published = false,
Payload = domainEvent.Entity.AsJsonString(),
ActivityId = Activity.Current.Id,
}, cancellationToken);

await _outboxEventRepository.UnitOfWork.SaveChangesAsync(cancellationToken);
Expand Down
Loading

0 comments on commit 4d402bf

Please sign in to comment.