-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathProgram.cs
82 lines (66 loc) · 3.07 KB
/
Program.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Application entry point: assemble a generic host that registers the event bus
// (with an in-memory transport) and the background producer, then run it.
var hostBuilder = Host.CreateDefaultBuilder(args);

hostBuilder.ConfigureServices((_, services) =>
{
    services.AddEventBus(bus =>
    {
        // Settings that apply regardless of which transport is used
        bus.Configure(options =>
        {
            options.Naming.Scope = "dev"; // queues will be prefixed by 'dev'
            options.Naming.UseFullTypeNames = false;
        });

        // Consumer that handles VideoUploaded events
        bus.AddConsumer<VideoUploadedConsumer>();

        // Settings tied to a particular transport
        bus.AddInMemoryTransport();
    });

    // Hosted service that publishes the sample events
    services.AddHostedService<ProducerService>();
});

var host = hostBuilder.Build();

await host.RunAsync();
/// <summary>
/// Background service that publishes a fixed number of <see cref="VideoUploaded"/> events
/// through the injected publisher, waiting between consecutive publishes.
/// </summary>
internal class ProducerService(IEventPublisher publisher) : BackgroundService
{
    /// <summary>
    /// Publishes five randomly populated <see cref="VideoUploaded"/> events, pausing 25 seconds after each one.
    /// </summary>
    /// <param name="stoppingToken">Token passed to both the publish call and the delay so the loop stops on shutdown.</param>
    /// <returns>A task that completes once every event has been published and the final delay has elapsed.</returns>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var delay = TimeSpan.FromSeconds(25);
        var times = 5;

        for (var i = 0; i < times; i++)
        {
            // Random.Shared is used instead of a Random seeded with the current millisecond:
            // a millisecond seed allows only 1000 distinct sequences, so ids would repeat across runs.
            // Next() never returns a negative value, so no unsigned conversion is needed.
            var videoId = Random.Shared.Next().ToString();

            var evt = new VideoUploaded
            {
                VideoId = videoId,
                SizeBytes = Random.Shared.Next(),
                Url = $"https://localhost:8080/uploads/raw/{videoId}.flv",
            };

            await publisher.PublishAsync(evt, cancellationToken: stoppingToken);
            await Task.Delay(delay, stoppingToken);
        }
    }
}
/// <summary>
/// Consumer of <see cref="VideoUploaded"/> events. Each processing step (download, thumbnail
/// extraction, thumbnail upload) is simulated by logging a message and then waiting.
/// </summary>
internal class VideoUploadedConsumer(ILogger<VideoUploadedConsumer> logger) : IEventConsumer<VideoUploaded>
{
    // Length of the pause that stands in for each simulated step.
    private static readonly TimeSpan StepDuration = TimeSpan.FromSeconds(3);

    /// <summary>
    /// Handles one <see cref="VideoUploaded"/> event by walking through the simulated steps in order.
    /// </summary>
    /// <param name="context">Context carrying the event and its identifier.</param>
    /// <param name="cancellationToken">Token that cancels the simulated waits.</param>
    public async Task ConsumeAsync(EventContext<VideoUploaded> context, CancellationToken cancellationToken = default)
    {
        var video = context.Event;
        var id = video.VideoId;
        logger.LogInformation("Received event Id: {Id} for video '{VideoId}'.", context.Id, id);

        // Step 1: fetch the video
        logger.LogInformation("Downloading video from {VideoUrl} ({VideoSize} bytes).", video.Url, video.SizeBytes);
        await SimulateStepAsync(cancellationToken);

        // Step 2: pull a thumbnail out of the video
        logger.LogInformation("Extracting thumbnail from video with Id '{VideoId}'.", id);
        await SimulateStepAsync(cancellationToken);

        // Step 3: push the thumbnail to its destination
        var destination = $"https://localhost:8080/uploads/thumbnails/{id}.jpg";
        logger.LogInformation("Uploading thumbnail for video with Id '{VideoId}' to '{ThumbnailUrl}'.", id, destination);
        await SimulateStepAsync(cancellationToken);

        logger.LogInformation("Processing video with Id '{VideoId}' completed.", id);
    }

    // Stand-in for real work: waits for the fixed step duration, honouring cancellation.
    private static Task SimulateStepAsync(CancellationToken cancellationToken)
        => Task.Delay(StepDuration, cancellationToken);
}
/// <summary>
/// Event payload describing an uploaded video.
/// </summary>
internal class VideoUploaded
{
    /// <summary>Identifier of the video; may be <see langword="null"/> when not set.</summary>
    public string? VideoId { get; set; }

    /// <summary>URL of the video; may be <see langword="null"/> when not set.</summary>
    public string? Url { get; set; }

    /// <summary>Size of the video in bytes.</summary>
    public long SizeBytes { get; set; }
}