Skip to content

Commit

Permalink
ITERATED_COUNT_REPORT_FREQ
Browse files Browse the repository at this point in the history
  • Loading branch information
VGalaxies committed Jan 10, 2025
1 parent db27e28 commit 78422c3
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class SubscriptionEvent {
// record file name for file payload
private String fileName;

private static final long NACK_COUNT_THRESHOLD = 3;
private static final long NACK_COUNT_REPORT_THRESHOLD = 3;
private final AtomicLong nackCount = new AtomicLong();

/**
Expand Down Expand Up @@ -218,7 +218,7 @@ public void nack() {
lastPolledTimestamp.set(INVALID_TIMESTAMP);

// record nack count
if (nackCount.getAndIncrement() > NACK_COUNT_THRESHOLD) {
if (nackCount.getAndIncrement() > NACK_COUNT_REPORT_THRESHOLD) {
LOGGER.warn("{} has been nacked {} times", this, nackCount);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch
implements Iterator<List<Tablet>> {
Expand All @@ -62,6 +63,10 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch

private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();

private static final long ITERATED_COUNT_REPORT_FREQ =
30000; // based on the full parse of a 128MB tsfile estimate
private final AtomicLong iteratedCount = new AtomicLong();

public SubscriptionPipeTabletEventBatch(
final int regionId,
final SubscriptionPrefetchingTabletQueue prefetchingQueue,
Expand Down Expand Up @@ -230,6 +235,23 @@ public boolean hasNext() {

@Override
public List<Tablet> next() {
final List<Tablet> tablets = nextInternal();
if (Objects.isNull(tablets)) {
return null;
}
if (iteratedCount.incrementAndGet() % ITERATED_COUNT_REPORT_FREQ == 0) {
LOGGER.info(
"{} has been iterated {} times, current TsFileInsertionEvent {}",
this,
iteratedCount,
Objects.isNull(currentTsFileInsertionEvent)
? "<unknown>"
: ((EnrichedEvent) currentTsFileInsertionEvent).coreReportMessage());
}
return tablets;
}

private List<Tablet> nextInternal() {
if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
if (currentTabletInsertionEventsIterator.hasNext()) {
final TabletInsertionEvent tabletInsertionEvent =
Expand Down

0 comments on commit 78422c3

Please sign in to comment.