Skip to content

Commit 8940f08

Browse files
committed
Added ToClientMessage::EventLiveBoundary
1 parent 86d2e15 commit 8940f08

File tree

4 files changed

+28
-14
lines changed

4 files changed

+28
-14
lines changed

crates/hyperqueue/src/client/commands/journal/mod.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,11 +71,15 @@ async fn stream_json(gsettings: &GlobalSettings, live_events: bool) -> anyhow::R
7171
let mut stdout = BufWriter::new(stdout);
7272
while let Some(event) = connection.connection().receive().await {
7373
let event = event?;
74-
if let ToClientMessage::Event(e) = event {
75-
writeln!(stdout, "{}", format_event(e))?;
76-
stdout.flush()?;
77-
} else {
78-
anyhow::bail!("Invalid message receive, not ToClientMessage::Event");
74+
match event {
75+
ToClientMessage::Event(e) => {
76+
writeln!(stdout, "{}", format_event(e))?;
77+
stdout.flush()?;
78+
}
79+
ToClientMessage::EventLiveBoundary => { /* Do nothing */ }
80+
_ => {
81+
anyhow::bail!("Invalid message receive, not ToClientMessage::Event");
82+
}
7983
}
8084
}
8185
Ok(())

crates/hyperqueue/src/dashboard/data/fetch.rs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,16 +35,21 @@ pub async fn create_data_fetch_process(
3535
let Some(message) = message else { break; };
3636

3737
let message = message?;
38-
let ToClientMessage::Event(event) = message else {
39-
return Err(anyhow::anyhow!(
40-
"Dashboard received unexpected message {message:?}"
41-
));
38+
match message {
39+
ToClientMessage::Event(event) => {
40+
events.push(event);
41+
if events.len() == CAPACITY {
42+
sender.send(events).await?;
43+
events = Vec::with_capacity(CAPACITY);
44+
}
45+
},
46+
ToClientMessage::EventLiveBoundary => {
47+
/* Do nothing */
48+
}
49+
_ => {
50+
return Err(anyhow::anyhow!("Dashboard received unexpected message {message:?}"));
51+
}
4252
};
43-
events.push(event);
44-
if events.len() == CAPACITY {
45-
sender.send(events).await?;
46-
events = Vec::with_capacity(CAPACITY);
47-
}
4853
}
4954
}
5055
}

crates/hyperqueue/src/server/client/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ pub async fn client_rpc_loop<
185185
senders.events.replay_journal(tx1);
186186
stream_history_events(&mut tx, rx1).await;
187187
if let Some((rx2, listener_id)) = live {
188+
let _ = tx.send(ToClientMessage::EventLiveBoundary).await;
188189
stream_events(&mut tx, &mut rx, rx2).await;
189190
senders.events.unregister_listener(listener_id);
190191
}

crates/hyperqueue/src/transfer/messages.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub enum FromClientMessage {
4141
// This command switches the connection into streaming connection,
4242
// it will no longer reacts to any other client messages
4343
// and client will only receive ToClientMessage::Event
44+
// or ToClientMessage::EventLiveBoundary
4445
StreamEvents(StreamEvents),
4546
PruneJournal,
4647
FlushJournal,
@@ -348,6 +349,9 @@ pub enum ToClientMessage {
348349
Error(String),
349350
ServerInfo(ServerInfo),
350351
Event(Event),
352+
// This indicates in live event streaming when old events where
353+
// old streamed, and now we are getting new ones
354+
EventLiveBoundary,
351355
Finished, // Generic response, now used only for journal pruning/flushing
352356
}
353357

0 commit comments

Comments
 (0)