Skip to content

Commit 749cff7

Browse files
authored
Add column-selective appender API with default support (#642)
Fixes #551 I did not end up using `duckdb_append_default_to_chunk` because: - When limiting active columns, DuckDB automatically fills all the other columns with their default values at flush time - The Rust appender is row-oriented, not chunk-based - `duckdb_append_default_to_chunk` is unstable
2 parents 7976f1a + 60ced61 commit 749cff7

File tree

3 files changed

+209
-7
lines changed

3 files changed

+209
-7
lines changed

crates/duckdb/src/appender/mod.rs

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,31 @@ impl Appender<'_> {
204204
result_from_duckdb_appender(res, &mut self.app)
205205
}
206206
}
207+
208+
/// Add a column to the appender's active column list.
209+
///
210+
/// When columns are added, only those columns need values during append.
211+
/// Other columns will use their DEFAULT value (or NULL if no default).
212+
///
213+
/// This flushes any pending data before modifying the column list.
214+
#[inline]
215+
pub fn add_column(&mut self, name: &str) -> Result<()> {
216+
let c_name = std::ffi::CString::new(name)?;
217+
let rc = unsafe { ffi::duckdb_appender_add_column(self.app, c_name.as_ptr() as *const c_char) };
218+
result_from_duckdb_appender(rc, &mut self.app)
219+
}
220+
221+
/// Clear the appender's active column list.
222+
///
223+
/// After clearing, all columns become active again and values must be
224+
/// provided for every column during append.
225+
///
226+
/// This flushes any pending data before clearing.
227+
#[inline]
228+
pub fn clear_columns(&mut self) -> Result<()> {
229+
let rc = unsafe { ffi::duckdb_appender_clear_columns(self.app) };
230+
result_from_duckdb_appender(rc, &mut self.app)
231+
}
207232
}
208233

