Featured image of post DBT 증분 모델 전략 정리. DBT Incremental Startegy

DBT 증분 모델 전략 정리. DBT Incremental Startegy

DBT에서 Incremental Model (증분 모델)을 만드는 방법에 대해 정리한 글입니다.

DBT 공식 문서의 Incremental Models 를 번역 및 정리한 글입니다.

DBT 모델을 배포할 때 여태까지는 데이터 사이즈가 그리 크지 않았기 때문에 full-refresh하는 방법으로 만들었었습니다.

최근에는 데이터 사이즈가 커져 full-refresh 방법으로는 부담이 가서, 증분 전략(Incremental Strategy)를 사용하고자 했습니다.

실제로 알아보니 엔지니어링 관점에서 공수를 줄일 수 있는 많은 편의성을 제공하는데요, 왜 DBT가 사랑 받는지 알 수 있는 또 다른 대목이 되었습니다.

이 글에서는 materialization과 구체화를 혼용하여 사용합니다.

Overview

증분 모델은 데이터 웨어하우스에서 테이블로 구축됩니다.

  • 모델을 처음 실행하면 소스 데이터의 모든 행을 변환하여 테이블이 구축됩니다.
  • 이후 실행 시에는 dbt가 소스 데이터에서 필터링하도록 지시한 행만 변환하여 이미 작성된 테이블인 대상 테이블에 삽입합니다.

증분 실행에서 필터링하는 행은 대개 dbt를 마지막으로 실행한 이후 생성 또는 업데이트된 소스 데이터의 행입니다. 따라서 dbt를 실행할 때마다 모델이 증분 방식으로 빌드됩니다.

증분 모델을 사용하면 변환해야 하는 데이터의 양이 제한되어 변환의 런타임이 크게 단축됩니다. 따라서 웨어하우스 성능이 향상되고 컴퓨팅 비용이 절감됩니다.

Using incremental materializations

1
2
3
4
{{
	config(materialized='incremental')
}}
select ..

incremental materializations(증분 구체화)를 통해 증분 모델을 사용하려면 다음을 명시해야 합니다:

  • 증분 실행에서 행을 필터링하는 방법
  • 모델의 Primary Key(선택 사항)

증분 실행에서 행을 필터링하는 방법

증분 실행 시 어떤 행을 변환해야 하는지 dbt에 알려주려면, 이러한 행을 필터링하는 유효한 SQL을 is_incremental() macro에 래핑해야 합니다.

dbt가 이 모델을 마지막으로 실행한 이후 생성된 행을 필터링하고 싶을 때가 많습니다. 이 모델을 가장 최근에 실행한 타임스탬프를 찾는 가장 좋은 방법은 대상 테이블에서 가장 최근의 타임스탬프를 확인하는 것입니다. dbt를 사용하면 “{{ this }}” 변수를 사용하여 대상 테이블을 쉽게 쿼리할 수 있습니다.

또한 새 레코드와 업데이트된 레코드를 모두 캡처하려는 경우도 흔합니다. 업데이트된 레코드의 경우, 수정된 레코드를 중복으로 가져오지 않도록 고유 키를 정의해야 합니다. is_incremental() 코드는 dbt가 이 모델을 마지막으로 실행한 이후 생성되거나 수정된 행이 있는지 확인합니다.

예를 들어, 열에 계산 속도가 느린 변환을 포함하는 모델은 다음과 같이 증분적으로 구축할 수 있습니다:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
{{
    config(
        materialized='incremental'
    )
}}

select
    *,
    my_slow_function(my_column)

from raw_app_data.events

{% if is_incremental() %}

  -- 이 필터는 증분 실행시 적용됩니다.
  where event_time > (select max(event_time) from {{ this }})

{% endif %}

CTE(일반 테이블 표현식)를 사용하는 보다 복잡한 증분 모델의 경우, is_incremental()매크로의 위치가 쿼리 성능에 미치는 영향을 고려해야 합니다.

일부 웨어하우스에서는 레코드를 조기에 필터링하면 쿼리 실행 시간을 크게 개선할 수 있습니다!

모델의 Primary Key(선택 사항)

unique_key를 사용하면 새 행을 추가하는 대신 기존 행을 업데이트할 수 있습니다.

