diff --git a/CHANGELOG.md b/CHANGELOG.md index b3d1807..3284601 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,14 @@ # Changelog +## [v2.2.0] (2024-11-14) + +[Full Changelog](https://github.com/singer-io/tap-square/compare/v2.1.1...v2.2.0) + +* Bug fix to replicate the `Payouts` stream records [#122](https://github.com/singer-io/tap-square/pull/120) +* Fetches `Payments` stream data for all location IDs +* Updates `Payments` stream implementation to function as a pseudo-incremental stream +* Adds retry logic for handling 5xx errors + ## [v2.1.1] (2024-11-04) [Full Changelog](https://github.com/singer-io/tap-square/compare/v2.1.0...v2.1.1) diff --git a/README.md b/README.md index 40f9cba..c8e55c2 100644 --- a/README.md +++ b/README.md @@ -17,7 +17,7 @@ This tap: * BankAccounts * Refunds * Payments - * Payouts + * Payouts * ModifierLists * Inventories * Orders diff --git a/setup.py b/setup.py index 53a8136..51ff5e7 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='tap-square', - version='2.1.1', + version='2.2.0', description='Singer.io tap for extracting data from the Square API', author='Stitch', url='http://singer.io', diff --git a/tap_square/client.py b/tap_square/client.py index bb7f43b..2e996b6 100644 --- a/tap_square/client.py +++ b/tap_square/client.py @@ -86,8 +86,18 @@ def _retryable_v2_method(request_method, body, **kwargs): result = request_method(body, **kwargs) if result.is_error(): + LOGGER.info("HTTP status code when it errors out: %s", result.status_code) error_message = result.errors if result.errors else result.body - if 'Service Unavailable' in error_message or 'upstream connect error or disconnect/reset before headers' in error_message or result.status_code == 429: + + # Refactor the conditions into separate variables for readability + is_service_unavailable = 'Service Unavailable' in error_message + is_upstream_error = 'upstream connect error or disconnect/reset before headers' in error_message + is_cf_error_1101 = '1101' in error_message + is_html_error = error_message.startswith('') + is_status_429_or_500 = result.status_code == 429 or result.status_code >= 500 + + retryable_conditions = {is_service_unavailable, is_upstream_error, is_cf_error_1101, is_html_error, is_status_429_or_500} + if any(retryable_conditions): raise RetryableError(error_message) else: raise RuntimeError(error_message) @@ -253,23 +263,34 @@ def get_refunds(self, start_time, bookmarked_cursor): body, 'refunds') - def get_payments(self, start_time, bookmarked_cursor): - start_time = utils.strptime_to_utc(start_time) - start_time = start_time - timedelta(milliseconds=1) - start_time = utils.strftime(start_time) + def get_payments(self, location_id, start_time, bookmarked_cursor): + if bookmarked_cursor: + cursor = bookmarked_cursor + else: + cursor = '__initial__' # initial value so while loop is always entered one time - body = { - } - body['begin_time'] = start_time + end_time = utils.strftime(utils.now(), utils.DATETIME_PARSE) + while cursor: + if cursor == '__initial__': + # Initial text was needed to go into the while loop, but api needs + # it to be a valid bookmarked cursor or None + cursor = bookmarked_cursor - if bookmarked_cursor: - body['cursor'] = bookmarked_cursor + with singer.http_request_timer('GET payments'): + result = self._retryable_v2_method( + lambda bdy: self._client.payments.list_payments( + location_id=location_id, + begin_time=start_time, + end_time=end_time, + cursor=cursor, + limit=100, + ), + None, + ) - yield from self._get_v2_objects( - 'payments', - lambda bdy: self._client.payments.list_payments(**bdy), - body, - 'payments') + yield (result.body.get('payments', []), result.body.get('cursor')) + + cursor = result.body.get('cursor') def get_cash_drawer_shifts(self, location_id, start_time, bookmarked_cursor): if bookmarked_cursor: @@ -373,6 +394,6 @@ def get_payouts(self, location_id, start_time, bookmarked_cursor): None, ) - yield (result.body.get('items', []), result.body.get('cursor')) + yield (result.body.get('payouts', []), result.body.get('cursor')) cursor = result.body.get('cursor') diff --git a/tap_square/streams.py b/tap_square/streams.py index 260b2e6..80bc320 100644 --- a/tap_square/streams.py +++ b/tap_square/streams.py @@ -75,15 +75,13 @@ def sync(self, state, stream_schema, stream_metadata, config, transformer): start_time = singer.get_bookmark(state, self.tap_stream_id, self.replication_key, config['start_date']) bookmarked_cursor = singer.get_bookmark(state, self.tap_stream_id, 'cursor') - for page, cursor in self.get_pages_safe(state, bookmarked_cursor, start_time): + for page, _ in self.get_pages_safe(state, bookmarked_cursor, start_time): for record in page: transformed_record = transformer.transform(record, stream_schema, stream_metadata) singer.write_record( self.tap_stream_id, transformed_record, ) - singer.write_bookmark(state, self.tap_stream_id, 'cursor', cursor) - singer.write_state(state) state = singer.clear_bookmark(state, self.tap_stream_id, 'cursor') singer.write_state(state) @@ -180,16 +178,36 @@ def get_pages(self, bookmarked_cursor, start_time): yield from self.client.get_refunds(start_time, bookmarked_cursor) -class Payments(FullTableStream): +class Payments(Stream): tap_stream_id = 'payments' key_properties = ['id'] - replication_method = 'FULL_TABLE' - valid_replication_keys = [] - replication_key = None + replication_method = 'INCREMENTAL' + valid_replication_keys = ['updated_at'] + replication_key = 'updated_at' + # If the records are not updated at all since those are created and if it has missing the updated_at field + second_replication_key = 'created_at' object_type = 'PAYMENT' - def get_pages(self, bookmarked_cursor, start_time): - yield from self.client.get_payments(start_time, bookmarked_cursor) + + def sync(self, state, stream_schema, stream_metadata, config, transformer): + bookmarked_time = singer.get_bookmark(state, self.tap_stream_id, self.replication_key, config['start_date']) + max_bookmark_value = bookmarked_time + all_location_ids = Locations.get_all_location_ids(self.client) + + for location_id in all_location_ids: + for page, _ in self.client.get_payments(location_id, bookmarked_time, bookmarked_cursor=None): + for record in page: + transformed_record = transformer.transform(record, stream_schema, stream_metadata) + + if record.get(self.replication_key, self.second_replication_key) >= bookmarked_time: + singer.write_record(self.tap_stream_id, transformed_record,) + max_bookmark_value = max(transformed_record.get(self.replication_key) or \ + transformed_record.get(self.second_replication_key), \ + max_bookmark_value) + + state = singer.write_bookmark(state, self.tap_stream_id, self.replication_key, max_bookmark_value) + singer.write_state(state) + return state class Orders(Stream):