From 2b58d86f70c40e5ea6eb3e8b802b77ac3d8ff5ae Mon Sep 17 00:00:00 2001 From: xuyang Date: Thu, 9 Jan 2025 16:24:02 +0800 Subject: [PATCH] address comment --- .../exec/stream/StreamExecGroupAggregate.java | 6 +++--- .../table/planner/plan/utils/AggregateUtil.scala | 2 +- .../operators/aggregate/GroupAggFunctionBase.java | 2 +- .../AsyncStateGroupAggFunction.java | 14 +++++--------- 4 files changed, 10 insertions(+), 14 deletions(-) rename flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/{asyncprocessing => async}/AsyncStateGroupAggFunction.java (92%) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java index c7657bea3838a..5d36319857353 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecGroupAggregate.java @@ -48,7 +48,7 @@ import org.apache.flink.table.runtime.keyselector.RowDataKeySelector; import org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction; import org.apache.flink.table.runtime.operators.aggregate.MiniBatchGroupAggFunction; -import org.apache.flink.table.runtime.operators.aggregate.asyncprocessing.AsyncStateGroupAggFunction; +import org.apache.flink.table.runtime.operators.aggregate.async.AsyncStateGroupAggFunction; import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator; import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -229,7 +229,7 @@ protected Transformation translateToPlanInternal( .generateRecordEqualiser("GroupAggValueEqualiser"); final int inputCountIndex = aggInfoList.getIndexOfCountStar(); final boolean isMiniBatchEnabled = MinibatchUtil.isMiniBatchEnabled(config); - final boolean enableAsyncState = AggregateUtil.enableAsyncState(config, aggInfoList); + final boolean isAsyncStateEnabled = AggregateUtil.isAsyncStateEnabled(config, aggInfoList); final OneInputStreamOperator operator; if (isMiniBatchEnabled) { @@ -245,7 +245,7 @@ protected Transformation translateToPlanInternal( operator = new KeyedMapBundleOperator<>( aggFunction, MinibatchUtil.createMiniBatchTrigger(config)); - } else if (enableAsyncState) { + } else if (isAsyncStateEnabled) { AsyncStateGroupAggFunction aggFunction = new AsyncStateGroupAggFunction( aggsHandler, diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala index e19a609472328..968a62af425c3 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala @@ -1178,7 +1178,7 @@ object AggregateUtil extends Enumeration { .exists(_.getKind == FunctionKind.TABLE_AGGREGATE) } - def enableAsyncState(config: ReadableConfig, aggInfoList: AggregateInfoList): Boolean = { + def isAsyncStateEnabled(config: ReadableConfig, aggInfoList: AggregateInfoList): Boolean = { // Currently, we do not support async state with agg functions that include DataView. val containsDataViewInAggInfo = aggInfoList.aggInfos.toStream.stream().anyMatch(agg => !agg.viewSpecs.isEmpty) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionBase.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionBase.java index 7b3f1b547ae9d..b6a65df39033d 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionBase.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunctionBase.java @@ -27,7 +27,7 @@ import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; import org.apache.flink.table.runtime.generated.RecordEqualiser; -import org.apache.flink.table.runtime.operators.aggregate.asyncprocessing.AsyncStateGroupAggFunction; +import org.apache.flink.table.runtime.operators.aggregate.async.AsyncStateGroupAggFunction; import org.apache.flink.table.types.logical.LogicalType; import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig; diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncprocessing/AsyncStateGroupAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java similarity index 92% rename from flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncprocessing/AsyncStateGroupAggFunction.java rename to flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java index 42fe79111b2ed..506c36cc8991f 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/asyncprocessing/AsyncStateGroupAggFunction.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/async/AsyncStateGroupAggFunction.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.runtime.operators.aggregate.asyncprocessing; +package org.apache.flink.table.runtime.operators.aggregate.async; import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.state.v2.ValueState; @@ -25,7 +25,6 @@ import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction; import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser; -import org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction; import org.apache.flink.table.runtime.operators.aggregate.GroupAggFunctionBase; import org.apache.flink.table.runtime.operators.aggregate.utils.GroupAggHelper; import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; @@ -43,7 +42,7 @@ public class AsyncStateGroupAggFunction extends GroupAggFunctionBase { private transient AsyncStateGroupAggHelper aggHelper = null; /** - * Creates a {@link GroupAggFunction}. + * Creates a {@link AsyncStateGroupAggFunction}. * * @param genAggsHandler The code generated function used to handle aggregates. * @param genRecordEqualiser The code generated equaliser used to equal RowData. @@ -74,16 +73,13 @@ public AsyncStateGroupAggFunction( public void open(OpenContext openContext) throws Exception { super.open(openContext); - final StreamingRuntimeContext runtimeContext = - (StreamingRuntimeContext) getRuntimeContext(); - InternalTypeInfo accTypeInfo = InternalTypeInfo.ofFields(accTypes); ValueStateDescriptor accDesc = new ValueStateDescriptor<>("accState", accTypeInfo); if (ttlConfig.isEnabled()) { accDesc.enableTimeToLive(ttlConfig); } - accState = runtimeContext.getValueState(accDesc); + accState = ((StreamingRuntimeContext) getRuntimeContext()).getValueState(accDesc); aggHelper = new AsyncStateGroupAggHelper(); } @@ -103,12 +99,12 @@ public AsyncStateGroupAggHelper() { @Override protected void updateAccumulatorsState(RowData accumulators) throws Exception { - accState.update(accumulators); + accState.asyncUpdate(accumulators); } @Override protected void clearAccumulatorsState() throws Exception { - accState.clear(); + accState.asyncClear(); } } }