diff --git a/Cargo.lock b/Cargo.lock index a47f4d2a09..32f0c95de8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2258,6 +2258,7 @@ dependencies = [ "proptest", "pulldown-cmark", "rand 0.9.1", + "rayon", "regex", "remove_dir_all", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index b187a7d2a0..0f1d612f43 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -69,6 +69,7 @@ opentelemetry_sdk = { version = "0.30", features = ["rt-tokio"], optional = true platforms = "3.4" pulldown-cmark = { version = "0.13", default-features = false } rand = "0.9" +rayon = "1.10.0" regex = "1" remove_dir_all = { version = "1.0.0", features = ["parallel"] } reqwest = { version = "0.12", default-features = false, features = ["blocking", "gzip", "http2", "socks", "stream"], optional = true } diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index 94f3dc61ec..e0e02a79aa 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -60,13 +60,13 @@ use std::io::{self, Write}; use std::ops::{Deref, DerefMut}; use std::path::{Path, PathBuf}; use std::sync::mpsc::Receiver; -use std::thread::available_parallelism; use std::time::{Duration, Instant}; use std::{fmt::Debug, fs::OpenOptions}; use anyhow::{Context, Result}; use crate::process::Process; +use crate::utils; use crate::utils::notifications::Notification; use threaded::PoolReference; @@ -448,15 +448,9 @@ pub(crate) fn get_executor<'a>( ram_budget: usize, process: &Process, ) -> Result> { - // Don't spawn more than this many I/O threads unless the user tells us to. - const DEFAULT_THREAD_LIMIT: usize = 8; - // If this gets lots of use, consider exposing via the config file. let thread_count = match process.var("RUSTUP_IO_THREADS") { - Err(_) => available_parallelism() - .map(|p| p.get()) - .unwrap_or(1) - .min(DEFAULT_THREAD_LIMIT), + Err(_) => utils::io_thread_count(), Ok(n) => n .parse::() .context("invalid value in RUSTUP_IO_THREADS. Must be a natural number")?, diff --git a/src/install.rs b/src/install.rs index c95461ab0b..9b37d25a53 100644 --- a/src/install.rs +++ b/src/install.rs @@ -39,6 +39,11 @@ impl InstallMethod<'_> { // Install a toolchain #[tracing::instrument(level = "trace", err(level = "trace"), skip_all)] pub(crate) async fn install(&self) -> Result { + // Initialize rayon for use by the remove_dir_all crate limiting the number of threads. + // This will error if rayon is already initialized but it's fine to ignore that. + let _ = rayon::ThreadPoolBuilder::new() + .num_threads(utils::io_thread_count()) + .build_global(); let nh = &self.cfg().notify_handler; match self { InstallMethod::Copy { .. } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index de0b60df6a..a3639cc978 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -7,6 +7,7 @@ use std::io::{self, BufReader, Write}; use std::ops::{BitAnd, BitAndAssign}; use std::path::{Path, PathBuf}; use std::process::ExitStatus; +use std::thread; use anyhow::{Context, Result, anyhow, bail}; use retry::delay::{Fibonacci, jitter}; @@ -26,6 +27,16 @@ pub(crate) mod notify; pub mod raw; pub(crate) mod units; +pub fn io_thread_count() -> usize { + // Don't spawn more than this many I/O threads unless the user tells us to. + // Feel free to increase this value if it improves performance. + const DEFAULT_IO_THREAD_LIMIT: usize = 8; + + thread::available_parallelism() + .map_or(1, |p| p.get()) + .min(DEFAULT_IO_THREAD_LIMIT) +} + #[must_use] #[derive(Debug, PartialEq, Eq)] pub struct ExitCode(pub i32);