datafusion/common/src/dfschema.rs: 159 changes (108 additions, 51 deletions)
@@ -27,18 +27,20 @@ use crate::Column;
 
 use arrow::compute::can_cast_types;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
-use std::fmt::{Display, Formatter};
+use std::fmt::{Debug, Display, Formatter};
 
 /// A reference-counted reference to a `DFSchema`.
 pub type DFSchemaRef = Arc<DFSchema>;
 
 /// DFSchema wraps an Arrow schema and adds relation names
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(Clone, PartialEq, Eq)]
 pub struct DFSchema {
     /// Fields
     fields: Vec<DFField>,
     /// Additional metadata in form of key value pairs
     metadata: HashMap<String, String>,
+    /// Field-to-index hash map for fast lookup
+    field_to_index: HashMap<LookupField, LookupResult>,
 }
 
 impl DFSchema {
@@ -47,6 +49,7 @@ impl DFSchema {
         Self {
             fields: vec![],
             metadata: HashMap::new(),
+            field_to_index: HashMap::new(),
         }
     }
 
@@ -101,7 +104,38 @@
                 )));
             }
         }
-        Ok(Self { fields, metadata })
+
+        let field_to_index = Self::create_field_to_index_map(&fields);
+        Ok(Self {
+            fields,
+            metadata,
+            field_to_index,
+        })
     }
+
+    fn create_field_to_index_map(
+        fields: &[DFField],
+    ) -> HashMap<LookupField, LookupResult> {
+        let mut field_to_index = HashMap::new();
+        for (index, field) in fields.iter().enumerate() {
+            let lookup_field = LookupField::new(field.qualifier.as_deref(), field.name());
+            field_to_index.insert(lookup_field, LookupResult::Exact(index));
+            if field.qualifier.is_some() {
+                let unqualified_lookup_field = LookupField::new(None, field.name());
+                match field_to_index.get(&unqualified_lookup_field) {
+                    None => {
+                        field_to_index
+                            .insert(unqualified_lookup_field, LookupResult::Exact(index));
+                    }
+                    Some(LookupResult::Exact(_)) => {
+                        field_to_index
+                            .insert(unqualified_lookup_field, LookupResult::Ambiguous);
+                    }
+                    Some(LookupResult::Ambiguous) => { /* already ambiguous */ }
+                }
+            }
+        }
+        field_to_index
+    }
 
     /// Create a `DFSchema` from an Arrow schema
@@ -138,7 +172,8 @@ impl DFSchema {
                 self.fields.push(field.clone());
             }
         }
-        self.metadata.extend(other_schema.metadata.clone())
+        self.metadata.extend(other_schema.metadata.clone());
+        self.field_to_index = Self::create_field_to_index_map(&self.fields);
     }
 
     /// Get a list of fields
@@ -171,42 +206,20 @@ impl DFSchema {
         qualifier: Option<&str>,
         name: &str,
     ) -> Result<usize> {
-        let mut matches = self
-            .fields
-            .iter()
-            .enumerate()
-            .filter(|(_, field)| match (qualifier, &field.qualifier) {
-                // field to lookup is qualified.
-                // current field is qualified and not shared between relations, compare both
-                // qualifier and name.
-                (Some(q), Some(field_q)) => {
-                    q.to_ascii_lowercase() == field_q.to_ascii_lowercase()
-                        && field.name().to_ascii_lowercase() == name.to_ascii_lowercase()
-                }
-                // field to lookup is qualified but current field is unqualified.
-                (Some(_), None) => false,
-                // field to lookup is unqualified, no need to compare qualifier
-                (None, Some(_)) | (None, None) => {
-                    field.name().to_ascii_lowercase() == name.to_ascii_lowercase()
-                }
-            })
-            .map(|(idx, _)| idx);
-        match matches.next() {
+        let lookup_field = LookupField::new(qualifier, name);
+        match self.field_to_index.get(&lookup_field) {
+            Some(LookupResult::Exact(idx)) => Ok(*idx),
+            Some(LookupResult::Ambiguous) => Err(DataFusionError::Plan(format!(
+                "Ambiguous reference to field named '{}.{}'",
+                qualifier.unwrap_or("<unqualified>"),
+                name
+            ))),
             None => Err(DataFusionError::Plan(format!(
                 "No field named '{}.{}'. Valid fields are {}.",
                 qualifier.unwrap_or("<unqualified>"),
                 name,
                 self.get_field_names()
             ))),
-            Some(idx) => match matches.next() {
-                None => Ok(idx),
-                // found more than one matches
-                Some(_) => Err(DataFusionError::Internal(format!(
-                    "Ambiguous reference to qualified field named '{}.{}'",
-                    qualifier.unwrap_or("<unqualified>"),
-                    name
-                ))),
-            },
         }
     }
 
@@ -315,31 +328,37 @@ impl DFSchema {
 
     /// Strip all field qualifier in schema
     pub fn strip_qualifiers(self) -> Self {
+        let fields = self
+            .fields
+            .into_iter()
+            .map(|f| f.strip_qualifier())
+            .collect::<Vec<_>>();
+        let field_to_index = Self::create_field_to_index_map(&fields);
         DFSchema {
-            fields: self
-                .fields
-                .into_iter()
-                .map(|f| f.strip_qualifier())
-                .collect(),
+            fields,
+            field_to_index,
             ..self
         }
     }
 
     /// Replace all field qualifier with new value in schema
     pub fn replace_qualifier(self, qualifier: &str) -> Self {
+        let fields = self
+            .fields
+            .into_iter()
+            .map(|f| {
+                DFField::new(
+                    Some(qualifier),
+                    f.name(),
+                    f.data_type().to_owned(),
+                    f.is_nullable(),
+                )
+            })
+            .collect::<Vec<_>>();
+        let field_to_index = Self::create_field_to_index_map(&fields);
         DFSchema {
-            fields: self
-                .fields
-                .into_iter()
-                .map(|f| {
-                    DFField::new(
-                        Some(qualifier),
-                        f.name(),
-                        f.data_type().to_owned(),
-                        f.is_nullable(),
-                    )
-                })
-                .collect(),
+            fields,
+            field_to_index,
             ..self
         }
     }
@@ -362,6 +381,23 @@ impl DFSchema {
     }
 }
 
+impl Debug for DFSchema {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        #[derive(Debug)]
+        #[allow(dead_code)]
+        struct DFSchema<'a> {
+            fields: &'a Vec<DFField>,
+            metadata: &'a HashMap<String, String>,
+        }
+
+        let debug = DFSchema {
+            fields: &self.fields,
+            metadata: &self.metadata,
+        };
+        debug.fmt(f)
+    }
+}
+
 impl From<DFSchema> for Schema {
     /// Convert DFSchema into a Schema
     fn from(df_schema: DFSchema) -> Self {
@@ -603,6 +639,27 @@ impl DFField {
     }
 }
 
+#[derive(Clone, PartialEq, Eq, Hash)]
+struct LookupField {
+    qualifier: Option<String>,
+    name: String,
+}
+
+impl LookupField {
+    fn new(qualifier: Option<&str>, name: &str) -> Self {
+        Self {
+            qualifier: qualifier.map(|s| s.to_ascii_lowercase()),
+            name: name.to_ascii_lowercase(),
+        }
+    }
+}
+
+#[derive(Clone, Copy, PartialEq, Eq)]
+enum LookupResult {
+    Exact(usize),
+    Ambiguous,
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
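The diff replaces the linear scan in the field-index lookup with a precomputed map: each field is registered under its qualified name, and under its bare name only while that bare name is unique; a second qualified field with the same bare name downgrades the entry to Ambiguous, so an unqualified reference fails with a plan error instead of silently picking one field. A minimal standalone sketch of that behaviour follows; the build_map helper, the tuple-based field list, and the example table/column names are illustrative stand-ins, not DataFusion's API.

use std::collections::HashMap;

// Illustrative stand-ins for the types added in this PR.
#[derive(Clone, PartialEq, Eq, Hash)]
struct LookupField {
    qualifier: Option<String>,
    name: String,
}

impl LookupField {
    fn new(qualifier: Option<&str>, name: &str) -> Self {
        Self {
            // Lower-casing keeps lookups case-insensitive, matching the diff.
            qualifier: qualifier.map(|s| s.to_ascii_lowercase()),
            name: name.to_ascii_lowercase(),
        }
    }
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum LookupResult {
    Exact(usize),
    Ambiguous,
}

/// Build the lookup map from (qualifier, name) pairs.
fn build_map(fields: &[(Option<&str>, &str)]) -> HashMap<LookupField, LookupResult> {
    let mut map = HashMap::new();
    for (index, (qualifier, name)) in fields.iter().enumerate() {
        // Every field is reachable under its fully qualified key.
        map.insert(LookupField::new(*qualifier, name), LookupResult::Exact(index));
        if qualifier.is_some() {
            // Also register the bare name, unless two qualified fields share it.
            let bare = LookupField::new(None, name);
            match map.get(&bare) {
                None => {
                    map.insert(bare, LookupResult::Exact(index));
                }
                Some(LookupResult::Exact(_)) => {
                    map.insert(bare, LookupResult::Ambiguous);
                }
                Some(LookupResult::Ambiguous) => { /* already ambiguous */ }
            }
        }
    }
    map
}

fn main() {
    // "id" exists in both t1 and t2, "amount" only in t2.
    let map = build_map(&[(Some("t1"), "id"), (Some("t2"), "id"), (Some("t2"), "amount")]);

    // Qualified lookups stay exact.
    assert_eq!(map.get(&LookupField::new(Some("t1"), "id")), Some(&LookupResult::Exact(0)));
    // The shared bare name is flagged, so the schema lookup can report an ambiguity error.
    assert_eq!(map.get(&LookupField::new(None, "id")), Some(&LookupResult::Ambiguous));
    // A unique bare name still resolves without a qualifier.
    assert_eq!(map.get(&LookupField::new(None, "amount")), Some(&LookupResult::Exact(2)));
}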