Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,11 @@ public void readEntriesComplete(int rc, long ledgerId, long startEntryId, ByteBu
heardFromHosts.add(rctx.to);
heardFromHostsBitSet.set(rctx.bookieIndex, true);

/*
* Retain the response while this read op handles it. complete() returns true only when it
* transfers the buffers into request.entries. For digest failures, duplicate responses, or
* other incomplete paths, complete() returns false and this retained reference is released here.
*/
bufList.retain();
// if entry has completed don't handle twice
if (entry.complete(rctx.bookieIndex, rctx.to, bufList)) {
Expand Down Expand Up @@ -160,32 +165,50 @@ boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) {
if (isComplete()) {
return false;
}
if (!complete.getAndSet(true)) {
for (int i = 0; i < bufList.size(); i++) {
ByteBuf buffer = bufList.getBuffer(i);
ByteBuf content;
try {
content = lh.macManager.verifyDigestAndReturnData(eId + i, buffer);
} catch (BKException.BKDigestMatchException e) {
clientCtx.getClientStats().getReadOpDmCounter().inc();

/*
* Verify entries in order. If the first entry has a digest mismatch, retry the read from
* another replica. If a later entry fails, return the verified prefix; batch reads are allowed
* to return fewer than maxCount entries.
*/
int verifiedEntries = 0;
for (int i = 0; i < bufList.size(); i++) {
ByteBuf buffer = bufList.getBuffer(i);
try {
lh.macManager.verifyDigestAndReturnData(eId + i, buffer);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One improvement is to return the entries before the digest mismatch one

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. The batch read now returns the verified prefix before a later digest mismatch, while preserving retry behavior when the first entry mismatches. Added test coverage in testDigestMismatchAfterPartialVerificationReturnsVerifiedPrefix. Pushed as a5ee51a.

verifiedEntries++;
} catch (BKException.BKDigestMatchException e) {
clientCtx.getClientStats().getReadOpDmCounter().inc();
if (verifiedEntries == 0) {
logErrorAndReattemptRead(bookieIndex, host, "Mac mismatch",
BKException.Code.DigestMatchException);
return false;
}
rc = BKException.Code.OK;
break;
}
}

if (complete.compareAndSet(false, true)) {
rc = BKException.Code.OK;
for (int i = 0; i < verifiedEntries; i++) {
ByteBuf buffer = bufList.getBuffer(i);
/*
* The length is a long and it is the last field of the metadata of an entry.
* Consequently, we have to subtract 8 from METADATA_LENGTH to get the length.
*/
LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i);
LedgerEntryImpl entryImpl = LedgerEntryImpl.create(lh.ledgerId, startEntryId + i);
entryImpl.setLength(buffer.getLong(DigestManager.METADATA_LENGTH - 8));
entryImpl.setEntryBuf(content);
entryImpl.setEntryBuf(buffer);
entries.add(entryImpl);
}
// These buffers are not transferred to LedgerEntryImpl, so release them here.
for (int i = verifiedEntries; i < bufList.size(); i++) {
bufList.getBuffer(i).release();
}
writeSet.recycle();
return true;
} else {
writeSet.recycle();
// Another response completed the request first; readEntriesComplete() releases bufList.
return false;
}
}
Expand Down Expand Up @@ -329,4 +352,4 @@ boolean complete(int bookieIndex, BookieId host, final ByteBufList bufList) {
return completed;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,29 @@
package org.apache.bookkeeper.client;

import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.junit.Test;
Expand Down Expand Up @@ -289,4 +298,105 @@ public void testReadFailureWithFailedBookies() throws Exception {
lh.close();
newBk.close();
}

@Test
public void testDigestMismatchRetriesNextReplicaAndCompletes() throws Exception {
ClientConfiguration conf = new ClientConfiguration(baseClientConf)
.setUseV2WireProtocol(true)
.setReorderReadSequenceEnabled(false)
.setMetadataServiceUri(zkUtil.getMetadataServiceUri());

try (BookKeeperTestClient bk = new BookKeeperTestClient(conf)) {
byte[] data = "batch-digest-data".getBytes(StandardCharsets.UTF_8);
LedgerHandle writer = bk.createLedger(3, 3, 2, digestType, passwd);
writer.addEntry(data);
long ledgerId = writer.getId();
BookieId corruptReplica = writer.getLedgerMetadata().getAllEnsembles().get(0L).get(0);
writer.close();

ServerConfiguration corruptConf = killBookie(corruptReplica);
startAndAddBookie(corruptConf, new CorruptReadBookie(corruptConf));

LedgerHandle reader = bk.openLedger(ledgerId, digestType, passwd);
BatchedReadOp readOp = new BatchedReadOp(reader, bk.getClientCtx(), 0, 1, 1024, false);
readOp.submit();

Iterator<LedgerEntry> entries = readOp.future().get().iterator();
assertTrue(entries.hasNext());
LedgerEntry entry = entries.next();
assertArrayEquals(data, entry.getEntryBytes());
entry.close();
assertFalse(entries.hasNext());
reader.close();
}
}

@Test
public void testDigestMismatchAfterPartialVerificationReturnsVerifiedPrefix() throws Exception {
ClientConfiguration conf = new ClientConfiguration(baseClientConf)
.setUseV2WireProtocol(true)
.setReorderReadSequenceEnabled(false)
.setMetadataServiceUri(zkUtil.getMetadataServiceUri());

try (BookKeeperTestClient bk = new BookKeeperTestClient(conf)) {
byte[] entry0 = "batch-digest-entry-0".getBytes(StandardCharsets.UTF_8);
byte[] entry1 = "batch-digest-entry-1".getBytes(StandardCharsets.UTF_8);
LedgerHandle writer = bk.createLedger(3, 3, 2, digestType, passwd);
writer.addEntry(entry0);
writer.addEntry(entry1);
long ledgerId = writer.getId();
List<BookieId> ensemble = writer.getLedgerMetadata().getAllEnsembles().get(0L);
BookieId corruptReplica = ensemble.get(0);
writer.close();

CountDownLatch corruptReadLatch = new CountDownLatch(1);
ServerConfiguration corruptConf = killBookie(corruptReplica);
startAndAddBookie(corruptConf, new CorruptReadBookie(corruptConf, 1L, corruptReadLatch));

LedgerHandle reader = bk.openLedger(ledgerId, digestType, passwd);
BatchedReadOp readOp = new BatchedReadOp(reader, bk.getClientCtx(), 0, 2, 2048, false);
readOp.submit();

assertTrue("corrupt replica did not read the corrupted entry",
corruptReadLatch.await(10, TimeUnit.SECONDS));
Iterator<LedgerEntry> entries = readOp.future().get(10, TimeUnit.SECONDS).iterator();
assertTrue(entries.hasNext());
LedgerEntry first = entries.next();
assertArrayEquals(entry0, first.getEntryBytes());
first.close();
assertFalse(entries.hasNext());
reader.close();
}
}

static class CorruptReadBookie extends TestBookieImpl {
private final long corruptEntryId;
private final CountDownLatch corruptReadLatch;

CorruptReadBookie(ServerConfiguration conf) throws Exception {
this(conf, -1L, null);
}

CorruptReadBookie(ServerConfiguration conf, long corruptEntryId, CountDownLatch corruptReadLatch)
throws Exception {
super(conf);
this.corruptEntryId = corruptEntryId;
this.corruptReadLatch = corruptReadLatch;
}

@Override
public ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, Bookie.NoLedgerException, BookieException {
ByteBuf localBuf = super.readEntry(ledgerId, entryId);
if (corruptEntryId < 0 || corruptEntryId == entryId) {
for (int i = 0; i < localBuf.capacity(); i++) {
localBuf.setByte(i, 0);
}
if (corruptReadLatch != null) {
corruptReadLatch.countDown();
}
}
return localBuf;
}
}
}