Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nodedb-columnar/src/memtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ impl ColumnarMemtable {
modifiers: Vec::new(),
generated_expr: None,
generated_deps: Vec::new(),
added_at_version: 1,
});
}
}
Expand Down
2 changes: 2 additions & 0 deletions nodedb-types/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ impl std::str::FromStr for CollectionType {
StrictSchema {
columns: vec![],
version: 1,
dropped_columns: Vec::new(),
},
))),
"columnar" => Ok(Self::columnar()),
Expand All @@ -196,6 +197,7 @@ impl std::str::FromStr for CollectionType {
schema: StrictSchema {
columns: vec![],
version: 1,
dropped_columns: Vec::new(),
},
ttl: None,
capacity_hint: 0,
Expand Down
12 changes: 12 additions & 0 deletions nodedb-types/src/columnar/column_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,16 @@ pub struct ColumnDef {
/// Column names this generated column depends on.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub generated_deps: Vec<String>,
/// Schema version at which this column was added. Original columns have
/// version 1 (the default). Columns added via `ALTER ADD COLUMN` record
/// the schema version after the bump so the reader can build a physical
/// sub-schema for tuples written under older versions.
#[serde(default = "default_added_at_version")]
pub added_at_version: u16,
}

fn default_added_at_version() -> u16 {
1
}

impl ColumnDef {
Expand All @@ -268,6 +278,7 @@ impl ColumnDef {
modifiers: Vec::new(),
generated_expr: None,
generated_deps: Vec::new(),
added_at_version: 1,
}
}

Expand All @@ -281,6 +292,7 @@ impl ColumnDef {
modifiers: Vec::new(),
generated_expr: None,
generated_deps: Vec::new(),
added_at_version: 1,
}
}

Expand Down
2 changes: 1 addition & 1 deletion nodedb-types/src/columnar/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ pub mod schema;

pub use column_type::{ColumnDef, ColumnModifier, ColumnType, ColumnTypeParseError};
pub use profile::{ColumnarProfile, DocumentMode};
pub use schema::{ColumnarSchema, SchemaError, SchemaOps, StrictSchema};
pub use schema::{ColumnarSchema, DroppedColumn, SchemaError, SchemaOps, StrictSchema};
96 changes: 96 additions & 0 deletions nodedb-types/src/columnar/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,31 @@ pub trait SchemaOps {
pub struct StrictSchema {
pub columns: Vec<ColumnDef>,
pub version: u16,
/// Columns that were removed via `ALTER DROP COLUMN`. Retained so the
/// reader can reconstruct the physical layout of tuples written before
/// the drop.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub dropped_columns: Vec<DroppedColumn>,
}

/// Tombstone for a column removed by `ALTER DROP COLUMN`.
#[derive(
Debug,
Clone,
PartialEq,
Eq,
Serialize,
Deserialize,
zerompk::ToMessagePack,
zerompk::FromMessagePack,
)]
pub struct DroppedColumn {
/// The full column definition at time of drop.
pub def: ColumnDef,
/// The column's position in the column list before it was removed.
pub position: usize,
/// The schema version at which the column was dropped.
pub dropped_at_version: u16,
}

/// Schema for a columnar collection (compressed segment files).
Expand Down Expand Up @@ -112,6 +137,7 @@ impl StrictSchema {
Ok(Self {
columns,
version: 1,
dropped_columns: Vec::new(),
})
}

Expand All @@ -135,6 +161,75 @@ impl StrictSchema {
pub fn null_bitmap_size(&self) -> usize {
self.columns.len().div_ceil(8)
}

/// Build a sub-schema matching the physical layout of tuples written at
/// the given version. Columns added after `version` are excluded;
/// columns dropped after `version` are re-inserted at their original
/// positions.
pub fn schema_for_version(&self, version: u16) -> StrictSchema {
// Start with live columns that existed at this version.
let mut cols: Vec<ColumnDef> = self
.columns
.iter()
.filter(|c| c.added_at_version <= version)
.cloned()
.collect();

// Re-insert dropped columns that were still alive at this version,
// sorted by position (ascending) so inserts don't shift later indices.
let mut to_reinsert: Vec<&DroppedColumn> = self
.dropped_columns
.iter()
.filter(|dc| dc.def.added_at_version <= version && dc.dropped_at_version > version)
.collect();
to_reinsert.sort_by_key(|dc| dc.position);
for dc in to_reinsert {
let pos = dc.position.min(cols.len());
cols.insert(pos, dc.def.clone());
}

StrictSchema {
version,
columns: cols,
dropped_columns: Vec::new(),
}
}

/// Parse a SQL default literal (e.g. `'n/a'`, `0`, `true`) into a `Value`.
///
/// Covers the common cases produced by `ALTER ADD COLUMN ... DEFAULT ...`.
/// Returns `Value::Null` for expressions that cannot be trivially evaluated
/// at read time (functions, sub-queries, etc.).
pub fn parse_default_literal(expr: &str) -> crate::value::Value {
use crate::value::Value;

let trimmed = expr.trim();

// String literals: 'foo'
if trimmed.starts_with('\'') && trimmed.ends_with('\'') && trimmed.len() >= 2 {
return Value::String(trimmed[1..trimmed.len() - 1].replace("''", "'"));
}

// Boolean
match trimmed.to_uppercase().as_str() {
"TRUE" => return Value::Bool(true),
"FALSE" => return Value::Bool(false),
"NULL" => return Value::Null,
_ => {}
}

// Integer
if let Ok(i) = trimmed.parse::<i64>() {
return Value::Integer(i);
}

// Float
if let Ok(f) = trimmed.parse::<f64>() {
return Value::Float(f);
}

Value::Null
}
}

