Skip to content

Commit dd35ea2

Browse files
committed
OPT: Make Buffer shift explicit in C++
1 parent e0a40bf commit dd35ea2

7 files changed

Lines changed: 54 additions & 30 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### Enhancements
66
- Added new publisher values for Cboe Titanium Cboe Global Indices Feed
7+
- Made `detail::Buffer` shifts explicit to avoid redundant moves during record decoding
78
- Added `Year` to `SplitDuration` enum for yearly historical batch job submissions
89

910
## 0.54.0 - 2026-04-21

include/databento/detail/buffer.hpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,7 @@ class Buffer : public IReadable, public IWritable {
4646
std::byte* ReadEnd() { return write_pos_; }
4747
const std::byte* ReadBegin() const { return read_pos_; }
4848
const std::byte* ReadEnd() const { return write_pos_; }
49-
// Indicate how many bytes were read
50-
void Consume(std::size_t length) {
51-
read_pos_ += length;
52-
if (static_cast<std::size_t>(read_pos_ - buf_.get()) > (Capacity() / 2)) {
53-
Shift();
54-
}
55-
}
56-
void ConsumeNoShift(std::size_t length) { read_pos_ += length; }
49+
void Consume(std::size_t length) { read_pos_ += length; }
5750
std::size_t ReadCapacity() const {
5851
return static_cast<std::size_t>(write_pos_ - read_pos_);
5952
}
@@ -65,6 +58,13 @@ class Buffer : public IReadable, public IWritable {
6558
}
6659
void Reserve(std::size_t capacity);
6760
void Shift();
61+
// Shifts unread data to offset 0 if writable space is less than `needed`,
62+
// reclaiming the consumed prefix. Does not grow the buffer.
63+
void ShiftForSpace(std::size_t needed) {
64+
if (WriteCapacity() < needed && read_pos_ != buf_.get()) {
65+
Shift();
66+
}
67+
}
6868

6969
friend std::ostream& operator<<(std::ostream& stream, const Buffer& buffer);
7070

src/dbn_decoder.cpp

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -293,16 +293,14 @@ const databento::Record* DbnDecoder::DecodeRecord() {
293293
}
294294
}
295295
current_record_ = Record{BufferRecordHeader()};
296-
buffer_.ConsumeNoShift(current_record_.Size());
296+
buffer_.Consume(current_record_.Size());
297297
current_record_ = DbnDecoder::DecodeRecordCompat(version_, upgrade_policy_, ts_out_,
298298
&compat_buffer_, current_record_);
299299
return &current_record_;
300300
}
301301

302302
size_t DbnDecoder::FillBuffer() {
303-
if (buffer_.WriteCapacity() < kMaxRecordLen) {
304-
buffer_.Shift();
305-
}
303+
buffer_.ShiftForSpace(kMaxRecordLen);
306304
const auto fill_size =
307305
input_->ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity());
308306
buffer_.Fill(fill_size);

