diff --git a/src/bin/rustup-init.rs b/src/bin/rustup-init.rs index 6055d6ea02..d540efa367 100644 --- a/src/bin/rustup-init.rs +++ b/src/bin/rustup-init.rs @@ -36,20 +36,25 @@ use rustup::is_proxyable_tools; use rustup::process::Process; use rustup::utils; -#[tokio::main] -async fn main() -> Result { +fn main() -> Result { #[cfg(windows)] pre_rustup_main_init(); let process = Process::os(); - let result = { + let runtime = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .worker_threads(process.io_thread_count()?) + .build() + .unwrap(); + + let result = runtime.block_on(async { #[cfg(feature = "otel")] let _telemetry_guard = log::set_global_telemetry(); tracing_log::LogTracer::init()?; let (subscriber, console_filter) = log::tracing_subscriber(&process); tracing::subscriber::set_global_default(subscriber)?; run_rustup(&process, console_filter).await - }; + }); match result { Err(e) => { diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index e0e02a79aa..a03bef1cf7 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -63,10 +63,9 @@ use std::sync::mpsc::Receiver; use std::time::{Duration, Instant}; use std::{fmt::Debug, fs::OpenOptions}; -use anyhow::{Context, Result}; +use anyhow::Result; use crate::process::Process; -use crate::utils; use crate::utils::notifications::Notification; use threaded::PoolReference; @@ -447,15 +446,9 @@ pub(crate) fn get_executor<'a>( notify_handler: Option<&'a dyn Fn(Notification<'_>)>, ram_budget: usize, process: &Process, -) -> Result> { +) -> anyhow::Result> { // If this gets lots of use, consider exposing via the config file. - let thread_count = match process.var("RUSTUP_IO_THREADS") { - Err(_) => utils::io_thread_count(), - Ok(n) => n - .parse::() - .context("invalid value in RUSTUP_IO_THREADS. Must be a natural number")?, - }; - Ok(match thread_count { + Ok(match process.io_thread_count()? { 0 | 1 => Box::new(immediate::ImmediateUnpacker::new()), n => Box::new(threaded::Threaded::new(notify_handler, n, ram_budget)), }) diff --git a/src/install.rs b/src/install.rs index 9b37d25a53..a7e48a6c4a 100644 --- a/src/install.rs +++ b/src/install.rs @@ -38,11 +38,11 @@ pub(crate) enum InstallMethod<'a> { impl InstallMethod<'_> { // Install a toolchain #[tracing::instrument(level = "trace", err(level = "trace"), skip_all)] - pub(crate) async fn install(&self) -> Result { + pub(crate) async fn install(&self) -> anyhow::Result { // Initialize rayon for use by the remove_dir_all crate limiting the number of threads. // This will error if rayon is already initialized but it's fine to ignore that. let _ = rayon::ThreadPoolBuilder::new() - .num_threads(utils::io_thread_count()) + .num_threads(self.cfg().process.io_thread_count()?) .build_global(); let nh = &self.cfg().notify_handler; match self { diff --git a/src/process.rs b/src/process.rs index 2b8074bd16..9f4370a48c 100644 --- a/src/process.rs +++ b/src/process.rs @@ -1,9 +1,9 @@ -use std::env; use std::ffi::OsString; use std::fmt::Debug; use std::io; use std::io::IsTerminal; use std::path::PathBuf; +use std::str::FromStr; #[cfg(feature = "test")] use std::{ collections::HashMap, @@ -11,8 +11,9 @@ use std::{ path::Path, sync::{Arc, Mutex}, }; +use std::{env, thread}; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, bail}; #[cfg(feature = "test")] use tracing::subscriber::DefaultGuard; #[cfg(feature = "test")] @@ -64,6 +65,26 @@ impl Process { home::env::rustup_home_with_env(self).context("failed to determine rustup home dir") } + pub fn io_thread_count(&self) -> anyhow::Result { + if let Ok(n) = self.var("RUSTUP_IO_THREADS") { + let threads = usize::from_str(&n).context( + "invalid value in RUSTUP_IO_THREADS -- must be a natural number greater than zero", + )?; + match threads { + 0 => bail!("RUSTUP_IO_THREADS must be a natural number greater than zero"), + _ => return Ok(threads), + } + }; + + Ok(match thread::available_parallelism() { + // Don't spawn more than 8 I/O threads unless the user tells us to. + // Feel free to increase this value if it improves performance. + Ok(threads) => Ord::min(threads.get(), 8), + // Unknown for target platform or no permission to query. + Err(_) => 1, + }) + } + pub fn var(&self, key: &str) -> Result { match self { Process::OsProcess(_) => env::var(key), diff --git a/src/utils/mod.rs b/src/utils/mod.rs index d1973706a8..b88c5ee49b 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -7,7 +7,6 @@ use std::io::{self, BufReader, Write}; use std::ops::{BitAnd, BitAndAssign}; use std::path::{Path, PathBuf}; use std::process::ExitStatus; -use std::thread; use anyhow::{Context, Result, anyhow, bail}; use retry::delay::{Fibonacci, jitter}; @@ -27,16 +26,6 @@ pub(crate) mod notify; pub mod raw; pub(crate) mod units; -pub fn io_thread_count() -> usize { - // Don't spawn more than this many I/O threads unless the user tells us to. - // Feel free to increase this value if it improves performance. - const DEFAULT_IO_THREAD_LIMIT: usize = 8; - - thread::available_parallelism() - .map_or(1, |p| p.get()) - .min(DEFAULT_IO_THREAD_LIMIT) -} - #[must_use] #[derive(Debug, PartialEq, Eq)] pub struct ExitCode(pub i32);