Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor datastream update checks #958

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -207,44 +207,7 @@ private void doUpdateDatastreams(Map<String, Datastream> datastreamMap) {
// connector, transport provider, destination or status (use pause/resume to update status).
// Writing into a different destination should essentially be for a new datastream.
try {
if (!oldDatastream.hasConnectorName() || !datastream.hasConnectorName()) {
throw new DatastreamValidationException(String.format("Failed to update %s because connector is not present."
+ " Are they valid? old: %s, new: %s", key, oldDatastream, datastream));
}
if (!datastream.getConnectorName().equals(oldDatastream.getConnectorName())) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update connector in update request."
+ " old: %s, new: %s", key, oldDatastream, datastream));
}
if (!oldDatastream.hasTransportProviderName() || !datastream.hasTransportProviderName()) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update transport provider in"
+ " update request. old: %s, new: %s", key, oldDatastream, datastream));
}
if (!datastream.getTransportProviderName().equals(oldDatastream.getTransportProviderName())) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update transport provider in"
+ " update request. old: %s new: %s", key, oldDatastream, datastream));
}
if (!oldDatastream.hasDestination() || !datastream.hasDestination()) {
throw new DatastreamValidationException(String.format("Failed to update %s because destination is not set. "
+ "Are they initialized? old: %s, new: %s", key, oldDatastream, datastream));
}
if (!datastream.getDestination().equals(oldDatastream.getDestination())) {
throw new DatastreamValidationException(String.format("Failed to update %s because destination is immutable."
+ " old: %s new: %s", key, oldDatastream, datastream));
}
if (!oldDatastream.hasStatus() || !datastream.hasStatus()) {
throw new DatastreamValidationException(String.format("Failed to update %s because status is not present."
+ " Are they valid? old: %s, new: %s", key, oldDatastream, datastream));
}
if (!datastream.getStatus().equals(oldDatastream.getStatus())) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update status in update request."
+ " old: %s new: %s", key, oldDatastream, datastream));
}

if (datastream.getMetadata().containsKey(NUM_TASKS) &&
!datastream.getMetadata().get(NUM_TASKS).equals(oldDatastream.getMetadata().get(NUM_TASKS))) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update numTasks."
+ " old: %s new: %s", key, oldDatastream, datastream));
}
_coordinator.verifyUpdateDatastreamChecks(datastream, oldDatastream);

LOG.info("[UPDATE] old datastream = {}, new datastream = {}", oldDatastream, datastream);
} catch (DatastreamValidationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import com.linkedin.datastream.server.zk.ZkAdapter;

import static com.linkedin.datastream.common.DatastreamMetadataConstants.CREATION_MS;
import static com.linkedin.datastream.common.DatastreamMetadataConstants.NUM_TASKS;
import static com.linkedin.datastream.common.DatastreamMetadataConstants.SYSTEM_DESTINATION_PREFIX;
import static com.linkedin.datastream.common.DatastreamMetadataConstants.THROUGHPUT_VIOLATING_TOPICS;
import static com.linkedin.datastream.common.DatastreamMetadataConstants.TTL_MS;
Expand Down Expand Up @@ -2008,6 +2009,59 @@ public void validateDatastreamsUpdate(List<Datastream> datastreams) throws Datas
}
}

/**
* Verify certain checks before proceeding with datastream update. Compares few fields with existing datastream to
* check if update can be allowed or not
* @param datastream New state of datastream
* @param oldDatastream Current datastream
* @throws DatastreamValidationException if update cannot be allowed
*/
public void verifyUpdateDatastreamChecks(Datastream datastream, Datastream oldDatastream) throws DatastreamValidationException {
String key = datastream.getName();
if (!oldDatastream.hasConnectorName() || !datastream.hasConnectorName()) {
throw new DatastreamValidationException(String.format("Failed to update %s because connector is not present."
+ " Are they valid? old: %s, new: %s", key, oldDatastream, datastream));
}
if (!datastream.getConnectorName().equals(oldDatastream.getConnectorName())) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update connector in update request."
+ " old: %s, new: %s", key, oldDatastream, datastream));
}
if (!oldDatastream.hasTransportProviderName() || !datastream.hasTransportProviderName()) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update transport provider in"
+ " update request. old: %s, new: %s", key, oldDatastream, datastream));
}
if (!datastream.getTransportProviderName().equals(oldDatastream.getTransportProviderName())) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update transport provider in"
+ " update request. old: %s new: %s", key, oldDatastream, datastream));
}
if (!oldDatastream.hasDestination() || !datastream.hasDestination()) {
throw new DatastreamValidationException(String.format("Failed to update %s because destination is not set. "
+ "Are they initialized? old: %s, new: %s", key, oldDatastream, datastream));
}

