Skip to content

Commit

Permalink
address comment
Browse files Browse the repository at this point in the history
xuyangzhong committed Jan 9, 2025
1 parent ac39f7b commit 2b58d86
Showing 4 changed files with 10 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@
import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
import org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction;
import org.apache.flink.table.runtime.operators.aggregate.MiniBatchGroupAggFunction;
import org.apache.flink.table.runtime.operators.aggregate.asyncprocessing.AsyncStateGroupAggFunction;
import org.apache.flink.table.runtime.operators.aggregate.async.AsyncStateGroupAggFunction;
import org.apache.flink.table.runtime.operators.bundle.KeyedMapBundleOperator;
import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -229,7 +229,7 @@ protected Transformation<RowData> translateToPlanInternal(
.generateRecordEqualiser("GroupAggValueEqualiser");
final int inputCountIndex = aggInfoList.getIndexOfCountStar();
final boolean isMiniBatchEnabled = MinibatchUtil.isMiniBatchEnabled(config);
final boolean enableAsyncState = AggregateUtil.enableAsyncState(config, aggInfoList);
final boolean isAsyncStateEnabled = AggregateUtil.isAsyncStateEnabled(config, aggInfoList);

final OneInputStreamOperator<RowData, RowData> operator;
if (isMiniBatchEnabled) {
@@ -245,7 +245,7 @@ protected Transformation<RowData> translateToPlanInternal(
operator =
new KeyedMapBundleOperator<>(
aggFunction, MinibatchUtil.createMiniBatchTrigger(config));
} else if (enableAsyncState) {
} else if (isAsyncStateEnabled) {
AsyncStateGroupAggFunction aggFunction =
new AsyncStateGroupAggFunction(
aggsHandler,
Original file line number Diff line number Diff line change
@@ -1178,7 +1178,7 @@ object AggregateUtil extends Enumeration {
.exists(_.getKind == FunctionKind.TABLE_AGGREGATE)
}

def enableAsyncState(config: ReadableConfig, aggInfoList: AggregateInfoList): Boolean = {
def isAsyncStateEnabled(config: ReadableConfig, aggInfoList: AggregateInfoList): Boolean = {
// Currently, we do not support async state with agg functions that include DataView.
val containsDataViewInAggInfo =
aggInfoList.aggInfos.toStream.stream().anyMatch(agg => !agg.viewSpecs.isEmpty)
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.generated.RecordEqualiser;
import org.apache.flink.table.runtime.operators.aggregate.asyncprocessing.AsyncStateGroupAggFunction;
import org.apache.flink.table.runtime.operators.aggregate.async.AsyncStateGroupAggFunction;
import org.apache.flink.table.types.logical.LogicalType;

import static org.apache.flink.table.runtime.util.StateConfigUtil.createTtlConfig;
Original file line number Diff line number Diff line change
@@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.table.runtime.operators.aggregate.asyncprocessing;
package org.apache.flink.table.runtime.operators.aggregate.async;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.v2.ValueState;
@@ -25,7 +25,6 @@
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.generated.GeneratedAggsHandleFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction;
import org.apache.flink.table.runtime.operators.aggregate.GroupAggFunctionBase;
import org.apache.flink.table.runtime.operators.aggregate.utils.GroupAggHelper;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
@@ -43,7 +42,7 @@ public class AsyncStateGroupAggFunction extends GroupAggFunctionBase {
private transient AsyncStateGroupAggHelper aggHelper = null;

/**
* Creates a {@link GroupAggFunction}.
* Creates a {@link AsyncStateGroupAggFunction}.
*
* @param genAggsHandler The code generated function used to handle aggregates.
* @param genRecordEqualiser The code generated equaliser used to equal RowData.
@@ -74,16 +73,13 @@ public AsyncStateGroupAggFunction(
public void open(OpenContext openContext) throws Exception {
super.open(openContext);

final StreamingRuntimeContext runtimeContext =
(StreamingRuntimeContext) getRuntimeContext();

InternalTypeInfo<RowData> accTypeInfo = InternalTypeInfo.ofFields(accTypes);
ValueStateDescriptor<RowData> accDesc = new ValueStateDescriptor<>("accState", accTypeInfo);
if (ttlConfig.isEnabled()) {
accDesc.enableTimeToLive(ttlConfig);
}

accState = runtimeContext.getValueState(accDesc);
accState = ((StreamingRuntimeContext) getRuntimeContext()).getValueState(accDesc);
aggHelper = new AsyncStateGroupAggHelper();
}

@@ -103,12 +99,12 @@ public AsyncStateGroupAggHelper() {

@Override
protected void updateAccumulatorsState(RowData accumulators) throws Exception {
accState.update(accumulators);
accState.asyncUpdate(accumulators);
}

@Override
protected void clearAccumulatorsState() throws Exception {
accState.clear();
accState.asyncClear();
}
}
}

0 comments on commit 2b58d86

Please sign in to comment.