209234
impl Drop for Appender<'_> {
@@ -424,4 +449,110 @@ mod test {
424449

425450
Ok(())
426451
}
452+
453+
#[test]
454+
fn test_appender_defaults_and_column_switching() -> Result<()> {
455+
let db = Connection::open_in_memory()?;
456+
db.execute_batch("CREATE TABLE foo(a INT DEFAULT 99, b INT, c INT DEFAULT 7)")?;
457+
458+
// Only provide column b; a and c should use their defaults
459+
{
460+
let mut app = db.appender_with_columns("foo", &["b"])?;
461+
app.append_row([Some(1)])?;
462+
app.append_row([Option::<i32>::None])?;
463+
}
464+
465+
// Switch to a different active column set, then back to full width
466+
{
467+
let mut app = db.appender("foo")?;
468+
app.add_column("c")?;
469+
app.add_column("a")?;
470+
app.append_row([10, 1])?; // set c and a; b gets NULL
471+
472+
app.clear_columns()?; // revert to all columns
473+
app.append_row([2, 3, 4])?;
474+
}
475+
476+
let rows: Vec<(i32, Option<i32>, i32)> = db
477+
.prepare("SELECT a, b, c FROM foo ORDER BY a, b NULLS LAST")?
478+
.query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))?
479+
.collect::<Result<_>>()?;
480+
481+
assert_eq!(
482+
rows,
483+
vec![
484+
(1, None, 10), // add_column path; b NULL, c set
485+
(2, Some(3), 4), // clear_columns path; all provided
486+
(99, Some(1), 7), // defaults applied for a and c
487+
(99, None, 7) // default + NULL
488+
]
489+
);
490+
491+
Ok(())
492+
}
493+
494+
#[test]
495+
fn test_appender_with_columns_sequence_default() -> Result<()> {
496+
let db = Connection::open_in_memory()?;
497+
db.execute_batch(
498+
"CREATE SEQUENCE seq START 1;
499+
CREATE TABLE foo(id INTEGER DEFAULT nextval('seq'), name TEXT)",
500+
)?;
501+
502+
{
503+
let mut app = db.appender_with_columns("foo", &["name"])?;
504+
app.append_row(["Alice"])?;
505+
app.append_row(["Bob"])?;
506+
app.append_row(["Charlie"])?;
507+
}
508+
509+
let rows: Vec<(i32, String)> = db
510+
.prepare("SELECT id, name FROM foo ORDER BY id")?
511+
.query_map([], |row| Ok((row.get(0)?, row.get(1)?)))?
512+
.collect::<Result<_>>()?;
513+
514+
assert_eq!(
515+
rows,
516+
vec![(1, "Alice".into()), (2, "Bob".into()), (3, "Charlie".into())]
517+
);
518+
519+
Ok(())
520+
}
521+
522+
#[test]
523+
fn test_appender_with_columns_to_db_schema() -> Result<()> {
524+
let db = Connection::open_in_memory()?;
525+
db.execute_batch(
526+
"CREATE SCHEMA s;
527+
CREATE TABLE s.foo(a INTEGER DEFAULT 5, b INTEGER)",
528+
)?;
529+
530+
{
531+
let mut app = db.appender_with_columns_to_db("foo", "s", &["b"])?;
532+
app.append_row([7])?;
533+
}
534+
535+
let (a, b): (i32, i32) = db.query_row("SELECT a, b FROM s.foo", [], |row| Ok((row.get(0)?, row.get(1)?)))?;
536+
assert_eq!((a, b), (5, 7));
537+
Ok(())
538+
}
539+
540+
#[test]
541+
fn test_appender_with_columns_to_catalog_and_db() -> Result<()> {
542+
let db = Connection::open_in_memory()?;
543+
db.execute_batch(
544+
"CREATE SCHEMA s;
545+
CREATE TABLE s.bar(a INTEGER DEFAULT 11, b INTEGER)",
546+
)?;
547+
548+
{
549+
// Default in-memory catalog is "memory"
550+
let mut app = db.appender_with_columns_to_catalog_and_db("bar", "memory", "s", &["b"])?;
551+
app.append_row([9])?;
552+
}
553+
554+
let (a, b): (i32, i32) = db.query_row("SELECT a, b FROM s.bar", [], |row| Ok((row.get(0)?, row.get(1)?)))?;
555+
assert_eq!((a, b), (11, 9));
556+
Ok(())
557+
}
427558
}

crates/duckdb/src/inner_connection.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl InnerConnection {
8585
}
8686

