Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 122 additions & 0 deletions src/azure/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,10 +666,26 @@ impl MicrosoftAzureBuilder {
self.container_name = Some(validate(host)?);
} else {
match host.split_once('.') {
// Workspace-level Private Link detection
// "{workspaceid}.z??.(onelake|dfs|blob).fabric.microsoft.com"
Some((workspaceid, rest))
if rest.starts_with('z') && rest.ends_with("fabric.microsoft.com") =>
{
// Account name for WS-PL is two labels: "{workspaceid}.z{xy}"
let (zone, _) = rest.split_once('.').unwrap_or((rest, ""));

self.account_name = Some(format!("{workspaceid}.{zone}"));
self.endpoint = Some(format!("https://{}", host));

self.container_name = Some(validate(parsed.username())?);
self.use_fabric_endpoint = true.into();
}

Some((a, "dfs.core.windows.net")) | Some((a, "blob.core.windows.net")) => {
self.account_name = Some(validate(a)?);
self.container_name = Some(validate(parsed.username())?);
}

Some((a, "dfs.fabric.microsoft.com"))
| Some((a, "blob.fabric.microsoft.com")) => {
self.account_name = Some(validate(a)?);
Expand All @@ -681,6 +697,30 @@ impl MicrosoftAzureBuilder {
}
}
"https" => match host.split_once('.') {
// Workspace-level Private Link detection
// "{workspaceid}.z??.(onelake|dfs|blob).fabric.microsoft.com"
Some((workspaceid, rest))
if rest.starts_with('z') && rest.ends_with("fabric.microsoft.com") =>
{
// rest looks like: "z28.dfs.fabric.microsoft.com" / "z28.blob.fabric.microsoft.com" / etc.
// Account name for WS-PL is two labels: "{workspaceid}.z{xy}"
let (zone, _) = rest.split_once('.').unwrap_or((rest, ""));

self.account_name = Some(format!("{workspaceid}.{zone}"));
self.endpoint = Some(format!("https://{}", host));

// Attempt to infer the container name from the URL
let container = parsed.path_segments().unwrap().next().expect(
"iterator always contains at least one string (which may be empty)",
);

if !container.is_empty() {
self.container_name = Some(validate(container)?);
}

self.use_fabric_endpoint = true.into();
}

Some((a, "dfs.core.windows.net")) | Some((a, "blob.core.windows.net")) => {
self.account_name = Some(validate(a)?);
let container = parsed.path_segments().unwrap().next().expect(
Expand Down Expand Up @@ -1203,6 +1243,17 @@ mod tests {
assert_eq!(builder.container_name.as_deref(), Some("container"));
assert!(builder.use_fabric_endpoint.get().unwrap());

let mut builder = MicrosoftAzureBuilder::new();
builder
.parse_url("https://onelake.dfs.fabric.microsoft.com/c047b3e3-4e89-407a-98d7-cf9949ae92a3/9f1a2b3c-4d5e-6f70-8a9b-c0d1e2f3a456.lakehouse/Files/tables/sales/data.parquet")
.unwrap();
assert_eq!(builder.account_name, Some("onelake".to_string()));
assert_eq!(
builder.container_name.as_deref(),
Some("c047b3e3-4e89-407a-98d7-cf9949ae92a3")
);
assert!(builder.use_fabric_endpoint.get().unwrap());

let mut builder = MicrosoftAzureBuilder::new();
builder
.parse_url("https://account.blob.fabric.microsoft.com/")
Expand Down Expand Up @@ -1234,6 +1285,77 @@ mod tests {
}
}

/// Checks `parse_url` against Workspace Private Link style hosts of the form
/// `{workspaceid}.z??.<service>.fabric.microsoft.com`.
///
/// For every URL the test asserts the resulting `account_name`, the resulting
/// `container_name` (or its absence), and that `use_fabric_endpoint` is set.
#[test]
fn azure_test_workspace_private_link() {
    /// One URL together with the builder state expected after parsing it.
    struct Case {
        url: &'static str,
        account: &'static str,
        container: Option<&'static str>,
    }

    // Expected values shared by several cases.
    const ZERO_ACCOUNT: &str = "ab000000000000000000000000000000.zab";
    const WS_ACCOUNT: &str = "c047b3e34e89407a98d7cf9949ae92a3.zc0";
    const DASHED_CONTAINER: &str = "c047b3e3-4e89-407a-98d7-cf9949ae92a3";
    const PLAIN_CONTAINER: &str = "c047b3e34e89407a98d7cf9949ae92a3";

    let cases = [
        // Mixed-case host; the expected account is all lower-case.
        Case {
            url: "https://Ab000000000000000000000000000000.zAb.dfs.fabric.microsoft.com/",
            account: ZERO_ACCOUNT,
            container: None,
        },
        // No path segment after the host, so no container is expected.
        Case {
            url: "https://ab000000000000000000000000000000.zab.dfs.fabric.microsoft.com/",
            account: ZERO_ACCOUNT,
            container: None,
        },
        // https URLs: the first path segment is the expected container.
        Case {
            url: "https://c047b3e34e89407a98d7cf9949ae92a3.zc0.blob.fabric.microsoft.com/c047b3e3-4e89-407a-98d7-cf9949ae92a3/9f1a2b3c-4d5e-6f70-8a9b-c0d1e2f3a456/file",
            account: WS_ACCOUNT,
            container: Some(DASHED_CONTAINER),
        },
        Case {
            url: "https://c047b3e34e89407a98d7cf9949ae92a3.zc0.dfs.fabric.microsoft.com/c047b3e3-4e89-407a-98d7-cf9949ae92a3/9f1a2b3c-4d5e-6f70-8a9b-c0d1e2f3a456/file",
            account: WS_ACCOUNT,
            container: Some(DASHED_CONTAINER),
        },
        Case {
            url: "https://c047b3e34e89407a98d7cf9949ae92a3.zc0.onelake.fabric.microsoft.com/c047b3e3-4e89-407a-98d7-cf9949ae92a3/9f1a2b3c-4d5e-6f70-8a9b-c0d1e2f3a456/file",
            account: WS_ACCOUNT,
            container: Some(DASHED_CONTAINER),
        },
        Case {
            url: "https://c047b3e34e89407a98d7cf9949ae92a3.zc0.w.api.fabric.microsoft.com/c047b3e3-4e89-407a-98d7-cf9949ae92a3/9f1a2b3c-4d5e-6f70-8a9b-c0d1e2f3a456/file",
            account: WS_ACCOUNT,
            container: Some(DASHED_CONTAINER),
        },
        Case {
            url: "https://c047b3e34e89407a98d7cf9949ae92a3.zc0.c.api.fabric.microsoft.com/c047b3e3-4e89-407a-98d7-cf9949ae92a3/9f1a2b3c-4d5e-6f70-8a9b-c0d1e2f3a456/file",
            account: WS_ACCOUNT,
            container: Some(DASHED_CONTAINER),
        },
        // abfss URLs: the part before `@` is the expected container.
        Case {
            url: "abfss://c047b3e34e89407a98d7cf9949ae92a3@c047b3e34e89407a98d7cf9949ae92a3.zc0.dfs.fabric.microsoft.com/9f1a2b3c-4d5e-6f70-8a9b-c0d1e2f3a456/file",
            account: WS_ACCOUNT,
            container: Some(PLAIN_CONTAINER),
        },
        Case {
            url: "abfss://c047b3e34e89407a98d7cf9949ae92a3@c047b3e34e89407a98d7cf9949ae92a3.zc0.blob.fabric.microsoft.com/9f1a2b3c-4d5e-6f70-8a9b-c0d1e2f3a456/file",
            account: WS_ACCOUNT,
            container: Some(PLAIN_CONTAINER),
        },
    ];

    for case in &cases {
        // Bound locally so the assertion messages can capture it by name.
        let url = case.url;

        let mut parsed_builder = MicrosoftAzureBuilder::new();
        parsed_builder.parse_url(url).unwrap();

        assert_eq!(
            parsed_builder.account_name.as_deref(),
            Some(case.account),
            "account mismatch for URL: {url}"
        );
        assert_eq!(
            parsed_builder.container_name.as_deref(),
            case.container,
            "container mismatch for URL: {url}"
        );
        assert!(
            parsed_builder.use_fabric_endpoint.get().unwrap(),
            "use_fabric_endpoint not set for URL: {url}"
        );
    }
}

#[test]
fn azure_test_config_from_map() {
let azure_client_id = "object_store:fake_access_key_id";
Expand Down
41 changes: 41 additions & 0 deletions src/azure/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,47 @@ mod tests {
}
}

/// Manual put / get / delete round trip against a Workspace Private Link endpoint.
///
/// The target URL is read from `AZURE_ONELAKE_URL` and the bearer token from
/// `AZURE_STORAGE_TOKEN`; the test panics if either variable is missing.
#[ignore = "Used for manual testing against a real Workspace Private Link Endpoint."]
#[tokio::test]
async fn azure_onelake_wspl_test() {
    maybe_skip_integration!();

    let url =
        std::env::var("AZURE_ONELAKE_URL").expect("Set AZURE_ONELAKE_URL to a WS-PL FQDN");
    let parsed = url::Url::parse(&url).unwrap();

    // How many leading path segments belong to the container rather than the object:
    //   abfss://<container>@<host>/<path...>  -> none, the container is in the username
    //   https://<host>/<container>/<path...>  -> one, the first segment is the container
    let container_segments = match parsed.scheme() {
        "abfss" | "abfs" => 0,
        _ => 1,
    };

    // Everything after the container segment(s) is the object path.
    let object_key = parsed
        .path_segments()
        .unwrap()
        .skip(container_segments)
        .collect::<Vec<&str>>()
        .join("/");
    let path = Path::from(object_key);

    let builder = MicrosoftAzureBuilder::new()
        .with_url(&url)
        .with_bearer_token_authorization(
            std::env::var("AZURE_STORAGE_TOKEN").expect("Set AZURE_STORAGE_TOKEN"),
        );
    let store = builder.build().unwrap();

    let payload = Bytes::from("Hello OneLake WSPL");

    // Write the object, read it back, compare, then clean up.
    store.put(&path, payload.clone().into()).await.unwrap();
    let fetched = store.get(&path).await.unwrap();
    let round_tripped = fetched.bytes().await.unwrap();
    assert_eq!(payload, round_tripped);
    store.delete(&path).await.unwrap();
}

#[ignore = "Used for manual testing against a real storage account."]
#[tokio::test]
async fn test_user_delegation_key() {
Expand Down