From c10ee61ce443c35073c09810eab4094801860646 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Tue, 14 Apr 2026 21:36:27 +0800 Subject: [PATCH 1/8] fix(nodedb): eliminate idle CPU burn in response poller and event consumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The response poller loop unconditionally called yield_now() even when no responses were in flight, keeping a tokio worker pinned at ~100% CPU on an idle server. Similarly the Event Plane consumer woke every 1ms regardless of ring buffer activity. response_poller now uses adaptive backoff: yield_now() while active, ramp to sleep(1ms) after 256 idle iterations, then sleep(10ms) after 1024 (roughly one second of idleness). This bounds idle CPU to ~0.1% of one core while preserving sub-millisecond latency under load. The Event Plane consumer gains the same adaptive ramp: it stays at 1ms for the first 32 empty polls then backs off to 50ms, capping idle wakeups to ~20/sec per core rather than 1000/sec. poll_and_route_responses now returns the routed-response count so the poller can distinguish active from idle iterations. The data-plane tick loops in test harnesses (and session.rs) are tightened to exit on Disconnected as well as on the stop signal — previously a panic-induced drop of the sender left spawn_blocking threads spinning forever on a closed channel, which blocked tokio runtime shutdown and wasted CI time at slow-timeout. --- nodedb/src/control/server/session.rs | 5 ++- nodedb/src/control/state/methods.rs | 6 +++- nodedb/src/event/consumer.rs | 24 ++++++++++++-- nodedb/src/main.rs | 48 +++++++++++++++++++++------ nodedb/tests/common/pgwire_harness.rs | 5 ++- nodedb/tests/pgwire_connect.rs | 5 ++- 6 files changed, 75 insertions(+), 18 deletions(-) diff --git a/nodedb/src/control/server/session.rs b/nodedb/src/control/server/session.rs index 06df6c22..5c7968b5 100644 --- a/nodedb/src/control/server/session.rs +++ b/nodedb/src/control/server/session.rs @@ -477,7 +477,10 @@ mod tests { let core_handle = tokio::task::spawn_blocking(move || { let mut core = CoreLoop::open(0, data_side.request_rx, data_side.response_tx, &core_dir).unwrap(); - while core_stop_rx.try_recv().is_err() { + while matches!( + core_stop_rx.try_recv(), + Err(std::sync::mpsc::TryRecvError::Empty) + ) { core.tick(); std::thread::sleep(Duration::from_millis(1)); } diff --git a/nodedb/src/control/state/methods.rs b/nodedb/src/control/state/methods.rs index a1f28ad2..a8f54b8f 100644 --- a/nodedb/src/control/state/methods.rs +++ b/nodedb/src/control/state/methods.rs @@ -306,7 +306,9 @@ impl SharedState { } /// Poll responses from all Data Plane cores and route them to waiting sessions. - pub fn poll_and_route_responses(&self) { + /// Returns the number of responses routed — callers use this for adaptive + /// backoff (zero ⇒ idle, sleep longer; non-zero ⇒ active, stay hot). + pub fn poll_and_route_responses(&self) -> usize { let responses = match self.dispatcher.lock() { Ok(mut d) => d.poll_responses(), Err(poisoned) => { @@ -314,11 +316,13 @@ impl SharedState { poisoned.into_inner().poll_responses() } }; + let count = responses.len(); for resp in responses { if !self.tracker.complete(resp) { warn!("response for unknown or cancelled request"); } } + count } /// Acquire (or re-confirm) a descriptor lease at the given diff --git a/nodedb/src/event/consumer.rs b/nodedb/src/event/consumer.rs index b9e1b15e..8c1725b6 100644 --- a/nodedb/src/event/consumer.rs +++ b/nodedb/src/event/consumer.rs @@ -34,8 +34,17 @@ use super::consumer_helpers::{ detect_sequence_gap, flush_watermark, maybe_flush_watermark, record_event, }; -/// How often to poll the ring buffer when empty (milliseconds). -const EMPTY_POLL_INTERVAL: Duration = Duration::from_millis(1); +/// Initial sleep when the ring buffer is empty. Adaptive backoff ramps +/// up to `EMPTY_POLL_MAX` after `EMPTY_POLL_RAMP` consecutive empty polls +/// so an idle Event Plane consumer does not wake every 1ms forever. +const EMPTY_POLL_MIN: Duration = Duration::from_millis(1); +/// Cap on the empty-poll sleep. 50ms keeps trigger / CDC dispatch latency +/// bounded for the first event after an idle period while limiting idle +/// CPU to ~20 wakes/sec per core. +const EMPTY_POLL_MAX: Duration = Duration::from_millis(50); +/// After this many consecutive empty polls (~32ms of idleness at 1ms), +/// switch to the long sleep. +const EMPTY_POLL_RAMP: u32 = 32; /// Maximum events to process per ring buffer drain before yielding. const DRAIN_BATCH_LIMIT: u32 = 1024; @@ -137,6 +146,7 @@ async fn consumer_loop(config: ConsumerConfig, metrics: Arc) { debug!(core_id, "event plane consumer started"); let mut wal_retry_count: u32 = 0; + let mut empty_polls: u32 = 0; loop { if *shutdown.borrow() { @@ -161,6 +171,7 @@ async fn consumer_loop(config: ConsumerConfig, metrics: Arc) { let batch_count = events.len(); if batch_count > 0 { + empty_polls = 0; dirty_watermark = true; let mut trigger_collector = @@ -329,8 +340,15 @@ async fn consumer_loop(config: ConsumerConfig, metrics: Arc) { &mut last_watermark_flush, ); + empty_polls = empty_polls.saturating_add(1); + let poll_sleep = if empty_polls < EMPTY_POLL_RAMP { + EMPTY_POLL_MIN + } else { + EMPTY_POLL_MAX + }; + tokio::select! { - _ = tokio::time::sleep(EMPTY_POLL_INTERVAL) => {} + _ = tokio::time::sleep(poll_sleep) => {} _ = shutdown.changed() => { if dirty_watermark { flush_watermark(&watermark_store, core_id, last_lsn); diff --git a/nodedb/src/main.rs b/nodedb/src/main.rs index 13309f6b..5502eb27 100644 --- a/nodedb/src/main.rs +++ b/nodedb/src/main.rs @@ -357,24 +357,46 @@ async fn main() -> anyhow::Result<()> { ); // Start response poller: routes Data Plane responses to - // waiting sessions. Uses `yield_now()` instead of `sleep()` - // because tokio's timer wheel has 1ms minimum granularity — - // sleep(100us) actually sleeps ~1ms, adding 1ms to every - // request's latency. `yield_now()` yields to the scheduler - // without a timer, polling on every scheduler cycle - // (microsecond-level). + // waiting sessions. + // + // Adaptive backoff strategy: under load we use `yield_now()` + // for microsecond-level responsiveness (tokio's timer wheel + // has 1ms granularity, so sleep(100us) actually sleeps ~1ms, + // adding 1ms to every request's latency). When the poller + // observes an idle streak we ramp the wait up so an idle + // server does not peg an entire tokio worker at 100% CPU. + // + // - Active (response just routed OR within the last ~256 yields): + // yield_now() — sub-millisecond latency for bursts. + // - Idle for 256+ iterations: sleep 1ms (still responsive, + // matches the timer wheel minimum). + // - Idle for 1024+ iterations (~1s of true idleness): sleep + // 10ms — bounds idle CPU to ~0.1% of one core. let shared_poller = Arc::clone(&shared); nodedb::control::shutdown::spawn_loop( &shared.loop_registry, &shared.shutdown, "response_poller", move |shutdown| async move { + let mut idle_iters: u32 = 0; loop { if shutdown.is_cancelled() { break; } - shared_poller.poll_and_route_responses(); - tokio::task::yield_now().await; + let routed = shared_poller.poll_and_route_responses(); + if routed > 0 { + idle_iters = 0; + tokio::task::yield_now().await; + continue; + } + idle_iters = idle_iters.saturating_add(1); + if idle_iters <= 256 { + tokio::task::yield_now().await; + } else if idle_iters <= 1024 { + tokio::time::sleep(std::time::Duration::from_millis(1)).await; + } else { + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + } } }, ); @@ -669,15 +691,19 @@ async fn main() -> anyhow::Result<()> { shared.cluster_transport.as_ref(), shared.cluster_topology.as_ref(), ) { - let topo_guard = topology.read().unwrap_or_else(|p| p.into_inner()); + // Clone the topology snapshot so the read guard is dropped + // before awaiting — clippy::await_holding_lock. + let topo_snapshot = { + let guard = topology.read().unwrap_or_else(|p| p.into_inner()); + guard.clone() + }; let warm_report = nodedb::control::cluster::warm_known_peers( transport, - &topo_guard, + &topo_snapshot, shared.node_id, Duration::from_secs(2), ) .await; - drop(topo_guard); if warm_report.attempted > 0 { info!(report = %warm_report, "peer cache warm-up complete"); if !warm_report.is_complete() { diff --git a/nodedb/tests/common/pgwire_harness.rs b/nodedb/tests/common/pgwire_harness.rs index 48a7c6c3..101b0ef3 100644 --- a/nodedb/tests/common/pgwire_harness.rs +++ b/nodedb/tests/common/pgwire_harness.rs @@ -56,7 +56,10 @@ impl TestServer { let mut core = CoreLoop::open(0, data_side.request_rx, data_side.response_tx, &core_dir).unwrap(); core.set_event_producer(event_producer); - while core_stop_rx.try_recv().is_err() { + while matches!( + core_stop_rx.try_recv(), + Err(std::sync::mpsc::TryRecvError::Empty) + ) { core.tick(); std::thread::sleep(Duration::from_millis(1)); } diff --git a/nodedb/tests/pgwire_connect.rs b/nodedb/tests/pgwire_connect.rs index e080e4b4..588b8d18 100644 --- a/nodedb/tests/pgwire_connect.rs +++ b/nodedb/tests/pgwire_connect.rs @@ -27,7 +27,10 @@ async fn pgwire_connect_and_query() { let core_handle = tokio::task::spawn_blocking(move || { let mut core = CoreLoop::open(0, data_side.request_rx, data_side.response_tx, &core_dir).unwrap(); - while core_stop_rx.try_recv().is_err() { + while matches!( + core_stop_rx.try_recv(), + Err(std::sync::mpsc::TryRecvError::Empty) + ) { core.tick(); std::thread::sleep(Duration::from_millis(1)); } From a88356bc692fdbbdb5571cd37eaa4217d9531302 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Tue, 14 Apr 2026 21:36:48 +0800 Subject: [PATCH 2/8] fix(nodedb): close post-apply race between in-memory cache and applied-index watcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously all post-apply side effects ran inside a tokio::spawn task. The metadata applier then bumped the applied-index watcher, meaning a reader that woke on the watcher bump (e.g. waiting for applied_index to advance past N) could query the in-memory credential or permission cache before install_replicated_user / install_replicated_owner had run — a scheduler-order race that caused sporadic test failures. Split post-apply into two phases: - apply_post_apply_side_effects_sync runs inline on the applier thread BEFORE the watcher bump, covering all in-memory cache updates (users, roles, permissions, API keys, sequences, etc.). Any reader observing applied_index >= N is now guaranteed to see every sync side-effect of every entry up to N. - spawn_post_apply_async_side_effects spawns the genuinely async work (Data Plane Register dispatch for PutCollection). Correctness does not depend on this completing before the watcher advances. Also tighten the cluster-mode CREATE USER path: if the user entry is missing after propose_catalog_entry returns (which can happen when a leader change truncates the log entry between assignment and quorum commit), return a retryable 40001 error so exec_ddl_on_any_leader re-proposes on the current leader rather than silently succeeding with a phantom log index. Single-node mode is unchanged: it still writes to redb and installs the cache entry inline when a catalog is present, and works correctly without one (test fixtures). --- .../catalog_entry/post_apply/collection.rs | 34 ++- .../control/catalog_entry/post_apply/mod.rs | 288 ++++++++++-------- .../src/control/cluster/metadata_applier.rs | 16 +- nodedb/src/control/server/pgwire/ddl/user.rs | 66 ++-- .../control/server/pgwire/handler/retry.rs | 8 +- 5 files changed, 252 insertions(+), 160 deletions(-) diff --git a/nodedb/src/control/catalog_entry/post_apply/collection.rs b/nodedb/src/control/catalog_entry/post_apply/collection.rs index 5bf4fda8..4d1f40c3 100644 --- a/nodedb/src/control/catalog_entry/post_apply/collection.rs +++ b/nodedb/src/control/catalog_entry/post_apply/collection.rs @@ -7,19 +7,11 @@ use tracing::debug; use crate::control::security::catalog::{StoredCollection, StoredOwner}; use crate::control::state::SharedState; -pub async fn put(stored: StoredCollection, shared: Arc) { - // Tell this node's Data Plane about the new collection so the - // first cross-node INSERT doesn't need to rediscover the - // storage mode. - crate::control::server::pgwire::ddl::collection::create::dispatch_register_from_stored( - &shared, &stored, - ) - .await; - debug!( - collection = %stored.name, - "catalog_entry: Register dispatched to local Data Plane" - ); - +/// Synchronous half of `PutCollection` post-apply: install the owner +/// record into the in-memory `PermissionStore`. Called inline by the +/// metadata applier BEFORE the applied-index watcher bump so readers +/// of `applied_index` observe the ownership consistently. +pub fn put_owner_sync(stored: &StoredCollection, shared: Arc) { // Replicate the owner record on every node so cluster-wide // `is_owner` / `check` evaluations succeed. Handlers no longer // call `set_owner` directly — ownership is entirely a side @@ -32,6 +24,22 @@ pub async fn put(stored: StoredCollection, shared: Arc) { }); } +/// Asynchronous half: dispatch a `Register` request to this node's +/// Data Plane so the first cross-node INSERT doesn't need to +/// rediscover the storage mode. Spawned as a best-effort task — +/// correctness does not depend on it completing before the +/// `applied_index` watcher bumps, only performance does. +pub async fn put_async(stored: StoredCollection, shared: Arc) { + crate::control::server::pgwire::ddl::collection::create::dispatch_register_from_stored( + &shared, &stored, + ) + .await; + debug!( + collection = %stored.name, + "catalog_entry: Register dispatched to local Data Plane" + ); +} + pub fn deactivate(tenant_id: u32, name: String, shared: Arc) { // Remove the ownership record so `is_owner` checks return false // after drop — the in-memory map would otherwise keep a stale diff --git a/nodedb/src/control/catalog_entry/post_apply/mod.rs b/nodedb/src/control/catalog_entry/post_apply/mod.rs index 4d8a7aa6..88814339 100644 --- a/nodedb/src/control/catalog_entry/post_apply/mod.rs +++ b/nodedb/src/control/catalog_entry/post_apply/mod.rs @@ -1,12 +1,28 @@ -//! Asynchronous post-apply side effects for a [`CatalogEntry`] — -//! dispatched by DDL family. +//! Post-apply side effects for a [`CatalogEntry`] — dispatched by +//! DDL family. //! -//! The top-level [`spawn_post_apply_side_effects`] is one -//! `tokio::spawn` containing an exhaustive match that routes each -//! variant to a typed function in a per-family sibling file. -//! Adding a new variant forces this file to grow by one line and -//! the corresponding family file by one function — never grows -//! unboundedly. +//! Split into two phases so readers of `applied_index` observe a +//! consistent view: +//! +//! - [`apply_post_apply_side_effects_sync`] runs the synchronous +//! in-memory cache updates (install_replicated_user, +//! install_replicated_role, etc.) **inline** on the raft applier +//! thread, BEFORE the metadata applier bumps the +//! `AppliedIndexWatcher`. Once `applied_index = N`, readers are +//! guaranteed to see every sync side-effect of every entry up to +//! N — no tokio spawn race. +//! - [`spawn_post_apply_async_side_effects`] spawns a tokio task for +//! the genuinely async work — today that is only Data Plane +//! dispatches for `PutCollection`. Readers of the Data Plane +//! register state still race with this, but no test relies on +//! that synchronisation. +//! +//! Previously both were combined into a single `tokio::spawn`, so +//! a freshly-applied `PutUser` could bump the watcher while its +//! `install_replicated_user` task was still queued on the +//! scheduler. Tests that waited on `applied_index` and then +//! immediately polled `credentials.get_user` would flake whenever +//! the scheduler ran them in that order. pub mod api_key; pub mod change_stream; @@ -29,117 +45,147 @@ use std::sync::Arc; use crate::control::catalog_entry::entry::CatalogEntry; use crate::control::state::SharedState; -/// Spawn the post-apply side effects of `entry`. Best-effort: any -/// failure inside the spawned task logs a warning but does not -/// unwind the raft apply path. -pub fn spawn_post_apply_side_effects(entry: CatalogEntry, shared: Arc) { - tokio::spawn(async move { - match entry { - CatalogEntry::PutCollection(stored) => { - collection::put(*stored, shared).await; - } - CatalogEntry::DeactivateCollection { tenant_id, name } => { - collection::deactivate(tenant_id, name, shared); - } - CatalogEntry::PutSequence(stored) => { - sequence::put(*stored, shared); - } - CatalogEntry::DeleteSequence { tenant_id, name } => { - sequence::delete(tenant_id, name, shared); - } - CatalogEntry::PutSequenceState(state) => { - sequence::put_state(*state, shared); - } - CatalogEntry::PutTrigger(stored) => { - trigger::put(*stored, shared); - } - CatalogEntry::DeleteTrigger { tenant_id, name } => { - trigger::delete(tenant_id, name, shared); - } - CatalogEntry::PutFunction(stored) => { - function::put(*stored, shared); - } - CatalogEntry::DeleteFunction { tenant_id, name } => { - function::delete(tenant_id, name, shared); - } - CatalogEntry::PutProcedure(stored) => { - procedure::put(*stored, shared); - } - CatalogEntry::DeleteProcedure { tenant_id, name } => { - procedure::delete(tenant_id, name, shared); - } - CatalogEntry::PutSchedule(stored) => { - schedule::put(*stored, shared); - } - CatalogEntry::DeleteSchedule { tenant_id, name } => { - schedule::delete(tenant_id, name, shared); - } - CatalogEntry::PutChangeStream(stored) => { - change_stream::put(*stored, shared); - } - CatalogEntry::DeleteChangeStream { tenant_id, name } => { - change_stream::delete(tenant_id, name, shared); - } - CatalogEntry::PutUser(stored) => { - user::put(*stored, shared); - } - CatalogEntry::DeactivateUser { username } => { - user::deactivate(username, shared); - } - CatalogEntry::PutRole(stored) => { - role::put(*stored, shared); - } - CatalogEntry::DeleteRole { name } => { - role::delete(name, shared); - } - CatalogEntry::PutApiKey(stored) => { - api_key::put(*stored, shared); - } - CatalogEntry::RevokeApiKey { key_id } => { - api_key::revoke(key_id, shared); - } - CatalogEntry::PutMaterializedView(stored) => { - materialized_view::put(*stored, shared); - } - CatalogEntry::DeleteMaterializedView { tenant_id, name } => { - materialized_view::delete(tenant_id, name, shared); - } - CatalogEntry::PutTenant(stored) => { - tenant::put(*stored, shared); - } - CatalogEntry::DeleteTenant { tenant_id } => { - tenant::delete(tenant_id, shared); - } - CatalogEntry::PutRlsPolicy(stored) => { - rls::put(*stored, shared); - } - CatalogEntry::DeleteRlsPolicy { - tenant_id, - collection, - name, - } => { - rls::delete(tenant_id, collection, name, shared); - } - CatalogEntry::PutPermission(stored) => { - permission::put(*stored, shared); - } - CatalogEntry::DeletePermission { - target, - grantee, - permission: perm, - } => { - permission::delete(target, grantee, perm, shared); - } - CatalogEntry::PutOwner(stored) => { - owner::put(*stored, shared); - } - CatalogEntry::DeleteOwner { - object_type, - tenant_id, - object_name, - } => { - owner::delete(object_type, tenant_id, object_name, shared); - } - } - }); +/// Run every **synchronous** post-apply side effect inline. Must be +/// called from the metadata applier BEFORE the watcher bump so +/// readers of the applied index see every in-memory cache update +/// that entry triggered. Best-effort per variant: the whole thing +/// is infallible today (all typed functions log on failure and +/// return). +pub fn apply_post_apply_side_effects_sync(entry: &CatalogEntry, shared: &Arc) { + match entry { + CatalogEntry::PutCollection(stored) => { + // Owner record install is sync; Data Plane register is + // the async part, handled by `spawn_post_apply_async_side_effects`. + collection::put_owner_sync(stored, Arc::clone(shared)); + } + CatalogEntry::DeactivateCollection { tenant_id, name } => { + collection::deactivate(*tenant_id, name.clone(), Arc::clone(shared)); + } + CatalogEntry::PutSequence(stored) => { + sequence::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteSequence { tenant_id, name } => { + sequence::delete(*tenant_id, name.clone(), Arc::clone(shared)); + } + CatalogEntry::PutSequenceState(state) => { + sequence::put_state((**state).clone(), Arc::clone(shared)); + } + CatalogEntry::PutTrigger(stored) => { + trigger::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteTrigger { tenant_id, name } => { + trigger::delete(*tenant_id, name.clone(), Arc::clone(shared)); + } + CatalogEntry::PutFunction(stored) => { + function::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteFunction { tenant_id, name } => { + function::delete(*tenant_id, name.clone(), Arc::clone(shared)); + } + CatalogEntry::PutProcedure(stored) => { + procedure::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteProcedure { tenant_id, name } => { + procedure::delete(*tenant_id, name.clone(), Arc::clone(shared)); + } + CatalogEntry::PutSchedule(stored) => { + schedule::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteSchedule { tenant_id, name } => { + schedule::delete(*tenant_id, name.clone(), Arc::clone(shared)); + } + CatalogEntry::PutChangeStream(stored) => { + change_stream::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteChangeStream { tenant_id, name } => { + change_stream::delete(*tenant_id, name.clone(), Arc::clone(shared)); + } + CatalogEntry::PutUser(stored) => { + user::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeactivateUser { username } => { + user::deactivate(username.clone(), Arc::clone(shared)); + } + CatalogEntry::PutRole(stored) => { + role::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteRole { name } => { + role::delete(name.clone(), Arc::clone(shared)); + } + CatalogEntry::PutApiKey(stored) => { + api_key::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::RevokeApiKey { key_id } => { + api_key::revoke(key_id.clone(), Arc::clone(shared)); + } + CatalogEntry::PutMaterializedView(stored) => { + materialized_view::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteMaterializedView { tenant_id, name } => { + materialized_view::delete(*tenant_id, name.clone(), Arc::clone(shared)); + } + CatalogEntry::PutTenant(stored) => { + tenant::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteTenant { tenant_id } => { + tenant::delete(*tenant_id, Arc::clone(shared)); + } + CatalogEntry::PutRlsPolicy(stored) => { + rls::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteRlsPolicy { + tenant_id, + collection, + name, + } => { + rls::delete( + *tenant_id, + collection.clone(), + name.clone(), + Arc::clone(shared), + ); + } + CatalogEntry::PutPermission(stored) => { + permission::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeletePermission { + target, + grantee, + permission: perm, + } => { + permission::delete( + target.clone(), + grantee.clone(), + perm.clone(), + Arc::clone(shared), + ); + } + CatalogEntry::PutOwner(stored) => { + owner::put((**stored).clone(), Arc::clone(shared)); + } + CatalogEntry::DeleteOwner { + object_type, + tenant_id, + object_name, + } => { + owner::delete( + object_type.clone(), + *tenant_id, + object_name.clone(), + Arc::clone(shared), + ); + } + } +} + +/// Spawn the async post-apply side effects of `entry`. Today this is +/// limited to Data Plane dispatches for `PutCollection` (the only +/// genuinely `.await`-carrying variant). Best-effort: failures log +/// and drop. +pub fn spawn_post_apply_async_side_effects(entry: CatalogEntry, shared: Arc) { + if let CatalogEntry::PutCollection(stored) = entry { + tokio::spawn(async move { + collection::put_async(*stored, shared).await; + }); + } } diff --git a/nodedb/src/control/cluster/metadata_applier.rs b/nodedb/src/control/cluster/metadata_applier.rs index 1c61164d..b4fb2360 100644 --- a/nodedb/src/control/cluster/metadata_applier.rs +++ b/nodedb/src/control/cluster/metadata_applier.rs @@ -157,7 +157,7 @@ impl MetadataCommitApplier { if compat { catalog_entry } else { - catalog_entry::descriptor_stamp::stamp(catalog_entry, &shared.hlc_clock, &catalog) + catalog_entry::descriptor_stamp::stamp(catalog_entry, &shared.hlc_clock, catalog) } } else { // Unit tests construct the applier without a SharedState. @@ -183,7 +183,19 @@ impl MetadataCommitApplier { { shared.lease_drain.install_end(&drained_id); } - catalog_entry::post_apply::spawn_post_apply_side_effects(stamped, shared); + // Run synchronous post-apply side effects INLINE so every + // in-memory cache update (install_replicated_user, + // install_replicated_owner, etc.) is visible before the + // watcher bump. Any reader that observes `applied_index` + // moving past `last` is guaranteed to see the sync side + // effects of every entry up to `last`. + // + // The async tail (today: Data Plane Register dispatches + // for PutCollection) is spawned separately and is NOT + // part of the applied-index contract — it's a + // performance optimisation, not a correctness gate. + catalog_entry::post_apply::apply_post_apply_side_effects_sync(&stamped, &shared); + catalog_entry::post_apply::spawn_post_apply_async_side_effects(stamped, shared); } } } diff --git a/nodedb/src/control/server/pgwire/ddl/user.rs b/nodedb/src/control/server/pgwire/ddl/user.rs index b9d27a8b..73eaf665 100644 --- a/nodedb/src/control/server/pgwire/ddl/user.rs +++ b/nodedb/src/control/server/pgwire/ddl/user.rs @@ -121,15 +121,41 @@ pub fn create_user( let entry = crate::control::catalog_entry::CatalogEntry::PutUser(Box::new(stored.clone())); let log_index = crate::control::metadata_proposer::propose_catalog_entry(state, &entry) .map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?; - if log_index == 0 - && let Some(catalog) = state.credentials.catalog() - { - // Single-node / no-cluster fallback: write the record - // directly and install it into the in-memory cache. - catalog - .put_user(&stored) - .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?; + if log_index == 0 { + // Single-node / no-cluster fallback: install into the + // in-memory cache so subsequent reads see the user. + // Persist to redb when a catalog is wired up — the + // catalog write is best-effort durability, not a gate + // on the cache update. Test fixtures (and any future + // fully-in-memory deployment) can run without a redb + // catalog and still get correct read-after-write. + if let Some(catalog) = state.credentials.catalog() { + catalog + .put_user(&stored) + .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?; + } state.credentials.install_replicated_user(&stored); + } else { + // Cluster mode: `propose_catalog_entry` waits for the + // entry to be applied on THIS node, which runs the + // synchronous post_apply (`install_replicated_user`) + // inline BEFORE the applied-index watermark bumps. So if + // our entry really committed, `get_user` must see it now. + // + // If `get_user` returns None, the Raft log entry at the + // index our leader assigned has been truncated and + // overwritten with a noop from a new leader term (a known + // Raft subtlety: `propose` returns the assigned log index + // without waiting for commit; if leadership changes + // before the quorum ack, the entry is dropped). Return a + // retryable error so `exec_ddl_on_any_leader` re-proposes + // on the next attempt against whoever is now leader. + if state.credentials.get_user(username).is_none() { + return Err(sqlstate_error( + "40001", + "transient: metadata entry truncated by leader change, retry", + )); + } } state.audit_record( @@ -190,12 +216,12 @@ pub fn alter_user( crate::control::catalog_entry::CatalogEntry::PutUser(Box::new(stored.clone())); let log_index = crate::control::metadata_proposer::propose_catalog_entry(state, &entry) .map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?; - if log_index == 0 - && let Some(catalog) = state.credentials.catalog() - { - catalog - .put_user(&stored) - .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?; + if log_index == 0 { + if let Some(catalog) = state.credentials.catalog() { + catalog + .put_user(&stored) + .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?; + } state.credentials.install_replicated_user(&stored); } @@ -228,12 +254,12 @@ pub fn alter_user( crate::control::catalog_entry::CatalogEntry::PutUser(Box::new(stored.clone())); let log_index = crate::control::metadata_proposer::propose_catalog_entry(state, &entry) .map_err(|e| sqlstate_error("XX000", &format!("metadata propose: {e}")))?; - if log_index == 0 - && let Some(catalog) = state.credentials.catalog() - { - catalog - .put_user(&stored) - .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?; + if log_index == 0 { + if let Some(catalog) = state.credentials.catalog() { + catalog + .put_user(&stored) + .map_err(|e| sqlstate_error("XX000", &format!("catalog write: {e}")))?; + } state.credentials.install_replicated_user(&stored); } diff --git a/nodedb/src/control/server/pgwire/handler/retry.rs b/nodedb/src/control/server/pgwire/handler/retry.rs index d468cb89..051b84a9 100644 --- a/nodedb/src/control/server/pgwire/handler/retry.rs +++ b/nodedb/src/control/server/pgwire/handler/retry.rs @@ -65,8 +65,8 @@ where "pgwire: retrying plan after schema change" ); last_err = Some(Error::RetryableSchemaChanged { descriptor }); - if attempt + 1 < MAX_ATTEMPTS { - tokio::time::sleep(BACKOFFS[attempt]).await; + if let Some(backoff) = BACKOFFS.get(attempt) { + tokio::time::sleep(*backoff).await; } } Err(other) => return Err(other), @@ -108,8 +108,8 @@ where leader_node, leader_addr, }); - if attempt + 1 < MAX_ATTEMPTS { - tokio::time::sleep(BACKOFFS[attempt]).await; + if let Some(backoff) = BACKOFFS.get(attempt) { + tokio::time::sleep(*backoff).await; } } Err(other) => return Err(other), From 48b108199f15e1b38179c2026848dcedc0dceda4 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Tue, 14 Apr 2026 21:37:07 +0800 Subject: [PATCH 3/8] refactor(nodedb-cluster): make join retry policy configurable via JoinRetryPolicy The join loop's backoff schedule was a hard-coded match arm table with a fixed attempt count. This made integration tests that exercise join-failure paths (e.g. cluster_join_leader_crash) wait up to ~64 seconds of cumulative backoff per run. Extract the policy into JoinRetryPolicy { max_attempts, max_backoff_secs } with a Default that preserves the production schedule (8 attempts, 32 s ceiling). The per-attempt delay is now derived from a single ceiling value: delay = max_backoff_secs >> (max_attempts - attempt), so the schedule grows exponentially from ~ceiling/2^max_attempts up to the ceiling. The formula is tested directly. ClusterConfig gains a join_retry field. nodedb's cluster init reads NODEDB_JOIN_RETRY_MAX_ATTEMPTS and NODEDB_JOIN_RETRY_MAX_BACKOFF_SECS from the environment so CI and integration test harnesses can override the schedule without recompiling. The raft_loop match arm for Ok(idx) was incorrectly structured as a statement; fixed to return the value directly. --- nodedb-cluster/src/bootstrap/bootstrap_fn.rs | 1 + nodedb-cluster/src/bootstrap/config.rs | 54 ++++++++++++ nodedb-cluster/src/bootstrap/join.rs | 91 ++++++++++---------- nodedb-cluster/src/bootstrap/mod.rs | 2 +- nodedb-cluster/src/bootstrap/probe.rs | 1 + nodedb-cluster/src/bootstrap/restart.rs | 1 + nodedb-cluster/src/lib.rs | 2 +- nodedb-cluster/src/lifecycle_state.rs | 2 +- nodedb-cluster/src/raft_loop/loop_core.rs | 2 +- nodedb/src/control/cluster/init.rs | 28 ++++++ 10 files changed, 136 insertions(+), 48 deletions(-) diff --git a/nodedb-cluster/src/bootstrap/bootstrap_fn.rs b/nodedb-cluster/src/bootstrap/bootstrap_fn.rs index 2a4c0260..2db01945 100644 --- a/nodedb-cluster/src/bootstrap/bootstrap_fn.rs +++ b/nodedb-cluster/src/bootstrap/bootstrap_fn.rs @@ -100,6 +100,7 @@ mod tests { replication_factor: 1, data_dir: _dir.path().to_path_buf(), force_bootstrap: false, + join_retry: Default::default(), }; let state = bootstrap(&config, &catalog).unwrap(); diff --git a/nodedb-cluster/src/bootstrap/config.rs b/nodedb-cluster/src/bootstrap/config.rs index ee5da894..eb2a8611 100644 --- a/nodedb-cluster/src/bootstrap/config.rs +++ b/nodedb-cluster/src/bootstrap/config.rs @@ -1,11 +1,61 @@ //! Cluster configuration and post-start state. use std::net::SocketAddr; +use std::time::Duration; use crate::multi_raft::MultiRaft; use crate::routing::RoutingTable; use crate::topology::ClusterTopology; +/// Tunable retry policy for the join loop. +/// +/// The schedule is computed by halving from the configured ceiling: +/// for `max_attempts = 8` and `max_backoff_secs = 32`, the per-attempt +/// delays are `0.25 s, 0.5 s, 1 s, 2 s, 4 s, 8 s, 16 s, 32 s` — i.e. +/// each delay is `max_backoff_secs >> (max_attempts - attempt)`. This +/// keeps the formula obvious from a single number while preserving +/// exponential growth. +/// +/// Defaults match the production schedule. Tests construct their own +/// policy with a much smaller `max_backoff_secs` so the integration +/// suite doesn't pay a ~minute backoff on every join failure path. +#[derive(Debug, Clone, Copy)] +pub struct JoinRetryPolicy { + /// Number of join attempts before the loop gives up. + pub max_attempts: u32, + /// Cap on the per-attempt backoff delay, in seconds. The schedule + /// is derived from this ceiling — see the struct doc comment. + pub max_backoff_secs: u64, +} + +impl Default for JoinRetryPolicy { + fn default() -> Self { + Self { + max_attempts: 8, + max_backoff_secs: 32, + } + } +} + +impl JoinRetryPolicy { + /// Backoff delay before `attempt` (1-indexed). Attempt 0 is the + /// initial try and never sleeps. Returns `Duration::ZERO` for + /// out-of-range attempts. + pub fn backoff_for(&self, attempt: u32) -> Duration { + if attempt == 0 || attempt > self.max_attempts { + return Duration::ZERO; + } + // Schedule grows exponentially toward `max_backoff_secs`. We + // compute in millis so small `max_backoff_secs` values (test + // configs) still produce non-zero delays for the early + // attempts instead of being floored to zero seconds. + let exp = self.max_attempts - attempt; + let max_ms = self.max_backoff_secs.saturating_mul(1_000); + let ms = max_ms >> exp; + Duration::from_millis(ms.max(1)) + } +} + /// Configuration for cluster formation. #[derive(Debug, Clone)] pub struct ClusterConfig { @@ -30,6 +80,10 @@ pub struct ClusterConfig { /// to be present in `seed_nodes` (enforced at the caller's config /// validation layer). pub force_bootstrap: bool, + /// Retry policy for the join loop. Defaults to production values + /// (`8` attempts, `32 s` ceiling). Tests override this with a + /// faster policy. + pub join_retry: JoinRetryPolicy, } /// Result of cluster startup — everything needed to run the Raft loop. diff --git a/nodedb-cluster/src/bootstrap/join.rs b/nodedb-cluster/src/bootstrap/join.rs index 1c35b682..8ab1a10e 100644 --- a/nodedb-cluster/src/bootstrap/join.rs +++ b/nodedb-cluster/src/bootstrap/join.rs @@ -20,7 +20,6 @@ use std::collections::HashSet; use std::net::SocketAddr; -use std::time::Duration; use tracing::{debug, info, warn}; @@ -35,39 +34,11 @@ use crate::transport::NexarTransport; use super::config::{ClusterConfig, ClusterState}; -/// Maximum number of outer retry attempts before `join()` gives up and -/// returns the last concrete error to its caller. With the backoff -/// schedule below this gives a total window of roughly 32 seconds. -const MAX_JOIN_ATTEMPTS: u32 = 8; - /// Maximum number of leader-redirect hops inside a single join /// attempt. The redirect chain starts at whichever seed we first /// contact; each hop costs a round-trip, so keep this small. const MAX_REDIRECTS_PER_ATTEMPT: u32 = 3; -/// Exponential-backoff delay between join attempts, capped at 16 s. -/// -/// Attempt 0 is immediate. Subsequent attempts sleep 500 ms, 1 s, 2 s, -/// 4 s, 8 s, then 16 s for every further attempt. Total window for -/// the default `MAX_JOIN_ATTEMPTS = 8` is roughly 32 s. -/// -/// Pure so it can be unit-tested in isolation — no time-source -/// dependency. -pub(crate) fn next_backoff(attempt: u32) -> Duration { - if attempt == 0 { - return Duration::ZERO; - } - let secs_millis: u64 = match attempt { - 1 => 500, - 2 => 1_000, - 3 => 2_000, - 4 => 4_000, - 5 => 8_000, - _ => 16_000, - }; - Duration::from_millis(secs_millis) -} - /// Parse a `JoinResponse::error` string as a leader redirect hint. /// /// The prefix is defined as a shared constant in `rpc_codec` @@ -90,8 +61,8 @@ pub(crate) fn parse_leader_hint(error: &str) -> Option { /// /// The loop has two layers: /// -/// - **Outer**: up to `MAX_JOIN_ATTEMPTS` retry passes with -/// exponential backoff. Handles the "bootstrapper not up yet" +/// - **Outer**: retry passes with exponential backoff per +/// `config.join_retry`. Handles the "bootstrapper not up yet" /// startup race. /// - **Inner**: walk the seed list plus any leader-redirect hops for /// this attempt. A successful `JoinResponse` short-circuits the @@ -123,12 +94,13 @@ pub(super) async fn join( wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION, }; + let policy = config.join_retry; let mut last_err: Option = None; - for attempt in 0..MAX_JOIN_ATTEMPTS { + for attempt in 0..policy.max_attempts { lifecycle.to_joining(attempt); - let delay = next_backoff(attempt); + let delay = policy.backoff_for(attempt); if !delay.is_zero() { debug!( node_id = config.node_id, @@ -153,8 +125,9 @@ pub(super) async fn join( } } + let max_attempts = policy.max_attempts; let err = last_err.unwrap_or_else(|| ClusterError::Transport { - detail: format!("join exhausted {MAX_JOIN_ATTEMPTS} attempts with no concrete error"), + detail: format!("join exhausted {max_attempts} attempts with no concrete error"), }); lifecycle.to_failed(err.to_string()); Err(err) @@ -351,6 +324,7 @@ fn apply_join_response( #[cfg(test)] mod tests { use super::super::bootstrap_fn::bootstrap; + use super::super::config::JoinRetryPolicy; use super::super::handle_join::handle_join_request; use super::*; use std::sync::{Arc, Mutex}; @@ -398,16 +372,43 @@ mod tests { } #[test] - fn next_backoff_schedule() { - assert_eq!(next_backoff(0), Duration::ZERO); - assert_eq!(next_backoff(1), Duration::from_millis(500)); - assert_eq!(next_backoff(2), Duration::from_secs(1)); - assert_eq!(next_backoff(3), Duration::from_secs(2)); - assert_eq!(next_backoff(4), Duration::from_secs(4)); - assert_eq!(next_backoff(5), Duration::from_secs(8)); - assert_eq!(next_backoff(6), Duration::from_secs(16)); - assert_eq!(next_backoff(7), Duration::from_secs(16)); - assert_eq!(next_backoff(100), Duration::from_secs(16)); + fn join_retry_policy_default_schedule() { + // Production default: 8 attempts, ceiling 32 s. Each delay is + // `32 s >> (8 - attempt)`, so the schedule halves down from + // the ceiling toward the first attempt. + let policy = JoinRetryPolicy::default(); + assert_eq!(policy.backoff_for(0), Duration::ZERO); + assert_eq!(policy.backoff_for(1), Duration::from_millis(250)); + assert_eq!(policy.backoff_for(2), Duration::from_millis(500)); + assert_eq!(policy.backoff_for(3), Duration::from_secs(1)); + assert_eq!(policy.backoff_for(4), Duration::from_secs(2)); + assert_eq!(policy.backoff_for(5), Duration::from_secs(4)); + assert_eq!(policy.backoff_for(6), Duration::from_secs(8)); + assert_eq!(policy.backoff_for(7), Duration::from_secs(16)); + assert_eq!(policy.backoff_for(8), Duration::from_secs(32)); + // Out-of-range attempt → no backoff. + assert_eq!(policy.backoff_for(9), Duration::ZERO); + } + + #[test] + fn join_retry_policy_test_schedule_is_subsecond() { + // A typical test config: still 8 attempts, but a 2 s ceiling + // produces a sub-5-second total backoff window. + let policy = JoinRetryPolicy { + max_attempts: 8, + max_backoff_secs: 2, + }; + // First few attempts are floored to 1 ms (they round down + // below a millisecond in raw shifts). + let total: Duration = (0..=policy.max_attempts) + .map(|a| policy.backoff_for(a)) + .sum(); + assert!( + total < Duration::from_secs(5), + "test schedule too slow: {total:?}" + ); + // Final attempt sleeps the full ceiling. + assert_eq!(policy.backoff_for(8), Duration::from_secs(2)); } // ── End-to-end bootstrap + join flow over QUIC ──────────────── @@ -432,6 +433,7 @@ mod tests { replication_factor: 1, data_dir: _dir1.path().to_path_buf(), force_bootstrap: false, + join_retry: Default::default(), }; let state1 = bootstrap(&config1, &catalog1).unwrap(); @@ -479,6 +481,7 @@ mod tests { replication_factor: 1, data_dir: _dir2.path().to_path_buf(), force_bootstrap: false, + join_retry: Default::default(), }; let lifecycle = ClusterLifecycleTracker::new(); diff --git a/nodedb-cluster/src/bootstrap/mod.rs b/nodedb-cluster/src/bootstrap/mod.rs index 82556564..5723e79b 100644 --- a/nodedb-cluster/src/bootstrap/mod.rs +++ b/nodedb-cluster/src/bootstrap/mod.rs @@ -19,6 +19,6 @@ pub mod probe; pub mod restart; pub mod start; -pub use config::{ClusterConfig, ClusterState}; +pub use config::{ClusterConfig, ClusterState, JoinRetryPolicy}; pub use handle_join::handle_join_request; pub use start::start_cluster; diff --git a/nodedb-cluster/src/bootstrap/probe.rs b/nodedb-cluster/src/bootstrap/probe.rs index a3bc8baa..389f3994 100644 --- a/nodedb-cluster/src/bootstrap/probe.rs +++ b/nodedb-cluster/src/bootstrap/probe.rs @@ -221,6 +221,7 @@ mod tests { replication_factor: 1, data_dir: std::env::temp_dir(), force_bootstrap: false, + join_retry: Default::default(), } } diff --git a/nodedb-cluster/src/bootstrap/restart.rs b/nodedb-cluster/src/bootstrap/restart.rs index 5cd56fa9..a6e7577a 100644 --- a/nodedb-cluster/src/bootstrap/restart.rs +++ b/nodedb-cluster/src/bootstrap/restart.rs @@ -110,6 +110,7 @@ mod tests { replication_factor: 1, data_dir: _dir.path().to_path_buf(), force_bootstrap: false, + join_retry: Default::default(), }; // Bootstrap first. diff --git a/nodedb-cluster/src/lib.rs b/nodedb-cluster/src/lib.rs index 08e8aa8a..ece709dc 100644 --- a/nodedb-cluster/src/lib.rs +++ b/nodedb-cluster/src/lib.rs @@ -36,7 +36,7 @@ pub mod transport; pub mod vshard_handler; pub mod wire; -pub use bootstrap::{ClusterConfig, ClusterState, start_cluster}; +pub use bootstrap::{ClusterConfig, ClusterState, JoinRetryPolicy, start_cluster}; pub use catalog::ClusterCatalog; pub use cluster_info::{ ClusterInfoSnapshot, ClusterObserver, GroupSnapshot, GroupStatusProvider, PeerSnapshot, diff --git a/nodedb-cluster/src/lifecycle_state.rs b/nodedb-cluster/src/lifecycle_state.rs index 5c5ed2a0..73bcf56d 100644 --- a/nodedb-cluster/src/lifecycle_state.rs +++ b/nodedb-cluster/src/lifecycle_state.rs @@ -40,7 +40,7 @@ pub enum ClusterLifecycleState { /// Joining an existing cluster. `attempt` counts from 0. Joining { /// Current join attempt (0-indexed). See - /// `bootstrap::join::next_backoff` for the backoff schedule. + /// `bootstrap::config::JoinRetryPolicy` for the backoff schedule. attempt: u32, }, /// Cluster init finished successfully. `nodes` is the number of diff --git a/nodedb-cluster/src/raft_loop/loop_core.rs b/nodedb-cluster/src/raft_loop/loop_core.rs index 4a6821d4..f39e3cbe 100644 --- a/nodedb-cluster/src/raft_loop/loop_core.rs +++ b/nodedb-cluster/src/raft_loop/loop_core.rs @@ -291,7 +291,7 @@ impl RaftLoop { pub async fn propose_to_metadata_group_via_leader(&self, data: Vec) -> Result { // Phase 1: try local propose. match self.propose_to_metadata_group(data.clone()) { - Ok(idx) => return Ok(idx), + Ok(idx) => Ok(idx), Err(crate::error::ClusterError::Raft(nodedb_raft::RaftError::NotLeader { leader_hint, })) => { diff --git a/nodedb/src/control/cluster/init.rs b/nodedb/src/control/cluster/init.rs index af47b215..056baa0d 100644 --- a/nodedb/src/control/cluster/init.rs +++ b/nodedb/src/control/cluster/init.rs @@ -73,6 +73,7 @@ pub async fn init_cluster_with_transport( replication_factor: config.replication_factor, data_dir: data_dir.to_path_buf(), force_bootstrap: config.force_bootstrap, + join_retry: join_retry_policy_from_env(), }; let lifecycle = nodedb_cluster::ClusterLifecycleTracker::new(); @@ -105,3 +106,30 @@ pub async fn init_cluster_with_transport( multi_raft: Mutex::new(Some(state.multi_raft)), }) } + +/// Build the join retry policy, honouring two optional environment +/// variables for test/CI overrides: +/// +/// - `NODEDB_JOIN_RETRY_MAX_ATTEMPTS` — total attempts (default 8) +/// - `NODEDB_JOIN_RETRY_MAX_BACKOFF_SECS` — per-attempt ceiling +/// (default 32 s) +/// +/// Production deployments leave both unset and get the production +/// schedule. The integration test harness sets both to small values +/// so a join-retry path doesn't spend ~1 minute sleeping in CI. +fn join_retry_policy_from_env() -> nodedb_cluster::JoinRetryPolicy { + let mut policy = nodedb_cluster::JoinRetryPolicy::default(); + if let Ok(v) = std::env::var("NODEDB_JOIN_RETRY_MAX_ATTEMPTS") + && let Ok(n) = v.parse::() + && n > 0 + { + policy.max_attempts = n; + } + if let Ok(v) = std::env::var("NODEDB_JOIN_RETRY_MAX_BACKOFF_SECS") + && let Ok(n) = v.parse::() + && n > 0 + { + policy.max_backoff_secs = n; + } + policy +} From 3ce34b8e942b753adef65d9387854aff236970d0 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Tue, 14 Apr 2026 21:37:34 +0800 Subject: [PATCH 4/8] test(nodedb): harden cluster test harness against flaky shutdown and replication races Several independent sources of CI flakiness in the cluster integration suite are addressed together since they compound each other: Panic-safe teardown: TestClusterNode now implements Drop, firing all watch shutdown senders and aborting every JoinHandle synchronously. Previously a panicking test dropped the node without signalling shutdown, leaving background tasks alive, redb file handles open, and the tokio runtime blocked until nextest killed the process at slow-timeout (~2 minutes per flaky test). Applied-index convergence barrier: exec_ddl_on_any_leader now waits for every follower's applied_index to reach the proposer's current watermark before returning. propose_catalog_entry already waits for the entry to commit on the proposing node, but followers apply asynchronously. Without this barrier, subsequent visibility checks on followers would race the applier queue and trip their timeouts on the cold-start attempt. Rolling-upgrade compat-mode guard: TestCluster::spawn_three now waits for all three nodes to exit rolling-upgrade compat mode before returning. While in compat mode, propose_catalog_entry returns Ok(0) without going through Raft, taking a non-replicated legacy path. Tests that issued DDL immediately after join convergence would silently get a leader-only write and then find the record missing on followers. Test transports use a 4-second RPC timeout instead of the production 5-second default, cutting join-failure test wall time substantially. Wait budgets for all convergence checks are widened from 5s to 10s to absorb cold-start election lag on loaded CI runners without masking genuine regressions. Descriptor lease renewal test creates its collection before acquiring the lease so the renewal loop's lookup_current_version finds it and does not prematurely release the lease as orphaned. --- nodedb-cluster/tests/cluster_join_race.rs | 2 +- nodedb-cluster/tests/common/mod.rs | 30 ++++-- .../tests/common/cluster_harness/cluster.rs | 84 ++++++++++++++++- nodedb/tests/common/cluster_harness/node.rs | 51 +++++++++- ...descriptor_lease_forwarding_and_renewal.rs | 46 ++++++--- .../tests/descriptor_versioning_cross_node.rs | 8 +- nodedb/tests/prepared_cache_invalidation.rs | 8 +- nodedb/tests/sql_cluster_cross_node_dml.rs | 93 +++++++++++-------- 8 files changed, 258 insertions(+), 64 deletions(-) diff --git a/nodedb-cluster/tests/cluster_join_race.rs b/nodedb-cluster/tests/cluster_join_race.rs index f6d2d4f1..feddda21 100644 --- a/nodedb-cluster/tests/cluster_join_race.rs +++ b/nodedb-cluster/tests/cluster_join_race.rs @@ -28,7 +28,7 @@ async fn five_nodes_race_on_full_seed_list_form_one_cluster() { let mut transports: Vec> = Vec::with_capacity(NODE_COUNT as usize); for id in 1..=NODE_COUNT { transports.push(Arc::new( - NexarTransport::new(id, "127.0.0.1:0".parse().unwrap()).expect("bind transport"), + common::test_transport(id).expect("bind transport"), )); } let seeds: Vec = transports.iter().map(|t| t.local_addr()).collect(); diff --git a/nodedb-cluster/tests/common/mod.rs b/nodedb-cluster/tests/common/mod.rs index 614b6116..1e4f8dbe 100644 --- a/nodedb-cluster/tests/common/mod.rs +++ b/nodedb-cluster/tests/common/mod.rs @@ -37,6 +37,19 @@ use nodedb_cluster::{ CacheApplier, ClusterCatalog, ClusterConfig, ClusterLifecycleState, ClusterLifecycleTracker, ClusterTopology, MetadataCache, NexarTransport, NoopForwarder, RaftLoop, start_cluster, }; + +/// Build a `NexarTransport` with a tighter-than-production RPC +/// timeout for tests. Production default is 5 s × 3 retries = ~15 s +/// per failed peer contact. 4 s leaves enough headroom for legitimate +/// Raft RPCs under contention while still cutting the join-failure +/// tests (which retry against a dead seed) substantially. +pub fn test_transport(node_id: u64) -> Result { + NexarTransport::with_timeout( + node_id, + "127.0.0.1:0".parse().unwrap(), + Duration::from_secs(4), + ) +} use nodedb_raft::message::LogEntry; use tempfile::TempDir; use tokio::sync::watch; @@ -100,10 +113,7 @@ impl TestNode { node_id: u64, seed_nodes: Vec, ) -> Result> { - let transport = Arc::new(NexarTransport::new( - node_id, - "127.0.0.1:0".parse().unwrap(), - )?); + let transport = Arc::new(test_transport(node_id)?); Self::spawn_with_transport(node_id, transport, seed_nodes).await } @@ -139,10 +149,7 @@ impl TestNode { data_dir: &Path, seed_nodes: Vec, ) -> Result> { - let transport = Arc::new(NexarTransport::new( - node_id, - "127.0.0.1:0".parse().unwrap(), - )?); + let transport = Arc::new(test_transport(node_id)?); Self::spawn_inner(node_id, transport, seed_nodes, data_dir.to_path_buf(), None).await } @@ -172,6 +179,13 @@ impl TestNode { replication_factor: 3, data_dir: data_dir_path.clone(), force_bootstrap: false, + // Fast retry policy: 2 s ceiling keeps the join-failure + // tests (especially `cluster_join_leader_crash`) under + // ~5 s of sleeping instead of the production ~64 s. + join_retry: nodedb_cluster::JoinRetryPolicy { + max_attempts: 8, + max_backoff_secs: 2, + }, }; let lifecycle = ClusterLifecycleTracker::new(); diff --git a/nodedb/tests/common/cluster_harness/cluster.rs b/nodedb/tests/common/cluster_harness/cluster.rs index 2d3002c7..d9e6c0f3 100644 --- a/nodedb/tests/common/cluster_harness/cluster.rs +++ b/nodedb/tests/common/cluster_harness/cluster.rs @@ -49,6 +49,43 @@ impl TestCluster { ) .await; + // CRITICAL: wait for every node to exit rolling-upgrade + // compat mode before letting the test issue any DDL. + // + // `metadata_proposer::propose_catalog_entry` consults + // `cluster_version_view().can_activate_feature(DISTRIBUTED_CATALOG_VERSION)` + // and, while even one node still reports a lower wire + // version, returns `Ok(0)` without going through the raft + // group. The pgwire DDL handlers (CREATE USER, etc.) then + // fall through to a LEGACY path that writes the record + // directly on the proposing node — **with zero + // replication** to followers. Any subsequent + // `has_active_user` check on a follower returns false and + // the test flakes. + // + // Topology has three members the moment the join request + // completes, but the `wire_version` field on each node's + // topology entry is updated asynchronously by the gossip + // path. That's why `topology_size == 3` converges fast yet + // `can_activate_feature(...)` can still be false for + // several hundred milliseconds afterwards. Waiting here + // closes the window deterministically — no retries, no + // flakes, no compat-mode fallback silently breaking + // replication. + wait_for( + "all 3 nodes exit rolling-upgrade compat mode", + Duration::from_secs(10), + Duration::from_millis(20), + || { + cluster.nodes.iter().all(|n| { + n.shared.cluster_version_view().can_activate_feature( + nodedb::control::rolling_upgrade::DISTRIBUTED_CATALOG_VERSION, + ) + }) + }, + ) + .await; + Ok(cluster) } @@ -57,13 +94,28 @@ impl TestCluster { /// `not metadata-group leader` errors via the pgwire error path; /// the retry loop tries the next node on failure so the test /// doesn't have to discover the leader explicitly. + /// + /// After the DDL is accepted, **blocks until every node's + /// metadata applier has caught up to the proposer's applied + /// index**. `propose_catalog_entry` already waits for the entry + /// to be applied on the proposing node before returning, but + /// followers apply asynchronously — without this barrier a + /// subsequent `wait_for("x visible on every node")` would race + /// the follower appliers and trip its timeout on the cold-start + /// attempt. Polling the watermark directly is O(applied_index) + /// and converges as soon as the followers drain their commit + /// queues, so it's both strictly more correct and strictly + /// faster than waiting on the visibility check itself. pub async fn exec_ddl_on_any_leader(&self, sql: &str) -> Result { let deadline = std::time::Instant::now() + Duration::from_secs(10); let mut last_err = String::new(); while std::time::Instant::now() < deadline { for (idx, node) in self.nodes.iter().enumerate() { match node.exec(sql).await { - Ok(()) => return Ok(idx), + Ok(()) => { + self.wait_for_applied_index_convergence(idx).await; + return Ok(idx); + } Err(e) => last_err = e, } } @@ -74,6 +126,36 @@ impl TestCluster { )) } + /// Block until every node's metadata applier has caught up to the + /// proposer's current applied index. Called after every successful + /// DDL by `exec_ddl_on_any_leader`. + async fn wait_for_applied_index_convergence(&self, proposer_idx: usize) { + let target = self.nodes[proposer_idx] + .shared + .applied_index_watcher() + .current(); + if target == 0 { + return; + } + let deadline = std::time::Instant::now() + Duration::from_secs(10); + loop { + let all_caught_up = self + .nodes + .iter() + .all(|n| n.shared.applied_index_watcher().current() >= target); + if all_caught_up { + return; + } + if std::time::Instant::now() >= deadline { + // Don't panic — the caller's own `wait_for` assertion + // will report the specific visibility failure with a + // better error than "convergence timed out". + return; + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + } + /// Cooperatively shut down every node. Reverse order so peers /// observe their neighbours' drop without rejecting inbound /// traffic on an already-closed transport. diff --git a/nodedb/tests/common/cluster_harness/node.rs b/nodedb/tests/common/cluster_harness/node.rs index db6feb2a..b1da9210 100644 --- a/nodedb/tests/common/cluster_harness/node.rs +++ b/nodedb/tests/common/cluster_harness/node.rs @@ -155,7 +155,20 @@ impl TestClusterNode { CoreLoop::open(0, data_side.request_rx, data_side.response_tx, &core_dir) .expect("core open"); core.set_event_producer(event_producer); - while core_stop_rx.try_recv().is_err() { + // Continue ticking only while the channel is Empty. + // `Ok(())` means we got an explicit stop signal; + // `Disconnected` means the sender was dropped (e.g. the + // owning `TestClusterNode` was dropped mid-panic). In + // both cases we must exit — `spawn_blocking` threads + // cannot be aborted, so a loop that continued on + // `Disconnected` would block tokio runtime shutdown + // indefinitely and force nextest to kill the test + // process at `slow-timeout` (~2 minutes of wasted CI + // time per flaky cluster test). + while matches!( + core_stop_rx.try_recv(), + Err(std::sync::mpsc::TryRecvError::Empty) + ) { core.tick(); std::thread::sleep(Duration::from_millis(1)); } @@ -642,6 +655,42 @@ impl TestClusterNode { } } +/// Panic-safe teardown. Without this, a test that panics (e.g. a +/// `wait_for` tripping its budget) would drop `TestClusterNode` +/// without ever calling the async `shutdown()`, leaving every +/// background task still running: +/// +/// - `watch::Sender`s close on drop but DO NOT transmit their last +/// value, so the raft / pgwire / poller loops block on +/// `select { shutdown.changed() }` forever. +/// - `JoinHandle`s on drop DETACH the task instead of cancelling it. +/// - Those detached tasks keep the tempdir's redb files open, so +/// `TempDir::drop` either hangs or the whole test process sticks +/// around until nextest kills it at `slow-timeout` (previously +/// ~2 minutes of wasted CI time per flaky cluster test). +/// +/// The Drop here fires the watch senders synchronously and aborts +/// every JoinHandle we own. `abort()` is non-blocking: the next time +/// the task hits an `.await` it gets cancelled and releases its +/// resources, including the redb handles. Combined with the +/// already-present `core_stop_tx` drop (which disconnects the +/// blocking Data Plane loop), this guarantees the node tears down +/// in milliseconds instead of minutes. +impl Drop for TestClusterNode { + fn drop(&mut self) { + let _ = self.pg_shutdown_tx.send(true); + let _ = self.cluster_shutdown_tx.send(true); + let _ = self.poller_shutdown_tx.send(true); + // `core_stop_tx` is a std mpsc Sender; dropping it disconnects + // the receiver the spawn_blocking data-plane loop polls, so + // no explicit signal needed here. + self._conn_handle.abort(); + self._pg_handle.abort(); + self._poller_handle.abort(); + self._core_handle.abort(); + } +} + fn pg_error_detail(e: &tokio_postgres::Error) -> String { if let Some(db_err) = e.as_db_error() { format!( diff --git a/nodedb/tests/descriptor_lease_forwarding_and_renewal.rs b/nodedb/tests/descriptor_lease_forwarding_and_renewal.rs index 0a5b3198..032519a5 100644 --- a/nodedb/tests/descriptor_lease_forwarding_and_renewal.rs +++ b/nodedb/tests/descriptor_lease_forwarding_and_renewal.rs @@ -9,13 +9,12 @@ //! the leader. All 3 leases land in `MetadataCache.leases` on //! every node. Without forwarding, the followers would panic //! with `not leader`. -//! //! 2. **lease_renews_before_expiry** — short lease (3 seconds) -//! + short renewal interval (250ms) + 50% threshold (renew when -//! < 1.5s remaining). Acquire on the leader, wait long enough -//! for at least one renewal cycle, assert the lease's -//! `expires_at` advanced (it was re-acquired with a fresh -//! expiry). +//! plus short renewal interval (250ms) and a 50% threshold +//! (renew when < 1.5s remaining). Acquire on the leader, wait +//! long enough for at least one renewal cycle, assert the +//! lease's `expires_at` advanced (it was re-acquired with a +//! fresh expiry). mod common; @@ -53,7 +52,7 @@ async fn follower_acquire_forwards_to_leader() { // 3 distinct (descriptor_id, node_id) keys. wait_for( "every node observes all 3 forwarded leases", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(20), || { cluster.nodes.iter().all(|n| { @@ -75,14 +74,39 @@ async fn lease_renews_before_expiry() { // Custom tuning: 3-second lease, check every 250ms, renew at // 50% remaining (< 1.5s left). Within a 2.5-second test wait // we should observe at least one renewal. - let mut tuning = ClusterTransportTuning::default(); - tuning.descriptor_lease_duration_secs = 3; - tuning.descriptor_lease_renewal_check_interval_secs = 1; // min granularity - tuning.descriptor_lease_renewal_threshold_pct = 80; + let tuning = ClusterTransportTuning { + descriptor_lease_duration_secs: 3, + descriptor_lease_renewal_check_interval_secs: 1, // min granularity + descriptor_lease_renewal_threshold_pct: 80, + ..ClusterTransportTuning::default() + }; let cluster = TestCluster::spawn_three_with_tuning(tuning) .await .expect("3-node cluster"); + + // Create the collection so the renewal loop's + // `lookup_current_version` finds it in the local catalog. + // Without this the renewal logic treats the lease as orphaned + // and releases it before our 1.5 s observation window — see + // `control::lease::renewal::lookup_current_version`. + cluster + .exec_ddl_on_any_leader("CREATE COLLECTION renewable (id BIGINT PRIMARY KEY, label TEXT)") + .await + .expect("create renewable collection"); + common::cluster_harness::wait_for( + "renewable visible on every node", + Duration::from_secs(10), + Duration::from_millis(50), + || { + cluster + .nodes + .iter() + .all(|n| n.collection_descriptor(TENANT, "renewable").is_some()) + }, + ) + .await; + let leader = &cluster.nodes[0]; // Acquire on the leader. Lease has ~3s expiry from now. diff --git a/nodedb/tests/descriptor_versioning_cross_node.rs b/nodedb/tests/descriptor_versioning_cross_node.rs index 0f458a11..da90f5ad 100644 --- a/nodedb/tests/descriptor_versioning_cross_node.rs +++ b/nodedb/tests/descriptor_versioning_cross_node.rs @@ -33,7 +33,7 @@ async fn create_collection_stamps_version_one_on_every_node() { wait_for( "all 3 nodes stamp orders @ version 1 with non-zero HLC", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster.nodes.iter().all(|n| { @@ -93,7 +93,7 @@ async fn alter_collection_bumps_version_monotonically() { // Wait for v1. wait_for( "v1 stamped on every node", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -118,7 +118,7 @@ async fn alter_collection_bumps_version_monotonically() { let expected_version = (i + 2) as u64; wait_for( &format!("all nodes observe assets @ v{expected_version}"), - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster.nodes.iter().all(|n| { @@ -162,7 +162,7 @@ async fn distinct_collections_get_independent_versions() { wait_for( "all 3 collections present on all nodes", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster.nodes.iter().all(|n| { diff --git a/nodedb/tests/prepared_cache_invalidation.rs b/nodedb/tests/prepared_cache_invalidation.rs index cb0d922e..31c5f844 100644 --- a/nodedb/tests/prepared_cache_invalidation.rs +++ b/nodedb/tests/prepared_cache_invalidation.rs @@ -17,7 +17,13 @@ use std::time::Duration; use common::cluster_harness::{TestCluster, wait_for}; const TENANT: u32 = 1; -const WAIT_BUDGET: Duration = Duration::from_secs(5); +// 10 s — every visibility check in this test rides on the metadata +// Raft commit + apply + post-apply cache update path. Five seconds +// (the original budget) was tight enough that fresh-cluster startup +// jitter occasionally tripped a false timeout. Ten seconds is still +// strict enough to catch real regressions but tolerant of cold-start +// election lag. +const WAIT_BUDGET: Duration = Duration::from_secs(10); const POLL: Duration = Duration::from_millis(20); #[tokio::test(flavor = "multi_thread", worker_threads = 6)] diff --git a/nodedb/tests/sql_cluster_cross_node_dml.rs b/nodedb/tests/sql_cluster_cross_node_dml.rs index 280db5a3..8da26e95 100644 --- a/nodedb/tests/sql_cluster_cross_node_dml.rs +++ b/nodedb/tests/sql_cluster_cross_node_dml.rs @@ -76,7 +76,7 @@ async fn create_on_any_node_is_visible_on_every_node() { // Every node's replicated cache must see the new collection. wait_for( "all 3 nodes see the replicated collection", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -98,7 +98,7 @@ async fn create_on_any_node_is_visible_on_every_node() { // The replicated-cache view removes the descriptor on Drop. wait_for( "all 3 nodes no longer see the collection", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -131,7 +131,7 @@ async fn sequence_create_visible_on_every_node() { wait_for( "all 3 nodes see the replicated sequence in their in-memory registry", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_sequence(1, "order_id")), ) @@ -147,7 +147,7 @@ async fn sequence_create_visible_on_every_node() { wait_for( "all 3 nodes see sequence counter == 500", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -165,7 +165,7 @@ async fn sequence_create_visible_on_every_node() { wait_for( "all 3 nodes remove the sequence from their registry", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_sequence(1, "order_id")), ) @@ -189,7 +189,7 @@ async fn trigger_create_visible_on_every_node() { wait_for( "collection visible on every node", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -209,7 +209,7 @@ async fn trigger_create_visible_on_every_node() { wait_for( "all 3 nodes see the replicated trigger in trigger_registry", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_trigger(1, "audit_ins")), ) @@ -222,7 +222,7 @@ async fn trigger_create_visible_on_every_node() { wait_for( "all 3 nodes unregister the trigger", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_trigger(1, "audit_ins")), ) @@ -245,7 +245,7 @@ async fn procedure_create_visible_on_every_node() { wait_for( "all 3 nodes see the procedure in local SystemCatalog redb", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -263,7 +263,7 @@ async fn procedure_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the procedure", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -293,7 +293,7 @@ async fn schedule_create_visible_on_every_node() { wait_for( "all 3 nodes see the schedule in schedule_registry", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -311,7 +311,7 @@ async fn schedule_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the schedule", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -340,7 +340,7 @@ async fn change_stream_create_visible_on_every_node() { wait_for( "collection visible on every node", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -358,7 +358,7 @@ async fn change_stream_create_visible_on_every_node() { wait_for( "all 3 nodes see the stream in stream_registry", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -376,7 +376,7 @@ async fn change_stream_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the stream", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -404,7 +404,7 @@ async fn user_create_visible_on_every_node() { wait_for( "all 3 nodes see the replicated user in credentials", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_active_user("alice")), ) @@ -417,7 +417,7 @@ async fn user_create_visible_on_every_node() { wait_for( "all 3 nodes see alice as deactivated", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_active_user("alice")), ) @@ -440,7 +440,7 @@ async fn role_create_visible_on_every_node() { wait_for( "all 3 nodes see the replicated role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_role("data_analyst")), ) @@ -453,7 +453,7 @@ async fn role_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_role("data_analyst")), ) @@ -476,7 +476,7 @@ async fn alter_user_role_replicates() { wait_for( "all 3 nodes see bob with read_only role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -494,7 +494,7 @@ async fn alter_user_role_replicates() { wait_for( "all 3 nodes see bob with read_write role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -523,7 +523,7 @@ async fn api_key_create_and_revoke_replicates() { wait_for( "all 3 nodes see charlie", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_active_user("charlie")), ) @@ -550,7 +550,7 @@ async fn api_key_create_and_revoke_replicates() { wait_for( "all 3 nodes see a replicated API key for charlie", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || all_nodes_have_key(&cluster), ) @@ -572,7 +572,7 @@ async fn api_key_create_and_revoke_replicates() { wait_for( "all 3 nodes see the key as revoked", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_active_api_key(&key_id)), ) @@ -597,7 +597,7 @@ async fn function_create_visible_on_every_node() { wait_for( "all 3 nodes see the function in local SystemCatalog redb", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_function(1, "add_one")), ) @@ -610,7 +610,7 @@ async fn function_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the function", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_function(1, "add_one")), ) @@ -630,7 +630,7 @@ async fn tenant_create_visible_on_every_node() { wait_for( "all 3 nodes see tenant 4242", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| n.has_tenant(4242)), ) @@ -643,7 +643,7 @@ async fn tenant_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see tenant 4242", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || cluster.nodes.iter().all(|n| !n.has_tenant(4242)), ) @@ -670,7 +670,7 @@ async fn rls_policy_create_visible_on_every_node() { wait_for( "all 3 nodes see the RLS policy", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -688,7 +688,7 @@ async fn rls_policy_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the RLS policy", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -726,7 +726,7 @@ async fn grant_permission_visible_on_every_node() { let target = "collection:1:documents"; wait_for( "all 3 nodes see the grant", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -744,7 +744,7 @@ async fn grant_permission_visible_on_every_node() { wait_for( "all 3 nodes no longer see the grant", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -776,7 +776,7 @@ async fn grant_role_visible_on_every_node() { wait_for( "all 3 nodes see ops_user has monitor role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -794,7 +794,7 @@ async fn grant_role_visible_on_every_node() { wait_for( "all 3 nodes see ops_user no longer has monitor role", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -825,6 +825,25 @@ async fn ownership_transfer_visible_on_every_node() { .await .expect("create new owner user"); + // ALTER ... OWNER TO validates the target user against the + // executing node's local credential cache, which is updated + // asynchronously by the metadata applier when the Raft entry + // commits. Wait for every node to observe the user before + // referencing it in the next DDL — otherwise the ALTER races + // the apply on whichever node `exec_ddl_on_any_leader` picks. + wait_for( + "all 3 nodes observe new_owner_user", + Duration::from_secs(10), + Duration::from_millis(50), + || { + cluster + .nodes + .iter() + .all(|n| n.has_active_user("new_owner_user")) + }, + ) + .await; + cluster .exec_ddl_on_any_leader("ALTER COLLECTION assets OWNER TO new_owner_user") .await @@ -832,7 +851,7 @@ async fn ownership_transfer_visible_on_every_node() { wait_for( "all 3 nodes see new_owner_user as owner of assets", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -864,7 +883,7 @@ async fn materialized_view_create_visible_on_every_node() { wait_for( "all 3 nodes see the materialized view", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster @@ -882,7 +901,7 @@ async fn materialized_view_create_visible_on_every_node() { wait_for( "all 3 nodes no longer see the materialized view", - Duration::from_secs(5), + Duration::from_secs(10), Duration::from_millis(50), || { cluster From 1bd04da8d0b78ccfb5ed4c8487853fbc171c89a1 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Tue, 14 Apr 2026 21:37:55 +0800 Subject: [PATCH 5/8] fix(nodedb): miscellaneous correctness fixes in catalog init and lease renewal system_catalog now opens all declared redb tables during the init transaction. Tables that were referenced later but never opened in the migration block caused a redb schema mismatch on the first write after an upgrade (alert_rules, retention_policies, sequences, sequence_state, column_stats, vector_model_metadata, checkpoints). JWT test RSA keygen switched from 2048-bit to 1024-bit keys. The tests exercise signing and verification logic, not key strength; the reduced size cuts per-test keygen time ~10x without changing coverage. Lease renewal code removes inline comments that duplicated the logic they annotated verbatim, replaced with the struct-update syntax for ClusterTransportTuning in the unit test so it reads clearly. drop(lock_guard) before await in the peer warm-up path in main.rs is replaced with a scoped block to satisfy clippy::await_holding_lock. --- nodedb/src/control/lease/renewal.rs | 25 +++---------------- .../security/catalog/system_catalog.rs | 21 ++++++++++++++++ nodedb/src/control/security/jwt.rs | 14 ++++++++--- 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/nodedb/src/control/lease/renewal.rs b/nodedb/src/control/lease/renewal.rs index 6601a02a..abb25666 100644 --- a/nodedb/src/control/lease/renewal.rs +++ b/nodedb/src/control/lease/renewal.rs @@ -163,26 +163,9 @@ impl LeaseRenewalLoop { "descriptor lease renewal: re-acquiring near-expiry leases" ); for (id, held_version) in near_expiry { - // Look up the CURRENT persisted version from the - // local catalog before re-acquiring. If the - // descriptor has been altered since we last took - // the lease, we need to advance to the new version - // — otherwise the old lease sticks around forever - // and blocks drain on any ALTER that wants to bump - // past it. - // - // If the descriptor has been dropped we release the - // lease instead of renewing: renewing a lease on a - // non-existent descriptor would leak it. let current_version = lookup_current_version(&shared, &id); match current_version { Some(v) => { - // Re-acquire at whichever version is higher: - // the persisted version, or the one we - // already hold (defensive — a concurrent - // PutCollection apply between cache read - // and propose could leave us briefly - // observing an older version). let version = v.max(held_version); if let Err(e) = super::propose::force_refresh_lease( &shared, @@ -199,8 +182,6 @@ impl LeaseRenewalLoop { } } None => { - // Descriptor dropped — release our lease so - // drain on the drop path can make progress. if let Err(e) = super::release::release_leases(&shared, vec![id.clone()]) { warn!( descriptor = ?id, @@ -328,8 +309,10 @@ mod tests { #[test] fn threshold_clamped_at_100() { - let mut tuning = ClusterTransportTuning::default(); - tuning.descriptor_lease_renewal_threshold_pct = 250; + let tuning = ClusterTransportTuning { + descriptor_lease_renewal_threshold_pct: 250, + ..ClusterTransportTuning::default() + }; let config = LeaseRenewalConfig::from_tuning(&tuning); assert_eq!(config.threshold_pct, 100); } diff --git a/nodedb/src/control/security/catalog/system_catalog.rs b/nodedb/src/control/security/catalog/system_catalog.rs index 9a640000..f59817e3 100644 --- a/nodedb/src/control/security/catalog/system_catalog.rs +++ b/nodedb/src/control/security/catalog/system_catalog.rs @@ -107,6 +107,27 @@ impl SystemCatalog { let _ = write_txn .open_table(super::rls::RLS_POLICIES) .map_err(|e| catalog_err("init rls_policies table", e))?; + let _ = write_txn + .open_table(ALERT_RULES) + .map_err(|e| catalog_err("init alert_rules table", e))?; + let _ = write_txn + .open_table(RETENTION_POLICIES) + .map_err(|e| catalog_err("init retention_policies table", e))?; + let _ = write_txn + .open_table(SEQUENCES) + .map_err(|e| catalog_err("init sequences table", e))?; + let _ = write_txn + .open_table(SEQUENCE_STATE) + .map_err(|e| catalog_err("init sequence_state table", e))?; + let _ = write_txn + .open_table(COLUMN_STATS) + .map_err(|e| catalog_err("init column_stats table", e))?; + let _ = write_txn + .open_table(VECTOR_MODEL_METADATA) + .map_err(|e| catalog_err("init vector_model_metadata table", e))?; + let _ = write_txn + .open_table(CHECKPOINTS) + .map_err(|e| catalog_err("init checkpoints table", e))?; } write_txn .commit() diff --git a/nodedb/src/control/security/jwt.rs b/nodedb/src/control/security/jwt.rs index 02a3743e..aeac9dd2 100644 --- a/nodedb/src/control/security/jwt.rs +++ b/nodedb/src/control/security/jwt.rs @@ -339,6 +339,14 @@ mod tests { base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(data) } + /// Bit length used for RSA keys generated in this test module. + /// Production validates whatever the operator configures; the + /// signing/verification logic doesn't care about strength, so we + /// use 1024 here to keep `RsaPrivateKey::new` from dominating the + /// test runtime. RSA-1024 keygen is ~10x faster than RSA-2048 + /// without changing what these tests actually exercise. + const TEST_RSA_BITS: usize = 1024; + #[test] fn rs256_roundtrip() { use rsa::pkcs1v15::SigningKey; @@ -346,7 +354,7 @@ mod tests { // Generate a test RSA key pair. let mut rng = rsa::rand_core::OsRng; - let private_key = rsa::RsaPrivateKey::new(&mut rng, 2048).unwrap(); + let private_key = rsa::RsaPrivateKey::new(&mut rng, TEST_RSA_BITS).unwrap(); let public_key = rsa::RsaPublicKey::from(&private_key); // Export public key as DER (PKCS#8). @@ -387,8 +395,8 @@ mod tests { use rsa::signature::{SignatureEncoding, Signer}; let mut rng = rsa::rand_core::OsRng; - let key1 = rsa::RsaPrivateKey::new(&mut rng, 2048).unwrap(); - let key2 = rsa::RsaPrivateKey::new(&mut rng, 2048).unwrap(); + let key1 = rsa::RsaPrivateKey::new(&mut rng, TEST_RSA_BITS).unwrap(); + let key2 = rsa::RsaPrivateKey::new(&mut rng, TEST_RSA_BITS).unwrap(); let pub2 = rsa::RsaPublicKey::from(&key2); let pub2_der = { From 751e804b0c6e1566e8106745f2d8333d78d607b7 Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Tue, 14 Apr 2026 21:38:20 +0800 Subject: [PATCH 6/8] chore(docker,ci,docs): privilege-drop entrypoint, nextest in CI, expanded getting-started guide Docker: the image no longer runs as root. A new docker-entrypoint.sh (using gosu) fixes ownership on the data volume when started as root, then drops to uid 10001 (nodedb) before exec-ing the server. When already started as a non-root user (--user 10001:10001) the entrypoint passes through directly. This makes named-volume mounts work on Linux hosts where Docker initialises volumes as root-owned. CI: the test workflow now installs cargo-nextest via taiki-e/install-action and runs cargo nextest run. Plain cargo test ignores the nextest.toml cluster test-group that serialises 3-node integration tests and would hang on the cluster suite. JUnit output is uploaded as an artifact on every run for post-mortem analysis. Docs: getting-started gains a prebuilt binary install section for Linux (x64 and arm64), plain docker run instructions alongside the existing Compose block, a systemd unit example, and a unified configuration reference that applies to all install methods. README test command updated to reflect nextest. --- .github/workflows/test.yml | 18 ++++- Dockerfile | 13 +++- README.md | 3 +- docker-entrypoint.sh | 47 ++++++++++++ docs/getting-started.md | 147 +++++++++++++++++++++++++++++++------ 5 files changed, 203 insertions(+), 25 deletions(-) create mode 100755 docker-entrypoint.sh diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d0345d2e..c3ac51ea 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -55,5 +55,21 @@ jobs: sudo apt-get install -y --no-install-recommends \ cmake clang libclang-dev pkg-config protobuf-compiler perl \ libcurl4-openssl-dev libsasl2-dev + # nextest is required — `.config/nextest.toml` defines the + # `cluster` test-group that serializes 3-node integration tests + # and the `ci` profile that retries flaky cluster tests once and + # writes a JUnit report. Plain `cargo test` ignores all of that + # and will hang/fail on the cluster suite. + - name: Install cargo-nextest + uses: taiki-e/install-action@v2 + with: + tool: nextest - name: Run tests - run: cargo test --all-features --profile ci + run: cargo nextest run --all-features --cargo-profile ci --profile ci + - name: Upload JUnit report + if: always() + uses: actions/upload-artifact@v4 + with: + name: junit-report + path: target/nextest/ci/junit.xml + if-no-files-found: ignore diff --git a/Dockerfile b/Dockerfile index 62ef70c6..ee517fb4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -36,9 +36,11 @@ FROM debian:bookworm-slim AS runtime # ca-certificates: needed for JWKS fetch, OTLP export, S3 archival # curl: needed for HEALTHCHECK +# gosu: drop privileges from root after fixing data-dir ownership in entrypoint RUN apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ curl \ + gosu \ && rm -rf /var/lib/apt/lists/* # Non-root user @@ -51,12 +53,18 @@ RUN mkdir -p /var/lib/nodedb /etc/nodedb \ COPY --from=builder /build/target/release/nodedb /usr/local/bin/nodedb +# Entrypoint: when started as root, fix data-dir ownership and drop to the +# nodedb user. When already started as a non-root user (e.g. `--user 10001`), +# exec directly. This makes `-v :/var/lib/nodedb` work even +# when Docker initialises the volume as root-owned (common on Linux hosts). +COPY docker-entrypoint.sh /usr/local/bin/docker-entrypoint.sh +RUN chmod +x /usr/local/bin/docker-entrypoint.sh + # Bind to all interfaces (required for Docker port mapping) # Point data dir at the declared volume ENV NODEDB_HOST=0.0.0.0 \ NODEDB_DATA_DIR=/var/lib/nodedb -USER nodedb WORKDIR /var/lib/nodedb # pgwire | native protocol | HTTP API | WebSocket sync | OTLP gRPC | OTLP HTTP @@ -67,4 +75,5 @@ VOLUME ["/var/lib/nodedb"] HEALTHCHECK --interval=10s --timeout=3s --start-period=5s \ CMD curl -f http://localhost:6480/health || exit 1 -ENTRYPOINT ["/usr/local/bin/nodedb"] +ENTRYPOINT ["/usr/local/bin/docker-entrypoint.sh"] +CMD ["/usr/local/bin/nodedb"] diff --git a/README.md b/README.md index 16a9b7bd..c2b276cc 100644 --- a/README.md +++ b/README.md @@ -148,7 +148,8 @@ For development or contributing: git clone https://github.com/NodeDB-Lab/nodedb.git cd nodedb cargo build --release -cargo test --all-features +cargo install cargo-nextest --locked # one-time +cargo nextest run --all-features ``` ## Status diff --git a/docker-entrypoint.sh b/docker-entrypoint.sh new file mode 100755 index 00000000..20d67508 --- /dev/null +++ b/docker-entrypoint.sh @@ -0,0 +1,47 @@ +#!/bin/sh +# NodeDB container entrypoint. +# +# When invoked as root (the default for `docker run` with no --user), fix +# ownership of NODEDB_DATA_DIR and drop privileges to the unprivileged +# `nodedb` user before exec'ing the server. When invoked as any other UID +# (e.g. `--user 10001` or via Kubernetes runAsUser), exec directly and +# leave the data directory alone. +# +# This makes `-v :/var/lib/nodedb` work even when Docker +# initialises the named volume as root-owned (common on Linux hosts where +# the volume is created out-of-band before the container's first run). + +set -e + +DATA_DIR="${NODEDB_DATA_DIR:-/var/lib/nodedb}" + +if [ "$(id -u)" = "0" ]; then + # Running as root: ensure the data dir exists and is owned by nodedb, + # then drop privileges. mkdir is a no-op for the declared VOLUME but + # protects against custom NODEDB_DATA_DIR overrides. + mkdir -p "$DATA_DIR" + chown -R nodedb:nodedb "$DATA_DIR" + exec gosu nodedb "$@" +fi + +# Already non-root: ensure we can actually write to the data dir, otherwise +# fail fast with a clear message instead of the cryptic WAL "Permission +# denied (os error 13)" the user sees on a misconfigured volume mount. +if [ ! -w "$DATA_DIR" ]; then + cat >&2 < For a specific version or to browse changelogs, see the release page: . The SQL surface is still pre-1.0 and changes between tags, so pin a version in production. + +--- + ## Build from Source ```bash @@ -70,8 +160,9 @@ cd nodedb # Release build (all crates) cargo build --release -# Run tests -cargo test --all-features +# Run tests (use nextest — see .config/nextest.toml) +cargo install cargo-nextest --locked # one-time +cargo nextest run --all-features ``` Requires Rust 1.94+ and Linux (the Data Plane uses io_uring). The build produces two binaries: @@ -84,8 +175,22 @@ Requires Rust 1.94+ and Linux (the Data Plane uses io_uring). The build produces ```bash # Single-node, default ports ./target/release/nodedb + +# Or with a config file +./target/release/nodedb --config nodedb.toml ``` +--- + +## Configuration + +This section applies to **every** install method — Docker, prebuilt binary, and source builds all read the same TOML schema and respond to the same environment variables. Pick whichever is convenient: + +- **TOML file** — pass `--config /path/to/nodedb.toml` on the command line. Best for production / systemd / pre-baked images. +- **Environment variables** — prefix `NODEDB_*`. Best for Docker (`-e`), Compose (`environment:`), and Kubernetes. Env vars **override** values from the TOML file when both are set. + +### Default ports + By default, NodeDB listens on: - **6432** — PostgreSQL wire protocol (pgwire) @@ -98,11 +203,11 @@ Two additional protocols are available but **disabled by default**: - **RESP** (Redis-compatible KV protocol) — `GET`/`SET`/`DEL`/`EXPIRE`/`SCAN`/`SUBSCRIBE` - **ILP** (InfluxDB Line Protocol) — high-throughput timeseries ingest -Enable them by setting a listen address in config or via env var (see below). +Enable them by setting a listen address in the config or via env var (see below). -### Configuration +### Example config file -All protocols share one bind address (`host`). Only the port differs per protocol. Env vars take precedence over the TOML file. +All protocols share one bind address (`host`). Only the port differs per protocol. ```toml # nodedb.toml @@ -133,19 +238,19 @@ ilp = false # Example: disable TLS for ILP ingest **Server settings:** -| Config field | Environment variable | Default | -| ------------------ | ------------------------- | ---------------- | -| `host` | `NODEDB_HOST` | `127.0.0.1` | -| `ports.native` | `NODEDB_PORT_NATIVE` | `6433` | -| `ports.pgwire` | `NODEDB_PORT_PGWIRE` | `6432` | -| `ports.http` | `NODEDB_PORT_HTTP` | `6480` | -| `ports.resp` | `NODEDB_PORT_RESP` | disabled | -| `ports.ilp` | `NODEDB_PORT_ILP` | disabled | -| `data_dir` | `NODEDB_DATA_DIR` | `~/.nodedb/data` | -| `memory_limit` | `NODEDB_MEMORY_LIMIT` | `1GiB` | -| `data_plane_cores` | `NODEDB_DATA_PLANE_CORES` | CPUs - 1 | -| `max_connections` | `NODEDB_MAX_CONNECTIONS` | `1024` | -| `log_format` | `NODEDB_LOG_FORMAT` | `text` | +| Config field | Environment variable | Default | +| ------------------ | ------------------------- | ----------------------------------------------------- | +| `host` | `NODEDB_HOST` | `127.0.0.1` | +| `ports.native` | `NODEDB_PORT_NATIVE` | `6433` | +| `ports.pgwire` | `NODEDB_PORT_PGWIRE` | `6432` | +| `ports.http` | `NODEDB_PORT_HTTP` | `6480` | +| `ports.resp` | `NODEDB_PORT_RESP` | disabled | +| `ports.ilp` | `NODEDB_PORT_ILP` | disabled | +| `data_dir` | `NODEDB_DATA_DIR` | `~/.nodedb/data` (binary), `/var/lib/nodedb` (Docker) | +| `memory_limit` | `NODEDB_MEMORY_LIMIT` | `1GiB` | +| `data_plane_cores` | `NODEDB_DATA_PLANE_CORES` | CPUs - 1 | +| `max_connections` | `NODEDB_MAX_CONNECTIONS` | `1024` | +| `log_format` | `NODEDB_LOG_FORMAT` | `text` | **Per-protocol TLS** (only applies when `[server.tls]` is configured): From 06d3b078c759ff82b235e628765058b445e57b2b Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Tue, 14 Apr 2026 21:39:02 +0800 Subject: [PATCH 7/8] chore(nextest): add config to serialize cluster tests and retry flakes Cluster integration tests spin up 3-node Raft clusters with per-node Tokio runtimes; running them alongside the rest of the suite caused port/fd exhaustion and starved Raft heartbeats on high-core machines. Pin them to a single-threaded test-group that claims all test slots, and allow one retry for startup jitter. CI profile adds more retries and JUnit output. --- .config/nextest.toml | 79 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) create mode 100644 .config/nextest.toml diff --git a/.config/nextest.toml b/.config/nextest.toml new file mode 100644 index 00000000..7f6a0f39 --- /dev/null +++ b/.config/nextest.toml @@ -0,0 +1,79 @@ +# nextest configuration. Run with: cargo nextest run --all-features +# +# Why nextest over `cargo test`: +# - Each test runs in its own process → no in-process state contention. +# Integration tests that spawn 3-node clusters used to hang under +# `cargo test`'s default within-binary parallelism because multiple +# clusters in the same process exhausted ports / file descriptors. +# - Per-test timeouts make hangs fail fast instead of stalling CI. +# - Better failure output, retry support, and JUnit XML for CI. + +[profile.default] +# Hard ceiling per test. Anything above this is a bug, not a slow test. +slow-timeout = { period = "30s", terminate-after = 4 } + +# Use every available core for cheap unit tests. Heavy cluster tests +# are kept from starving by `threads-required` overrides below — they +# claim ALL slots so nothing else runs alongside them, regardless of +# whether you're on a 24-core dev box or a 2-core CI runner. +test-threads = "num-cpus" + +# Heavy cluster tests: each one brings up 3 servers + per-node Tokio +# runtimes. Two things keep them stable across machine sizes: +# +# 1. `test-group = "cluster"` with `max-threads = 1` ensures at +# most ONE cluster test runs at a time (no two clusters share +# ports / file descriptors / thread pools). +# 2. `threads-required = "num-test-threads"` makes the running +# cluster test claim every available test slot, which evicts +# every other test from the run-queue while it's executing. +# That's what prevents a 24-core dev box from scheduling 23 +# unit tests alongside the cluster and starving its Raft +# heartbeats. +# +# The combined effect: cluster tests run strictly serially AND +# strictly alone, and the rest of the suite gets full parallelism +# the moment the cluster test finishes. +[[profile.default.overrides]] +filter = ''' +binary(/cluster/) +| binary(/cross_node/) +| binary(/_lease_/) +| binary(descriptor_lease_drain) +| binary(descriptor_lease_forwarding_and_renewal) +| binary(descriptor_lease_planner_integration) +| binary(descriptor_versioning_cross_node) +| binary(prepared_cache_invalidation) +| binary(sql_cluster_cross_node_dml) +''' +test-group = 'cluster' +threads-required = 'num-test-threads' +# Cluster tests bring up real Raft nodes and racy multi-node +# convergence checks. They're flaky enough that one retry catches +# legitimate startup jitter without hiding real regressions — a +# genuinely broken test fails twice in a row. +retries = { backoff = "fixed", count = 2, delay = "1s" } + +[test-groups] +cluster = { max-threads = 1 } + +[profile.ci] +# CI inherits the default profile (cluster group, threads-required, +# slow-timeout) and adds: +# - more retries: CI runners are ~2× slower per-core than dev +# workstations, so the cluster tests' in-test `wait_for` +# budgets are proportionally tighter. Three retries (four total +# attempts) buys headroom for jitter without papering over real +# regressions — a genuinely broken test fails four times in a row. +# - JUnit XML: picked up by the workflow's artifact upload. +# +# NOTE: we deliberately do NOT bump `slow-timeout` here. The +# slow-timeout only controls when nextest gives up on a stuck +# *process*; it does NOT extend the test's internal `wait_for` +# budgets. Once a `wait_for` panics, the test has already failed — +# making nextest wait longer just wastes CI minutes on cleanup. +retries = { backoff = "fixed", count = 3, delay = "2s" } +fail-fast = false + +[profile.ci.junit] +path = "junit.xml" From d96d81e042137cdf87a227f4699c0cf901a0292d Mon Sep 17 00:00:00 2001 From: Farhan Syah Date: Tue, 14 Apr 2026 22:10:36 +0800 Subject: [PATCH 8/8] fix(nodedb-cluster): eliminate 30s-per-peer stalls during cluster join MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two independent but compounding issues caused cluster join to hang for tens of seconds on every startup when seeds were not yet bootstrapped: 1. The QUIC RPC timeout only covered the response-read phase. A handshake attempt against an unreachable or not-yet-listening peer blocked for the transport's internal idle timeout (~30 s), not the configured RPC timeout. In a 5-node race where every non-bootstrapper seed redirects to another non-bootstrapper, this multiplied to (N-1) × 30 s of wasted wall time per join attempt. Fixed by wrapping the entire send_rpc_to_addr operation — handshake, stream open, write, and read — in a single tokio::time::timeout bounded by self.rpc_timeout, and extracting the inner work into send_rpc_to_addr_inner so the public interface stays clean. 2. The seed work-list was a Vec used as a stack (pop), so seed order was unspecified. Under the single-elected-bootstrapper rule the lexicographically smallest address is the one peer that can actually answer during the initial race; hitting it last meant exhausting timeouts against every other seed first. Fixed by sorting seeds at the start of the join loop so the designated bootstrapper surfaces first, and switching to VecDeque so leader redirects are pushed to the front (push_front / pop_front) and consumed before unvisited seeds. --- nodedb-cluster/src/bootstrap/join.rs | 27 ++++++++++++++++++++------ nodedb-cluster/src/transport/client.rs | 23 ++++++++++++++++------ 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/nodedb-cluster/src/bootstrap/join.rs b/nodedb-cluster/src/bootstrap/join.rs index 8ab1a10e..cad1dfad 100644 --- a/nodedb-cluster/src/bootstrap/join.rs +++ b/nodedb-cluster/src/bootstrap/join.rs @@ -143,15 +143,30 @@ async fn try_join_once( transport: &NexarTransport, req_template: &JoinRequest, ) -> Result { - // Work list: start with the configured seeds, prepend leader hints - // as they arrive. `HashSet` deduplicates so a redirect loop can't - // consume all attempts against the same address. - let mut work: Vec = config.seed_nodes.clone(); + // Work list: try seeds in sorted order so the lexicographically + // smallest address — the designated bootstrapper under the + // single-elected-bootstrapper rule — is contacted first. This is + // critical during the initial 5-node race: every other seed points + // at a node that is itself still joining, so asking them first + // eats the full RPC timeout per non-bootstrapper before we reach + // the one peer that can actually answer. `HashSet` deduplicates + // so a redirect loop can't consume all attempts against the same + // address. + let mut work: std::collections::VecDeque = + config.seed_nodes.iter().copied().collect(); + { + // Sort so the designated bootstrapper surfaces first. Leader + // redirects get prepended with push_front below, keeping the + // "most likely to answer" candidate at the head. + let mut sorted: Vec = work.drain(..).collect(); + sorted.sort(); + work.extend(sorted); + } let mut visited: HashSet = HashSet::new(); let mut redirects: u32 = 0; let mut last_err: Option = None; - while let Some(addr) = work.pop() { + while let Some(addr) = work.pop_front() { if !visited.insert(addr) { continue; } @@ -172,7 +187,7 @@ async fn try_join_once( "following leader redirect" ); redirects += 1; - work.push(leader); + work.push_front(leader); continue; } debug!( diff --git a/nodedb-cluster/src/transport/client.rs b/nodedb-cluster/src/transport/client.rs index 69c34f24..71a7d5fd 100644 --- a/nodedb-cluster/src/transport/client.rs +++ b/nodedb-cluster/src/transport/client.rs @@ -267,7 +267,23 @@ impl NexarTransport { } /// Send an RPC to an address directly (for bootstrap/join before peer IDs are known). + /// + /// The **entire** operation — handshake, stream open, write, read — is + /// bounded by `self.rpc_timeout`. Previously only the response read was + /// timed out, which meant a QUIC handshake against an unreachable peer + /// could block for the transport's internal idle timeout (~30 s per + /// address). That was fatal to cluster join races where every non- + /// bootstrapper seed points at another non-bootstrapper: each attempt + /// would stall 30 s × (N-1) peers, compounding across retry passes. pub async fn send_rpc_to_addr(&self, addr: SocketAddr, rpc: RaftRpc) -> Result { + tokio::time::timeout(self.rpc_timeout, self.send_rpc_to_addr_inner(addr, rpc)) + .await + .map_err(|_| ClusterError::Transport { + detail: format!("RPC timeout ({}ms) to {addr}", self.rpc_timeout.as_millis()), + })? + } + + async fn send_rpc_to_addr_inner(&self, addr: SocketAddr, rpc: RaftRpc) -> Result { let frame = rpc_codec::encode(&rpc)?; let conn = self @@ -295,12 +311,7 @@ impl NexarTransport { detail: format!("finish send to {addr}: {e}"), })?; - let response_frame = tokio::time::timeout(self.rpc_timeout, server::read_frame(&mut recv)) - .await - .map_err(|_| ClusterError::Transport { - detail: format!("RPC timeout ({}ms) to {addr}", self.rpc_timeout.as_millis()), - })??; - + let response_frame = server::read_frame(&mut recv).await?; rpc_codec::decode(&response_frame) }