Skip to content

Commit 861f7a5

Browse files
committed
feat(columnar): flush memtable to in-memory segments and include in scans
When the columnar memtable reaches the flush threshold, drain it into a compressed segment and retain the bytes in memory. Extend scan_normalize to read from flushed segments before falling back to the live memtable, so queries see all rows regardless of whether they have been flushed. This makes columnar scan results consistent across memtable boundaries without requiring a durable write path yet.
1 parent c406d02 commit 861f7a5

File tree

3 files changed

+197
-17
lines changed

3 files changed

+197
-17
lines changed

nodedb/src/data/executor/core_loop/mod.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ pub struct CoreLoop {
155155
pub(in crate::data::executor) columnar_engines:
156156
HashMap<String, nodedb_columnar::MutationEngine>,
157157

158+
/// Flushed columnar segment bytes, keyed by collection name (NOTE(review): not
/// yet tenant-scoped — consider keying by "{tid}:{collection}" to avoid cross-tenant mixing).
159+
/// Each entry is a list of encoded segment buffers produced by `SegmentWriter`.
160+
/// Kept in memory so `scan_columnar` can read rows that were drained from the
161+
/// active memtable during a flush (otherwise those rows would be lost until a
162+
/// real on-disk segment reader is wired up).
163+
pub(in crate::data::executor) columnar_flushed_segments: HashMap<String, Vec<Vec<u8>>>,
164+
158165
/// Per-collection max WAL LSN that has been ingested into the memtable.
159166
/// Used by the WAL catch-up deduplication: if a catch-up record's LSN
160167
/// is <= this value, the Data Plane skips it (already ingested).
@@ -283,6 +290,7 @@ impl CoreLoop {
283290
),
284291
columnar_memtables: HashMap::new(),
285292
columnar_engines: HashMap::new(),
293+
columnar_flushed_segments: HashMap::new(),
286294
ts_max_ingested_lsn: HashMap::new(),
287295
last_ts_ingest: None,
288296
ts_last_value_caches: HashMap::new(),

nodedb/src/data/executor/handlers/columnar_write.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,40 @@ impl CoreLoop {
9898
}
9999
}
100100

