Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,4 @@ build/
*.dat

# Downloaded released BookKeeper versions (cached by CI, not committed)
.released-versions/
.released-versions/
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ void setExplicitLac(ByteBuf entry, WriteCallback writeCallback, Object ctx, byte
// TODO: Shouldn't this be async?
ByteBuf readEntry(long ledgerId, long entryId)
throws IOException, NoLedgerException, BookieException;

/**
* Read a ledger entry only when it can fit the provided bound.
*
* <p>{@code maxEntrySize} includes the 4-byte per-entry delimiter used by batched-read response framing.
* Implementations return {@code null} when the entry exists but {@code entry.readableBytes() + 4}
* exceeds {@code maxEntrySize}.
*/
ByteBuf readEntryIfFits(long ledgerId, long entryId, long maxEntrySize)
throws IOException, NoLedgerException, BookieException;
long readLastAddConfirmed(long ledgerId) throws IOException, BookieException;
PrimitiveIterator.OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedgerException;

Expand Down Expand Up @@ -127,4 +137,4 @@ public long getEntry() {
}
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,36 @@ public ByteBuf readEntry(long ledgerId, long entryId)
}
}

@Override
public ByteBuf readEntryIfFits(long ledgerId, long entryId, long maxEntrySize)
throws IOException, NoLedgerException, BookieException {
long requestNanos = MathUtils.nowInNano();
boolean success = false;
int entrySize = 0;
try {
LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
if (LOG.isTraceEnabled()) {
LOG.trace("Reading {}@{} with maxEntrySize {}", entryId, ledgerId, maxEntrySize);
}
ByteBuf entry = handle.readEntryIfFits(entryId, maxEntrySize);
if (entry != null) {
entrySize = entry.readableBytes();
bookieStats.getReadBytes().addCount(entrySize);
}
success = true;
return entry;
} finally {
long elapsedNanos = MathUtils.elapsedNanos(requestNanos);
if (success) {
bookieStats.getReadEntryStats().registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
bookieStats.getReadBytesStats().registerSuccessfulValue(entrySize);
} else {
bookieStats.getReadEntryStats().registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
bookieStats.getReadBytesStats().registerFailedValue(entrySize);
}
}
}

public long readLastAddConfirmed(long ledgerId) throws IOException, BookieException {
LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
return handle.getLastAddConfirmed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,41 @@ public ByteBuf readEntry(long location) throws IOException, Bookie.NoEntryExcept
return internalReadEntry(-1L, -1L, location, false /* validateEntry */);
}

@Override
public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
throws IOException, Bookie.NoEntryException {
long entryLogId = logIdForOffset(entryLocation);
long pos = posForOffset(entryLocation);

BufferedReadChannel fc = null;
int entrySize;
try {
fc = getFCForEntryInternal(ledgerId, entryId, entryLogId, pos);

ByteBuf sizeBuff = readEntrySize(ledgerId, entryId, entryLogId, pos, fc);
entrySize = sizeBuff.getInt(0);
validateEntry(ledgerId, entryId, entryLogId, pos, sizeBuff);
} catch (EntryLookupException e) {
throw new IOException("Bad entry read from log file id: " + entryLogId, e);
}

if (entrySize + Integer.BYTES > maxEntrySize) {
return null;
}

ByteBuf data = allocator.buffer(entrySize, entrySize);
int rc = readFromLogChannel(entryLogId, fc, data, pos);
if (rc != entrySize) {
ReferenceCountUtil.release(data);
throw new IOException("Bad entry read from log file id: " + entryLogId,
new EntryLookupException("Short read for " + ledgerId + "@"
+ entryId + " in " + entryLogId + "@"
+ pos + "(" + rc + "!=" + entrySize + ")"));
}
data.writerIndex(entrySize);
return data;
}


private ByteBuf internalReadEntry(long ledgerId, long entryId, long location, boolean validateEntry)
throws IOException, Bookie.NoEntryException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,44 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException {
}
}

@Override
public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
entryId = ledgerCache.getLastEntry(ledgerId);
}

