Skip to content

Commit 72088de

Browse files
authored
Add ObjectStoreRegistry (#347) (#375)
* Add ObjectStoreRegistry (#347) * Make path segment based * Fix doc * Additional test * Handle race * Review feedback * Fix prefix bug * Review feedback
1 parent 612d74f commit 72088de

File tree

3 files changed

+350
-0
lines changed

3 files changed

+350
-0
lines changed

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ pub mod local;
524524
pub mod memory;
525525
pub mod path;
526526
pub mod prefix;
527+
pub mod registry;
527528
#[cfg(feature = "cloud")]
528529
pub mod signer;
529530
pub mod throttle;

src/parse.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,15 @@ mod tests {
286286
"s3://bucket/foo%20bar",
287287
(ObjectStoreScheme::AmazonS3, "foo bar"),
288288
),
289+
(
290+
"s3://bucket/foo bar",
291+
(ObjectStoreScheme::AmazonS3, "foo bar"),
292+
),
293+
("s3://bucket/😀", (ObjectStoreScheme::AmazonS3, "😀")),
294+
(
295+
"s3://bucket/%F0%9F%98%80",
296+
(ObjectStoreScheme::AmazonS3, "😀"),
297+
),
289298
(
290299
"https://foo/bar%20baz",
291300
(ObjectStoreScheme::Http, "bar baz"),

src/registry.rs

Lines changed: 340 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,340 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Map object URLs to [`ObjectStore`]
19+
20+
use crate::path::{InvalidPart, Path, PathPart};
21+
use crate::{parse_url_opts, ObjectStore};
22+
use parking_lot::RwLock;
23+
use std::collections::HashMap;
24+
use std::sync::Arc;
25+
use url::Url;
26+
27+
/// [`ObjectStoreRegistry`] maps a URL to an [`ObjectStore`] instance
28+
pub trait ObjectStoreRegistry: Send + Sync + std::fmt::Debug + 'static {
29+
/// Register a new store for the provided store URL
30+
///
31+
/// If a store with the same URL existed before, it is replaced and returned
32+
fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>>;
33+
34+
/// Resolve an object URL
35+
///
36+
/// If [`ObjectStoreRegistry::register`] has been called with a URL with the same
37+
/// scheme, and authority as the object URL, and a path that is a prefix of the object
38+
/// URL's, it should be returned along with the trailing path. Paths should be matched
39+
/// on a path segment basis, and in the event of multiple possibilities the longest
40+
/// path match should be returned.
41+
///
42+
/// If a store hasn't been registered, an [`ObjectStoreRegistry`] may lazily create
43+
/// one if the URL is understood
44+
///
45+
/// For example
46+
///
47+
/// ```
48+
/// # use std::sync::Arc;
49+
/// # use url::Url;
50+
/// # use object_store::memory::InMemory;
51+
/// # use object_store::ObjectStore;
52+
/// # use object_store::prefix::PrefixStore;
53+
/// # use object_store::registry::{DefaultObjectStoreRegistry, ObjectStoreRegistry};
54+
/// #
55+
/// let registry = DefaultObjectStoreRegistry::new();
56+
///
57+
/// let bucket1 = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
58+
/// let base = Url::parse("s3://bucket1/").unwrap();
59+
/// registry.register(base, bucket1.clone());
60+
///
61+
/// let url = Url::parse("s3://bucket1/path/to/object").unwrap();
62+
/// let (ret, path) = registry.resolve(&url).unwrap();
63+
/// assert_eq!(path.as_ref(), "path/to/object");
64+
/// assert!(Arc::ptr_eq(&ret, &bucket1));
65+
///
66+
/// let bucket2 = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
67+
/// let base = Url::parse("https://s3.region.amazonaws.com/bucket").unwrap();
68+
/// registry.register(base, bucket2.clone());
69+
///
70+
/// let url = Url::parse("https://s3.region.amazonaws.com/bucket/path/to/object").unwrap();
71+
/// let (ret, path) = registry.resolve(&url).unwrap();
72+
/// assert_eq!(path.as_ref(), "path/to/object");
73+
/// assert!(Arc::ptr_eq(&ret, &bucket2));
74+
///
75+
/// let bucket3 = Arc::new(PrefixStore::new(InMemory::new(), "path")) as Arc<dyn ObjectStore>;
76+
/// let base = Url::parse("https://s3.region.amazonaws.com/bucket/path").unwrap();
77+
/// registry.register(base, bucket3.clone());
78+
///
79+
/// let url = Url::parse("https://s3.region.amazonaws.com/bucket/path/to/object").unwrap();
80+
/// let (ret, path) = registry.resolve(&url).unwrap();
81+
/// assert_eq!(path.as_ref(), "to/object");
82+
/// assert!(Arc::ptr_eq(&ret, &bucket3));
83+
/// ```
84+
fn resolve(&self, url: &Url) -> crate::Result<(Arc<dyn ObjectStore>, Path)>;
85+
}
86+
87+
/// Error type for [`DefaultObjectStoreRegistry`]
88+
///
89+
/// Crate private/opaque type to make the error handling code more ergonomic.
90+
/// Always converted into `crate::Error` when reported externally.
91+
#[derive(Debug, thiserror::Error)]
92+
#[non_exhaustive]
93+
enum Error {
94+
#[error("ObjectStore not found")]
95+
NotFound,
96+
97+
#[error("Error parsing URL path segment")]
98+
InvalidPart(#[from] InvalidPart),
99+
}
100+
101+
impl From<Error> for crate::Error {
102+
fn from(value: Error) -> Self {
103+
Self::Generic {
104+
store: "ObjectStoreRegistry",
105+
source: Box::new(value),
106+
}
107+
}
108+
}
109+
110+
/// An [`ObjectStoreRegistry`] that uses [`parse_url_opts`] to create stores based on the environment
111+
#[derive(Debug, Default)]
112+
pub struct DefaultObjectStoreRegistry {
113+
/// Mapping from [`url_key`] to [`PathEntry`]
114+
map: RwLock<HashMap<String, PathEntry>>,
115+
}
116+
117+
/// [`PathEntry`] construct a tree of path segments starting from the root
118+
///
119+
/// For example the following paths
120+
///
121+
/// * `/` => store1
122+
/// * `/foo/bar` => store2
123+
///
124+
/// Would be represented by
125+
///
126+
/// ```yaml
127+
/// store: Some(store1)
128+
/// children:
129+
/// foo:
130+
/// store: None
131+
/// children:
132+
/// bar:
133+
/// store: Some(store2)
134+
/// ```
135+
///
136+
#[derive(Debug, Default)]
137+
struct PathEntry {
138+
/// Store, if defined at this path
139+
store: Option<Arc<dyn ObjectStore>>,
140+
/// Child [`PathEntry`], keyed by the next path segment in their path
141+
children: HashMap<String, Self>,
142+
}
143+
144+
impl PathEntry {
145+
/// Lookup a store based on URL path
146+
///
147+
/// Returns the store and its path segment depth
148+
fn lookup(&self, to_resolve: &Url) -> Option<(&Arc<dyn ObjectStore>, usize)> {
149+
let mut current = self;
150+
let mut ret = self.store.as_ref().map(|store| (store, 0));
151+
let mut depth = 0;
152+
// Traverse the PathEntry tree to find the longest match
153+
for segment in path_segments(to_resolve.path()) {
154+
match current.children.get(segment) {
155+
Some(e) => {
156+
current = e;
157+
depth += 1;
158+
if let Some(store) = &current.store {
159+
ret = Some((store, depth))
160+
}
161+
}
162+
None => break,
163+
}
164+
}
165+
ret
166+
}
167+
}
168+
169+
impl DefaultObjectStoreRegistry {
170+
/// Create a new [`DefaultObjectStoreRegistry`]
171+
pub fn new() -> Self {
172+
Self::default()
173+
}
174+
}
175+
176+
impl ObjectStoreRegistry for DefaultObjectStoreRegistry {
177+
fn register(&self, url: Url, store: Arc<dyn ObjectStore>) -> Option<Arc<dyn ObjectStore>> {
178+
let mut map = self.map.write();
179+
let key = url_key(&url);
180+
let mut entry = map.entry(key.to_string()).or_default();
181+
182+
for segment in path_segments(url.path()) {
183+
entry = entry.children.entry(segment.to_string()).or_default();
184+
}
185+
entry.store.replace(store)
186+
}
187+
188+
fn resolve(&self, to_resolve: &Url) -> crate::Result<(Arc<dyn ObjectStore>, Path)> {
189+
let key = url_key(to_resolve);
190+
{
191+
let map = self.map.read();
192+
193+
if let Some((store, depth)) = map.get(key).and_then(|entry| entry.lookup(to_resolve)) {
194+
let path = path_suffix(to_resolve, depth)?;
195+
return Ok((Arc::clone(store), path));
196+
}
197+
}
198+
199+
if let Ok((store, path)) = parse_url_opts(to_resolve, std::env::vars()) {
200+
let depth = num_segments(to_resolve.path()) - num_segments(path.as_ref());
201+
202+
let mut map = self.map.write();
203+
let mut entry = map.entry(key.to_string()).or_default();
204+
for segment in path_segments(to_resolve.path()).take(depth) {
205+
entry = entry.children.entry(segment.to_string()).or_default();
206+
}
207+
let store = Arc::clone(match &entry.store {
208+
None => entry.store.insert(Arc::from(store)),
209+
Some(x) => x, // Racing creation - use existing
210+
});
211+
212+
let path = path_suffix(to_resolve, depth)?;
213+
return Ok((store, path));
214+
}
215+
216+
Err(Error::NotFound.into())
217+
}
218+
}
219+
220+
/// Extracts the scheme and authority of a URL (components before the Path)
221+
fn url_key(url: &Url) -> &str {
222+
&url[..url::Position::AfterPort]
223+
}
224+
225+
/// Returns the non-empty segments of a path
226+
///
227+
/// Note: We don't use [`Url::path_segments`] as we only want non-empty paths
228+
fn path_segments(s: &str) -> impl Iterator<Item = &str> {
229+
s.split('/').filter(|x| !x.is_empty())
230+
}
231+
232+
/// Returns the number of non-empty path segments in a path
233+
fn num_segments(s: &str) -> usize {
234+
path_segments(s).count()
235+
}
236+
237+
/// Returns the path of `url` skipping the first `depth` segments
238+
fn path_suffix(url: &Url, depth: usize) -> Result<Path, Error> {
239+
let segments = path_segments(url.path()).skip(depth);
240+
let path = segments.map(PathPart::parse).collect::<Result<_, _>>()?;
241+
Ok(path)
242+
}
243+
244+
#[cfg(test)]
245+
mod tests {
246+
use super::*;
247+
use crate::memory::InMemory;
248+
use crate::prefix::PrefixStore;
249+
250+
#[test]
251+
fn test_num_segments() {
252+
assert_eq!(num_segments(""), 0);
253+
assert_eq!(num_segments("/"), 0);
254+
assert_eq!(num_segments("/banana"), 1);
255+
assert_eq!(num_segments("banana"), 1);
256+
assert_eq!(num_segments("/banana/crumble"), 2);
257+
assert_eq!(num_segments("banana/crumble"), 2);
258+
}
259+
260+
#[test]
261+
fn test_default_registry() {
262+
let registry = DefaultObjectStoreRegistry::new();
263+
264+
// Should automatically register in memory store
265+
let banana_url = Url::parse("memory:///banana").unwrap();
266+
let (resolved, path) = registry.resolve(&banana_url).unwrap();
267+
assert_eq!(path.as_ref(), "banana");
268+
269+
// Should replace store
270+
let url = Url::parse("memory:///").unwrap();
271+
let root = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
272+
let replaced = registry.register(url, Arc::clone(&root)).unwrap();
273+
assert!(Arc::ptr_eq(&resolved, &replaced));
274+
275+
// Should not replace store
276+
let banana = Arc::new(PrefixStore::new(InMemory::new(), "banana")) as Arc<dyn ObjectStore>;
277+
assert!(registry
278+
.register(banana_url.clone(), Arc::clone(&banana))
279+
.is_none());
280+
281+
// Should resolve to banana store
282+
let (resolved, path) = registry.resolve(&banana_url).unwrap();
283+
assert_eq!(path.as_ref(), "");
284+
assert!(Arc::ptr_eq(&resolved, &banana));
285+
286+
// If we register another store it still resolves banana
287+
let apples_url = Url::parse("memory:///apples").unwrap();
288+
let apples = Arc::new(PrefixStore::new(InMemory::new(), "apples")) as Arc<dyn ObjectStore>;
289+
assert!(registry.register(apples_url, Arc::clone(&apples)).is_none());
290+
291+
// Should still resolve to banana store
292+
let (resolved, path) = registry.resolve(&banana_url).unwrap();
293+
assert_eq!(path.as_ref(), "");
294+
assert!(Arc::ptr_eq(&resolved, &banana));
295+
296+
// Should be path segment based
297+
let banana_muffins_url = Url::parse("memory:///banana_muffins").unwrap();
298+
let (resolved, path) = registry.resolve(&banana_muffins_url).unwrap();
299+
assert_eq!(path.as_ref(), "banana_muffins");
300+
assert!(Arc::ptr_eq(&resolved, &root));
301+
302+
// Should resolve to root even though path contains prefix of valid store
303+
let to_resolve = Url::parse("memory:///foo/banana").unwrap();
304+
let (resolved, path) = registry.resolve(&to_resolve).unwrap();
305+
assert_eq!(path.as_ref(), "foo/banana");
306+
assert!(Arc::ptr_eq(&resolved, &root));
307+
308+
let nested_url = Url::parse("memory:///apples/bananas").unwrap();
309+
let nested =
310+
Arc::new(PrefixStore::new(InMemory::new(), "apples/bananas")) as Arc<dyn ObjectStore>;
311+
assert!(registry.register(nested_url, Arc::clone(&nested)).is_none());
312+
313+
let to_resolve = Url::parse("memory:///apples/bananas/muffins/cupcakes").unwrap();
314+
let (resolved, path) = registry.resolve(&to_resolve).unwrap();
315+
assert_eq!(path.as_ref(), "muffins/cupcakes");
316+
assert!(Arc::ptr_eq(&resolved, &nested));
317+
318+
let nested_url2 = Url::parse("memory:///1/2/3").unwrap();
319+
let nested2 = Arc::new(PrefixStore::new(InMemory::new(), "1/2/3")) as Arc<dyn ObjectStore>;
320+
assert!(registry
321+
.register(nested_url2, Arc::clone(&nested2))
322+
.is_none());
323+
324+
let to_resolve = Url::parse("memory:///1/2/3/4/5/6").unwrap();
325+
let (resolved, path) = registry.resolve(&to_resolve).unwrap();
326+
assert_eq!(path.as_ref(), "4/5/6");
327+
assert!(Arc::ptr_eq(&resolved, &nested2));
328+
329+
let custom_scheme_url = Url::parse("custom:///").unwrap();
330+
let custom_scheme = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
331+
assert!(registry
332+
.register(custom_scheme_url, Arc::clone(&custom_scheme))
333+
.is_none());
334+
335+
let to_resolve = Url::parse("custom:///6/7").unwrap();
336+
let (resolved, path) = registry.resolve(&to_resolve).unwrap();
337+
assert_eq!(path.as_ref(), "6/7");
338+
assert!(Arc::ptr_eq(&resolved, &custom_scheme));
339+
}
340+
}

0 commit comments

Comments
 (0)