Introduce support for end_date parameter #181

Open · wants to merge 1 commit into master
README.md (5 changes: 4 additions & 1 deletion)
@@ -49,7 +49,10 @@ This tap:

 The `shop` is your Shopify shop which will be the value `test_shop` in the string `https://test_shop.myshopify.com`
 
-The `request_timeout` is the timeout for the requests. Default: 300 seconds
+The `request_timeout` is the timeout for the requests. Default: 300 seconds
+
+The `end_date` specifies the date at which the tap will stop pulling data
+(for those resources that support this). Default: current date & time.
 
 4. Run the Tap in Discovery Mode

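For context, a minimal sketch of a `config.json` using the new parameter. The key names are taken from this README and the test below; the values are placeholders, not defaults endorsed by this PR:

```json
{
  "api_key": "<your-shopify-api-key>",
  "shop": "test_shop",
  "start_date": "2021-04-01T00:00:00Z",
  "end_date": "2021-04-21T00:00:00Z",
  "request_timeout": 300
}
```

Omitting `end_date` preserves the previous behaviour: the tap syncs up to the current date and time.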
tap_shopify/streams/base.py (2 changes: 1 addition & 1 deletion)
@@ -234,7 +234,7 @@ def get_objects(self):
         updated_at_min = self.get_bookmark()
         max_bookmark = updated_at_min
 
-        stop_time = singer.utils.now().replace(microsecond=0)
+        stop_time = Context.config.get("end_date", singer.utils.now()).replace(microsecond=0)
         date_window_size = float(Context.config.get("date_window_size", DATE_WINDOW_SIZE))
 
         # Page through till the end of the resultset
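One caveat worth flagging on the added line: values loaded from a JSON config are strings, and `str.replace()` takes no `microsecond` keyword, so a configured `end_date` would presumably need to be parsed before `.replace(microsecond=0)` is applied. Below is a minimal sketch of that parsing, assuming singer-python's `strptime_to_utc` helper; `get_stop_time` is a hypothetical function written only to illustrate the intent of the changed line:

```python
import singer


def get_stop_time(config):
    """Resolve the sync cutoff: the configured end_date if set, else now."""
    end_date = config.get("end_date")
    if end_date:
        # end_date arrives from config.json as an ISO-8601 string, so parse
        # it into a timezone-aware UTC datetime before truncating microseconds.
        return singer.utils.strptime_to_utc(end_date).replace(microsecond=0)
    # No end_date configured: fall back to the previous behaviour.
    return singer.utils.now().replace(microsecond=0)
```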
tests/test_end_date.py (new file, 154 additions)
@@ -0,0 +1,154 @@
"""
Test that the end_date configuration is respected
"""

from functools import reduce

import os

from dateutil.parser import parse

from tap_tester import menagerie, runner, LOGGER

from base import BaseTapTest


class EndDateTest(BaseTapTest):
"""
Test that the end_date configuration is respected

    • verify that a sync with an earlier end date has at least one record synced
      and fewer records than the first sync with a later end date
    • verify that each stream has fewer records than in the later end date sync
    • verify that all data from the earlier end date sync has bookmark values <= end_date
    • verify that the maximum bookmark sent to the target is less than or
      equal to the end date
"""

def get_properties(self, original: bool = True):
return_value = {
'start_date': '2021-04-01T00:00:00Z',
'end_date': '2021-04-21T00:00:00Z',
'shop': 'talenddatawearhouse',
'date_window_size': 30,
# BUG: https://jira.talendforge.org/browse/TDL-13180
'results_per_page': '50'
}

if original:
return return_value

return_value["end_date"] = '2021-04-05T00:00:00Z'
return return_value

@staticmethod
def get_credentials(original_credentials: bool = True):
return {
'api_key': os.getenv('TAP_SHOPIFY_API_KEY_TALENDDATAWEARHOUSE')
}

@staticmethod
def name():
return "tap_tester_shopify_end_date_test"

def test_run(self):
"""Test we get a lot of data back based on the end date configured in base"""
conn_id = self.create_connection()

# Select all streams and all fields within streams
found_catalogs = menagerie.get_catalogs(conn_id)
        # 'abandoned_checkouts' is removed, per the docs:
        # https://help.shopify.com/en/manual/orders/abandoned-checkouts?st_source=admin&st_campaign=abandoned_checkouts_footer&utm_source=admin&utm_campaign=abandoned_checkouts_footer#review-your-abandoned-checkouts
        # abandoned checkouts are kept in the Shopify admin for three months;
        # every Monday, abandoned checkouts older than three months are removed.
        # There is also no POST endpoint for this resource: https://shopify.dev/api/admin-rest/2022-01/resources/abandoned-checkouts
expected_replication_method = self.expected_replication_method()
expected_replication_method.pop("abandoned_checkouts")
incremental_streams = {key for key, value in expected_replication_method.items()
if value == self.INCREMENTAL}

        # only incremental streams are selected below; add any streams that
        # should not be tested to an exclusion set here if needed

our_catalogs = [catalog for catalog in found_catalogs if
catalog.get('tap_stream_id') in incremental_streams]
self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True)

# Run a sync job using orchestrator
first_sync_record_count = self.run_sync(conn_id)
        first_total_records = reduce(lambda a, b: a + b, first_sync_record_count.values(), 0)

# Count actual rows synced
first_sync_records = runner.get_records_from_target_output()
first_max_bookmarks = self.max_bookmarks_by_stream(first_sync_records)

self.end_date = self.get_properties(original=False)["end_date"]

# create a new connection with the new end_date
conn_id = self.create_connection(original_properties=False)

# Select all streams and all fields within streams
found_catalogs = menagerie.get_catalogs(conn_id)
our_catalogs = [catalog for catalog in found_catalogs if
catalog.get('tap_stream_id') in incremental_streams]
self.select_all_streams_and_fields(conn_id, our_catalogs, select_all_fields=True)

# Run a sync job using orchestrator
second_sync_record_count = self.run_sync(conn_id)
second_total_records = reduce(lambda a, b: a + b, second_sync_record_count.values(), 0)
second_sync_records = runner.get_records_from_target_output()
second_max_bookmarks = self.max_bookmarks_by_stream(second_sync_records)

        # verify that at least one record synced and fewer records synced than the 1st connection
self.assertGreater(second_total_records, 0)
self.assertLess(second_total_records, first_total_records)

for stream in incremental_streams:
with self.subTest(stream=stream):

# get primary key values for both sync records
expected_primary_keys = self.expected_primary_keys()[stream]
primary_keys_list_1 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
for message in first_sync_records.get(stream).get('messages')
if message.get('action') == 'upsert']
primary_keys_list_2 = [tuple(message.get('data').get(expected_pk) for expected_pk in expected_primary_keys)
for message in second_sync_records.get(stream).get('messages')
if message.get('action') == 'upsert']
primary_keys_sync_1 = set(primary_keys_list_1)
primary_keys_sync_2 = set(primary_keys_list_2)

                # verify that each stream has fewer records than the first connection sync
self.assertGreaterEqual(
first_sync_record_count.get(stream, 0),
second_sync_record_count.get(stream, 0),
msg="second had more records, end_date usage not verified")

                # Verify, by primary key values, that all records of the 2nd sync are included in the 1st sync, since the 2nd sync has an earlier end date.
self.assertTrue(primary_keys_sync_2.issubset(primary_keys_sync_1))

# verify all data from both syncs <= end_date
first_sync_target_mark = first_max_bookmarks.get(stream, {"mark": None})
second_sync_target_mark = second_max_bookmarks.get(stream, {"mark": None})

# get end dates for both syncs
first_sync_end_date = self.get_properties()["end_date"]
second_sync_end_date = self.end_date

for end_date, target_mark in zip((first_sync_end_date, second_sync_end_date), (first_sync_target_mark, second_sync_target_mark)):
target_value = next(iter(target_mark.values())) # there should be only one

if target_value:

# it's okay if there isn't target data for a stream
try:
target_value = self.local_to_utc(parse(target_value))

# verify that the maximum bookmark sent to the target for the sync
# is less than or equal to the end date
self.assertLessEqual(target_value,
self.local_to_utc(parse(end_date)))

except (OverflowError, ValueError, TypeError):
                            LOGGER.warning(
"bookmarks cannot be converted to dates, can't test end_date for %s", stream
)