diff --git a/nodedb-columnar/src/memtable/mod.rs b/nodedb-columnar/src/memtable/mod.rs
index 1bccb2f9..cc1a8d80 100644
--- a/nodedb-columnar/src/memtable/mod.rs
+++ b/nodedb-columnar/src/memtable/mod.rs
@@ -199,6 +199,7 @@ impl ColumnarMemtable {
                 modifiers: Vec::new(),
                 generated_expr: None,
                 generated_deps: Vec::new(),
+                added_at_version: 1,
             });
         }
     }
diff --git a/nodedb-types/src/collection.rs b/nodedb-types/src/collection.rs
index a070233f..3cff3ef7 100644
--- a/nodedb-types/src/collection.rs
+++ b/nodedb-types/src/collection.rs
@@ -187,6 +187,7 @@ impl std::str::FromStr for CollectionType {
                 StrictSchema {
                     columns: vec![],
                     version: 1,
+                    dropped_columns: Vec::new(),
                 },
             ))),
             "columnar" => Ok(Self::columnar()),
@@ -196,6 +197,7 @@ impl std::str::FromStr for CollectionType {
                 schema: StrictSchema {
                     columns: vec![],
                     version: 1,
+                    dropped_columns: Vec::new(),
                 },
                 ttl: None,
                 capacity_hint: 0,
diff --git a/nodedb-types/src/columnar/column_type.rs b/nodedb-types/src/columnar/column_type.rs
index 58153cf6..4d3dd771 100644
--- a/nodedb-types/src/columnar/column_type.rs
+++ b/nodedb-types/src/columnar/column_type.rs
@@ -255,6 +255,16 @@ pub struct ColumnDef {
     /// Column names this generated column depends on.
     #[serde(default, skip_serializing_if = "Vec::is_empty")]
     pub generated_deps: Vec<String>,
+    /// Schema version at which this column was added. Original columns have
+    /// version 1 (the default). Columns added via `ALTER ADD COLUMN` record
+    /// the schema version after the bump so the reader can build a physical
+    /// sub-schema for tuples written under older versions.
+    #[serde(default = "default_added_at_version")]
+    pub added_at_version: u16,
+}
+
+fn default_added_at_version() -> u16 {
+    1
 }
 
 impl ColumnDef {
@@ -268,6 +278,7 @@
             modifiers: Vec::new(),
             generated_expr: None,
             generated_deps: Vec::new(),
+            added_at_version: 1,
         }
     }
 
@@ -281,6 +292,7 @@
             modifiers: Vec::new(),
             generated_expr: None,
             generated_deps: Vec::new(),
+            added_at_version: 1,
         }
     }
 
diff --git a/nodedb-types/src/columnar/mod.rs b/nodedb-types/src/columnar/mod.rs
index a7664b0f..a7c07b17 100644
--- a/nodedb-types/src/columnar/mod.rs
+++ b/nodedb-types/src/columnar/mod.rs
@@ -4,4 +4,4 @@ pub mod schema;
 
 pub use column_type::{ColumnDef, ColumnModifier, ColumnType, ColumnTypeParseError};
 pub use profile::{ColumnarProfile, DocumentMode};
-pub use schema::{ColumnarSchema, SchemaError, SchemaOps, StrictSchema};
+pub use schema::{ColumnarSchema, DroppedColumn, SchemaError, SchemaOps, StrictSchema};
diff --git a/nodedb-types/src/columnar/schema.rs b/nodedb-types/src/columnar/schema.rs
index 55b7ce17..128efc22 100644
--- a/nodedb-types/src/columnar/schema.rs
+++ b/nodedb-types/src/columnar/schema.rs
@@ -44,6 +44,31 @@ pub trait SchemaOps {
 pub struct StrictSchema {
     pub columns: Vec<ColumnDef>,
     pub version: u16,
+    /// Columns that were removed via `ALTER DROP COLUMN`. Retained so the
+    /// reader can reconstruct the physical layout of tuples written before
+    /// the drop.
+    #[serde(default, skip_serializing_if = "Vec::is_empty")]
+    pub dropped_columns: Vec<DroppedColumn>,
+}
+
+/// Tombstone for a column removed by `ALTER DROP COLUMN`.
+#[derive(
+    Debug,
+    Clone,
+    PartialEq,
+    Eq,
+    Serialize,
+    Deserialize,
+    zerompk::ToMessagePack,
+    zerompk::FromMessagePack,
+)]
+pub struct DroppedColumn {
+    /// The full column definition at time of drop.
+    pub def: ColumnDef,
+    /// The column's position in the column list before it was removed.
+    pub position: usize,
+    /// The schema version at which the column was dropped.
+    pub dropped_at_version: u16,
 }
 
 /// Schema for a columnar collection (compressed segment files).