impl ColumnarSchema {
Expand Down Expand Up @@ -211,6 +306,7 @@ mod tests {
modifiers: Vec::new(),
generated_expr: None,
generated_deps: Vec::new(),
added_at_version: 1,
}];
assert!(matches!(
StrictSchema::new(cols),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ pub async fn alter_table_add_column(
&format!("column '{}' already exists", column.name),
));
}
let new_version = schema.version.saturating_add(1);
let mut column = column;
column.added_at_version = new_version;
schema.columns.push(column);
schema.version = schema.version.saturating_add(1);
schema.version = new_version;

let mut updated = coll;
updated.collection_type = nodedb_types::CollectionType::strict(schema.clone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,16 @@ pub async fn alter_collection_drop_column(
));
}

schema.columns.remove(idx);
schema.version = schema.version.saturating_add(1);
let dropped_def = schema.columns.remove(idx);
let new_version = schema.version.saturating_add(1);
schema
.dropped_columns
.push(nodedb_types::columnar::DroppedColumn {
def: dropped_def,
position: idx,
dropped_at_version: new_version,
});
schema.version = new_version;

let mut updated = coll;
updated.collection_type = nodedb_types::CollectionType::strict(schema.clone());
Expand Down
1 change: 1 addition & 0 deletions nodedb/src/control/server/pgwire/ddl/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub async fn convert_collection(
let schema = nodedb_types::columnar::StrictSchema {
columns,
version: 1,
dropped_columns: Vec::new(),
};
if target_type == "kv" {
nodedb_types::CollectionType::kv(schema)
Expand Down
1 change: 1 addition & 0 deletions nodedb/src/data/executor/handlers/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl CoreLoop {
let schema = StrictSchema {
columns,
version: 1,
dropped_columns: Vec::new(),
};

// Scan all existing documents.
Expand Down
1 change: 1 addition & 0 deletions nodedb/src/data/executor/handlers/document/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ mod tests {
ColumnDef::nullable("age", ColumnType::Int64),
],
version: 1,
dropped_columns: Vec::new(),
};
let mut map = std::collections::HashMap::new();
map.insert("id".into(), Value::String("u1".into()));
Expand Down
54 changes: 43 additions & 11 deletions nodedb/src/data/executor/strict_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,37 @@ pub(super) fn binary_tuple_to_value(tuple_bytes: &[u8], schema: &StrictSchema) -
return None;
}

let values = decoder.extract_all(tuple_bytes).ok()?;

// Version-aware decoding: if the tuple was written with an older schema
// (fewer columns due to ADD COLUMN), build a sub-schema decoder matching
// the physical layout and fill defaults for new columns.
let mut map = std::collections::HashMap::with_capacity(schema.columns.len());
for (i, col) in schema.columns.iter().enumerate() {
map.insert(col.name.clone(), values[i].clone());
if version < schema.version {
let old_schema = schema.schema_for_version(version);
let old_decoder = nodedb_strict::TupleDecoder::new(&old_schema);
let old_values = old_decoder.extract_all(tuple_bytes).ok()?;

// Map old columns by name.
for (i, col) in old_schema.columns.iter().enumerate() {
map.insert(col.name.clone(), old_values[i].clone());
}
// Fill defaults for columns added after this tuple's version.
for col in &schema.columns {
if col.added_at_version > version {
let default_val = col
.default
.as_deref()
.map(StrictSchema::parse_default_literal)
.unwrap_or(Value::Null);
map.insert(col.name.clone(), default_val);
}
}
} else {
let values = decoder.extract_all(tuple_bytes).ok()?;
for (i, col) in schema.columns.iter().enumerate() {
map.insert(col.name.clone(), values[i].clone());
}
}

Some(Value::Object(map))
}

Expand All @@ -108,14 +133,20 @@ pub(super) fn binary_tuple_to_json(
tuple_bytes: &[u8],
schema: &StrictSchema,
) -> Option<serde_json::Value> {
let decoder = nodedb_strict::TupleDecoder::new(schema);
let values = decoder.extract_all(tuple_bytes).ok()?;

let mut obj = serde_json::Map::with_capacity(schema.columns.len());
for (i, col) in schema.columns.iter().enumerate() {
obj.insert(col.name.clone(), value_to_json(&values[i]));
// Delegate to binary_tuple_to_value (which handles version-aware decoding)
// then convert Value → JSON.
let val = binary_tuple_to_value(tuple_bytes, schema)?;
match val {
Value::Object(map) => {
let mut obj = serde_json::Map::with_capacity(map.len());
for col in &schema.columns {
let v = map.get(&col.name).unwrap_or(&Value::Null);
obj.insert(col.name.clone(), value_to_json(v));
}
Some(serde_json::Value::Object(obj))
}
_ => None,
}
Some(serde_json::Value::Object(obj))
}

/// Coerce a `nodedb_types::Value` to match a column's declared type.
Expand Down Expand Up @@ -372,6 +403,7 @@ mod tests {
ColumnDef::nullable("age", ColumnType::Int64),
],
version: 1,
dropped_columns: Vec::new(),
}
}

Expand Down
1 change: 1 addition & 0 deletions nodedb/tests/executor_tests/test_cross_type_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ fn document_scan_preserves_kv_rows_when_collection_has_strict_config() {
ColumnDef::nullable("lang", ColumnType::String),
],
version: 1,
dropped_columns: Vec::new(),
},
},
enforcement: Box::new(EnforcementOptions::default()),
Expand Down
Loading
Loading