Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 10 additions & 11 deletions dbt/include/impala/macros/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
Expected one of: 'append', 'insert_overwrite'
{%- endset %}

{% if incremental_strategy not in ['append', 'insert_overwrite'] %}
{% if incremental_strategy not in ['append', 'insert_overwrite', 'merge'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}

Expand All @@ -40,8 +40,14 @@
{% endif %}
{% endmacro %}

{% macro impala__get_incremental_default_sql(arg_dict) %}
{% do return(get_insert_overwrite_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"])) %}
{#
    Entry point dbt-core invokes to build the SQL for an incremental model run.
    Unpacks the standard arg_dict supplied by the incremental materialization and
    delegates to get_transformation_sql, which picks the configured strategy.

    NOTE(review): this macro is defined WITHOUT the `impala__` adapter prefix, so it
    shadows dbt-core's global `get_incremental_default_sql` instead of plugging into
    adapter dispatch (the replaced macro was `impala__get_incremental_default_sql`) —
    confirm this global override is intentional.
#}
{% macro get_incremental_default_sql(arg_dict) %}
{% do return(get_transformation_sql(
target = arg_dict["target_relation"],
source = arg_dict["temp_relation"],
unique_key = arg_dict["unique_key"],
dest_columns = arg_dict["dest_columns"],
predicates = arg_dict["predicates"],
)) %}
{% endmacro %}

{% materialization incremental, adapter='impala' -%}
Expand All @@ -54,20 +60,13 @@
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

-- configs
{% set unique_key = config.get('unique_key') %}
{% set uniquekey_msg -%}
Impala adapter does not support 'unique_key'
{%- endset %}
{% if unique_key is not none %}
{% do exceptions.raise_compiler_error(uniquekey_msg) %}
{% endif %}

{% set incremental_strategy = config.get('incremental_strategy') or 'append' %}
{% if incremental_strategy == None %}
{% set incremental_strategy = 'append' %}
{% endif %}
{% set incremental_strategy = validate_get_incremental_strategy(incremental_strategy) %}
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

Expand Down
57 changes: 0 additions & 57 deletions dbt/include/impala/macros/insertoverwrite.sql

This file was deleted.

139 changes: 139 additions & 0 deletions dbt/include/impala/macros/strategies.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
{#
# Copyright 2022 Cloudera Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#}

{#
    Build a comma-separated list of column names, each optionally prefixed with a
    table qualifier (e.g. "DBT_INTERNAL_SOURCE.col"). An empty qualifier yields
    the bare column names. The value is handed back via return(), so the macro's
    own rendered whitespace is discarded.
#}
{% macro get_qualified_columnnames_csv(columns, qualifier='') %}
    {#- Compute the prefix once instead of branching per column. -#}
    {%- set prefix = qualifier ~ '.' if qualifier != '' else '' -%}
    {%- set names = [] -%}
    {%- for column in columns -%}
        {%- do names.append(prefix ~ column.name) -%}
    {%- endfor -%}
    {{ return(names | join(', ')) }}
{% endmacro %}

{#
    Render a MERGE statement for the 'merge' incremental strategy.

    - unique_key may be a single column name or a list; each key becomes an
      equality predicate between DBT_INTERNAL_SOURCE and DBT_INTERNAL_DEST.
      With no unique_key, the ON clause is forced to FALSE so every source row
      takes the not-matched (insert) path.
    - Caller-supplied `predicates` are AND-ed into the ON clause.
    - The matched branch updates only `update_columns`, derived from the
      merge_update_columns / merge_exclude_columns configs via
      get_merge_update_columns; double quotes in column names are rewritten to
      backticks for Impala.
    - NOTE(review): the matched/update branch is emitted only when unique_key is
      set — without one the statement is insert-only by construction.
#}
{% macro impala__get_merge_sql(source, target, unique_key, dest_columns, predicates=none) %}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set merge_update_columns = config.get('merge_update_columns') -%}
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}

{% if unique_key %}
{% if unique_key is sequence and unique_key is not mapping and unique_key is not string %}
{% for key in unique_key %}
{% set this_key_match %}
DBT_INTERNAL_SOURCE.{{ key }} = DBT_INTERNAL_DEST.{{ key }}
{% endset %}
{% do predicates.append(this_key_match) %}
{% endfor %}
{% else %}
{% set unique_key_match %}
DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% endset %}
{% do predicates.append(unique_key_match) %}
{% endif %}
{% else %}
{% do predicates.append('FALSE') %}
{% endif %}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{"(" ~ predicates | join(") and (") ~ ")"}}

{% if unique_key %}
when matched then update set
{% for column_name in update_columns -%}
{{ column_name | replace('"', "`") }} = DBT_INTERNAL_SOURCE.{{ column_name | replace('"', "`") }}
{%- if not loop.last %}, {%- endif %}
{%- endfor %}
{% endif %}

when not matched then insert
({{ get_qualified_columnnames_csv(dest_columns) }})
values
({{ get_qualified_columnnames_csv(dest_columns, 'DBT_INTERNAL_SOURCE') }})

{% endmacro %}

{#
    Render an INSERT OVERWRITE ... PARTITION statement for the
    'insert_overwrite' incremental strategy. The target's column list is read
    from the adapter; the partition spec comes from the 'partition_by' config,
    which may be a single string or a list of column names. Raises a compiler
    error when 'partition_by' is absent, since Impala's insert_overwrite
    support here requires a partitioned target.
#}
{% macro impala__get_insert_overwrite_sql(source, target) %}

{%- set dest_columns = adapter.get_columns_in_relation(target) -%}
{%- set dest_cols_csv = dest_columns | map(attribute="name") | join(", ") -%}
{%- set partition_cols = config.get('partition_by', validator=validation.any[list]) -%}

{% if partition_cols is not none %}
{% if partition_cols is string %}
{%- set partition_cols_csv = partition_cols -%}
{% else %}
{%- set partition_cols_csv = partition_cols | join(", ") -%}
{% endif %}

insert overwrite {{ target }} partition({{ partition_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% else %}
{# Fixed: the message previously named 'partition_cols', which is not a config option — the key users must set is 'partition_by'. #}
{% do exceptions.raise_compiler_error("Impala adapter does not support 'insert_overwrite' if 'partition_by' is not present") %}
{% endif %}

{% endmacro %}

{#
    Render a plain INSERT INTO statement for the default 'append' incremental
    strategy. When the 'partition_by' config is set (string or list), rows are
    appended via a partitioned insert; otherwise a simple column-list insert is
    emitted. Column names are read from the target relation via the adapter.
#}
{% macro impala__get_incremental_sql(source, target) %}

{%- set dest_columns = adapter.get_columns_in_relation(target) -%}
{%- set dest_cols_csv = dest_columns | map(attribute="name") | join(", ") -%}
{%- set partition_cols = config.get('partition_by', validator=validation.any[list]) -%}

{% if partition_cols is not none %}
{% if partition_cols is string %}
{%- set partition_cols_csv = partition_cols -%}
{% else %}
{%- set partition_cols_csv = partition_cols | join(", ") -%}
{% endif %}

insert into {{ target }} partition({{ partition_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% else %}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endif %}

{% endmacro %}

{#
    Dispatch to the strategy-specific SQL builder based on the
    'incremental_strategy' config (default: 'append').

    target       -- relation being materialized
    source       -- temp relation holding the new rows
    unique_key   -- merge key(s); only used by the 'merge' strategy
    dest_columns -- destination column metadata; only used by 'merge'
    predicates   -- extra ON-clause predicates for 'merge'
#}
{% macro get_transformation_sql(target, source, unique_key, dest_columns, predicates) -%}

{% set raw_strategy = config.get('incremental_strategy') or 'append' %}

{% if raw_strategy == 'insert_overwrite' %}
{{ impala__get_insert_overwrite_sql(source, target) }}
{% elif raw_strategy == 'merge' %}
{# Fixed: `predicates` was accepted by this macro but silently dropped when calling the merge builder. #}
{{ impala__get_merge_sql(source, target, unique_key, dest_columns, predicates) }}
{% else %}
{{ impala__get_incremental_sql(source, target) }}
{% endif %}

{%- endmacro %}
22 changes: 22 additions & 0 deletions tests/functional/adapter/iceberg_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
merge_iceberg_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id',
incremental_strategy='merge',
merge_exclude_columns=['msg'],
table_type='iceberg'
) }}
{% if not is_incremental() %}
-- data for first invocation of model
select CAST(1 AS INT) as id, 'hello' as msg, 'blue' as color
union all
select CAST(2 AS INT) as id, 'goodbye' as msg, 'red' as color
{% else %}
-- data for subsequent incremental update
select CAST(1 AS INT) as id, 'hey' as msg, 'blue' as color
union all
select CAST(2 AS INT) as id, 'yo' as msg, 'green' as color
union all
select CAST(3 AS INT) as id, 'anyway' as msg, 'purple' as color
{% endif %}
"""
14 changes: 14 additions & 0 deletions tests/functional/adapter/test_iceberg_format.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,21 @@
TestIncrementalImpala,
)

from dbt.tests.adapter.incremental.test_incremental_merge_exclude_columns import (
BaseMergeExcludeColumns,
)

from dbt.tests.adapter.basic.files import (
model_base,
model_incremental,
base_view_sql,
schema_base_yml,
)

from tests.functional.adapter.iceberg_files import (
merge_iceberg_sql,
)


def is_iceberg_table(project, tableName):
rows = project.run_sql(f"describe formatted {tableName}", fetch="all")
Expand Down Expand Up @@ -293,3 +301,9 @@ def models(self):
"incremental_test_model.sql": insertoverwrite_iceberg_sql,
"schema.yml": schema_base_yml,
}


class TestMergeIcebergHive(BaseMergeExcludeColumns):
    """Run dbt-core's standard merge_exclude_columns suite against an Iceberg table."""

    @pytest.fixture(scope="class")
    def models(self):
        # Override the base fixture's model with the Iceberg-specific one so
        # the merge strategy is exercised with table_type='iceberg'.
        return {"merge_exclude_columns.sql": merge_iceberg_sql}