Skip to content

Commit

Permalink
Merge pull request #69 from clrcrl/insert-by-period
Browse files Browse the repository at this point in the history
Add insert-by-period materialization
  • Loading branch information
drewbanin authored Jun 27, 2018
2 parents 77ba63d + b924133 commit 754db12
Show file tree
Hide file tree
Showing 2 changed files with 230 additions and 2 deletions.
57 changes: 55 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ This macro implements an "outer union." The list of tables provided to this macr
Usage:
```
{{ dbt_utils.union_tables(
tables=[ref('table_1'), ref('table_2')],
column_override={"some_field": "varchar(100)"},
tables=[ref('table_1'), ref('table_2')],
column_override={"some_field": "varchar(100)"},
exclude=["some_other_field"]
) }}
```
Expand Down Expand Up @@ -266,6 +266,59 @@ Usage:
```
{{ dbt_utils.get_url_parameter(field='page_url', url_parameter='utm_source') }}
```
---
### Materializations
#### insert_by_period ([source](macros/materializations/insert_by_period_materialization.sql))
`insert_by_period` allows dbt to insert records into a table one period (i.e. day, week) at a time.

This materialization is appropriate for event data that can be processed in discrete periods. It is similar in concept to the built-in incremental materialization, but has the added benefit of building the model in chunks even during a full-refresh so is particularly useful for models where the initial run can be problematic.

Should a run of a model using this materialization be interrupted, a subsequent run will continue building the target table from where it was interrupted (granted the `--full-refresh` flag is omitted).

Progress is logged in the command line for easy monitoring.

