Skip to content

Commit 6513eca

Browse files
authored
Merge pull request #31 from NodeDB-Lab/cluster
Cluster: plan-based gateway, phased startup/shutdown, SWIM membership
2 parents b024c56 + 330e49a commit 6513eca

File tree

206 files changed

+17988
-3027
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below for content that may be hidden.

206 files changed

+17988
-3027
lines changed

nodedb-cluster/src/cluster_info.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use std::sync::{Arc, RwLock};
1313

1414
use serde::{Deserialize, Serialize};
1515

16-
use crate::forward::RequestForwarder;
16+
use crate::forward::PlanExecutor;
1717
use crate::lifecycle_state::{ClusterLifecycleState, ClusterLifecycleTracker};
1818
use crate::multi_raft::GroupStatus;
1919
use crate::raft_loop::{CommitApplier, RaftLoop};
@@ -25,16 +25,16 @@ use crate::topology::ClusterTopology;
2525
/// Implemented for every `RaftLoop` via a blanket impl so the main
2626
/// binary can coerce `Arc<RaftLoop<...>>` to `Arc<dyn
2727
/// GroupStatusProvider + Send + Sync>` without thinking about the
28-
/// `CommitApplier` / `RequestForwarder` type parameters.
28+
/// `CommitApplier` / `PlanExecutor` type parameters.
2929
pub trait GroupStatusProvider: Send + Sync {
3030
/// Current status of every Raft group hosted on this node.
3131
fn group_statuses(&self) -> Vec<GroupStatus>;
3232
}
3333