@@ -112,6 +137,7 @@ impl StrictSchema {
         Ok(Self {
             columns,
             version: 1,
+            dropped_columns: Vec::new(),
         })
     }
 
@@ -135,6 +161,75 @@ impl StrictSchema {
     pub fn null_bitmap_size(&self) -> usize {
         self.columns.len().div_ceil(8)
     }
+
+    /// Build a sub-schema matching the physical layout of tuples written at
+    /// the given version. Columns added after `version` are excluded;
+    /// columns dropped after `version` are re-inserted at their original
+    /// positions.
+    pub fn schema_for_version(&self, version: u16) -> StrictSchema {
+        // Start with live columns that existed at this version.
+        let mut cols: Vec<ColumnDef> = self
+            .columns
+            .iter()
+            .filter(|c| c.added_at_version <= version)
+            .cloned()
+            .collect();
+
+        // Re-insert dropped columns still alive at this version, newest drop
+        // first: each position is relative to the layout just before that drop.
+        let mut to_reinsert: Vec<&DroppedColumn> = self
+            .dropped_columns
+            .iter()
+            .filter(|dc| dc.def.added_at_version <= version && dc.dropped_at_version > version)
+            .collect();
+        to_reinsert.sort_by_key(|dc| std::cmp::Reverse(dc.dropped_at_version));
+        for dc in to_reinsert {
+            let pos = dc.position.min(cols.len());
+            cols.insert(pos, dc.def.clone());
+        }
+
+        StrictSchema {
+            version,
+            columns: cols,
+            dropped_columns: Vec::new(),
+        }
+    }
+
+    /// Parse a SQL default literal (e.g. `'n/a'`, `0`, `true`) into a `Value`.
+    ///
+    /// Covers the common cases produced by `ALTER ADD COLUMN ... DEFAULT ...`.
+    /// Returns `Value::Null` for expressions that cannot be trivially evaluated
+    /// at read time (functions, sub-queries, etc.).
+    pub fn parse_default_literal(expr: &str) -> crate::value::Value {
+        use crate::value::Value;
+
+        let trimmed = expr.trim();
+
+        // String literals: 'foo'
+        if trimmed.starts_with('\'') && trimmed.ends_with('\'') && trimmed.len() >= 2 {
+            return Value::String(trimmed[1..trimmed.len() - 1].replace("''", "'"));
+        }
+
+        // Boolean
+        match trimmed.to_uppercase().as_str() {
+            "TRUE" => return Value::Bool(true),
+            "FALSE" => return Value::Bool(false),
+            "NULL" => return Value::Null,
+            _ => {}
+        }
+
+        // Integer
+        if let Ok(i) = trimmed.parse::<i64>() {
+            return Value::Integer(i);
+        }
+
+        // Float
+        if let Ok(f) = trimmed.parse::<f64>() {
+            return Value::Float(f);
+        }
+
+        Value::Null
+    }
 }
 
 impl ColumnarSchema {
@@ -211,6 +306,7 @@ mod tests {
             modifiers: Vec::new(),
             generated_expr: None,
             generated_deps: Vec::new(),
+            added_at_version: 1,
         }];
         assert!(matches!(
             StrictSchema::new(cols),
diff --git a/nodedb/src/control/server/pgwire/ddl/collection/alter/add_column.rs b/nodedb/src/control/server/pgwire/ddl/collection/alter/add_column.rs
index fe4245ee..8f149275 100644
--- a/nodedb/src/control/server/pgwire/ddl/collection/alter/add_column.rs
+++ b/nodedb/src/control/server/pgwire/ddl/collection/alter/add_column.rs
@@ -62,8 +62,11 @@ pub async fn alter_table_add_column(
             &format!("column '{}' already exists", column.name),
         ));
     }
