Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/core/src/layers/correctness_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ impl<A: Access> LayeredAccess for CorrectnessAccessor<A> {
"if_match",
));
}
if args.source_if_match().is_some() && !capability.copy_with_source_if_match {
return Err(new_unsupported_error(
&self.info,
Operation::Copy,
"source_if_match",
));
}

self.inner.copy(from, to, args, opts).await
}
Expand Down
16 changes: 16 additions & 0 deletions core/core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@ impl From<options::WriteOptions> for (OpWrite, OpWriter) {
pub struct OpCopy {
if_not_exists: bool,
if_match: Option<String>,
source_if_match: Option<String>,
}

impl OpCopy {
Expand Down Expand Up @@ -930,6 +931,20 @@ impl OpCopy {
pub fn if_match(&self) -> Option<&str> {
self.if_match.as_deref()
}

/// Set the source_if_match condition for the operation.
///
/// When set, the copy operation will only proceed if the source object's
/// ETag matches the given value.
pub fn with_source_if_match(mut self, source_if_match: impl Into<String>) -> Self {
self.source_if_match = Some(source_if_match.into());
self
}

/// Get source_if_match condition.
pub fn source_if_match(&self) -> Option<&str> {
self.source_if_match.as_deref()
}
}

/// Args for `copier` operation.
Expand Down Expand Up @@ -986,6 +1001,7 @@ impl From<options::CopyOptions> for (OpCopy, OpCopier) {
OpCopy {
if_not_exists: value.if_not_exists,
if_match: value.if_match,
source_if_match: value.source_if_match,
},
OpCopier {
concurrent: value.concurrent.max(1),
Expand Down
2 changes: 2 additions & 0 deletions core/core/src/types/capability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ pub struct Capability {
pub copy_with_if_not_exists: bool,
/// Indicates if conditional copy operations with if-match are supported.
pub copy_with_if_match: bool,
/// Indicates if conditional copy operations with source-side if-match are supported.
pub copy_with_source_if_match: bool,
/// Indicates if copy operations can be split into multiple server-side tasks.
pub copy_can_multi: bool,
/// Maximum size supported for segmented copy tasks.
Expand Down
9 changes: 9 additions & 0 deletions core/core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,15 @@ impl<F: Future<Output = Result<Metadata>>> FutureCopy<F> {
self
}

/// Sets the condition that copy operation will succeed only if the source
/// object currently has the given ETag.
///
/// Refer to [`options::CopyOptions::source_if_match`] for more details.
pub fn source_if_match(mut self, etag: &str) -> Self {
self.args.0.source_if_match = Some(etag.to_string());
self
}

/// Sets concurrent copy operations for this copy.
///
/// Refer to [`options::CopyOptions::concurrent`] for more details.
Expand Down
13 changes: 13 additions & 0 deletions core/core/src/types/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,19 @@ pub struct CopyOptions {
/// destination object's ETag matches the given value.
pub if_match: Option<String>,

/// Sets the condition that copy operation will succeed only if the source
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

S3 copy supports multiple conditions, do we want to support them all, so we could declare we support S3 condition copy with source check?

x-amz-copy-source: CopySource
x-amz-copy-source-if-match: CopySourceIfMatch
x-amz-copy-source-if-modified-since: CopySourceIfModifiedSince
x-amz-copy-source-if-none-match: CopySourceIfNoneMatch
x-amz-copy-source-if-unmodified-since: CopySourceIfUnmodifiedSince

ref: https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html#API_CopyObject_RequestSyntax

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it, makes sense, I will extend this PR to support all of them

/// object currently has the given ETag.
///
/// ### Capability
///
/// Check [`Capability::copy_with_source_if_match`] before using this feature.
///
/// ### Behavior
///
/// - If supported, the copy operation will only succeed when the source
/// object's ETag matches the given value.
pub source_if_match: Option<String>,

/// Known content length of the source object.
///
/// This is an execution hint that allows OpenDAL to avoid extra metadata
Expand Down
7 changes: 7 additions & 0 deletions core/layers/capability-check/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ impl<A: Access> LayeredAccess for CapabilityAccessor<A> {
"if_not_exists",
));
}
if args.source_if_match().is_some() && !capability.copy_with_source_if_match {
return Err(new_unsupported_error(
self.info.as_ref(),
Operation::Copy,
"source_if_match",
));
}

self.inner.copy(from, to, args, opts).await
}
Expand Down
1 change: 1 addition & 0 deletions core/services/s3/src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -983,6 +983,7 @@ impl Builder for S3Builder {
copy_can_multi: true,
copy_with_if_not_exists: true,
copy_with_if_match: true,
copy_with_source_if_match: true,
// The min multipart size of S3 is 5 MiB.
//
// ref: <https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html>
Expand Down
7 changes: 7 additions & 0 deletions core/services/s3/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ use opendal_core::*;
pub mod constants {
pub const X_AMZ_COPY_SOURCE: &str = "x-amz-copy-source";
pub const X_AMZ_COPY_SOURCE_RANGE: &str = "x-amz-copy-source-range";
pub const X_AMZ_COPY_SOURCE_IF_MATCH: &str = "x-amz-copy-source-if-match";

pub const X_AMZ_SERVER_SIDE_ENCRYPTION: &str = "x-amz-server-side-encryption";
pub const X_AMZ_SERVER_REQUEST_PAYER: (&str, &str) = ("x-amz-request-payer", "requester");
Expand Down Expand Up @@ -669,6 +670,9 @@ impl S3Core {
if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}
if let Some(source_if_match) = args.source_if_match() {
req = req.header(constants::X_AMZ_COPY_SOURCE_IF_MATCH, source_if_match);
}

// Set SSE headers.
req = self.insert_sse_headers(req, true);
Expand Down Expand Up @@ -1119,6 +1123,9 @@ impl S3Core {
if let Some(if_match) = args.if_match() {
req = req.header(IF_MATCH, if_match);
}
if let Some(source_if_match) = args.source_if_match() {
req = req.header(constants::X_AMZ_COPY_SOURCE_IF_MATCH, source_if_match);
}

// Set request payer header if enabled.
req = self.insert_request_payer_header(req);
Expand Down
72 changes: 72 additions & 0 deletions core/tests/behavior/async_copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ pub fn tests(op: &Operator, tests: &mut Vec<Trial>) {
test_copier_with_if_not_exists_to_existing_file
))
}

if cap.read && cap.write && cap.copy && cap.copy_with_source_if_match {
tests.extend(async_trials!(
op,
test_copy_with_source_if_match_match,
test_copy_with_source_if_match_mismatch
))
}
}

fn copy_multi_chunk_size(cap: Capability) -> Option<(usize, usize)> {
Expand Down Expand Up @@ -349,6 +357,70 @@ pub async fn test_copy_with_if_not_exists_to_existing_file(op: Operator) -> Resu
Ok(())
}

/// Copy with source_if_match matching the source ETag should succeed.
pub async fn test_copy_with_source_if_match_match(op: Operator) -> Result<()> {
if !op.info().full_capability().copy_with_source_if_match {
return Ok(());
}

let source_path = uuid::Uuid::new_v4().to_string();
let (source_content, _) = gen_bytes(op.info().full_capability());
op.write(&source_path, source_content.clone()).await?;

let Some(etag) = op.stat(&source_path).await?.etag().map(|s| s.to_string()) else {
op.delete(&source_path).await.expect("delete must succeed");
return Ok(());
};

let target_path = uuid::Uuid::new_v4().to_string();

op.copy_with(&source_path, &target_path)
.source_if_match(&etag)
.await?;

let target_content = op
.read(&target_path)
.await
.expect("read must succeed")
.to_bytes();
assert_eq!(
sha256_digest(target_content),
sha256_digest(&source_content),
);

op.delete(&source_path).await.expect("delete must succeed");
op.delete(&target_path).await.expect("delete must succeed");
Ok(())
}

/// Copy with source_if_match not matching should fail with ConditionNotMatch.
pub async fn test_copy_with_source_if_match_mismatch(op: Operator) -> Result<()> {
if !op.info().full_capability().copy_with_source_if_match {
return Ok(());
}

let source_path = uuid::Uuid::new_v4().to_string();
let (source_content, _) = gen_bytes(op.info().full_capability());
op.write(&source_path, source_content.clone()).await?;

let target_path = uuid::Uuid::new_v4().to_string();

let err = op
.copy_with(&source_path, &target_path)
.source_if_match("\"00000000000000000000000000000000\"")
.await
.expect_err("copy must fail");
assert_eq!(err.kind(), ErrorKind::ConditionNotMatch);

assert!(
!op.exists(&target_path).await.expect("exists must succeed"),
"target must not be created on mismatch"
);

op.delete(&source_path).await.expect("delete must succeed");
Ok(())
}

/// Copy with chunk should copy a file successfully.
pub async fn test_copy_with_chunk(op: Operator) -> Result<()> {
let cap = op.info().full_capability();
Expand Down
Loading