Skip to content

Commit 1d2e30a

Browse files
committed
Implement hq server wait command
1 parent 8f42784 commit 1d2e30a

4 files changed

Lines changed: 129 additions & 0 deletions

File tree

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
* Resource policy `compact!` is now allowed to take fractional resource requests.
2121
* There is a new command `hq alloc cat <alloc-id> <stdout/stderr>`, which can be used
2222
to debug the output of allocations submitted by the automatic allocator.
23+
* There is a new command `hq server wait` that repeatedly tries to connect to the server until it succeeds or a configurable timeout expires.
24+
This is useful for deployment scripts that need to wait for server availability.
2325

2426
### Fixes
2527

crates/hyperqueue/src/client/commands/server.rs

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::server::bootstrap::{
1313
use crate::transfer::auth::generate_key;
1414
use crate::transfer::messages::{FromClientMessage, ToClientMessage};
1515
use clap::Parser;
16+
use humantime::format_duration;
1617
use std::path::PathBuf;
1718
use std::sync::Arc;
1819
use std::time::Duration;
@@ -71,6 +72,8 @@ pub enum ServerCommand {
7172
GenerateAccess(GenerateAccessOpts),
7273
/// Dump internal scheduler info into a file
7374
DebugDump(DebugDumpOpts),
75+
/// Wait until the server becomes available
76+
Wait(WaitOpts),
7477
}
7578

7679
#[derive(Parser)]
@@ -141,13 +144,28 @@ pub struct DebugDumpOpts {
141144
path: PathBuf,
142145
}
143146

147+
#[derive(Parser)]
148+
pub struct WaitOpts {
149+
/// Timeout after which to stop trying to connect.
150+
///
151+
/// If we cannot connect until the timeout expires, the wait command will fail.
152+
#[arg(
153+
long,
154+
value_parser = parse_hms_or_human_time,
155+
default_value = "5m",
156+
help = duration_doc!("Timeout after which to stop trying to connect.")
157+
)]
158+
timeout: Duration,
159+
}
160+
144161
pub async fn command_server(gsettings: &GlobalSettings, opts: ServerOpts) -> anyhow::Result<()> {
145162
match opts.subcmd {
146163
ServerCommand::Start(opts) => start_server(gsettings, opts).await,
147164
ServerCommand::Stop(opts) => stop_server(gsettings, opts).await,
148165
ServerCommand::Info(opts) => command_server_info(gsettings, opts).await,
149166
ServerCommand::GenerateAccess(opts) => command_server_generate_access(gsettings, opts),
150167
ServerCommand::DebugDump(opts) => debug_dump(gsettings, opts).await,
168+
ServerCommand::Wait(opts) => wait_for_server(gsettings, opts).await,
151169
}
152170
}
153171

@@ -282,3 +300,40 @@ fn command_server_generate_access(
282300
}
283301
Ok(())
284302
}
303+
304+
async fn wait_for_server(gsettings: &GlobalSettings, opts: WaitOpts) -> anyhow::Result<()> {
305+
let retry_interval = Duration::from_secs(5);
306+
307+
log::info!(
308+
"Waiting for server to become available (timeout: {})...",
309+
format_duration(opts.timeout)
310+
);
311+
312+
let result = tokio::time::timeout(opts.timeout, async {
313+
loop {
314+
match get_client_session(gsettings.server_directory()).await {
315+
Ok(_session) => {
316+
log::info!("Successfully connected to server");
317+
break;
318+
}
319+
Err(_) => {
320+
log::debug!(
321+
"Server not yet available, retrying in {}s...",
322+
retry_interval.as_secs()
323+
);
324+
tokio::time::sleep(retry_interval).await;
325+
}
326+
}
327+
}
328+
})
329+
.await;
330+
331+
match result {
332+
Ok(()) => Ok(()),
333+
Err(_) => Err(anyhow::anyhow!(
334+
"Timeout after {}: Could not connect to server at `{}`. Make sure that the server is running.",
335+
format_duration(opts.timeout),
336+
gsettings.server_directory().display()
337+
)),
338+
}
339+
}

