Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 8 additions & 1 deletion nodedb-columnar/src/mutation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl MutationEngine {
}

// Generate WAL record BEFORE applying the mutation.
let row_data = encode_row_for_wal(values);
let row_data = encode_row_for_wal(values)?;
let wal = ColumnarWalRecord::InsertRow {
collection: self.collection.clone(),
row_data,
Expand Down Expand Up @@ -310,6 +310,13 @@ impl MutationEngine {
self.memtable.get_row(row_idx)
}

/// The segment ID that will be assigned to the next flushed segment.
///
/// Read this *before* triggering a flush and pass the returned ID to
/// `on_memtable_flushed`, so the flushed segment is tracked under the
/// same ID the engine will assign it.
pub fn next_segment_id(&self) -> u32 {
    self.next_segment_id
}

/// Whether a segment should be compacted based on its delete ratio.
pub fn should_compact(&self, segment_id: u32, total_rows: u64) -> bool {
self.delete_bitmaps
Expand Down
206 changes: 162 additions & 44 deletions nodedb-columnar/src/wal_record.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ impl ColumnarWalRecord {
/// Each value is written as: [type_tag: u8][value_bytes].
/// This is more compact than MessagePack for typed columns and enables
/// direct replay into the memtable without schema interpretation overhead.
pub fn encode_row_for_wal(values: &[nodedb_types::value::Value]) -> Vec<u8> {
pub fn encode_row_for_wal(
values: &[nodedb_types::value::Value],
) -> Result<Vec<u8>, crate::error::ColumnarError> {
use nodedb_types::value::Value;

let mut buf = Vec::with_capacity(values.len() * 10); // Rough estimate.
Expand Down Expand Up @@ -152,14 +154,63 @@ pub fn encode_row_for_wal(values: &[nodedb_types::value::Value]) -> Vec<u8> {
_ => {
// Geometry and other complex types: serialize as JSON bytes.
buf.push(10);
let json = sonic_rs::to_vec(value).unwrap_or_default();
let json = sonic_rs::to_vec(value).map_err(|e| {
crate::error::ColumnarError::Serialization(format!(
"failed to serialize value as JSON: {e}"
))
})?;
buf.extend_from_slice(&(json.len() as u32).to_le_bytes());
buf.extend_from_slice(&json);
}
}
}

buf
Ok(buf)
}

/// Maximum length in bytes for a variable-length field in a WAL record (256 MiB).
///
/// Length prefixes above this are treated as corruption and rejected before
/// any payload read is attempted, preventing OOM from crafted/corrupt records
/// with bogus length prefixes.
const MAX_FIELD_LEN: usize = 256 * 1024 * 1024;

/// Read exactly `n` bytes from `data` at `cursor`, advancing cursor.
/// Returns `Err` if not enough bytes remain.
fn read_slice<'a>(
data: &'a [u8],
cursor: &mut usize,
n: usize,
context: &str,
) -> Result<&'a [u8], crate::error::ColumnarError> {
let end = cursor.checked_add(n).ok_or_else(|| {
crate::error::ColumnarError::Serialization(format!("overflow in {context}"))
})?;
if end > data.len() {
return Err(crate::error::ColumnarError::Serialization(format!(
"truncated {context}: need {n} bytes at offset {cursor}, have {}",
data.len().saturating_sub(*cursor)
)));
}
let slice = &data[*cursor..end];
*cursor = end;
Ok(slice)
}

/// Read a u32 length prefix, validate it against MAX_FIELD_LEN, then read
/// that many bytes. Returns the payload slice.
fn read_length_prefixed<'a>(
data: &'a [u8],
cursor: &mut usize,
context: &str,
) -> Result<&'a [u8], crate::error::ColumnarError> {
let len_bytes = read_slice(data, cursor, 4, context)?;
let len = u32::from_le_bytes(len_bytes.try_into().map_err(|_| {
crate::error::ColumnarError::Serialization(format!("truncated {context} len"))
})?) as usize;
if len > MAX_FIELD_LEN {
return Err(crate::error::ColumnarError::Serialization(format!(
"{context} length {len} exceeds maximum {MAX_FIELD_LEN}"
)));
}
read_slice(data, cursor, len, context)
}