long offset;
long startTimeNanos = MathUtils.nowInNano();
boolean success = false;
try {
offset = ledgerCache.getEntryOffset(ledgerId, entryId);
if (offset == 0) {
throw new Bookie.NoEntryException(ledgerId, entryId);
}
success = true;
} finally {
if (success) {
getOffsetStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
} else {
getOffsetStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
}

startTimeNanos = MathUtils.nowInNano();
success = false;
try {
ByteBuf entry = entryLogger.readEntryIfFits(ledgerId, entryId, offset, maxEntrySize);
success = true;
return entry;
} finally {
if (success) {
getEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
} else {
getEntryStats.registerFailedEvent(MathUtils.elapsedNanos(startTimeNanos), TimeUnit.NANOSECONDS);
}
}
}

private void flushOrCheckpoint(boolean isCheckpointFlush)
throws IOException {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,21 @@ static ByteBuf createLedgerFenceEntry(Long ledgerId) {
abstract long addEntry(ByteBuf entry) throws IOException, BookieException;
abstract ByteBuf readEntry(long entryId) throws IOException, BookieException;

/**
* Read an entry only when it fits within {@code maxEntrySize}.
*
* <p>{@code maxEntrySize} includes the 4-byte per-entry delimiter used in batched-read response framing,
* so an exact fit is {@code entry.readableBytes() + 4 == maxEntrySize}.
*/
ByteBuf readEntryIfFits(long entryId, long maxEntrySize) throws IOException, BookieException {
ByteBuf entry = readEntry(entryId);
if (entry.readableBytes() + 4 > maxEntrySize) {
entry.release();
return null;
}
return entry;
}

abstract long getLastAddConfirmed() throws IOException, BookieException;
abstract boolean waitForLastAddConfirmedUpdate(long previousLAC,
Watcher<LastAddConfirmedUpdateNotification> watcher)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,11 @@ ByteBuf readEntry(long entryId) throws IOException, BookieException {
return ledgerStorage.getEntry(ledgerId, entryId);
}

@Override
ByteBuf readEntryIfFits(long entryId, long maxEntrySize) throws IOException, BookieException {
return ledgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
}

@Override
long getLastAddConfirmed() throws IOException, BookieException {
return ledgerStorage.getLastAddConfirmed(ledgerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,21 @@ void initialize(ServerConfiguration conf,
*/
ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieException;

/**
* Read an entry from storage only if its serialized size, including the
* 4-byte per-entry framing delimiter, is less than or equal to maxEntrySize.
*
* <p>Returns {@code null} when the entry exists but does not fit the supplied budget.
*/
default ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
ByteBuf entry = getEntry(ledgerId, entryId);
if (entry.readableBytes() + Integer.BYTES > maxEntrySize) {
entry.release();
return null;
}
return entry;
}

/**
* Get last add confirmed.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,38 @@ public ByteBuf getEntry(long ledgerId, long entryId) throws IOException, BookieE
return buffToRet;
}

@Override
public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
if (entryId == BookieProtocol.LAST_ADD_CONFIRMED) {
EntryKeyValue kv = memTable.getLastEntry(ledgerId);
if (kv != null) {
ByteBuf entry = kv.getValueAsByteBuffer();
if (entry.readableBytes() + Integer.BYTES > maxEntrySize) {
entry.release();
return null;
}
return entry;
}
return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
}

try {
return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
} catch (Bookie.NoEntryException nee) {
EntryKeyValue kv = memTable.getEntry(ledgerId, entryId);
if (kv == null) {
return interleavedLedgerStorage.getEntryIfFits(ledgerId, entryId, maxEntrySize);
}

ByteBuf entry = kv.getValueAsByteBuffer();
if (entry.readableBytes() + Integer.BYTES > maxEntrySize) {
entry.release();
return null;
}
return entry;
}
}

@Override
public long getLastAddConfirmed(long ledgerId) throws IOException {
return interleavedLedgerStorage.getLastAddConfirmed(ledgerId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,22 @@ ByteBuf readEntry(long entryLocation)
ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
throws IOException, NoEntryException;

/**
* Read an entry only if its serialized size, including the 4-byte per-entry
* framing delimiter, is less than or equal to maxEntrySize.
*
* <p>Returns {@code null} when the entry exists but does not fit the supplied budget.
*/
default ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
throws IOException, NoEntryException {
ByteBuf entry = readEntry(ledgerId, entryId, entryLocation);
if (entry.readableBytes() + Integer.BYTES > maxEntrySize) {
entry.release();
return null;
}
return entry;
}

/**
* Flush any outstanding writes to disk.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,42 @@ public ByteBuf readEntry(long ledgerId, long entryId, long entryLocation)
return internalReadEntry(ledgerId, entryId, entryLocation, true);
}

@Override
public ByteBuf readEntryIfFits(long ledgerId, long entryId, long entryLocation, long maxEntrySize)
throws IOException, NoEntryException {
int logId = (int) (entryLocation >> 32);
int pos = (int) (entryLocation & 0xFFFFFFFF);

long start = System.nanoTime();
LogReader reader = getReader(logId);

try {
int entrySize = reader.readEntrySizeAt(pos);
long thisLedgerId = reader.readLongAt(pos);
long thisEntryId = reader.readLongAt(pos + Long.BYTES);
if (thisLedgerId != ledgerId || thisEntryId != entryId) {
throw new IOException(
exMsg("Bad location").kv("location", entryLocation)
.kv("expectedLedger", ledgerId).kv("expectedEntry", entryId)
.kv("foundLedger", thisLedgerId).kv("foundEntry", thisEntryId)
.toString());
}
if (entrySize + Integer.BYTES > maxEntrySize) {
stats.getReadEntryStats().registerSuccessfulEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
return null;
}

ByteBuf buf = reader.readBufferAt(pos, entrySize);
stats.getReadEntryStats().registerSuccessfulEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
return buf;
} catch (EOFException eof) {
stats.getReadEntryStats().registerFailedEvent(System.nanoTime() - start, TimeUnit.NANOSECONDS);
throw new NoEntryException(
exMsg("Entry location doesn't exist").kv("location", entryLocation).toString(),
ledgerId, entryId);
}
}

private LogReader getReader(int logId) throws IOException {
Cache<Integer, LogReader> cache = caches.get();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private int readBytesIntoBuf(ByteBuf buf, long offset, int size) throws IOExcept
}

@Override
public ByteBuf readEntryAt(int offset) throws IOException, EOFException {
public int readEntrySizeAt(int offset) throws IOException, EOFException {
assertValidEntryOffset(offset);
int sizeOffset = offset - Integer.BYTES;
if (sizeOffset < 0) {
Expand All @@ -188,6 +188,12 @@ public ByteBuf readEntryAt(int offset) throws IOException, EOFException {
.kv("maxSaneEntrySize", maxSaneEntrySize)
.kv("readEntrySize", entrySize).toString());
}
return entrySize;
}

@Override
public ByteBuf readEntryAt(int offset) throws IOException, EOFException {
int entrySize = readEntrySizeAt(offset);
return readBufferAt(offset, entrySize);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ public interface LogReader extends AutoCloseable {
*/
long readLongAt(long offset) throws IOException, EOFException;

/**
* Read the size of an entry at a given offset.
* The size is stored at {@code offset - Integer.BYTES}.
*/
int readEntrySizeAt(int offset) throws IOException, EOFException;

/**
* Read an entry at a given offset.
* The size of the entry must be at (offset - Integer.BYTES).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,11 @@ public long getLocation(long ledgerId, long entryId) throws IOException {
return getLedgerStorage(ledgerId).getEntryLocationIndex().getLocation(ledgerId, entryId);
}

@Override
public ByteBuf getEntryIfFits(long ledgerId, long entryId, long maxEntrySize) throws IOException, BookieException {
return getLedgerStorage(ledgerId).getEntryIfFits(ledgerId, entryId, maxEntrySize);
}

private SingleDirectoryDbLedgerStorage getLedgerStorage(long ledgerId) {
return ledgerStorageList.get(MathUtils.signSafeMod(ledgerId, numberOfDirs));
}
Expand Down
Loading
Loading