Skip to content

Commit aa5f1ee

Browse files
authored
Merge pull request #64 from NodeDB-Lab/fix/issue-48-alter-column-schema-evolution
fix(#48): version-aware binary tuple decoding for ALTER COLUMN
2 parents 4c55689 + 93139e2 commit aa5f1ee

File tree

13 files changed

+445
-15
lines changed

13 files changed

+445
-15
lines changed

nodedb-columnar/src/memtable/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ impl ColumnarMemtable {
199199
modifiers: Vec::new(),
200200
generated_expr: None,
201201
generated_deps: Vec::new(),
202+
added_at_version: 1,
202203
});
203204
}
204205
}

nodedb-types/src/collection.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,7 @@ impl std::str::FromStr for CollectionType {
187187
StrictSchema {
188188
columns: vec![],
189189
version: 1,
190+
dropped_columns: Vec::new(),
190191
},
191192
))),
192193
"columnar" => Ok(Self::columnar()),
@@ -196,6 +197,7 @@ impl std::str::FromStr for CollectionType {
196197
schema: StrictSchema {
197198
columns: vec![],
198199
version: 1,
200+
dropped_columns: Vec::new(),
199201
},
200202
ttl: None,
201203
capacity_hint: 0,

nodedb-types/src/columnar/column_type.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,16 @@ pub struct ColumnDef {
255255
/// Column names this generated column depends on.
256256
#[serde(default, skip_serializing_if = "Vec::is_empty")]
257257
pub generated_deps: Vec<String>,
258+
/// Schema version at which this column was added. Original columns have
259+
/// version 1 (the default). Columns added via `ALTER ADD COLUMN` record
260+
/// the schema version after the bump so the reader can build a physical
261+
/// sub-schema for tuples written under older versions.
262+
#[serde(default = "default_added_at_version")]
263+
pub added_at_version: u16,
264+
}
265+
266+
fn default_added_at_version() -> u16 {
267+
1
258268
}
259269

260270
impl ColumnDef {
@@ -268,6 +278,7 @@ impl ColumnDef {
268278
modifiers: Vec::new(),
269279
generated_expr: None,
270280
generated_deps: Vec::new(),
281+
added_at_version: 1,
271282
}
272283
}
273284

@@ -281,6 +292,7 @@ impl ColumnDef {
281292
modifiers: Vec::new(),
282293
generated_expr: None,
283294
generated_deps: Vec::new(),
295+
added_at_version: 1,
284296
}
285297
}
286298

nodedb-types/src/columnar/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,4 @@ pub mod schema;
44

55
pub use column_type::{ColumnDef, ColumnModifier, ColumnType, ColumnTypeParseError};
66
pub use profile::{ColumnarProfile, DocumentMode};
7-
pub use schema::{ColumnarSchema, SchemaError, SchemaOps, StrictSchema};
7+
pub use schema::{ColumnarSchema, DroppedColumn, SchemaError, SchemaOps, StrictSchema};

nodedb-types/src/columnar/schema.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,31 @@ pub trait SchemaOps {
4444
pub struct StrictSchema {
4545
pub columns: Vec<ColumnDef>,
4646
pub version: u16,
47+
/// Columns that were removed via `ALTER DROP COLUMN`. Retained so the
48+
/// reader can reconstruct the physical layout of tuples written before
49+
/// the drop.
50+
#[serde(default, skip_serializing_if = "Vec::is_empty")]
51+
pub dropped_columns: Vec<DroppedColumn>,
52+
}
53+
54+
/// Tombstone for a column removed by `ALTER DROP COLUMN`.
55+
#[derive(
56+
Debug,
57+
Clone,
58+
PartialEq,
59+
Eq,
60+
Serialize,
61+
Deserialize,
62+
zerompk::ToMessagePack,
63+
zerompk::FromMessagePack,
64+
)]
65+
pub struct DroppedColumn {
66+
/// The full column definition at time of drop.
67+
pub def: ColumnDef,
68+
/// The column's position in the column list before it was removed.
69+
pub position: usize,
70+
/// The schema version at which the column was dropped.
71+
pub dropped_at_version: u16,
4772
}
4873

4974
/// Schema for a columnar collection (compressed segment files).
@@ -112,6 +137,7 @@ impl StrictSchema {
112137
Ok(Self {
113138
columns,
114139
version: 1,
140+
dropped_columns: Vec::new(),
115141
})
116142
}
117143

