Skip to content

Commit 29aca00

Browse files
authored
cluster: allow forwarded messages to be forwarded twice (#1153)
During a leadership change, forward-to-leader requests may fail because they can land on a stale leader, and a forwarded request was previously never re-forwarded. This adds a new `hop_ttl` field to the forward request, which is decremented on every hop and causes the request to be rejected once it reaches zero, allowing us to forward multiple times without looping forever.
1 parent ffc2e1a commit 29aca00

3 files changed

Lines changed: 49 additions & 23 deletions

File tree

src/core/cluster/app.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -179,17 +179,11 @@ async fn handle_forwarded_write(
179179
// reset the timestamp in case the forwarding node was out of sync
180180
req.request.timestamp = state.state_machine.time.now_utm();
181181

182-
// intentionally do not use state.client_write because we don't want an infinite recursion
183-
// of forwardings
184-
let wrapped = Arc::new(req.request);
185-
let response =
186-
state.raft.client_write(wrapped).await.map_err(|e| {
187-
crate::Error::internal(format!("Unable to execute forwarded write: {e:?}"))
188-
})?;
189-
let response = ForwardedWriteResponse {
190-
log_id: response.log_id,
191-
response: response.data,
192-
};
182+
let (response, log_id) = state
183+
.client_write_forward(req.request, req.hop_ttl)
184+
.await
185+
.map_err(|e| crate::Error::internal(format!("Unable to execute forwarded write: {e:?}")))?;
186+
let response = ForwardedWriteResponse { log_id, response };
193187
Ok(MsgPack(response))
194188
}
195189

src/core/cluster/handle.rs

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,8 @@ impl From<RaftError<ClientWriteError>> for DiomErrorOrForwardToLeader {
345345
}
346346
}
347347

348+
const DEFAULT_HOP_TTL: usize = 2;
349+
348350
#[derive(Clone)]
349351
pub struct RaftState {
350352
pub cfg: Configuration,
@@ -359,7 +361,6 @@ pub struct RaftState {
359361

360362
impl RaftState {
361363
/// Write a single operation into the Raft log and return its response.
362-
#[tracing::instrument(name = "raft_client_write", skip_all, fields(forwarded = false))]
363364
pub async fn client_write<O>(&self, op: O) -> anyhow::Result<O::Response>
364365
where
365366
O: OperationRequest<
@@ -370,14 +371,43 @@ impl RaftState {
370371
>,
371372
> + Into<O::RequestParent>,
372373
{
373-
let start = std::time::Instant::now();
374374
let inner: Request = op.into().into();
375375
let now = self.time.update_now();
376376
let request = RequestWithContext::new(
377377
inner,
378378
now.into(),
379379
Some(opentelemetry::Context::current().into()),
380380
);
381+
let (response, _) = self.client_write_inner(request, DEFAULT_HOP_TTL).await?;
382+
let module_response = <O::Response as OperationResponse>::ResponseParent::try_from(
383+
response,
384+
)
385+
.map_err(|e| {
386+
anyhow::anyhow!("raft response should be convertible into module response type: {e:?}")
387+
})?;
388+
let resp = module_response.try_into().map_err(|e| {
389+
anyhow::anyhow!("module response should be convertible into target type: {e:?}")
390+
})?;
391+
Ok(resp)
392+
}
393+
394+
pub(crate) fn client_write_forward(
395+
&self,
396+
op: RequestWithContext,
397+
hop_ttl: usize,
398+
) -> impl Future<Output = anyhow::Result<(Response, LogId)>> {
399+
self.client_write_inner(op, hop_ttl)
400+
}
401+
402+
/// Write a request to the current node. If we get a ForwardToLeader error, and
403+
/// `hop_ttl` is > 0, then attempt to forward it to the leader
404+
#[tracing::instrument(name = "raft_client_write", skip_all, fields(forwarded = false))]
405+
async fn client_write_inner(
406+
&self,
407+
request: RequestWithContext,
408+
hop_ttl: usize,
409+
) -> anyhow::Result<(Response, LogId)> {
410+
let start = std::time::Instant::now();
381411
let request = Arc::new(request);
382412
let mut write_type = WriteType::Local;
383413
let (response, log_id) = match self.raft.client_write(Arc::clone(&request)).await {
@@ -390,6 +420,10 @@ impl RaftState {
390420
if let Some(leader_id) = forward_to_leader.leader_id
391421
&& let Some(leader_node) = &forward_to_leader.leader_node
392422
{
423+
if hop_ttl == 0 {
424+
tracing::error!("received request with no remaining ttl");
425+
anyhow::bail!("too many forwards; possible routing loop");
426+
}
393427
tracing::Span::current().record("forwarded", true);
394428
tracing::trace!("received write to non-leader, forwarding");
395429
let mut network_handle = self.network.clone();
@@ -399,6 +433,7 @@ impl RaftState {
399433
.forward_request(super::proto::ForwardedWriteRequest {
400434
source_node_id: self.node_id,
401435
request: Arc::unwrap_or_clone(request),
436+
hop_ttl: hop_ttl.saturating_sub(1),
402437
})
403438
.await
404439
.map(|r| (r.response, r.log_id))
@@ -423,16 +458,7 @@ impl RaftState {
423458
});
424459
});
425460
self.metrics.record_write(write_type, start.elapsed());
426-
let module_response = <O::Response as OperationResponse>::ResponseParent::try_from(
427-
response,
428-
)
429-
.map_err(|e| {
430-
anyhow::anyhow!("raft response should be convertible into module response type: {e:?}")
431-
})?;
432-
let resp = module_response.try_into().map_err(|e| {
433-
anyhow::anyhow!("module response should be convertible into target type: {e:?}")
434-
})?;
435-
Ok(resp)
461+
Ok((response, log_id))
436462
}
437463

438464
pub async fn run_discovery_if_necessary(&self, app_state: AppState) -> anyhow::Result<()> {

src/core/cluster/proto.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ pub struct HealthResponse {
8787
pub struct ForwardedWriteRequest {
8888
pub source_node_id: NodeId,
8989
pub request: RequestWithContext,
90+
#[serde(default = "default_hop_ttl")]
91+
pub hop_ttl: usize,
9092
}
9193

9294
#[derive(Debug, Serialize, Deserialize, Clone)]
@@ -102,3 +104,7 @@ pub struct LastIdRequest {}
102104
pub struct LastIdResponse {
103105
pub last_committed_log_id: Option<LogId>,
104106
}
107+
108+
const fn default_hop_ttl() -> usize {
109+
1
110+
}

0 commit comments

Comments
 (0)