Skip to content

Commit

Permalink
从Flink复制部分代码
Browse files Browse the repository at this point in the history
  • Loading branch information
entropy-cloud committed Jan 2, 2025
1 parent 6171ccd commit e0623bc
Show file tree
Hide file tree
Showing 20 changed files with 1,433 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nop.stream.core.common.eventtime;


import java.time.Duration;

/**
* A watermark generator that assumes monotonically ascending timestamps within the stream split and
* periodically generates watermarks based on that assumption.
*
* <p>The current watermark is always one after the latest (highest) timestamp, because we assume
* that more records with the same timestamp may still follow.
*
* <p>The watermarks are generated periodically and tightly follow the latest timestamp in the data.
* The delay introduced by this strategy is mainly the periodic interval in which the watermarks are
* generated, which can be configured via {@link
* org.apache.flink.api.common.ExecutionConfig#setAutoWatermarkInterval(long)}.
*/
public class AscendingTimestampsWatermarks<T> extends BoundedOutOfOrdernessWatermarks<T> {

/**
* Creates a new watermark generator with for ascending timestamps.
*/
public AscendingTimestampsWatermarks() {
super(Duration.ofMillis(0));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nop.stream.core.common.eventtime;


import io.nop.stream.core.streamrecord.watermark.Watermark;

import java.time.Duration;

import static com.google.common.base.Preconditions.checkNotNull;
import static io.nop.api.core.util.Guard.checkArgument;


/**
* A WatermarkGenerator for situations where records are out of order, but you can place an upper
* bound on how far the events are out of order. An out-of-order bound B means that once an event
* with timestamp T was encountered, no events older than {@code T - B} will follow any more.
*
* <p>The watermarks are generated periodically. The delay introduced by this watermark strategy is
* the periodic interval length, plus the out-of-orderness bound.
*/
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

/**
* The maximum timestamp encountered so far.
*/
private long maxTimestamp;

/**
* The maximum out-of-orderness that this watermark generator assumes.
*/
private final long outOfOrdernessMillis;

/**
* Creates a new watermark generator with the given out-of-orderness bound.
*
* @param maxOutOfOrderness The bound for the out-of-orderness of the event timestamps.
*/
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();

// start so that our lowest watermark would be Long.MIN_VALUE.
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}

// ------------------------------------------------------------------------

@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nop.stream.core.common.eventtime;


/**
* An implementation of a {@link WatermarkGenerator} that generates no Watermarks.
*/
public final class NoWatermarksGenerator<E> implements WatermarkGenerator<E> {

@Override
public void onEvent(E event, long eventTimestamp, WatermarkOutput output) {
}

@Override
public void onPeriodicEmit(WatermarkOutput output) {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nop.stream.core.common.eventtime;

/**
* A {@link TimestampAssigner} that forwards the already-assigned timestamp. This is for use when
* records come out of a source with valid timestamps, for example from the Kafka Metadata.
*/
public final class RecordTimestampAssigner<E> implements TimestampAssigner<E> {

@Override
public long extractTimestamp(E element, long recordTimestamp) {
return recordTimestamp;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package io.nop.stream.core.common.eventtime;

/**
* A {@code TimestampAssigner} assigns event time timestamps to elements. These timestamps are used
* by all functions that operate on event time, for example event time windows.
*
* <p>Timestamps can be an arbitrary {@code long} value, but all built-in implementations represent
* it as the milliseconds since the Epoch (midnight, January 1, 1970 UTC), the same way as {@link
* System#currentTimeMillis()} does it.
*
* @param <T> The type of the elements to which this assigner assigns timestamps.
*/
@FunctionalInterface
public interface TimestampAssigner<T> {

/**
* The value that is passed to {@link #extractTimestamp} when there is no previous timestamp
* attached to the record.
*/
long NO_TIMESTAMP = Long.MIN_VALUE;

/**
* Assigns a timestamp to an element, in milliseconds since the Epoch. This is independent of
* any particular time zone or calendar.
*
* <p>The method is passed the previously assigned timestamp of the element. That previous
* timestamp may have been assigned from a previous assigner. If the element did not carry a
* timestamp before, this value is {@link #NO_TIMESTAMP} (= {@code Long.MIN_VALUE}: {@value
* Long#MIN_VALUE}).
*
* @param element The element that the timestamp will be assigned to.
* @param recordTimestamp The current internal timestamp of the element, or a negative value, if
* no timestamp has been assigned yet.
* @return The new timestamp.
*/
long extractTimestamp(T element, long recordTimestamp);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nop.stream.core.common.eventtime;


import java.io.Serializable;

/**
* A supplier for {@link TimestampAssigner TimestampAssigners}. The supplier pattern is used to
* avoid having to make {@link TimestampAssigner} {@link Serializable} for use in API methods.
*
* <p>This interface is {@link Serializable} because the supplier may be shipped to workers during
* distributed execution.
*/
@FunctionalInterface
public interface TimestampAssignerSupplier<T> extends Serializable {

/**
* Instantiates a {@link TimestampAssigner}.
*/
TimestampAssigner<T> createTimestampAssigner(Context context);

static <T> TimestampAssignerSupplier<T> of(TimestampAssigner<T> assigner) {
return new SupplierFromSerializableTimestampAssigner<>(assigner);
}

/**
* Additional information available to {@link #createTimestampAssigner(Context)}. This can be
* access to {@link org.apache.flink.metrics.MetricGroup MetricGroups}, for example.
*/
interface Context {

/**
* Returns the metric group for the context in which the created {@link TimestampAssigner}
* is used.
*
* <p>Instances of this class can be used to register new metrics with Flink and to create a
* nested hierarchy based on the group names. See {@link MetricGroup} for more information
* for the metrics system.
*
* @see MetricGroup
*/
Object getMetricGroup();
}

/**
* We need an actual class. Implementing this as a lambda in {@link
* #of(SerializableTimestampAssigner)} would not allow the {@link ClosureCleaner} to "reach"
* into the {@link SerializableTimestampAssigner}.
*/
class SupplierFromSerializableTimestampAssigner<T> implements TimestampAssignerSupplier<T> {

private static final long serialVersionUID = 1L;

private final TimestampAssigner<T> assigner;

public SupplierFromSerializableTimestampAssigner(
TimestampAssigner<T> assigner) {
this.assigner = assigner;
}

@Override
public TimestampAssigner<T> createTimestampAssigner(Context context) {
return assigner;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.nop.stream.core.common.eventtime;

import java.io.Serializable;

/**
* Configuration parameters for watermark alignment.
*/
public final class WatermarkAlignmentParams implements Serializable {
public static final WatermarkAlignmentParams WATERMARK_ALIGNMENT_DISABLED =
new WatermarkAlignmentParams(Long.MAX_VALUE, "", 0);
private final long maxAllowedWatermarkDrift;
private final long updateInterval;
private final String watermarkGroup;

public WatermarkAlignmentParams(
long maxAllowedWatermarkDrift, String watermarkGroup, long updateInterval) {
this.maxAllowedWatermarkDrift = maxAllowedWatermarkDrift;
this.watermarkGroup = watermarkGroup;
this.updateInterval = updateInterval;
}

public boolean isEnabled() {
return maxAllowedWatermarkDrift < Long.MAX_VALUE;
}

public long getMaxAllowedWatermarkDrift() {
return maxAllowedWatermarkDrift;
}

public String getWatermarkGroup() {
return watermarkGroup;
}

public long getUpdateInterval() {
return updateInterval;
}
}
Loading

0 comments on commit e0623bc

Please sign in to comment.