From b324eea4f0d215f30d60e4e34d6a969ad937fdc4 Mon Sep 17 00:00:00 2001 From: James Kay Date: Wed, 8 Oct 2025 20:03:54 +0100 Subject: [PATCH] `linera_core::client`: batch downloading of missing blobs (#4755) Currently, when we synchronize a chain, even though we receive the certificates in a batch to `process_certificates`, we handle them one by one at the local node level, and if a certificate is missing blobs we stop, download the blobs for that certificate, then retry, making the download of the blobs sequential. This makes startup time for the client linear in the number of certificates-with-blobs present in its initial chains (notably, the admin chain). We already have an ahead-of-time indicator of which blobs will be required by the certificates, so there's no need to download them one at a time. If we don't have some blobs marked as required by the certificate batch, try to download them (concurrently) before proceeding to process the batch. ~Thereafter, `BlobsNotFound` when processing the batch is a hard error.~ `required_blob_ids()` is conservative, so we still need to download blobs and retry if we get a `BlobsNotFound` error thereafter. CI. - These changes should be backported to the latest `testnet` branch, then - be released in a new SDK. --- linera-core/src/client/mod.rs | 71 ++++++++++++++++++++++++----------- 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/linera-core/src/client/mod.rs b/linera-core/src/client/mod.rs index 936ac080326f..55e0dcd7ca1e 100644 --- a/linera-core/src/client/mod.rs +++ b/linera-core/src/client/mod.rs @@ -339,7 +339,7 @@ impl Client { } }; for certificate in certificates { - last_info = Some(self.handle_certificate(Box::new(certificate)).await?.info); + last_info = Some(self.handle_certificate(certificate).await?.info); } // Now download the rest in batches from the remote node. while next_height < stop { @@ -361,6 +361,24 @@ impl Client { Ok(last_info) } + async fn download_blobs( + &self, + remote_node: &RemoteNode, + blob_ids: impl IntoIterator, + ) -> Result<(), ChainClientError> { + self.local_node + .store_blobs( + &futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move { + remote_node.try_download_blob(blob_id).await.unwrap() + })) + .buffer_unordered(self.options.max_joined_tasks) + .collect::>() + .await, + ) + .await + .map_err(Into::into) + } + /// Tries to process all the certificates, requesting any missing blobs from the given node. /// Returns the chain info of the last successfully processed certificate. #[instrument(level = "trace", skip_all)] @@ -370,34 +388,47 @@ impl Client { certificates: Vec, ) -> Result>, ChainClientError> { let mut info = None; - for certificate in certificates { - let certificate = Box::new(certificate); - let mut result = self.handle_certificate(certificate.clone()).await; + let required_blob_ids: Vec<_> = certificates + .iter() + .flat_map(|certificate| certificate.value().required_blob_ids()) + .collect(); - if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result { - let blobs = - futures::stream::iter(blob_ids.iter().copied().map(|blob_id| async move { - remote_node.try_download_blob(blob_id).await.unwrap() - })) - .buffer_unordered(self.options.max_joined_tasks) - .collect::>() - .await; - self.local_node.store_blobs(&blobs).await?; - result = self.handle_certificate(certificate.clone()).await; + match self + .local_node + .read_blob_states_from_storage(&required_blob_ids) + .await + { + Err(LocalNodeError::BlobsNotFound(blob_ids)) => { + self.download_blobs(remote_node, blob_ids).await?; } + x => { + x?; + } + } - info = Some(result?.info); + for certificate in certificates { + info = Some( + match self.handle_certificate(certificate.clone()).await { + Err(LocalNodeError::BlobsNotFound(blob_ids)) => { + self.download_blobs(remote_node, blob_ids).await?; + self.handle_certificate(certificate).await? + } + x => x?, + } + .info, + ); } + // Done with all certificates. Ok(info) } async fn handle_certificate( &self, - certificate: Box>, + certificate: GenericCertificate, ) -> Result { self.local_node - .handle_certificate(*certificate, &self.notifier) + .handle_certificate(certificate, &self.notifier) .await } @@ -489,7 +520,7 @@ impl Client { &self, certificate: Box>, ) -> Result<(), LocalNodeError> { - let info = self.handle_certificate(certificate).await?.info; + let info = self.handle_certificate(*certificate).await?.info; self.update_from_info(&info); Ok(()) } @@ -707,8 +738,6 @@ impl Client { mode: ReceiveCertificateMode, nodes: Option>>, ) -> Result<(), ChainClientError> { - let certificate = Box::new(certificate); - // Verify the certificate before doing any expensive networking. let (max_epoch, committees) = self.admin_committees().await?; if let ReceiveCertificateMode::NeedsCheck = mode { @@ -1145,7 +1174,7 @@ impl Client { }; if let Some(timeout) = remote_info.manager.timeout { - self.handle_certificate(Box::new(*timeout)).await?; + self.handle_certificate(*timeout).await?; } let mut proposals = Vec::new(); if let Some(proposal) = remote_info.manager.requested_signed_proposal {