Skip to content

Commit 14fb1d6

Browse files
committed
test: add integration tests for columnar aggregates and prepared statements
Cover GROUP BY with sum/avg/min/max/count over flushed columnar segments, and extended-query protocol correctness for typed result columns and DSL statement passthrough.
1 parent ebe4787 commit 14fb1d6

File tree

2 files changed

+208
-0
lines changed

2 files changed

+208
-0
lines changed

nodedb/tests/executor_tests/test_columnar_aggregate.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,3 +124,128 @@ fn columnar_having_uses_canonical_key_but_output_keeps_user_alias() {
124124
assert_eq!(rows[0]["city_count"].as_u64(), Some(2));
125125
assert!(rows[0].get("count(*)").is_none());
126126
}
127+
128+
#[test]
fn columnar_insert_triggers_memtable_flush() {
    // Spec: after inserting more rows than DEFAULT_FLUSH_THRESHOLD (65536), the
    // memtable must be drained to a segment on disk rather than accumulating
    // unbounded memory.
    let mut ctx = make_ctx();

    // Build a batch of 70000 rows — above the 65536 flush threshold.
    let batch: Vec<serde_json::Value> = (0..70_000)
        .map(|i| {
            serde_json::json!({
                "id": format!("r{i}"),
                "v": i,
            })
        })
        .collect();
    let payload = nodedb_types::json_to_msgpack(&serde_json::Value::Array(batch)).unwrap();

    // The write must succeed without error. Before the fix this would succeed
    // but silently accumulate all rows in RAM; after the fix the engine flushes
    // the memtable to a segment once the threshold is crossed.
    let insert_plan = PhysicalPlan::Columnar(ColumnarOp::Insert {
        collection: "large_col".into(),
        payload,
        format: "msgpack".into(),
    });
    send_ok(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, insert_plan);

    // All rows must be readable back — the segment flush must not lose data.
    let scanned = ctx.core.scan_collection(1, "large_col", 70_001).unwrap();
    assert_eq!(
        scanned.len(),
        70_000,
        "all inserted rows must be scannable after flush"
    );
}
171+
172+
#[test]
fn aggregate_group_by_does_not_require_full_materialization() {
    // Spec: GROUP BY aggregation must return correct per-group results regardless
    // of whether the implementation uses running aggregates (O(groups)) or
    // full doc materialization (O(rows)). This test locks in correctness;
    // the fix changes internal memory usage from O(N) to O(groups).
    let mut ctx = make_ctx();

    // Insert 1000 rows across 10 groups (g0..g9), each group gets 100 rows.
    let rows: Vec<serde_json::Value> = (0..1_000)
        .map(|i| {
            serde_json::json!({
                "id": format!("r{i}"),
                "g": format!("g{}", i % 10),
                "v": i,
            })
        })
        .collect();
    let payload = nodedb_types::json_to_msgpack(&serde_json::Value::Array(rows)).unwrap();

    send_ok(
        &mut ctx.core,
        &mut ctx.tx,
        &mut ctx.rx,
        PhysicalPlan::Columnar(ColumnarOp::Insert {
            collection: "grouped".into(),
            payload,
            format: "msgpack".into(),
        }),
    );

    let payload = send_ok(
        &mut ctx.core,
        &mut ctx.tx,
        &mut ctx.rx,
        PhysicalPlan::Query(QueryOp::Aggregate {
            collection: "grouped".into(),
            group_by: vec!["g".into()],
            aggregates: vec![
                AggregateSpec {
                    function: "count".into(),
                    alias: "count(*)".into(),
                    user_alias: None,
                    field: "*".into(),
                    expr: None,
                },
                AggregateSpec {
                    function: "sum".into(),
                    alias: "sum(v)".into(),
                    user_alias: None,
                    field: "v".into(),
                    expr: None,
                },
            ],
            filters: Vec::new(),
            having: Vec::new(),
            limit: 100,
            sub_group_by: Vec::new(),
            sub_aggregates: Vec::new(),
        }),
    );

    let result = payload_value(&payload);
    let result_rows = result
        .as_array()
        .unwrap_or_else(|| panic!("expected aggregate rows, got {result}"));

    assert_eq!(
        result_rows.len(),
        10,
        "GROUP BY must produce exactly 10 groups"
    );
    for row in result_rows {
        assert_eq!(
            row["count(*)"].as_u64(),
            Some(100),
            "each group must contain exactly 100 rows, got: {row}"
        );
    }

    // Fix: the query requests sum(v) but the test previously never asserted it,
    // leaving that aggregate as dead coverage. The per-group sums must account
    // for every inserted value exactly once: 0 + 1 + ... + 999 = 499500.
    // as_f64 accepts any JSON number representation (i64/u64/f64), so this does
    // not depend on how the engine encodes the sum.
    let total: f64 = result_rows
        .iter()
        .map(|row| {
            row["sum(v)"]
                .as_f64()
                .unwrap_or_else(|| panic!("sum(v) must be numeric, got: {row}"))
        })
        .sum();
    assert_eq!(
        total as i64, 499_500,
        "per-group sums must add up to the sum of all inserted values"
    );
}

