Skip to content

Commit

Permalink
[eclipse-hono#3585] Add acknowledgement required command feature for …
Browse files Browse the repository at this point in the history
…Google Pub/Sub based commands

Signed-off-by: Matthias Kaemmer <[email protected]>
  • Loading branch information
mattkaem committed Feb 2, 2024
1 parent ddebeae commit 376722e
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -59,6 +59,7 @@
import org.eclipse.hono.client.NoConsumerException;
import org.eclipse.hono.client.ServerErrorException;
import org.eclipse.hono.client.ServiceInvocationException;
import org.eclipse.hono.client.command.AbstractCommandContext;
import org.eclipse.hono.client.command.Command;
import org.eclipse.hono.client.command.CommandContext;
import org.eclipse.hono.client.command.CommandResponse;
Expand Down Expand Up @@ -1734,8 +1735,19 @@ private void afterCommandPublished(
reportPublishedCommand(tenantObject, subscription, commandContext, ProcessingOutcome.FORWARDED);
log.debug("received PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]",
msgId, subscription.getTenant(), subscription.getDeviceId(), endpoint.clientIdentifier());
commandContext.getTracingSpan().log("received PUBACK from device");
commandContext.accept();
final Span span = commandContext.getTracingSpan();
span.log("received PUBACK from device");
final Command command = commandContext.getCommand();
if (command.isOneWay() && command.isValid() && command.isAckRequired()
&& commandContext instanceof AbstractCommandContext<?> abstractCommandContext) {
abstractCommandContext
.sendDeliverySuccessCommandResponseMessage(HttpURLConnection.HTTP_ACCEPTED,
"Command successfully received", span, command.getCorrelationId(),
command.getMessagingType())
.onComplete(v -> commandContext.accept());
} else {
commandContext.accept();
}
};

final Handler<Void> onAckTimeoutHandler = v -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -193,6 +193,11 @@ public boolean isOneWay() {
return replyToId == null;
}

@Override
public boolean isAckRequired() {
return false;
}

@Override
public boolean isValid() {
return !validationError.isPresent();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation
* Copyright (c) 2021, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -213,6 +213,11 @@ public boolean isOneWay() {
return !responseRequired;
}

@Override
public boolean isAckRequired() {
return false;
}

@Override
public boolean isValid() {
return !validationError.isPresent();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -47,6 +47,7 @@ public final class PubSubBasedCommand implements Command {
private final String contentType;
private final String requestId;
private final boolean responseRequired;
private final boolean ackRequired;

private String gatewayId;

Expand All @@ -58,7 +59,8 @@ private PubSubBasedCommand(
final String correlationId,
final String subject,
final String contentType,
final boolean responseRequired) {
final boolean responseRequired,
final boolean ackRequired) {

this.validationError = validationError;
this.pubsubMessage = pubsubMessage;
Expand All @@ -68,6 +70,7 @@ private PubSubBasedCommand(
this.subject = subject;
this.contentType = contentType;
this.responseRequired = responseRequired;
this.ackRequired = ackRequired;
this.requestId = Commands.encodeRequestIdParameters(correlationId, MessagingType.pubsub);
}

Expand Down Expand Up @@ -134,10 +137,11 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage,

final StringJoiner validationErrorJoiner = new StringJoiner(", ");
final boolean responseRequired = PubSubMessageHelper.isResponseRequired(attributes);
final boolean ackRequired = PubSubMessageHelper.isAckRequired(attributes);
final String correlationId = PubSubMessageHelper.getCorrelationId(attributes)
.filter(id -> !id.isEmpty())
.orElseGet(() -> {
if (responseRequired) {
if (responseRequired || ackRequired) {
validationErrorJoiner.add("correlation-id is not set");
}
return null;
Expand All @@ -157,7 +161,8 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage,
correlationId,
subject,
contentType,
responseRequired);
responseRequired,
ackRequired);
}

/**
Expand All @@ -169,6 +174,11 @@ public PubsubMessage getPubsubMessage() {
return pubsubMessage;
}

@Override
public boolean isAckRequired() {
return ackRequired;
}

@Override
public boolean isOneWay() {
return !responseRequired;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -70,7 +70,7 @@ public void release(final Throwable error) {
final String correlationId = getCorrelationId();
sendDeliveryFailureCommandResponseMessage(status, errorMessage, span, error, correlationId,
MessagingType.pubsub)
.onComplete(v -> span.finish());
.onComplete(v -> span.finish());
} else {
span.finish();
}
Expand Down Expand Up @@ -117,7 +117,7 @@ public void reject(final Throwable error) {
final String correlationId = getCorrelationId();
sendDeliveryFailureCommandResponseMessage(status, nonNullCause, span, null, correlationId,
MessagingType.pubsub)
.onComplete(v -> span.finish());
.onComplete(v -> span.finish());
} else {
span.finish();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand All @@ -14,12 +14,14 @@

import java.net.HttpURLConnection;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import org.eclipse.hono.tracing.TracingHelper;
import org.eclipse.hono.util.CommandConstants;
import org.eclipse.hono.util.MapBasedExecutionContext;
import org.eclipse.hono.util.MessageHelper;
import org.eclipse.hono.util.MessagingType;
import org.eclipse.hono.util.RegistrationAssertion;
import org.eclipse.hono.util.TenantObject;
Expand All @@ -34,9 +36,11 @@
/**
* A base class providing utility methods for passing around parameters relevant for processing a {@code Command} used
* in a Pub/Sub and Kafka based command context.
*
* @param <T> The type of Command.
*/
public abstract class AbstractCommandContext<T extends Command> extends MapBasedExecutionContext implements CommandContext {
public abstract class AbstractCommandContext<T extends Command> extends MapBasedExecutionContext
implements CommandContext {

protected static final Logger LOG = LoggerFactory.getLogger(AbstractCommandContext.class);

Expand Down Expand Up @@ -157,24 +161,73 @@ protected Future<Void> sendDeliveryFailureCommandResponseMessage(
commandResponse.setAdditionalProperties(
Collections.unmodifiableMap(command.getDeliveryFailureNotificationProperties()));

return commandResponseSender.sendCommandResponse(
// try to retrieve tenant configuration from context
Optional.ofNullable(get(KEY_TENANT_CONFIG))
.filter(TenantObject.class::isInstance)
.map(TenantObject.class::cast)
// and fall back to default configuration
.orElseGet(() -> TenantObject.from(command.getTenant())),
new RegistrationAssertion(command.getDeviceId()),
commandResponse,
span.context())
.onFailure(thr -> {
LOG.debug("failed to publish command response [{}]", commandResponse, thr);
TracingHelper.logError(span, "failed to publish command response message", thr);
})
return sendCommandResponse(commandResponse, span)
.onSuccess(v -> {
LOG.debug("published error command response [{}, cause: {}]", commandResponse,
cause != null ? cause.getMessage() : error);
span.log("published error command response");
});
}

/**
* Sends a command response with an acknowledgement.
*
* @param status The HTTP status code indicating the outcome of processing the command.
* @param successMessage The ack message describing the cause for the command message delivery failure.
* @param span The active OpenTracing span to use for tracking this operation.
* @param correlationId The correlation ID of the command that this is the response for.
* @param messagingType The type of the messaging system via which the command message was received.
* @return A future indicating the outcome of the operation.
* <p>
* The future will be succeeded if the command response has been sent.
* <p>
* The future will be failed if the command response could not be sent.
*/
public Future<Void> sendDeliverySuccessCommandResponseMessage(
final int status,
final String successMessage,
final Span span,
final String correlationId,
final MessagingType messagingType) {
if (correlationId == null) {
TracingHelper.logError(span, "can't send command response message - no correlation id set");
return Future.failedFuture("missing correlation id");
}
final JsonObject payloadJson = new JsonObject();
payloadJson.put("acknowledgement", successMessage != null ? successMessage : "");

final CommandResponse commandResponse = new CommandResponse(
command.getTenant(),
command.getDeviceId(),
payloadJson.toBuffer(),
CommandConstants.CONTENT_TYPE_DELIVERY_SUCCESS_NOTIFICATION,
status,
correlationId,
"",
messagingType);
commandResponse.setAdditionalProperties(Map.of(MessageHelper.SYS_PROPERTY_SUBJECT, command.getName()));

return sendCommandResponse(commandResponse, span)
.onSuccess(v -> {
LOG.debug("published ack command response [{}]", commandResponse);
span.log("published ack command response");
});
}

private Future<Void> sendCommandResponse(final CommandResponse commandResponse, final Span span) {
return commandResponseSender.sendCommandResponse(
// try to retrieve tenant configuration from context
Optional.ofNullable(get(KEY_TENANT_CONFIG))
.filter(TenantObject.class::isInstance)
.map(TenantObject.class::cast)
// and fall back to default configuration
.orElseGet(() -> TenantObject.from(command.getTenant())),
new RegistrationAssertion(command.getDeviceId()),
commandResponse,
span.context())
.onFailure(thr -> {
LOG.debug("failed to publish command response [{}]", commandResponse, thr);
TracingHelper.logError(span, "failed to publish command response message", thr);
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation
* Copyright (c) 2020, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -35,6 +35,13 @@ public interface Command {
*/
boolean isOneWay();

/**
* Checks if an acknowledgement of this command should be sent to the messaging infrastructure.
*
* @return {@code true} if an acknowledgement is required.
*/
boolean isAckRequired();

/**
* Checks if this command contains all required information.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/**
* Copyright (c) 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -43,6 +43,7 @@ public final class PubSubMessageHelper {
* The name of the Pub/Sub message property indicating whether a response to the message is expected/required.
*/
public static final String PUBSUB_PROPERTY_RESPONSE_REQUIRED = "response-required";
public static final String PUBSUB_PROPERTY_ACK_REQUIRED = "ack-required";

/**
* Prefix to use in the Pub/Sub message properties for marking properties of command messages that should be
Expand Down Expand Up @@ -202,6 +203,17 @@ public static boolean isResponseRequired(final Map<String, String> attributesMap
.parseBoolean(getAttributesValue(attributesMap, PUBSUB_PROPERTY_RESPONSE_REQUIRED).orElse("false"));
}

/**
* Gets the value of the {@value PUBSUB_PROPERTY_ACK_REQUIRED} attribute.
*
* @param attributesMap The attributes map to get the value from.
* @return The attributes value.
*/
public static boolean isAckRequired(final Map<String, String> attributesMap) {
return Boolean
.parseBoolean(getAttributesValue(attributesMap, PUBSUB_PROPERTY_ACK_REQUIRED).orElse("false"));
}

/**
* Gets the value of the {@value MessageHelper#SYS_PROPERTY_CONTENT_TYPE} attribute.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation
* Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
Expand Down Expand Up @@ -76,6 +76,11 @@ public class CommandConstants {
*/
public static final String COMMAND_RESPONSE_RESPONSE_PART_SHORT = "s";

/**
* The content type that is defined for acknowledgement command response messages sent by a protocol adapter or Command Router.
*/
public static final String CONTENT_TYPE_DELIVERY_SUCCESS_NOTIFICATION = "application/vnd.eclipse-hono-delivery-success-notification+json";

/**
* The content type that is defined for error command response messages sent by a protocol adapter or Command Router.
*/
Expand Down
Loading

0 comments on commit 376722e

Please sign in to comment.