Commit ff13ec7

Compaction task memory is incorrect when compaction_max_aligned_series_num_in_one_batch <= 0 (apache#14603) (apache#14639)

* Compaction task memory is incorrect when compaction_max_aligned_series_num_in_one_batch <= 0

* move MetadataInfo

* add ut

* add ut & fix bug

* modify MetadataInfo

* modify test

* fix bug

* rename

* fix ut
shuwenwei authored Jan 10, 2025
1 parent 14d3528 commit ff13ec7
Showing 12 changed files with 176 additions and 157 deletions.
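
The heart of the bug: the rough memory estimators derived the number of concurrently processed series as Math.max(compaction_max_aligned_series_num_in_one_batch, sub_compaction_thread_count). A non-positive batch setting means "compact all aligned series of a device in one batch", so Math.max silently fell back to the sub-compaction task count and the rough estimate came out far too low. A minimal sketch of the arithmetic, assuming illustrative values (the variable names here are hypothetical):

// Illustrative values only: -1 means "compact all aligned series of a device in one batch".
int compactionMaxAlignedSeriesNumInOneBatch = -1;
int subCompactionTaskNum = 4;

// Old rough estimate: Math.max(-1, 4) == 4, as if only 4 series were processed at once,
// even though an unlimited aligned batch may hold thousands of series.
int oldMaxConcurrentSeriesNum =
    Math.max(compactionMaxAlignedSeriesNumInOneBatch, subCompactionTaskNum);

// After this commit (see MetadataInfo below): a non-positive batch size counts as "unlimited",
// and the rough estimators skip this case entirely by returning -1, which forces the
// accurate estimator to run instead.
int effectiveBatchSize =
    compactionMaxAlignedSeriesNumInOneBatch <= 0
        ? Integer.MAX_VALUE
        : compactionMaxAlignedSeriesNumInOneBatch;
int newMaxConcurrentSeriesNum = Math.max(effectiveBatchSize, subCompactionTaskNum);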
----------------------------------------------------------------------
New file: CompactionSourceFileDeletedException.java
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception;

public class CompactionSourceFileDeletedException extends RuntimeException {

public CompactionSourceFileDeletedException(String message) {
super(message);
}

@Override
public Throwable fillInStackTrace() {
return this;
}
}
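
The fillInStackTrace() override makes this exception cheap to throw, which suits its role as a control-flow signal rather than an error: when a source file disappears while a compaction task is being estimated or selected, the caller simply abandons the attempt. A rough sketch of the calling pattern, using only methods that appear later in this diff (resources stands for the task's source TsFileResource list):

try {
  // throws CompactionSourceFileDeletedException if any source file is already deleted
  CompactionEstimateUtils.addReadLock(resources);
  try {
    // ... read file metadata and compute the memory estimate ...
  } finally {
    CompactionEstimateUtils.releaseReadLock(resources);
  }
} catch (CompactionSourceFileDeletedException e) {
  // no stack trace was captured; report the estimate as unavailable (-1) or select nothing
}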
----------------------------------------------------------------------
@@ -26,6 +26,7 @@
 import org.apache.iotdb.db.service.metrics.FileMetrics;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionRecoverException;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.ICompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.ReadChunkCompactionPerformer;
@@ -685,10 +686,13 @@ public long getEstimatedMemoryCost() {
           innerSpaceEstimator.roughEstimateInnerCompactionMemory(
               filesView.sourceFilesInCompactionPerformer);
       memoryCost =
-          CompactionEstimateUtils.shouldAccurateEstimate(roughEstimatedMemoryCost)
+          CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost)
               ? roughEstimatedMemoryCost
               : innerSpaceEstimator.estimateInnerCompactionMemory(
                   filesView.sourceFilesInCompactionPerformer);
+    } catch (CompactionSourceFileDeletedException e) {
+      innerSpaceEstimator.cleanup();
+      return -1;
     } catch (Exception e) {
       if (e instanceof StopReadTsFileByInterruptException || Thread.interrupted()) {
         Thread.currentThread().interrupt();
----------------------------------------------------------------------
@@ -45,15 +45,10 @@ public long estimateCrossCompactionMemory(
     List<TsFileResource> resources = new ArrayList<>(seqResources.size() + unseqResources.size());
     resources.addAll(seqResources);
     resources.addAll(unseqResources);
-    if (!CompactionEstimateUtils.addReadLock(resources)) {
-      return -1L;
-    }
+    CompactionEstimateUtils.addReadLock(resources);

     long cost = 0;
     try {
-      if (!isAllSourceFileExist(resources)) {
-        return -1L;
-      }
       CompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
       cost += calculatingMetadataMemoryCost(taskInfo);
       cost += calculatingDataMemoryCost(taskInfo);
----------------------------------------------------------------------
@@ -45,15 +45,9 @@ protected TsFileSequenceReader getReader(String filePath) throws IOException {
   }

   public long estimateInnerCompactionMemory(List<TsFileResource> resources) throws IOException {
-    if (!CompactionEstimateUtils.addReadLock(resources)) {
-      return -1L;
-    }
+    CompactionEstimateUtils.addReadLock(resources);
     long cost;
     try {
-      if (!isAllSourceFileExist(resources)) {
-        return -1L;
-      }
-
       CompactionTaskInfo taskInfo = calculatingCompactionTaskInfo(resources);
       cost = calculatingMetadataMemoryCost(taskInfo);
       cost += calculatingDataMemoryCost(taskInfo);
----------------------------------------------------------------------
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;

 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileReader;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionType;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -101,11 +102,10 @@ public static FileInfo calculateFileInfo(TsFileSequenceReader reader) throws IOException {
           averageChunkMetadataSize);
   }

-  public static long roughEstimateMetadataCostInCompaction(
-      List<TsFileResource> resources, CompactionType taskType) throws IOException {
-    if (!CompactionEstimateUtils.addReadLock(resources)) {
-      return -1L;
-    }
+  static MetadataInfo collectMetadataInfo(List<TsFileResource> resources, CompactionType taskType)
+      throws IOException {
+    CompactionEstimateUtils.addReadLock(resources);
+    MetadataInfo metadataInfo = new MetadataInfo();
     long cost = 0L;
     Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>();
     try {
@@ -115,23 +115,29 @@ public static long roughEstimateMetadataCostInCompaction(
         }
         try (CompactionTsFileReader reader =
             new CompactionTsFileReader(resource.getTsFilePath(), taskType)) {
-          for (Map.Entry<IDeviceID, Long> entry : getDeviceMetadataSizeMap(reader).entrySet()) {
+          for (Map.Entry<IDeviceID, Long> entry :
+              getDeviceMetadataSizeMapAndCollectMetadataInfo(reader, metadataInfo).entrySet()) {
             deviceMetadataSizeMap.merge(entry.getKey(), entry.getValue(), Long::sum);
           }
         }
       }
-      return cost + deviceMetadataSizeMap.values().stream().max(Long::compareTo).orElse(0L);
+      metadataInfo.metadataMemCost =
+          cost + deviceMetadataSizeMap.values().stream().max(Long::compareTo).orElse(0L);
+      return metadataInfo;
     } finally {
       CompactionEstimateUtils.releaseReadLock(resources);
     }
   }

-  public static Map<IDeviceID, Long> getDeviceMetadataSizeMap(CompactionTsFileReader reader)
-      throws IOException {
+  static Map<IDeviceID, Long> getDeviceMetadataSizeMapAndCollectMetadataInfo(
+      CompactionTsFileReader reader, MetadataInfo metadataInfo) throws IOException {
     Map<IDeviceID, Long> deviceMetadataSizeMap = new HashMap<>();
     TsFileDeviceIterator deviceIterator = reader.getAllDevicesIteratorWithIsAligned();
     while (deviceIterator.hasNext()) {
-      IDeviceID deviceID = deviceIterator.next().getLeft();
+      Pair<IDeviceID, Boolean> deviceAlignedPair = deviceIterator.next();
+      IDeviceID deviceID = deviceAlignedPair.getLeft();
+      boolean isAligned = deviceAlignedPair.getRight();
+      metadataInfo.hasAlignedSeries |= isAligned;
       MetadataIndexNode firstMeasurementNodeOfCurrentDevice =
           deviceIterator.getFirstMeasurementNodeOfCurrentDevice();
       long totalTimeseriesMetadataSizeOfCurrentDevice = 0;
@@ -145,14 +151,15 @@ public static Map<IDeviceID, Long> getDeviceMetadataSizeMap(CompactionTsFileReader reader)
     return deviceMetadataSizeMap;
   }

-  public static boolean shouldAccurateEstimate(long roughEstimatedMemCost) {
+  public static boolean shouldUseRoughEstimatedResult(long roughEstimatedMemCost) {
     return roughEstimatedMemCost > 0
         && IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
                 * roughEstimatedMemCost
             < SystemInfo.getInstance().getMemorySizeForCompaction();
   }

-  public static boolean addReadLock(List<TsFileResource> resources) {
+  public static void addReadLock(List<TsFileResource> resources)
+      throws CompactionSourceFileDeletedException {
     for (int i = 0; i < resources.size(); i++) {
       TsFileResource resource = resources.get(i);
       resource.readLock();
@@ -161,10 +168,10 @@ public static boolean addReadLock(List<TsFileResource> resources) {
         for (int j = 0; j <= i; j++) {
           resources.get(j).readUnlock();
         }
-        return false;
+        throw new CompactionSourceFileDeletedException(
+            "source file " + resource.getTsFilePath() + " is deleted");
       }
     }
-    return true;
   }

   public static void releaseReadLock(List<TsFileResource> resources) {
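
shouldAccurateEstimate is renamed to shouldUseRoughEstimatedResult, which matches what the expression actually answers: the rough figure is trusted only if, even with every compaction thread holding a task of this size at once, the total still fits within the compaction memory budget. A quick arithmetic sketch with invented numbers:

// Invented numbers; the real values come from IoTDBConfig and SystemInfo.
long roughEstimatedMemCost = 200L * 1024 * 1024;         // 200 MB rough estimate for one task
int compactionThreadCount = 10;                          // compaction_thread_count
long memorySizeForCompaction = 4L * 1024 * 1024 * 1024;  // 4 GB compaction memory budget

boolean useRoughResult =
    roughEstimatedMemCost > 0
        && compactionThreadCount * roughEstimatedMemCost < memorySizeForCompaction;
// 10 * 200 MB = 2000 MB < 4096 MB, so the cheap rough estimate is good enough;
// otherwise the slower but accurate estimator runs before admitting the task.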
----------------------------------------------------------------------
@@ -81,25 +81,23 @@ public long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) throws IOException {
   @Override
   public long roughEstimateInnerCompactionMemory(List<TsFileResource> resources)
       throws IOException {
-    long metadataCost =
-        CompactionEstimateUtils.roughEstimateMetadataCostInCompaction(
+    if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
+      return -1L;
+    }
+    MetadataInfo metadataInfo =
+        CompactionEstimateUtils.collectMetadataInfo(
             resources,
             resources.get(0).isSeq()
                 ? CompactionType.INNER_SEQ_COMPACTION
                 : CompactionType.INNER_UNSEQ_COMPACTION);
-    if (metadataCost < 0) {
-      return metadataCost;
-    }
-    int maxConcurrentSeriesNum =
-        Math.max(
-            config.getCompactionMaxAlignedSeriesNumInOneBatch(), config.getSubCompactionTaskNum());
+    int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
     long maxChunkSize = config.getTargetChunkSize();
     long maxPageSize = tsFileConfig.getPageSizeInByte();
     int maxOverlapFileNum = calculatingMaxOverlapFileNumInSubCompactionTask(resources);
     // source files (chunk + uncompressed page) * overlap file num
     // target file (chunk + unsealed page writer)
     return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize)
         + fixedMemoryBudget
-        + metadataCost;
+        + metadataInfo.metadataMemCost;
   }
 }
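
The returned value follows the two comment lines above it: each of the maxOverlapFileNum overlapping source files plus the single target file may hold, per concurrently processed series, one chunk and one page in memory, on top of the fixed budget and the metadata cost. A worked example with assumed values:

// Assumed values for illustration only.
int maxOverlapFileNum = 3;          // overlapping source files in one sub-compaction task
int maxConcurrentSeriesNum = 10;    // from MetadataInfo.getMaxConcurrentSeriesNum()
long maxChunkSize = 1_048_576L;     // 1 MB target chunk size
long maxPageSize = 65_536L;         // 64 KB page size
long fixedMemoryBudget = 20L * 1024 * 1024;   // hypothetical fixed overhead
long metadataMemCost = 5L * 1024 * 1024;      // hypothetical metadata cost

// (source files + 1 target file) * concurrent series * (chunk + page) + fixed + metadata
long estimate =
    (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize)
        + fixedMemoryBudget
        + metadataMemCost;
// = 4 * 10 * (1 MB + 64 KB) + 20 MB + 5 MB = 42.5 MB + 25 MB = 67.5 MB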
----------------------------------------------------------------------
@@ -83,27 +83,24 @@ protected long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) throws IOException {
   @Override
   public long roughEstimateCrossCompactionMemory(
       List<TsFileResource> seqResources, List<TsFileResource> unseqResources) throws IOException {
+    if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
+      return -1L;
+    }
     List<TsFileResource> sourceFiles = new ArrayList<>(seqResources.size() + unseqResources.size());
     sourceFiles.addAll(seqResources);
     sourceFiles.addAll(unseqResources);

-    long metadataCost =
-        CompactionEstimateUtils.roughEstimateMetadataCostInCompaction(
-            sourceFiles, CompactionType.CROSS_COMPACTION);
-    if (metadataCost < 0) {
-      return metadataCost;
-    }
+    MetadataInfo metadataInfo =
+        CompactionEstimateUtils.collectMetadataInfo(sourceFiles, CompactionType.CROSS_COMPACTION);

-    int maxConcurrentSeriesNum =
-        Math.max(
-            config.getCompactionMaxAlignedSeriesNumInOneBatch(), config.getSubCompactionTaskNum());
+    int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
     long maxChunkSize = config.getTargetChunkSize();
     long maxPageSize = tsFileConfig.getPageSizeInByte();
     int maxOverlapFileNum = calculatingMaxOverlapFileNumInSubCompactionTask(sourceFiles);
     // source files (chunk + uncompressed page) * overlap file num
     // target files (chunk + unsealed page writer)
     return (maxOverlapFileNum + 1) * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize)
         + fixedMemoryBudget
-        + metadataCost;
+        + metadataInfo.metadataMemCost;
   }
 }
----------------------------------------------------------------------
New file: MetadataInfo.java
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iotdb.db.storageengine.dataregion.compaction.selector.estimator;

import org.apache.iotdb.db.conf.IoTDBDescriptor;

class MetadataInfo {
public long metadataMemCost;
public boolean hasAlignedSeries;

public int getMaxConcurrentSeriesNum() {
if (!hasAlignedSeries) {
return IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum();
}
int compactionMaxAlignedSeriesNumInOneBatch =
IoTDBDescriptor.getInstance().getConfig().getCompactionMaxAlignedSeriesNumInOneBatch();
compactionMaxAlignedSeriesNumInOneBatch =
compactionMaxAlignedSeriesNumInOneBatch <= 0
? Integer.MAX_VALUE
: compactionMaxAlignedSeriesNumInOneBatch;
return Math.max(
compactionMaxAlignedSeriesNumInOneBatch,
IoTDBDescriptor.getInstance().getConfig().getSubCompactionTaskNum());
}
}
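
getMaxConcurrentSeriesNum() is where the misbehaving Math.max is replaced: for aligned series, a non-positive batch limit now maps to Integer.MAX_VALUE instead of quietly losing to the sub-compaction task count. A condensed, self-contained restatement with assumed values (not the production code):

// A condensed restatement of getMaxConcurrentSeriesNum() with the config values passed in;
// assumes sub_compaction_thread_count = 4 in the examples below.
static int maxConcurrentSeriesNum(boolean hasAlignedSeries, int batchLimit, int subTaskNum) {
  if (!hasAlignedSeries) {
    return subTaskNum;
  }
  int effectiveLimit = batchLimit <= 0 ? Integer.MAX_VALUE : batchLimit;
  return Math.max(effectiveLimit, subTaskNum);
}

// maxConcurrentSeriesNum(false, -1, 4) -> 4
// maxConcurrentSeriesNum(true,  10, 4) -> 10
// maxConcurrentSeriesNum(true,  -1, 4) -> Integer.MAX_VALUE, the "unlimited batch" case the
//                                         rough estimators now avoid by returning -1 up front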
----------------------------------------------------------------------
@@ -75,21 +75,18 @@ public long calculatingDataMemoryCost(CompactionTaskInfo taskInfo) {
   @Override
   public long roughEstimateInnerCompactionMemory(List<TsFileResource> resources)
       throws IOException {
-    long metadataCost =
-        CompactionEstimateUtils.roughEstimateMetadataCostInCompaction(
-            resources, CompactionType.INNER_SEQ_COMPACTION);
-    if (metadataCost < 0) {
-      return metadataCost;
+    if (config.getCompactionMaxAlignedSeriesNumInOneBatch() <= 0) {
+      return -1L;
     }
-    int maxConcurrentSeriesNum =
-        Math.max(
-            config.getCompactionMaxAlignedSeriesNumInOneBatch(), config.getSubCompactionTaskNum());
+    MetadataInfo metadataInfo =
+        CompactionEstimateUtils.collectMetadataInfo(resources, CompactionType.INNER_SEQ_COMPACTION);
+    int maxConcurrentSeriesNum = metadataInfo.getMaxConcurrentSeriesNum();
     long maxChunkSize = config.getTargetChunkSize();
     long maxPageSize = tsFileConfig.getPageSizeInByte();
     // source files (chunk + uncompressed page)
     // target file (chunk + unsealed page writer)
     return 2 * maxConcurrentSeriesNum * (maxChunkSize + maxPageSize)
         + fixedMemoryBudget
-        + metadataCost;
+        + metadataInfo.metadataMemCost;
   }
 }
----------------------------------------------------------------------
@@ -25,6 +25,7 @@
 import org.apache.iotdb.db.exception.MergeException;
 import org.apache.iotdb.db.service.metrics.CompactionMetrics;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.constant.CompactionTaskType;
+import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionSourceFileDeletedException;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext;
 import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager;
@@ -140,6 +141,8 @@ public CrossCompactionTaskResource selectOneTaskResources(CrossSpaceCompactionCandidate candidate) {
           candidate.getUnseqFiles().size());

       return executeTaskResourceSelection(candidate);
+    } catch (CompactionSourceFileDeletedException e) {
+      return new CrossCompactionTaskResource();
     } catch (Exception e) {
       if (e instanceof StopReadTsFileByInterruptException || Thread.interrupted()) {
         Thread.currentThread().interrupt();
@@ -227,7 +230,7 @@ private CrossCompactionTaskResource executeTaskResourceSelection(
             compactionEstimator.roughEstimateCrossCompactionMemory(
                 newSelectedSeqResources, newSelectedUnseqResources);
         long memoryCost =
-            CompactionEstimateUtils.shouldAccurateEstimate(roughEstimatedMemoryCost)
+            CompactionEstimateUtils.shouldUseRoughEstimatedResult(roughEstimatedMemoryCost)
                 ? roughEstimatedMemoryCost
                 : compactionEstimator.estimateCrossCompactionMemory(
                     newSelectedSeqResources, newSelectedUnseqResources);
[Diff truncated: the remaining changed files are not shown.]