if (!oldDatastream.hasStatus() || !datastream.hasStatus()) {
throw new DatastreamValidationException(String.format("Failed to update %s because status is not present."
+ " Are they valid? old: %s, new: %s", key, oldDatastream, datastream));
}
if (!datastream.getStatus().equals(oldDatastream.getStatus())) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update status in update request."
+ " old: %s new: %s", key, oldDatastream, datastream));
}

if (datastream.getMetadata().containsKey(NUM_TASKS) &&
!datastream.getMetadata().get(NUM_TASKS).equals(oldDatastream.getMetadata().get(NUM_TASKS))) {
throw new DatastreamValidationException(String.format("Failed to update %s. Can't update numTasks."
+ " old: %s new: %s", key, oldDatastream, datastream));
}

if (!_config.getOverrideDatastreamUpdateChecks()) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this flag only allows you to bypass the destination check? Why is this needed and if it is should we rename it?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes for now we only bypass destination change check. For IPv6, we plan to manually (through brooklin tool command) update destination of the exisiting datastreams, and this check prevents that.
This is only a temporary change. Once IPv6 migration is done we will revert this change. So not a lot of benefit in renaming it if you dont feel too strongly about it

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the plan is to revert that's fine

if (!datastream.getDestination().equals(oldDatastream.getDestination())) {
throw new DatastreamValidationException(String.format("Failed to update %s because destination is immutable."
+ " old: %s new: %s", key, oldDatastream, datastream));
}
}
}

/**
* Validate the partition is managed by connector for this datastream
* @param datastream datastream which needs the verification
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public final class CoordinatorConfig {

public static final String CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING = PREFIX + "enableThroughputViolatingTopicsHandling";

public static final String CONFIG_OVERRIDE_DATASTREAM_UPDATE_CHECKS = PREFIX + "overrideDatastreamUpdateChecks";

public static final int DEFAULT_MAX_ASSIGNMENT_RETRY_COUNT = 100;
public static final long DEFAULT_STOP_PROPAGATION_TIMEOUT_MS = 60 * 1000;
public static final long DEFAULT_TASK_STOP_CHECK_TIMEOUT_MS = 60 * 1000;
Expand Down Expand Up @@ -79,6 +81,7 @@ public final class CoordinatorConfig {
private final long _markDatastreamsStoppedTimeoutMs;
private final long _markDatastreamsStoppedRetryPeriodMs;
private final boolean _enableThroughputViolatingTopicsHandling;
private final boolean _overrideDatastreamUpdateChecks;


/**
Expand Down Expand Up @@ -117,6 +120,8 @@ public CoordinatorConfig(Properties config) {
DEFAULT_MARK_DATASTREMS_STOPPED_RETRY_PERIOD_MS);
_enableThroughputViolatingTopicsHandling = _properties.getBoolean(
CONFIG_ENABLE_THROUGHPUT_VIOLATING_TOPICS_HANDLING, false);
_overrideDatastreamUpdateChecks = _properties.getBoolean(
CONFIG_OVERRIDE_DATASTREAM_UPDATE_CHECKS, false);
}

public Properties getConfigProperties() {
Expand Down Expand Up @@ -212,4 +217,8 @@ public long getMarkDatastreamsStoppedTimeoutMs() {
public long getMarkDatastreamsStoppedRetryPeriodMs() {
return _markDatastreamsStoppedRetryPeriodMs;
}

public boolean getOverrideDatastreamUpdateChecks() {
return _overrideDatastreamUpdateChecks;
}
}