Skip to content

Commit

Permalink
Pipe: report linked tsfile size & Subscription: decrease reference co…
Browse files Browse the repository at this point in the history
…unt for other enriched events & add logging to observe possible stuck situations (apache#14668)
  • Loading branch information
VGalaxies committed Jan 10, 2025
1 parent a8d21d8 commit f6082b9
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,10 @@ private void ttlCheck() throws InterruptedException {
} else {
logBuilder.append(
String.format(
"<%s , %d times> ", entry.getKey(), entry.getValue().getReferenceCount()));
"<%s , %d times, %d bytes> ",
entry.getKey(),
entry.getValue().getReferenceCount(),
entry.getValue().getFileSize()));
}
} catch (final IOException e) {
LOGGER.warn("failed to close PipeTsFileResource when checking TTL: ", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ private void tryPrefetch() {
"Subscription: SubscriptionPrefetchingQueue {} ignore EnrichedEvent {} when prefetching.",
this,
event);
((EnrichedEvent) event)
.decreaseReferenceCount(SubscriptionPrefetchingQueue.class.getName(), false);
if (onEvent()) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ public class SubscriptionEvent {
// record file name for file payload
private String fileName;

private static final long NACK_COUNT_REPORT_THRESHOLD = 3;
private final AtomicLong nackCount = new AtomicLong();

/**
* Constructs a {@link SubscriptionEvent} with the response type of {@link
* SubscriptionEventSingleResponse}.
Expand Down Expand Up @@ -143,11 +146,11 @@ public boolean isCommittable() {
}

public void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
// ack response
response.ack(onCommittedHook);

// ack pipe events
// NOTE: we should ack pipe events before ack response since multiple events may reuse the same
// batch (as pipe events)
// TODO: consider more elegant design for this method
pipeEvents.ack();
response.ack(onCommittedHook);
}

/**
Expand All @@ -156,11 +159,8 @@ public void ack(final Consumer<SubscriptionEvent> onCommittedHook) {
* SubscriptionPrefetchingQueue} or {@link SubscriptionPrefetchingQueue#cleanUp}.
*/
public void cleanUp() {
// reset serialized responses
response.cleanUp();

// clean up pipe events
pipeEvents.cleanUp();
response.cleanUp();

// TODO: clean more fields
}
Expand Down Expand Up @@ -216,6 +216,11 @@ public void nack() {

// reset lastPolledTimestamp makes this event pollable
lastPolledTimestamp.set(INVALID_TIMESTAMP);

// record nack count
if (nackCount.getAndIncrement() > NACK_COUNT_REPORT_THRESHOLD) {
LOGGER.warn("{} has been nacked {} times", this, nackCount);
}
}

public void recordLastPolledConsumerId(final String consumerId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch
implements Iterator<List<Tablet>> {
Expand All @@ -62,6 +63,10 @@ public class SubscriptionPipeTabletEventBatch extends SubscriptionPipeEventBatch

private final List<EnrichedEvent> iteratedEnrichedEvents = new ArrayList<>();

private static final long ITERATED_COUNT_REPORT_FREQ =
30000; // based on the full parse of a 128MB tsfile estimate
private final AtomicLong iteratedCount = new AtomicLong();

public SubscriptionPipeTabletEventBatch(
final int regionId,
final SubscriptionPrefetchingTabletQueue prefetchingQueue,
Expand Down Expand Up @@ -230,6 +235,23 @@ public boolean hasNext() {

@Override
public List<Tablet> next() {
final List<Tablet> tablets = nextInternal();
if (Objects.isNull(tablets)) {
return null;
}
if (iteratedCount.incrementAndGet() % ITERATED_COUNT_REPORT_FREQ == 0) {
LOGGER.info(
"{} has been iterated {} times, current TsFileInsertionEvent {}",
this,
iteratedCount,
Objects.isNull(currentTsFileInsertionEvent)
? "<unknown>"
: ((EnrichedEvent) currentTsFileInsertionEvent).coreReportMessage());
}
return tablets;
}

private List<Tablet> nextInternal() {
if (Objects.nonNull(currentTabletInsertionEventsIterator)) {
if (currentTabletInsertionEventsIterator.hasNext()) {
final TabletInsertionEvent tabletInsertionEvent =
Expand Down

0 comments on commit f6082b9

Please sign in to comment.