diff --git a/richat/src/source.rs b/richat/src/source.rs index 8604fff..5e46a45 100644 --- a/richat/src/source.rs +++ b/richat/src/source.rs @@ -195,6 +195,7 @@ impl Subscription { config, global_replay_from_slot, None, + false, ), move |mut state: ( Backoff, @@ -202,6 +203,11 @@ impl Subscription { ConfigChannelSourceGeneral, GlobalReplayFromSlot, Option>, + bool, // fallback_to_live: this source could not replay from + // the requested slot, but another source is covering + // the gap. Subsequent subscribe attempts skip replay + // and connect at live tip so we still get future + // messages from this source. )| async move { loop { if let Some(stream) = state.4.as_mut() { @@ -213,7 +219,12 @@ impl Subscription { if state.3.report_replay_failed(name) { return Err(ReceiveError::ReplayFailed); } - error!(name, "failed to replay, waiting for other sources"); + state.5 = true; + error!( + name, + "failed to replay, another source covers the gap, \ + reconnecting at live tip" + ); } Ok(Err(error)) => { error!(name, ?error, "failed to receive") @@ -225,26 +236,45 @@ impl Subscription { state.4 = None; state.0.sleep().await; } else { + let replay_from_slot = if state.5 { + None + } else { + state.3.load() + }; match Subscription::subscribe( name, state.1.clone(), state.2.disable_accounts, state.2.parser, state.2.channel_size, - state.3.load(), + replay_from_slot, ) .await { Ok(stream) => { state.4 = Some(stream); state.0.reset(); + // Reset the fallback flag on every + // successful subscribe. A brief + // disconnect later will normally find + // `global_replay_from_slot` close to + // live, so the replay path has a fair + // chance to succeed and keep the source + // gap free. If replay fails again we + // simply fall back once more. + state.5 = false; } Err(error) => { if error.is_replay_slot_not_available() { if state.3.report_replay_failed(name) { return Err(ReceiveError::ReplayFailed); } - error!(name, "failed to replay at subscribe time, waiting for other sources"); + state.5 = true; + error!( + name, + "failed to replay at subscribe time, another \ + source covers the gap, reconnecting at live tip" + ); } else { error!(name, ?error, "failed to connect"); }