Skip to content

Commit

Permalink
Revert renaming of manifest columns
Browse files Browse the repository at this point in the history
  • Loading branch information
agnessnowplow committed Dec 19, 2024
1 parent 56f6fd7 commit fdfccde
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
with prep as (
select
cast(null as {{ snowplow_utils.type_max_string() }}) model,
cast('1970-01-01' as {{ type_timestamp() }}) as first_processed_load_tstamp,
cast('1970-01-01' as {{ type_timestamp() }}) as last_processed_load_tstamp
cast('1970-01-01' as {{ type_timestamp() }}) as first_success,
cast('1970-01-01' as {{ type_timestamp() }}) as last_success
)

select *
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{% set status_query %}
select
min(first_processed_load_tstamp) as min_first_processed_load_tstamp,
max(first_processed_load_tstamp) as max_first_processed_load_tstamp,
min(last_processed_load_tstamp) as min_last_processed_load_tstamp,
max(last_processed_load_tstamp) as max_last_processed_load_tstamp,
min(first_success) as min_first_success,
max(first_success) as max_first_success,
min(last_success) as min_last_success,
max(last_success) as max_last_success,
coalesce(count(*), 0) as models,
count(distinct last_processed_load_tstamp) as sync_count
count(distinct last_success) as sync_count
from {{ incremental_manifest_table }}
where model in ({{ snowplow_utils.print_list(models_in_run) }})
{% endset %}
Expand All @@ -40,18 +40,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{% if execute %}

{% set min_first_processed_load_tstamp = results.columns[0].values()[0] %}
{% set max_first_processed_load_tstamp = results.columns[1].values()[0] %}
{% set min_last_processed_load_tstamp = results.columns[2].values()[0] %}
{% set max_last_processed_load_tstamp = results.columns[3].values()[0] %}
{% set min_first_success = results.columns[0].values()[0] %}
{% set max_first_success = results.columns[1].values()[0] %}
{% set min_last_success = results.columns[2].values()[0] %}
{% set max_last_success = results.columns[3].values()[0] %}
{% set models_matched_from_manifest = results.columns[4].values()[0] %}
{% set sync_count = results.columns[5].values()[0] %}
{% set has_matched_all_models = true if models_matched_from_manifest == models_in_run|length else false %}

{{ return([min_first_processed_load_tstamp,
max_first_processed_load_tstamp,
min_last_processed_load_tstamp,
max_last_processed_load_tstamp,
{{ return([min_first_success,
max_first_success,
min_last_success,
max_last_success,
models_matched_from_manifest,
sync_count,
has_matched_all_models]) }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,18 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0

{# Returns the sql to calculate the lower/upper limits of the run #}

{% macro get_run_limits_t(min_first_processed_load_tstamp,
max_first_processed_load_tstamp,
min_last_processed_load_tstamp,
max_last_processed_load_tstamp,
{% macro get_run_limits_t(min_first_success,
max_first_success,
min_last_success,
max_last_success,
models_matched_from_manifest,
sync_count,
has_matched_all_models,
start_date) -%}

{% set start_tstamp = snowplow_utils.cast_to_tstamp(start_date) %}
{% set min_last_processed_load_tstamp = snowplow_utils.cast_to_tstamp(min_last_processed_load_tstamp) %}
{% set max_last_processed_load_tstamp = snowplow_utils.cast_to_tstamp(max_last_processed_load_tstamp) %}
{% set min_last_success = snowplow_utils.cast_to_tstamp(min_last_success) %}
{% set max_last_success = snowplow_utils.cast_to_tstamp(max_last_success) %}

{% if not execute %}
{{ return('') }}
Expand Down Expand Up @@ -53,7 +53,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% do snowplow_utils.log_message("Snowplow: New Snowplow incremental model. Backfilling") %}
{% set run_limits_query %}
select {{ start_tstamp }} as lower_limit,
least({{ max_last_processed_load_tstamp }},
least({{ max_last_success }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), start_tstamp) }}) as upper_limit
{% endset %}
{% endif %}
Expand All @@ -77,9 +77,9 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% do snowplow_utils.log_message("Snowplow: Snowplow incremental models out of sync. Syncing") %}

{% set run_limits_query %}
select {{ min_last_processed_load_tstamp }} as lower_limit,
least({{ max_last_processed_load_tstamp }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_processed_load_tstamp) }}) as upper_limit
select {{ min_last_success }} as lower_limit,
least({{ max_last_success }},
{{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_success) }}) as upper_limit
{% endset %}

{# State 5: If all models in the run exists in the manifest, none are out of sync, it is a standard incremental run #}
Expand All @@ -92,16 +92,16 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
select

{% if var("snowplow__run_type", "incremental") == 'incremental' %}
{{ min_last_processed_load_tstamp }} as lower_limit,
{{ min_last_success }} as lower_limit,
{% elif var("snowplow__run_type", "incremental") == 'current_day_incremental'%}
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(0) }}, {{ min_last_processed_load_tstamp }}) as lower_limit,
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(0) }}, {{ min_last_success }}) as lower_limit,
{% elif var("snowplow__run_type", "incremental") == 'last_n_days_incremental'%}
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(var("snowplow__reprocess_days", 1)) }}, {{ min_last_processed_load_tstamp }}) as lower_limit,
least({{ snowplow_utils.deduct_days_from_current_tstamp_utc(var("snowplow__reprocess_days", 1)) }}, {{ min_last_success }}) as lower_limit,
{% else %}
{{ exceptions.raise_compiler_error("Snowplow Error: Input for variable snowplow__run_type not recognised. Input must be 'incremental', 'current_day_incremental' or 'last_n_days_incremental''. Input given: " ~ var("snowplow__run_type")) }}
{% endif %}

least({{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_processed_load_tstamp) }},
least({{ snowplow_utils.timestamp_add('day', var("snowplow__backfill_limit_days", 30), min_last_success) }},
{{ snowplow_utils.current_timestamp_in_utc() }}) as upper_limit
{% endset %}
{% endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,27 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0
{% set last_success_query %}
select
b.model,
a.last_processed_load_tstamp,
a.first_processed_load_tstamp
a.last_success,
a.first_success

from
(select max(load_tstamp) as last_processed_load_tstamp,
min(load_tstamp) as first_processed_load_tstamp from {{ base_events_table }}) a,
(select max(load_tstamp) as last_success,
min(load_tstamp) as first_success from {{ base_events_table }}) a,
({% for model in models %} select '{{model}}' as model {%- if not loop.last %} union all {% endif %} {% endfor %}) b

where a.last_processed_load_tstamp is not null -- if run contains no data don't add to manifest
where a.last_success is not null -- if run contains no data don't add to manifest
{% endset %}

merge into {{ manifest_table }} m
using ( {{ last_success_query }} ) s
on m.model = s.model
when matched then
update set last_processed_load_tstamp = greatest(m.last_processed_load_tstamp, s.last_processed_load_tstamp),
first_processed_load_tstamp = coalesce(m.first_processed_load_tstamp, s.first_processed_load_tstamp)
update set last_success = greatest(m.last_success, s.last_success),
first_success = coalesce(m.first_success, s.first_success)

when not matched then
insert (model, last_processed_load_tstamp, first_processed_load_tstamp)
values (s.model, s.last_processed_load_tstamp, s.first_processed_load_tstamp);
insert (model, last_success, first_success)
values (s.model, s.last_success, s.first_success);

{% if target.type == 'snowflake' %}
commit;
Expand Down

0 comments on commit fdfccde

Please sign in to comment.