Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions richat/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,13 +195,19 @@ impl Subscription {
config,
global_replay_from_slot,
None,
false,
),
move |mut state: (
Backoff,
SubscriptionConfig,
ConfigChannelSourceGeneral,
GlobalReplayFromSlot,
Option<kanal::AsyncReceiver<SubscriptionMessage>>,
bool, // fallback_to_live: this source could not replay from
// the requested slot, but another source is covering
// the gap. Subsequent subscribe attempts skip replay
// and connect at live tip so we still get future
// messages from this source.
)| async move {
loop {
if let Some(stream) = state.4.as_mut() {
Expand All @@ -213,7 +219,12 @@ impl Subscription {
if state.3.report_replay_failed(name) {
return Err(ReceiveError::ReplayFailed);
}
error!(name, "failed to replay, waiting for other sources");
state.5 = true;
error!(
name,
"failed to replay, another source covers the gap, \
reconnecting at live tip"
);
}
Ok(Err(error)) => {
error!(name, ?error, "failed to receive")
Expand All @@ -225,26 +236,45 @@ impl Subscription {
state.4 = None;
state.0.sleep().await;
} else {
let replay_from_slot = if state.5 {
None
} else {
state.3.load()
};
match Subscription::subscribe(
name,
state.1.clone(),
state.2.disable_accounts,
state.2.parser,
state.2.channel_size,
state.3.load(),
replay_from_slot,
)
.await
{
Ok(stream) => {
state.4 = Some(stream);
state.0.reset();
// Reset the fallback flag on every
// successful subscribe. A brief
// disconnect later will normally find
// `global_replay_from_slot` close to
// live, so the replay path has a fair
// chance to succeed and keep the source
// gap free. If replay fails again we
// simply fall back once more.
state.5 = false;
}
Err(error) => {
if error.is_replay_slot_not_available() {
if state.3.report_replay_failed(name) {
return Err(ReceiveError::ReplayFailed);
}
error!(name, "failed to replay at subscribe time, waiting for other sources");
state.5 = true;
error!(
name,
"failed to replay at subscribe time, another \
source covers the gap, reconnecting at live tip"
);
} else {
error!(name, ?error, "failed to connect");
}
Expand Down
Loading