diff --git a/.github/workflows/pr_tests.yml b/.github/workflows/pr_tests.yml index 66f7ec7..4b19bfb 100644 --- a/.github/workflows/pr_tests.yml +++ b/.github/workflows/pr_tests.yml @@ -2,9 +2,6 @@ name: pr_tests on: pull_request: - branches: - - main - - 'release/**' concurrency: dbt_integration_tests @@ -42,13 +39,12 @@ env: SNOWFLAKE_TEST_WAREHOUSE: ${{ secrets.SNOWFLAKE_TEST_WAREHOUSE }} # Postgres Connection - POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }} POSTGRES_TEST_USER: ${{ secrets.POSTGRES_TEST_USER }} POSTGRES_TEST_PASS: ${{ secrets.POSTGRES_TEST_PASS }} - POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }} POSTGRES_TEST_DBNAME: ${{ secrets.POSTGRES_TEST_DBNAME }} + POSTGRES_TEST_HOST: ${{ secrets.POSTGRES_TEST_HOST }} + POSTGRES_TEST_PORT: ${{ secrets.POSTGRES_TEST_PORT }} - # Databricks Connection DATABRICKS_TEST_HOST: ${{ secrets.DATABRICKS_TEST_HOST }} DATABRICKS_TEST_HTTP_PATH: ${{ secrets.DATABRICKS_TEST_HTTP_PATH }} DATABRICKS_TEST_TOKEN: ${{ secrets.DATABRICKS_TEST_TOKEN }} @@ -63,10 +59,10 @@ jobs: # Run tests from integration_tests sub dir working-directory: ./integration_tests strategy: + fail-fast: false matrix: dbt_version: ["1.*"] - warehouse: ["postgres", "bigquery", "snowflake", "databricks", "redshift"] - + warehouse: ["postgres", "bigquery", "snowflake", "databricks", "redshift", "spark_iceberg"] # TODO: Add RS self-hosted runner services: postgres: image: postgres:latest @@ -87,7 +83,26 @@ jobs: steps: - name: Check out uses: actions/checkout@v3 - + - name: Configure Docker credentials + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_USERNAME }} + password: ${{ secrets.DOCKERHUB_SNOWPLOWCI_READ_PASSWORD }} + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v1 + with: + aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} + aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} + aws-region: eu-west-1 + - name: Set warehouse variables + id: set_warehouse + run: | + WAREHOUSE_PLATFORM=$(echo ${{ matrix.warehouse }} | cut -d'_' -f1) + WAREHOUSE_SPECIFIC=$(echo ${{ matrix.warehouse }} | cut -s -d'_' -f2) + echo "WAREHOUSE_PLATFORM=${WAREHOUSE_PLATFORM}" >> $GITHUB_ENV + echo "WAREHOUSE_SPECIFIC=${WAREHOUSE_SPECIFIC}" >> $GITHUB_ENV + echo "warehouse_platform=${WAREHOUSE_PLATFORM}" >> $GITHUB_OUTPUT + echo "warehouse_specific=${WAREHOUSE_SPECIFIC}" >> $GITHUB_OUTPUT # Remove '*' and replace '.' with '_' in DBT_VERSION & set as SCHEMA_SUFFIX. # SCHEMA_SUFFIX allows us to run multiple versions of dbt in parallel without overwriting the output tables - name: Set SCHEMA_SUFFIX env @@ -97,7 +112,7 @@ jobs: - name: Set DEFAULT_TARGET env run: | - echo "DEFAULT_TARGET=${{ matrix.warehouse }}" >> $GITHUB_ENV + echo "DEFAULT_TARGET=${{matrix.warehouse}}" >> $GITHUB_ENV - name: Python setup uses: actions/setup-python@v4 @@ -108,37 +123,46 @@ jobs: uses: actions/cache@v3 with: path: ~/.cache/pip - key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + key: ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} restore-keys: | - ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{ matrix.warehouse }} + ${{ runner.os }}-pip-${{ matrix.dbt_version }}-${{env.WAREHOUSE_PLATFORM}} # Install latest patch version. Upgrade if cache contains old patch version. 
- name: Install dependencies run: | - pip install --upgrade pip wheel setuptools - pip install -Iv "dbt-${{ matrix.warehouse }}"==${{ matrix.dbt_version }} --upgrade + pip install wheel setuptools + pip install -Iv dbt-${{env.WAREHOUSE_PLATFORM}}==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse != 'spark'}} + if: ${{env.WAREHOUSE_PLATFORM != 'spark'}} - name: Install spark dependencies run: | pip install --upgrade pip wheel setuptools - pip install -Iv "dbt-${{ matrix.warehouse }}[ODBC]"==${{ matrix.dbt_version }} --upgrade + pip install -Iv "dbt-${{ env.WAREHOUSE_PLATFORM }}[PyHive]"==${{ matrix.dbt_version }} --upgrade dbt deps - if: ${{matrix.warehouse == 'spark'}} - - - name: "Connect to database" + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + + - name: Install Docker Compose run: | - dbt debug + sudo curl -L "https://github.com/docker/compose/releases/download/1.29.2/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose + sudo chmod +x /usr/local/bin/docker-compose + + - name: Build and start Spark cluster + working-directory: .github/workflows/spark_deployment + run: | + docker-compose up -d + echo "Waiting for Spark services to start..." + sleep 90 + if: ${{env.WAREHOUSE_PLATFORM == 'spark'}} + - name: "Pre-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} - name: Run tests - run: ./.scripts/integration_test.sh -d ${{ matrix.warehouse }} + run: ./.scripts/integration_tests.sh -d ${{matrix.warehouse}} - # post_ci_cleanup sits in utils package - name: "Post-test: Drop ci schemas" run: | - dbt run-operation post_ci_cleanup --target ${{ matrix.warehouse }} + dbt run-operation post_ci_cleanup --target ${{matrix.warehouse}} \ No newline at end of file diff --git a/.github/workflows/spark_deployment/Dockerfile b/.github/workflows/spark_deployment/Dockerfile new file mode 100644 index 0000000..dab5720 --- /dev/null +++ b/.github/workflows/spark_deployment/Dockerfile @@ -0,0 +1,34 @@ +FROM openjdk:11-jre-slim + +# Set environment variables +ENV SPARK_VERSION=3.5.1 +ENV HADOOP_VERSION=3.3.4 +ENV ICEBERG_VERSION=1.4.2 +ENV AWS_SDK_VERSION=1.12.581 + +# Install necessary tools +RUN apt-get update && apt-get install -y curl wget procps rsync ssh + +# Download and install Spark +RUN wget https://downloads.apache.org/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + tar -xvzf spark-${SPARK_VERSION}-bin-hadoop3.tgz && \ + mv spark-${SPARK_VERSION}-bin-hadoop3 /spark && \ + rm spark-${SPARK_VERSION}-bin-hadoop3.tgz + +# Set Spark environment variables +ENV SPARK_HOME=/spark +ENV PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin + +# Download necessary JARs +RUN mkdir -p /spark/jars && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/${ICEBERG_VERSION}/iceberg-spark-runtime-3.5_2.12-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-spark-runtime.jar && \ + wget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-aws-bundle/${ICEBERG_VERSION}/iceberg-aws-bundle-${ICEBERG_VERSION}.jar -O /spark/jars/iceberg-aws-bundle.jar && \ + wget https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -O /spark/jars/hadoop-aws.jar && \ + wget https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -O /spark/jars/aws-java-sdk-bundle.jar + +# Create directory for Spark events 
+RUN mkdir -p /tmp/spark-events + +WORKDIR /spark + +CMD ["bash"] \ No newline at end of file diff --git a/.github/workflows/spark_deployment/build_and_push.sh b/.github/workflows/spark_deployment/build_and_push.sh new file mode 100755 index 0000000..1be2b6d --- /dev/null +++ b/.github/workflows/spark_deployment/build_and_push.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Set variables +DOCKER_HUB_ORG="snowplow" +IMAGE_NAME="spark-s3-iceberg" +TAG="latest" + +# Build the image +echo "Building Docker image..." +docker build --platform linux/amd64 -t $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG . + +# Log in to Docker Hub +echo "Logging in to Docker Hub..." +docker login + +# Push the image to Docker Hub +echo "Pushing image to Docker Hub..." +docker push $DOCKER_HUB_ORG/$IMAGE_NAME:$TAG + +echo "Image successfully built and pushed to Docker Hub" \ No newline at end of file diff --git a/.github/workflows/spark_deployment/docker-compose.yml b/.github/workflows/spark_deployment/docker-compose.yml new file mode 100644 index 0000000..2e8077b --- /dev/null +++ b/.github/workflows/spark_deployment/docker-compose.yml @@ -0,0 +1,66 @@ +version: '3' + +networks: + spark-network: + driver: bridge + +services: + spark-master: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "/spark/sbin/start-master.sh -h spark-master --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.master.Master-1-*.out"] + hostname: spark-master + ports: + - '8080:8080' + - '7077:7077' + environment: + - SPARK_LOCAL_IP=spark-master + - SPARK_MASTER_HOST=spark-master + - SPARK_MASTER_PORT=7077 + - SPARK_MASTER_OPTS="-Dspark.driver.memory=2g" + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + spark-worker: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 10 && /spark/sbin/start-worker.sh spark://spark-master:7077 --properties-file /spark/conf/spark-defaults.conf && tail -f /spark/logs/spark--org.apache.spark.deploy.worker.Worker-*.out"] + depends_on: + - spark-master + environment: + - SPARK_WORKER_CORES=2 + - SPARK_WORKER_MEMORY=4G + - SPARK_EXECUTOR_MEMORY=3G + - SPARK_LOCAL_IP=spark-worker + - SPARK_MASTER=spark://spark-master:7077 + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network + + thrift-server: + image: snowplow/spark-s3-iceberg:latest + command: ["/bin/bash", "-c", "sleep 30 && /spark/sbin/start-thriftserver.sh --master spark://spark-master:7077 --driver-memory 2g --executor-memory 3g --hiveconf hive.server2.thrift.port=10000 --hiveconf hive.server2.thrift.bind.host=0.0.0.0 --conf spark.sql.hive.thriftServer.async=true --conf spark.sql.hive.thriftServer.workerQueue.size=2000 --conf spark.sql.hive.thriftServer.maxWorkerThreads=100 --conf spark.sql.hive.thriftServer.minWorkerThreads=50 && tail -f /spark/logs/spark--org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-*.out"] + ports: + - '10000:10000' + depends_on: + - spark-master + - spark-worker + environment: + - SPARK_LOCAL_IP=thrift-server + - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID} + - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY} + - AWS_REGION=eu-west-1 + - 
AWS_DEFAULT_REGION=eu-west-1 + volumes: + - ./spark-defaults.conf:/spark/conf/spark-defaults.conf + networks: + - spark-network \ No newline at end of file diff --git a/.github/workflows/spark_deployment/spark-defaults.conf b/.github/workflows/spark_deployment/spark-defaults.conf new file mode 100644 index 0000000..9052a05 --- /dev/null +++ b/.github/workflows/spark_deployment/spark-defaults.conf @@ -0,0 +1,44 @@ +spark.master spark://spark-master:7077 + +spark.sql.warehouse.dir s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.glue.catalog-impl org.apache.iceberg.aws.glue.GlueCatalog +spark.sql.catalog.glue.warehouse s3a://dbt-spark-iceberg/github-integration-testing +spark.sql.catalog.glue.io-impl org.apache.iceberg.aws.s3.S3FileIO +spark.sql.defaultCatalog glue +spark.sql.catalog.glue.database dbt-spark-iceberg + +spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem +spark.hadoop.fs.s3a.access.key +spark.hadoop.fs.s3a.secret.key +spark.hadoop.fs.s3a.endpoint s3.eu-west-1.amazonaws.com +spark.hadoop.fs.s3a.path.style.access true +spark.hadoop.fs.s3a.region eu-west-1 +spark.hadoop.fs.s3a.aws.region eu-west-1 + +# Enabling AWS SDK V4 signing (required for regions launched after January 2014) +spark.hadoop.com.amazonaws.services.s3.enableV4 true +spark.hadoop.fs.s3a.aws.credentials.provider org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider + +# Hive Metastore Configuration (using AWS Glue) +spark.hadoop.hive.metastore.client.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory + +# Thrift Server Configuration for better performance in concurrent environments +spark.sql.hive.thriftServer.singleSession false +spark.sql.hive.thriftServer.async true +# spark.sql.hive.thriftServer.maxWorkerThreads 100 +# spark.sql.hive.thriftServer.minWorkerThreads 50 +# spark.sql.hive.thriftServer.workerQueue.size 2000 + +# Memory and Performance Tuning +# spark.driver.memory 2g +# spark.executor.memory 3g +# spark.worker.memory 4g +spark.network.timeout 600s +spark.sql.broadcastTimeout 600s +spark.sql.adaptive.enabled true +spark.serializer org.apache.spark.serializer.KryoSerializer + +# Logging and Debugging +spark.eventLog.enabled true +spark.eventLog.dir /tmp/spark-events diff --git a/CHANGELOG b/CHANGELOG index 49ef8f0..94649e0 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,17 @@ +snowplow-media-player 0.9.0 (2024-10-15) +--------------------------------------- +## Summary +This release adds support for Apache Spark with the Iceberg file format and updates integration tests to ensure proper functionality. + +## Features +- Add support for Apache Spark with Iceberg file format + +## Under the hood +- Modify integration tests to support Spark + +## Upgrading +Update the snowplow-media-player version in your `packages.yml` file. 
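+For example, the entry in `packages.yml` would look roughly like this (illustrative snippet; adjust the range to your own constraints):
+
+```yml
+packages:
+  - package: snowplow/snowplow_media_player
+    version: [">=0.9.0", "<0.10.0"]
+```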
+ snowplow-media-player 0.8.0 (2024-06-20) --------------------------------------- ## Summary diff --git a/dbt_project.yml b/dbt_project.yml index ccff7d6..8c3ddc3 100644 --- a/dbt_project.yml +++ b/dbt_project.yml @@ -1,5 +1,5 @@ name: 'snowplow_media_player' -version: '0.8.0' +version: '0.9.0' config-version: 2 require-dbt-version: ['>=1.4.0', '<2.0.0'] @@ -115,6 +115,8 @@ models: snowplow_media_player: +bind: false +materialized: table + +incremental_strategy: "{{ none if target.type not in ['spark'] else 'merge' }}" + +file_format: "{{ 'delta' if target.type not in ['spark'] else 'iceberg' }}" base: manifest: +schema: "snowplow_manifest" @@ -124,11 +126,13 @@ models: bigquery: +enabled: '{{ target.type == "bigquery" | as_bool() }}' databricks: - +enabled: '{{ target.type in ["databricks", "spark"] | as_bool() }}' + +enabled: '{{ target.type == "databricks" | as_bool() }}' default: +enabled: '{{ target.type in ["redshift", "postgres"] | as_bool() }}' snowflake: +enabled: '{{ target.type == "snowflake" | as_bool() }}' + spark: + +enabled: '{{ target.type == "spark" | as_bool() }}' media_base: +schema: 'derived' +tags: 'snowplow_media_player_incremental' diff --git a/integration_tests/.scripts/integration_test.sh b/integration_tests/.scripts/integration_tests.sh similarity index 97% rename from integration_tests/.scripts/integration_test.sh rename to integration_tests/.scripts/integration_tests.sh index 3ac227b..1add026 100755 --- a/integration_tests/.scripts/integration_test.sh +++ b/integration_tests/.scripts/integration_tests.sh @@ -10,7 +10,7 @@ do esac done -declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "postgres" "redshift" "snowflake") +declare -a SUPPORTED_DATABASES=("bigquery" "databricks" "postgres" "redshift" "snowflake" "spark_iceberg") # set to lower case DATABASE="$(echo $DATABASE | tr '[:upper:]' '[:lower:]')" @@ -28,7 +28,7 @@ for db in ${DATABASES[@]}; do eval "dbt seed --target $db --full-refresh" || exit 1; # This run and the subsequent incremental ones exist just to make sure that the models work with the newer contexts disabled - echo "Snowplow media player integration tests (v1 only): Execute models - run 1/6" + echo "Snowplow media player integration tests (v1 only): Execute models - run 1/2" eval "dbt run --target $db --full-refresh --vars '{snowplow__allow_refresh: true, snowplow__enable_media_player_v2: false, snowplow__enable_media_session: false, snowplow__enable_media_ad: false, snowplow__enable_media_ad_break: false, snowplow__enable_ad_quartile_event: false, snowplow__enable_mobile_events: false}'" || exit 1; @@ -37,7 +37,7 @@ for db in ${DATABASES[@]}; do eval "dbt run --target $db --vars '{snowplow__allow_refresh: true, snowplow__enable_media_player_v2: false, snowplow__enable_media_session: false, snowplow__enable_media_ad: false, snowplow__enable_media_ad_break: false, snowplow__enable_ad_quartile_event: false, snowplow__enable_mobile_events: false}'" || exit 1; # This run and the subsequent incremental ones exist just to make sure that the models work with the older contexts disabled - echo "Snowplow media player integration tests (v2 only): Execute models - run 1/6" + echo "Snowplow media player integration tests (v2 only): Execute models - run 1/2" eval "dbt run --target $db --full-refresh --vars '{snowplow__allow_refresh: true, snowplow__backfill_limit_days: 3000, snowplow__enable_youtube: false, snowplow__enable_whatwg_media: false, snowplow__enable_whatwg_video: false, snowplow__enable_media_player_v1: false}'" || exit 1; diff --git
a/integration_tests/ci/profiles.yml b/integration_tests/ci/profiles.yml index 46ec83a..dd550d3 100644 --- a/integration_tests/ci/profiles.yml +++ b/integration_tests/ci/profiles.yml @@ -67,12 +67,14 @@ integration_tests: token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}" threads: 4 - spark: - type: spark - method: odbc - driver: "{{ env_var('DATABRICKS_TEST_HTTP_PATH') }}" - schema: "gh_sp_web_dbt_{{ env_var('SCHEMA_SUFFIX') }}" - host: "{{ env_var('DATABRICKS_TEST_HOST') }}" - token: "{{ env_var('DATABRICKS_TEST_TOKEN') }}" - endpoint: "{{ env_var('DATABRICKS_TEST_ENDPOINT') }}" - threads: 4 + + spark_iceberg: + type: spark + method: thrift + host: "{{ env_var('SPARK_MASTER_HOST', 'localhost') }}" + port: 10000 + user: "{{ env_var('SPARK_USER', 'spark') }}" + schema: "gh_sp_media_player_dbt_{{ env_var('SCHEMA_SUFFIX') }}" + connect_retries: 5 + connect_timeout: 60 + threads: 4 \ No newline at end of file diff --git a/integration_tests/dbt_project.yml b/integration_tests/dbt_project.yml index f07f8f6..d864af9 100644 --- a/integration_tests/dbt_project.yml +++ b/integration_tests/dbt_project.yml @@ -1,5 +1,5 @@ name: 'snowplow_media_player_integration_tests' -version: '0.8.0' +version: '0.9.0' config-version: 2 profile: 'integration_tests' @@ -27,20 +27,23 @@ quoting: models: snowplow_media_player_integration_tests: bind: false + +materialized: table +schema: 'snplw_media_player_int_tests' source: bigquery: +enabled: '{{ target.type == "bigquery" | as_bool() }}' databricks: - +enabled: '{{ target.type in ["databricks", "spark"] | as_bool() }}' + +enabled: '{{ target.type == "databricks" | as_bool() }}' + spark: + +enabled: '{{ target.type == "spark" | as_bool() }}' default: +enabled: '{{ target.type in ["redshift", "postgres"] | as_bool() }}' snowflake: +enabled: '{{ target.type == "snowflake" | as_bool() }}' snowplow_media_player: +persist_docs: - relation: '{{ false if target.type in ["databricks"] else true }}' - columns: '{{ false if target.type in ["databricks"] else true }}' + relation: '{{ false if target.type in ["spark","databricks"] else true }}' + columns: '{{ false if target.type in ["spark","databricks"] else true }}' custom: +enabled: true diff --git a/integration_tests/models/source/spark/snowplow_media_player_events_stg.sql b/integration_tests/models/source/spark/snowplow_media_player_events_stg.sql new file mode 100644 index 0000000..a55b201 --- /dev/null +++ b/integration_tests/models/source/spark/snowplow_media_player_events_stg.sql @@ -0,0 +1,329 @@ +{# +Copyright (c) 2022-present Snowplow Analytics Ltd. All rights reserved. +This program is licensed to you under the Snowplow Personal and Academic License Version 1.0, +and you may not use this file except in compliance with the Snowplow Personal and Academic License Version 1.0. 
+You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 at https://docs.snowplow.io/personal-and-academic-license-1.0/ +#} + +{{ + config( + materialized='table', + ) +}} + +with prep as ( + + select + app_id, + platform, + etl_tstamp, + collector_tstamp, + dvce_created_tstamp, + event, + event_id, + txn_id, + name_tracker, + v_tracker, + v_collector, + v_etl, + user_id, + user_ipaddress, + user_fingerprint, + domain_userid, + domain_sessionidx, + network_userid, + geo_country, + geo_region, + geo_city, + geo_zipcode, + geo_latitude, + geo_longitude, + geo_region_name, + ip_isp, + ip_organization, + ip_domain, + ip_netspeed, + page_url, + page_title, + page_referrer, + page_urlscheme, + page_urlhost, + page_urlport, + page_urlpath, + page_urlquery, + page_urlfragment, + refr_urlscheme, + refr_urlhost, + refr_urlport, + refr_urlpath, + refr_urlquery, + refr_urlfragment, + refr_medium, + refr_source, + refr_term, + mkt_medium, + mkt_source, + mkt_term, + mkt_content, + mkt_campaign,se_category,se_action,se_label,se_property,se_value,tr_orderid,tr_affiliation,tr_total, + tr_tax,tr_shipping,tr_city,tr_state,tr_country,ti_orderid,ti_sku,ti_name,ti_category,ti_price,ti_quantity, + pp_xoffset_min,pp_xoffset_max,pp_yoffset_min,pp_yoffset_max,useragent,br_name,br_family,br_version,br_type, + br_renderengine,br_lang,br_features_pdf,br_features_flash,br_features_java,br_features_director,br_features_quicktime, + br_features_realplayer,br_features_windowsmedia,br_features_gears,br_features_silverlight,br_cookies,br_colordepth,br_viewwidth, + br_viewheight,os_name,os_family,os_manufacturer,os_timezone,dvce_type,dvce_ismobile,dvce_screenwidth,dvce_screenheight,doc_charset, + doc_width,doc_height,tr_currency,tr_total_base,tr_tax_base,tr_shipping_base,ti_currency,ti_price_base,base_currency,geo_timezone,mkt_clickid, + mkt_network,etl_tags,dvce_sent_tstamp,refr_domain_userid,refr_dvce_tstamp,domain_sessionid,derived_tstamp,event_vendor,event_name,event_format, + event_version,event_fingerprint,true_tstamp,load_tstamp, + + from_json(contexts_com_snowplowanalytics_snowplow_web_page_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_web_page_1_0_0, + from_json(contexts_com_snowplowanalytics_mobile_screen_1_0_0, 'array>') as contexts_com_snowplowanalytics_mobile_screen_1_0_0, + from_json(contexts_com_snowplowanalytics_snowplow_client_session_1_0_2, 'array>') as contexts_com_snowplowanalytics_snowplow_client_session_1_0_2, + from_json(contexts_org_whatwg_video_element_1_0_0, 'array>') as contexts_org_whatwg_video_element_1_0_0, + from_json(contexts_org_whatwg_media_element_1_0_0, 'array>') as contexts_org_whatwg_media_element_1_0_0, + from_json(contexts_com_youtube_youtube_1_0_0, 'array>') as contexts_com_youtube_youtube_1_0_0, + from_json(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_media_player_1_0_0, + from_json(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_media_player_2_0_0, + from_json(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_media_session_1_0_0, + from_json(contexts_com_snowplowanalytics_snowplow_media_ad_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_media_ad_1_0_0, + from_json(contexts_com_snowplowanalytics_snowplow_media_ad_break_1_0_0, 'array>') as contexts_com_snowplowanalytics_snowplow_media_ad_break_1_0_0, + 
from_json(unstruct_event_com_snowplowanalytics_snowplow_media_player_event_1_0_0, 'struct') as unstruct_event_com_snowplowanalytics_snowplow_media_player_event_1_0_0, + from_json(unstruct_event_com_snowplowanalytics_snowplow_media_ad_quartile_event_1_0_0, 'struct') as unstruct_event_com_snowplowanalytics_snowplow_media_ad_quartile_event_1_0_0 + + + from {{ ref('snowplow_media_player_events') }} + + ) + + select + app_id, + platform, + etl_tstamp, + collector_tstamp, + dvce_created_tstamp, + event, + event_id, + txn_id, + name_tracker, + v_tracker, + v_collector, + v_etl, + user_id, + user_ipaddress, + user_fingerprint, + domain_userid, + domain_sessionidx, + network_userid, + geo_country, + geo_region, + geo_city, + geo_zipcode, + geo_latitude, + geo_longitude, + geo_region_name, + ip_isp, + ip_organization, + ip_domain, + ip_netspeed, + page_url, + page_title, + page_referrer, + page_urlscheme, + page_urlhost, + page_urlport, + page_urlpath, + page_urlquery, + page_urlfragment, + refr_urlscheme, + refr_urlhost, + refr_urlport, + refr_urlpath, + refr_urlquery, + refr_urlfragment, + refr_medium, + refr_source, + refr_term, + mkt_medium, + mkt_source, + mkt_term, + mkt_content, + mkt_campaign,se_category,se_action,se_label,se_property,se_value,tr_orderid,tr_affiliation,tr_total, + tr_tax,tr_shipping,tr_city,tr_state,tr_country,ti_orderid,ti_sku,ti_name,ti_category,ti_price,ti_quantity, + pp_xoffset_min,pp_xoffset_max,pp_yoffset_min,pp_yoffset_max,useragent,br_name,br_family,br_version,br_type, + br_renderengine,br_lang,br_features_pdf,br_features_flash,br_features_java,br_features_director,br_features_quicktime, + br_features_realplayer,br_features_windowsmedia,br_features_gears,br_features_silverlight,br_cookies,br_colordepth,br_viewwidth, + br_viewheight,os_name,os_family,os_manufacturer,os_timezone,dvce_type,dvce_ismobile,dvce_screenwidth,dvce_screenheight,doc_charset, + doc_width,doc_height,tr_currency,tr_total_base,tr_tax_base,tr_shipping_base,ti_currency,ti_price_base,base_currency,geo_timezone,mkt_clickid, + mkt_network,etl_tags,dvce_sent_tstamp,refr_domain_userid,refr_dvce_tstamp,domain_sessionid,derived_tstamp,event_vendor,event_name,event_format, + event_version,event_fingerprint,true_tstamp,load_tstamp, + + + array( + struct( + cast(contexts_com_snowplowanalytics_snowplow_web_page_1_0_0[0].id as STRING) AS id + ) + ) as contexts_com_snowplowanalytics_snowplow_web_page_1, + + array( + struct( + cast(contexts_com_snowplowanalytics_mobile_screen_1_0_0[0].name as STRING) AS name, + cast(contexts_com_snowplowanalytics_mobile_screen_1_0_0[0].type as STRING) AS type, + cast(contexts_com_snowplowanalytics_mobile_screen_1_0_0[0].id as STRING) AS id, + cast(contexts_com_snowplowanalytics_mobile_screen_1_0_0[0].view_controller as STRING) AS view_controller, + cast(contexts_com_snowplowanalytics_mobile_screen_1_0_0[0].top_view_controller as STRING) AS top_view_controller, + cast(contexts_com_snowplowanalytics_mobile_screen_1_0_0[0].activity as STRING) AS activity, + cast(contexts_com_snowplowanalytics_mobile_screen_1_0_0[0].fragment as STRING) AS fragment + ) + ) as contexts_com_snowplowanalytics_mobile_screen_1, + + array( + struct( + cast(contexts_com_snowplowanalytics_snowplow_client_session_1_0_2[0].user_id as STRING) AS user_id, + cast(contexts_com_snowplowanalytics_snowplow_client_session_1_0_2[0].session_id as STRING) AS session_id, + cast(contexts_com_snowplowanalytics_snowplow_client_session_1_0_2[0].session_index as INT) AS session_index, + 
cast(contexts_com_snowplowanalytics_snowplow_client_session_1_0_2[0].event_index as INT) AS event_index, + cast(contexts_com_snowplowanalytics_snowplow_client_session_1_0_2[0].previous_session_id as STRING) AS previous_session_id, + cast(contexts_com_snowplowanalytics_snowplow_client_session_1_0_2[0].storage_mechanism as STRING) AS storage_mechanism, + cast(contexts_com_snowplowanalytics_snowplow_client_session_1_0_2[0].first_event_id as STRING) AS first_event_id, + cast(contexts_com_snowplowanalytics_snowplow_client_session_1_0_2[0].first_event_timestamp as TIMESTAMP) AS first_event_timestamp + ) + ) as contexts_com_snowplowanalytics_snowplow_client_session_1, + + array( + struct( + cast(contexts_org_whatwg_video_element_1_0_0[0].video_height as INT) AS video_height, + cast(contexts_org_whatwg_video_element_1_0_0[0].video_width as INT) AS video_width, + cast(contexts_org_whatwg_video_element_1_0_0[0].auto_picture_in_picture as BOOLEAN) AS auto_picture_in_picture, + cast(contexts_org_whatwg_video_element_1_0_0[0].disable_picture_in_picture as BOOLEAN) AS disable_picture_in_picture, + cast(contexts_org_whatwg_video_element_1_0_0[0].poster as STRING) AS poster + ) + ) as contexts_org_whatwg_video_element_1, + + array( + struct( + cast(contexts_org_whatwg_media_element_1_0_0[0].auto_play as BOOLEAN) AS auto_play, + cast(contexts_org_whatwg_media_element_1_0_0[0].current_src as STRING) AS current_src, + cast(contexts_org_whatwg_media_element_1_0_0[0].default_muted as BOOLEAN) AS default_muted, + cast(contexts_org_whatwg_media_element_1_0_0[0].default_playback_rate as DOUBLE) AS default_playback_rate, + cast(contexts_org_whatwg_media_element_1_0_0[0].html_id as STRING) AS html_id, + cast(contexts_org_whatwg_media_element_1_0_0[0].media_type as STRING) AS media_type, + cast(contexts_org_whatwg_media_element_1_0_0[0].network_state as STRING) AS network_state, + cast(contexts_org_whatwg_media_element_1_0_0[0].preload as STRING) AS preload, + cast(contexts_org_whatwg_media_element_1_0_0[0].ready_state as STRING) AS ready_state, + cast(contexts_org_whatwg_media_element_1_0_0[0].seeking as BOOLEAN) AS seeking, + cast(contexts_org_whatwg_media_element_1_0_0[0].cross_origin as STRING) AS cross_origin, + cast(contexts_org_whatwg_media_element_1_0_0[0].disable_remote_playback as BOOLEAN) AS disable_remote_playback, + cast(contexts_org_whatwg_media_element_1_0_0[0].error as STRING) AS error, + cast(contexts_org_whatwg_media_element_1_0_0[0].file_extension as STRING) AS file_extension, + cast(contexts_org_whatwg_media_element_1_0_0[0].fullscreen as BOOLEAN) AS fullscreen, + cast(contexts_org_whatwg_media_element_1_0_0[0].picture_in_picture as BOOLEAN) AS picture_in_picture, + cast(contexts_org_whatwg_media_element_1_0_0[0].src as STRING) AS src + ) + ) as contexts_org_whatwg_media_element_1, + + array( + struct( + cast(contexts_com_youtube_youtube_1_0_0[0].auto_play as BOOLEAN) AS auto_play, + cast(contexts_com_youtube_youtube_1_0_0[0].buffering as BOOLEAN) AS buffering, + cast(contexts_com_youtube_youtube_1_0_0[0].controls as BOOLEAN) AS controls, + cast(contexts_com_youtube_youtube_1_0_0[0].cued as BOOLEAN) AS cued, + cast(contexts_com_youtube_youtube_1_0_0[0].loaded as INT) AS loaded, + cast(contexts_com_youtube_youtube_1_0_0[0].playback_quality as STRING) AS playback_quality, + cast(contexts_com_youtube_youtube_1_0_0[0].player_id as STRING) AS player_id, + cast(contexts_com_youtube_youtube_1_0_0[0].unstarted as BOOLEAN) AS unstarted, + cast(contexts_com_youtube_youtube_1_0_0[0].url as STRING) AS url, + 
cast(contexts_com_youtube_youtube_1_0_0[0].error as STRING) AS error, + cast(contexts_com_youtube_youtube_1_0_0[0].fov as DOUBLE) AS fov, + cast(contexts_com_youtube_youtube_1_0_0[0].origin as STRING) AS origin, + cast(contexts_com_youtube_youtube_1_0_0[0].pitch as DOUBLE) AS pitch, + cast(contexts_com_youtube_youtube_1_0_0[0].playlist_index as DOUBLE) AS playlist_index, + cast(contexts_com_youtube_youtube_1_0_0[0].roll as DOUBLE) AS roll, + cast(contexts_com_youtube_youtube_1_0_0[0].yaw as DOUBLE) AS yaw + ) + ) as contexts_com_youtube_youtube_1, + + array( + struct( + cast(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0[0].current_time as DOUBLE) AS current_time, + cast(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0[0].ended as BOOLEAN) AS ended, + cast(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0[0].loop as BOOLEAN) AS loop, + cast(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0[0].muted as BOOLEAN) AS muted, + cast(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0[0].paused as BOOLEAN) AS paused, + cast(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0[0].playback_rate as DOUBLE) AS playback_rate, + cast(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0[0].volume as INT) AS volume, + cast(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0[0].duration as DOUBLE) AS duration, + cast(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0[0].is_live as BOOLEAN) AS is_live, + cast(contexts_com_snowplowanalytics_snowplow_media_player_1_0_0[0].percent_progress as INT) AS percent_progress + ) + ) as contexts_com_snowplowanalytics_snowplow_media_player_1, + + + array( + struct( + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].current_time as DOUBLE) AS current_time, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].duration as DOUBLE) AS duration, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].ended as BOOLEAN) AS ended, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].fullscreen as BOOLEAN) AS fullscreen, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].livestream as BOOLEAN) AS livestream, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].label as STRING) AS label, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].loop as BOOLEAN) AS loop, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].media_type as STRING) AS media_type, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].muted as BOOLEAN) AS muted, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].paused as BOOLEAN) AS paused, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].picture_in_picture as BOOLEAN) AS picture_in_picture, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].playback_rate as DOUBLE) AS playback_rate, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].player_type as STRING) AS player_type, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].quality as STRING) AS quality, + cast(contexts_com_snowplowanalytics_snowplow_media_player_2_0_0[0].volume as INT) AS volume + ) +) as contexts_com_snowplowanalytics_snowplow_media_player_2, + + array( + struct( + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].media_session_id as STRING) AS media_session_id, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].started_at as STRING) AS started_at, + 
cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].ping_interval as INT) AS ping_interval, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].time_played as DOUBLE) AS time_played, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].time_played_muted as DOUBLE) AS time_played_muted, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].time_paused as DOUBLE) AS time_paused, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].content_watched as DOUBLE) AS content_watched, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].time_buffering as DOUBLE) AS time_buffering, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].time_spent_ads as DOUBLE) AS time_spent_ads, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].ads as INT) AS ads, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].ads_clicked as INT) AS ads_clicked, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].ads_skipped as INT) AS ads_skipped, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].ad_breaks as INT) AS ad_breaks, + cast(contexts_com_snowplowanalytics_snowplow_media_session_1_0_0[0].avg_playback_rate as DOUBLE) AS avg_playback_rate + ) + ) as contexts_com_snowplowanalytics_snowplow_media_session_1, + + array( + struct( + cast(contexts_com_snowplowanalytics_snowplow_media_ad_1_0_0[0].name as STRING) AS name, + cast(contexts_com_snowplowanalytics_snowplow_media_ad_1_0_0[0].ad_id as STRING) AS ad_id, + cast(contexts_com_snowplowanalytics_snowplow_media_ad_1_0_0[0].creative_id as STRING) AS creative_id, + cast(contexts_com_snowplowanalytics_snowplow_media_ad_1_0_0[0].pod_position as INT) AS pod_position, + cast(contexts_com_snowplowanalytics_snowplow_media_ad_1_0_0[0].duration as DOUBLE) AS duration, + cast(contexts_com_snowplowanalytics_snowplow_media_ad_1_0_0[0].skippable as BOOLEAN) AS skippable + ) + ) as contexts_com_snowplowanalytics_snowplow_media_ad_1, + + array( + struct( + cast(contexts_com_snowplowanalytics_snowplow_media_ad_break_1_0_0[0].name as STRING) AS name, + cast(contexts_com_snowplowanalytics_snowplow_media_ad_break_1_0_0[0].break_id as STRING) AS break_id, + cast(contexts_com_snowplowanalytics_snowplow_media_ad_break_1_0_0[0].start_time as STRING) AS start_time, + cast(contexts_com_snowplowanalytics_snowplow_media_ad_break_1_0_0[0].break_type as STRING) AS break_type, + cast(contexts_com_snowplowanalytics_snowplow_media_ad_break_1_0_0[0].pod_size as INT) AS pod_size + ) + ) as contexts_com_snowplowanalytics_snowplow_media_ad_break_1, + + struct( + cast(unstruct_event_com_snowplowanalytics_snowplow_media_player_event_1_0_0.type as STRING) AS type, + cast(unstruct_event_com_snowplowanalytics_snowplow_media_player_event_1_0_0.label as STRING) AS label + ) as unstruct_event_com_snowplowanalytics_snowplow_media_player_event_1, + + struct( + cast(unstruct_event_com_snowplowanalytics_snowplow_media_ad_quartile_event_1_0_0.percent_progress as INT) AS percent_progress + ) as unstruct_event_com_snowplowanalytics_snowplow_media_ad_quartile_event_1 + + from prep diff --git a/macros/identifiers.sql b/macros/identifiers.sql index 1831ed2..eaa0246 100644 --- a/macros/identifiers.sql +++ b/macros/identifiers.sql @@ -15,7 +15,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% endmacro %} -{% macro databricks__session_identifiers() %} +{% macro spark__session_identifiers() %} {% if 
var('snowplow__session_identifiers') %} {{ return(var('snowplow__session_identifiers')) }} @@ -132,7 +132,7 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {% endmacro %} -{% macro databricks__user_identifiers() %} +{% macro spark__user_identifiers() %} {% if var('snowplow__user_identifiers') %} {{ return(var('snowplow__user_identifiers')) }} diff --git a/models/base/src_base.yml b/models/base/src_base.yml index 63ea1e2..bf87251 100644 --- a/models/base/src_base.yml +++ b/models/base/src_base.yml @@ -3,7 +3,7 @@ version: 2 sources: - name: atomic schema: "{{ var('snowplow__atomic_schema', 'atomic') if project_name != 'snowplow_media_player_integration_tests' else target.schema~'_snplw_media_player_int_tests' }}" - database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else var('snowplow__atomic_schema', 'atomic') }}" + database: "{{ var('snowplow__database', target.database) if target.type not in ['databricks', 'spark'] else var('snowplow__databricks_catalog', 'hive_metastore') if target.type in ['databricks'] else none }}" tables: - name: events identifier: "{{ var('snowplow__events_table', 'events') if project_name != 'snowplow_media_player_integration_tests' else 'snowplow_media_player_events_stg' }}" diff --git a/models/custom/snowplow_media_player_session_stats.sql b/models/custom/snowplow_media_player_session_stats.sql index 4ed1a83..9c79a80 100644 --- a/models/custom/snowplow_media_player_session_stats.sql +++ b/models/custom/snowplow_media_player_session_stats.sql @@ -23,10 +23,12 @@ with prep as ( select -- get the first domain_sessionid in the array - {% if target.name in ('bigquery', 'databricks', 'snowflake') %} + {% if target.type in ('bigquery', 'databricks', 'snowflake') %} cast(({{ snowplow_utils.get_split_to_array('domain_sessionid_array', 'b') }})[0] as {{ type_string() }}) as domain_sessionid, - {% elif target.name == 'redshift' %} + {% elif target.type == 'redshift' %} split_part(domain_sessionid_array, ',', 1) as domain_sessionid, + {% elif target.type == 'spark' %} + split(domain_sessionid_array, ',')[0] as domain_sessionid, {% else %} cast(({{ snowplow_utils.get_split_to_array('domain_sessionid_array', 'b') }})[1] as {{ type_string() }}) as domain_sessionid, {% endif %} diff --git a/models/media_ads/snowplow_media_player_media_ads.sql b/models/media_ads/snowplow_media_player_media_ads.sql index e775c07..66f3890 100644 --- a/models/media_ads/snowplow_media_player_media_ads.sql +++ b/models/media_ads/snowplow_media_player_media_ads.sql @@ -129,7 +129,7 @@ new_media_ad_views as ( select * from new_data union all - select * {% if target.type in ['databricks', 'spark'] %}except(first_view_date){% endif %} + select * {% if target.type in ['databricks'] %}except(first_view_date){% endif %} from {{ this }} ) @@ -229,7 +229,7 @@ new_media_ad_views as ( {% endif %} select * - {% if target.type in ['databricks', 'spark'] -%} + {% if target.type in ['databricks'] -%} , date(prep.first_view) as first_view_date {%- endif %} diff --git a/models/media_base/scratch/snowplow_media_player_base_this_run.sql b/models/media_base/scratch/snowplow_media_player_base_this_run.sql index 14950fb..e4ade89 100644 --- a/models/media_base/scratch/snowplow_media_player_base_this_run.sql +++ b/models/media_base/scratch/snowplow_media_player_base_this_run.sql @@ -95,13 +95,13 @@ events_this_run as ( select p.* - {% if target.type 
== 'postgres' %} + {% if target.type in ['postgres','spark'] %} ,row_number() over (partition by p.play_id order by p.start_tstamp) as duplicate_count {% endif %} from prep as p - {% if target.type not in ['postgres'] %} + {% if target.type not in ['postgres','spark'] %} qualify row_number() over (partition by p.play_id order by p.start_tstamp) = 1 {% endif %} @@ -308,6 +308,6 @@ left join first_page_views_by_play_id as pv left join page_view_id_aggregation as pva on pva.play_id = d.play_id -{% if target.type == 'postgres' %} +{% if target.type in ['postgres','spark'] %} where d.duplicate_count = 1 {% endif %} diff --git a/models/media_plays/snowplow_media_player_plays_by_pageview.sql b/models/media_plays/snowplow_media_player_plays_by_pageview.sql index c0d3d7f..0bd4458 100644 --- a/models/media_plays/snowplow_media_player_plays_by_pageview.sql +++ b/models/media_plays/snowplow_media_player_plays_by_pageview.sql @@ -7,7 +7,6 @@ You may obtain a copy of the Snowplow Personal and Academic License Version 1.0 {{ config( - materialized='view', tags=["derived"], sql_header=snowplow_utils.set_query_tag(var('snowplow__query_tag', 'snowplow_dbt')) ) diff --git a/packages.yml b/packages.yml index 6da1d8c..5a0a730 100644 --- a/packages.yml +++ b/packages.yml @@ -1,3 +1,3 @@ packages: - package: snowplow/snowplow_utils - version: [">=0.16.2", "<0.17.0"] + version: [">=0.17.0", "<0.18.0"] \ No newline at end of file
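Note: for running these integration tests against the new `spark_iceberg` target outside CI, a minimal local `profiles.yml` entry mirroring the CI profile above might look like the following. This is an illustrative sketch, not part of the patch; it assumes the Docker Compose Spark Thrift server from `.github/workflows/spark_deployment` is running on localhost:10000 and that the `gh_sp_media_player_dbt_local` schema name is an arbitrary scratch schema.

```yml
integration_tests:
  target: spark_iceberg
  outputs:
    spark_iceberg:
      type: spark
      method: thrift
      host: localhost        # Thrift server exposed by the docker-compose setup
      port: 10000
      user: spark
      schema: gh_sp_media_player_dbt_local   # any scratch schema works locally
      connect_retries: 5
      connect_timeout: 60
      threads: 4
```

With the cluster up, `./.scripts/integration_tests.sh -d spark_iceberg` from `integration_tests/` should then run the suite against it.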