nodedb/tests/sql_prepared_statements.rs

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,86 @@ async fn prepare_execute_deallocate_lifecycle() {
2323
server.exec("DEALLOCATE ALL").await.unwrap();
2424
server.expect_error("EXECUTE q1", "does not exist").await;
2525
}
26+
27+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
28+
async fn prepared_search_vector_dsl() {
29+
let server = TestServer::start().await;
30+
31+
// Create a document collection and a vector index on the embedding field.
32+
server
33+
.exec("CREATE COLLECTION vec_ep TYPE document")
34+
.await
35+
.unwrap();
36+
server
37+
.exec("CREATE VECTOR INDEX idx_vec_ep ON vec_ep METRIC cosine DIM 3")
38+
.await
39+
.unwrap();
40+
41+
// Insert a document with an embedding vector.
42+
server
43+
.exec("INSERT INTO vec_ep (id, embedding) VALUES ('v1', ARRAY[1.0, 0.0, 0.0])")
44+
.await
45+
.unwrap();
46+
47+
// DSL SEARCH statements must not be rejected by the extended-protocol path
48+
// with "Expected: an SQL statement". The statement should succeed and return
49+
// results (or an empty result set — the key is no parse-time rejection).
50+
let result = server
51+
.query_text("SEARCH vec_ep USING VECTOR(embedding, ARRAY[1.0, 0.0, 0.0], 3)")
52+
.await;
53+
assert!(
54+
result.is_ok(),
55+
"SEARCH via extended protocol must not be rejected: {:?}",
56+
result.err()
57+
);
58+
}
59+
60+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
61+
async fn prepared_upsert_dsl() {
62+
let server = TestServer::start().await;
63+
64+
server.exec("CREATE COLLECTION upsert_ep").await.unwrap();
65+
66+
// UPSERT INTO DSL statements must not be rejected by the extended-protocol
67+
// path with "Expected: an SQL statement".
68+
let result = server
69+
.exec("UPSERT INTO upsert_ep { id: 'u1', name: 'alice' }")
70+
.await;
71+
assert!(
72+
result.is_ok(),
73+
"UPSERT INTO via extended protocol must not be rejected: {:?}",
74+
result.err()
75+
);
76+
}
77+
78+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
79+
async fn prepared_select_strict_doc_returns_data() {
80+
let server = TestServer::start().await;
81+
82+
server
83+
.exec(
84+
"CREATE COLLECTION strict_ep TYPE DOCUMENT STRICT \
85+
(id TEXT PRIMARY KEY, name TEXT)",
86+
)
87+
.await
88+
.unwrap();
89+
server
90+
.exec("INSERT INTO strict_ep (id, name) VALUES ('a', 'alice')")
91+
.await
92+
.unwrap();
93+
94+
// SELECT on a STRICT doc collection via the extended-query protocol must
95+
// return the inserted row with actual column values, not null/empty columns.
96+
let rows = server
97+
.query_text("SELECT id, name FROM strict_ep WHERE id = 'a'")
98+
.await
99+
.unwrap();
100+
assert!(!rows.is_empty(), "SELECT should return the inserted row");
101+
102+
// Regression guard: the row must contain actual data, not null.
103+
assert!(
104+
rows[0].contains("alice"),
105+
"extended protocol must not return null columns for STRICT doc, got: {:?}",
106+
rows[0]
107+
);
108+
}

0 commit comments

Comments
 (0)