 use std::collections::HashMap;
 use std::sync::Arc;

-use arrow_array::{ArrayRef, Float64Array, Int32Array, LargeBinaryArray, StringArray};
+use arrow_array::{ArrayRef, Float64Array, Int32Array, LargeBinaryArray, RecordBatch, StringArray};
 use geo_types::{Geometry, Point};
 use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
-use iceberg::writer::IcebergWriterBuilder;
 use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
 use iceberg::writer::file_writer::ParquetWriterBuilder;
 use iceberg::writer::file_writer::location_generator::{
     DefaultFileNameGenerator, DefaultLocationGenerator,
 };
 use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
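+// The IcebergWriter trait must be in scope for the write()/close() calls used below.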
+use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
 use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
 use iceberg_catalog_rest::{REST_CATALOG_PROP_URI, RestCatalogBuilder};
 use parquet::file::properties::WriterProperties;

 static REST_URI: &str = "http://localhost:8181";
 static NAMESPACE: &str = "ns1";
-static TABLE_NAME: &str = "cities_table2";
+static TABLE_NAME: &str = "cities_table3";

 // This is an example of creating and loading a table using a schema with
 // geo types via the Iceberg REST Catalog.
@@ -143,41 +143,28 @@ async fn main() {
             HashMap::from([(REST_CATALOG_PROP_URI.to_string(), REST_URI.to_string())]),
         )
         .await
-        .map_err(|e| {
-            eprintln!("Failed to connect to REST catalog: {:?}", e);
-            eprintln!("Error: {}", e);
-            e
-        })
         .unwrap();
     println!("Connected to REST Catalog at {}", REST_URI);

     let namespace_ident = NamespaceIdent::from_vec(vec![NAMESPACE.to_string()]).unwrap();
-    let table_ident = TableIdent::new(namespace_ident.clone(), TABLE_NAME.to_string());
-
-    println!("Checking if table exists...");
-    let table_exists = catalog
-        .table_exists(&table_ident)
-        .await
-        .map_err(|e| {
-            eprintln!("Failed to check if table exists: {:?}", e);
-            eprintln!("Error: {}", e);
-            e
-        })
-        .unwrap();

-    if table_exists {
-        println!("Table {TABLE_NAME} already exists, dropping now.");
+    // Create namespace if it doesn't exist
+    if !catalog.namespace_exists(&namespace_ident).await.unwrap() {
+        println!("Creating namespace {NAMESPACE}...");
         catalog
-            .drop_table(&table_ident)
+            .create_namespace(&namespace_ident, HashMap::new())
             .await
-            .map_err(|e| {
-                eprintln!("Failed to drop table: {:?}", e);
-                eprintln!("Error: {}", e);
-                e
-            })
             .unwrap();
     }

+    let table_ident = TableIdent::new(namespace_ident.clone(), TABLE_NAME.to_string());
+
+    println!("Checking if table exists...");
+    if catalog.table_exists(&table_ident).await.unwrap() {
+        println!("Table {TABLE_NAME} already exists, dropping now.");
+        catalog.drop_table(&table_ident).await.unwrap();
+    }
+
     let iceberg_schema = Schema::builder()
         .with_fields(vec![
             NestedField::required(1, "id".to_string(), Type::Primitive(PrimitiveType::Int)).into(),
@@ -251,15 +238,6 @@ async fn main() {
     let _created_table = catalog
         .create_table(&table_ident.namespace, table_creation)
         .await
-        .map_err(|e| {
-            eprintln!("\n=== FAILED TO CREATE TABLE ===");
-            eprintln!("Error type: {:?}", e);
-            eprintln!("Error message: {}", e);
-            eprintln!("Namespace: {:?}", table_ident.namespace);
-            eprintln!("Table name: {}", table_ident.name);
-            eprintln!("==============================\n");
-            e
-        })
         .unwrap();
     println!("Table {TABLE_NAME} created.");
     assert!(
@@ -269,6 +247,14 @@ async fn main() {
             .unwrap()
             .contains(&table_ident)
     );
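+    // Convert the table's current Iceberg schema into the equivalent Arrow schema,
+    // which is what the RecordBatch construction below expects.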
+    let schema: Arc<arrow_schema::Schema> = Arc::new(
+        _created_table
+            .metadata()
+            .current_schema()
+            .as_ref()
+            .try_into()
+            .unwrap(),
+    );
     let location_generator =
         DefaultLocationGenerator::new(_created_table.metadata().clone()).unwrap();
     let file_name_generator = DefaultFileNameGenerator::new(
@@ -287,66 +273,65 @@ async fn main() {
         file_name_generator.clone(),
     );
     let data_file_writer_builder = DataFileWriterBuilder::new(rolling_file_writer_builder);
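+    // The builder stacks a Parquet file writer inside a rolling writer inside the data
+    // file writer; the None passed to build() is assumed to be the (absent) partition
+    // key, since this example's table is unpartitioned.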
-    // let data_file_writer = data_file_writer_builder.build(None).await.unwrap();
+    let mut data_file_writer = data_file_writer_builder.build(None).await.unwrap();

     let features = mock_sample_features();
-    /*
-    let ids: ArrayRef = Arc::new(Int32Array::from_iter_values(features.iter().map(|f| f.id)));
-    let names: ArrayRef = Arc::new(StringArray::from_iter_values(
-        features.iter().map(|f| f.name.as_str()),
-    ));
-    let geometries_wkb: ArrayRef = Arc::new(LargeBinaryArray::from_iter_values(
-        features.iter().map(|f| f.to_wkb()),
-    ));
-    let geometry_types: ArrayRef = Arc::new(StringArray::from_iter_values(
-        features.iter().map(|f| f.geometry_type()),
-    ));
-    let srids: ArrayRef = Arc::new(Int32Array::from_iter_values(
-        features.iter().map(|f| f.srid),
-    ));
-    let bbox_min_xs: ArrayRef = Arc::new(Float64Array::from_iter_values(
-        features.iter().map(|f| f.bbox().0),
-    ));
-    let bbox_min_ys: ArrayRef = Arc::new(Float64Array::from_iter_values(
-        features.iter().map(|f| f.bbox().1),
-    ));
-    let bbox_max_xs: ArrayRef = Arc::new(Float64Array::from_iter_values(
-        features.iter().map(|f| f.bbox().2),
-    ));
-    let bbox_max_ys: ArrayRef = Arc::new(Float64Array::from_iter_values(
-        features.iter().map(|f| f.bbox().3),
-    ));

-    let countries: ArrayRef = Arc::new(StringArray::from_iter_values(
-        features
-            .iter()
-            .map(|f| f.properties.get("country").unwrap().as_str()),
-    ));
-    let populations: ArrayRef = Arc::new(StringArray::from_iter_values(
-        features
-            .iter()
-            .map(|f| f.properties.get("population").unwrap().as_str()),
-    ));
-    */
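+    // Build one Arrow array per column; RecordBatch::try_new below requires the
+    // arrays in the same order as the schema's fields.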
+    let ids: ArrayRef = Arc::new(Int32Array::from_iter_values(features.iter().map(|f| f.id)));
+    let names: ArrayRef = Arc::new(StringArray::from_iter_values(
+        features.iter().map(|f| f.name.as_str()),
+    ));
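+    // Geometries are serialized to WKB and stored in a LargeBinary column, with the
+    // geometry type, SRID, and bounding box carried in sibling columns.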
+    let geometries_wkb: ArrayRef = Arc::new(LargeBinaryArray::from_iter_values(
+        features.iter().map(|f| f.to_wkb()),
+    ));
+    let geometry_types: ArrayRef = Arc::new(StringArray::from_iter_values(
+        features.iter().map(|f| f.geometry_type()),
+    ));
+    let srids: ArrayRef = Arc::new(Int32Array::from_iter_values(
+        features.iter().map(|f| f.srid),
+    ));
+    let bbox_min_xs: ArrayRef = Arc::new(Float64Array::from_iter_values(
+        features.iter().map(|f| f.bbox().0),
+    ));
+    let bbox_min_ys: ArrayRef = Arc::new(Float64Array::from_iter_values(
+        features.iter().map(|f| f.bbox().1),
+    ));
+    let bbox_max_xs: ArrayRef = Arc::new(Float64Array::from_iter_values(
+        features.iter().map(|f| f.bbox().2),
+    ));
+    let bbox_max_ys: ArrayRef = Arc::new(Float64Array::from_iter_values(
+        features.iter().map(|f| f.bbox().3),
+    ));
+
+    let countries: ArrayRef = Arc::new(StringArray::from_iter_values(
+        features
+            .iter()
+            .map(|f| f.properties.get("country").unwrap().as_str()),
+    ));
+    let populations: ArrayRef = Arc::new(StringArray::from_iter_values(
+        features
+            .iter()
+            .map(|f| f.properties.get("population").unwrap().as_str()),
+    ));
+
     // TODO: make write with credentials
-    /* let record_batch = RecordBatch::try_new(schema.clone(), vec![
-        ids,
-        names,
-        geometries_wkb,
-        geometry_types,
-        srids,
-        bbox_min_xs,
-        bbox_min_ys,
-        bbox_max_xs,
-        bbox_max_ys,
-        countries,
-        populations,
-    ])
-    .unwrap();
+    let record_batch = RecordBatch::try_new(schema.clone(), vec![
+        ids,
+        names,
+        geometries_wkb,
+        geometry_types,
+        srids,
+        bbox_min_xs,
+        bbox_min_ys,
+        bbox_max_xs,
+        bbox_max_ys,
+        countries,
+        populations,
+    ])
+    .unwrap();

-    data_file_writer.write(record_batch.clone()).await.unwrap();
-    let data_file = data_file_writer.close().await.unwrap();
-    */
+    data_file_writer.write(record_batch.clone()).await.unwrap();
+    let data_file = data_file_writer.close().await.unwrap();
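+    // NOTE: close() returns the written data files, but nothing here commits them to
+    // the table's metadata; an append/commit step would still be needed before
+    // readers can see these rows.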

     let loaded_table = catalog.load_table(&table_ident).await.unwrap();
     println!("Table {TABLE_NAME} loaded!\n\nTable: {loaded_table:?}");