기존 unique_key에 대한 새 정보가 도착하면 해당 새 정보가 테이블에 추가되는 대신 현재 정보를 대체할 수 있습니다. 중복 행이 도착하면 무시할 수 있습니다.

unique_key를 지정하지 않으면 추가 전용 동작이 발생하며, 이는 dbt가 모델의 SQL에서 반환된 모든 행을 중복 여부와 관계없이 기존 대상 테이블에 삽입한다는 의미입니다. (선택 사항이지만 정합성을 위해서는 어쩔 수 없이 선택 해야 할 옵션처럼 보이네요.)

unique_key는 모델 상단의 구성 블록에서 정의할 수 있으며, 단일 열 이름 또는 열 이름 목록이 될 수 있습니다.

unique_key는 모델 정의에서 단일 열을 나타내는 문자열 또는 함께 사용할 수 있는 작은따옴표로 묶인 열 이름의 목록(예:['col1', 'col2', ...])으로 제공해야 합니다.

  • 이 방식으로 사용되는 열에는 Null이 포함되어서는 안 되며, 그렇지 않으면 증분 모델 실행이 실패할 수 있습니다.
    • 각 열에 Null값이 없도록 대체하거나(예: coalesce(COLUMN_NAME, 'VALUE_IF_NULL'),
    • 단일 열 대리 키를 정의합니다(예: dbt_utils.generate_surrogate_key 사용).

각 행을 고유하게 식별하기 위해 여러 열을 조합해야 하는 경우, 이러한 열을 문자열 표현식(unique_key = 'concat(user_id, session_number)')이 아닌 목록(unique_key = ['user_id', 'session_number'])으로 전달할 것을 권장합니다.

  • 두 번째 구문을 사용하면 데이터베이스에 적합한 방식으로 증분 모델 구체화에 열이 템플릿화되도록 보장할 수 있습니다.
  • 마찬가지로 각 열에 Null이 포함되어 있지 않은지 확인해야 하며, 그렇지 않으면 증분 모델 실행이 실패할 수 있습니다.
  • 또는 단일 열 대리 키를 정의할 수도 있습니다(예: dbt_utils.generate_surrogate_key).

unique_key 예시

이벤트 스트림을 기반으로 일일 활성 사용자 수(DAU)를 계산하는 모델을 생각해 보겠습니다.

소스 데이터가 도착하면 dbt가 마지막으로 실행된 날과 그 이후의 모든 날에 대한 DAU를 계산하는 모델입니다:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
{{
    config(
        materialized='incremental',
        unique_key='date_day'
    )
}}

select
    date_trunc('day', event_at) as date_day,
    count(distinct user_id) as daily_active_users

from raw_app_data.events

{% if is_incremental() %}

  -- 증분 실행 시 실행되는 조건문
  where date_day >= (select max(date_day) from {{ this }})

{% endif %}

group by 1

증분 모델을 다시 빌드하려면?

증분 모델 로직이 변경된 경우 새 데이터 행의 변환이 대상 테이블에 저장된 과거 변환과 다를 수 있습니다. 이 경우 증분 모델을 다시 빌드해야 합니다.

dbt가 전체 증분 모델을 처음부터 다시 작성하도록 하려면 명령줄에서--full-refresh 플래그를 사용합니다. 이 플래그를 사용하면 dbt가 데이터베이스의 기존 대상 테이블을 삭제한 후 완전히 다시 빌드합니다.

1
dbt run --full-refresh --select my_incremental_model+

모델 명 뒤에 +를 명시하여 모든 다운스트림 모델도 다시 빌드하는 것이 좋습니다.

증분 모델을 쓰기에 적절한 상황

table 구체화는 모델을 테이블로 만들기도 하지만, 각 dbt 실행 시 테이블을 다시 빌드합니다. 이러한 실행은 다음과 같은 경우 많은 컴퓨팅을 사용한다는 점에서 문제가 될 수 있습니다:

  • 소스 데이터 테이블에 수백만 또는 수십억 개의 행이 있는 경우.
  • 소스 데이터의 변환이 계산 비용이 많이 들거나(즉, 실행하는 데 오랜 시간이 걸리는 경우), 복잡한 정규식 함수를 사용하거나, 데이터를 변환하는 데 UDF를 사용하는 경우.

프로그래밍의 많은 부분과 마찬가지로, 증분 모델은 복잡성과 성능 사이의 절충점입니다. 증분 모델은 viewtable materialization만큼 간단하지는 않지만, DBT 실행의 성능을 크게 향상시킬 수 있습니다.

is_incremental() macro 살펴보기

다음 조건이 모두 충족되면 is_incremental()매크로는 True를 반환합니다:

  • 대상 테이블이 데이터베이스에 이미 존재할 때
  • dbt가 full-refresh 모드로 실행되고 있지 않을 때
  • 실행 중인 모델이 materialized='incremental'으로 구성되어 있을 때

증분 모델의 백그라운드 동작

DBT의 증분 구체화는 데이터베이스마다 다르게 작동합니다. 지원되는 경우, merge 문은 새 레코드를 삽입하고 기존 레코드를 업데이트하는 데 사용됩니다.

merge 문을 지원하지 않는 웨어하우스에서는 먼저 delete 문을 사용하여 업데이트할 대상 테이블의 레코드를 삭제한 다음 insert 문을 사용하여 병합을 구현합니다.

트랜잭션 관리는 이 작업이 단일 작업 단위로 실행되도록 하는 데 사용됩니다.

증분 모델의 스키마가 중간에 변경되는 경우

on_schema_change 옵션을 사용하면 스키마 변경이 있는 경우에도 증분 모델을 계속 실행할 수 있으므로 —full-refresh 시나리오가 줄어들고 쿼리 비용이 절감됩니다.

다음과 같이 on_schema_change 설정을 구성할 수 있습니다.

1
2
3
# dbt_project.yml
models:
  +on_schema_change: "sync_all_columns"
1
2
3
4
5
6
7
{{
    config(
        materialized='incremental',
        unique_key='date_day',
        on_schema_change='fail' -- <- here
    )
}}

on_schema_change에 사용할 수 있는 값은 다음과 같습니다:

  • ignore: default.
    • 증분 모델에 열을 추가하고 dbt 실행을 실행하는 경우 이 열은 대상 테이블에 나타나지 않습니다.
    • 마찬가지로 증분 모델에서 열을 제거하고 dbt 실행을 실행하는 경우 이 열은 대상 테이블에서 제거되지 않습니다.
    • 대신 증분 로직이 변경될 때마다 증분 모델과 다운스트림 모델 모두에 대한 full-refresh 실행을 통해 업데이트는 가능합니다.
  • fail: 소스 스키마와 대상 스키마가 다를 때 오류 메시지를 트리거합니다.
  • append_new_columns: 기존 테이블에 새 열을 추가합니다. 이 설정은 새 데이터에 없는 열을 기존 테이블에서 제거하지 않습니다.
  • sync_all_columns: 기존 테이블에 새 열을 추가하고 현재 누락된 열을 제거합니다. 여기에는 데이터 유형 변경이 포함됩니다. BigQuery에서 열 유형을 변경하려면 전체 테이블을 스캔해야 하므로 구현할 때 장단점을 염두에 두어야 합니다.

참고: 새로 추가된 열에 대한 이전 레코드의 값을 다시 채우는 on_schema_change 동작은 없습니다. 이러한 값을 채워야 하는 경우 수동 업데이트를 실행하거나 --full-refresh를 트리거하는 것이 좋습니다.

주의: on_schema_change는 최상위 수준 변경을 추적합니다. 현재 on_schema_change는 최상위 수준 열 변경 사항만 추적합니다. 중첩된 열 변경은 추적하지 않습니다. 예를 들어, BigQuery에서 중첩된 열을 추가, 제거 또는 수정하면 on_schema_change가 적절하게 설정되어 있더라도 스키마 변경이 트리거되지 않습니다.

incremetal_strategy

일부 어댑터에서는 선택적 incremental_strategy 구성이 dbt가 증분 모델을 빌드하는 데 사용하는 코드를 제어합니다. 데이터의 양, unique_key의 신뢰성 또는 특정 기능의 가용성에 따라 접근 방식에 따라 효과가 달라질 수 있습니다.

Snowflake: merge(기본값), delete+insert(선택 사항), append(선택 사항) BigQuery: merge(기본값), insert_overwrite(선택 사항) Spark: append(기본값), insert_overwrite(선택 사항), merge(선택 사항, Delta-only)

Configuring incremental strategy

incremental_stategy 구성은 특정 모델에 지정하거나 dbt_project.yml파일에 있는 모든 모델에 지정할 수 있습니다:

  • 특정 모델에만 지정하는 방법

     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    
    # models/my_model.sql
    {{
      config(
        materialized='incremental',
        unique_key='date_day',
        incremental_strategy='delete+insert',
        ...
      )
    }}
    
    select ...
    
  • dbt_project.yml에 모든 모델 지정하는 방법

    1
    2
    
    models:
      +incremental_strategy: "insert_overwrite"
    

Strategy-specifig configs

merge 전략을 사용 중이고 unique_key를 지정한 경우 기본적으로 dbt는 일치하는 행을 새 값으로 완전히 덮어씁니다.

merge 전략을 지원하는 어댑터(Snowflake, BigQuery, Apache Spark 및 Databricks 포함)에서는 선택적으로 merge_update_columns 구성에 열 이름 목록을 전달할 수 있습니다. 이 경우 dbt는 구성에 지정된 열만 업데이트하고 다른 열의 이전 값은 유지합니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
{{
  config(
    materialized = 'incremental',
    unique_key = 'id',
    merge_update_columns = ['email', 'ip_address'],
    ...
  )
}}

select ...

Incremental_predicates

incremental_predicates는 데이터가 너무 커서 성능에 대한 추가 투자를 정당화할 수 있을 때 복잡성을 증가시켜 성능을 향상시키는 옵션입니다.

다음은 Snowflake에서 볼 수 있는 yml 파일의 모델 구성 예시입니다:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
models:
  - name: my_incremental_model
    config:
      materialized: incremental
      unique_key: id
      cluster_by: ['session_start']  
      incremental_strategy: merge
      # `incremental_predicates` SQL문 리스트를 받습니다. 이렇게 하면 기존 테이블의 스캔이 최근 7일간의 데이터로 제한됩니다.
      incremental_predicates: ["DBT_INTERNAL_DEST.session_start > datediff(day, -7, current_date)"]
      
      # DBT_INTERNAL_DEST`  `DBT_INTERNAL_SOURCE` merge 전략을 사용하여 증분 실행하는 동안 사용하는 대상 테이블과 임시 테이블의 표준 별칭입니다.

또는 모델 파일 내에 동일한 구성을 구성하는 방법도 있습니다:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
-- in models/my_incremental_model.sql

{{
  config(
    materialized = 'incremental',
    unique_key = 'id',
    cluster_by = ['session_start'],  
    incremental_strategy = 'merge',
    incremental_predicates = [
      "DBT_INTERNAL_DEST.session_start > datediff(day, -7, current_date)"
    ]
  )
}}

...

이렇게 하면 dbt.log 파일에 다음과 같은 병합 문이 템플릿으로 생성됩니다:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
merge into <existing_table> DBT_INTERNAL_DEST
    from <temp_table_with_new_records> DBT_INTERNAL_SOURCE
    on
        -- unique key
        DBT_INTERNAL_DEST.id = DBT_INTERNAL_SOURCE.id
        and
        -- custom predicate: limits data scan in the "old" data / existing table
        DBT_INTERNAL_DEST.session_start > datediff(day, -7, current_date)
    when matched then update ...
    when not matched then insert ...

증분 모델 SQL 본문 내에서 업스트림 테이블의 데이터 스캔을 제한하여 처리/변환되는 “새” 데이터의 양을 제한하는 방법도 있습니다.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
with large_source_table as (

    select * from {{ ref('large_source_table') }}
    {% if is_incremental() %}
        where session_start > dateadd(day, -3, current_date)
    {% endif %}

),

...

참고: 구문은 incremental_strategy을 구성하는 방법에 따라 다릅니다:

  • merge 전략을 사용하는 경우, 열의 별칭을 DBT_INTERNAL_DEST(“이전” 데이터) 또는 DBT_INTERNAL_SOURCE(“새” 데이터)로 명시적으로 지정해야 할 수 있습니다. 삽입_오버라이트 증분 전략과 개념적으로 상당 부분 겹치는 부분이 있습니다.
Hugo로 만듦
JimmyStack 테마 사용 중