Skip to content

Commit b79e143

Browse files
committed
OPT: Cache whether upgrade may be required in C++
1 parent dd35ea2 commit b79e143

6 files changed

Lines changed: 56 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
## 0.55.0 - TBD
44

55
### Enhancements
6-
- Added new publisher values for Cboe Titanium Cboe Global Indices Feed
6+
- Improved `DbnDecoder` throughput on current-version data and `AsIs` workloads by
7+
caching whether the upgrade policy-version combination requires upgrading, skipping
8+
the per-record `DecodeRecordCompat` dispatch on the fast path
79
- Made `detail::Buffer` shifts explicit to avoid redundant moves during record decoding
10+
- Added new publisher values for Cboe Titanium Cboe Global Indices Feed
811
- Added `Year` to `SplitDuration` enum for yearly historical batch job submissions
912

1013
## 0.54.0 - 2026-04-21

include/databento/dbn_decoder.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ class DbnDecoder {
3535
VersionUpgradePolicy upgrade_policy, bool ts_out,
3636
std::array<std::byte, kMaxRecordLen>* compat_buffer,
3737
Record rec);
38+
// Returns whether a record from `version`-formatted data requires runtime
39+
// upgrade dispatch under `upgrade_policy`.
40+
static bool NeedsUpgrade(VersionUpgradePolicy upgrade_policy, std::uint8_t version);
3841

3942
// Should be called exactly once.
4043
Metadata DecodeMetadata();
@@ -61,6 +64,7 @@ class DbnDecoder {
6164
ILogReceiver* log_receiver_;
6265
std::uint8_t version_{};
6366
VersionUpgradePolicy upgrade_policy_;
67+
bool needs_upgrade_{true};
6468
bool ts_out_{};
6569
std::unique_ptr<IReadable> input_;
6670
detail::Buffer buffer_{};

include/databento/detail/dbn_buffer_decoder.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class DbnBufferDecoder {
6161
alignas(RecordHeader) std::array<std::byte, kMaxRecordLen> compat_buffer_{};
6262
std::uint8_t input_version_{};
6363
bool ts_out_{};
64+
bool needs_upgrade_{true};
6465
DecoderState state_{DecoderState::Init};
6566
};
6667
} // namespace databento::detail

src/dbn_decoder.cpp

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,7 @@ databento::Metadata DbnDecoder::DecodeMetadata() {
173173
buffer_.ReadBegin(), kMetadataPreludeSize);
174174
buffer_.Consume(kMetadataPreludeSize);
175175
version_ = version;
176+
needs_upgrade_ = NeedsUpgrade(upgrade_policy_, version_);
176177
buffer_.Reserve(size);
177178
input_->ReadExact(buffer_.WriteBegin(), size);
178179
buffer_.Fill(size);
@@ -273,6 +274,19 @@ databento::Record DbnDecoder::DecodeRecordCompat(
273274
return rec;
274275
}
275276

277+
bool DbnDecoder::NeedsUpgrade(VersionUpgradePolicy upgrade_policy,
278+
std::uint8_t version) {
279+
switch (upgrade_policy) {
280+
case VersionUpgradePolicy::UpgradeToV2:
281+
return version < 2;
282+
case VersionUpgradePolicy::UpgradeToV3:
283+
return version < 3;
284+
case VersionUpgradePolicy::AsIs:
285+
default:
286+
return false;
287+
}
288+
}
289+
276290
// assumes DecodeMetadata has been called
277291
const databento::Record* DbnDecoder::DecodeRecord() {
278292
// need some unread unread_bytes
@@ -294,8 +308,10 @@ const databento::Record* DbnDecoder::DecodeRecord() {
294308
}
295309
current_record_ = Record{BufferRecordHeader()};
296310
buffer_.Consume(current_record_.Size());
297-
current_record_ = DbnDecoder::DecodeRecordCompat(version_, upgrade_policy_, ts_out_,
298-
&compat_buffer_, current_record_);
311+
if (needs_upgrade_) {
312+
current_record_ = DbnDecoder::DecodeRecordCompat(version_, upgrade_policy_, ts_out_,
313+
&compat_buffer_, current_record_);
314+
}
299315
return &current_record_;
300316
}
301317

src/detail/dbn_buffer_decoder.cpp

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data, std::size_t len
2525
std::tie(input_version_, bytes_needed_) =
2626
DbnDecoder::DecodeMetadataVersionAndSize(dbn_buffer_.ReadBegin(),
2727
dbn_buffer_.ReadCapacity());
28+
needs_upgrade_ = DbnDecoder::NeedsUpgrade(upgrade_policy_, input_version_);
2829
dbn_buffer_.Consume(kMetadataPreludeSize);
2930
dbn_buffer_.Reserve(bytes_needed_);
3031
state_ = DecoderState::Metadata;
@@ -56,8 +57,10 @@ databento::KeepGoing DbnBufferDecoder::Process(const char* data, std::size_t len
5657
if (dbn_buffer_.ReadCapacity() < bytes_needed_) {
5758
break;
5859
}
59-
record = DbnDecoder::DecodeRecordCompat(input_version_, upgrade_policy_,
60-
ts_out_, &compat_buffer_, record);
60+
if (needs_upgrade_) {
61+
record = DbnDecoder::DecodeRecordCompat(input_version_, upgrade_policy_,
62+
ts_out_, &compat_buffer_, record);
63+
}
6164
if (record_callback_(record) == KeepGoing::Stop) {
6265
return KeepGoing::Stop;
6366
}
@@ -78,6 +81,7 @@ std::ostream& operator<<(std::ostream& stream, const DbnBufferDecoder& buffer) {
7881
.AddField("bytes_needed_", buffer.bytes_needed_)
7982
.AddField("input_version_", buffer.input_version_)
8083
.AddField("ts_out_", buffer.ts_out_)
84+
.AddField("needs_upgrade_", buffer.needs_upgrade_)
8185
.AddField("state_", buffer.state_)
8286
.Finish();
8387
}

tests/src/dbn_decoder_tests.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,29 @@ TEST_F(DbnDecoderTests, TestUpgradeMbp1WithTsOut) {
217217
ASSERT_EQ(&orig, &upgraded);
218218
}
219219

220+
TEST_F(DbnDecoderTests, TestNeedsUpgrade) {
221+
struct Case {
222+
VersionUpgradePolicy policy;
223+
std::uint8_t version;
224+
bool expected;
225+
};
226+
constexpr Case kCases[] = {
227+
{VersionUpgradePolicy::AsIs, 1, false},
228+
{VersionUpgradePolicy::AsIs, 2, false},
229+
{VersionUpgradePolicy::AsIs, 3, false},
230+
{VersionUpgradePolicy::UpgradeToV2, 1, true},
231+
{VersionUpgradePolicy::UpgradeToV2, 2, false},
232+
{VersionUpgradePolicy::UpgradeToV3, 1, true},
233+
{VersionUpgradePolicy::UpgradeToV3, 2, true},
234+
{VersionUpgradePolicy::UpgradeToV3, 3, false},
235+
};
236+
for (const auto& c : kCases) {
237+
EXPECT_EQ(DbnDecoder::NeedsUpgrade(c.policy, c.version), c.expected)
238+
<< "policy=" << static_cast<int>(c.policy)
239+
<< " version=" << static_cast<int>(c.version);
240+
}
241+
}
242+
220243
class DbnDecoderSchemaTests
221244
: public DbnDecoderTests,
222245
public testing::WithParamInterface<std::pair<const char*, std::uint8_t>> {};

0 commit comments

Comments
 (0)