Usage:
```sql
{{
config(
materialized = "insert_by_period",
period = "day",
timestamp_field = "created_at",
start_date = "2018-01-01",
stop_date = "2018-06-01"
}}

with events as (

select *
from {{ ref('events') }}
where __PERIOD_FILTER__ -- This will be replaced with a filter in the materialization code

)

....complex aggregates here....

```
Configuration values:
* `period`: period to break the model into, must be a valid [datepart](https://docs.aws.amazon.com/redshift/latest/dg/r_Dateparts_for_datetime_functions.html) (default='Week')
* `timestamp_field`: the column name of the timestamp field that will be used to break the model into smaller queries
* `start_date`: literal date or timestamp - generally choose a date that is earlier than the start of your data
* `stop_date`: literal date or timestamp (default=current_timestamp)

Caveats:
* This materialization is compatible with dbt 0.10.1.
* This materialization has been written for Redshift.
* This materialization can only be used for a model where records are not expected to change after they are created.
* Any model post-hooks that use `{{ this }}` will fail using this materialization. For example:
```yaml
models:
project-name:
post-hook: "grant select on {{ this }} to db_reader"
```
A useful workaround is to change the above post-hook to:
```yaml
post-hook: "grant select on {{ this.schema }}.{{ this.name }} to db_reader"
```
----
Expand Down
175 changes: 175 additions & 0 deletions macros/materializations/insert_by_period_materialization.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

{% call statement('period_boundaries', fetch_result=True) -%}
with data as (
select
coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp,
coalesce(
{{dbt_utils.dateadd('millisecond',
-1,
"nullif('" ~ stop_date ~ "','')::timestamp")}},
{{dbt_utils.current_timestamp()}}
) as stop_timestamp
from "{{target_schema}}"."{{target_table}}"
)

select
start_timestamp,
stop_timestamp,
{{dbt_utils.datediff('start_timestamp',
'stop_timestamp',
period)}} + 1 as num_periods
from data
{%- endcall %}

{%- endmacro %}

{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

{%- set period_filter -%}
("{{timestamp_field}}" > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
"{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and
"{{timestamp_field}}" < '{{stop_timestamp}}'::timestamp)
{%- endset -%}

{%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}

select
{{target_cols_csv}}
from (
{{filtered_sql}}
)

{%- endmacro %}

{% materialization insert_by_period, default -%}
{%- set timestamp_field = config.require('timestamp_field') -%}
{%- set start_date = config.require('start_date') -%}
{%- set stop_date = config.get('stop_date') or '' -%}}
{%- set period = config.get('period') or 'week' -%}

{%- if not '__PERIOD_FILTER__' is in sql -%}
{%- set error_message -%}
Model '{{ model.unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql
{%- endset -%}
{{ exceptions.raise_compiler_error(error_message) }}
{%- endif -%}

{%- set identifier = model['name'] -%}

{%- set existing_relations = adapter.list_relations(schema=schema) -%}
{%- set old_relation = adapter.get_relation(relations_list=existing_relations,
schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, type='table') -%}

{%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}

{%- set should_truncate = (non_destructive_mode and full_refresh_mode and exists_as_table) -%}
{%- set should_drop = (not should_truncate and (full_refresh_mode or exists_not_as_table)) -%}
{%- set force_create = (flags.FULL_REFRESH and not flags.NON_DESTRUCTIVE) -%}

-- setup
{% if old_relation is none -%}
-- noop
{%- elif should_truncate -%}
{{adapter.truncate_relation(old_relation)}}
{%- elif should_drop -%}
{{adapter.drop_relation(old_relation)}}
{%- set old_relation = none -%}
{%- endif %}

{{run_hooks(pre_hooks, inside_transaction=False)}}

-- `begin` happens here, so `commit` after it to finish the transaction
{{run_hooks(pre_hooks, inside_transaction=True)}}
{% call statement() -%}
begin; -- make extra sure we've closed out the transaction
commit;
{%- endcall %}

-- build model
{% if force_create or old_relation is none -%}
{# Create an empty target table -#}
{% call statement('main') -%}
{%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%}
{{create_table_as(False, target_relation, empty_sql)}};
{%- endcall %}
{%- endif %}

{% set _ = dbt_utils.get_period_boundaries(schema,
identifier,
timestamp_field,
start_date,
stop_date,
period) %}
{%- set start_timestamp = load_result('period_boundaries')['data'][0][0] | string -%}
{%- set stop_timestamp = load_result('period_boundaries')['data'][0][1] | string -%}
{%- set num_periods = load_result('period_boundaries')['data'][0][2] | int -%}

{% set target_columns = adapter.get_columns_in_table(schema, identifier) %}
{%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%}
{%- set loop_vars = {'sum_rows_inserted': 0} -%}

-- commit each period as a separate transaction
{% for i in range(num_periods) -%}
{%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) -%}
{{log(" + " ~ modules.datetime.datetime.now().strftime('%H:%M:%S') ~ " " ~ msg, info=True)}}

{%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period' ~ i ~ '_tmp' -%}
{%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema, type='table') -%}
{% call statement() -%}
{% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv,
sql,
timestamp_field,
period,
start_timestamp,
stop_timestamp,
i) %}
{{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}}
{%- endcall %}

{{adapter.expand_target_column_types(temp_table=tmp_identifier,
to_schema=schema,
to_table=identifier)}}
{%- set name = 'main-' ~ i -%}
{% call statement(name, fetch_result=True) -%}
insert into {{target_relation}} ({{target_cols_csv}})
(
select
{{target_cols_csv}}
from {{tmp_relation.include(schema=False)}}
);
{%- endcall %}
{%- set rows_inserted = (load_result('main-' ~ i)['status'].split(" "))[2] | int -%}
{%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%}
{%- if loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} {% endif -%}

{%- set msg = "Ran for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) ~ "; " ~ rows_inserted ~ " records inserted" -%}
{{log(" + " ~ modules.datetime.datetime.now().strftime('%H:%M:%S') ~ " " ~ msg, info=True)}}

{%- endfor %}

{% call statement() -%}
begin;
{%- endcall %}

{{run_hooks(post_hooks, inside_transaction=True)}}

{% call statement() -%}
commit;
{%- endcall %}

{{run_hooks(post_hooks, inside_transaction=False)}}

{%- set status_string = "INSERT " ~ loop_vars['sum_rows_inserted'] -%}

{% call noop_statement(name='main', status=status_string) -%}
-- no-op
{%- endcall %}

{%- endmaterialization %}

0 comments on commit 754db12

Please sign in to comment.