101+
// Flush memtable to a segment if the threshold has been reached.
102+
if engine.should_flush() {
103+
let new_segment_id = engine.next_segment_id();
104+
let (schema, columns, row_count) = engine.memtable_mut().drain_optimized();
105+
if row_count > 0 {
106+
match nodedb_columnar::SegmentWriter::plain()
107+
.write_segment(&schema, &columns, row_count)
108+
{
109+
Ok(bytes) => {
110+
self.columnar_flushed_segments
111+
.entry(collection.to_string())
112+
.or_default()
113+
.push(bytes);
114+
tracing::debug!(
115+
core = self.core_id,
116+
%collection,
117+
new_segment_id,
118+
row_count,
119+
"columnar memtable flushed and segment bytes retained in memory"
120+
);
121+
}
122+
Err(e) => {
123+
tracing::warn!(
124+
core = self.core_id,
125+
%collection,
126+
error = %e,
127+
"columnar segment encode failed; flushed rows may be lost"
128+
);
129+
}
130+
}
131+
}
132+
engine.on_memtable_flushed(new_segment_id);
133+
}
134+
101135
// Populate R-tree for geometry columns so spatial predicates work.
102136
{
103137
let tid = task.request.tenant_id;

nodedb/src/data/executor/scan_normalize.rs

Lines changed: 155 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -108,26 +108,83 @@ impl CoreLoop {
108108
};
109109

110110
let schema = engine.schema();
111-
let rows: Vec<_> = engine.scan_memtable_rows().take(limit).collect();
112-
let mut results = Vec::with_capacity(rows.len());
113-
114-
for row in rows {
115-
// Build a nodedb_types::Value::Object directly — no JSON intermediary.
116-
let mut map = std::collections::HashMap::new();
117-
let mut id = String::new();
118-
for (i, col_def) in schema.columns.iter().enumerate() {
119-
if i < row.len() {
120-
if col_def.name == "id"
121-
&& let nodedb_types::value::Value::String(s) = &row[i]
122-
{
123-
id.clone_from(s);
111+
let mut results = Vec::new();
112+
113+
// 1. Read from flushed segments (older rows drained from prior memtable flushes).
114+
if let Some(segments) = self.columnar_flushed_segments.get(collection) {
115+
for seg_bytes in segments {
116+
if results.len() >= limit {
117+
break;
118+
}
119+
let reader = match nodedb_columnar::SegmentReader::open(seg_bytes) {
120+
Ok(r) => r,
121+
Err(e) => {
122+
tracing::warn!(error = %e, "failed to open flushed columnar segment for scan");
123+
continue;
124+
}
125+
};
126+
let seg_row_count = reader.row_count() as usize;
127+
let remaining = limit - results.len();
128+
let take = seg_row_count.min(remaining);
129+
130+
// Decode all columns for this segment.
131+
let col_count = schema.columns.len();
132+
let mut decoded_cols = Vec::with_capacity(col_count);
133+
let mut decode_ok = true;
134+
for col_idx in 0..col_count {
135+
match reader.read_column(col_idx) {
136+
Ok(dc) => decoded_cols.push(dc),
137+
Err(e) => {
138+
tracing::warn!(error = %e, col_idx, "failed to decode columnar segment column");
139+
decode_ok = false;
140+
break;
141+
}
142+
}
143+
}
144+
if !decode_ok {
145+
continue;
146+
}
147+
148+
for row_idx in 0..take {
149+
let mut map = std::collections::HashMap::new();
150+
let mut id = String::new();
151+
for (col_idx, col_def) in schema.columns.iter().enumerate() {
152+
let val = decoded_col_to_value(&decoded_cols[col_idx], row_idx);
153+
if col_def.name == "id"
154+
&& let nodedb_types::value::Value::String(s) = &val
155+
{
156+
id.clone_from(s);
157+
}
158+
map.insert(col_def.name.clone(), val);
124159
}
125-
map.insert(col_def.name.clone(), row[i].clone());
160+
let ndb_val = nodedb_types::value::Value::Object(map);
161+
let mp = nodedb_types::value_to_msgpack(&ndb_val).unwrap_or_default();
162+
results.push((id, mp));
126163
}
127164
}
128-
let ndb_val = nodedb_types::value::Value::Object(map);
129-
let mp = nodedb_types::value_to_msgpack(&ndb_val).unwrap_or_default();
130-
results.push((id, mp));
165+
}
166+
167+
// 2. Read from the live memtable (most-recent rows not yet flushed).
168+
if results.len() < limit {
169+
let remaining = limit - results.len();
170+
let rows: Vec<_> = engine.scan_memtable_rows().take(remaining).collect();
171+
for row in rows {
172+
let mut map = std::collections::HashMap::new();
173+
let mut id = String::new();
174+
for (i, col_def) in schema.columns.iter().enumerate() {
175+
if i < row.len() {
176+
if col_def.name == "id"
177+
&& let nodedb_types::value::Value::String(s) = &row[i]
178+
{
179+
id.clone_from(s);
180+
}
181+
map.insert(col_def.name.clone(), row[i].clone());
182+
}
183+
}
184+
let ndb_val = nodedb_types::value::Value::Object(map);
185+
let mp = nodedb_types::value_to_msgpack(&ndb_val).unwrap_or_default();
186+
results.push((id, mp));
187+
}
131188
}
132189

133190
results
@@ -175,3 +232,84 @@ impl CoreLoop {
175232
}
176233
}
177234
}
235+
236+
/// Convert a single row from a `DecodedColumn` to a `nodedb_types::value::Value`.
237+
///
238+
/// Returns `Value::Null` if the row index is out of range or the validity bit is false.
239+
fn decoded_col_to_value(
240+
col: &nodedb_columnar::reader::DecodedColumn,
241+
row_idx: usize,
242+
) -> nodedb_types::value::Value {
243+
use nodedb_columnar::reader::DecodedColumn;
244+
use nodedb_types::value::Value;
245+
246+
match col {
247+
DecodedColumn::Int64 { values, valid } => {
248+
if row_idx < valid.len() && valid[row_idx] {
249+
Value::Integer(values[row_idx])
250+
} else {
251+
Value::Null
252+
}
253+
}
254+
DecodedColumn::Float64 { values, valid } => {
255+
if row_idx < valid.len() && valid[row_idx] {
256+
Value::Float(values[row_idx])
257+
} else {
258+
Value::Null
259+
}
260+
}
261+
DecodedColumn::Timestamp { values, valid } => {
262+
if row_idx < valid.len() && valid[row_idx] {
263+
// Represent as integer microseconds (same as Value::Integer for timestamps).
264+
Value::Integer(values[row_idx])
265+
} else {
266+
Value::Null
267+
}
268+
}
269+
DecodedColumn::Bool { values, valid } => {
270+
if row_idx < valid.len() && valid[row_idx] {
271+
Value::Bool(values[row_idx])
272+
} else {
273+
Value::Null
274+
}
275+
}
276+
DecodedColumn::Binary {
277+
data,
278+
offsets,
279+
valid,
280+
} => {
281+
if row_idx < valid.len() && valid[row_idx] && row_idx + 1 < offsets.len() {
282+
let start = offsets[row_idx] as usize;
283+
let end = offsets[row_idx + 1] as usize;
284+
if start <= end && end <= data.len() {
285+
let bytes = &data[start..end];
286+
// Best-effort UTF-8 interpretation; fall back to bytes.
287+
match std::str::from_utf8(bytes) {
288+
Ok(s) => Value::String(s.to_string()),
289+
Err(_) => Value::Bytes(bytes.to_vec()),
290+
}
291+
} else {
292+
Value::Null
293+
}
294+
} else {
295+
Value::Null
296+
}
297+
}
298+
DecodedColumn::DictEncoded {
299+
ids,
300+
dictionary,
301+
valid,
302+
} => {
303+
if row_idx < valid.len() && valid[row_idx] {
304+
let id = ids[row_idx] as usize;
305+
if id < dictionary.len() {
306+
Value::String(dictionary[id].clone())
307+
} else {
308+
Value::Null
309+
}
310+
} else {
311+
Value::Null
312+
}
313+
}
314+
}
315+
}

0 commit comments

Comments
 (0)