Fixed an issue where entries could not be read due to a mismatch between the ensemble in the metadata and the bookies actually written to. #4194

Open: wants to merge 2 commits into master
Changes from all commits
@@ -1994,11 +1994,14 @@ void ensembleChangeLoop(List<BookieId> origEnsemble, Map<Integer, BookieId> fail
replaced = EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble);
LOG.info("New Ensemble: {} for ledger: {}", newEnsemble, ledgerId);

// Since changingEnsemble is true, processing in #sendAddSuccessCallbacks() is skipped.
unsetSuccessAndSendWriteRequest(newEnsemble, replaced);
changingEnsemble = false;
}
}
if (newEnsemble != null) { // unsetSuccess outside of lock
unsetSuccessAndSendWriteRequest(newEnsemble, replaced);
if (newEnsemble != null) {
// After changingEnsemble is changed to false, call #sendAddSuccessCallbacks().
sendAddSuccessCallbacks();
}
}
}, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
@@ -2010,6 +2013,21 @@ void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, final Set<Integer>
pendingAddOp.unsetSuccessAndSendWriteRequest(ensemble, bookieIndex);
}
}
// Suppose that unset doesn't happen on the write set of an entry. In this
// case we don't need to resend the write request upon an ensemble change.
// We do need to invoke #sendAddSuccessCallbacks() for such entries because
// they may have already completed, but they are just waiting for the ensemble
// to change.
// E.g.
// ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change
// happens to replace C with E. Entry k does not complete until C is
// replaced with E successfully. When the ensemble change completes, it tries
// to unset entry k. C however is not in k's write set, so no entry is written
// again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never
// completes.
//
// We call #sendAddSuccessCallbacks() to cover this case.
sendAddSuccessCallbacks();
}

void registerOperationFailureOnBookie(BookieId bookie, long entryId) {
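The hunk above establishes a specific ordering, which is easier to see in a stripped-down sketch. The class and member names below are illustrative stand-ins rather than the real LedgerHandle members: writes for the replaced bookie are re-sent while changingEnsemble is still true, the flag is then cleared, and only afterwards are the add-success callbacks fired, so an entry whose write set never contained the replaced bookie (like entry k in the comment above) can still complete.

// Illustrative sketch only; simplified names, not the actual LedgerHandle implementation.
final class EnsembleChangeOrderingSketch {
    private volatile boolean changingEnsemble = true;

    void onNewEnsembleWritten(Runnable unsetSuccessAndResend, Runnable sendAddSuccessCallbacks) {
        // 1. Re-send writes for the replaced bookie while changingEnsemble is still true,
        //    so completion processing stays suppressed during the resend.
        unsetSuccessAndResend.run();
        // 2. Leave the "ensemble change in progress" state.
        changingEnsemble = false;
        // 3. Only now fire the success callbacks: entries whose write set never included
        //    the replaced bookie were not re-sent in step 1, and this call is what lets
        //    them complete.
        sendAddSuccessCallbacks.run();
    }
}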
@@ -188,20 +188,7 @@ synchronized void unsetSuccessAndSendWriteRequest(List<BookieId> ensemble, int b
}
// Suppose that unset doesn't happen on the write set of an entry. In this
// case we don't need to resend the write request upon an ensemble change.
// We do need to invoke #sendAddSuccessCallbacks() for such entries because
// they may have already completed, but they are just waiting for the ensemble
// to change.
// E.g.
// ensemble (A, B, C, D), entry k is written to (A, B, D). An ensemble change
// happens to replace C with E. Entry k does not complete until C is
// replaced with E successfully. When the ensemble change completes, it tries
// to unset entry k. C however is not in k's write set, so no entry is written
// again, and no one triggers #sendAddSuccessCallbacks. Consequently, k never
// completes.
//
// We call sendAddSuccessCallback when unsetting t cover this case.
if (!lh.distributionSchedule.hasEntry(entryId, bookieIndex)) {
lh.sendAddSuccessCallbacks();
return;
}

@@ -94,9 +94,7 @@ public void testConstructionZkDelay() throws Exception {
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri())
.setZkTimeout(20000);

CountDownLatch l = new CountDownLatch(1);
zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS, l);
l.await();
zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS);

BookKeeper bkc = new BookKeeper(conf);
bkc.createLedger(digestType, "testPasswd".getBytes()).close();
@@ -109,9 +107,7 @@ public void testConstructionNotConnectedExplicitZk() throws Exception {
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri())
.setZkTimeout(20000);

CountDownLatch l = new CountDownLatch(1);
zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS, l);
l.await();
zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS);

ZooKeeper zk = new ZooKeeper(
zkUtil.getZooKeeperConnectString(),
@@ -37,6 +37,7 @@
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -47,6 +48,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -1539,7 +1541,142 @@ public void testLedgerMetadataTest() throws Exception {
lh.close();
}

@Test
public void testToDelayEnsembleReplacementAndRewriteEntry() throws Exception {
lh = bkc.createLedger(4, 2, digestType, ledgerPassword);

// Put Bookie0 to sleep.
List<BookieId> currentEnsemble = lh.getLedgerMetadata()
.getAllEnsembles().entrySet().iterator().next().getValue();
CountDownLatch bookie0Latch = new CountDownLatch(1);
sleepBookie(currentEnsemble.get(0), bookie0Latch);

// Write entries 0,1,2,3 to the ledger.
int sendCount = 7;
CountDownLatch addCompleteLatch = new CountDownLatch(sendCount);
for (int count = 0; count < 4; count++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(count);
entry.position(0);
entries1.add(entry.array());

lh.asyncAddEntry(entry.array(), new AddCallback() {
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
CountDownLatch addCompleteLatch = (CountDownLatch) ctx;
addCompleteLatch.countDown();
}
}, addCompleteLatch);
}

// Expected state of entries.
// entry: 0(Bookie0, Bookie1) -> Waiting for successful write to Bookie0.
// entry: 1(Bookie1, Bookie2) -> Writing to Bookie1,2 was successful, but its completion is pending.
// entry: 2(Bookie2, Bookie3) -> Writing to Bookie2,3 was successful, but its completion is pending.
// entry: 3(Bookie3, Bookie0) -> Waiting for successful write to Bookie0.
Field fieldPendingAddOps = lh.getClass().getDeclaredField("pendingAddOps");
fieldPendingAddOps.setAccessible(true);
int completedCount;
do {
Thread.sleep(100);

completedCount = 0;
for (PendingAddOp pendingAddOp : (Queue<PendingAddOp>) fieldPendingAddOps.get(lh)) {
if (pendingAddOp.completed) {
completedCount++;
}
}
} while (completedCount != 2);

// Kill Bookie2,3 and start a new Bookie.
killBookie(currentEnsemble.get(2));
killBookie(currentEnsemble.get(3));
startNewBookie();

// Put ZK cluster to sleep to delay ensemble replacement.
CountDownLatch zkLatch = new CountDownLatch(1);
sleepZKCluster(zkLatch);

// Write entries 4,5,6 to the ledger.
for (int count = 4; count < sendCount; count++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(count);
entry.position(0);
entries1.add(entry.array());

lh.asyncAddEntry(entry.array(), new AddCallback() {
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
CountDownLatch addCompleteLatch = (CountDownLatch) ctx;
addCompleteLatch.countDown();
}
}, addCompleteLatch);
}

// Expected state of entries.
// entry: 0(Bookie0, Bookie1) -> Waiting for successful write to Bookie0.
// entry: 1(Bookie1, Bookie2) -> Writing to Bookie1,2 was successful, but its completion is pending.
// entry: 2(Bookie2, Bookie3) -> Writing to Bookie2,3 was successful, but its completion is pending.
// entry: 3(Bookie3, Bookie0) -> Waiting for successful write to Bookie0.
// entry: 4(Bookie0, Bookie1) -> Waiting for successful write to Bookie0.
// entry: 5(Bookie1, Bookie2) -> Failed to write to Bookie2.
// entry: 6(Bookie2, Bookie3) -> Failed to write to Bookie2,3.
Field fieldChangingEnsemble = lh.getClass().getDeclaredField("changingEnsemble");
fieldChangingEnsemble.setAccessible(true);
boolean changingEnsemble;
do {
Thread.sleep(100);

changingEnsemble = (boolean) fieldChangingEnsemble.get(lh);
} while (!changingEnsemble);

// Bookie0 wakes up, so the writes to Bookie0 succeed.
//
// Expected state of entries.
// entry: 0(Bookie0, Bookie1) -> Writing to Bookie0,1 was successful, but its completion is pending.
// entry: 1(Bookie1, Bookie2) -> Writing to Bookie1,2 was successful, but its completion is pending.
// entry: 2(Bookie2, Bookie3) -> Writing to Bookie2,3 was successful, but its completion is pending.
// entry: 3(Bookie3, Bookie0) -> Writing to Bookie3,0 was successful, but its completion is pending.
// entry: 4(Bookie0, Bookie1) -> Writing to Bookie0,1 was successful, but its completion is pending.
// entry: 5(Bookie1, Bookie2) -> Failed to write to Bookie2.
// entry: 6(Bookie2, Bookie3) -> Failed to write to Bookie2,3.
bookie0Latch.countDown();
do {
Thread.sleep(100);

completedCount = 0;
for (PendingAddOp pendingAddOp : (Queue<PendingAddOp>) fieldPendingAddOps.get(lh)) {
if (pendingAddOp.completed) {
completedCount++;
}
}
} while (completedCount != 5);

// The ZK cluster wakes up, then the ensemble replacement completes.
//
// Expected state of entries.
// entry: 0(Bookie0, Bookie1) -> Entry write is completed.
// entry: 1(Bookie1, Bookie4) -> Write to Bookie4.
// entry: 2(Bookie4, Bookie5) -> Write to Bookie4,5.
// entry: 3(Bookie5, Bookie0) -> Write to Bookie5.
// entry: 4(Bookie0, Bookie1) -> Writing to Bookie0,1 was successful, but its completion is pending.
// entry: 5(Bookie1, Bookie4) -> Write to Bookie4.
// entry: 6(Bookie4, Bookie5) -> Write to Bookie4,5.
zkLatch.countDown();

// Wait for all entry writes to complete.
addCompleteLatch.await();

readEntries(lh, entries1, sendCount);
lh.close();
}

private void readEntries(LedgerHandle lh, List<byte[]> entries) throws InterruptedException, BKException {
readEntries(lh, entries, numEntriesToWrite);
}

private void readEntries(LedgerHandle lh, List<byte[]> entries, int numEntriesToWrite)
throws InterruptedException, BKException {
ls = lh.readEntries(0, numEntriesToWrite - 1);
int index = 0;
while (ls.hasMoreElements()) {
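The test above waits on private LedgerHandle state by reflecting over the pendingAddOps queue. As a usage-style sketch of that polling pattern, a hypothetical helper (not part of this PR; the name and signature are assumptions) could factor the two do/while loops into one method:

import java.lang.reflect.Field;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical helper, shown for illustration only.
final class ReflectionPolling {
    // Polls a private Queue-typed field on the target until the condition holds.
    static <T> void awaitQueueCondition(Object target, String fieldName,
                                        Predicate<Queue<T>> condition) throws Exception {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        while (true) {
            @SuppressWarnings("unchecked")
            Queue<T> queue = (Queue<T>) field.get(target);
            if (condition.test(queue)) {
                return;
            }
            Thread.sleep(100);
        }
    }
}

For example, the first wait in the test could be expressed as awaitQueueCondition(lh, "pendingAddOps", (Queue<PendingAddOp> q) -> q.stream().filter(op -> op.completed).count() == 2), assuming the test remains in a package where PendingAddOp.completed is visible.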
@@ -244,6 +244,11 @@ protected void stopZKCluster() throws Exception {
zkUtil.killCluster();
}

protected void sleepZKCluster(final CountDownLatch l)
throws InterruptedException, IOException {
zkUtil.sleepCluster(l);
}

/**
* Start cluster. Also, starts the auto recovery process for each bookie, if
* isAutoRecoveryEnabled is true.
@@ -58,7 +58,10 @@ public interface ZooKeeperCluster {

void killCluster() throws Exception;

void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l)
void sleepCluster(int time, TimeUnit timeUnit)
throws InterruptedException, IOException;

void sleepCluster(CountDownLatch l)
throws InterruptedException, IOException;

default void expireSession(ZooKeeper zk) throws Exception {
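For context on the interface change above, here is a usage-style sketch of the two overloads as a test would call them (assuming a ZooKeeperCluster instance named zkUtil, as in the test utilities). The time-bounded variant suspends ZooKeeper for a fixed duration, while the latch-bounded variant keeps it suspended until the caller counts the latch down, which is what lets the new test hold the ensemble replacement open for as long as it needs.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Usage sketch only; zkUtil stands in for whatever ZooKeeperCluster instance a test holds.
class SleepClusterUsageSketch {
    void example(ZooKeeperCluster zkUtil) throws Exception {
        // Time-bounded: ZooKeeper's sync thread is suspended for roughly 200 ms.
        zkUtil.sleepCluster(200, TimeUnit.MILLISECONDS);

        // Latch-bounded: ZooKeeper stays suspended until the caller counts down,
        // so the test controls exactly when the metadata store wakes up again.
        CountDownLatch wakeUp = new CountDownLatch(1);
        zkUtil.sleepCluster(wakeUp);
        // ... drive the failure scenario while the ensemble change is stalled ...
        wakeUp.countDown();
    }
}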
@@ -136,7 +136,12 @@ public void killCluster() throws Exception {
}

@Override
public void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l) throws InterruptedException, IOException {
public void sleepCluster(int time, TimeUnit timeUnit) throws InterruptedException, IOException {
throw new UnsupportedOperationException("sleepServer operation is not supported for ZooKeeperClusterUtil");
}

@Override
public void sleepCluster(CountDownLatch l) throws InterruptedException, IOException {
throw new UnsupportedOperationException("sleepServer operation is not supported for ZooKeeperClusterUtil");
}
}
@@ -140,19 +140,19 @@ public void restartCluster() throws Exception {

@Override
public void sleepCluster(final int time,
final TimeUnit timeUnit,
final CountDownLatch l)
final TimeUnit timeUnit)
throws InterruptedException, IOException {
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
for (final Thread t : allthreads) {
if (t.getName().contains("SyncThread:0")) {
final CountDownLatch suspendLatch = new CountDownLatch(1);
Thread sleeper = new Thread() {
@SuppressWarnings("deprecation")
public void run() {
try {
t.suspend();
l.countDown();
suspendLatch.countDown();
timeUnit.sleep(time);
t.resume();
} catch (Exception e) {
@@ -161,6 +161,36 @@ public void run() {
}
};
sleeper.start();
suspendLatch.await();
return;
}
}
throw new IOException("ZooKeeper thread not found");
}

@Override
public void sleepCluster(final CountDownLatch l)
throws InterruptedException, IOException {
Thread[] allthreads = new Thread[Thread.activeCount()];
Thread.enumerate(allthreads);
for (final Thread t : allthreads) {
if (t.getName().contains("SyncThread:0")) {
final CountDownLatch suspendLatch = new CountDownLatch(1);
Thread sleeper = new Thread() {
@SuppressWarnings("deprecation")
public void run() {
try {
t.suspend();
suspendLatch.countDown();
l.await();
t.resume();
} catch (Exception e) {
LOG.error("Error suspending thread", e);
}
}
};
sleeper.start();
suspendLatch.await();
return;
}
}