Skip to content

Commit 4ed5209

Browse files
committed
Add support for Microsoft Azure in ObjectStore
1 parent 1a92214 commit 4ed5209

File tree

4 files changed

+217
-2
lines changed

4 files changed

+217
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ getrandom = { version = "0.3.1", features = ["std"] }
3737
itertools = "0.14.0"
3838
lazy_static = "1.5.0"
3939
lru = "0.16.0"
40-
object_store = { version = "0.12", features = ["aws", "gcp"] }
40+
object_store = { version = "0.12", features = ["aws", "gcp", "azure"] }
4141
murmur3 = { version = "0.5.2" }
4242
parquet = { version = "56", features = ["async", "object_store"] }
4343
pin-project-lite = "0.2"

iceberg-rust/src/object_store/mod.rs

Lines changed: 140 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::{fmt::Display, path::Path, str::FromStr, sync::Arc};
66

77
use object_store::{
88
aws::{AmazonS3Builder, AmazonS3ConfigKey, S3CopyIfNotExists},
9+
azure::{AzureConfigKey, MicrosoftAzureBuilder},
910
gcp::{GoogleCloudStorageBuilder, GoogleConfigKey},
1011
local::LocalFileSystem,
1112
memory::InMemory,
@@ -24,6 +25,8 @@ pub enum Bucket<'s> {
2425
S3(&'s str),
2526
/// GCS bucket
2627
GCS(&'s str),
28+
/// Azure container
29+
Azure(&'s str),
2730
/// No bucket
2831
Local,
2932
}
@@ -33,13 +36,14 @@ impl Display for Bucket<'_> {
3336
match self {
3437
Bucket::S3(s) => write!(f, "s3://{s}"),
3538
Bucket::GCS(s) => write!(f, "gs://{s}"),
39+
Bucket::Azure(s) => write!(f, "https://{s}"),
3640
Bucket::Local => write!(f, ""),
3741
}
3842
}
3943
}
4044

4145
impl Bucket<'_> {
42-
/// Get the bucket and coud provider from the location string
46+
/// Get the bucket and cloud provider from the location string
4347
pub fn from_path(path: &str) -> Result<Bucket<'_>, Error> {
4448
if path.starts_with("s3://") || path.starts_with("s3a://") {
4549
let prefix = if path.starts_with("s3://") {
@@ -63,6 +67,17 @@ impl Bucket<'_> {
6367
.next()
6468
.map(Bucket::GCS)
6569
.ok_or(Error::NotFound(format!("Bucket in path {path}")))
70+
} else if path.starts_with("https://")
71+
&& (path.contains("dfs.core.windows.net")
72+
|| path.contains("blob.core.windows.net")
73+
|| path.contains("dfs.fabric.microsoft.com")
74+
|| path.contains("blob.fabric.microsoft.com"))
75+
{
76+
path.trim_start_matches("https://")
77+
.split('/')
78+
.nth(1)
79+
.map(Bucket::Azure)
80+
.ok_or(Error::NotFound(format!("Bucket in path {path}")))
6681
} else {
6782
Ok(Bucket::Local)
6883
}
@@ -72,6 +87,8 @@ impl Bucket<'_> {
7287
/// A wrapper for ObjectStore builders that can be used as a template to generate an ObjectStore given a particular bucket.
7388
#[derive(Debug, Clone)]
7489
pub enum ObjectStoreBuilder {
90+
/// Microsoft Azure builder
91+
Azure(Box<MicrosoftAzureBuilder>),
7592
/// AWS s3 builder
7693
S3(Box<AmazonS3Builder>),
7794
/// Google Cloud Storage builder
@@ -84,6 +101,8 @@ pub enum ObjectStoreBuilder {
84101

85102
/// Configuration keys for [ObjectStoreBuilder]
86103
pub enum ConfigKey {
104+
/// Configuration keys for Microsoft Azure
105+
Azure(AzureConfigKey),
87106
/// Configuration keys for AWS S3
88107
AWS(AmazonS3ConfigKey),
89108
/// Configuration keys for GCS
@@ -93,6 +112,9 @@ pub enum ConfigKey {
93112
impl FromStr for ConfigKey {
94113
type Err = object_store::Error;
95114
fn from_str(s: &str) -> Result<Self, Self::Err> {
115+
if let Ok(x) = s.parse() {
116+
return Ok(ConfigKey::Azure(x));
117+
};
96118
if let Ok(x) = s.parse() {
97119
return Ok(ConfigKey::AWS(x));
98120
};
@@ -106,6 +128,10 @@ impl FromStr for ConfigKey {
106128
}
107129
}
108130
impl ObjectStoreBuilder {
131+
/// Create a new Microsoft Azure ObjectStoreBuilder
132+
pub fn azure() -> Self {
133+
ObjectStoreBuilder::Azure(Box::new(MicrosoftAzureBuilder::from_env()))
134+
}
109135
/// Create new AWS S3 Object Store builder
110136
pub fn s3() -> Self {
111137
ObjectStoreBuilder::S3(Box::new(AmazonS3Builder::from_env()))
@@ -125,6 +151,9 @@ impl ObjectStoreBuilder {
125151
/// Set config value for builder
126152
pub fn with_config(self, key: ConfigKey, value: impl Into<String>) -> Self {
127153
match (self, key) {
154+
(ObjectStoreBuilder::Azure(azure), ConfigKey::Azure(key)) => {
155+
ObjectStoreBuilder::Azure(Box::new(azure.with_config(key, value)))
156+
}
128157
(ObjectStoreBuilder::S3(aws), ConfigKey::AWS(key)) => {
129158
ObjectStoreBuilder::S3(Box::new(aws.with_config(key, value)))
130159
}
@@ -137,6 +166,13 @@ impl ObjectStoreBuilder {
137166
/// Create objectstore from template
138167
pub fn build(&self, bucket: Bucket) -> Result<Arc<dyn ObjectStore>, Error> {
139168
match (bucket, self) {
169+
(Bucket::Azure(bucket), Self::Azure(builder)) => Ok::<_, Error>(Arc::new(
170+
(**builder)
171+
.clone()
172+
.with_container_name(bucket)
173+
.build()
174+
.map_err(Error::from)?,
175+
)),
140176
(Bucket::S3(bucket), Self::S3(builder)) => Ok::<_, Error>(Arc::new(
141177
(**builder)
142178
.clone()
@@ -158,3 +194,106 @@ impl ObjectStoreBuilder {
158194
}
159195
}
160196
}
197+
198+
#[cfg(test)]
199+
mod tests {
200+
use super::*;
201+
202+
#[test]
203+
fn test_from_path_s3() {
204+
let bucket = Bucket::from_path("s3://my-bucket/path/to/file").unwrap();
205+
match bucket {
206+
Bucket::S3(name) => assert_eq!(name, "my-bucket"),
207+
_ => panic!("Expected S3 bucket"),
208+
}
209+
}
210+
211+
#[test]
212+
fn test_from_path_s3a() {
213+
let bucket = Bucket::from_path("s3a://my-bucket/path/to/file").unwrap();
214+
match bucket {
215+
Bucket::S3(name) => assert_eq!(name, "my-bucket"),
216+
_ => panic!("Expected S3 bucket"),
217+
}
218+
}
219+
220+
#[test]
221+
fn test_from_path_gcs() {
222+
let bucket = Bucket::from_path("gcs://my-bucket/path/to/file").unwrap();
223+
match bucket {
224+
Bucket::GCS(name) => assert_eq!(name, "my-bucket"),
225+
_ => panic!("Expected GCS bucket"),
226+
}
227+
}
228+
229+
#[test]
230+
fn test_from_path_gs() {
231+
let bucket = Bucket::from_path("gs://my-bucket/path/to/file").unwrap();
232+
match bucket {
233+
Bucket::GCS(name) => assert_eq!(name, "my-bucket"),
234+
_ => panic!("Expected GCS bucket"),
235+
}
236+
}
237+
238+
#[test]
239+
fn test_from_path_azure_dfs() {
240+
let bucket =
241+
Bucket::from_path("https://mystorageaccount.dfs.core.windows.net/container/path")
242+
.unwrap();
243+
match bucket {
244+
Bucket::Azure(name) => assert_eq!(name, "container"),
245+
_ => panic!("Expected Azure bucket"),
246+
}
247+
}
248+
249+
#[test]
250+
fn test_from_path_azure_blob() {
251+
let bucket =
252+
Bucket::from_path("https://mystorageaccount.blob.core.windows.net/container/path")
253+
.unwrap();
254+
match bucket {
255+
Bucket::Azure(name) => assert_eq!(name, "container"),
256+
_ => panic!("Expected Azure bucket"),
257+
}
258+
}
259+
260+
#[test]
261+
fn test_from_path_azure_fabric_dfs() {
262+
let bucket =
263+
Bucket::from_path("https://mystorageaccount.dfs.fabric.microsoft.com/container/path")
264+
.unwrap();
265+
match bucket {
266+
Bucket::Azure(name) => assert_eq!(name, "container"),
267+
_ => panic!("Expected Azure bucket"),
268+
}
269+
}
270+
271+
#[test]
272+
fn test_from_path_azure_fabric_blob() {
273+
let bucket =
274+
Bucket::from_path("https://mystorageaccount.blob.fabric.microsoft.com/container/path")
275+
.unwrap();
276+
match bucket {
277+
Bucket::Azure(name) => assert_eq!(name, "container"),
278+
_ => panic!("Expected Azure bucket"),
279+
}
280+
}
281+
282+
#[test]
283+
fn test_from_path_local() {
284+
let bucket = Bucket::from_path("/local/path/to/file").unwrap();
285+
match bucket {
286+
Bucket::Local => {}
287+
_ => panic!("Expected Local bucket"),
288+
}
289+
}
290+
291+
#[test]
292+
fn test_from_path_https_non_azure() {
293+
let bucket = Bucket::from_path("https://example.com/path").unwrap();
294+
match bucket {
295+
Bucket::Local => {}
296+
_ => panic!("Expected Local bucket"),
297+
}
298+
}
299+
}

iceberg-rust/src/object_store/parse.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
use crate::error::Error;
55
use object_store::aws::{AmazonS3Builder, AmazonS3ConfigKey};
6+
use object_store::azure::{AzureConfigKey, MicrosoftAzureBuilder};
67
use object_store::gcp::{GcpCredential, GoogleCloudStorageBuilder, GoogleConfigKey};
78
use object_store::{parse_url_opts, ObjectStore, ObjectStoreScheme, StaticCredentialProvider};
89
use std::collections::HashMap;
@@ -23,6 +24,12 @@ const GCS_BUCKET: &str = "gcs.bucket";
2324
const GCS_CREDENTIALS_JSON: &str = "gcs.credentials-json";
2425
const GCS_TOKEN: &str = "gcs.oauth2.token";
2526

27+
/// Azure configs
28+
const AZURE_CONTAINER_NAME: &str = "azure.container-name";
29+
const AZURE_ENDPOINT: &str = "azure.endpoint";
30+
const AZURE_STORAGE_ACCESS_KEY: &str = "azure.access-key";
31+
const AZURE_STORAGE_ACCOUNT_NAME: &str = "azure.account-name";
32+
2633
/// Parse the url and Iceberg format of various storage options into the equivalent `object_store`
2734
/// options and build the corresponding `ObjectStore`.
2835
pub fn object_store_from_config(
@@ -73,6 +80,27 @@ pub fn object_store_from_config(
7380
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
7481
}
7582

83+
(ObjectStoreScheme::MicrosoftAzure, _) => {
84+
let mut builder = MicrosoftAzureBuilder::new().with_url(url);
85+
for (key, option) in config {
86+
let azure_key = match key.as_str() {
87+
AZURE_CONTAINER_NAME => AzureConfigKey::ContainerName,
88+
AZURE_STORAGE_ACCOUNT_NAME => AzureConfigKey::AccountName,
89+
AZURE_STORAGE_ACCESS_KEY => AzureConfigKey::AccessKey,
90+
AZURE_ENDPOINT => {
91+
if option.starts_with("http://") {
92+
// This is mainly used for testing, e.g. against Azurite
93+
builder = builder.with_allow_http(true);
94+
}
95+
AzureConfigKey::Endpoint
96+
}
97+
_ => continue,
98+
};
99+
builder = builder.with_config(azure_key, option);
100+
}
101+
Arc::new(builder.build()?) as Arc<dyn ObjectStore>
102+
}
103+
76104
_ => {
77105
let (store, _path) = parse_url_opts(&url, config)?;
78106
store.into()
@@ -169,4 +197,51 @@ mod tests {
169197
assert!(store_repr.contains("bearer: \"oauth-token-123\""));
170198
assert!(store_repr.contains("bucket_name: \"test-bucket\""));
171199
}
200+
201+
#[test]
202+
fn test_azure_config_basic() {
203+
let url = Url::parse("https://testaccount.blob.core.windows.net/test-container").unwrap();
204+
let mut config = HashMap::new();
205+
config.insert(
206+
AZURE_STORAGE_ACCOUNT_NAME.to_string(),
207+
"testaccount".to_string(),
208+
);
209+
config.insert(AZURE_STORAGE_ACCESS_KEY.to_string(), "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==".to_string());
210+
211+
let store = object_store_from_config(url, config).unwrap();
212+
let store_repr = format!("{store:?}");
213+
214+
println!("{}", store_repr);
215+
assert!(store_repr.contains("account: \"testaccount\""));
216+
assert!(store_repr.contains("container: \"test-container\""));
217+
assert!(store_repr.contains("host: Some(Domain(\"testaccount.blob.core.windows.net\"))"));
218+
assert!(store_repr.contains("port: None"));
219+
assert!(store_repr.contains("scheme: \"https\""));
220+
assert!(store_repr.contains("allow_http: Parsed(false)"));
221+
}
222+
223+
#[test]
224+
fn test_azure_config_with_http_endpoint() {
225+
let url = Url::parse("https://testaccount.blob.core.windows.net/test-container").unwrap();
226+
let mut config = HashMap::new();
227+
config.insert(
228+
AZURE_ENDPOINT.to_string(),
229+
"http://localhost:9000".to_string(),
230+
);
231+
config.insert(
232+
AZURE_STORAGE_ACCOUNT_NAME.to_string(),
233+
"testaccount".to_string(),
234+
);
235+
config.insert(AZURE_STORAGE_ACCESS_KEY.to_string(), "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==".to_string());
236+
237+
let store = object_store_from_config(url, config).unwrap();
238+
let store_repr = format!("{store:?}");
239+
240+
assert!(store_repr.contains("account: \"testaccount\""));
241+
assert!(store_repr.contains("container: \"test-container\""));
242+
assert!(store_repr.contains("host: Some(Domain(\"localhost\"))"));
243+
assert!(store_repr.contains("port: Some(9000)"));
244+
assert!(store_repr.contains("scheme: \"http\""));
245+
assert!(store_repr.contains("allow_http: Parsed(true)"));
246+
}
172247
}

0 commit comments

Comments
 (0)