diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java index 9607c41a94b7..2bdfda64ef41 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/tsfile/PipeTsFileResourceManager.java @@ -98,7 +98,10 @@ private void ttlCheck() throws InterruptedException { } else { logBuilder.append( String.format( - "<%s , %d times> ", entry.getKey(), entry.getValue().getReferenceCount())); + "<%s , %d times, %d bytes> ", + entry.getKey(), + entry.getValue().getReferenceCount(), + entry.getValue().getFileSize())); } } catch (final IOException e) { LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", e); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 20a0d0983f05..5afbf2128293 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -357,6 +357,8 @@ private void tryPrefetch() { "Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.", this, event); + ((EnrichedEvent) event) + .decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), false); if (onEvent()) { return; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java index 97fa621faefc..2db5bc7a4b02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/SubscriptionEvent.java @@ -65,6 +65,9 @@ public class SubscriptionEvent { // record file name for file payload private String fileName; + private static final long NACK_COUNT_REPORT_THRESHOLD = 3; + private final AtomicLong nackCount = new AtomicLong(); + /** * Constructs a {@link SubscriptionEvent} with the response type of {@link * SubscriptionEventSingleResponse}. @@ -143,11 +146,11 @@ public boolean isCommittable() { } public void ack(final Consumer onCommittedHook) { - // ack response - response.ack(onCommittedHook); - - // ack pipe events + // NOTE: we should ack pipe events before ack response since multiple events may reuse the same + // batch (as pipe events) + // TODO: consider more elegant design for this method pipeEvents.ack(); + response.ack(onCommittedHook); } /** @@ -156,11 +159,8 @@ public void ack(final Consumer onCommittedHook) { * SubscriptionPrefetchingQueue} or {@link SubscriptionPrefetchingQueue#cleanUp}. */ public void cleanUp() { - // reset serialized responses - response.cleanUp(); - - // clean up pipe events pipeEvents.cleanUp(); + response.cleanUp(); // TODO: clean more fields } @@ -216,6 +216,11 @@ public void nack() { // reset lastPolledTimestamp makes this event pollable lastPolledTimestamp.set(INVALID_TIMESTAMP); + + // record nack count + if (nackCount.getAndIncrement() > NACK_COUNT_REPORT_THRESHOLD) { + LOGGER.warn("{} has been nacked {} times", this, nackCount); + } } public void recordLastPolledConsumerId(final String consumerId) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java index 59528125f497..1031e1e22796 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTabletEventBatch.java @@ -43,6 +43,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.atomic.AtomicLong; public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch implements Iterator> { @@ -62,6 +63,10 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch private final List iteratedEnrichedEvents = new ArrayList<>(); + private static final long ITERATED_COUNT_REPORT_FREQ = + 30000; // based on the full parse of a 128MB tsfile estimate + private final AtomicLong iteratedCount = new AtomicLong(); + public SubscriptionPipeTabletEventBatch( final int regionId, final SubscriptionPrefetchingTabletQueue prefetchingQueue, @@ -230,6 +235,23 @@ public boolean hasNext() { @Override public List next() { + final List tablets = nextInternal(); + if (Objects.isNull(tablets)) { + return null; + } + if (iteratedCount.incrementAndGet() % ITERATED_COUNT_REPORT_FREQ == 0) { + LOGGER.info( + "{} has been iterated {} times, current TsFileInsertionEvent {}", + this, + iteratedCount, + Objects.isNull(currentTsFileInsertionEvent) + ? "" + : ((EnrichedEvent) currentTsFileInsertionEvent).coreReportMessage()); + } + return tablets; + } + + private List nextInternal() { if (Objects.nonNull(currentTabletInsertionEventsIterator)) { if (currentTabletInsertionEventsIterator.hasNext()) { final TabletInsertionEvent tabletInsertionEvent =