+    let new_version = schema.version.saturating_add(1);
+    let mut column = column;
+    column.added_at_version = new_version;
     schema.columns.push(column);
-    schema.version = schema.version.saturating_add(1);
+    schema.version = new_version;
 
     let mut updated = coll;
     updated.collection_type = nodedb_types::CollectionType::strict(schema.clone());
diff --git a/nodedb/src/control/server/pgwire/ddl/collection/alter/drop_column.rs b/nodedb/src/control/server/pgwire/ddl/collection/alter/drop_column.rs
index 6415a64e..3ded3e18 100644
--- a/nodedb/src/control/server/pgwire/ddl/collection/alter/drop_column.rs
+++ b/nodedb/src/control/server/pgwire/ddl/collection/alter/drop_column.rs
@@ -79,8 +79,16 @@ pub async fn alter_collection_drop_column(
         ));
     }
 
-    schema.columns.remove(idx);
-    schema.version = schema.version.saturating_add(1);
+    let dropped_def = schema.columns.remove(idx);
+    let new_version = schema.version.saturating_add(1);
+    schema
+        .dropped_columns
+        .push(nodedb_types::columnar::DroppedColumn {
+            def: dropped_def,
+            position: idx,
+            dropped_at_version: new_version,
+        });
+    schema.version = new_version;
 
     let mut updated = coll;
     updated.collection_type = nodedb_types::CollectionType::strict(schema.clone());
diff --git a/nodedb/src/control/server/pgwire/ddl/convert.rs b/nodedb/src/control/server/pgwire/ddl/convert.rs
index 31e5c6a7..db9b4d4a 100644
--- a/nodedb/src/control/server/pgwire/ddl/convert.rs
+++ b/nodedb/src/control/server/pgwire/ddl/convert.rs
@@ -92,6 +92,7 @@ pub async fn convert_collection(
     let schema = nodedb_types::columnar::StrictSchema {
         columns,
         version: 1,
+        dropped_columns: Vec::new(),
     };
     if target_type == "kv" {
         nodedb_types::CollectionType::kv(schema)
diff --git a/nodedb/src/data/executor/handlers/convert.rs b/nodedb/src/data/executor/handlers/convert.rs
index 79535cf2..5f135a2e 100644
--- a/nodedb/src/data/executor/handlers/convert.rs
+++ b/nodedb/src/data/executor/handlers/convert.rs
@@ -105,6 +105,7 @@ impl CoreLoop {
         let schema = StrictSchema {
             columns,
             version: 1,
+            dropped_columns: Vec::new(),
         };
 
         // Scan all existing documents.
diff --git a/nodedb/src/data/executor/handlers/document/read.rs b/nodedb/src/data/executor/handlers/document/read.rs
index 0e273144..92cd8433 100644
--- a/nodedb/src/data/executor/handlers/document/read.rs
+++ b/nodedb/src/data/executor/handlers/document/read.rs
@@ -626,6 +626,7 @@ mod tests {
                 ColumnDef::nullable("age", ColumnType::Int64),
             ],
             version: 1,
+            dropped_columns: Vec::new(),
         };
         let mut map = std::collections::HashMap::new();
         map.insert("id".into(), Value::String("u1".into()));
diff --git a/nodedb/src/data/executor/strict_format.rs b/nodedb/src/data/executor/strict_format.rs
index 7b9629aa..ba9105f6 100644
--- a/nodedb/src/data/executor/strict_format.rs
+++ b/nodedb/src/data/executor/strict_format.rs
@@ -85,12 +85,37 @@ pub(super) fn binary_tuple_to_value(tuple_bytes: &[u8], schema: &StrictSchema) -
         return None;
     }
 
-    let values = decoder.extract_all(tuple_bytes).ok()?;
-
+    // Version-aware decoding: if the tuple was written with an older schema
+    // (fewer columns due to ADD COLUMN), build a sub-schema decoder matching
+    // the physical layout and fill defaults for new columns.
     let mut map = std::collections::HashMap::with_capacity(schema.columns.len());
-    for (i, col) in schema.columns.iter().enumerate() {
-        map.insert(col.name.clone(), values[i].clone());
+    if version < schema.version {
+        let old_schema = schema.schema_for_version(version);
+        let old_decoder = nodedb_strict::TupleDecoder::new(&old_schema);
+        let old_values = old_decoder.extract_all(tuple_bytes).ok()?;
+
+        // Map old columns by name.
+        for (i, col) in old_schema.columns.iter().enumerate() {
+            map.insert(col.name.clone(), old_values[i].clone());
+        }
+        // Fill defaults for columns added after this tuple's version.
+        for col in &schema.columns {
+            if col.added_at_version > version {
+                let default_val = col
+                    .default
+                    .as_deref()
+                    .map(StrictSchema::parse_default_literal)
+                    .unwrap_or(Value::Null);
+                map.insert(col.name.clone(), default_val);
+            }
+        }
+    } else {
+        let values = decoder.extract_all(tuple_bytes).ok()?;
+        for (i, col) in schema.columns.iter().enumerate() {
+            map.insert(col.name.clone(), values[i].clone());
+        }
     }
+
     Some(Value::Object(map))
 }
 
@@ -108,14 +133,20 @@ pub(super) fn binary_tuple_to_json(
     tuple_bytes: &[u8],
     schema: &StrictSchema,
 ) -> Option<serde_json::Value> {
-    let decoder = nodedb_strict::TupleDecoder::new(schema);
-    let values = decoder.extract_all(tuple_bytes).ok()?;
-
-    let mut obj = serde_json::Map::with_capacity(schema.columns.len());
-    for (i, col) in schema.columns.iter().enumerate() {
-        obj.insert(col.name.clone(), value_to_json(&values[i]));
+    // Delegate to binary_tuple_to_value (which handles version-aware decoding)
+    // then convert Value → JSON.
+    let val = binary_tuple_to_value(tuple_bytes, schema)?;
+    match val {
+        Value::Object(map) => {
+            let mut obj = serde_json::Map::with_capacity(map.len());
+            for col in &schema.columns {
+                let v = map.get(&col.name).unwrap_or(&Value::Null);
+                obj.insert(col.name.clone(), value_to_json(v));
+            }
+            Some(serde_json::Value::Object(obj))
+        }
+        _ => None,
     }
-    Some(serde_json::Value::Object(obj))
 }
 
 /// Coerce a `nodedb_types::Value` to match a column's declared type.
@@ -372,6 +403,7 @@ mod tests {
                 ColumnDef::nullable("age", ColumnType::Int64),
             ],
             version: 1,
+            dropped_columns: Vec::new(),
         }
     }
 
