Skip to content

Commit b3767d0

Browse files
committed
Adds flag --history-only to "hq journal stream"
1 parent 19cbeae commit b3767d0

File tree

8 files changed

+92
-53
lines changed

8 files changed

+92
-53
lines changed

crates/hyperqueue/src/client/commands/journal/mod.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use crate::common::utils::str::pluralize;
66
use crate::rpc_call;
77
use crate::server::bootstrap::get_client_session;
88
use crate::server::event::journal::JournalReader;
9-
use crate::transfer::messages::{FromClientMessage, ToClientMessage};
9+
use crate::transfer::messages::{FromClientMessage, StreamEvents, ToClientMessage};
1010
use anyhow::anyhow;
1111
use clap::{Parser, ValueHint};
1212
use std::io::{BufWriter, Write};
@@ -25,8 +25,8 @@ enum JournalCommand {
2525
/// Events will be exported to `stdout`, you can redirect it e.g. to a file.
2626
Export(ExportOpts),
2727

28-
/// Live stream events from the server.
29-
Stream,
28+
/// Stream events from a running server.
29+
Stream(StreamOpts),
3030

3131
/// Connect to a server and remove completed tasks and non-active workers from journal
3232
Prune,
@@ -43,20 +43,30 @@ struct ExportOpts {
4343
journal: PathBuf,
4444
}
4545

46+
#[derive(Parser)]
47+
struct StreamOpts {
48+
/// If enabled, server terminates the connection when all current events
49+
/// are sent.
50+
#[arg(long)]
51+
history_only: bool,
52+
}
53+
4654
pub async fn command_journal(gsettings: &GlobalSettings, opts: JournalOpts) -> anyhow::Result<()> {
4755
match opts.command {
4856
JournalCommand::Export(opts) => export_json(opts),
49-
JournalCommand::Stream => stream_json(gsettings).await,
57+
JournalCommand::Stream(opts) => stream_json(gsettings, opts).await,
5058
JournalCommand::Prune => prune_journal(gsettings).await,
5159
JournalCommand::Flush => flush_journal(gsettings).await,
5260
}
5361
}
5462

55-
async fn stream_json(gsettings: &GlobalSettings) -> anyhow::Result<()> {
63+
async fn stream_json(gsettings: &GlobalSettings, opts: StreamOpts) -> anyhow::Result<()> {
5664
let mut connection = get_client_session(gsettings.server_directory()).await?;
5765
connection
5866
.connection()
59-
.send(FromClientMessage::StreamEvents)
67+
.send(FromClientMessage::StreamEvents(StreamEvents {
68+
history_only: opts.history_only,
69+
}))
6070
.await?;
6171
let stdout = std::io::stdout();
6272
let stdout = stdout.lock();

crates/hyperqueue/src/dashboard/data/fetch.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::server::event::Event;
22
use crate::transfer::connection::ClientSession;
3-
use crate::transfer::messages::{FromClientMessage, ToClientMessage};
3+
use crate::transfer::messages::{FromClientMessage, StreamEvents, ToClientMessage};
44
use std::time::Duration;
55
use tokio::sync::mpsc::Sender;
66

@@ -10,7 +10,9 @@ pub async fn create_data_fetch_process(
1010
) -> anyhow::Result<()> {
1111
session
1212
.connection()
13-
.send(FromClientMessage::StreamEvents)
13+
.send(FromClientMessage::StreamEvents(StreamEvents {
14+
history_only: false,
15+
}))
1416
.await?;
1517

1618
const CAPACITY: usize = 1024;

crates/hyperqueue/src/server/client/mod.rs

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -75,21 +75,26 @@ async fn handle_client(
7575
Ok(())
7676
}
7777

78-
async fn stream_events<
79-
Tx: Sink<ToClientMessage, Error = HqError> + Unpin + 'static,
80-
Rx: Stream<Item = crate::Result<FromClientMessage>> + Unpin,
81-
>(
78+
async fn stream_history_events<Tx: Sink<ToClientMessage, Error = HqError> + Unpin + 'static>(
8279
tx: &mut Tx,
83-
rx: &mut Rx,
8480
mut history: mpsc::UnboundedReceiver<Event>,
85-
mut current: mpsc::UnboundedReceiver<Event>,
8681
) {
8782
log::debug!("Resending history started");
8883
while let Some(e) = history.recv().await {
8984
if tx.send(ToClientMessage::Event(e)).await.is_err() {
9085
return;
9186
}
9287
}
88+
}
89+
90+
async fn stream_events<
91+
Tx: Sink<ToClientMessage, Error = HqError> + Unpin + 'static,
92+
Rx: Stream<Item = crate::Result<FromClientMessage>> + Unpin,
93+
>(
94+
tx: &mut Tx,
95+
rx: &mut Rx,
96+
mut current: mpsc::UnboundedReceiver<Event>,
97+
) {
9398
log::debug!("History streaming completed");
9499
loop {
95100
let r = tokio::select! {
@@ -164,18 +169,25 @@ pub async fn client_rpc_loop<
164169
FromClientMessage::CloseJob(msg) => {
165170
handle_job_close(&state_ref, senders, &msg.selector).await
166171
}
167-
FromClientMessage::StreamEvents => {
172+
FromClientMessage::StreamEvents(msg) => {
168173
log::debug!("Start streaming events to client");
169174
/* We create two event queues, one for historic events and one for live events
170175
So while historic events are loaded from the file and streamed, live events are already
171176
collected and sent immediately after the historic events are sent */
172177
let (tx1, rx1) = mpsc::unbounded_channel::<Event>();
173-
let (tx2, rx2) = mpsc::unbounded_channel::<Event>();
174-
let listener_id = senders.events.register_listener(tx1, tx2);
175-
176-
stream_events(&mut tx, &mut rx, rx1, rx2).await;
177-
178-
senders.events.unregister_listener(listener_id);
178+
let live = if !msg.history_only {
179+
let (tx2, rx2) = mpsc::unbounded_channel::<Event>();
180+
let listener_id = senders.events.register_listener(tx2);
181+
Some((rx2, listener_id))
182+
} else {
183+
None
184+
};
185+
senders.events.replay_journal(tx1);
186+
stream_history_events(&mut tx, rx1).await;
187+
if let Some((rx2, listener_id)) = live {
188+
stream_events(&mut tx, &mut rx, rx2).await;
189+
senders.events.unregister_listener(listener_id);
190+
}
179191
break;
180192
}
181193
FromClientMessage::ServerInfo => {

crates/hyperqueue/src/server/event/journal/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use tokio::sync::mpsc;
1515

1616
pub enum EventStreamMessage {
1717
Event(Event),
18-
RegisterListener(mpsc::UnboundedSender<Event>),
18+
ReplayJournal(mpsc::UnboundedSender<Event>),
1919
PruneJournal {
2020
callback: tokio::sync::oneshot::Sender<()>,
2121
live_jobs: Set<JobId>,
@@ -88,7 +88,7 @@ async fn streaming_process(
8888
break
8989
}
9090
}
91-
Some(EventStreamMessage::RegisterListener(tx)) => {
91+
Some(EventStreamMessage::ReplayJournal(tx)) => {
9292
/* We are blocking the thread here, but it is intended.
9393
But we are blocking just a thread managing log file, not the whole HQ
9494
And while this read is performed, we cannot allow modification of the file,

crates/hyperqueue/src/server/event/streamer.rs

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -169,11 +169,19 @@ impl EventStreamer {
169169
});
170170
}
171171

172-
pub fn register_listener(
173-
&self,
174-
history_sender: mpsc::UnboundedSender<Event>,
175-
current_sender: mpsc::UnboundedSender<Event>,
176-
) -> u32 {
172+
pub fn replay_journal(&self, history_sender: mpsc::UnboundedSender<Event>) {
173+
let inner = self.inner.get();
174+
if let Some(ref streamer) = inner.storage_sender {
175+
if streamer
176+
.send(EventStreamMessage::ReplayJournal(history_sender))
177+
.is_err()
178+
{
179+
log::error!("Event streaming queue has been closed.");
180+
}
181+
}
182+
}
183+
184+
pub fn register_listener(&self, current_sender: mpsc::UnboundedSender<Event>) -> u32 {
177185
let mut inner = self.inner.get_mut();
178186
let listener_id = inner
179187
.client_listeners
@@ -183,14 +191,6 @@ impl EventStreamer {
183191
.unwrap_or(0)
184192
+ 1;
185193
inner.client_listeners.push((current_sender, listener_id));
186-
if let Some(ref streamer) = inner.storage_sender {
187-
if streamer
188-
.send(EventStreamMessage::RegisterListener(history_sender))
189-
.is_err()
190-
{
191-
log::error!("Event streaming queue has been closed.");
192-
}
193-
}
194194
listener_id
195195
}
196196

crates/hyperqueue/src/transfer/messages.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub enum FromClientMessage {
4141
// This command switches the connection into streaming connection,
4242
// it will no longer react to any other client messages
4343
// and client will only receive ToClientMessage::Event
44-
StreamEvents,
44+
StreamEvents(StreamEvents),
4545
PruneJournal,
4646
FlushJournal,
4747
}
@@ -86,6 +86,11 @@ pub struct TaskKindProgram {
8686
pub task_dir: bool,
8787
}
8888

89+
#[derive(Serialize, Deserialize, Debug, Clone)]
90+
pub struct StreamEvents {
91+
pub history_only: bool,
92+
}
93+
8994
#[derive(Serialize, Deserialize, Debug, Clone)]
9095
pub enum TaskKind {
9196
ExternalProgram(TaskKindProgram),

tests/autoalloc/test_autoalloc.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,15 +88,17 @@ def test_slurm_queue_sbatch_additional_output(hq_env: HqEnv):
8888
class Handler(SlurmCommandHandler):
8989
async def handle_submit(self, input: CommandInput) -> CommandOutput:
9090
output = await super().handle_submit(input)
91-
return response(f"""
91+
return response(
92+
f"""
9293
No reservation for this job
9394
--> Verifying valid submit host (login)...OK
9495
--> Verifying valid jobname...OK
9596
--> Verifying valid ssh keys...OK
9697
--> Verifying access to desired queue (normal)...OK
9798
--> Checking available allocation...OK
9899
{output.stdout}
99-
""")
100+
"""
101+
)
100102

101103
with MockJobManager(hq_env, Handler(DefaultManager())):
102104
hq_env.start_server()

tests/test_events.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ def test_worker_stream_events2(hq_env: HqEnv, tmp_path):
6464
assert events[2]["event"]["desc"]["name"] == "uname"
6565

6666

67+
def test_worker_stream_history_only(hq_env: HqEnv, tmp_path):
68+
journal = tmp_path.joinpath("test.journal")
69+
hq_env.start_server(args=["--journal", journal])
70+
r = hq_env.command(["journal", "stream", "--history-only"])
71+
msg = json.loads(r)
72+
assert msg["event"]["type"] == "server-start"
73+
74+
6775
def test_worker_connected_event(hq_env: HqEnv):
6876
def body():
6977
hq_env.start_worker()
@@ -154,21 +162,21 @@ def body():
154162
with hq_env.mock.mock_program_with_code(
155163
"rocm-smi",
156164
"""
157-
import json
158-
data = {
159-
"card0": {
160-
"GPU use (%)": "1.5",
161-
"GPU memory use (%)": "12.5",
162-
"PCI Bus": "FOOBAR1"
163-
},
164-
"card1": {
165-
"GPU use (%)": "12.5",
166-
"GPU memory use (%)": "64.0",
167-
"PCI Bus": "FOOBAR2"
165+
import json
166+
data = {
167+
"card0": {
168+
"GPU use (%)": "1.5",
169+
"GPU memory use (%)": "12.5",
170+
"PCI Bus": "FOOBAR1"
171+
},
172+
"card1": {
173+
"GPU use (%)": "12.5",
174+
"GPU memory use (%)": "64.0",
175+
"PCI Bus": "FOOBAR2"
176+
}
168177
}
169-
}
170-
print(json.dumps(data))
171-
""",
178+
print(json.dumps(data))
179+
""",
172180
):
173181
hq_env.start_worker(args=["--overview-interval", "10ms", "--resource", "gpus/amd=[0]"])
174182
wait_for_worker_state(hq_env, 1, "RUNNING")

0 commit comments

Comments
 (0)