
merge_exclude_columns in dbt incremental merge strategy not working as expected

I am trying to build an incremental merge model using dbt. My requirement is to exclude some columns from being updated when the dbt merge query runs.

I am using dbt's merge_exclude_columns config, as shown below:

{{
    config(
        tags = ['model_test'],
        unique_key = 'id',
        materialized = 'incremental',
        incremental_strategy = 'merge',
        merge_exclude_columns = ['method']
    )
}}

This is the merge query dbt generates for it:

merge into target as DBT_INTERNAL_DEST
      using source_temp as DBT_INTERNAL_SOURCE
      on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
      when matched then update set *
      when not matched then insert *

When I run the model using this configuration, I still see the method field being updated in the target table.
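
For reference, a merge that honored the exclusion would list the updated columns explicitly instead of using update set *. A rough sketch, assuming the table has the hypothetical columns status and updated_at in addition to id and method (the exact text depends on the adapter):

```sql
-- Sketch only: status and updated_at are hypothetical column names.
-- The method column is intentionally absent from the update list.
merge into target as DBT_INTERNAL_DEST
      using source_temp as DBT_INTERNAL_SOURCE
      on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
      when matched then update set
          id = DBT_INTERNAL_SOURCE.id,
          status = DBT_INTERNAL_SOURCE.status,
          updated_at = DBT_INTERNAL_SOURCE.updated_at
      when not matched then insert *
```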

Can anyone help me understand what is going wrong here?

sakshi asked Dec 10 '25 11:12



1 Answer

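Look at the macro in **/macros/materializations/incremental/strategies.sql for your adapter and override its behavior as needed. The macro name is adapter-specific (the one below is for the Oracle adapter, hence the oracle__ prefix), so adapt it to the adapter you use. For example, when I needed a merge statement to insert only specific columns, I wrote something like this: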

{% macro oracle__get_incremental_merge_sql(args_dict) %}
{%- set dest_columns = args_dict["dest_columns"] -%}
{%- set temp_relation = args_dict["temp_relation"] -%}
{%- set target_relation = args_dict["target_relation"] -%}
{%- set unique_key = args_dict["unique_key"] -%}
{%- set dest_column_names = dest_columns | map(attribute='name') | list -%}
{%- set dest_cols_csv = get_quoted_column_csv(model, dest_column_names)  -%}
{%- set merge_update_columns = config.get('merge_update_columns') -%}
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
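{#- merge_insert_columns and custom_insert are custom config keys read only by this override; they are not built-in dbt options -#}
{%- set merge_insert_columns = config.get('merge_insert_columns') -%}
{%- set custom_insert = config.get('custom_insert') -%}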
{%- set incremental_predicates = args_dict["incremental_predicates"] -%}
{%- set update_columns = get_merge_update_columns(merge_update_columns, merge_exclude_columns, dest_columns) -%}
{%- set insert_columns = get_merge_update_columns(merge_insert_columns, merge_exclude_columns, dest_columns) -%}
{%- if unique_key -%}
    {%- set unique_key_result = oracle_check_and_quote_unique_key_for_incremental_merge(unique_key, incremental_predicates) -%}
    {%- set unique_key_list = unique_key_result['unique_key_list'] -%}
    {%- set unique_key_merge_predicates = unique_key_result['unique_key_merge_predicates'] -%}
    merge into {{ target_relation }} DBT_INTERNAL_DEST
      using {{ temp_relation }} DBT_INTERNAL_SOURCE
      on ({{ unique_key_merge_predicates | join(' AND ') }})
        when matched then
      update set
      {% for col in update_columns if (col.upper() not in unique_key_list and col not in unique_key_list) -%}
        DBT_INTERNAL_DEST.{{ col }} = DBT_INTERNAL_SOURCE.{{ col }}{% if not loop.last %}, {% endif %}
      {% endfor -%}
    when not matched then
    {% if custom_insert -%}
      insert(
         {% for col in insert_columns -%}
          {{ col }}{% if not loop.last %}, {% endif %}
        {% endfor -%}
      )
      values(
        {% for col in insert_columns -%}
          DBT_INTERNAL_SOURCE.{{ col }}{% if not loop.last %}, {% endif %}
        {% endfor -%}
      )
    {%- else -%}
      insert({{ dest_cols_csv }})
      values(
        {% for col in dest_columns -%}
          DBT_INTERNAL_SOURCE.{{ adapter.check_and_quote_identifier(col.name, model.columns) }}{% if not loop.last %}, {% endif %}
        {% endfor -%}
      )
    {% endif -%}
{%- else -%}
insert into {{ target_relation }} ({{ dest_cols_csv }})
(
   select {{ dest_cols_csv }}
   from {{ temp_relation }}
)
{%- endif -%}{% endmacro %}

After that, I use my custom parameters in the model configuration:

{{
config(
    materialized='incremental',
    unique_key='uk',
    incremental_strategy='merge',
    schema='my_schema',
    merge_update_columns = ['col1', 'col2', 'col3'],
    merge_insert_columns = ['col1', 'col2', 'col3', 'col4'],
    custom_insert = true,
    tags=["my_tag"]
)}}
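
With that configuration, the macro above should render roughly the following statement. This is only a sketch: the relation names my_schema.my_model and my_schema.my_model__dbt_tmp are hypothetical, the on clause depends on what oracle_check_and_quote_unique_key_for_incremental_merge returns, and identifier quoting and casing may differ in the real output.

```sql
-- Approximate rendered output; relation names are hypothetical.
merge into my_schema.my_model DBT_INTERNAL_DEST
  using my_schema.my_model__dbt_tmp DBT_INTERNAL_SOURCE
  on (DBT_INTERNAL_SOURCE.uk = DBT_INTERNAL_DEST.uk)
when matched then
  -- only the columns listed in merge_update_columns are updated
  update set
    DBT_INTERNAL_DEST.col1 = DBT_INTERNAL_SOURCE.col1,
    DBT_INTERNAL_DEST.col2 = DBT_INTERNAL_SOURCE.col2,
    DBT_INTERNAL_DEST.col3 = DBT_INTERNAL_SOURCE.col3
when not matched then
  -- custom_insert = true, so only merge_insert_columns are inserted
  insert (col1, col2, col3, col4)
  values (
    DBT_INTERNAL_SOURCE.col1,
    DBT_INTERNAL_SOURCE.col2,
    DBT_INTERNAL_SOURCE.col3,
    DBT_INTERNAL_SOURCE.col4
  )
```

If merge_exclude_columns is set instead of merge_update_columns, the same update list is built from all destination columns minus the excluded ones.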
Roman S answered Dec 13 '25 01:12



