From 04a727be1f4f8bf0b9eefa9b81b97f78507d5b86 Mon Sep 17 00:00:00 2001 From: Prashant Deva Date: Fri, 16 Jan 2026 01:52:31 +0000 Subject: [PATCH 1/3] add conditional delete support --- src/aws/builder.rs | 1 + src/aws/mod.rs | 19 ++++- src/aws/precondition.rs | 1 + src/azure/client.rs | 50 +++++++++-- src/azure/mod.rs | 12 ++- src/chunked.rs | 8 +- src/gcp/client.rs | 17 +++- src/gcp/mod.rs | 16 +++- src/http/mod.rs | 14 +++- src/integration.rs | 136 +++++++++++++++++++++++++++++- src/lib.rs | 178 ++++++++++++++++++++++++++++++++++++---- src/limit.rs | 11 ++- src/local.rs | 19 ++++- src/memory.rs | 25 +++++- src/prefix.rs | 10 ++- src/throttle.rs | 7 +- 16 files changed, 472 insertions(+), 52 deletions(-) diff --git a/src/aws/builder.rs b/src/aws/builder.rs index e49145a4..86ee1b4a 100644 --- a/src/aws/builder.rs +++ b/src/aws/builder.rs @@ -388,6 +388,7 @@ pub enum AmazonS3ConfigKey { /// - `conditional_put` ConditionalPut, + /// Skip signing request /// /// Supported keys: diff --git a/src/aws/mod.rs b/src/aws/mod.rs index 030590ad..40d56ffa 100644 --- a/src/aws/mod.rs +++ b/src/aws/mod.rs @@ -44,9 +44,9 @@ use crate::multipart::{MultipartStore, PartId}; use crate::signer::Signer; use crate::util::STRICT_ENCODE_SET; use crate::{ - CopyMode, CopyOptions, Error, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, - Result, UploadPart, + CopyMode, CopyOptions, DeleteOptions, Error, GetOptions, GetResult, ListResult, MultipartId, + MultipartUpload, ObjectMeta, ObjectStore, Path, PutMode, PutMultipartOptions, PutOptions, + PutPayload, PutResult, Result, UploadPart, }; static TAGS_HEADER: HeaderName = HeaderName::from_static("x-amz-tagging"); @@ -259,6 +259,17 @@ impl ObjectStore for AmazonS3 { self.client.get_opts(location, options).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + let mut request = self.client.request(Method::DELETE, location); + + if let Some(if_match) = &opts.if_match { + request = request.header(&IF_MATCH, if_match); + } + + request.with_extensions(opts.extensions).send().await?; + Ok(()) + } + fn delete_stream( &self, locations: BoxStream<'static, Result>, @@ -658,6 +669,8 @@ mod tests { } if test_conditional_put { put_opts(&integration, true).await; + delete_opts(&integration, true).await; + delete_opts_race_condition(&integration, true).await; } // run integration test with unsigned payload enabled diff --git a/src/aws/precondition.rs b/src/aws/precondition.rs index b4ae938a..0bbe0e3a 100644 --- a/src/aws/precondition.rs +++ b/src/aws/precondition.rs @@ -159,6 +159,7 @@ impl Parse for S3ConditionalPut { } } + #[cfg(test)] mod tests { use super::S3CopyIfNotExists; diff --git a/src/azure/client.rs b/src/azure/client.rs index 54ab3077..f681fc6d 100644 --- a/src/azure/client.rs +++ b/src/azure/client.rs @@ -28,8 +28,8 @@ use crate::list::{PaginatedListOptions, PaginatedListResult}; use crate::multipart::PartId; use crate::util::{GetRange, deserialize_rfc1123}; use crate::{ - Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, - PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig, TagSet, + Attribute, Attributes, ClientOptions, DeleteOptions, GetOptions, ListResult, ObjectMeta, Path, + PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig, TagSet, }; use async_trait::async_trait; use base64::Engine; @@ -74,6 +74,12 @@ pub(crate) enum Error { path: String, }, + #[error("Error performing delete request {}: {}", path, source)] + DeleteRequest { + source: crate::client::retry::RetryError, + path: String, + }, + #[error("Error performing bulk delete request: {}", source)] BulkDeleteRequest { source: crate::client::retry::RetryError, @@ -144,9 +150,9 @@ pub(crate) enum Error { impl From for crate::Error { fn from(err: Error) -> Self { match err { - Error::GetRequest { source, path } | Error::PutRequest { source, path } => { - source.error(STORE, path) - } + Error::GetRequest { source, path } + | Error::PutRequest { source, path } + | Error::DeleteRequest { source, path } => source.error(STORE, path), _ => Self::Generic { store: STORE, source: Box::new(err), @@ -568,6 +574,40 @@ impl AzureClient { .map_err(|source| Error::Metadata { source })?) } + /// Make an Azure DELETE request + pub(crate) async fn delete_request(&self, path: &Path, opts: DeleteOptions) -> Result<()> { + let credential = self.get_credential().await?; + let url = self.config.path_url(path); + let sensitive = credential + .as_deref() + .map(|c| c.sensitive_request()) + .unwrap_or_default(); + + let mut builder = self + .client + .delete(url.as_str()) + .header(CONTENT_LENGTH, HeaderValue::from_static("0")) + .extensions(opts.extensions) + .with_azure_authorization(&credential, &self.config.account) + .retryable(&self.config.retry_config) + .sensitive(sensitive) + .idempotent(true); + + if let Some(etag) = &opts.if_match { + builder = builder.header(&IF_MATCH, etag); + } + + builder + .send() + .await + .map_err(|source| Error::DeleteRequest { + source, + path: path.to_string(), + })?; + + Ok(()) + } + /// PUT a block pub(crate) async fn put_block( &self, diff --git a/src/azure/mod.rs b/src/azure/mod.rs index 04c8f31d..0b237f08 100644 --- a/src/azure/mod.rs +++ b/src/azure/mod.rs @@ -23,9 +23,9 @@ //! //! Unused blocks will automatically be dropped after 7 days. use crate::{ - CopyMode, CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, - UploadPart, + CopyMode, CopyOptions, DeleteOptions, GetOptions, GetResult, ListResult, MultipartId, + MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, + PutResult, Result, UploadPart, multipart::{MultipartStore, PartId}, path::Path, signer::Signer, @@ -117,6 +117,10 @@ impl ObjectStore for MicrosoftAzure { self.client.get_opts(location, options).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + self.client.delete_request(location, opts).await + } + fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result> { self.client.list(prefix) } @@ -331,6 +335,8 @@ mod tests { copy_if_not_exists(&integration).await; stream_get(&integration).await; put_opts(&integration, true).await; + delete_opts(&integration, true).await; + delete_opts_race_condition(&integration, true).await; multipart(&integration, &integration).await; multipart_race_condition(&integration, false).await; multipart_out_of_order(&integration).await; diff --git a/src/chunked.rs b/src/chunked.rs index b362366d..5a9762cf 100644 --- a/src/chunked.rs +++ b/src/chunked.rs @@ -28,8 +28,8 @@ use futures::stream::BoxStream; use crate::path::Path; use crate::{ - CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOptions, PutOptions, PutResult, RenameOptions, + CopyOptions, DeleteOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, + ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutResult, RenameOptions, }; use crate::{PutPayload, Result}; @@ -139,6 +139,10 @@ impl ObjectStore for ChunkedStore { self.inner.get_ranges(location, ranges).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + self.inner.delete_opts(location, opts).await + } + fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/gcp/client.rs b/src/gcp/client.rs index 00d3168f..32b6ec1a 100644 --- a/src/gcp/client.rs +++ b/src/gcp/client.rs @@ -32,8 +32,8 @@ use crate::multipart::PartId; use crate::path::Path; use crate::util::hex_encode; use crate::{ - Attribute, Attributes, ClientOptions, GetOptions, MultipartId, PutMode, PutMultipartOptions, - PutOptions, PutPayload, PutResult, Result, RetryConfig, + Attribute, Attributes, ClientOptions, DeleteOptions, GetOptions, MultipartId, PutMode, + PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, RetryConfig, }; use async_trait::async_trait; use base64::Engine; @@ -560,8 +560,17 @@ impl GoogleCloudStorageClient { } /// Perform a delete request - pub(crate) async fn delete_request(&self, path: &Path) -> Result<()> { - self.request(Method::DELETE, path).send().await?; + pub(crate) async fn delete_request(&self, path: &Path, opts: DeleteOptions) -> Result<()> { + let mut builder = self + .request(Method::DELETE, path) + .with_extensions(opts.extensions) + .idempotent(true); + + if let Some(if_match) = &opts.if_match { + builder = builder.header(&HeaderName::from_static("if-match"), if_match); + } + + builder.send().await?; Ok(()) } diff --git a/src/gcp/mod.rs b/src/gcp/mod.rs index 2fb74b4f..f247750e 100644 --- a/src/gcp/mod.rs +++ b/src/gcp/mod.rs @@ -42,9 +42,9 @@ use crate::gcp::credential::GCSAuthorizer; use crate::signer::Signer; use crate::{CopyMode, CopyOptions}; use crate::{ - GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, - PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, multipart::PartId, - path::Path, + DeleteOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, + ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, + multipart::PartId, path::Path, }; use async_trait::async_trait; use client::GoogleCloudStorageClient; @@ -181,6 +181,10 @@ impl ObjectStore for GoogleCloudStorage { self.client.get_opts(location, options).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + self.client.delete_request(location, opts).await + } + fn delete_stream( &self, locations: BoxStream<'static, Result>, @@ -191,7 +195,9 @@ impl ObjectStore for GoogleCloudStorage { let client = Arc::clone(&client); async move { let location = location?; - client.delete_request(&location).await?; + client + .delete_request(&location, DeleteOptions::default()) + .await?; Ok(location) } }) @@ -334,6 +340,8 @@ mod test { // Fake GCS server doesn't currently honor preconditions get_opts(&integration).await; put_opts(&integration, true).await; + delete_opts(&integration, true).await; + delete_opts_race_condition(&integration, true).await; // Fake GCS server doesn't currently support attributes put_get_attributes(&integration).await; } diff --git a/src/http/mod.rs b/src/http/mod.rs index 9e5af9a1..1b06b658 100644 --- a/src/http/mod.rs +++ b/src/http/mod.rs @@ -45,9 +45,9 @@ use crate::client::{HttpConnector, http_connector}; use crate::http::client::Client; use crate::path::Path; use crate::{ - ClientConfigKey, ClientOptions, CopyMode, CopyOptions, GetOptions, GetResult, ListResult, - MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, - PutResult, Result, RetryConfig, + ClientConfigKey, ClientOptions, CopyMode, CopyOptions, DeleteOptions, GetOptions, GetResult, + ListResult, MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, + PutPayload, PutResult, Result, RetryConfig, }; mod client; @@ -138,6 +138,14 @@ impl ObjectStore for HttpStore { self.client.get_opts(location, options).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + let _ = (location, opts); + Err(crate::Error::NotImplemented { + operation: "`delete_opts`".into(), + implementer: self.to_string(), + }) + } + fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/integration.rs b/src/integration.rs index 5c601b5c..b1a9fa71 100644 --- a/src/integration.rs +++ b/src/integration.rs @@ -28,13 +28,15 @@ use crate::list::{PaginatedListOptions, PaginatedListStore}; use crate::multipart::MultipartStore; use crate::path::Path; use crate::{ - Attribute, Attributes, DynObjectStore, Error, GetOptions, GetRange, MultipartUpload, - ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, WriteMultipart, + Attribute, Attributes, DeleteOptions, DynObjectStore, Error, GetOptions, GetRange, + MultipartUpload, ObjectStore, ObjectStoreExt, PutMode, PutPayload, UpdateVersion, + WriteMultipart, }; use bytes::Bytes; use futures::stream::FuturesUnordered; use futures::{StreamExt, TryStreamExt}; use rand::{Rng, rng}; +use rand::distr::Alphanumeric; use std::collections::HashSet; use std::slice; @@ -688,6 +690,136 @@ pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) { assert_eq!(v, NUM_WORKERS * NUM_INCREMENTS); } +/// Tests conditional deletes +pub async fn delete_opts(storage: &dyn ObjectStore, supports_conditional: bool) { + let rng = rng(); + let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap(); + + // Test 1: Delete with matching ETag + let path = Path::from(format!("delete_opts_etag_{suffix}")); + let result = storage.put(&path, "test data".into()).await.unwrap(); + let etag = result.e_tag.clone(); + + if supports_conditional && etag.is_some() { + let opts = DeleteOptions { + if_match: etag.clone(), + ..Default::default() + }; + storage.delete_opts(&path, opts).await.unwrap(); + + let err = storage.get(&path).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. }), "{err}"); + } else if !supports_conditional { + let opts = DeleteOptions { + if_match: Some("some-etag".to_string()), + ..Default::default() + }; + let err = storage.delete_opts(&path, opts).await.unwrap_err(); + assert!(matches!(err, Error::NotImplemented { .. }), "{err}"); + + storage.delete(&path).await.unwrap(); + return; + } + + // Test 2: Delete with non-matching ETag should fail + let path = Path::from(format!("delete_opts_etag_fail_{suffix}")); + let result = storage.put(&path, "test data".into()).await.unwrap(); + + if supports_conditional && result.e_tag.is_some() { + let opts = DeleteOptions { + if_match: Some("wrong-etag".to_string()), + ..Default::default() + }; + let err = storage.delete_opts(&path, opts).await.unwrap_err(); + assert!(matches!(err, Error::Precondition { .. }), "{err}"); + + let data = storage.get(&path).await.unwrap().bytes().await.unwrap(); + assert_eq!(data.as_ref(), b"test data"); + + storage.delete(&path).await.unwrap(); + } + + // Test 3: Delete with wildcard should succeed + let path = Path::from(format!("delete_opts_wildcard_{suffix}")); + storage.put(&path, "test data".into()).await.unwrap(); + + if supports_conditional { + let opts = DeleteOptions { + if_match: Some("*".to_string()), + ..Default::default() + }; + storage.delete_opts(&path, opts).await.unwrap(); + } + + // Test 4: Delete with multiple ETags should succeed if one matches + let path = Path::from(format!("delete_opts_multi_etag_{suffix}")); + let result = storage.put(&path, "test data".into()).await.unwrap(); + + if supports_conditional && result.e_tag.is_some() { + let etag = result.e_tag.unwrap(); + let opts = DeleteOptions { + if_match: Some(format!("\"wrong1\", {etag}, \"wrong2\"")), + ..Default::default() + }; + storage.delete_opts(&path, opts).await.unwrap(); + + let err = storage.get(&path).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. }), "{err}"); + } +} + +/// Test concurrent conditional deletes (race conditions) +pub async fn delete_opts_race_condition(storage: &dyn ObjectStore, supports_conditional: bool) { + if !supports_conditional { + return; + } + + use futures::stream::{FuturesUnordered, StreamExt}; + + let rng = rng(); + let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap(); + let path = Path::from(format!("delete_race_{suffix}")); + + let result = storage.put(&path, "test data".into()).await.unwrap(); + + if let Some(etag) = result.e_tag { + const NUM_WORKERS: usize = 5; + + let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS) + .map(|_| { + let opts = DeleteOptions { + if_match: Some(etag.clone()), + ..Default::default() + }; + storage.delete_opts(&path, opts) + }) + .collect(); + + let mut success_count = 0; + let mut precondition_count = 0; + let mut not_found_count = 0; + + while let Some(result) = futures.next().await { + match result { + Ok(_) => success_count += 1, + Err(Error::Precondition { .. }) => precondition_count += 1, + Err(Error::NotFound { .. }) => not_found_count += 1, + Err(err) => panic!("Unexpected error: {err}"), + } + } + + assert_eq!(success_count, 1, "Exactly one delete should succeed"); + assert_eq!( + precondition_count + not_found_count, + NUM_WORKERS - 1, + "Other deletes should fail" + ); + + let err = storage.get(&path).await.unwrap_err(); + assert!(matches!(err, Error::NotFound { .. })); + } +} + /// Returns a chunk of length `chunk_length` fn get_chunk(chunk_length: usize) -> Bytes { let mut data = vec![0_u8; chunk_length]; diff --git a/src/lib.rs b/src/lib.rs index b22c6c4c..2b5a9dcc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -515,6 +515,35 @@ //! [Apache Iceberg]: https://iceberg.apache.org/ //! [Delta Lake]: https://delta.io/ //! +//! # Conditional Delete +//! +//! Deletes can be guarded with [`DeleteOptions::if_match`] to ensure the object hasn't changed +//! since it was last observed. +//! +//! ``` +//! # use object_store::{DeleteOptions, Error, ObjectStore, ObjectStoreExt}; +//! # use std::sync::Arc; +//! # use object_store::memory::InMemory; +//! # use object_store::path::Path; +//! # fn get_object_store() -> Arc { +//! # Arc::new(InMemory::new()) +//! # } +//! # async fn conditional_delete() { +//! let store = get_object_store(); +//! let path = Path::from("test"); +//! +//! // Fetch version information for the object +//! let meta = store.head(&path).await.unwrap(); +//! let opts = DeleteOptions::new().with_if_match(meta.e_tag); +//! +//! match store.delete_opts(&path, opts).await { +//! Ok(_) => {} // Successfully deleted +//! Err(Error::Precondition { .. }) => {} // Object has changed, retry +//! Err(e) => panic!("{e}"), +//! } +//! # } +//! ``` +//! //! # TLS Certificates //! //! Stores that use HTTPS/TLS (this is true for most cloud stores) can choose the source of their [CA] @@ -894,6 +923,37 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static { .await } + /// Delete the object at the specified location with options + /// + /// The default implementation supports only unconditional deletes and will + /// return [`Error::NotImplemented`] if conditional options are provided. + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + if opts.if_match.is_some() { + return Err(Error::NotImplemented { + operation: "`delete_opts` with `if_match`".into(), + implementer: self.to_string(), + }); + } + + let location = location.clone(); + let mut stream = + self.delete_stream(futures::stream::once(async move { Ok(location) }).boxed()); + let _path = stream.try_next().await?.ok_or_else(|| Error::Generic { + store: "ext", + source: "`delete_stream` with one location should yield once but didn't".into(), + })?; + if stream.next().await.is_some() { + Err(Error::Generic { + store: "ext", + source: + "`delete_stream` with one location expected to yield exactly once, but yielded more than once" + .into(), + }) + } else { + Ok(()) + } + } + /// Delete all the objects at the specified locations /// /// When supported, this method will use bulk operations that delete more @@ -1158,6 +1218,10 @@ macro_rules! as_ref_impl { self.as_ref().get_ranges(location, ranges).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + self.as_ref().delete_opts(location, opts).await + } + fn delete_stream( &self, locations: BoxStream<'static, Result>, @@ -1357,21 +1421,7 @@ where } async fn delete(&self, location: &Path) -> Result<()> { - let location = location.clone(); - let mut stream = - self.delete_stream(futures::stream::once(async move { Ok(location) }).boxed()); - let _path = stream.try_next().await?.ok_or_else(|| Error::Generic { - store: "ext", - source: "`delete_stream` with one location should yield once but didn't".into(), - })?; - if stream.next().await.is_some() { - Err(Error::Generic { - store: "ext", - source: "`delete_stream` with one location expected to yield exactly once, but yielded more than once".into(), - }) - } else { - Ok(()) - } + self.delete_opts(location, DeleteOptions::default()).await } async fn copy(&self, from: &Path, to: &Path) -> Result<()> { @@ -1877,6 +1927,88 @@ pub struct PutResult { pub version: Option, } +/// Options for a delete request +#[derive(Debug, Clone, Default)] +pub struct DeleteOptions { + /// Delete will succeed if the `ObjectMeta::e_tag` matches + /// otherwise returning [`Error::Precondition`] + /// + /// See + /// + /// Examples: + /// + /// ```text + /// If-Match: "xyzzy" + /// If-Match: "xyzzy", "r2d2xxxx", "c3piozzzz" + /// If-Match: * + /// ``` + pub if_match: Option, + /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations + /// that need to pass context-specific information (like tracing spans) via trait methods. + /// + /// These extensions are ignored entirely by backends offered through this crate. + /// + /// They are also excluded from [`PartialEq`] and [`Eq`]. + pub extensions: Extensions, +} + +impl DeleteOptions { + /// Returns an error if the preconditions on this request are not satisfied + pub fn check_preconditions(&self, meta: &ObjectMeta) -> Result<()> { + // The use of the invalid etag "*" means no ETag is equivalent to never matching + let etag = meta.e_tag.as_deref().unwrap_or("*"); + + if let Some(m) = &self.if_match { + if m != "*" && m.split(',').map(str::trim).all(|x| x != etag) { + return Err(Error::Precondition { + path: meta.location.to_string(), + source: format!("{etag} does not match {m}").into(), + }); + } + } + Ok(()) + } + + /// Create a new [`DeleteOptions`] + pub fn new() -> Self { + Self::default() + } + + /// Sets the `if_match` condition. + /// + /// See [`DeleteOptions::if_match`] + #[must_use] + pub fn with_if_match(mut self, etag: Option>) -> Self { + self.if_match = etag.map(Into::into); + self + } + + /// Sets the `extensions`. + /// + /// See [`DeleteOptions::extensions`]. + #[must_use] + pub fn with_extensions(mut self, extensions: Extensions) -> Self { + self.extensions = extensions; + self + } +} + +impl PartialEq for DeleteOptions { + fn eq(&self, other: &Self) -> bool { + let Self { + if_match, + extensions: _, + } = self; + let Self { + if_match: other_if_match, + extensions: _, + } = other; + if_match == other_if_match + } +} + +impl Eq for DeleteOptions {} + /// Configure preconditions for the copy operation #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] pub enum CopyMode { @@ -2424,6 +2556,22 @@ mod tests { assert_eq!(options.extensions.get::<&str>(), extensions.get::<&str>()); } + #[test] + fn test_delete_options_builder() { + let extensions = Extensions::new(); + + let options = DeleteOptions::new(); + assert_eq!(options.if_match, None); + assert!(options.extensions.get::<&str>().is_none()); + + let options = options + .with_if_match(Some("etag-match")) + .with_extensions(extensions.clone()); + + assert_eq!(options.if_match, Some("etag-match".to_string())); + assert_eq!(options.extensions.get::<&str>(), extensions.get::<&str>()); + } + fn takes_generic_object_store(store: T) { // This function is just to ensure that the trait bounds are satisfied let _ = store; diff --git a/src/limit.rs b/src/limit.rs index 30fe2b65..2cc906f2 100644 --- a/src/limit.rs +++ b/src/limit.rs @@ -18,9 +18,9 @@ //! An object store that limits the maximum concurrency of the wrapped implementation use crate::{ - BoxStream, CopyOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, - ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, PutResult, - RenameOptions, Result, StreamExt, UploadPart, + BoxStream, CopyOptions, DeleteOptions, GetOptions, GetResult, GetResultPayload, ListResult, + MultipartUpload, ObjectMeta, ObjectStore, Path, PutMultipartOptions, PutOptions, PutPayload, + PutResult, RenameOptions, Result, StreamExt, UploadPart, }; use async_trait::async_trait; use bytes::Bytes; @@ -105,6 +105,11 @@ impl ObjectStore for LimitStore { self.inner.get_ranges(location, ranges).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + let _permit = self.semaphore.acquire().await.unwrap(); + self.inner.delete_opts(location, opts).await + } + fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/local.rs b/src/local.rs index 961ed124..a65a2693 100644 --- a/src/local.rs +++ b/src/local.rs @@ -34,8 +34,8 @@ use url::Url; use walkdir::{DirEntry, WalkDir}; use crate::{ - Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, - ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, + Attributes, DeleteOptions, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, + ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, UploadPart, maybe_spawn_blocking, path::{Path, absolute_path_to_url}, util::InvalidGetRange, @@ -453,6 +453,19 @@ impl ObjectStore for LocalFileSystem { .await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + if opts.if_match.is_some() { + let meta = self.head(location).await?; + opts.check_preconditions(&meta)?; + } + + let config = Arc::clone(&self.config); + let automatic_cleanup = self.automatic_cleanup; + let location = location.clone(); + maybe_spawn_blocking(move || Self::delete_location(config, automatic_cleanup, &location)) + .await + } + fn delete_stream( &self, locations: BoxStream<'static, Result>, @@ -1177,6 +1190,8 @@ mod tests { copy_rename_nonexistent_object(&integration).await; stream_get(&integration).await; put_opts(&integration, false).await; + delete_opts(&integration, true).await; + delete_opts_race_condition(&integration, true).await; } #[test] diff --git a/src/memory.rs b/src/memory.rs index f026907d..68892b99 100644 --- a/src/memory.rs +++ b/src/memory.rs @@ -29,9 +29,9 @@ use parking_lot::RwLock; use crate::multipart::{MultipartStore, PartId}; use crate::util::InvalidGetRange; use crate::{ - Attributes, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, - ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutResult, Result, - UpdateVersion, UploadPart, path::Path, + Attributes, DeleteOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartId, + MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutResult, + Result, UpdateVersion, UploadPart, path::Path, }; use crate::{CopyMode, CopyOptions, GetOptions, PutPayload}; @@ -294,6 +294,23 @@ impl ObjectStore for InMemory { .collect() } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + if opts.if_match.is_some() { + let entry = self.entry(location)?; + let meta = ObjectMeta { + location: location.clone(), + last_modified: entry.last_modified, + size: entry.data.len() as u64, + e_tag: Some(entry.e_tag.to_string()), + version: None, + }; + opts.check_preconditions(&meta)?; + } + + self.storage.write().map.remove(location); + Ok(()) + } + fn delete_stream( &self, locations: BoxStream<'static, Result>, @@ -555,6 +572,8 @@ mod tests { copy_if_not_exists(&integration).await; stream_get(&integration).await; put_opts(&integration, true).await; + delete_opts(&integration, true).await; + delete_opts_race_condition(&integration, true).await; multipart(&integration, &integration).await; put_get_attributes(&integration).await; } diff --git a/src/prefix.rs b/src/prefix.rs index 67456fd5..82a979cc 100644 --- a/src/prefix.rs +++ b/src/prefix.rs @@ -23,8 +23,9 @@ use std::ops::Range; use crate::multipart::{MultipartStore, PartId}; use crate::path::Path; use crate::{ - CopyOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, ObjectMeta, - ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, Result, + CopyOptions, DeleteOptions, GetOptions, GetResult, ListResult, MultipartId, MultipartUpload, + ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, RenameOptions, + Result, }; /// Store wrapper that applies a constant prefix to all paths handled by the store. @@ -125,6 +126,11 @@ impl ObjectStore for PrefixStore { self.inner.get_ranges(&full_path, ranges).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + let full_path = self.full_path(location); + self.inner.delete_opts(&full_path, opts).await + } + fn delete_stream( &self, locations: BoxStream<'static, Result>, diff --git a/src/throttle.rs b/src/throttle.rs index 1fc90d7e..64f0a601 100644 --- a/src/throttle.rs +++ b/src/throttle.rs @@ -21,7 +21,7 @@ use std::ops::Range; use std::{convert::TryInto, sync::Arc}; use crate::multipart::{MultipartStore, PartId}; -use crate::{CopyOptions, GetOptions, RenameOptions, UploadPart}; +use crate::{CopyOptions, DeleteOptions, GetOptions, RenameOptions, UploadPart}; use crate::{ GetResult, GetResultPayload, ListResult, MultipartId, MultipartUpload, ObjectMeta, ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result, path::Path, @@ -196,6 +196,11 @@ impl ObjectStore for ThrottledStore { self.inner.get_ranges(location, ranges).await } + async fn delete_opts(&self, location: &Path, opts: DeleteOptions) -> Result<()> { + sleep(self.config().wait_delete_per_call).await; + self.inner.delete_opts(location, opts).await + } + fn delete_stream( &self, locations: BoxStream<'static, Result>, From 5c86ce5fca8df69f4091beb65d660a07390a4d16 Mon Sep 17 00:00:00 2001 From: Prashant Deva Date: Fri, 16 Jan 2026 19:45:18 +0000 Subject: [PATCH 2/3] remove whitespace --- src/aws/builder.rs | 1 - src/aws/precondition.rs | 1 - 2 files changed, 2 deletions(-) diff --git a/src/aws/builder.rs b/src/aws/builder.rs index 86ee1b4a..e49145a4 100644 --- a/src/aws/builder.rs +++ b/src/aws/builder.rs @@ -388,7 +388,6 @@ pub enum AmazonS3ConfigKey { /// - `conditional_put` ConditionalPut, - /// Skip signing request /// /// Supported keys: diff --git a/src/aws/precondition.rs b/src/aws/precondition.rs index 0bbe0e3a..b4ae938a 100644 --- a/src/aws/precondition.rs +++ b/src/aws/precondition.rs @@ -159,7 +159,6 @@ impl Parse for S3ConditionalPut { } } - #[cfg(test)] mod tests { use super::S3CopyIfNotExists; From 6101fc85e4b565f5a8a18e3e871b3d306fbb0b75 Mon Sep 17 00:00:00 2001 From: Prashant Deva Date: Fri, 16 Jan 2026 20:05:00 +0000 Subject: [PATCH 3/3] minor fix for azure --- src/azure/client.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/azure/client.rs b/src/azure/client.rs index f681fc6d..5223278f 100644 --- a/src/azure/client.rs +++ b/src/azure/client.rs @@ -587,17 +587,17 @@ impl AzureClient { .client .delete(url.as_str()) .header(CONTENT_LENGTH, HeaderValue::from_static("0")) - .extensions(opts.extensions) - .with_azure_authorization(&credential, &self.config.account) - .retryable(&self.config.retry_config) - .sensitive(sensitive) - .idempotent(true); + .extensions(opts.extensions); if let Some(etag) = &opts.if_match { builder = builder.header(&IF_MATCH, etag); } builder + .with_azure_authorization(&credential, &self.config.account) + .retryable(&self.config.retry_config) + .sensitive(sensitive) + .idempotent(true) .send() .await .map_err(|source| Error::DeleteRequest {