diff --git a/Cargo.toml b/Cargo.toml index 54bc1615..f497815a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,13 @@ description = "A generic object store interface for uniformly interacting with A keywords = ["object", "storage", "cloud"] repository = "https://github.com/apache/arrow-rs-object-store" rust-version = "1.85" -include = ["src/**/*.rs", "README.md", "LICENSE.txt", "NOTICE.txt", "Cargo.toml"] +include = [ + "src/**/*.rs", + "README.md", + "LICENSE.txt", + "NOTICE.txt", + "Cargo.toml", +] [package.metadata.docs.rs] all-features = true @@ -46,10 +52,14 @@ url = "2.2" walkdir = { version = "2", optional = true } # Cloud storage support -base64 = { version = "0.22", default-features = false, features = ["std"], optional = true } +base64 = { version = "0.22", default-features = false, features = [ + "std", +], optional = true } form_urlencoded = { version = "1.2", optional = true } http-body-util = { version = "0.1.2", optional = true } -httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } +httparse = { version = "1.8.0", default-features = false, features = [ + "std", +], optional = true } hyper = { version = "1.2", default-features = false, optional = true } md-5 = { version = "0.11.0", default-features = false, optional = true } quick-xml = { version = "0.39.0", features = ["serialize", "overlapped-lists"], optional = true } @@ -65,6 +75,9 @@ serde_urlencoded = { version = "0.7", optional = true } tokio = { version = "1.29.0", features = ["sync", "macros", "rt", "time", "io-util"], optional = true } tracing = { version = "0.1", optional = true } +[target.'cfg(target_family="unix")'.dependencies] +xattr = { version = "1", optional = true } + [target.'cfg(target_family="unix")'.dev-dependencies] nix = { version = "0.31.1", features = ["fs"] } @@ -74,7 +87,7 @@ wasm-bindgen-futures = "0.4.18" futures-channel = {version = "0.3", features = ["sink"]} [features] -default = ["fs"] +default = ["fs", "xattr"] cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded", "tokio"] azure = ["cloud", "httparse"] fs = ["walkdir", "tokio"] diff --git a/src/local.rs b/src/local.rs index a12a775a..90d8a74a 100644 --- a/src/local.rs +++ b/src/local.rs @@ -116,6 +116,20 @@ pub(crate) enum Error { #[error("Upload aborted")] Aborted, + + #[cfg(feature = "xattr")] + #[error("Unable to set extended attribute on {}: {source}", path.display())] + UnableToSetXattr { + source: std::io::Error, + path: PathBuf, + }, + + #[cfg(feature = "xattr")] + #[error("Unable to read extended attribute on {}: {source}", path.display())] + UnableToReadXattr { + source: std::io::Error, + path: PathBuf, + }, } impl From for super::Error { @@ -325,6 +339,103 @@ fn is_valid_file_path(path: &Path) -> bool { } } +/// Sets extended attributes on a file from an Attributes collection +#[cfg(feature = "xattr")] +fn set_xattrs(path: &std::path::Path, attributes: &Attributes) -> Result<()> { + use crate::Attribute; + use std::borrow::Cow; + + for (attr, value) in attributes { + let name: Cow<'static, str> = match attr { + Attribute::CacheControl => Cow::Borrowed("user.object_store.cache_control"), + Attribute::ContentDisposition => Cow::Borrowed("user.object_store.content_disposition"), + Attribute::ContentEncoding => Cow::Borrowed("user.object_store.content_encoding"), + Attribute::ContentLanguage => Cow::Borrowed("user.object_store.content_language"), + Attribute::ContentType => Cow::Borrowed("user.object_store.content_type"), + Attribute::StorageClass => Cow::Borrowed("user.object_store.storage_class"), + Attribute::Metadata(key) => Cow::Owned(format!("user.object_store.{key}")), + }; + xattr::set(path, name.as_ref(), value.as_ref().as_bytes()).map_err(|source| { + Error::UnableToSetXattr { + source, + path: path.into(), + } + })?; + } + Ok(()) +} + +/// Reads extended attributes from a file and returns an Attributes collection +#[cfg(feature = "xattr")] +fn get_xattrs(path: &std::path::Path) -> Result { + use crate::Attribute; + + let mut attributes = Attributes::new(); + + let list = match xattr::list(path) { + Ok(list) => list, + Err(source) => { + return Err(Error::UnableToReadXattr { + source, + path: path.into(), + } + .into()); + } + }; + + for name in list { + let name_str = match name.to_str() { + Some(s) if s.starts_with("user.object_store.") => s, + _ => continue, + }; + + let value = match xattr::get(path, &name) { + Ok(Some(v)) => match String::from_utf8(v) { + Ok(s) => s, + Err(_) => continue, + }, + Ok(None) => continue, + Err(source) => { + return Err(Error::UnableToReadXattr { + source, + path: path.into(), + } + .into()); + } + }; + + let key = &name_str["user.object_store.".len()..]; + let attr = match key { + "cache_control" => Attribute::CacheControl, + "content_disposition" => Attribute::ContentDisposition, + "content_encoding" => Attribute::ContentEncoding, + "content_language" => Attribute::ContentLanguage, + "content_type" => Attribute::ContentType, + "storage_class" => Attribute::StorageClass, + key => Attribute::Metadata(key.to_string().into()), + }; + attributes.insert(attr, value.into()); + } + Ok(attributes) +} + +/// Returns an error if attributes are non-empty when xattr feature is disabled. +#[cfg(not(feature = "xattr"))] +fn set_xattrs(_path: &std::path::Path, attributes: &Attributes) -> Result<()> { + if !attributes.is_empty() { + return Err(super::Error::NotSupported { + source: "Setting extended attributes requires the 'xattr' feature".into(), + }); + } + Ok(()) +} + +/// No-op when xattr feature is disabled: returns empty attributes. +#[cfg(not(feature = "xattr"))] +fn get_xattrs(_path: &std::path::Path) -> Result { + Ok(Attributes::new()) +} + #[async_trait] impl ObjectStore for LocalFileSystem { async fn put_opts( @@ -340,13 +451,6 @@ impl ObjectStore for LocalFileSystem { }); } - if !opts.attributes.is_empty() { - return Err(crate::Error::NotImplemented { - operation: "`put_opts` with `opts.attributes` specified".into(), - implementer: self.to_string(), - }); - } - let path = self.path_to_filesystem(location)?; maybe_spawn_blocking(move || { let (mut file, staging_path) = new_staged_upload(&path)?; @@ -359,6 +463,9 @@ impl ObjectStore for LocalFileSystem { path: path.to_string_lossy().to_string(), })?; e_tag = Some(get_etag(&metadata)); + + set_xattrs(&staging_path, &opts.attributes)?; + match opts.mode { PutMode::Overwrite => { // For some fuse types of file systems, the file must be closed first @@ -406,16 +513,9 @@ impl ObjectStore for LocalFileSystem { location: &Path, opts: PutMultipartOptions, ) -> Result> { - if !opts.attributes.is_empty() { - return Err(crate::Error::NotImplemented { - operation: "`put_multipart_opts` with `opts.attributes` specified".into(), - implementer: self.to_string(), - }); - } - let dest = self.path_to_filesystem(location)?; let (file, src) = new_staged_upload(&dest)?; - Ok(Box::new(LocalUpload::new(src, dest, file))) + Ok(Box::new(LocalUpload::new(src, dest, file, opts.attributes))) } async fn get_opts(&self, location: &Path, options: GetOptions) -> Result { @@ -434,9 +534,11 @@ impl ObjectStore for LocalFileSystem { None => 0..meta.size, }; + let attributes = get_xattrs(&path)?; + Ok(GetResult { payload: GetResultPayload::File(file, path), - attributes: Attributes::default(), + attributes, range, meta, }) @@ -835,6 +937,8 @@ struct LocalUpload { src: Option, /// The next offset to write into the file offset: u64, + /// Attributes to set on the file + attributes: Attributes, } #[derive(Debug)] @@ -844,7 +948,7 @@ struct UploadState { } impl LocalUpload { - pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self { + pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File, attributes: Attributes) -> Self { Self { state: Arc::new(UploadState { dest, @@ -852,6 +956,7 @@ impl LocalUpload { }), src: Some(src), offset: 0, + attributes, } } } @@ -882,9 +987,13 @@ impl MultipartUpload for LocalUpload { async fn complete(&mut self) -> Result { let src = self.src.take().ok_or(Error::Aborted)?; let s = Arc::clone(&self.state); + let attributes = std::mem::take(&mut self.attributes); maybe_spawn_blocking(move || { // Ensure no inflight writes let file = s.file.lock(); + + set_xattrs(&src, &attributes)?; + std::fs::rename(&src, &s.dest) .map_err(|source| Error::UnableToRenameFile { source })?; let metadata = file.metadata().map_err(|e| Error::Metadata { @@ -1280,6 +1389,8 @@ mod tests { copy_rename_nonexistent_object(&integration).await; stream_get(&integration).await; put_opts(&integration, false).await; + #[cfg(feature = "xattr")] + put_get_attributes(&integration).await; } #[test] @@ -1914,3 +2025,80 @@ mod unix_test { spawned.await.unwrap(); } } + +#[cfg(feature = "xattr")] +#[cfg(test)] +mod xattr_test { + use tempfile::TempDir; + + use crate::local::LocalFileSystem; + use crate::{Attribute, Attributes, ObjectStore, ObjectStoreExt, Path, PutOptions}; + + #[tokio::test] + async fn test_put_get_attributes() { + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let location = Path::from("test_file"); + let data = "test data"; + + let mut attributes = Attributes::new(); + attributes.insert(Attribute::ContentType, "text/plain".into()); + attributes.insert(Attribute::CacheControl, "max-age=3600".into()); + attributes.insert(Attribute::ContentDisposition, "inline".into()); + attributes.insert(Attribute::ContentEncoding, "gzip".into()); + attributes.insert(Attribute::ContentLanguage, "en-US".into()); + attributes.insert(Attribute::StorageClass, "STANDARD".into()); + attributes.insert( + Attribute::Metadata("custom_key".into()), + "custom_value".into(), + ); + + let opts = PutOptions { + attributes: attributes.clone(), + ..Default::default() + }; + + integration + .put_opts(&location, data.into(), opts) + .await + .unwrap(); + + let result = integration.get(&location).await.unwrap(); + assert_eq!(result.attributes, attributes); + + let bytes = result.bytes().await.unwrap(); + assert_eq!(bytes.as_ref(), data.as_bytes()); + } + + #[tokio::test] + async fn test_multipart_attributes() { + let root = TempDir::new().unwrap(); + let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap(); + + let location = Path::from("multipart_file"); + + let mut attributes = Attributes::new(); + attributes.insert(Attribute::ContentType, "application/octet-stream".into()); + attributes.insert(Attribute::Metadata("part_count".into()), "2".into()); + + let opts = crate::PutMultipartOptions { + attributes: attributes.clone(), + ..Default::default() + }; + + let mut upload = integration + .put_multipart_opts(&location, opts) + .await + .unwrap(); + upload.put_part("part1".into()).await.unwrap(); + upload.put_part("part2".into()).await.unwrap(); + upload.complete().await.unwrap(); + + let result = integration.get(&location).await.unwrap(); + assert_eq!(result.attributes, attributes); + + let bytes = result.bytes().await.unwrap(); + assert_eq!(bytes.as_ref(), b"part1part2"); + } +}