Skip to content

Commit 926b4ab

Browse files
author
Jan Kaul
committed
register schema in catalog mirror
1 parent 44593db commit 926b4ab

File tree

2 files changed

+44
-3
lines changed

2 files changed

+44
-3
lines changed

datafusion_iceberg/src/catalog/catalog.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,14 @@ impl CatalogProvider for IcebergCatalog {
5757

5858
fn register_schema(
5959
&self,
60-
_name: &str,
60+
name: &str,
6161
_schema: Arc<dyn SchemaProvider>,
6262
) -> Result<Option<Arc<dyn SchemaProvider>>> {
63-
unimplemented!()
63+
let old_namespace = self.catalog.register_schema(name)?;
64+
if old_namespace.is_some() {
65+
Ok(self.schema(name))
66+
} else {
67+
Ok(None)
68+
}
6469
}
6570
}

datafusion_iceberg/src/catalog/mirror.rs

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use dashmap::DashMap;
22
use datafusion::{datasource::TableProvider, error::DataFusionError};
33
use std::{collections::HashSet, sync::Arc};
4+
use tokio::runtime::Handle;
45

56
use iceberg_rust::spec::view_metadata::REF_PREFIX;
67
use iceberg_rust::{
@@ -12,7 +13,7 @@ use crate::{error::Error, DataFusionTable};
1213

1314
type NamespaceNode = HashSet<String>;
1415

15-
#[derive(Debug)]
16+
#[derive(Debug, Clone)]
1617
enum Node {
1718
Namespace(NamespaceNode),
1819
Relation(Identifier),
@@ -186,4 +187,39 @@ impl Mirror {
186187
pub fn catalog(&self) -> Arc<dyn Catalog> {
187188
self.catalog.clone()
188189
}
190+
191+
pub fn register_schema(&self, name: &str) -> Result<Option<NamespaceNode>, DataFusionError> {
192+
let namespace = Namespace::try_new(
193+
&name
194+
.split('.')
195+
.map(|z| z.to_owned())
196+
.collect::<Vec<String>>(),
197+
)
198+
.map_err(|err| DataFusionError::External(Box::new(err)))?;
199+
200+
let old_value =
201+
self.storage
202+
.get(&namespace.to_string())
203+
.and_then(|entry| match entry.value() {
204+
Node::Namespace(namespace_node) => Some(namespace_node.clone()),
205+
_ => None,
206+
});
207+
208+
let handle = Handle::current();
209+
handle.spawn({
210+
let catalog = self.catalog.clone();
211+
let storage = self.storage.clone();
212+
async move {
213+
catalog
214+
.clone()
215+
.create_namespace(&namespace, None)
216+
.await
217+
.map_err(|err| DataFusionError::External(Box::new(err)))
218+
.unwrap();
219+
220+
storage.insert(namespace.to_string(), Node::Namespace(HashSet::new()));
221+
}
222+
});
223+
Ok(old_value)
224+
}
189225
}

0 commit comments

Comments
 (0)