From 06ce59307ebee75a0657ca938344866181a68194 Mon Sep 17 00:00:00 2001 From: Marcel Bindseil Date: Wed, 26 Jun 2024 15:22:37 +0200 Subject: [PATCH] finalized workflow-module --- .../Program.cs | 4 +- .../Activities/EnrichmentActivity.cs | 2 - .../Activities/PublishActivity.cs | 15 ++---- .../Components/mqtt3-pub-sub.yaml | 18 ++++++++ .../Components/redis-statestore.yaml | 15 ++++++ ...Distributed.IoT.Edge.WorkflowModule.csproj | 24 +++++----- .../Program.cs | 19 ++++---- .../Services/SubscriptionService.cs | 46 ++++++++----------- .../WorkflowParameters.cs | 6 +-- 9 files changed, 82 insertions(+), 67 deletions(-) create mode 100644 iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Components/mqtt3-pub-sub.yaml create mode 100644 iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Components/redis-statestore.yaml diff --git a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.DataGatewayModule/Program.cs b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.DataGatewayModule/Program.cs index 877dff27..b6b7ebb6 100644 --- a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.DataGatewayModule/Program.cs +++ b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.DataGatewayModule/Program.cs @@ -33,8 +33,8 @@ // Additional configuration is required to successfully run gRPC on macOS. // For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682 -// builder.WebHost.ConfigureKestrel(k => k.ListenLocalhost(5001, op => op.Protocols = -// HttpProtocols.Http2)); +builder.WebHost.ConfigureKestrel(k => k.ListenLocalhost(5001, op => op.Protocols = + HttpProtocols.Http2)); // Add services to the container. builder.Services.AddGrpc(); diff --git a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Activities/EnrichmentActivity.cs b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Activities/EnrichmentActivity.cs index 185837f3..65a98da8 100644 --- a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Activities/EnrichmentActivity.cs +++ b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Activities/EnrichmentActivity.cs @@ -2,9 +2,7 @@ { using System; using System.Threading.Tasks; - using Dapr.Client; using Dapr.Workflow; - using Distributed.IoT.Edge.WorkflowModule; using Microsoft.Extensions.Logging; using Newtonsoft.Json; diff --git a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Activities/PublishActivity.cs b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Activities/PublishActivity.cs index a2dac730..f37ff83d 100644 --- a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Activities/PublishActivity.cs +++ b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Activities/PublishActivity.cs @@ -6,14 +6,13 @@ using Dapr.Workflow; using Distributed.IoT.Edge.WorkflowModule; using Microsoft.Extensions.Logging; - using Newtonsoft.Json; public class PublishActivity : WorkflowActivity { private readonly ILogger _logger; private readonly DaprClient _daprClient; - private readonly string _senderPubsubName; - private readonly string _senderPubsubTopicName; + private readonly string? _senderPubsubName; + private readonly string? _senderPubsubTopicName; public PublishActivity( ILogger logger, @@ -28,14 +27,8 @@ public PublishActivity( throw new ArgumentNullException(nameof(parameters)); } - _senderPubsubName = parameters.SenderPubSubName ?? - throw new ArgumentNullException( - nameof(parameters.SenderPubSubName), - "Parameter cannot be null."); - _senderPubsubTopicName = parameters.SenderPubSubTopicName ?? - throw new ArgumentNullException( - nameof(parameters.SenderPubSubTopicName), - "Parameter cannot be null."); + _senderPubsubName = parameters.ReceiverPubSubName; + _senderPubsubTopicName = parameters.SenderPubSubTopicName; _logger.LogTrace($"Entering: {nameof(PublishActivity)}"); } diff --git a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Components/mqtt3-pub-sub.yaml b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Components/mqtt3-pub-sub.yaml new file mode 100644 index 00000000..d617f558 --- /dev/null +++ b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Components/mqtt3-pub-sub.yaml @@ -0,0 +1,18 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: local-pub-sub +spec: + type: pubsub.mqtt3 + version: v1 + metadata: + - name: url + value: mqtt://localhost:1883 + - name: retain + value: "false" + - name: cleanSession + value: "false" + - name: qos + value: "1" + - name: consumerID + value: "workflow-module" diff --git a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Components/redis-statestore.yaml b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Components/redis-statestore.yaml new file mode 100644 index 00000000..7c45ff88 --- /dev/null +++ b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Components/redis-statestore.yaml @@ -0,0 +1,15 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: statestore +spec: + type: state.redis + version: v1 + initTimeout: 1m + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: actorStateStore + value: "true" \ No newline at end of file diff --git a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Distributed.IoT.Edge.WorkflowModule.csproj b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Distributed.IoT.Edge.WorkflowModule.csproj index 59a96659..f7f4a617 100644 --- a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Distributed.IoT.Edge.WorkflowModule.csproj +++ b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Distributed.IoT.Edge.WorkflowModule.csproj @@ -1,7 +1,7 @@ Exe - net6.0 + net8.0 enable enable Linux @@ -14,21 +14,23 @@ - - - - - - - + + + + + + + - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive @@ -39,4 +41,4 @@ - + \ No newline at end of file diff --git a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Program.cs b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Program.cs index 3201db43..c16099a2 100644 --- a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Program.cs +++ b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Program.cs @@ -1,15 +1,15 @@ // Local run cmd line. -// dapr run --app-id workflow-module --app-protocol grpc --app-port 5000 --resources-path=../../../deployment/helm/iot-edge-accelerator/templates/dapr -- dotnet run -- --receiverPubSubName "local-pub-sub" --receiverPubSubTopicName "telemetry" --senderPubSubName "local-pub-sub" --senderPubSubTopicName "enriched-telemetry" +// dapr run --app-id workflow-module --app-protocol grpc --app-port 5002 --resources-path=./Components --dapr-grpc-port 5005 -- dotnet run -- --receiverPubSubName "local-pub-sub-2" --receiverPubSubTopicName "telemetry" --senderPubSubName "local-pub-sub" --senderPubSubTopicName "enriched-telemetry" +// mosquitto_pub -h localhost -p 1883 -t telemetry -m '{"ambient":{"temperature":10}}' +// mosquitto_sub -h localhost -p 1883 -t enriched-telemetry +// dapr publish --publish-app-id workflow-module --pubsub local-pub-sub --topic telemetry --data '{"ambient":{"temperature":10}}' --metadata '{"rawPayload":"true"}' -using System.Collections.Immutable; using CommandLine; -using Dapr.Client; using Dapr.Workflow; using Distributed.IoT.Edge.WorkflowModule; using Distributed.IoT.Edge.WorkflowModule.Services; using Distributed.IoT.Edge.WorkflowModule.Workflows; using Microsoft.AspNetCore.Server.Kestrel.Core; -using Microsoft.Extensions.DependencyInjection.Extensions; using WorkflowConsoleApp.Activities; // Environment.SetEnvironmentVariable("DAPR_GRPC_PORT", "50001"); @@ -31,12 +31,11 @@ builder.Services.AddSingleton(sp => parameters); // Already registered by AddDaprWorkflow extension - builder.Services.AddSingleton(new DaprClientBuilder().Build()); + //builder.Services.AddSingleton(new DaprClientBuilder().Build()); builder.Services.AddTransient( sp => new SubscriptionService( sp.GetRequiredService>(), - sp.GetRequiredService(), - sp.GetRequiredService(), + sp.GetRequiredService(), parameters)); }) .WithNotParsed(errors => @@ -44,15 +43,15 @@ Environment.Exit(1); }); -// builder.WebHost.ConfigureKestrel(k => k.ListenLocalhost(5001, op => op.Protocols = -// HttpProtocols.Http2)); +builder.WebHost.ConfigureKestrel(k => k.ListenLocalhost(5002, op => op.Protocols = + HttpProtocols.Http2)); // Additional configuration is required to successfully run gRPC on macOS. // For instructions on how to configure Kestrel and gRPC clients on macOS, visit https://go.microsoft.com/fwlink/?linkid=2099682 builder.Services.AddGrpc(); var app = builder.Build(); - +app.UseRouting(); // Configure the HTTP request pipeline. app.MapGrpcService(); diff --git a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Services/SubscriptionService.cs b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Services/SubscriptionService.cs index 2f683c9d..caf16dd2 100644 --- a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Services/SubscriptionService.cs +++ b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/Services/SubscriptionService.cs @@ -1,43 +1,33 @@ -using Dapr.AppCallback.Autogen.Grpc.v1; -using Dapr.Client; -using Dapr.Client.Autogen.Grpc.v1; -using Dapr.Workflow; -using Distributed.IoT.Edge.WorkflowModule.Workflows; -using Google.Protobuf.WellKnownTypes; -using Grpc.Core; - namespace Distributed.IoT.Edge.WorkflowModule.Services { + using Dapr.AppCallback.Autogen.Grpc.v1; + using Dapr.Workflow; + using Distributed.IoT.Edge.WorkflowModule.Workflows; + using Google.Protobuf.WellKnownTypes; + using Grpc.Core; + public class SubscriptionService : AppCallback.AppCallbackBase { private readonly ILogger _logger; - private readonly DaprClient _daprClient; - private readonly WorkflowEngineClient _workflowClient; + private readonly DaprWorkflowClient _workflowClient; private readonly string _receiverPubsubName; private readonly string _receiverPubsubTopicName; public SubscriptionService( ILogger logger, - DaprClient daprClient, - WorkflowEngineClient workflowClient, + DaprWorkflowClient workflowClient, WorkflowParameters parameters) { _logger = logger ?? throw new ArgumentNullException(nameof(logger)); - _daprClient = daprClient ?? throw new ArgumentNullException(nameof(daprClient)); - _workflowClient = workflowClient ?? throw new ArgumentNullException(nameof(workflowClient)); + _workflowClient = workflowClient ?? throw new ArgumentNullException(nameof(workflowClient)); if (parameters == null) { throw new ArgumentNullException(nameof(parameters)); } - _receiverPubsubName = parameters.ReceiverPubSubName ?? - throw new ArgumentNullException( - nameof(parameters.ReceiverPubSubName), - "Parameter cannot be null."); - _receiverPubsubTopicName = parameters.ReceiverPubSubTopicName ?? - throw new ArgumentNullException( - nameof(parameters.ReceiverPubSubTopicName), - "Parameter cannot be null."); + _receiverPubsubName = parameters.ReceiverPubSubName; + _receiverPubsubTopicName = parameters.ReceiverPubSubTopicName; + } public override Task ListTopicSubscriptions( @@ -68,20 +58,20 @@ public override async Task OnTopicEvent( throw new ArgumentNullException(nameof(request)); } - var topicString = request.Data.ToStringUtf8(); - _logger.LogTrace($"requestPath: {request.Path}"); + var topicString = request.Extensions.ToString(); _logger.LogTrace($"Sending event to workflow, object json: {topicString}"); + var instanceId = Guid.NewGuid().ToString(); // starting workflow to enrich and transform the data await _workflowClient.ScheduleNewWorkflowAsync( name: nameof(EnrichTelemetryWorkflow), - instanceId: request.Id, + instanceId: instanceId, input: topicString); // Wait a second to allow workflow to start await Task.Delay(TimeSpan.FromSeconds(1)); WorkflowState state = await _workflowClient.GetWorkflowStateAsync( - instanceId: request.Id, + instanceId: instanceId, getInputsAndOutputs: true); _logger.LogTrace($"Your workflow {request.Id} has started. Here is the status of the workflow: {state.RuntimeStatus}"); @@ -89,9 +79,9 @@ await _workflowClient.ScheduleNewWorkflowAsync( { await Task.Delay(TimeSpan.FromSeconds(5)); state = await _workflowClient.GetWorkflowStateAsync( - instanceId: request.Id, + instanceId: instanceId, getInputsAndOutputs: true); - _logger.LogTrace($"State of workflow {request.Id} is: {state.RuntimeStatus}"); + _logger.LogTrace($"State of workflow {instanceId} is: {state.RuntimeStatus}"); } // Depending on the status return dapr side will either retry or drop the message from underlying pubsub. diff --git a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/WorkflowParameters.cs b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/WorkflowParameters.cs index 0a28c3df..283c9909 100644 --- a/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/WorkflowParameters.cs +++ b/iotedge/Distributed.IoT.Edge/Distributed.IoT.Edge.WorkflowModule/WorkflowParameters.cs @@ -1,7 +1,7 @@ -using CommandLine; - -namespace Distributed.IoT.Edge.WorkflowModule +namespace Distributed.IoT.Edge.WorkflowModule { + using CommandLine; + public class WorkflowParameters { [Option(