diff --git a/nodedb/tests/executor_tests/test_cross_type_join.rs b/nodedb/tests/executor_tests/test_cross_type_join.rs
index 25efc5ac..438d00ad 100644
--- a/nodedb/tests/executor_tests/test_cross_type_join.rs
+++ b/nodedb/tests/executor_tests/test_cross_type_join.rs
@@ -96,6 +96,7 @@ fn document_scan_preserves_kv_rows_when_collection_has_strict_config() {
                     ColumnDef::nullable("lang", ColumnType::String),
                 ],
                 version: 1,
+                dropped_columns: Vec::new(),
            },
        },
        enforcement: Box::new(EnforcementOptions::default()),
diff --git a/nodedb/tests/sql_transactions.rs b/nodedb/tests/sql_transactions.rs
index 8f70a366..26af9e22 100644
--- a/nodedb/tests/sql_transactions.rs
+++ b/nodedb/tests/sql_transactions.rs
@@ -226,3 +226,275 @@ async fn alter_collection_alter_column_type() {
         .unwrap();
     assert_eq!(rows.len(), 1);
 }
+
+// ── Pre-ALTER row survival tests ──────────────────────────────────────
+//
+// Every test below verifies that rows written BEFORE a schema-altering DDL
+// remain readable with correct values AFTER the DDL. The bug class is:
+// catalog schema mutated without row migration or read-time compat shim.
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn add_column_preserves_pre_alter_row_existing_columns() {
+    let server = TestServer::start().await;
+
+    server
+        .exec("CREATE COLLECTION ac_preserve TYPE DOCUMENT STRICT (id TEXT PRIMARY KEY, name TEXT)")
+        .await
+        .unwrap();
+    server
+        .exec("INSERT INTO ac_preserve (id, name) VALUES ('a', 'alice')")
+        .await
+        .unwrap();
+
+    server
+        .exec("ALTER TABLE ac_preserve ADD COLUMN note TEXT DEFAULT 'n/a'")
+        .await
+        .unwrap();
+
+    // Pre-ALTER row must still return correct values for original columns.
+    let rows = server
+        .query_text("SELECT id, name FROM ac_preserve WHERE id = 'a'")
+        .await
+        .unwrap();
+    assert_eq!(rows.len(), 1, "pre-ALTER row must be visible");
+    assert!(
+        rows[0].contains("\"name\":\"alice\""),
+        "original column 'name' must retain its value, got {:?}",
+        rows[0]
+    );
+    // Regression guard: must NOT return null-everywhere.
+    assert!(
+        !rows[0].contains("\"name\":null"),
+        "pre-ALTER row must not have null-everywhere corruption, got {:?}",
+        rows[0]
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn add_column_returns_default_for_pre_alter_row() {
+    let server = TestServer::start().await;
+
+    server
+        .exec("CREATE COLLECTION ac_default TYPE DOCUMENT STRICT (id TEXT PRIMARY KEY, name TEXT)")
+        .await
+        .unwrap();
+    server
+        .exec("INSERT INTO ac_default (id, name) VALUES ('a', 'alice')")
+        .await
+        .unwrap();
+
+    server
+        .exec("ALTER TABLE ac_default ADD COLUMN note TEXT DEFAULT 'n/a'")
+        .await
+        .unwrap();
+
+    // The new column should virtual-fill with its DEFAULT for pre-ALTER rows.
+    let rows = server
+        .query_text("SELECT id, name, note FROM ac_default WHERE id = 'a'")
+        .await
+        .unwrap();
+    assert_eq!(rows.len(), 1, "pre-ALTER row must be visible");
+    assert!(
+        rows[0].contains("\"note\":\"n/a\""),
+        "new column must return DEFAULT value for pre-ALTER rows, got {:?}",
+        rows[0]
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn add_column_then_update_pre_alter_row() {
+    let server = TestServer::start().await;
+
+    server
+        .exec("CREATE COLLECTION ac_update TYPE DOCUMENT STRICT (id TEXT PRIMARY KEY, name TEXT)")
+        .await
+        .unwrap();
+    server
+        .exec("INSERT INTO ac_update (id, name) VALUES ('a', 'alice')")
+        .await
+        .unwrap();
+
+    server
+        .exec("ALTER TABLE ac_update ADD COLUMN note TEXT DEFAULT 'n/a'")
+        .await
+        .unwrap();
+
+    // Updating a pre-ALTER row must succeed and preserve all columns.
+    server
+        .exec("UPDATE ac_update SET note = 'updated' WHERE id = 'a'")
+        .await
+        .unwrap();
+
+    let rows = server
+        .query_text("SELECT id, name, note FROM ac_update WHERE id = 'a'")
+        .await
+        .unwrap();
+    assert_eq!(rows.len(), 1);
+    assert!(
+        rows[0].contains("\"name\":\"alice\""),
+        "original column must survive update, got {:?}",
+        rows[0]
+    );
+    assert!(
+        rows[0].contains("\"note\":\"updated\""),
+        "updated column must reflect new value, got {:?}",
+        rows[0]
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn multiple_add_columns_preserves_pre_alter_row() {
+    let server = TestServer::start().await;
+
+    server
+        .exec("CREATE COLLECTION ac_multi TYPE DOCUMENT STRICT (id TEXT PRIMARY KEY, name TEXT)")
+        .await
+        .unwrap();
+    server
+        .exec("INSERT INTO ac_multi (id, name) VALUES ('a', 'alice')")
+        .await
+        .unwrap();
+
+    server
+        .exec("ALTER TABLE ac_multi ADD COLUMN col1 INT DEFAULT 0")
+        .await
+        .unwrap();
+    server
+        .exec("ALTER TABLE ac_multi ADD COLUMN col2 TEXT DEFAULT 'x'")
+        .await
+        .unwrap();
+
+    // Two sequential ADD COLUMNs compound the schema drift — pre-ALTER row
+    // must still be readable with correct values and defaults.
+    let rows = server
+        .query_text("SELECT id, name, col1, col2 FROM ac_multi WHERE id = 'a'")
+        .await
+        .unwrap();
+    assert_eq!(
+        rows.len(),
+        1,
+        "pre-ALTER row must be visible after two ADD COLUMNs"
+    );
+    assert!(
+        rows[0].contains("\"name\":\"alice\""),
+        "original column must retain value, got {:?}",
+        rows[0]
+    );
+    // Regression guard: null-everywhere means total schema-data offset corruption.
+    assert!(
+        !rows[0].contains("\"name\":null"),
+        "must not exhibit null-everywhere corruption, got {:?}",
+        rows[0]
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn drop_column_preserves_pre_alter_row_remaining_columns() {
+    let server = TestServer::start().await;
+
+    server
+        .exec(
+            "CREATE COLLECTION dc_preserve TYPE DOCUMENT STRICT (\
+             id TEXT PRIMARY KEY, \
+             name TEXT NOT NULL, \
+             scratch TEXT)",
+        )
+        .await
+        .unwrap();
+    server
+        .exec("INSERT INTO dc_preserve (id, name, scratch) VALUES ('a', 'alice', 'temp')")
+        .await
+        .unwrap();
+
+    server
+        .exec("ALTER COLLECTION dc_preserve DROP COLUMN scratch")
+        .await
+        .unwrap();
+
+    // Remaining columns of the pre-ALTER row must read correctly.
+    let rows = server
+        .query_text("SELECT id, name FROM dc_preserve WHERE id = 'a'")
+        .await
+        .unwrap();
+    assert_eq!(
+        rows.len(),
+        1,
+        "pre-ALTER row must be visible after DROP COLUMN"
+    );
+    assert!(
+        rows[0].contains("\"name\":\"alice\""),
+        "remaining column 'name' must retain its value after DROP COLUMN, got {:?}",
+        rows[0]
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn rename_column_preserves_pre_alter_row_value() {
+    let server = TestServer::start().await;
+
+    server
+        .exec(
+            "CREATE COLLECTION rc_preserve TYPE DOCUMENT STRICT (\
+             id TEXT PRIMARY KEY, \
+             name TEXT NOT NULL, \
+             score INT DEFAULT 0)",
+        )
+        .await
+        .unwrap();
+    server
+        .exec("INSERT INTO rc_preserve (id, name, score) VALUES ('a', 'alice', 42)")
+        .await
+        .unwrap();
+
+    server
+        .exec("ALTER COLLECTION rc_preserve RENAME COLUMN score TO points")
+        .await
+        .unwrap();
+
+    // Pre-ALTER row must be readable under the new column name with correct value.
+    let rows = server
+        .query_text("SELECT id, name, points FROM rc_preserve WHERE id = 'a'")
+        .await
+        .unwrap();
+    assert_eq!(rows.len(), 1);
+    assert!(
+        rows[0].contains("\"points\":42") || rows[0].contains("\"points\": 42"),
+        "renamed column must retain pre-ALTER value, got {:?}",
+        rows[0]
+    );
+}
+
+#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
+async fn alter_column_type_preserves_pre_alter_row_value() {
+    let server = TestServer::start().await;
+
+    server
+        .exec(
+            "CREATE COLLECTION at_preserve TYPE DOCUMENT STRICT (\
+             id TEXT PRIMARY KEY, \
+             value INT NOT NULL)",
+        )
+        .await
+        .unwrap();
+    server
+        .exec("INSERT INTO at_preserve (id, value) VALUES ('a', 42)")
+        .await
+        .unwrap();
+
+    server
+        .exec("ALTER COLLECTION at_preserve ALTER COLUMN value TYPE BIGINT")
+        .await
+        .unwrap();
+
+    // Pre-ALTER row must still read correctly after type widening.
+    let rows = server
+        .query_text("SELECT id, value FROM at_preserve WHERE id = 'a'")
+        .await
+        .unwrap();
+    assert_eq!(rows.len(), 1);
+    assert!(
+        rows[0].contains("\"value\":42") || rows[0].contains("\"value\": 42"),
+        "value must survive ALTER COLUMN TYPE, got {:?}",
+        rows[0]
+    );
+}