-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathAzureIotEventsConsumer.cs
48 lines (43 loc) · 2.2 KB
/
AzureIotEventsConsumer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
using System.Text.Json;
using Tingle.EventBus.Configuration;
using Tingle.EventBus.Transports.Azure.EventHubs.IotHub;
namespace AzureIotHub;
/// <summary>
/// Consumer for <see cref="MyIotHubEvent"/> that writes a description of every
/// received event to the injected logger and does nothing else with it.
/// </summary>
// The original author notes the attribute may also be written as
// [ConsumerName(EventHubConsumerClient.DefaultConsumerGroupName)].
[ConsumerName("$Default")]
internal class AzureIotEventsConsumer(ILogger<AzureIotEventsConsumer> logger) : IEventConsumer<MyIotHubEvent>
{
    // Web defaults with indentation turned on; used for every JSON string placed in a log entry.
    private static readonly JsonSerializerOptions serializerOptions = new(JsonSerializerDefaults.Web)
    {
        WriteIndented = true,
    };

    /// <summary>
    /// Logs the incoming event: telemetry events are logged with the device id,
    /// enqueued time, timestamp and serialized telemetry; every other event is
    /// logged as an operational event labelled by its source.
    /// </summary>
    /// <param name="context">Context carrying the event to log.</param>
    /// <param name="cancellationToken">Not used by this implementation.</param>
    /// <returns>An already completed task; all work is done synchronously.</returns>
    /// <exception cref="InvalidOperationException">
    /// The event is not telemetry and its source is not one of the handled operational sources.
    /// </exception>
    public Task ConsumeAsync(EventContext<MyIotHubEvent> context, CancellationToken cancellationToken)
    {
        var received = context.Event;

        if (received.IsTelemetry)
        {
            LogTelemetry(context, received);
        }
        else
        {
            LogOperationalEvent(received);
        }

        return Task.CompletedTask;
    }

    // Writes one information-level entry describing a telemetry event.
    private void LogTelemetry(EventContext<MyIotHubEvent> context, MyIotHubEvent received)
    {
        // Same retrieval order as before: payload first, then the context metadata.
        var payload = received.GetTelemetry<MyIotHubTelemetry>();
        var sourceDevice = context.GetIotHubDeviceId();
        var enqueuedAt = context.GetIotHubEnqueuedTime();

        logger.LogInformation("Received Telemetry from {DeviceId}\r\nEnqueued: {EnqueuedTime}\r\nTimestamped: {Timestamp}\r\nTelemetry:{Telemetry}",
                              sourceDevice,
                              enqueuedAt,
                              payload.Timestamp,
                              JsonSerializer.Serialize(payload, serializerOptions));
    }

    // Writes one information-level entry describing a non-telemetry (operational) event.
    private void LogOperationalEvent(MyIotHubEvent received)
    {
        // Human-readable label for the source; anything unrecognized is rejected.
        string label;
        switch (received.Source)
        {
            case IotHubEventMessageSource.TwinChangeEvents:
                label = "TwinChange";
                break;

            case IotHubEventMessageSource.DeviceLifecycleEvents:
                label = "Device Lifecycle";
                break;

            case IotHubEventMessageSource.DeviceConnectionStateEvents:
                label = "Device connection state";
                break;

            default:
                throw new InvalidOperationException($"Unknown event source '{received.Source}'.");
        }

        var operational = received.Event;

        // DeviceId and ModuleId are intentionally adjacent in the template, exactly as in the original message.
        logger.LogInformation("{Prefix} event received of type '{Type}' from '{DeviceId}{ModuleId}' in '{HubName}'.\r\nEvent:{Event}",
                              label,
                              operational.Type,
                              operational.DeviceId,
                              operational.ModuleId,
                              operational.HubName,
                              JsonSerializer.Serialize(operational.Payload, serializerOptions));
    }
}