src/detail/buffer.cpp

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ size_t Buffer::Write(const char* data, std::size_t length) {
1313
return Write(reinterpret_cast<const std::byte*>(data), length);
1414
}
1515
size_t Buffer::Write(const std::byte* data, std::size_t length) {
16-
if (length > WriteCapacity()) {
17-
Shift();
18-
}
16+
ShiftForSpace(length);
1917
const auto write_size = std::min(WriteCapacity(), length);
2018
std::copy(data, data + write_size, WriteBegin());
2119
Fill(write_size);
@@ -28,8 +26,8 @@ void Buffer::WriteAll(const char* data, std::size_t length) {
2826
void Buffer::WriteAll(const std::byte* data, std::size_t length) {
2927
if (length > Capacity() - ReadCapacity()) {
3028
Reserve(ReadCapacity() + length);
31-
} else if (length >= WriteCapacity()) {
32-
Shift();
29+
} else {
30+
ShiftForSpace(length);
3331
}
3432
std::copy(data, data + length, WriteBegin());
3533
write_pos_ += length;

src/detail/dbn_buffer_decoder.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ using databento::detail::DbnBufferDecoder;
1010
databento::KeepGoing DbnBufferDecoder::Process(const char* data, std::size_t length) {
1111
zstd_buffer_->WriteAll(data, length);
1212
while (true) {
13+
dbn_buffer_.ShiftForSpace(kMaxRecordLen);
1314
const auto read_size =
1415
zstd_stream_.ReadSome(dbn_buffer_.WriteBegin(), dbn_buffer_.WriteCapacity());
1516
dbn_buffer_.Fill(read_size);

src/live_blocking.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,7 +482,7 @@ databento::IReadable::Result LiveBlocking::FillBuffer() {
482482

483483
databento::IReadable::Result LiveBlocking::FillBuffer(
484484
std::chrono::milliseconds timeout) {
485-
buffer_.Shift();
485+
buffer_.ShiftForSpace(kMaxRecordLen);
486486
const auto read_res =
487487
connection_.ReadSome(buffer_.WriteBegin(), buffer_.WriteCapacity(), timeout);
488488
buffer_.Fill(read_res.read_size);
@@ -494,7 +494,7 @@ databento::IReadable::Result LiveBlocking::FillBuffer(
494494

495495
const databento::Record* LiveBlocking::ConsumeBufferedRecord() {
496496
current_record_ = Record{BufferRecordHeader()};
497-
buffer_.ConsumeNoShift(current_record_.Size());
497+
buffer_.Consume(current_record_.Size());
498498
current_record_ = DbnDecoder::DecodeRecordCompat(
499499
version_, upgrade_policy_, send_ts_out_, &compat_buffer_, current_record_);
500500
return &current_record_;

tests/src/buffer_tests.cpp

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ namespace databento::detail::tests {
1111
TEST(BufferTests, TestWriteAllPastCapacity) {
1212
Buffer target{10};
1313
target.Fill(4);
14-
target.ConsumeNoShift(2);
14+
target.Consume(2);
1515
ASSERT_EQ(target.WriteCapacity(), 6);
1616
ASSERT_EQ(target.ReadCapacity(), 2);
1717
ASSERT_EQ(target.Capacity(), 10);
@@ -25,7 +25,7 @@ TEST(BufferTests, TestWriteAllPastCapacity) {
2525
TEST(BufferTests, TestWriteAllShift) {
2626
Buffer target{20};
2727
target.WriteAll("TestWriteAllShift", 17);
28-
target.ConsumeNoShift(4);
28+
target.Consume(4);
2929
ASSERT_EQ(target.WriteCapacity(), 3);
3030
ASSERT_EQ(target.ReadCapacity(), 13);
3131
ASSERT_EQ(target.Capacity(), 20);
@@ -39,7 +39,7 @@ TEST(BufferTests, TestWriteAllShift) {
3939
TEST(BufferTests, TestWriteRead) {
4040
Buffer target{10};
4141
target.Fill(5);
42-
target.ConsumeNoShift(5);
42+
target.Consume(5);
4343
const auto write_len = target.Write("BufferTests", 11);
4444
ASSERT_EQ(write_len, 10);
4545
std::array<std::byte, 10> read_buf{};
@@ -54,16 +54,42 @@ TEST(BufferTests, TestReserve) {
5454
ASSERT_EQ(target.ReadCapacity(), 0);
5555
ASSERT_EQ(target.Capacity(), 120);
5656
target.WriteAll("TestReserve", 11);
57-
target.ConsumeNoShift(4);
57+
target.Consume(4);
5858
}
5959

60-
TEST(BufferTests, TestConsumeShift) {
60+
TEST(BufferTests, TestConsumeDoesNotShift) {
61+
Buffer target{16};
62+
target.Fill(12);
63+
target.Consume(10);
64+
ASSERT_EQ(target.ReadCapacity(), 2);
65+
ASSERT_EQ(target.WriteCapacity(), 4);
66+
}
67+
68+
TEST(BufferTests, TestShiftForSpace) {
6169
Buffer target{120};
62-
target.Fill(120);
63-
ASSERT_EQ(target.WriteCapacity(), 0);
64-
target.ConsumeNoShift(100);
65-
ASSERT_EQ(target.WriteCapacity(), 0);
66-
target.Consume(1);
67-
ASSERT_EQ(target.WriteCapacity(), 101);
70+
target.Fill(40);
71+
target.Consume(20);
72+
ASSERT_EQ(target.WriteCapacity(), 80);
73+
ASSERT_EQ(target.ReadCapacity(), 20);
74+
// Writable space is sufficient: no shift
75+
target.ShiftForSpace(50);
76+
ASSERT_EQ(target.WriteCapacity(), 80);
77+
ASSERT_EQ(target.ReadCapacity(), 20);
78+
// Writable space is insufficient: reclaim the consumed prefix
79+
target.ShiftForSpace(100);
80+
ASSERT_EQ(target.WriteCapacity(), 100);
81+
ASSERT_EQ(target.ReadCapacity(), 20);
82+
// Nothing left to reclaim; shift is a no-op
83+
target.ShiftForSpace(1000);
84+
ASSERT_EQ(target.WriteCapacity(), 100);
85+
ASSERT_EQ(target.ReadCapacity(), 20);
86+
}
87+
88+
TEST(BufferTests, TestShiftForSpaceNoopWhenUnconsumed) {
89+
Buffer target{16};
90+
target.Fill(4);
91+
target.ShiftForSpace(1000);
92+
ASSERT_EQ(target.WriteCapacity(), 12);
93+
ASSERT_EQ(target.ReadCapacity(), 4);
6894
}
6995
} // namespace databento::detail::tests

0 commit comments

Comments
 (0)