/// Decode a row from the columnar wire format back into Values.
Expand All @@ -172,37 +223,40 @@ pub fn decode_row_from_wal(
let mut cursor = 0;

while cursor < data.len() {
let tag = data[cursor];
cursor += 1;
let tag_slice = read_slice(data, &mut cursor, 1, "tag")?;
let tag = tag_slice[0];

let value = match tag {
0 => Value::Null,
1 => {
let v = i64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
let bytes = read_slice(data, &mut cursor, 8, "i64")?;
let v = i64::from_le_bytes(bytes.try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated i64".into())
})?);
cursor += 8;
Value::Integer(v)
}
2 => {
let v = f64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
let bytes = read_slice(data, &mut cursor, 8, "f64")?;
let v = f64::from_le_bytes(bytes.try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated f64".into())
})?);
cursor += 8;
Value::Float(v)
}
3 => {
let v = data[cursor] != 0;
cursor += 1;
Value::Bool(v)
let bytes = read_slice(data, &mut cursor, 1, "bool")?;
Value::Bool(bytes[0] != 0)
}
4 | 5 | 8 => {
let len = u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated len".into())
})?) as usize;
cursor += 4;
let bytes = &data[cursor..cursor + len];
cursor += len;
let bytes = read_length_prefixed(
data,
&mut cursor,
match tag {
4 => "string",
5 => "bytes",
8 => "uuid",
_ => unreachable!(),
},
)?;
match tag {
4 => Value::String(String::from_utf8_lossy(bytes).into_owned()),
5 => Value::Bytes(bytes.to_vec()),
Expand All @@ -211,43 +265,41 @@ pub fn decode_row_from_wal(
}
}
6 => {
let micros =
i64::from_le_bytes(data[cursor..cursor + 8].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated timestamp".into())
})?);
cursor += 8;
let bytes = read_slice(data, &mut cursor, 8, "timestamp")?;
let micros = i64::from_le_bytes(bytes.try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated timestamp".into())
})?);
Value::DateTime(nodedb_types::datetime::NdbDateTime::from_micros(micros))
}
7 => {
let mut bytes = [0u8; 16];
bytes.copy_from_slice(&data[cursor..cursor + 16]);
cursor += 16;
Value::Decimal(rust_decimal::Decimal::deserialize(bytes))
let bytes = read_slice(data, &mut cursor, 16, "decimal")?;
let mut arr = [0u8; 16];
arr.copy_from_slice(bytes);
Value::Decimal(rust_decimal::Decimal::deserialize(arr))
}
9 => {
let count =
u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated vector count".into())
})?) as usize;
cursor += 4;
let count_bytes = read_slice(data, &mut cursor, 4, "vector count")?;
let count = u32::from_le_bytes(count_bytes.try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated vector count".into())
})?) as usize;
if count > MAX_FIELD_LEN / 4 {
return Err(crate::error::ColumnarError::Serialization(format!(
"vector count {count} exceeds maximum {}",
MAX_FIELD_LEN / 4
)));
}
let mut arr = Vec::with_capacity(count);
for _ in 0..count {
let f =
f32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated f32".into())
})?);
cursor += 4;
let fb = read_slice(data, &mut cursor, 4, "vector f32")?;
let f = f32::from_le_bytes(fb.try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated f32".into())
})?);
arr.push(Value::Float(f as f64));
}
Value::Array(arr)
}
10 => {
let len = u32::from_le_bytes(data[cursor..cursor + 4].try_into().map_err(|_| {
crate::error::ColumnarError::Serialization("truncated json len".into())
})?) as usize;
cursor += 4;
let json_bytes = &data[cursor..cursor + len];
cursor += len;
let json_bytes = read_length_prefixed(data, &mut cursor, "json")?;
sonic_rs::from_slice(json_bytes).unwrap_or(Value::Null)
}
_ => {
Expand Down Expand Up @@ -316,7 +368,7 @@ mod tests {
Value::Array(vec![Value::Float(1.0), Value::Float(2.0)]),
];

let encoded = encode_row_for_wal(&values);
let encoded = encode_row_for_wal(&values).expect("encode");
let decoded = decode_row_from_wal(&encoded).expect("decode");

assert_eq!(decoded.len(), values.len());
Expand All @@ -335,4 +387,70 @@ mod tests {
);
assert_eq!(decoded[8], Value::Null);
}

#[test]
fn decode_truncated_i64_returns_error() {
    // Tag 1 (i64) promises 8 payload bytes but none follow; the decoder
    // must surface a Serialization error instead of panicking on an
    // out-of-bounds slice read.
    let data = [1u8];
    assert!(
        decode_row_from_wal(&data).is_err(),
        "truncated i64 payload must return Err, not panic"
    );
}

#[test]
fn decode_truncated_string_returns_error() {
    // Tag 4 (string) with a length prefix claiming 255 bytes, but the
    // input ends right after the 4-byte prefix. Decoding must report the
    // truncation rather than read past the end of the buffer.
    let mut input = Vec::new();
    input.push(4u8); // tag = string
    input.extend_from_slice(&255u32.to_le_bytes()); // len = 255, no payload
    assert!(
        decode_row_from_wal(&input).is_err(),
        "truncated string payload must return Err, not panic"
    );
}

#[test]
fn decode_huge_vector_count_returns_error() {
    // Tag 9 (vector array) advertising 0x7FFF_FFFF elements with zero
    // payload bytes. Decoding must fail cleanly on the first missing f32,
    // without attempting an allocation proportional to the bogus count.
    let mut input = vec![9u8]; // tag = vector array
    input.extend_from_slice(&0x7FFF_FFFFu32.to_le_bytes()); // count, no payload
    assert!(
        decode_row_from_wal(&input).is_err(),
        "huge vector count with no payload must return Err, not panic or OOM"
    );
}

#[test]
fn decode_truncated_decimal_returns_error() {
    // Tag 7 (Decimal) requires a 16-byte payload; only 4 bytes are
    // supplied. The decoder must return an error, not panic.
    let input = [7u8, 0, 0, 0, 0];
    assert!(
        decode_row_from_wal(&input).is_err(),
        "truncated decimal payload must return Err, not panic"
    );
}
}
Loading
Loading