docs/deployment/server.md

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,28 @@ The [`hq journal prune`](cli:hq.journal.prune) command removes all completed job
127127

128128
The [`hq journal flush`](cli:hq.journal.flush) command will force the server to flush the journal, so that the latest state of affairs is persisted to disk. It is mainly useful for testing or if you are going to run `hq journal export` while a server is running (however, it is usually better to use `hq journal stream`).
129129

130+
## Waiting for server availability
131+
132+
If you need to wait for a server to become available (for example when coordinating server startup in scripts),
133+
you can use the [`hq server wait`](cli:hq.server.wait) command:
134+
135+
```bash
136+
$ hq server wait
137+
```
138+
139+
This command will repeatedly attempt to connect to the server (every 5 seconds) until it succeeds or until a timeout
140+
is reached. By default, it will wait for up to 5 minutes, but you can specify a custom timeout[^1]:
141+
142+
```bash
143+
# Wait for up to 2 minutes
144+
$ hq server wait --timeout 2m
145+
```
146+
147+
[^1]: You can use various [shortcuts](../cli/shortcuts.md#duration) for the timeout duration.
148+
149+
This is particularly useful in deployment scripts where you start a server and then need to ensure it's ready before
150+
proceeding with other operations like connecting workers or submitting jobs.
151+
130152
## Stopping the server
131153

132154
You can stop a running server with the [`hq server stop`](cli:hq.server.stop) command:

tests/test_server.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import signal
55
import socket
66
import subprocess
7+
import time
78
from typing import List
89

910
import pytest
@@ -258,3 +259,52 @@ def test_server_debug_dump(hq_env: HqEnv):
258259
with open("out.json") as f:
259260
result = json.loads(f.read())
260261
assert "tako" in result
262+
263+
264+
def test_server_wait_timeout_no_server(hq_env: HqEnv, tmp_path):
    """`hq server wait` has to fail with a timeout error if no server is running."""
    wait_command = ["server", "wait", "--timeout", "2s", "--server-dir", str(tmp_path)]
    expected_error = r"Timeout after 2s.*Could not connect to server"

    began_at = time.time()
    with pytest.raises(Exception, match=expected_error):
        hq_env.command(wait_command, use_server_dir=False)
    duration = time.time() - began_at

    # The command is expected to give up near the 2s mark; the window leaves
    # some tolerance on both sides.
    assert 1.5 < duration < 4.0
275+
276+
277+
def test_server_wait_success_immediate(hq_env: HqEnv):
    """`hq server wait` has to return right away if the server is already running."""
    hq_env.start_server()

    began_at = time.time()
    hq_env.command(["server", "wait", "--timeout", "5s"])
    duration = time.time() - began_at

    # Finishing well below the 5s timeout shows that the command did not wait it out.
    assert duration < 4.0
287+
288+
289+
def test_server_wait_success_delayed(hq_env: HqEnv, tmp_path):
    """Test that 'hq server wait' succeeds when server starts during the wait period"""
    import threading

    def delayed_server_start():
        # Start the server only after the wait command has already begun polling.
        time.sleep(2)
        hq_env.start_server(server_dir=str(tmp_path))

    server_thread = threading.Thread(target=delayed_server_start)

    start_time = time.time()
    server_thread.start()

    try:
        hq_env.command(
            ["server", "wait", "--timeout", "10s", "--server-dir", str(tmp_path)],
            use_server_dir=False,
        )
        elapsed = time.time() - start_time
    finally:
        # Always reap the helper thread, even if the wait command failed.
        server_thread.join()

    # The server appears after ~2s, but the command only retries every 5 seconds,
    # so the first attempt that can succeed happens at ~5s. The upper bound is
    # therefore the 10s timeout itself rather than a tight margin above 5s.
    assert 1 < elapsed < 10.0

0 commit comments

Comments
 (0)