diff --git a/dbt/include/impala/macros/incremental.sql b/dbt/include/impala/macros/incremental.sql
index f968ef7..96d27ef 100644
--- a/dbt/include/impala/macros/incremental.sql
+++ b/dbt/include/impala/macros/incremental.sql
@@ -20,7 +20,7 @@
-    Expected one of: 'append', 'insert_overwrite'
+    Expected one of: 'append', 'insert_overwrite', 'merge'
   {%- endset %}
 
-  {% if incremental_strategy not in ['append', 'insert_overwrite'] %}
+  {% if incremental_strategy not in ['append', 'insert_overwrite', 'merge'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {% endif %}
 
@@ -40,8 +40,18 @@
   {% endif %}
 {% endmacro %}
 
-{% macro impala__get_incremental_default_sql(arg_dict) %}
-  {% do return(get_insert_overwrite_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"])) %}
+{#
+  Build the incremental statement by handing the target/temp relations, unique_key,
+  dest_columns and predicates found in arg_dict to get_transformation_sql.
+#}
+{% macro get_incremental_default_sql(arg_dict) %}
+  {% do return(get_transformation_sql(
+    target = arg_dict["target_relation"],
+    source = arg_dict["temp_relation"],
+    unique_key = arg_dict["unique_key"],
+    dest_columns = arg_dict["dest_columns"],
+    predicates = arg_dict["predicates"],
+  )) %}
 {% endmacro %}
 
 {% materialization incremental, adapter='impala' -%}
@@ -54,20 +64,13 @@
   {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
   {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}
 
-  -- configs
-  {% set unique_key = config.get('unique_key') %}
-  {% set uniquekey_msg -%}
-    Impala adapter does not support 'unique_key'
-  {%- endset %}
-  {% if unique_key is not none %}
-    {% do exceptions.raise_compiler_error(uniquekey_msg) %}
-  {% endif %}
   {% set incremental_strategy = config.get('incremental_strategy') or 'append' %}
   {% if incremental_strategy == None %}
     {% set incremental_strategy = 'append' %}
   {% endif %}
 
   {% set incremental_strategy = validate_get_incremental_strategy(incremental_strategy) %}
+  {%- set unique_key = config.get('unique_key') -%}
   {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
 
   {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}
diff --git a/dbt/include/impala/macros/insertoverwrite.sql b/dbt/include/impala/macros/insertoverwrite.sql
deleted file mode 100644
index 451468b..0000000
--- a/dbt/include/impala/macros/insertoverwrite.sql
+++ /dev/null
@@ -1,57 +0,0 @@
-{#
-# Copyright 2022 Cloudera Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#}
-
-{% macro get_insert_overwrite_sql(target, source, dest_columns) -%}
-
-  {% set raw_strategy = config.get('incremental_strategy') or 'append' %}
-  {%- set partition_cols = config.get('partition_by', validator=validation.any[list]) -%}
-  {%- set dest_cols_csv = dest_columns | map(attribute="name") | join(", ") -%}
-
-  {% if partition_cols is not none %}
-    {% if partition_cols is string %}
-      {%- set partition_cols_csv = partition_cols -%}
-    {% else %}
-      {%- set partition_cols_csv = partition_cols | join(", ") -%}
-    {% endif %}
-    {{ print("partition_cols_csv = " + partition_cols_csv) }}
-
-    {% if raw_strategy == 'insert_overwrite' %}
-
-      insert overwrite {{ target }} partition({{ partition_cols_csv }})
-      (
-        select {{ dest_cols_csv }}
-        from {{ source }}
-      )
-
-    {% elif raw_strategy == 'append' %}
-
-      insert into {{ target }} partition({{ partition_cols_csv }})
-      (
-        select {{ dest_cols_csv }}
-        from {{ source }}
-      )
-
-    {% endif %}
-  {% else %}
-
-    insert into {{ target }} ({{ dest_cols_csv }})
-    (
-      select {{ dest_cols_csv }}
-      from {{ source }}
-    )
-  {% endif %}
-
-{%- endmacro %}
diff --git a/dbt/include/impala/macros/strategies.sql b/dbt/include/impala/macros/strategies.sql
new file mode 100644
index 0000000..7bcc46a
--- /dev/null
+++ b/dbt/include/impala/macros/strategies.sql
@@ -0,0 +1,165 @@
+{#
+# Copyright 2022 Cloudera Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#}
+
+{#
+  Return the names of `columns` joined with ", ". When `qualifier` is non-empty
+  each name is prefixed with "<qualifier>.".
+#}
+{% macro get_qualified_columnnames_csv(columns, qualifier='') %}
+  {% set quoted = [] %}
+  {% for col in columns -%}
+    {% if qualifier != '' %}
+      {%- do quoted.append(qualifier + '.' + col.name) -%}
+    {% else %}
+      {%- do quoted.append(col.name) -%}
+    {% endif %}
+  {%- endfor %}
+
+  {%- set dest_cols_csv = quoted | join(', ') -%}
+  {{ return(dest_cols_csv) }}
+
+{% endmacro %}
+
+{#
+  Render a MERGE of `source` into `target`.
+  The ON clause and-s the caller's `predicates` with one equality per unique_key
+  column. Without a unique_key a literal FALSE is added instead, so no row
+  matches and every source row goes through the insert branch.
+  NOTE(review): the parameter order is (source, target); a caller passing
+  (target, source) positionally would swap the relations -- confirm call sites.
+#}
+{% macro impala__get_merge_sql(source, target, unique_key, dest_columns, predicates=none) %}
+  {# copy the list so the caller's predicates are not mutated; a missing/empty value means no extra predicates #}
+  {%- set predicates = [] + predicates if predicates else [] -%}
+  {%- set merge_update_columns = config.get('merge_update_columns') -%}
+  {%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
+  {%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
+
+  {% if unique_key %}
+    {% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
+      {% for key in unique_key %}
+        {% set this_key_match %}
+          DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
+        {% endset %}
+        {% do predicates.append(this_key_match) %}
+      {% endfor %}
+    {% else %}
+      {% set unique_key_match %}
+        DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
+      {% endset %}
+      {% do predicates.append(unique_key_match) %}
+    {% endif %}
+  {% else %}
+    {% do predicates.append('FALSE') %}
+  {% endif %}
+
+  merge into {{ target }} as DBT_INTERNAL_DEST
+  using {{ source }} as DBT_INTERNAL_SOURCE
+  on {{"(" ~ predicates | join(") and (") ~ ")"}}
+
+  {% if unique_key %}
+  when matched then update set
+    {% for column_name in update_columns -%}
+      {{ column_name | replace('"', "`") }} = DBT_INTERNAL_SOURCE.{{ column_name | replace('"', "`") }}
+      {%- if not loop.last %}, {%- endif %}
+    {%- endfor %}
+  {% endif %}
+
+  when not matched then insert
+    ({{ get_qualified_columnnames_csv(dest_columns) }})
+  values
+    ({{ get_qualified_columnnames_csv(dest_columns, 'DBT_INTERNAL_SOURCE') }})
+
+{% endmacro %}
+
+{#
+  Render INSERT OVERWRITE of `source` into the partitions of `target` named by the
+  'partition_by' config; raises a compiler error when 'partition_by' is not set.
+#}
+{% macro impala__get_insert_overwrite_sql(source, target) %}
+
+  {%- set dest_columns = adapter.get_columns_in_relation(target) -%}
+  {%- set dest_cols_csv = dest_columns | map(attribute="name") | join(", ") -%}
+  {%- set partition_cols = config.get('partition_by', validator=validation.any[list]) -%}
+
+  {% if partition_cols is not none %}
+    {% if partition_cols is string %}
+      {%- set partition_cols_csv = partition_cols -%}
+    {% else %}
+      {%- set partition_cols_csv = partition_cols | join(", ") -%}
+    {% endif %}
+
+    insert overwrite {{ target }} partition({{ partition_cols_csv }})
+    (
+      select {{ dest_cols_csv }}
+      from {{ source }}
+    )
+  {% else %}
+    {% do exceptions.raise_compiler_error("Impala adapter does not support 'insert_overwrite' if 'partition_by' is not present") %}
+  {% endif %}
+
+{% endmacro %}
+
+{#
+  Render a plain INSERT INTO of `source` into `target`, with a PARTITION clause
+  when the 'partition_by' config is set.
+#}
+{% macro impala__get_incremental_sql(source, target) %}
+
+  {%- set dest_columns = adapter.get_columns_in_relation(target) -%}
+  {%- set dest_cols_csv = dest_columns | map(attribute="name") | join(", ") -%}
+  {%- set partition_cols = config.get('partition_by', validator=validation.any[list]) -%}
+
+  {% if partition_cols is not none %}
+    {% if partition_cols is string %}
+      {%- set partition_cols_csv = partition_cols -%}
+    {% else %}
+      {%- set partition_cols_csv = partition_cols | join(", ") -%}
+    {% endif %}
+
+    insert into {{ target }} partition({{ partition_cols_csv }})
+    (
+      select {{ dest_cols_csv }}
+      from {{ source }}
+    )
+  {% else %}
+    insert into {{ target }} ({{ dest_cols_csv }})
+    (
+      select {{ dest_cols_csv }}
+      from {{ source }}
+    )
+  {% endif %}
+
+{% endmacro %}
+
+{#
+  Pick the statement for the configured incremental_strategy: 'insert_overwrite'
+  and 'merge' have their own macros, anything else is a plain insert.
+  unique_key, dest_columns and predicates are only consumed by the merge branch.
+#}
+{% macro get_transformation_sql(target, source, unique_key, dest_columns, predicates) -%}
+
+  {% set raw_strategy = config.get('incremental_strategy') or 'append' %}
+
+  {% if raw_strategy == 'insert_overwrite' %}
+    {{ impala__get_insert_overwrite_sql(source, target) }}
+  {% elif raw_strategy == 'merge' %}
+    {{ impala__get_merge_sql(source, target, unique_key, dest_columns, predicates) }}
+  {% else %}
+    {{ impala__get_incremental_sql(source, target) }}
+  {% endif %}
+
+{%- endmacro %}
diff --git a/tests/functional/adapter/iceberg_files.py b/tests/functional/adapter/iceberg_files.py
new file mode 100644
index 0000000..3986e84
--- /dev/null
+++ b/tests/functional/adapter/iceberg_files.py
@@ -0,0 +1,24 @@
+# dbt model used by the Iceberg merge test: an incremental 'merge' keyed on `id`
+# with `msg` excluded from updates, stored with table_type='iceberg'.
+merge_iceberg_sql = """
+{{ config(
+    materialized = 'incremental',
+    unique_key = 'id',
+    incremental_strategy='merge',
+    merge_exclude_columns=['msg'],
+    table_type='iceberg'
+) }}
+{% if not is_incremental() %}
+-- data for first invocation of model
+select CAST(1 AS INT) as id, 'hello' as msg, 'blue' as color
+union all
+select CAST(2 AS INT) as id, 'goodbye' as msg, 'red' as color
+{% else %}
+-- data for subsequent incremental update
+select CAST(1 AS INT) as id, 'hey' as msg, 'blue' as color
+union all
+select CAST(2 AS INT) as id, 'yo' as msg, 'green' as color
+union all
+select CAST(3 AS INT) as id, 'anyway' as msg, 'purple' as color
+{% endif %}
+"""
diff --git a/tests/functional/adapter/test_iceberg_format.py b/tests/functional/adapter/test_iceberg_format.py
index 7bfcf18..75bc6c1 100644
--- a/tests/functional/adapter/test_iceberg_format.py
+++ b/tests/functional/adapter/test_iceberg_format.py
@@ -27,6 +27,10 @@
     TestIncrementalImpala,
 )
 
+from dbt.tests.adapter.incremental.test_incremental_merge_exclude_columns import (
+    BaseMergeExcludeColumns,
+)
+
 from dbt.tests.adapter.basic.files import (
     model_base,
     model_incremental,
@@ -34,6 +38,10 @@
     schema_base_yml,
 )
 
+from tests.functional.adapter.iceberg_files import (
+    merge_iceberg_sql,
+)
+
 
 def is_iceberg_table(project, tableName):
     rows = project.run_sql(f"describe formatted {tableName}", fetch="all")
@@ -293,3 +301,12 @@ def models(self):
             "incremental_test_model.sql": insertoverwrite_iceberg_sql,
             "schema.yml": schema_base_yml,
         }
+
+
+# NOTE(review): the class name says "Hive" although this suite targets Impala -- confirm the name is intended.
+class TestMergeIcebergHive(BaseMergeExcludeColumns):
+    """Run dbt's BaseMergeExcludeColumns checks against the Iceberg merge model."""
+
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"merge_exclude_columns.sql": merge_iceberg_sql}