34-
impl<A, F> GroupStatusProvider for RaftLoop<A, F>
34+
impl<A, P> GroupStatusProvider for RaftLoop<A, P>
3535
where
3636
A: CommitApplier,
37-
F: RequestForwarder,
37+
P: PlanExecutor,
3838
{
3939
fn group_statuses(&self) -> Vec<GroupStatus> {
4040
RaftLoop::group_statuses(self)

nodedb-cluster/src/forward.rs

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,40 @@
1-
//! Query forwarding trait for leader-based request routing.
1+
//! Physical-plan execution trait for leader-based request routing.
22
//!
3-
//! When a client connects to a non-leader node, the query is forwarded
4-
//! to the leader for the target vShard. The [`RequestForwarder`] trait
5-
//! abstracts local execution so the cluster crate doesn't depend on the
6-
//! main binary's SharedState or pgwire infrastructure.
3+
//! [`PlanExecutor`]: the physical-plan execution path introduced in C-β.
4+
//! The legacy [`RequestForwarder`] SQL-string path was deleted in C-δ.6.
75
8-
use crate::rpc_codec::{ForwardRequest, ForwardResponse};
6+
use crate::rpc_codec::{ExecuteRequest, ExecuteResponse};
97

10-
/// Trait for executing forwarded SQL queries on the local Data Plane.
8+
// ── Physical-plan execution (C-β) ────────────────────────────────────────────
9+
10+
/// Trait for executing a pre-planned `PhysicalPlan` on the local Data Plane.
11+
///
12+
/// Implemented in `nodedb/src/control/exec_receiver.rs` by `LocalPlanExecutor`.
13+
/// The cluster RPC handler calls this when it receives an `ExecuteRequest`.
1114
///
12-
/// Implemented by the main binary crate using SharedState + QueryContext.
13-
/// The cluster RPC handler calls this when it receives a `ForwardRequest`.
14-
pub trait RequestForwarder: Send + Sync + 'static {
15-
/// Execute a forwarded SQL query locally and return the result.
16-
///
17-
/// The implementation should:
18-
/// 1. Create a synthetic identity from the tenant_id (trusted node-to-node)
19-
/// 2. Plan the SQL through DataFusion
20-
/// 3. Dispatch to the local Data Plane
21-
/// 4. Collect response payloads
22-
/// 5. Return them in a ForwardResponse
23-
fn execute_forwarded(
15+
/// Responsibilities:
16+
/// 1. Validate that `deadline_remaining_ms > 0`.
17+
/// 2. For each `DescriptorVersionEntry`, verify the local descriptor version matches.
18+
/// 3. Decode `plan_bytes` via `nodedb::bridge::physical_plan::wire::decode`.
19+
/// 4. Dispatch through the local SPSC bridge.
20+
/// 5. Collect response payloads.
21+
/// 6. Map errors to `TypedClusterError`.
22+
pub trait PlanExecutor: Send + Sync + 'static {
23+
fn execute_plan(
2424
&self,
25-
req: ForwardRequest,
26-
) -> impl std::future::Future<Output = ForwardResponse> + Send;
25+
req: ExecuteRequest,
26+
) -> impl std::future::Future<Output = ExecuteResponse> + Send;
2727
}
2828

29-
/// No-op forwarder for single-node mode or testing.
30-
pub struct NoopForwarder;
29+
/// No-op executor for single-node mode or testing.
30+
pub struct NoopPlanExecutor;
3131

32-
impl RequestForwarder for NoopForwarder {
33-
async fn execute_forwarded(&self, _req: ForwardRequest) -> ForwardResponse {
34-
ForwardResponse {
35-
success: false,
36-
payloads: vec![],
37-
error_message: "query forwarding not available (single-node mode)".into(),
38-
}
32+
impl PlanExecutor for NoopPlanExecutor {
33+
async fn execute_plan(&self, _req: ExecuteRequest) -> ExecuteResponse {
34+
use crate::rpc_codec::TypedClusterError;
35+
ExecuteResponse::err(TypedClusterError::Internal {
36+
code: 0,
37+
message: "plan execution not available (single-node mode)".into(),
38+
})
3939
}
4040
}

nodedb-cluster/src/lib.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ pub mod rebalance_scheduler;
3131
pub mod routing;
3232
pub mod rpc_codec;
3333
pub mod shard_split;
34+
pub mod swim;
3435
pub mod topology;
3536
pub mod transport;
3637
pub mod vshard_handler;
@@ -43,7 +44,7 @@ pub use cluster_info::{
4344
};
4445
pub use conf_change::{ConfChange, ConfChangeType};
4546
pub use error::{ClusterError, Result};
46-
pub use forward::{NoopForwarder, RequestForwarder};
47+
pub use forward::{NoopPlanExecutor, PlanExecutor};
4748
pub use ghost::{GhostStub, GhostTable};
4849
pub use health::{HealthConfig, HealthMonitor};
4950
pub use lifecycle_state::{ClusterLifecycleState, ClusterLifecycleTracker};
@@ -77,3 +78,4 @@ pub use lifecycle::{
7778
pub use rdma_transport::{RdmaConfig, RdmaTransport};
7879
pub use rebalance_scheduler::{NodeMetrics, RebalanceScheduler, RebalanceTrigger, SchedulerConfig};
7980
pub use shard_split::{SplitPlan, SplitStrategy, plan_graph_split, plan_vector_split};
81+
pub use swim::{Incarnation, Member, MemberState, MembershipList, SwimConfig, SwimError};

nodedb-cluster/src/raft_loop/handle_rpc.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
//! orchestration in [`super::join`].
77
88
use crate::error::{ClusterError, Result};
9-
use crate::forward::RequestForwarder;
9+
use crate::forward::PlanExecutor;
1010
use crate::health;
1111
use crate::rpc_codec::RaftRpc;
1212
use crate::transport::RaftRpcHandler;
@@ -61,7 +61,7 @@ pub(super) fn decide_join(
6161
}
6262
}
6363

64-
impl<A: CommitApplier, F: RequestForwarder> RaftRpcHandler for RaftLoop<A, F> {
64+
impl<A: CommitApplier, P: PlanExecutor> RaftRpcHandler for RaftLoop<A, P> {
6565
async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
6666
match rpc {
6767
// Raft consensus RPCs — lock MultiRaft (sync, never across await).
@@ -135,10 +135,11 @@ impl<A: CommitApplier, F: RequestForwarder> RaftRpcHandler for RaftLoop<A, F> {
135135
}
136136
Ok(ack)
137137
}
138-
// Query forwarding — execute locally via the RequestForwarder.
139-
RaftRpc::ForwardRequest(req) => {
140-
let resp = self.forwarder.execute_forwarded(req).await;
141-
Ok(RaftRpc::ForwardResponse(resp))
138+
// Physical-plan execution (C-β) — execute locally via the PlanExecutor,
139+
// skipping SQL re-planning entirely.
140+
RaftRpc::ExecuteRequest(req) => {
141+
let resp = self.plan_executor.execute_plan(req).await;
142+
Ok(RaftRpc::ExecuteResponse(resp))
142143
}
143144
// Metadata-group proposal forwarding — apply locally if
144145
// we're the metadata leader, otherwise return a

nodedb-cluster/src/raft_loop/join.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ use tracing::{debug, info, warn};
6161
use crate::bootstrap::handle_join_request;
6262
use crate::conf_change::{ConfChange, ConfChangeType};
6363
use crate::error::{ClusterError, Result};
64-
use crate::forward::RequestForwarder;
64+
use crate::forward::PlanExecutor;
6565
use crate::health;
6666
use crate::multi_raft::GroupStatus;
6767
use crate::routing::RoutingTable;
@@ -78,7 +78,7 @@ const CONF_CHANGE_COMMIT_TIMEOUT: Duration = Duration::from_secs(5);
7878
/// Polling interval for the commit-wait loop.
7979
const CONF_CHANGE_POLL_INTERVAL: Duration = Duration::from_millis(20);
8080

81-
impl<A: CommitApplier, F: RequestForwarder> RaftLoop<A, F> {
81+
impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
8282
/// Full server-side `JoinRequest` handler. See module docs for the
8383
/// phase-by-phase description.
8484
pub(super) async fn join_flow(&self, req: JoinRequest) -> JoinResponse {

nodedb-cluster/src/raft_loop/loop_core.rs

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use nodedb_raft::message::LogEntry;
1515
use crate::catalog::ClusterCatalog;
1616
use crate::conf_change::ConfChange;
1717
use crate::error::Result;
18-
use crate::forward::RequestForwarder;
18+
use crate::forward::{NoopPlanExecutor, PlanExecutor};
1919
use crate::metadata_group::applier::{MetadataApplier, NoopMetadataApplier};
2020
use crate::multi_raft::MultiRaft;
2121
use crate::topology::ClusterTopology;
@@ -53,17 +53,20 @@ pub type VShardEnvelopeHandler = Arc<
5353
/// ticks. Implements [`crate::transport::RaftRpcHandler`] (in
5454
/// [`super::handle_rpc`]) so it can be passed directly to
5555
/// [`NexarTransport::serve`] for incoming RPC dispatch.
56-
pub struct RaftLoop<A: CommitApplier, F: RequestForwarder = crate::forward::NoopForwarder> {
56+
///
57+
/// The `F: RequestForwarder` generic parameter was removed in C-δ.6 when the
58+
/// SQL-string forwarding path was retired. Cross-node SQL routing now goes
59+
/// through `gateway.execute / ExecuteRequest` (C-β path).
60+
pub struct RaftLoop<A: CommitApplier, P: PlanExecutor = NoopPlanExecutor> {
5761
pub(super) node_id: u64,
5862
pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
5963
pub(super) transport: Arc<NexarTransport>,
6064
pub(super) topology: Arc<RwLock<ClusterTopology>>,
6165
pub(super) applier: A,
6266
/// Applies committed entries from the metadata Raft group (group 0).
63-
/// Every node has one; defaults to a no-op until the host crate wires
64-
/// in a real [`MetadataApplier`] via [`Self::with_metadata_applier`].
6567
pub(super) metadata_applier: Arc<dyn MetadataApplier>,
66-
pub(super) forwarder: Arc<F>,
68+
/// Executes incoming `ExecuteRequest` RPCs without SQL re-planning.
69+
pub(super) plan_executor: Arc<P>,
6770
pub(super) tick_interval: Duration,
6871
/// Optional handler for incoming VShardEnvelope messages.
6972
/// Set when the Event Plane or other subsystems need cross-node messaging.
@@ -119,7 +122,7 @@ impl<A: CommitApplier> RaftLoop<A> {
119122
topology,
120123
applier,
121124
metadata_applier: Arc::new(NoopMetadataApplier),
122-
forwarder: Arc::new(crate::forward::NoopForwarder),
125+
plan_executor: Arc::new(NoopPlanExecutor),
123126
tick_interval: DEFAULT_TICK_INTERVAL,
124127
vshard_handler: None,
125128
catalog: None,
@@ -129,31 +132,22 @@ impl<A: CommitApplier> RaftLoop<A> {
129132
}
130133
}
131134

132-
impl<A: CommitApplier, F: RequestForwarder> RaftLoop<A, F> {
133-
/// Create a RaftLoop with a custom request forwarder (for cluster mode).
134-
pub fn with_forwarder(
135-
multi_raft: MultiRaft,
136-
transport: Arc<NexarTransport>,
137-
topology: Arc<RwLock<ClusterTopology>>,
138-
applier: A,
139-
forwarder: Arc<F>,
140-
) -> Self {
141-
let node_id = multi_raft.node_id();
142-
let (shutdown_watch, _) = tokio::sync::watch::channel(false);
143-
let (ready_watch, _) = tokio::sync::watch::channel(false);
144-
Self {
145-
node_id,
146-
multi_raft: Arc::new(Mutex::new(multi_raft)),
147-
transport,
148-
topology,
149-
applier,
150-
metadata_applier: Arc::new(NoopMetadataApplier),
151-
forwarder,
152-
tick_interval: DEFAULT_TICK_INTERVAL,
153-
vshard_handler: None,
154-
catalog: None,
155-
shutdown_watch,
156-
ready_watch,
135+
impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
136+
/// Install a custom plan executor (for cluster mode — C-β path).
137+
pub fn with_plan_executor<P2: PlanExecutor>(self, executor: Arc<P2>) -> RaftLoop<A, P2> {
138+
RaftLoop {
139+
node_id: self.node_id,
140+
multi_raft: self.multi_raft,
141+
transport: self.transport,
142+
topology: self.topology,
143+
applier: self.applier,
144+
metadata_applier: self.metadata_applier,
145+
plan_executor: executor,
146+
tick_interval: self.tick_interval,
147+
vshard_handler: self.vshard_handler,
148+
catalog: self.catalog,
149+
shutdown_watch: self.shutdown_watch,
150+
ready_watch: self.ready_watch,
157151
}
158152
}
159153

nodedb-cluster/src/raft_loop/tick.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ use tracing::{debug, warn};
2727
use nodedb_raft::transport::RaftTransport;
2828

2929
use crate::conf_change::{ConfChange, ConfChangeType};
30-
use crate::forward::RequestForwarder;
30+
use crate::forward::PlanExecutor;
3131

3232
use super::loop_core::{CommitApplier, RaftLoop};
3333

34-
impl<A: CommitApplier, F: RequestForwarder> RaftLoop<A, F> {
34+
impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
3535
/// Execute a single tick: drive Raft, dispatch outbound messages,
3636
/// apply commits, promote caught-up learners.
3737
pub(super) fn do_tick(&self) {

0 commit comments

Comments (0)