Skip to content

Commit

Permalink
[FLINK-37172][table-runtime] Add logs in all existing async state ops…
Browse files Browse the repository at this point in the history
… to check if they are under async state when running
  • Loading branch information
xuyangzhong committed Jan 20, 2025
1 parent 6b6a73e commit 0cef14d
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,16 @@
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.util.Collector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Aggregate Function used for the groupby (without window) aggregate with async state api. */
public class AsyncStateGroupAggFunction extends GroupAggFunctionBase {

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(AsyncStateGroupAggFunction.class);

// stores the accumulators
private transient ValueState<RowData> accState = null;

Expand Down Expand Up @@ -73,6 +78,8 @@ public AsyncStateGroupAggFunction(
public void open(OpenContext openContext) throws Exception {
super.open(openContext);

LOG.info("Group agg is using async state");

InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(accTypes);
ValueStateDescriptor<RowData> accDesc = new ValueStateDescriptor<>("accState", accTypeInfo);
if (ttlConfig.isEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SliceSharedAssigner;
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -46,6 +49,9 @@ public abstract class AbstractAsyncStateSliceWindowAggProcessor
extends AbstractAsyncStateWindowAggProcessor<Long>
implements AsyncStateSlicingWindowProcessor<Long> {

private static final Logger LOG =
LoggerFactory.getLogger(AbstractAsyncStateSliceWindowAggProcessor.class);

protected final AsyncStateWindowBuffer.Factory windowBufferFactory;
protected final SliceAssigner sliceAssigner;
protected final long windowInterval;
Expand Down Expand Up @@ -80,6 +86,9 @@ public AbstractAsyncStateSliceWindowAggProcessor(
@Override
public void open(AsyncStateContext<Long> context) throws Exception {
super.open(context);

LOG.info("Slice window agg is using async state");

this.windowBuffer =
windowBufferFactory.create(
ctx.getOperatorOwner(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.runtime.operators.deduplicate.DeduplicateFunctionBase;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;

/**
Expand All @@ -42,6 +45,9 @@ abstract class AsyncStateDeduplicateFunctionBase<T, K, IN, OUT>

private static final long serialVersionUID = 1L;

private static final Logger LOG =
LoggerFactory.getLogger(AsyncStateDeduplicateFunctionBase.class);

// state stores previous message under the key.
protected ValueState<T> state;

Expand All @@ -54,6 +60,8 @@ public AsyncStateDeduplicateFunctionBase(
public void open(OpenContext openContext) throws Exception {
super.open(openContext);

LOG.info("Deduplicate is using async state");

ValueStateDescriptor<T> stateDesc =
new ValueStateDescriptor<>("deduplicate-state", typeInfo);
StateTtlConfig ttlConfig = createTtlConfig(stateRetentionTime);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.types.RowKind;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Streaming unbounded Join operator based on async state api, which supports INNER/LEFT/RIGHT/FULL
* JOIN.
Expand All @@ -42,6 +45,9 @@ public class AsyncStateStreamingJoinOperator extends AbstractAsyncStateStreaming

private static final long serialVersionUID = 1L;

private static final Logger LOG =
LoggerFactory.getLogger(AsyncStateStreamingJoinOperator.class);

// whether left side is outer side, e.g. left is outer but right is not when LEFT OUTER JOIN
private final boolean leftIsOuter;
// whether right side is outer side, e.g. right is outer but left is not when RIGHT OUTER JOIN
Expand Down Expand Up @@ -86,6 +92,8 @@ public AsyncStateStreamingJoinOperator(
public void open() throws Exception {
super.open();

LOG.info("Join is using async state");

this.outRow = new JoinedRowData();
this.leftNullRow = new GenericRowData(leftType.toRowSize());
this.rightNullRow = new GenericRowData(rightType.toRowSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@
import org.apache.flink.table.runtime.operators.window.tvf.slicing.SlicingWindowTimerServiceImpl;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZoneId;

/**
Expand All @@ -69,6 +72,8 @@ public class AsyncStateWindowJoinOperator extends AsyncStateTableStreamOperator<

private static final long serialVersionUID = 1L;

private static final Logger LOG = LoggerFactory.getLogger(AsyncStateWindowJoinOperator.class);

private static final String LEFT_RECORDS_STATE_NAME = "left-records";
private static final String RIGHT_RECORDS_STATE_NAME = "right-records";

Expand Down Expand Up @@ -119,6 +124,8 @@ public AsyncStateWindowJoinOperator(
public void open() throws Exception {
super.open();

LOG.info("Window join is using async state");

this.collector = new TimestampedCollector<>(output);
collector.eraseTimestamp();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,16 @@
import org.apache.flink.table.runtime.operators.rank.RankType;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Objects;

/** Base class for TopN Function with async state api. */
public abstract class AbstractAsyncStateTopNFunction extends AbstractTopNFunction {

private static final Logger LOG = LoggerFactory.getLogger(AbstractAsyncStateTopNFunction.class);

private ValueState<Long> rankEndState;

public AbstractAsyncStateTopNFunction(
Expand All @@ -65,6 +70,8 @@ public AbstractAsyncStateTopNFunction(
public void open(OpenContext openContext) throws Exception {
super.open(openContext);

LOG.info("Top-N is using async state");

if (!isConstantRankEnd) {
ValueStateDescriptor<Long> rankStateDesc =
new ValueStateDescriptor<>("rankEnd", Types.LONG);
Expand Down

0 comments on commit 0cef14d

Please sign in to comment.