diff --git a/.gitignore b/.gitignore index 96548ba..75647e1 100644 --- a/.gitignore +++ b/.gitignore @@ -40,3 +40,4 @@ tmp/ temp/ .claude/ +NUL diff --git a/src/cli.zig b/src/cli.zig index d10c241..634b0c3 100644 --- a/src/cli.zig +++ b/src/cli.zig @@ -4,6 +4,7 @@ //! and subcommands (vmu zig/zls). const std = @import("std"); + const errors = @import("core/errors.zig"); /// Supported shell types for completion generation. @@ -21,6 +22,7 @@ pub const Command = enum { vmu, mirrorlist, proxy, + probe, completion, version, help, @@ -38,6 +40,7 @@ pub const InstallFlags = packed struct { zls: bool = false, full: bool = false, nomirror: bool = false, + reprobe: bool = false, }; /// Flags for the list command. @@ -83,6 +86,7 @@ pub const ParsedCommand = union(Command) { proxy: struct { url: ?[]const u8, }, + probe, completion: struct { shell: ShellType, }, @@ -112,6 +116,7 @@ const command_aliases = std.StaticStringMap(Command).initComptime(.{ .{ "vmu", .vmu }, .{ "mirrorlist", .mirrorlist }, .{ "proxy", .proxy }, + .{ "probe", .probe }, .{ "completion", .completion }, .{ "version", .version }, .{ "help", .help }, @@ -177,6 +182,7 @@ pub fn parse(allocator: std.mem.Allocator, init: std.process.Init.Minimal) !stru .vmu => try parseVmu(allocator, &args), .mirrorlist => try parseMirrorlist(allocator, &args), .proxy => try parseProxy(allocator, &args), + .probe => parseProbe(&args), .completion => try parseCompletion(&args), .version => ParsedCommand.version, .help => ParsedCommand{ .help = null }, @@ -224,6 +230,8 @@ fn parseInstall(allocator: std.mem.Allocator, args: anytype) !ParsedCommand { flags.full = true; } else if (std.mem.eql(u8, arg, "--nomirror")) { flags.nomirror = true; + } else if (std.mem.eql(u8, arg, "--test") or std.mem.eql(u8, arg, "-t")) { + flags.reprobe = true; } else { version = try allocator.dupe(u8, arg); } @@ -330,6 +338,13 @@ fn parseProxy(allocator: std.mem.Allocator, args: anytype) !ParsedCommand { return .{ .proxy = .{ .url = null } }; } +fn parseProbe(args: anytype) ParsedCommand { + if (args.next()) |arg| { + if (checkHelp(.probe, arg)) |h| return h; + } + return ParsedCommand.probe; +} + fn parseCompletion(args: anytype) !ParsedCommand { const shell_str = args.next() orelse return error.MissingArgument; if (checkHelp(.completion, shell_str)) |h| return h; @@ -361,6 +376,7 @@ pub fn printHelp(writer: *std.Io.Writer) !void { \\ vmu Set version map source (zig/zls) \\ mirrorlist Set mirror distribution server \\ proxy Set HTTP/HTTPS proxy for downloads + \\ probe Test mirror speeds without installing \\ completion Generate shell completion script \\ version Print zvm version \\ help Print this help message @@ -389,6 +405,7 @@ pub fn printCommandHelp(writer: *std.Io.Writer, cmd: Command) !void { \\ --zls Also install ZLS (Zig Language Server) \\ --full Install ZLS with full compatibility mode \\ --nomirror Skip community mirror downloads + \\ --test, -t Force re-probe mirrors (ignore cache) \\ \\Examples: \\ zvm install master Install latest nightly @@ -488,6 +505,16 @@ pub fn printCommandHelp(writer: *std.Io.Writer, cmd: Command) !void { \\ zvm proxy Show current proxy setting \\ ), + .probe => try writer.writeAll( + \\Test mirror download speeds without installing anything. + \\ + \\Downloads data from each mirror for 5 seconds and measures + \\throughput. Results are displayed in real-time. + \\ + \\Usage: + \\ zvm probe + \\ + ), .completion => try writer.writeAll( \\Generate shell completion script. \\ diff --git a/src/command/install.zig b/src/command/install.zig index 54acd36..59b1ec8 100644 --- a/src/command/install.zig +++ b/src/command/install.zig @@ -111,7 +111,7 @@ fn installVersion( try http_client.downloadToFileWithProxy(allocator, zvm.io, zvm.environ_map, tar_url, archive_path, zvm.settings.proxy, stdout); break :blk tar_url; } else blk: { - const mirror_url = http_client.attemptMirrorDownload(allocator, zvm.io, zvm.environ_map, zvm.settings.mirror_list_url, tar_url, archive_path, stdout, stdout, &zvm.settings) catch { + const mirror_url = http_client.attemptMirrorDownload(allocator, zvm.io, zvm.environ_map, zvm.settings.mirror_list_url, tar_url, archive_path, stdout, stdout, &zvm.settings, flags.reprobe) catch { try http_client.downloadToFileWithProxy(allocator, zvm.io, zvm.environ_map, tar_url, archive_path, zvm.settings.proxy, stdout); break :blk tar_url; }; diff --git a/src/command/probe.zig b/src/command/probe.zig new file mode 100644 index 0000000..83f38cb --- /dev/null +++ b/src/command/probe.zig @@ -0,0 +1,106 @@ +//! Probe command — test mirror download speeds without installing. +//! Fetches the version map and mirror list, probes all mirrors for +//! latency and throughput, and displays sorted results. + +const std = @import("std"); + +const Console = @import("../core/Console.zig"); +const platform = @import("../core/platform.zig"); +const zvm_mod = @import("../core/zvm.zig"); +const http_client = @import("../network/http_client.zig"); +const mirror_probe = @import("../network/mirror_probe.zig"); +const version_map = @import("../network/version_map.zig"); + +pub fn run( + zvm: *zvm_mod.ZVM, + allocator: std.mem.Allocator, + console: Console, +) !void { + const stdout = console.stdout.writer; + const proxy = zvm.settings.proxy; + + // 1. Fetch version map to get a real tar URL for probing + console.plain("Fetching version map...", .{}); + const parsed_map = version_map.fetchVersionMap(allocator, zvm.io, zvm.environ_map, zvm.settings.version_map_url, proxy) catch { + console.err("Failed to fetch version map", .{}); + return; + }; + defer parsed_map.deinit(); + const vmap = &parsed_map.value.object; + + // 2. Get master tar URL for the current platform + const sys_info = platform.zigStyleSystemInfo(); + var plat_buf: [128]u8 = undefined; + const target = platform.platformTarget(&plat_buf, sys_info); + + const tar_url = version_map.getTarPath("master", target, vmap) catch { + console.err("Failed to find download for your platform", .{}); + return; + }; + + const filename = if (std.mem.lastIndexOfScalar(u8, tar_url, '/')) |idx| tar_url[idx + 1 ..] else tar_url; + + // 3. Fetch mirror list + if (zvm.settings.mirror_list_url.len == 0) { + console.err("No mirror list configured. Use 'zvm mirrorlist ' to set one.", .{}); + return; + } + + const mirror_list_content = http_client.downloadToMemoryWithProxy(allocator, zvm.io, zvm.environ_map, zvm.settings.mirror_list_url, proxy) catch { + console.err("Failed to fetch mirror list", .{}); + return; + }; + defer allocator.free(mirror_list_content); + + // 4. Parse mirrors (one URL per line) + var mirrors: std.ArrayList([]const u8) = .empty; + defer mirrors.deinit(allocator); + + var lines = std.mem.splitSequence(u8, mirror_list_content, "\n"); + while (lines.next()) |line| { + const trimmed = std.mem.trim(u8, line, " \r"); + if (trimmed.len == 0) continue; + try mirrors.append(allocator, trimmed); + } + + if (mirrors.items.len == 0) { + console.err("No mirrors found in mirror list", .{}); + return; + } + + // 5. Probe all mirrors + var candidates: std.ArrayList(mirror_probe.MirrorCandidate) = .empty; + defer { + for (candidates.items) |c| { + if (c.owned) allocator.free(c.url); + } + candidates.deinit(allocator); + } + + try mirror_probe.probeAll(allocator, zvm.io, zvm.environ_map, tar_url, &mirrors, filename, proxy, &candidates, stdout); + + // 6. Sort by bandwidth (with latency tiebreaker) + std.mem.sort(mirror_probe.MirrorCandidate, candidates.items, {}, mirror_probe.greaterThanByBandwidth); + + // 7. Display sorted summary + if (candidates.items.len == 0) { + console.plain(" No mirrors responded.", .{}); + return; + } + + try stdout.print("\n Summary (sorted by speed):\n", .{}); + for (candidates.items, 1..) |candidate, rank| { + var latency_buf: [64]u8 = undefined; + var speed_buf: [64]u8 = undefined; + const lat_str = if (candidate.latency_ns > 0) + mirror_probe.formatLatency(&latency_buf, candidate.latency_ns) + else + "?"; + const spd_str = mirror_probe.formatThroughput(&speed_buf, candidate.bandwidth_bps); + const marker = if (rank == 1) " <-- fastest" else ""; + try stdout.print(" {d:>3}. {s:<32} latency: {s:<10} speed: {s}/s{s}\n", .{ + rank, mirror_probe.shortUrl(candidate.url), lat_str, spd_str, marker, + }); + } + try stdout.flush(); +} diff --git a/src/completion.zig b/src/completion.zig index 21df449..9fc8a05 100644 --- a/src/completion.zig +++ b/src/completion.zig @@ -4,6 +4,7 @@ //! new commands trigger a compile error here until their metadata is added. const std = @import("std"); + const cli = @import("cli.zig"); const Console = @import("core/Console.zig"); @@ -96,6 +97,9 @@ fn cmdMeta(cmd: cli.Command) CmdMeta { .desc = "Set HTTP/HTTPS proxy for downloads", .arg = .url, }, + .probe => .{ + .desc = "Test mirror speeds without installing", + }, .completion => .{ .desc = "Generate shell completion script", .arg = .shell_type, diff --git a/src/main.zig b/src/main.zig index 8b8d00b..a22581b 100644 --- a/src/main.zig +++ b/src/main.zig @@ -6,6 +6,8 @@ const std = @import("std"); const Io = std.Io; const build_options = @import("build_options"); +/// Current version of zvm, injected at build time from git tag or -Dversion=. +const VERSION = build_options.version; const cli = @import("cli.zig"); const Console = @import("core/Console.zig"); @@ -14,9 +16,6 @@ const platform = @import("core/platform.zig"); const update_check = @import("core/update_check.zig"); const zvm_mod = @import("core/zvm.zig"); -/// Current version of zvm, injected at build time from git tag or -Dversion=. -const VERSION = build_options.version; - /// Full version string with 'v' prefix and git commit hash. fn fullVersion() []const u8 { return "v" ++ VERSION ++ " (" ++ build_options.git_commit ++ ")"; @@ -128,6 +127,10 @@ pub fn main(init: std.process.Init) !void { const proxy_mod = @import("command/proxy.zig"); proxy_mod.run(&zvm, allocator, proxy_cmd.url, console) catch |err| commandFail(console, err); }, + .probe => { + const probe = @import("command/probe.zig"); + probe.run(&zvm, allocator, console) catch |err| commandFail(console, err); + }, .completion => |comp_cmd| { const completion = @import("completion.zig"); completion.run(comp_cmd.shell, console) catch |err| commandFail(console, err); diff --git a/src/network/http_client.zig b/src/network/http_client.zig index 798f1f5..4b10816 100644 --- a/src/network/http_client.zig +++ b/src/network/http_client.zig @@ -6,6 +6,7 @@ const std = @import("std"); const builtin = @import("builtin"); + const settings_mod = @import("../core/settings.zig"); const mirror_probe = @import("mirror_probe.zig"); const proxy_tunnel = @import("proxy_tunnel.zig"); @@ -305,12 +306,13 @@ pub fn attemptMirrorDownload( verbose_writer: ?*std.Io.Writer, progress_writer: ?*std.Io.Writer, settings: *settings_mod.Settings, + force_probe: bool, ) ![]const u8 { // Extract filename once (used for constructing mirror URLs and extracting base URL) const filename = if (std.mem.lastIndexOfScalar(u8, original_url, '/')) |idx| original_url[idx + 1 ..] else original_url; // --- Cache-fast path: try cached mirror if fresh --- - if (settings.preferred_mirror.len > 0 and settings.mirror_updated_at > 0) { + if (!force_probe and settings.preferred_mirror.len > 0 and settings.mirror_updated_at > 0) { const now = std.Io.Clock.Timestamp.now(io, .real).raw.toSeconds(); const age = now - settings.mirror_updated_at; if (age >= 0 and age < MIRROR_CACHE_TTL) { @@ -388,7 +390,7 @@ fn probeAndDownload( var candidates: std.ArrayList(mirror_probe.MirrorCandidate) = .empty; defer candidates.deinit(allocator); - // Probe all candidates concurrently + // Probe all candidates sequentially try mirror_probe.probeAll(allocator, io, environ_map, original_url, &mirrors, filename, proxy, &candidates, progress_writer); // If no candidates responded, fall back to the original URL @@ -397,23 +399,21 @@ fn probeAndDownload( return allocator.dupe(u8, original_url); } - // Sort candidates by latency (fastest first) - std.mem.sort(mirror_probe.MirrorCandidate, candidates.items, {}, mirror_probe.lessThanByLatency); + // Sort candidates by bandwidth (fastest first) + std.mem.sort(mirror_probe.MirrorCandidate, candidates.items, {}, mirror_probe.greaterThanByBandwidth); - // Print latency results if verbose output is requested + // Show which mirror was selected if (verbose_writer) |vw| { - var time_buf: [64]u8 = undefined; - try vw.print(" Probed {d} source(s):\n", .{candidates.items.len}); - for (candidates.items, 1..) |candidate, rank| { - const time_str = mirror_probe.formatLatency(&time_buf, candidate.latency_ns); - const marker = if (rank == 1) " <-- fastest" else ""; - const short_url = mirror_probe.shortUrl(candidate.url); - try vw.print(" {d}. {s} ({s}{s})\n", .{ rank, short_url, time_str, marker }); - } + const best = candidates.items[0]; + var speed_buf: [64]u8 = undefined; + var lat_buf: [64]u8 = undefined; + const spd_str = mirror_probe.formatThroughput(&speed_buf, best.bandwidth_bps); + const lat_str = mirror_probe.formatLatency(&lat_buf, best.latency_ns); + try vw.print(" Selected: {s} ({s}/s, latency: {s})\n\n", .{ mirror_probe.shortUrl(best.url), spd_str, lat_str }); try vw.flush(); } - // Try downloading from candidates in latency order + // Try downloading from candidates in bandwidth order for (candidates.items, 0..) |candidate, idx| { downloadToFileWithProxy(allocator, io, environ_map, candidate.url, dest_path, proxy, progress_writer) catch { continue; diff --git a/src/network/mirror_probe.zig b/src/network/mirror_probe.zig index dda38d7..5660fea 100644 --- a/src/network/mirror_probe.zig +++ b/src/network/mirror_probe.zig @@ -1,228 +1,127 @@ -//! Concurrent mirror latency probing. -//! On POSIX: uses raw TCP connect (getaddrinfo + non-blocking socket + poll with timeout) -//! for maximum parallelism — completely bypasses std.http.Client and std.Io. -//! On Windows: uses std.http.Client per probe thread (Winsock API not fully wrapped in Zig 0.16). -//! A background ticker thread provides real-time progress display. -//! -//! Cross-compilation note: native_os (builtin.os.tag) is a comptime constant, so -//! `if (native_os == .windows) return;` guards prevent the dead POSIX branch from -//! being type-checked on Windows, avoiding errors from unavailable libc symbols. +//! Sequential mirror probing. +//! Tests each mirror one by one, measuring both latency (HTTP HEAD round-trip) +//! and throughput (5-second download to NUL). Results are displayed immediately +//! as each mirror is tested, giving the user real-time visibility. const std = @import("std"); const builtin = @import("builtin"); -const native_os = builtin.os.tag; -const http_client = @import("http_client.zig"); -const proxy_tunnel = @import("proxy_tunnel.zig"); -/// Per-probe timeout in seconds. -const PROBE_TIMEOUT_S: u64 = 2; +const proxy_tunnel = @import("proxy_tunnel.zig"); -/// A candidate URL with its measured latency. +/// A candidate URL with its measured latency and throughput. pub const MirrorCandidate = struct { url: []const u8, + /// HTTP HEAD round-trip time latency_ns: u64, + /// Bytes per second (0 = bandwidth measurement failed) + bandwidth_bps: u64, owned: bool, }; -/// Result of a single latency probe, written by a probe thread. -const ProbeResult = struct { - url: []const u8, - latency_ns: u64, -}; +/// ============================================================ +/// Internal helpers +/// ============================================================ +fn setupClient(client: *std.http.Client, allocator: std.mem.Allocator, environ_map: *std.process.Environ.Map, proxy: []const u8) void { + if (proxy.len > 0) { + proxy_tunnel.setProxyFromUrl(client, allocator, proxy) catch {}; + } else { + client.initDefaultProxies(allocator, environ_map) catch {}; + } +} -/// Context passed to each probe thread. -const ProbeThreadContext = struct { - allocator: std.mem.Allocator, - url: []const u8, - result: *?ProbeResult, - done: *std.atomic.Value(usize), - // Fields used only by Windows fallback - io: ?std.Io = null, - environ_map: ?*std.process.Environ.Map = null, - proxy: []const u8 = "", -}; +/// Measure latency via HTTP HEAD (DNS + TCP + TLS + response). +fn measureLatency(allocator: std.mem.Allocator, io: std.Io, environ_map: *std.process.Environ.Map, url: []const u8, proxy: []const u8) ?u64 { + var client: std.http.Client = .{ .allocator = allocator, .io = io }; + defer client.deinit(); + setupClient(&client, allocator, environ_map, proxy); -/// Shared state for the background progress ticker thread. -const TickerContext = struct { - total: usize, - done: *std.atomic.Value(usize), - start_ns: i96, - stop: std.atomic.Value(bool), - current_url: []const u8 = "", -}; + const uri = std.Uri.parse(url) catch return null; + const start = std.Io.Timestamp.now(io, .awake); -/// Background thread that refreshes the progress display at a fixed interval. -/// POSIX-only: uses clock_gettime, nanosleep, and write(2). -fn tickerThread(ctx: *TickerContext) void { - if (native_os == .windows) return; + var req = client.request(.HEAD, uri, .{ .redirect_behavior = .init(3) }) catch return null; + defer req.deinit(); + req.sendBodiless() catch return null; - var latency_buf: [64]u8 = undefined; - var msg_buf: [256]u8 = undefined; - const req: std.c.timespec = .{ .sec = 0, .nsec = 100_000_000 }; // 100ms - - while (!ctx.stop.load(.acquire)) { - var ts: std.c.timespec = undefined; - _ = std.c.clock_gettime(.MONOTONIC, &ts); - const now_ns: i96 = @as(i96, ts.sec) * 1_000_000_000 + ts.nsec; - const elapsed_ns: u64 = if (now_ns > ctx.start_ns) @intCast(now_ns - ctx.start_ns) else 0; - const elapsed_str = formatLatency(&latency_buf, elapsed_ns); - const done_val = ctx.done.load(.monotonic); - const pct: u32 = if (ctx.total > 0) @intCast(done_val * 100 / ctx.total) else 0; - const msg = std.fmt.bufPrint(&msg_buf, "\x1b[2K\r Probing: {s} ... {d}% ({s})", .{ shortUrl(ctx.current_url), pct, elapsed_str }) catch return; - _ = std.c.write(2, msg.ptr, msg.len); - _ = std.c.nanosleep(&req, null); - } -} + var head_buf: [4096]u8 = undefined; + var response = req.receiveHead(&head_buf) catch return null; -/// ============================================================ -/// POSIX implementation — raw TCP connect -/// ============================================================ -fn probeThreadMainPosix(ctx: *ProbeThreadContext) void { - if (native_os == .windows) { - // Never called on Windows (comptime switch in probeThreadMain), - // but guard prevents body from being type-checked on Windows. - _ = ctx.done.fetchAdd(1, .monotonic); - return; - } - defer _ = ctx.done.fetchAdd(1, .monotonic); - - // Parse URL to extract host and port - const uri = std.Uri.parse(ctx.url) catch return; - const host_component = uri.host orelse return; - const host = host_component.percent_encoded; - const is_https = std.mem.eql(u8, uri.scheme, "https"); - const port: u16 = uri.port orelse if (is_https) @as(u16, 443) else @as(u16, 80); - - // Null-terminate host for getaddrinfo - const host_z = ctx.allocator.dupeZ(u8, host) catch return; - defer ctx.allocator.free(host_z); - - var port_buf: [6:0]u8 = undefined; - const port_str = std.fmt.bufPrintZ(&port_buf, "{d}", .{port}) catch return; - - // Start timing - var start_ts: std.c.timespec = undefined; - _ = std.c.clock_gettime(.MONOTONIC, &start_ts); - - // Resolve hostname - const hints: std.c.addrinfo = .{ - .flags = .{}, - .family = std.c.AF.UNSPEC, - .socktype = std.c.SOCK.STREAM, - .protocol = 0, - .addrlen = 0, - .canonname = null, - .addr = null, - .next = null, - }; - var addr_result: ?*std.c.addrinfo = null; - if (std.c.getaddrinfo(host_z, port_str, &hints, &addr_result) != @as(std.c.EAI, @enumFromInt(0))) return; - defer std.c.freeaddrinfo(addr_result.?); - - const ai = addr_result.?; - - // Create socket - const sock = std.c.socket( - @intCast(ai.family), - @intCast(ai.socktype), - @intCast(ai.protocol), - ); - if (sock < 0) return; - defer _ = std.c.close(sock); - - // Set non-blocking for timeout control - const flags = std.c.fcntl(sock, std.c.F.GETFL, @as(c_int, 0)); - if (flags >= 0) { - const nonblocking: std.c.O = .{ .NONBLOCK = true }; - _ = std.c.fcntl(sock, std.c.F.SETFL, @as(c_int, @bitCast(nonblocking))); - } + const end = std.Io.Timestamp.now(io, .awake); + if (response.head.status.class() != .success) return null; - // Start non-blocking connect - _ = std.c.connect(sock, ai.addr.?, ai.addrlen); - - // Poll for connect completion with timeout - var pfd: [1]std.c.pollfd = .{.{ - .fd = sock, - .events = std.c.POLL.OUT, - .revents = 0, - }}; - const timeout_ms: c_int = @intCast(PROBE_TIMEOUT_S * 1000); - const poll_rc = std.c.poll(&pfd, 1, timeout_ms); - if (poll_rc <= 0) return; // timeout or error - - // Check for connection error - var err_code: u32 = 0; - var err_len: std.c.socklen_t = @sizeOf(u32); - if (std.c.getsockopt(sock, std.c.SOL.SOCKET, std.c.SO.ERROR, &err_code, &err_len) < 0) return; - if (err_code != 0) return; - - // Success — calculate elapsed time - var end_ts: std.c.timespec = undefined; - _ = std.c.clock_gettime(.MONOTONIC, &end_ts); - - const elapsed_ns: i96 = (end_ts.sec - start_ts.sec) * 1_000_000_000 + (end_ts.nsec - start_ts.nsec); - if (elapsed_ns <= 0 or elapsed_ns > PROBE_TIMEOUT_S * std.time.ns_per_s) return; - - ctx.result.* = .{ - .url = ctx.url, - .latency_ns = @intCast(elapsed_ns), - }; + const elapsed: u64 = @intCast(std.Io.Timestamp.durationTo(start, end).nanoseconds); + return if (elapsed > 0) elapsed else null; } -/// ============================================================ -/// Windows fallback — std.http.Client per thread -/// ============================================================ -fn probeThreadMainWindows(ctx: *ProbeThreadContext) void { - defer _ = ctx.done.fetchAdd(1, .monotonic); - - const io = ctx.io orelse return; - const environ_map = ctx.environ_map orelse return; - - var client: std.http.Client = .{ .allocator = ctx.allocator, .io = io }; +/// Measure throughput by downloading data for up to 5 seconds. +/// Uses the same I/O path as the real download (stream to a file writer), +/// but writes to NUL (/dev/null) so no data is stored on disk. +/// Returns bytes per second, or null on failure. +fn measureBandwidth(allocator: std.mem.Allocator, io: std.Io, environ_map: *std.process.Environ.Map, url: []const u8, proxy: []const u8) ?u64 { + var client: std.http.Client = .{ .allocator = allocator, .io = io }; defer client.deinit(); - if (ctx.proxy.len > 0) { - proxy_tunnel.setProxyFromUrl(&client, ctx.allocator, ctx.proxy) catch {}; - } else { - client.initDefaultProxies(ctx.allocator, environ_map) catch {}; - } + setupClient(&client, allocator, environ_map, proxy); - const uri = std.Uri.parse(ctx.url) catch return; - var req = client.request(.HEAD, uri, .{ - .redirect_behavior = .init(3), - }) catch return; - defer req.deinit(); + const uri = std.Uri.parse(url) catch return null; + + // Open null device — discards all written data, no disk I/O + const null_path: []const u8 = if (builtin.os.tag == .windows) "NUL" else "/dev/null"; + const null_file = std.Io.Dir.cwd().createFile(io, null_path, .{}) catch return null; + defer null_file.close(io); const start = std.Io.Timestamp.now(io, .awake); - req.sendBodiless() catch return; - var buf: [4096]u8 = undefined; - const response = req.receiveHead(&buf) catch return; - const end = std.Io.Timestamp.now(io, .awake); + // Send GET request + var req = client.request(.GET, uri, .{ .redirect_behavior = .init(3) }) catch return null; + defer req.deinit(); + req.sendBodiless() catch return null; + + // Receive response head + var head_buf: [4096]u8 = undefined; + var response = req.receiveHead(&head_buf) catch return null; + if (response.head.status.class() != .success) return null; + + // Stream body to null device — same pattern as the real download (streamBodyToFile). + // 64KB chunk limit matches the real download. Writer buffer is flushed to NUL + // when full, so we can keep reading indefinitely. + var reader_buf: [16384]u8 = undefined; + const body_reader = response.reader(&reader_buf); + var file_buf: [16384]u8 = undefined; + var file_writer = null_file.writer(io, &file_buf); + + const max_ns: u64 = 5 * std.time.ns_per_s; + var total_read: u64 = 0; + while (true) { + const now = std.Io.Timestamp.now(io, .awake); + const elapsed_ns: u64 = @intCast(std.Io.Timestamp.durationTo(start, now).nanoseconds); + if (elapsed_ns >= max_ns) break; + + const n = body_reader.stream(&file_writer.interface, .limited(64 * 1024)) catch |err| switch (err) { + error.EndOfStream => break, + else => break, + }; + if (n == 0) { + file_writer.interface.flush() catch break; + continue; + } + total_read += n; + } + file_writer.interface.flush() catch {}; - const elapsed_ns = std.Io.Timestamp.durationTo(start, end).nanoseconds; - if (elapsed_ns > PROBE_TIMEOUT_S * std.time.ns_per_s) return; - if (response.head.status.class() != .success) return; + if (total_read == 0) return null; - ctx.result.* = .{ - .url = ctx.url, - .latency_ns = @intCast(elapsed_ns), - }; -} + const end = std.Io.Timestamp.now(io, .awake); + const final_ns: u64 = @intCast(std.Io.Timestamp.durationTo(start, end).nanoseconds); + if (final_ns == 0) return null; -/// ============================================================ -/// Platform-dispatched probe thread entry point -/// ============================================================ -fn probeThreadMain(ctx: *ProbeThreadContext) void { - switch (native_os) { - .windows => probeThreadMainWindows(ctx), - else => probeThreadMainPosix(ctx), - } + return @intFromFloat(@as(f64, @floatFromInt(total_read)) / (@as(f64, @floatFromInt(final_ns)) / 1_000_000_000.0)); } /// ============================================================ /// Public API /// ============================================================ -/// Probe all candidate URLs concurrently using real OS threads. +/// Probe all candidate URLs sequentially. +/// For each mirror, measures latency (HEAD) and throughput (5-second download), +/// displaying results immediately to the terminal. pub fn probeAll( allocator: std.mem.Allocator, io: std.Io, @@ -252,104 +151,88 @@ pub fn probeAll( } const total = all_urls.items.len; - var done = std.atomic.Value(usize).init(0); - var time_buf: [64]u8 = undefined; - - // Capture start time (POSIX: clock_gettime, Windows: zero — ticker is skipped) - const start_ns: i96 = if (native_os == .windows) 0 else ns: { - var ts: std.c.timespec = undefined; - _ = std.c.clock_gettime(.MONOTONIC, &ts); - break :ns @as(i96, ts.sec) * 1_000_000_000 + ts.nsec; - }; - - // Spawn background ticker thread for dynamic progress display (POSIX only) - var ticker_ctx: TickerContext = .{ - .total = total, - .done = &done, - .start_ns = start_ns, - .stop = std.atomic.Value(bool).init(false), - }; - const ticker = if (progress_writer != null and native_os != .windows) - std.Thread.spawn(.{}, tickerThread, .{&ticker_ctx}) catch null - else - null; - defer { - ticker_ctx.stop.store(true, .release); - if (ticker) |t| t.join(); - } + var latency_buf: [64]u8 = undefined; + var speed_buf: [64]u8 = undefined; - // Allocate results and thread contexts - var results = try allocator.alloc(?ProbeResult, total); - defer allocator.free(results); - @memset(results, null); - - var contexts = try allocator.alloc(ProbeThreadContext, total); - defer allocator.free(contexts); - - ticker_ctx.current_url = all_urls.items[0]; - - for (0..total) |i| { - contexts[i] = .{ - .allocator = allocator, - .url = all_urls.items[i], - .result = &results[i], - .done = &done, - // Windows fallback fields - .io = io, - .environ_map = environ_map, - .proxy = proxy, - }; + if (progress_writer) |pw| { + try pw.print(" Probing {d} source(s):\n", .{total}); + try pw.flush(); } - // Spawn one OS thread per probe — guaranteed true parallelism - var threads = try allocator.alloc(?std.Thread, total); - defer allocator.free(threads); - @memset(threads, null); - - for (0..total) |i| { - threads[i] = std.Thread.spawn(.{}, probeThreadMain, .{&contexts[i]}) catch null; - } + for (all_urls.items, 0..) |url, idx| { + // Display: " Testing: mirror_name ..." + if (progress_writer) |pw| { + try pw.print(" Testing: {s} ...", .{shortUrl(url)}); + try pw.flush(); + } - // Wait for all probe threads to complete - for (0..total) |i| { - if (threads[i]) |t| t.join(); - } + // Phase 1: Measure latency (HTTP HEAD) + const latency = measureLatency(allocator, io, environ_map, url, proxy); + + // Phase 2: Measure bandwidth (HTTP GET, 5-second download) + const bandwidth = measureBandwidth(allocator, io, environ_map, url, proxy); + + // Display result and add to candidates + if (progress_writer) |pw| { + const lat_str = if (latency) |lat| formatLatency(&latency_buf, lat) else "timeout"; + if (bandwidth) |bw| { + const spd_str = formatThroughput(&speed_buf, bw); + try pw.print("\r {d:>3}. {s:<32} latency: {s:<10} speed: {s}/s\n", .{ + idx + 1, shortUrl(url), lat_str, spd_str, + }); + } else { + try pw.print("\r {d:>3}. {s:<32} latency: {s:<10} speed: timeout\n", .{ + idx + 1, shortUrl(url), lat_str, + }); + } + try pw.flush(); + } - // Collect successful results - for (results) |res| { - if (res) |r| { + if (bandwidth) |bw| { try candidates.append(allocator, .{ - .url = r.url, - .latency_ns = r.latency_ns, + .url = url, + .latency_ns = latency orelse 0, + .bandwidth_bps = bw, .owned = true, }); + // Mark URL as transferred to candidates for (all_urls.items) |*u| { - if (u.*.ptr == r.url.ptr) { + if (u.*.ptr == url.ptr) { u.* = ""; break; } } } } +} - if (progress_writer) |pw| { - if (native_os == .windows) { - try pw.print("\x1b[2K\r Probing: done ({d} sources)\n", .{total}); - } else { - var end_ts: std.c.timespec = undefined; - _ = std.c.clock_gettime(.MONOTONIC, &end_ts); - const end_ns: i96 = @as(i96, end_ts.sec) * 1_000_000_000 + end_ts.nsec; - const elapsed_ns: u64 = if (end_ns > start_ns) @intCast(end_ns - start_ns) else 0; - const elapsed_str = formatLatency(&time_buf, elapsed_ns); - try pw.print("\x1b[2K\r Probing: done ({d} sources, {s})\n", .{ total, elapsed_str }); - } - try pw.flush(); +/// Compare two MirrorCandidates by bandwidth (for sorting). +/// Higher bandwidth_bps = faster mirror. Failed probes (bandwidth_bps == 0) sort last. +/// When bandwidth difference is <10%, lower latency wins as tiebreaker. +pub fn greaterThanByBandwidth(_: void, a: MirrorCandidate, b: MirrorCandidate) bool { + // Failed bandwidth probes sort last + if (a.bandwidth_bps == 0) return false; + if (b.bandwidth_bps == 0) return true; + + // If bandwidth difference < 10%, use latency as tiebreaker + const max_bps = @max(a.bandwidth_bps, b.bandwidth_bps); + const diff = if (a.bandwidth_bps > b.bandwidth_bps) + a.bandwidth_bps - b.bandwidth_bps + else + b.bandwidth_bps - a.bandwidth_bps; + + if (diff * 10 < max_bps) { + // Both failed latency → order doesn't matter + if (a.latency_ns == 0 and b.latency_ns == 0) return false; + // One failed latency → sort last + if (a.latency_ns == 0) return false; + if (b.latency_ns == 0) return true; + // Lower latency = better + return a.latency_ns < b.latency_ns; } -} -/// Compare two MirrorCandidates by latency (for sorting). -pub fn lessThanByLatency(_: void, a: MirrorCandidate, b: MirrorCandidate) bool { - return a.latency_ns < b.latency_ns; + // Higher bandwidth = better + return a.bandwidth_bps > b.bandwidth_bps; } /// Format nanoseconds as a human-readable string (e.g., "123ms", "1.2s"). @@ -365,6 +248,28 @@ pub fn formatLatency(buf: []u8, ns: u64) []const u8 { } } +/// Format throughput from bytes per second as "X.YMB" etc. +pub fn formatThroughput(buf: []u8, bps: u64) []const u8 { + if (bps == 0) return "?B"; + return formatBytes(buf, bps); +} + +/// Format bytes as a human-readable string (e.g., "427.0KB", "1.2MB"). +fn formatBytes(buf: []u8, bytes: u64) []const u8 { + const KB: u64 = 1024; + const MB: u64 = 1024 * KB; + const GB: u64 = 1024 * MB; + if (bytes < KB) { + return std.fmt.bufPrint(buf, "{d}B", .{bytes}) catch "?"; + } else if (bytes < MB) { + return std.fmt.bufPrint(buf, "{d:.1}KB", .{@as(f64, @floatFromInt(bytes)) / @as(f64, @floatFromInt(KB))}) catch "?"; + } else if (bytes < GB) { + return std.fmt.bufPrint(buf, "{d:.1}MB", .{@as(f64, @floatFromInt(bytes)) / @as(f64, @floatFromInt(MB))}) catch "?"; + } else { + return std.fmt.bufPrint(buf, "{d:.2}GB", .{@as(f64, @floatFromInt(bytes)) / @as(f64, @floatFromInt(GB))}) catch "?"; + } +} + /// Extract the host part of a URL for concise display. pub fn shortUrl(url: []const u8) []const u8 { const start: usize = if (std.mem.startsWith(u8, url, "https://"))