diff --git a/.circleci/config.yml b/.circleci/config.yml
index 45d96f6..cd8092f 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -18,8 +18,7 @@ jobs:
           name: 'pylint'
           command: |
             source /usr/local/share/virtualenvs/tap-bing-ads/bin/activate
-            pylint tap_bing_ads -d missing-docstring,line-too-long,invalid-name,super-with-arguments,return-in-init,too-many-arguments,deprecated-method,consider-using-f-string,too-many-lines,unidiomatic-typecheck
-      - add_ssh_keys
+            pylint tap_bing_ads -d missing-docstring,line-too-long,invalid-name,super-with-arguments,return-in-init,too-many-arguments,deprecated-method,consider-using-f-string,too-many-lines,unidiomatic-typecheck,consider-using-generator
       - run:
           name: 'Unit Tests'
           command: |
@@ -35,10 +34,14 @@ jobs:
           command: |
             aws s3 cp s3://com-stitchdata-dev-deployment-assets/environments/tap-tester/tap_tester_sandbox dev_env.sh
             source dev_env.sh
+            mkdir /tmp/${CIRCLE_PROJECT_REPONAME}
+            export STITCH_CONFIG_DIR=/tmp/${CIRCLE_PROJECT_REPONAME}
             source /usr/local/share/virtualenvs/tap-tester/bin/activate
             run-test --tap=tap-bing-ads tests
       - slack/notify-on-failure:
           only_for_branches: master
+      - store_artifacts:
+          path: /tmp/tap-bing-ads
 workflows:
   version: 2
   commit: &commit_jobs
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6d0728b..cc9ce08 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,13 +1,15 @@
 # Changelog
 
-## 2.20
+## 2.2.1
+* Prefer new authentication scope for the access_token request [#99](https://github.com/singer-io/tap-bing-ads/pull/99)
+
+## 2.2.0
 * Implement Timeout Request [#93](https://github.com/singer-io/tap-bing-ads/pull/93)
 * Added Top Level Breadcrumb [#91](https://github.com/singer-io/tap-bing-ads/pull/91)
 * Added Retry Logic for 500 errors [#90](https://github.com/singer-io/tap-bing-ads/pull/90)
 * Added required fields for age_gender_audience_report [#86](https://github.com/singer-io/tap-bing-ads/pull/86)
 * Removed automatic fields from ad_ext report [#85](https://github.com/singer-io/tap-bing-ads/pull/85)
-
 ## 2.1.0
 * Update the BingAds library to `13.0.11`
diff --git a/setup.py b/setup.py
index f95f621..0743125 100644
--- a/setup.py
+++ b/setup.py
@@ -12,7 +12,9 @@
       py_modules=['tap_bingads'],
       install_requires=[
          'arrow==0.12.0',
-          'bingads==13.0.11',
+          # suds-community is now the reference SOAP library for 13.0.11.1, so we can install it now that use_2to3 has been removed
+          # https://github.com/BingAds/BingAds-Python-SDK/pull/192
+          'bingads==13.0.11.1',
          'requests==2.31.0',
          'singer-python==6.0.0',
          'stringcase==1.2.0',
@@ -20,7 +22,7 @@
       ],
       extras_require={
           'test': [
-              'pylint'
+              'pylint==3.0.3'
          ],
          'dev': [
              'ipdb'
diff --git a/tap_bing_ads/__init__.py b/tap_bing_ads/__init__.py
index 7f27d04..f898192 100644
--- a/tap_bing_ads/__init__.py
+++ b/tap_bing_ads/__init__.py
@@ -15,6 +15,7 @@ from urllib.error import URLError
 
 import singer
 from singer import utils, metadata, metrics
+import bingads
 from bingads import AuthorizationData, OAuthWebAuthCodeGrant, ServiceClient
 import suds
 from suds.sudsobject import asdict
@@ -152,26 +153,38 @@ def set_options(self, **kwargs):
         kwargs['timeout'] = get_request_timeout()
         self._soap_client.set_options(**kwargs)
 
+def get_authentication():
+    """
+    Authenticate using the new method (no scope specified) and
+    fall back to the legacy method using the bingads.manage scope if
+    that fails.
+    """
+    # Represents an OAuth authorization object implementing the authorization code grant flow for use in a web application.
+    try:
+        authentication = OAuthWebAuthCodeGrant(
+            CONFIG['oauth_client_id'],
+            CONFIG['oauth_client_secret'],
+            '') ## redirect URL not needed for refresh token
+        # Retrieves OAuth access and refresh tokens from the Microsoft Account authorization service.
+        authentication.request_oauth_tokens_by_refresh_token(CONFIG['refresh_token'])
+        return authentication
+    except bingads.exceptions.OAuthTokenRequestException:
+        authentication = OAuthWebAuthCodeGrant(
+            CONFIG['oauth_client_id'],
+            CONFIG['oauth_client_secret'],
+            '',
+            oauth_scope='bingads.manage') ## redirect URL not needed for refresh token
+        # Retrieves OAuth access and refresh tokens from the Microsoft Account authorization service.
+        authentication.request_oauth_tokens_by_refresh_token(CONFIG['refresh_token'])
+        return authentication
+
 @bing_ads_error_handling
 def create_sdk_client(service, account_id):
     # Creates SOAP client with OAuth refresh credentials for services
     LOGGER.info('Creating SOAP client with OAuth refresh credentials for service: %s, account_id %s',
                 service, account_id)
 
-    if CONFIG.get('require_live_connect', 'True') == 'True':
-        oauth_scope = 'msads.manage'#'bingads.manage'
-    else:
-        oauth_scope = 'msads.manage'
-
-    # Represents an OAuth authorization object implementing the authorization code grant flow for use in a web application.
-    authentication = OAuthWebAuthCodeGrant(
-        CONFIG['oauth_client_id'],
-        CONFIG['oauth_client_secret'],
-        '',
-        oauth_scope=oauth_scope) ## redirect URL not needed for refresh token
-
-    # Retrieves OAuth access and refresh tokens from the Microsoft Account authorization service.
-    authentication.request_oauth_tokens_by_refresh_token(CONFIG['refresh_token'])
+    authentication = get_authentication()
 
     # Instance require to authenticate with Bing Ads
     authorization_data = AuthorizationData(
diff --git a/tests/base.py b/tests/base.py
index 90826c8..1be2c74 100644
--- a/tests/base.py
+++ b/tests/base.py
@@ -2,7 +2,6 @@
 Setup expectations for test sub classes
 Run discovery for as a prerequisite for most tests
 """
-import unittest
 import backoff
 import copy
 import os
@@ -10,7 +9,8 @@
 from datetime import datetime as dt
 from datetime import timezone as tz
 
-from tap_tester import connections, menagerie, runner
+from tap_tester import connections, menagerie, runner, LOGGER
+from tap_tester.base_case import BaseCase
 
 def backoff_wait_times():
     """Create a generator of wait times as [30, 60, 120, 240, 480, ...]"""
@@ -22,7 +22,7 @@
         super().__init__(message)
 
 
-class BingAdsBaseTest(unittest.TestCase):
+class BingAdsBaseTest(BaseCase):
     """
     Setup expectations for test sub classes
     Run discovery for as a prerequisite for most tests
@@ -222,7 +222,7 @@ def run_check_mode(self, conn_id):
             menagerie.verify_check_exit_status(self, exit_status, check_job_name)
         except AssertionError as e:
             if exit_status['discovery_error_message']:
-                print("*******************RETRYING CHECK FOR DISCOVERY FAILURE*******************")
+                LOGGER.warn("*******************RETRYING CHECK FOR DISCOVERY FAILURE*******************")
                 raise RetryableTapError(e)
 
             raise
@@ -233,7 +233,7 @@ def verify_check_mode(self, conn_id):
         found_catalog_names = set(map(lambda c: c['tap_stream_id'], found_catalogs))
         self.assertSetEqual(self.expected_streams(), found_catalog_names, msg="discovered schemas do not match")
-        print("discovered schemas are OK")
+        LOGGER.info("discovered schemas are OK")
         return found_catalogs
 
     def run_and_verify_check_mode(self, conn_id):
@@ -267,7 +267,7 @@ def run_and_verify_sync(self, conn_id, state):
             menagerie.verify_sync_exit_status(self, exit_status, sync_job_name)
         except AssertionError as e:
             if exit_status['discovery_error_message'] or exit_status['tap_error_message']:
-                print("*******************RETRYING SYNC FOR TAP/DISCOVERY FAILURE*******************")
+                LOGGER.warn("*******************RETRYING SYNC FOR TAP/DISCOVERY FAILURE*******************")
                 raise RetryableTapError(e)
 
             raise
@@ -279,7 +279,7 @@ def run_and_verify_sync(self, conn_id, state):
             sum(sync_record_count.values()), 0,
             msg="failed to replicate any data: {}".format(sync_record_count)
         )
-        print("total replicated row count: {}".format(sum(sync_record_count.values())))
+        LOGGER.info("total replicated row count: %s", sum(sync_record_count.values()))
 
         return sync_record_count
 
@@ -372,7 +372,7 @@ def perform_and_verify_table_and_field_selection(self,
 
             # Verify all testable streams are selected
             selected = catalog_entry.get('annotated-schema').get('selected')
-            print("Validating selection on {}: {}".format(cat['stream_name'], selected))
+            LOGGER.info("Validating selection on %s: %s", cat['stream_name'], selected)
             if cat['stream_name'] not in expected_selected:
                 self.assertFalse(selected, msg="Stream selected, but not testable.")
                 continue # Skip remaining assertions if we aren't selecting this stream
@@ -382,8 +382,7 @@ def perform_and_verify_table_and_field_selection(self,
                 # Verify all fields within each selected stream are selected
                 for field, field_props in catalog_entry.get('annotated-schema').get('properties').items():
                     field_selected = field_props.get('selected')
-                    print("\tValidating selection on {}.{}: {}".format(
-                        cat['stream_name'], field, field_selected))
+                    LOGGER.info("\tValidating selection on %s.%s: %s", cat['stream_name'], field, field_selected)
                     self.assertTrue(field_selected, msg="Field not selected.")
             else:
                 # Verify only automatic fields are selected
@@ -397,7 +396,7 @@ def get_selected_fields_from_metadata(metadata):
         for field in metadata:
             is_field_metadata = len(field['breadcrumb']) > 1
             if field['metadata'].get('inclusion') is None and is_field_metadata: # BUG_SRCE-4313 remove when addressed
-                print("Error {} has no inclusion key in metadata".format(field)) # BUG_SRCE-4313 remove when addressed
+                LOGGER.info("Error %s has no inclusion key in metadata", field) # BUG_SRCE-4313 remove when addressed
                 continue # BUG_SRCE-4313 remove when addressed
             inclusion_automatic_or_selected = (
                 field['metadata']['selected'] is True or \
@@ -503,7 +502,7 @@ def perform_and_verify_adjusted_selection(self,
 
             # Verify intended streams are selected
             selected = catalog_entry.get('annotated-schema').get('selected')
-            print("Validating selection on {}: {}".format(cat['tap_stream_id'], selected))
+            LOGGER.info("Validating selection on %s: %s", cat['tap_stream_id'], selected)
             if cat['stream_name'] not in expected_selected:
                 continue # Skip remaining assertions if we aren't selecting this stream
 
@@ -513,15 +512,13 @@ def perform_and_verify_adjusted_selection(self,
                 # Verify all fields within each selected stream are selected
                 for field, field_props in catalog_entry.get('annotated-schema').get('properties').items():
                     field_selected = field_props.get('selected')
-                    print("\tValidating selection on {}.{}: {}".format(
-                        cat['stream_name'], field, field_selected))
+                    LOGGER.info("\tValidating selection on %s.%s: %s", cat['stream_name'], field, field_selected)
                     self.assertTrue(field_selected, msg="Field not selected.")
             else:
                 for field, field_props in catalog_entry.get('annotated-schema').get('properties').items():
                     field_selected = field_props.get('selected')
                     if field_selected:
-                        print("\tValidating selection on {}.{}: {}".format(
-                            cat['stream_name'], field, field_selected))
+                        LOGGER.info("\tValidating selection on %s.%s: %s", cat['stream_name'], field, field_selected)
 
         # Verify only automatic fields are selected
         # Uncomment lines below to reporduce BUG_SRCE-4313 from automatic fields tests
diff --git a/tests/base_new_framework.py b/tests/base_new_framework.py
new file mode 100644
index 0000000..6f971c0
--- /dev/null
+++ b/tests/base_new_framework.py
@@ -0,0 +1,200 @@
+import backoff
+import copy
+import os
+from datetime import timedelta
+from datetime import datetime as dt
+from datetime import timezone as tz
+
+#from tap_tester import connections, menagerie, runner, LOGGER
+from tap_tester.base_suite_tests.base_case import BaseCase
+
+def backoff_wait_times():
+    """Create a generator of wait times as [30, 60, 120, 240, 480, ...]"""
+    return backoff.expo(factor=30)
+
+
+class RetryableTapError(Exception):
+    def __init__(self, message):
+        super().__init__(message)
+
+
+class BingAdsBaseTest(BaseCase):
+    """
+    Setup expectations for test sub classes
+    Run discovery as a prerequisite for most tests
+    """
+    REQUIRED_KEYS = "required_keys"
+
+    @staticmethod
+    def tap_name():
+        """The name of the tap"""
+        return "tap-bing-ads"
+
+    @staticmethod
+    def get_type():
+        """the expected url route ending"""
+        return "platform.bing-ads"
+
+    def get_properties(self, original: bool = True):
+        """Configuration properties required for the tap."""
+        return_value = {
+            'start_date': '2020-10-01T00:00:00Z',
+            'customer_id': '163875182',
+            'account_ids': '163078754,140168565,71086605',
+            # 'conversion_window': '-15', # advanced option
+        }
+        # cid=42183085 aid=71086605 uid=71069166 (RJMetrics)
+        # cid=42183085 aid=163078754 uid=71069166 (Stitch)
+        # cid=42183085 aid=140168565 uid=71069166 (TestAccount)
+
+        if original:
+            return return_value
+
+        # This test needs the new connection's start date to be larger than the default
+        assert self.start_date > return_value["start_date"]
+
+        return_value["start_date"] = self.start_date
+        return return_value
+
+    @staticmethod
+    def get_credentials():
+        """Authentication information for the test account"""
+        return {
+            "oauth_client_id": os.getenv('TAP_BING_ADS_OAUTH_CLIENT_ID'),
+            "oauth_client_secret": os.getenv('TAP_BING_ADS_OAUTH_CLIENT_SECRET'),
+            "refresh_token": os.getenv('TAP_BING_ADS_REFRESH_TOKEN'),
+            "developer_token": os.getenv('TAP_BING_ADS_DEVELOPER_TOKEN'),
+        }
+
+    @staticmethod
+    def expected_metadata():
+        """The expected streams and metadata about the streams"""
+
+        default = {
+            BaseCase.PRIMARY_KEYS: {"Id"},
+            BaseCase.REPLICATION_METHOD: BaseCase.FULL_TABLE,
+        }
+        default_report = {
+            BaseCase.PRIMARY_KEYS: set(),  # "_sdc_report_datetime" is added by tap
+            BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+            BaseCase.REPLICATION_KEYS: {"TimePeriod"},  # It is used in sync but not mentioned in the catalog. Bug: TDL-15816
+            BaseCase.FOREIGN_KEYS: {"AccountId"}
+        }
+        accounts_meta = {
+            BaseCase.PRIMARY_KEYS: {"Id"},
+            BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
+            BaseCase.REPLICATION_KEYS: {"LastModifiedTime"}
+        }
+
+        goals_report = copy.deepcopy(default_report)
+        goals_report[BingAdsBaseTest.REQUIRED_KEYS] = {'Goal', 'TimePeriod'}
+
+        audience_report = copy.deepcopy(default_report)
+        audience_report[BingAdsBaseTest.REQUIRED_KEYS] = {'AudienceId'}
+
+        geographic_report = copy.deepcopy(default_report)
+        geographic_report[BingAdsBaseTest.REQUIRED_KEYS] = {'AccountName'}
+
+        search_report = copy.deepcopy(default_report)
+        search_report[BingAdsBaseTest.REQUIRED_KEYS] = {'SearchQuery'}
+
+        # BUG_SRCE-4578 (https://stitchdata.atlassian.net/browse/SRCE-4578)
+        # 'Impressions', 'Ctr', 'Clicks' shouldn't be automatic
+        extension_report = copy.deepcopy(default_report)
+        extension_report[BingAdsBaseTest.REQUIRED_KEYS] = {
+            'AdExtensionId', 'AdExtensionPropertyValue', 'AdExtensionType', 'AdExtensionTypeId'
+        }
+
+        age_gender_report = copy.deepcopy(default_report)
+        age_gender_report[BingAdsBaseTest.REQUIRED_KEYS] = {'AccountName', 'AdGroupName', 'AgeGroup', 'Gender'}
+
+        return {
+            "accounts": accounts_meta,
+            "ad_extension_detail_report": extension_report,  # BUG_DOC-1504 | https://stitchdata.atlassian.net/browse/DOC-1504
+            "ad_group_performance_report": default_report,  # BUG_DOC-1567 https://stitchdata.atlassian.net/browse/DOC-1567
+            "ad_groups": default,
+            "ad_performance_report": default_report,
+            "ads": default,
+            "age_gender_audience_report": age_gender_report,  # BUG_DOC-1567
+            "audience_performance_report": audience_report,  # BUG_DOC-1504
+            "campaign_performance_report": default_report,
+            "campaigns": default,
+            "geographic_performance_report": geographic_report,
+            "goals_and_funnels_report": goals_report,
+            "keyword_performance_report": default_report,
+            "search_query_performance_report": search_report,
+        }
+
+    @classmethod
+    def setUpClass(cls):
+        super().setUpClass(logging="Ensuring environment variables are sourced.")
+        missing_envs = [
+            x for x in [
+                'TAP_BING_ADS_OAUTH_CLIENT_ID', 'TAP_BING_ADS_OAUTH_CLIENT_SECRET', 'TAP_BING_ADS_REFRESH_TOKEN',
+                'TAP_BING_ADS_DEVELOPER_TOKEN',
+            ] if os.getenv(x) is None
+        ]
+
+        if len(missing_envs) != 0:
+            raise Exception("Missing environment variables: {}".format(missing_envs))
+
+    def expected_replication_method(self, stream=None):
+        """
+        Return a dictionary with key of table name and value of replication method.
+        TDL-15816
+        Currently, in the tap, all streams are FULL_TABLE except accounts.
+        But as per the doc https://www.stitchdata.com/docs/integrations/saas/microsoft-advertising,
+        only the below streams are FULL_TABLE; all other streams are INCREMENTAL:
+        ads
+        ad_groups
+        campaigns
+        """
+
+        rep_method = {}
+        for table, properties in self.expected_metadata().items():
+            rep_method[table] = properties.get(self.REPLICATION_METHOD, None)
+        for streams in rep_method.keys():
+            if streams in ['ad_extension_detail_report', 'ad_group_performance_report', 'ad_performance_report',
+                           'age_gender_audience_report', 'audience_performance_report', 'campaign_performance_report',
+                           'geographic_performance_report', 'goals_and_funnels_report', 'keyword_performance_report',
+                           'search_query_performance_report']:
+                rep_method[streams] = 'FULL_TABLE'
+        if not stream:
+            return rep_method
+        return rep_method[stream]
+
+    def expected_replication_keys(self, stream=None):
+        """
+        Return a dictionary with key of table name
+        and value as a set of replication key fields.
+
+        As all streams are FULL_TABLE according to the tap, there is no replication key specified for any of
+        the streams (TDL-15816), hence removing the "TimePeriod" key from the expected replication keys.
+        Need to determine the correct replication method and replication keys accordingly.
+        """
+        replication_keys = {table: properties.get(self.REPLICATION_KEYS, set()) - {"TimePeriod"}
+                            for table, properties
+                            in self.expected_metadata().items()}
+        if not stream:
+            return replication_keys
+        return replication_keys[stream]
+
+    def expected_automatic_fields(self, stream=None):
+        """
+        Return a dictionary with key of table name
+        and value as a set of automatic fields.
+
+        _sdc_report_datetime is mentioned as a primary key for most of the streams in the docs,
+        but is not returned as a primary key by the tap, hence adding it explicitly to the automatic fields. TDL-15816
+        """
+        auto_fields = {}
+        for k, v in self.expected_metadata().items():
+            auto_fields[k] = v.get(self.PRIMARY_KEYS, set()) | v.get(self.REPLICATION_KEYS, set()) \
+                | v.get(self.FOREIGN_KEYS, set()) | v.get(self.REQUIRED_KEYS, set()) | {'_sdc_report_datetime'}
+        for streams in auto_fields.keys():
+            if streams in ['ads', 'ad_groups', 'campaigns', 'accounts']:
+                auto_fields[streams] = auto_fields[streams] - {'_sdc_report_datetime'}
+        if not stream:
+            return auto_fields
+        return auto_fields[stream]
diff --git a/tests/test_all_fields.py b/tests/test_all_fields.py
new file mode 100644
index 0000000..4a7ce65
--- /dev/null
+++ b/tests/test_all_fields.py
@@ -0,0 +1,62 @@
+from tap_tester.base_suite_tests.all_fields_test import AllFieldsTest
+from base_new_framework import BingAdsBaseTest
+
+
+class AllFieldsTest(AllFieldsTest, BingAdsBaseTest):
+    """ Test the tap all_fields """
+
+    @staticmethod
+    def name():
+        return "tap_tester_bing_ads_all_fields_test"
+
+    def streams_to_test(self):
+        """
+        TODO
+        Excluded the ad_group and campaign report streams, as the Exclusions file doesn't have the
+        latest exclusions; to be removed after TDL-23223 is fixed.
+        The goals stream has no active data.
+        """
+        streams_to_exclude = {'ad_group_performance_report', 'campaign_performance_report', 'goals_and_funnels_report'}
+        return self.expected_stream_names().difference(streams_to_exclude)
+
+    def missing_fields(self):
+        return {
+            'accounts': {
+                'TaxCertificate',
+                'AccountMode'
+            },
+            'ads': {
+                'Descriptions',
+                'LongHeadlineString',
+                'BusinessName',
+                'Videos',
+                'LongHeadlines',
+                'Images',
+                'LongHeadline',
+                'PromotionalText',
+                'CallToAction',
+                'AppStoreId',
+                'Headlines',
+                'ImpressionTrackingUrls',
+                'CallToActionLanguage',
+                'Headline',
+                'AppPlatform'
+            },
+            'campaigns': {
+                'MultimediaAdsBidAdjustment',
+                'AdScheduleUseSearcherTimeZone',
+                'BidStrategyId'
+            },
+            'ad_groups': {
+                'CpvBid',
+                'AdGroupType',  # Talend Data Loader TDL-23228 -- data present in frontend but not returned in synced records
+                'MultimediaAdsBidAdjustment',
+                'AdScheduleUseSearcherTimeZone',
+                'CpmBid'
+            }
+        }
+
+    def test_all_fields_for_streams_are_replicated(self):
+        self.selected_fields = {k: v - self.missing_fields().get(k, set())
+                                for k, v in AllFieldsTest.selected_fields.items()}
+        super().test_all_fields_for_streams_are_replicated()
diff --git a/tests/test_bookmarks.py b/tests/test_bookmarks.py
index 23d2cfc..37f1668 100644
--- a/tests/test_bookmarks.py
+++ b/tests/test_bookmarks.py
@@ -1,7 +1,6 @@
 import datetime
 import dateutil.parser
 import pytz
-import singer
 
 import tap_tester.connections as connections
 import tap_tester.menagerie as menagerie
@@ -9,8 +8,6 @@
 
 from base import BingAdsBaseTest
 
-LOGGER = singer.get_logger()
-
 
 class TestBingAdsBookmarks(BingAdsBaseTest):
diff --git a/tests/test_bookmarks_reports.py b/tests/test_bookmarks_reports.py
index e8330fc..8c3df3e 100644
--- a/tests/test_bookmarks_reports.py
+++ b/tests/test_bookmarks_reports.py
@@ -1,7 +1,6 @@
 import datetime
 import dateutil.parser
 import pytz
-import singer
 
 import tap_tester.connections as connections
 import tap_tester.menagerie as menagerie
@@ -9,8 +8,6 @@
 
 from base import BingAdsBaseTest
 
-LOGGER = singer.get_logger()
-
 
 class TestBingAdsBookmarksReports(BingAdsBaseTest):
diff --git a/tests/test_discovery.py b/tests/test_discovery.py
index 7d04b36..80c8619 100644
--- a/tests/test_discovery.py
+++ b/tests/test_discovery.py
@@ -1,188 +1,19 @@
 """
 Test tap discovery
 """
-import re
+import unittest
+#import re
+#from tap_tester import menagerie, connections, runner
 
-from tap_tester import menagerie, connections, runner
+from tap_tester.base_suite_tests.discovery_test import DiscoveryTest
+from base_new_framework import BingAdsBaseTest
 
-from base import BingAdsBaseTest
-
-
-class DiscoveryTest(BingAdsBaseTest):
+class DiscoveryTest(DiscoveryTest, BingAdsBaseTest):
     """ Test the tap discovery """
 
     @staticmethod
     def name():
         return "tap_tester_bing_ads_discovery_test"
-
-    def test_run(self):
-        """
-        Verify that discover creates the appropriate catalog, schema, metadata, etc.
-        • Verify number of actual streams discovered match expected
-        • Verify the stream names discovered were what we expect
-        • Verify stream names follow naming convention
-          streams should only have lowercase alphas and underscores
-        • verify there is only 1 top level breadcrumb
-        • verify replication key(s)
-        • verify primary key(s)
-        • verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
-        • verify the actual replication matches our expected replication method
-        • verify that primary, replication and foreign keys
-          are given the inclusion of automatic (metadata and annotated schema).
-        • verify that all other fields have inclusion of available (metadata and schema)
-        """
-        streams_to_test = self.expected_streams()
-
-        conn_id = self.create_connection()
-
-        # Verify number of actual streams discovered match expected
-        catalogs = menagerie.get_catalogs(conn_id)
-        found_catalogs = [catalog for catalog in catalogs
-                          if catalog.get('tap_stream_id') in streams_to_test]
-
-        self.assertGreater(len(found_catalogs), 0,
-                           msg="unable to locate schemas for connection {}".format(conn_id))
-        self.assertEqual(len(found_catalogs),
-                         len(streams_to_test),
-                         msg="Expected {} streams, actual was {} for connection {}, "
-                             "actual {}".format(
-                                 len(streams_to_test),
-                                 len(found_catalogs),
-                                 found_catalogs,
-                                 conn_id))
-
-        # Verify the stream names discovered were what we expect
-        found_catalog_names = {c['tap_stream_id'] for c in found_catalogs}
-        self.assertEqual(set(streams_to_test),
-                         set(found_catalog_names),
-                         msg="Expected streams don't match actual streams")
-
-        # Verify stream names follow naming convention
-        # streams should only have lowercase alphas and underscores
-        self.assertTrue(all([re.fullmatch(r"[a-z_]+", name) for name in found_catalog_names]),
-                        msg="One or more streams don't follow standard naming")
-
-        for stream in streams_to_test:
-            with self.subTest(stream=stream):
-                catalog = next(iter([catalog for catalog in found_catalogs
-                                     if catalog["stream_name"] == stream]))
-                assert catalog  # based on previous tests this should always be found
-
-                schema_and_metadata = menagerie.get_annotated_schema(conn_id, catalog['stream_id'])
-                metadata = schema_and_metadata["metadata"]
-                schema = schema_and_metadata["annotated-schema"]
-
-                # verify the stream level properties are as expected
-                # verify there is only 1 top level breadcrumb
-                stream_properties = [item for item in metadata if item.get("breadcrumb") == []]
-                self.assertTrue(len(stream_properties) == 1,
-                                msg="There is NOT only one top level breadcrumb for {}".format(stream) + \
-                                    "\nstream_properties | {}".format(stream_properties))
-
-                expected_primary_keys = self.expected_primary_keys()[stream]
-                expected_foreign_keys = self.expected_foreign_keys()[stream]
-                expected_replication_keys = self.expected_replication_keys()[stream]
-                # As there is a discrepancy for replication key in existing tap's catalog, sync mode behavior and documentation
-                # removing replication key "TimePeriod" from each report streams. Bug: TDL-15816
-                if stream.endswith("_report"):
-                    expected_replication_keys = expected_replication_keys - {"TimePeriod"}
-                expected_required_keys = self.expected_required_fields()[stream]
-                expected_automatic_fields = expected_primary_keys | expected_replication_keys \
-                    | expected_foreign_keys | expected_required_keys
-
-                # verify replication key(s)
-                self.assertEqual(
-                    set(stream_properties[0].get(
-                        "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, [])),
-                    expected_replication_keys,
-                    msg="expected replication key {} but actual is {}".format(
-                        expected_replication_keys,
-                        set(stream_properties[0].get(
-                            "metadata", {self.REPLICATION_KEYS: None}).get(
-                                self.REPLICATION_KEYS, []))))
-
-                # verify primary key(s)
-                self.assertEqual(
-                    set(stream_properties[0].get(
-                        "metadata", {self.PRIMARY_KEYS: []}).get(self.PRIMARY_KEYS, [])),
-                    expected_primary_keys,
-                    msg="expected primary key {} but actual is {}".format(
-                        expected_primary_keys,
-                        set(stream_properties[0].get(
-                            "metadata", {self.PRIMARY_KEYS: None}).get(self.PRIMARY_KEYS, []))))
-
-                # # verify that if there is a replication key we are doing INCREMENTAL otherwise FULL
-                # actual_replication_method = stream_properties[0].get(
-                #     "metadata", {self.REPLICATION_METHOD: None}).get(self.REPLICATION_METHOD)
-                # if stream_properties[0].get(
-                #         "metadata", {self.REPLICATION_KEYS: []}).get(self.REPLICATION_KEYS, []):
-
-                #     self.assertTrue(actual_replication_method == self.INCREMENTAL,
-                #                     msg="Expected INCREMENTAL replication "
-                #                         "since there is a replication key")
-                # else:
-                #     self.assertTrue(actual_replication_method == self.FULL_TABLE,
-                #                     msg="Expected FULL replication "
-                #                         "since there is no replication key")
-
-                # # verify the actual replication matches our expected replication method
-                # self.assertEqual(
-                #     self.expected_replication_method().get(stream, None),
-                #     actual_replication_method,
-                #     msg="The actual replication method {} doesn't match the expected {}".format(
-                #         actual_replication_method,
-                #         self.expected_replication_method().get(stream, None)))
-
-                # END OF BUG SRCE-4315
-
-                expected_primary_keys = self.expected_primary_keys()[stream]
-                expected_foreign_keys = self.expected_foreign_keys()[stream]
-                expected_replication_keys = self.expected_replication_keys()[stream]
-                expected_required_keys = self.expected_required_fields()[stream]
-                # These streams does not include the field `_sdc_report_datetime` in their schema
-                if stream in ['ads', 'ad_groups', 'campaigns', 'accounts']:
-                    expected_automatic_fields = expected_primary_keys | expected_replication_keys \
-                        | expected_foreign_keys | expected_required_keys
-                # adding `_sdc_report_datetime` in the expected automatic fields list as there was
-                # a failure experienced for all the streams that the particular field was missing
-                # in the expected fields.
-                else:
-                    expected_automatic_fields = expected_primary_keys | expected_replication_keys \
-                        | expected_foreign_keys | expected_required_keys | {'_sdc_report_datetime'}
-
-                # verify that primary, replication and foreign keys
-                # are given the inclusion of automatic in annotated schema.
-                actual_automatic_fields = {key for key, value in schema["properties"].items()
-                                           if value.get("inclusion") == "automatic"}
-                self.assertEqual(expected_automatic_fields, actual_automatic_fields)
-
-
-                # verify that all other fields have inclusion of available
-                # This assumes there are no unsupported fields for SaaS sources
-                self.assertTrue(
-                    all({value.get("inclusion") == "available" for key, value
-                         in schema["properties"].items()
-                         if key not in actual_automatic_fields}),
-                    msg="Not all non key properties are set to available in annotated schema")
-
-                # verify that primary, replication and foreign keys
-                # are given the inclusion of automatic in metadata.
-                actual_automatic_fields = {item.get("breadcrumb", ["properties", None])[1]
-                                           for item in metadata
-                                           if item.get("metadata").get("inclusion") == "automatic"}
-
-                self.assertEqual(expected_automatic_fields,
-                                 actual_automatic_fields,
-                                 msg="expected {} automatic fields but got {}".format(
-                                     expected_automatic_fields,
-                                     actual_automatic_fields))
-
-                # verify that all other fields have inclusion of available
-                # This assumes there are no unsupported fields for SaaS sources
-                self.assertTrue(
-                    all({item.get("metadata").get("inclusion") == "available"
-                         for item in metadata
-                         if item.get("breadcrumb", []) != []
-                         and item.get("breadcrumb", ["properties", None])[1]
-                         not in actual_automatic_fields}),
-                    msg="Not all non key properties are set to available in metadata")
+
+    def streams_to_test(self):
+        return self.expected_stream_names()
diff --git a/tests/test_start_date.py b/tests/test_start_date.py
index 01aff73..eea74bb 100644
--- a/tests/test_start_date.py
+++ b/tests/test_start_date.py
@@ -4,7 +4,7 @@
 import tap_tester.connections as connections
 import tap_tester.runner as runner
 import tap_tester.menagerie as menagerie
-
+from tap_tester import LOGGER
 from base import BingAdsBaseTest
 
 
@@ -109,14 +109,14 @@ def start_date_test(self, streams_to_fields_with_exclusions):
         replicated_row_count_1 = sum(record_count_by_stream_1.values())
         self.assertGreater(replicated_row_count_1, 0,
                            msg="failed to replicate any data: {}".format(record_count_by_stream_1))
-        print("total replicated row count: {}".format(replicated_row_count_1))
+        LOGGER.info("total replicated row count: %s", replicated_row_count_1)
 
         synced_records_1 = runner.get_records_from_target_output()
 
         ##########################################################################
         ### Update START DATE Between Syncs
         ##########################################################################
 
-        print("REPLICATION START DATE CHANGE: {} ===>>> {} ".format(self.start_date, self.start_date_2))
+        LOGGER.info("REPLICATION START DATE CHANGE: %s ===>>> %s ", self.start_date, self.start_date_2)
         self.start_date = self.start_date_2
 
         ##########################################################################
@@ -148,7 +148,7 @@ def start_date_test(self, streams_to_fields_with_exclusions):
         replicated_row_count_2 = sum(record_count_by_stream_2.values())
         self.assertGreater(replicated_row_count_2, 0, msg="failed to replicate any data")
-        print("total replicated row count: {}".format(replicated_row_count_2))
+        LOGGER.info("total replicated row count: %s", replicated_row_count_2)
 
         synced_records_2 = runner.get_records_from_target_output()
 
         for stream in self.expected_sync_streams():
diff --git a/tests/test_sync_rows.py b/tests/test_sync_rows.py
index 4d466c8..3015f38 100644
--- a/tests/test_sync_rows.py
+++ b/tests/test_sync_rows.py
@@ -8,10 +8,6 @@
 import tap_tester.runner as runner
 
 from base import BingAdsBaseTest
-import singer
-from singer import metadata
-LOGGER = singer.get_logger()
-
 
 class BingAdsSyncRows(BingAdsBaseTest):
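
Review note on the scope fallback: the diff adds `get_authentication()` in `tap_bing_ads/__init__.py` but no unit test exercises the new-scope → legacy-scope retry. Below is a minimal sketch of such a test, assuming pytest-style collection, `unittest.mock`, and that `tap_bing_ads.CONFIG` is the module-level config dict the tap reads (as the diff suggests). The test name, the fake credential values, and the `OAuthTokenRequestException('invalid_scope', 'scope not supported')` constructor arguments are illustrative assumptions, not taken from this repo.

```python
from unittest import mock

import bingads
import tap_bing_ads


@mock.patch('tap_bing_ads.OAuthWebAuthCodeGrant')
def test_get_authentication_falls_back_to_legacy_scope(mock_grant):
    # The first grant object fails its token request, simulating a tenant
    # that rejects the new (default) scope; the second one succeeds.
    new_scope_auth = mock.Mock()
    new_scope_auth.request_oauth_tokens_by_refresh_token.side_effect = (
        # Constructor arguments are assumed to be (error_code, description);
        # adjust to the installed bingads version if the signature differs.
        bingads.exceptions.OAuthTokenRequestException('invalid_scope', 'scope not supported'))
    legacy_scope_auth = mock.Mock()
    mock_grant.side_effect = [new_scope_auth, legacy_scope_auth]

    # get_authentication() reads credentials from the module-level CONFIG dict.
    tap_bing_ads.CONFIG.update({
        'oauth_client_id': 'client-id',
        'oauth_client_secret': 'client-secret',
        'refresh_token': 'a-refresh-token',
    })

    authentication = tap_bing_ads.get_authentication()

    # The fallback object is returned, and only the retry carries the legacy scope.
    assert authentication is legacy_scope_auth
    assert mock_grant.call_count == 2
    _, retry_kwargs = mock_grant.call_args
    assert retry_kwargs.get('oauth_scope') == 'bingads.manage'
```

Patching `OAuthWebAuthCodeGrant` where the tap imports it (the `tap_bing_ads` namespace, not `bingads`) is what lets the test observe both constructions and assert that only the retry passes `oauth_scope='bingads.manage'`.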