Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions nodedb-cluster/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::sync::{Arc, RwLock};

use serde::{Deserialize, Serialize};

use crate::forward::RequestForwarder;
use crate::forward::PlanExecutor;
use crate::lifecycle_state::{ClusterLifecycleState, ClusterLifecycleTracker};
use crate::multi_raft::GroupStatus;
use crate::raft_loop::{CommitApplier, RaftLoop};
Expand All @@ -25,16 +25,16 @@ use crate::topology::ClusterTopology;
/// Implemented for every `RaftLoop` via a blanket impl so the main
/// binary can coerce `Arc<RaftLoop<...>>` to `Arc<dyn
/// GroupStatusProvider + Send + Sync>` without thinking about the
/// `CommitApplier` / `RequestForwarder` type parameters.
/// `CommitApplier` / `PlanExecutor` type parameters.
pub trait GroupStatusProvider: Send + Sync {
/// Current status of every Raft group hosted on this node.
fn group_statuses(&self) -> Vec<GroupStatus>;
}

impl<A, F> GroupStatusProvider for RaftLoop<A, F>
impl<A, P> GroupStatusProvider for RaftLoop<A, P>
where
A: CommitApplier,
F: RequestForwarder,
P: PlanExecutor,
{
fn group_statuses(&self) -> Vec<GroupStatus> {
RaftLoop::group_statuses(self)
Expand Down
60 changes: 30 additions & 30 deletions nodedb-cluster/src/forward.rs
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
//! Query forwarding trait for leader-based request routing.
//! Physical-plan execution trait for leader-based request routing.
//!
//! When a client connects to a non-leader node, the query is forwarded
//! to the leader for the target vShard. The [`RequestForwarder`] trait
//! abstracts local execution so the cluster crate doesn't depend on the
//! main binary's SharedState or pgwire infrastructure.
//! [`PlanExecutor`]: the physical-plan execution path introduced in C-β.
//! The legacy [`RequestForwarder`] SQL-string path was deleted in C-δ.6.

use crate::rpc_codec::{ForwardRequest, ForwardResponse};
use crate::rpc_codec::{ExecuteRequest, ExecuteResponse};

/// Trait for executing forwarded SQL queries on the local Data Plane.
// ── Physical-plan execution (C-β) ────────────────────────────────────────────

/// Trait for executing a pre-planned `PhysicalPlan` on the local Data Plane.
///
/// Implemented in `nodedb/src/control/exec_receiver.rs` by `LocalPlanExecutor`.
/// The cluster RPC handler calls this when it receives an `ExecuteRequest`.
///
/// Implemented by the main binary crate using SharedState + QueryContext.
/// The cluster RPC handler calls this when it receives a `ForwardRequest`.
pub trait RequestForwarder: Send + Sync + 'static {
/// Execute a forwarded SQL query locally and return the result.
///
/// The implementation should:
/// 1. Create a synthetic identity from the tenant_id (trusted node-to-node)
/// 2. Plan the SQL through DataFusion
/// 3. Dispatch to the local Data Plane
/// 4. Collect response payloads
/// 5. Return them in a ForwardResponse
fn execute_forwarded(
/// Responsibilities:
/// 1. Validate that `deadline_remaining_ms > 0`.
/// 2. For each `DescriptorVersionEntry`, verify the local descriptor version matches.
/// 3. Decode `plan_bytes` via `nodedb::bridge::physical_plan::wire::decode`.
/// 4. Dispatch through the local SPSC bridge.
/// 5. Collect response payloads.
/// 6. Map errors to `TypedClusterError`.
pub trait PlanExecutor: Send + Sync + 'static {
fn execute_plan(
&self,
req: ForwardRequest,
) -> impl std::future::Future<Output = ForwardResponse> + Send;
req: ExecuteRequest,
) -> impl std::future::Future<Output = ExecuteResponse> + Send;
}

/// No-op forwarder for single-node mode or testing.
pub struct NoopForwarder;
/// No-op executor for single-node mode or testing.
pub struct NoopPlanExecutor;

impl RequestForwarder for NoopForwarder {
async fn execute_forwarded(&self, _req: ForwardRequest) -> ForwardResponse {
ForwardResponse {
success: false,
payloads: vec![],
error_message: "query forwarding not available (single-node mode)".into(),
}
impl PlanExecutor for NoopPlanExecutor {
async fn execute_plan(&self, _req: ExecuteRequest) -> ExecuteResponse {
use crate::rpc_codec::TypedClusterError;
ExecuteResponse::err(TypedClusterError::Internal {
code: 0,
message: "plan execution not available (single-node mode)".into(),
})
}
}
4 changes: 3 additions & 1 deletion nodedb-cluster/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub mod rebalance_scheduler;
pub mod routing;
pub mod rpc_codec;
pub mod shard_split;
pub mod swim;
pub mod topology;
pub mod transport;
pub mod vshard_handler;
Expand All @@ -43,7 +44,7 @@ pub use cluster_info::{
};
pub use conf_change::{ConfChange, ConfChangeType};
pub use error::{ClusterError, Result};
pub use forward::{NoopForwarder, RequestForwarder};
pub use forward::{NoopPlanExecutor, PlanExecutor};
pub use ghost::{GhostStub, GhostTable};
pub use health::{HealthConfig, HealthMonitor};
pub use lifecycle_state::{ClusterLifecycleState, ClusterLifecycleTracker};
Expand Down Expand Up @@ -77,3 +78,4 @@ pub use lifecycle::{
pub use rdma_transport::{RdmaConfig, RdmaTransport};
pub use rebalance_scheduler::{NodeMetrics, RebalanceScheduler, RebalanceTrigger, SchedulerConfig};
pub use shard_split::{SplitPlan, SplitStrategy, plan_graph_split, plan_vector_split};
pub use swim::{Incarnation, Member, MemberState, MembershipList, SwimConfig, SwimError};
13 changes: 7 additions & 6 deletions nodedb-cluster/src/raft_loop/handle_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//! orchestration in [`super::join`].

use crate::error::{ClusterError, Result};
use crate::forward::RequestForwarder;
use crate::forward::PlanExecutor;
use crate::health;
use crate::rpc_codec::RaftRpc;
use crate::transport::RaftRpcHandler;
Expand Down Expand Up @@ -61,7 +61,7 @@ pub(super) fn decide_join(
}
}

impl<A: CommitApplier, F: RequestForwarder> RaftRpcHandler for RaftLoop<A, F> {
impl<A: CommitApplier, P: PlanExecutor> RaftRpcHandler for RaftLoop<A, P> {
async fn handle_rpc(&self, rpc: RaftRpc) -> Result<RaftRpc> {
match rpc {
// Raft consensus RPCs — lock MultiRaft (sync, never across await).
Expand Down Expand Up @@ -135,10 +135,11 @@ impl<A: CommitApplier, F: RequestForwarder> RaftRpcHandler for RaftLoop<A, F> {
}
Ok(ack)
}
// Query forwarding — execute locally via the RequestForwarder.
RaftRpc::ForwardRequest(req) => {
let resp = self.forwarder.execute_forwarded(req).await;
Ok(RaftRpc::ForwardResponse(resp))
// Physical-plan execution (C-β) — execute locally via the PlanExecutor,
// skipping SQL re-planning entirely.
RaftRpc::ExecuteRequest(req) => {
let resp = self.plan_executor.execute_plan(req).await;
Ok(RaftRpc::ExecuteResponse(resp))
}
// Metadata-group proposal forwarding — apply locally if
// we're the metadata leader, otherwise return a
Expand Down
4 changes: 2 additions & 2 deletions nodedb-cluster/src/raft_loop/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use tracing::{debug, info, warn};
use crate::bootstrap::handle_join_request;
use crate::conf_change::{ConfChange, ConfChangeType};
use crate::error::{ClusterError, Result};
use crate::forward::RequestForwarder;
use crate::forward::PlanExecutor;
use crate::health;
use crate::multi_raft::GroupStatus;
use crate::routing::RoutingTable;
Expand All @@ -78,7 +78,7 @@ const CONF_CHANGE_COMMIT_TIMEOUT: Duration = Duration::from_secs(5);
/// Polling interval for the commit-wait loop.
const CONF_CHANGE_POLL_INTERVAL: Duration = Duration::from_millis(20);

impl<A: CommitApplier, F: RequestForwarder> RaftLoop<A, F> {
impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
/// Full server-side `JoinRequest` handler. See module docs for the
/// phase-by-phase description.
pub(super) async fn join_flow(&self, req: JoinRequest) -> JoinResponse {
Expand Down
56 changes: 25 additions & 31 deletions nodedb-cluster/src/raft_loop/loop_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use nodedb_raft::message::LogEntry;
use crate::catalog::ClusterCatalog;
use crate::conf_change::ConfChange;
use crate::error::Result;
use crate::forward::RequestForwarder;
use crate::forward::{NoopPlanExecutor, PlanExecutor};
use crate::metadata_group::applier::{MetadataApplier, NoopMetadataApplier};
use crate::multi_raft::MultiRaft;
use crate::topology::ClusterTopology;
Expand Down Expand Up @@ -53,17 +53,20 @@ pub type VShardEnvelopeHandler = Arc<
/// ticks. Implements [`crate::transport::RaftRpcHandler`] (in
/// [`super::handle_rpc`]) so it can be passed directly to
/// [`NexarTransport::serve`] for incoming RPC dispatch.
pub struct RaftLoop<A: CommitApplier, F: RequestForwarder = crate::forward::NoopForwarder> {
///
/// The `F: RequestForwarder` generic parameter was removed in C-δ.6 when the
/// SQL-string forwarding path was retired. Cross-node SQL routing now goes
/// through `gateway.execute / ExecuteRequest` (C-β path).
pub struct RaftLoop<A: CommitApplier, P: PlanExecutor = NoopPlanExecutor> {
pub(super) node_id: u64,
pub(super) multi_raft: Arc<Mutex<MultiRaft>>,
pub(super) transport: Arc<NexarTransport>,
pub(super) topology: Arc<RwLock<ClusterTopology>>,
pub(super) applier: A,
/// Applies committed entries from the metadata Raft group (group 0).
/// Every node has one; defaults to a no-op until the host crate wires
/// in a real [`MetadataApplier`] via [`Self::with_metadata_applier`].
pub(super) metadata_applier: Arc<dyn MetadataApplier>,
pub(super) forwarder: Arc<F>,
/// Executes incoming `ExecuteRequest` RPCs without SQL re-planning.
pub(super) plan_executor: Arc<P>,
pub(super) tick_interval: Duration,
/// Optional handler for incoming VShardEnvelope messages.
/// Set when the Event Plane or other subsystems need cross-node messaging.
Expand Down Expand Up @@ -119,7 +122,7 @@ impl<A: CommitApplier> RaftLoop<A> {
topology,
applier,
metadata_applier: Arc::new(NoopMetadataApplier),
forwarder: Arc::new(crate::forward::NoopForwarder),
plan_executor: Arc::new(NoopPlanExecutor),
tick_interval: DEFAULT_TICK_INTERVAL,
vshard_handler: None,
catalog: None,
Expand All @@ -129,31 +132,22 @@ impl<A: CommitApplier> RaftLoop<A> {
}
}

impl<A: CommitApplier, F: RequestForwarder> RaftLoop<A, F> {
/// Create a RaftLoop with a custom request forwarder (for cluster mode).
pub fn with_forwarder(
multi_raft: MultiRaft,
transport: Arc<NexarTransport>,
topology: Arc<RwLock<ClusterTopology>>,
applier: A,
forwarder: Arc<F>,
) -> Self {
let node_id = multi_raft.node_id();
let (shutdown_watch, _) = tokio::sync::watch::channel(false);
let (ready_watch, _) = tokio::sync::watch::channel(false);
Self {
node_id,
multi_raft: Arc::new(Mutex::new(multi_raft)),
transport,
topology,
applier,
metadata_applier: Arc::new(NoopMetadataApplier),
forwarder,
tick_interval: DEFAULT_TICK_INTERVAL,
vshard_handler: None,
catalog: None,
shutdown_watch,
ready_watch,
impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
/// Install a custom plan executor (for cluster mode — C-β path).
pub fn with_plan_executor<P2: PlanExecutor>(self, executor: Arc<P2>) -> RaftLoop<A, P2> {
RaftLoop {
node_id: self.node_id,
multi_raft: self.multi_raft,
transport: self.transport,
topology: self.topology,
applier: self.applier,
metadata_applier: self.metadata_applier,
plan_executor: executor,
tick_interval: self.tick_interval,
vshard_handler: self.vshard_handler,
catalog: self.catalog,
shutdown_watch: self.shutdown_watch,
ready_watch: self.ready_watch,
}
}

Expand Down
4 changes: 2 additions & 2 deletions nodedb-cluster/src/raft_loop/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ use tracing::{debug, warn};
use nodedb_raft::transport::RaftTransport;

use crate::conf_change::{ConfChange, ConfChangeType};
use crate::forward::RequestForwarder;
use crate::forward::PlanExecutor;

use super::loop_core::{CommitApplier, RaftLoop};

impl<A: CommitApplier, F: RequestForwarder> RaftLoop<A, F> {
impl<A: CommitApplier, P: PlanExecutor> RaftLoop<A, P> {
/// Execute a single tick: drive Raft, dispatch outbound messages,
/// apply commits, promote caught-up learners.
pub(super) fn do_tick(&self) {
Expand Down
Loading
Loading