@@ -135,6 +161,75 @@ impl StrictSchema {
135161
pub fn null_bitmap_size(&self) -> usize {
136162
self.columns.len().div_ceil(8)
137163
}
164+
165+
/// Build a sub-schema matching the physical layout of tuples written at
166+
/// the given version. Columns added after `version` are excluded;
167+
/// columns dropped after `version` are re-inserted at their original
168+
/// positions.
169+
pub fn schema_for_version(&self, version: u16) -> StrictSchema {
170+
// Start with live columns that existed at this version.
171+
let mut cols: Vec<ColumnDef> = self
172+
.columns
173+
.iter()
174+
.filter(|c| c.added_at_version <= version)
175+
.cloned()
176+
.collect();
177+
178+
// Re-insert dropped columns that were still alive at this version,
179+
// sorted by position (ascending) so inserts don't shift later indices.
180+
let mut to_reinsert: Vec<&DroppedColumn> = self
181+
.dropped_columns
182+
.iter()
183+
.filter(|dc| dc.def.added_at_version <= version && dc.dropped_at_version > version)
184+
.collect();
185+
to_reinsert.sort_by_key(|dc| dc.position);
186+
for dc in to_reinsert {
187+
let pos = dc.position.min(cols.len());
188+
cols.insert(pos, dc.def.clone());
189+
}
190+
191+
StrictSchema {
192+
version,
193+
columns: cols,
194+
dropped_columns: Vec::new(),
195+
}
196+
}
197+
198+
/// Parse a SQL default literal (e.g. `'n/a'`, `0`, `true`) into a `Value`.
199+
///
200+
/// Covers the common cases produced by `ALTER ADD COLUMN ... DEFAULT ...`.
201+
/// Returns `Value::Null` for expressions that cannot be trivially evaluated
202+
/// at read time (functions, sub-queries, etc.).
203+
pub fn parse_default_literal(expr: &str) -> crate::value::Value {
204+
use crate::value::Value;
205+
206+
let trimmed = expr.trim();
207+
208+
// String literals: 'foo'
209+
if trimmed.starts_with('\'') && trimmed.ends_with('\'') && trimmed.len() >= 2 {
210+
return Value::String(trimmed[1..trimmed.len() - 1].replace("''", "'"));
211+
}
212+
213+
// Boolean
214+
match trimmed.to_uppercase().as_str() {
215+
"TRUE" => return Value::Bool(true),
216+
"FALSE" => return Value::Bool(false),
217+
"NULL" => return Value::Null,
218+
_ => {}
219+
}
220+
221+
// Integer
222+
if let Ok(i) = trimmed.parse::<i64>() {
223+
return Value::Integer(i);
224+
}
225+
226+
// Float
227+
if let Ok(f) = trimmed.parse::<f64>() {
228+
return Value::Float(f);
229+
}
230+
231+
Value::Null
232+
}
138233
}
139234

140235
impl ColumnarSchema {
@@ -211,6 +306,7 @@ mod tests {
211306
modifiers: Vec::new(),
212307
generated_expr: None,
213308
generated_deps: Vec::new(),
309+
added_at_version: 1,
214310
}];
215311
assert!(matches!(
216312
StrictSchema::new(cols),

nodedb/src/control/server/pgwire/ddl/collection/alter/add_column.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,11 @@ pub async fn alter_table_add_column(
6262
&format!("column '{}' already exists", column.name),
6363
));
6464
}
65+
let new_version = schema.version.saturating_add(1);
66+
let mut column = column;
67+
column.added_at_version = new_version;
6568
schema.columns.push(column);
66-
schema.version = schema.version.saturating_add(1);
69+
schema.version = new_version;
6770

6871
let mut updated = coll;
6972
updated.collection_type = nodedb_types::CollectionType::strict(schema.clone());

nodedb/src/control/server/pgwire/ddl/collection/alter/drop_column.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,16 @@ pub async fn alter_collection_drop_column(
7979
));
8080
}
8181

82-
schema.columns.remove(idx);
83-
schema.version = schema.version.saturating_add(1);
82+
let dropped_def = schema.columns.remove(idx);
83+
let new_version = schema.version.saturating_add(1);
84+
schema
85+
.dropped_columns
86+
.push(nodedb_types::columnar::DroppedColumn {
87+
def: dropped_def,
88+
position: idx,
89+
dropped_at_version: new_version,
90+
});
91+
schema.version = new_version;
8492

8593
let mut updated = coll;
8694
updated.collection_type = nodedb_types::CollectionType::strict(schema.clone());

nodedb/src/control/server/pgwire/ddl/convert.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ pub async fn convert_collection(
9292
let schema = nodedb_types::columnar::StrictSchema {
9393
columns,
9494
version: 1,
95+
dropped_columns: Vec::new(),
9596
};
9697
if target_type == "kv" {
9798
nodedb_types::CollectionType::kv(schema)

nodedb/src/data/executor/handlers/convert.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ impl CoreLoop {
105105
let schema = StrictSchema {
106106
columns,
107107
version: 1,
108+
dropped_columns: Vec::new(),
108109
};
109110

110111
// Scan all existing documents.

nodedb/src/data/executor/handlers/document/read.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,7 @@ mod tests {
626626
ColumnDef::nullable("age", ColumnType::Int64),
627627
],
628628
version: 1,
629+
dropped_columns: Vec::new(),
629630
};
630631
let mut map = std::collections::HashMap::new();
631632
map.insert("id".into(), Value::String("u1".into()));

0 commit comments

Comments
 (0)