8787
pub fn execute(&mut self, sql: &str) -> Result<()> {
88-
let c_str = CString::new(sql).unwrap();
88+
let c_str = CString::new(sql)?;
8989
unsafe {
9090
let mut out = mem::zeroed();
9191
let r = ffi::duckdb_query_arrow(self.con, c_str.as_ptr() as *const c_char, &mut out);
@@ -96,7 +96,7 @@ impl InnerConnection {
9696
}
9797

9898
pub fn prepare<'a>(&mut self, conn: &'a Connection, sql: &str) -> Result<Statement<'a>> {
99-
let c_str = CString::new(sql).unwrap();
99+
let c_str = CString::new(sql)?;
100100

101101
// Extract statements (handles both single and multi-statement queries)
102102
let mut extracted = ptr::null_mut();
@@ -162,8 +162,8 @@ impl InnerConnection {
162162

163163
pub fn appender<'a>(&mut self, conn: &'a Connection, table: &str, schema: &str) -> Result<Appender<'a>> {
164164
let mut c_app: ffi::duckdb_appender = ptr::null_mut();
165-
let c_table = CString::new(table).unwrap();
166-
let c_schema = CString::new(schema).unwrap();
165+
let c_table = CString::new(table)?;
166+
let c_schema = CString::new(schema)?;
167167
let r = unsafe {
168168
ffi::duckdb_appender_create(
169169
self.con,
@@ -184,9 +184,9 @@ impl InnerConnection {
184184
schema: &str,
185185
) -> Result<Appender<'a>> {
186186
let mut c_app: ffi::duckdb_appender = ptr::null_mut();
187-
let c_table = CString::new(table).unwrap();
188-
let c_catalog = CString::new(catalog).unwrap();
189-
let c_schema = CString::new(schema).unwrap();
187+
let c_table = CString::new(table)?;
188+
let c_catalog = CString::new(catalog)?;
189+
let c_schema = CString::new(schema)?;
190190

191191
let r = unsafe {
192192
ffi::duckdb_appender_create_ext(
@@ -201,6 +201,26 @@ impl InnerConnection {
201201
Ok(Appender::new(conn, c_app))
202202
}
203203

204+
pub fn appender_with_columns<'a>(
205+
&mut self,
206+
conn: &'a Connection,
207+
table: &str,
208+
schema: &str,
209+
catalog: Option<&str>,
210+
columns: &[&str],
211+
) -> Result<Appender<'a>> {
212+
// The C API only supports narrowing columns after the appender is created.
213+
// Create the appender first, then activate the requested column subset.
214+
let mut appender = match catalog {
215+
Some(catalog) => self.appender_to_catalog_and_db(conn, table, catalog, schema)?,
216+
None => self.appender(conn, table, schema)?,
217+
};
218+
for column in columns {
219+
appender.add_column(column)?;
220+
}
221+
Ok(appender)
222+
}
223+
204224
pub fn get_interrupt_handle(&self) -> Arc<InterruptHandle> {
205225
self.interrupt.clone()
206226
}

crates/duckdb/src/lib.rs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -553,6 +553,57 @@ impl Connection {
553553
.appender_to_catalog_and_db(self, table, catalog, schema)
554554
}
555555

556+
/// Create an Appender that only provides values for specific columns.
557+
///
558+
/// Columns not in the list will use their DEFAULT value, or NULL if no default.
559+
/// This supports all types of DEFAULT expressions including non-deterministic
560+
/// ones like `random()`, `current_timestamp`, or sequences.
561+
///
562+
/// ## Example
563+
///
564+
/// ```rust,no_run
565+
/// # use duckdb::{Connection, Result};
566+
/// fn insert_partial(conn: &Connection) -> Result<()> {
567+
/// // Table: CREATE TABLE foo(id INT DEFAULT nextval('seq'), name TEXT, created TIMESTAMP DEFAULT current_timestamp)
568+
/// let mut app = conn.appender_with_columns("foo", &["name"])?;
569+
/// // Only provide name; id and created use their defaults
570+
/// app.append_row(["Alice"])?;
571+
/// app.append_row(["Bob"])?;
572+
/// Ok(())
573+
/// }
574+
/// ```
575+
///
576+
/// # Failure
577+
///
578+
/// Will return `Err` if `table` does not exist or a column name is invalid.
579+
pub fn appender_with_columns(&self, table: &str, columns: &[&str]) -> Result<Appender<'_>> {
580+
self.appender_with_columns_to_db(table, &DatabaseName::Main.to_string(), columns)
581+
}
582+
583+
/// Create an Appender that only provides values for specific columns, with schema.
584+
///
585+
/// See [`appender_with_columns`](Connection::appender_with_columns) for details.
586+
pub fn appender_with_columns_to_db(&self, table: &str, schema: &str, columns: &[&str]) -> Result<Appender<'_>> {
587+
self.db
588+
.borrow_mut()
589+
.appender_with_columns(self, table, schema, None, columns)
590+
}
591+
592+
/// Create an Appender that only provides values for specific columns, with catalog and schema.
593+
///
594+
/// See [`appender_with_columns`](Connection::appender_with_columns) for details.
595+
pub fn appender_with_columns_to_catalog_and_db(
596+
&self,
597+
table: &str,
598+
catalog: &str,
599+
schema: &str,
600+
columns: &[&str],
601+
) -> Result<Appender<'_>> {
602+
self.db
603+
.borrow_mut()
604+
.appender_with_columns(self, table, schema, Some(catalog), columns)
605+
}
606+
556607
/// Get a handle to interrupt long-running queries.
557608
///
558609
/// ## Example

0 commit comments

Comments
 (0)