From f9bb1ef5b43c73bb645e68ffc9a6d0bddcbd80c0 Mon Sep 17 00:00:00 2001 From: btowles Date: Tue, 24 Oct 2023 14:26:27 +0000 Subject: [PATCH 01/10] Facebook test client WIP, campaigns stream only --- tests/test_client.py | 242 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 242 insertions(+) create mode 100644 tests/test_client.py diff --git a/tests/test_client.py b/tests/test_client.py new file mode 100644 index 00000000..d090790c --- /dev/null +++ b/tests/test_client.py @@ -0,0 +1,242 @@ +# import backoff +import os +import random +import requests +import string + +# from requests.exceptions import HTTPError +# from requests.auth import HTTPBasicAuth + +# TODO try below syncronous interaction or go all http? +# from facebookads.objects import Ad, AdAccount, AdSet, Campaign + +from tap_tester.logger import LOGGER + + +class TestClient(): + # def __init__(self, config): # TODO move to dynamic config model? + def __init__(self): + # pass in config above and get() from it or hard code? + self.base_url = 'https://graph.facebook.com/' + self.api_version = 'v18.0' + self.account_id = os.getenv('TAP_FACEBOOK_ACCOUNT_ID') + self.access_token = os.getenv('TAP_FACEBOOK_ACCESS_TOKEN') + self.account_url = self.base_url + self.api_version +'/act_{}'.format(self.account_id) + + self.stream_endpoint_map = {'ads': '/ads', + 'adsets': '/adsets', + 'adcreative': '/adcreatives', + 'ads_insights': '/insights', + 'campaigns': '/campaigns', + 'users': '/users',} + + self.campaign_special_ad_categories = ['NONE', + 'EMPLOYMENT', + 'HOUSING', + 'CREDIT', + # 'ISSUES_ELECTIONS_POLITICS', # acct unauthorized + 'ONLINE_GAMBLING_AND_GAMING'] + + # list of campaign objective values from fb docs below give "invalid" error via api 18.0 + # 'APP_INSTALLS', 'BRAND_AWARENESS', 'CONVERSIONS', 'EVENT_RESPONSES', 'LEAD_GENERATION', + # 'LINK_CLICKS', 'MESSAGES', 'OFFER_CLAIMS', 'PAGE_LIKES', 'POST_ENGAGEMENT', + # 'PRODUCT_CATALOG_SALES', 'REACH', 'STORE_VISITS', 'VIDEO_VIEWS', + + # LOCAL_AWARENESS gives deprecated error, use REACH (reach is invalid from above) + # valid and verified ojcectives listed below + self.campaign_objectives = ['OUTCOME_APP_PROMOTION', + 'OUTCOME_AWARENESS', + 'OUTCOME_ENGAGEMENT', + 'OUTCOME_LEADS', + 'OUTCOME_SALES', + 'OUTCOME_TRAFFIC'] + + def get_account_objects(self, stream): + assert stream in self.stream_endpoint_map.keys(), \ + f'Endpoint undefiend for specified stream: {stream}' + endpoint = self.stream_endpoint_map[stream] + url = self.account_url + endpoint + params = {'access_token': self.access_token} + LOGGER.info(f"Getting url: {url}") + response = requests.get(url, params) + LOGGER.info(f"Returning response: {response}") + return response.json() + + def create_account_objects(self, stream): + assert stream in self.stream_endpoint_map.keys(), \ + f'Endpoint undefined for specified stream: {stream}' + endpoint = self.stream_endpoint_map[stream] + url = self.account_url + endpoint + LOGGER.info(f"Posting to url: {url}") + params = self.generate_post_params(stream) + response = requests.post(url, params) + LOGGER.info(f"Returning response: {response}") + return response.json() + + def generate_post_params(self, stream): + if stream == 'campaigns': + params = { # generate a campaign with random name, ojbective, and ad category + 'access_token': self.access_token, + 'name': ''.join(random.choices(string.ascii_letters + string.digits, k=15)), + 'objective': random.choice(self.campaign_objectives), + 'special_ad_categories': random.choice(self.campaign_special_ad_categories)} + return params + else: + assert False, f"Post params for stream {stream} not implemented / supported" + + + # Create multiplue ads at a time async $ get notif when complete + # Make an HTTP POST to: + # https://graph.facebook.com/{API_VERSION}/act_{AD_ACCOUNT_ID}/asyncadrequestsets + + # HTTP to get ads for an account + # GET /v18.0/act_{ad-account-id}/ads HTTP/1.1 + # Host: graph.facebook.com + + # cURL to read all ads from one ad account example + # curl -G \ + # -d "fields=name" \ + # -d "access_token=" \ + # "https://graph.facebook.com//act_/ads" + + # Ad IDs TODO + # "data": [{"id": "23843561338620058"}, + # {"id": "23847656838300058"}, + # {"id": "23847292383430058"}], + # creative = {'asset_feed_spec': {'audios': [{'type': 'random'}]}, + # 'contextual_multi_ads': {'eligibility': ['POST_AD_ENGAGEMENT_FEED', + # 'POST_AD_ENGAGEMENT_SEED_AD', + # 'STANDALONE_FEED'], + # 'enroll_status': 'OPT_IN'}, + # 'degrees_of_freedom_spec': {'degrees_of_freedom_type': 'USER_ENROLLED_NON_DCO', + # 'text_transformation_types': ['TEXT_LIQUIDITY']}, + # 'object_story_spec': {'instagram_actor_id': '2476947555701417', + # 'link_data': {'call_to_action': {'type': 'SIGN_UP'}, + # 'link': 'http://fb.me', + # 'picture': 'https://foo.x.y.net/v/dir/1.png'}, + # 'page_id': '453760455405317'}, + # 'object_type': 'SHARE'} + + # Ad Insights TODO + # Empty data list for all 3 AdSet Ids + + # AdSet Ids TODO + # "data": [{"id": "23847656838230058"}, + # {"id": "23847292383400058"}, + # {"id": "23843561338600058"}], + + # Campaign Ids TODO + # "data": [{"id": "23847656838160058"}, + # {"id": "23847292383380058"}, + # {"id": "23843561338580058"}, + # {"id": "120203241386960059"}] # API added campaign, returned on API get, verified + # cam_post_params = {'access_token': token, + # 'name': 'BHT test campaign', + # 'objective': 'OUTCOME_TRAFFIC', + # 'special_ad_categories': ['NONE']} + + # Creative Ids TODO + # "data": [{"id": "23850233534140058"}, + # {"id": "23850233532300058"}, + # {"id": "23850233210620058"}, + # {"id": "23850232986710058"}, + # {"id": "23849554079380058"}, + # {"id": "23849066774220058"}, + # {"id": "23849066252410058"}, + # {"id": "23849063425570058"}, + # {"id": "23847674484290058"}, + # {"id": "23847292410940058"}, + # {"id": "23843561378450058"}], + + # Users + # "data": [{"name": "Stitch IntegrationDev", + # "tasks": ["DRAFT", "ANALYZE", "ADVERTISE", "MANAGE"], + # "id": "113504146635004"}] + + # TODO refactor below this line from jira test client to facebook + # def url(self, path): + # if self.is_cloud: + # return self.base_url.format(self.cloud_id, path) + + # # defend against if the base_url does or does not provide https:// + # base_url = self.base_url + # base_url = re.sub('^http[s]?://', '', base_url) + # base_url = 'https://' + base_url + # return base_url.rstrip("/") + "/" + path.lstrip("/") + + # def _headers(self, headers): + # headers = headers.copy() + # if self.user_agent: + # headers["User-Agent"] = self.user_agent + + # if self.is_cloud: + # # Add OAuth Headers + # headers['Accept'] = 'application/json' + # headers['Authorization'] = 'Bearer {}'.format(self.access_token) + + # return headers + + # @backoff.on_exception(backoff.expo, + # (requests.exceptions.ConnectionError, HTTPError), + # jitter=None, + # max_tries=6, + # giveup=lambda e: not should_retry_httperror(e)) + # def send(self, method, path, headers={}, **kwargs): + # if self.is_cloud: + # # OAuth Path + # request = requests.Request(method, + # self.url(path), + # headers=self._headers(headers), + # **kwargs) + # else: + # # Basic Auth Path + # request = requests.Request(method, + # self.url(path), + # auth=self.auth, + # headers=self._headers(headers), + # **kwargs) + # return self.session.send(request.prepare()) + + # @backoff.on_exception(backoff.constant, + # RateLimitException, + # max_tries=10, + # interval=60) + # def request(self, tap_stream_id, *args, **kwargs): + # response = self.send(*args, **kwargs) + # if response.status_code == 429: + # raise RateLimitException() + + # try: + # response.raise_for_status() + # except requests.exceptions.HTTPError as http_error: + # LOGGER.error("Received HTTPError with status code %s, error message response text %s", + # http_error.response.status_code, + # http_error.response.text) + # raise + + # return response.json() + + # def refresh_credentials(self): + # body = {"grant_type": "refresh_token", + # "client_id": self.oauth_client_id, + # "client_secret": self.oauth_client_secret, + # "refresh_token": self.refresh_token} + # try: + # resp = self.session.post("https://auth.atlassian.com/oauth/token", data=body) + # resp.raise_for_status() + # self.access_token = resp.json()['access_token'] + # except Exception as ex: + # error_message = str(ex) + # if resp: + # error_message = error_message + ", Response from Jira: {}".format(resp.text) + # raise Exception(error_message) from ex + # finally: + # LOGGER.info("Starting new login timer") + # self.login_timer = threading.Timer(REFRESH_TOKEN_EXPIRATION_PERIOD, + # self.refresh_credentials) + # self.login_timer.start() + + # def test_credentials_are_authorized(self): + # # Assume that everyone has issues, so we try and hit that endpoint + # self.request("issues", "GET", "/rest/api/2/search", + # params={"maxResults": 1}) From 5d6ff1656f361b12c73a840d1b597003511b3f7d Mon Sep 17 00:00:00 2001 From: btowles Date: Tue, 24 Oct 2023 21:01:38 +0000 Subject: [PATCH 02/10] Add pagination test for campaigns stream, update new base file and client --- tests/base_new_frmwrk.py | 19 ++++++++-- tests/test_client.py | 16 ++++---- tests/test_facebook_pagination.py | 63 +++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+), 10 deletions(-) create mode 100644 tests/test_facebook_pagination.py diff --git a/tests/base_new_frmwrk.py b/tests/base_new_frmwrk.py index 798305aa..ee09b9a1 100644 --- a/tests/base_new_frmwrk.py +++ b/tests/base_new_frmwrk.py @@ -1,4 +1,3 @@ - import os from datetime import timedelta from tap_tester import connections, menagerie, runner, LOGGER @@ -61,7 +60,7 @@ def get_properties(self): 'end_date': '2021-04-09T00:00:00Z', 'insights_buffer_days': '1', } - + @staticmethod def get_credentials(): """Authentication information for the test account""" @@ -87,7 +86,8 @@ def expected_metadata(): "campaigns": { BaseCase.PRIMARY_KEYS: {"id", }, BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, - BaseCase.REPLICATION_KEYS: {"updated_time"} + BaseCase.REPLICATION_KEYS: {"updated_time"}, + BaseCase.API_LIMIT: 100 }, "ads_insights": { BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start"}, @@ -166,3 +166,16 @@ def setUpClass(cls,logging="Ensuring environment variables are sourced."): @staticmethod def is_insight(stream): return stream.startswith('ads_insights') + + def expected_page_size(self, stream=None): + """ + return a dictionary with key of table name + and value as an integer for the page size of the API requests for that stream. + """ + page_size = { + table: properties[BaseCase.API_LIMIT] + for table, properties in self.expected_metadata().items() + if properties.get(BaseCase.API_LIMIT)} # TODO only define API_LIMIT for core streams? + if not stream: + return page_size + return page_size[stream] diff --git a/tests/test_client.py b/tests/test_client.py index d090790c..c313c51c 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -37,13 +37,14 @@ def __init__(self): # 'ISSUES_ELECTIONS_POLITICS', # acct unauthorized 'ONLINE_GAMBLING_AND_GAMING'] - # list of campaign objective values from fb docs below give "invalid" error via api 18.0 + # list of campaign objective values from fb docs below give "Invalid" error via api 18.0 # 'APP_INSTALLS', 'BRAND_AWARENESS', 'CONVERSIONS', 'EVENT_RESPONSES', 'LEAD_GENERATION', # 'LINK_CLICKS', 'MESSAGES', 'OFFER_CLAIMS', 'PAGE_LIKES', 'POST_ENGAGEMENT', - # 'PRODUCT_CATALOG_SALES', 'REACH', 'STORE_VISITS', 'VIDEO_VIEWS', + # 'PRODUCT_CATALOG_SALES', 'REACH', 'STORE_VISITS', 'VIDEO_VIEWS' # LOCAL_AWARENESS gives deprecated error, use REACH (reach is invalid from above) - # valid and verified ojcectives listed below + + # valid and verified objectives listed below, objectives above should be re-mapped to these self.campaign_objectives = ['OUTCOME_APP_PROMOTION', 'OUTCOME_AWARENESS', 'OUTCOME_ENGAGEMENT', @@ -56,10 +57,11 @@ def get_account_objects(self, stream): f'Endpoint undefiend for specified stream: {stream}' endpoint = self.stream_endpoint_map[stream] url = self.account_url + endpoint - params = {'access_token': self.access_token} + params = {'access_token': self.access_token, + 'limit': 100} LOGGER.info(f"Getting url: {url}") response = requests.get(url, params) - LOGGER.info(f"Returning response: {response}") + LOGGER.info(f"Returning get response: {response}") return response.json() def create_account_objects(self, stream): @@ -70,8 +72,8 @@ def create_account_objects(self, stream): LOGGER.info(f"Posting to url: {url}") params = self.generate_post_params(stream) response = requests.post(url, params) - LOGGER.info(f"Returning response: {response}") - return response.json() + LOGGER.info(f"Returning post response: {response}") + return response def generate_post_params(self, stream): if stream == 'campaigns': diff --git a/tests/test_facebook_pagination.py b/tests/test_facebook_pagination.py new file mode 100644 index 00000000..1e5abadb --- /dev/null +++ b/tests/test_facebook_pagination.py @@ -0,0 +1,63 @@ +import test_client as tc +import time +import unittest + +from tap_tester.base_suite_tests.pagination_test import PaginationTest +from tap_tester import connections, runner, menagerie, LOGGER + +from base_new_frmwrk import FacebookBaseTest + +fb_client = tc.TestClient() + + +class FacebookDiscoveryTest(PaginationTest, FacebookBaseTest): + """Standard Pagination Test""" + + @staticmethod + def name(): + return "tt_facebook_pagination" + def streams_to_test(self): + # return self.expected_stream_names() + return {'campaigns'} # TODO WIP, expand to all core streams + + def setUp(self): # pylint: disable=invalid-name + """ + Setup for tests in this module. + """ + if PaginationTest.synced_records and PaginationTest.record_count_by_stream: + return + + # instantiate connection + conn_id = connections.ensure_connection(self) + + # run check mode + found_catalogs = self.run_and_verify_check_mode(conn_id) + + # table and field selection + test_catalogs = [catalog for catalog in found_catalogs + if catalog.get('stream_name') in self.streams_to_test()] + + # non_selected_fields are none + self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs) + + # ensure there is enough data to paginate + for stream in self.streams_to_test(): + response = fb_client.get_account_objects(stream) + self.assertGreater(len(response['data']), 0, + msg='Failed HTTP get response for stream: {}'.format(stream)) + number_of_records = len(response['data']) + if number_of_records > self.expected_page_size(stream): + continue + LOGGER.info(f"Stream: {stream} - Record count is less than max page size: " + f"{self.expected_page_size(stream)}. Posting more records to setUp " + "the PaginationTest") + for i in range(self.expected_page_size(stream) - number_of_records + 1): + post_response = fb_client.create_account_objects(stream) + self.assertEqual(post_response.status_code, 200, + msg='Failed HTTP post response for stream: {}'.format(stream)) + LOGGER.info(f"Posted {i + 1} new campaigns, new total: {number_of_records + i + 1}") + time.sleep(1) + + # run initial sync + PaginationTest.record_count_by_stream = self.run_and_verify_sync_mode(conn_id) + PaginationTest.synced_records = runner.get_records_from_target_output() From 05cfc4bfade3adf261556775aa48017b737ee621 Mon Sep 17 00:00:00 2001 From: btowles Date: Wed, 25 Oct 2023 19:19:01 +0000 Subject: [PATCH 03/10] First review comments, add adsets stream, start work on ads --- tests/base_new_frmwrk.py | 6 ++- tests/test_client.py | 81 ++++++++++++++++++++++++++++--- tests/test_facebook_pagination.py | 10 ++-- 3 files changed, 85 insertions(+), 12 deletions(-) diff --git a/tests/base_new_frmwrk.py b/tests/base_new_frmwrk.py index ee09b9a1..b169d151 100644 --- a/tests/base_new_frmwrk.py +++ b/tests/base_new_frmwrk.py @@ -72,7 +72,8 @@ def expected_metadata(): "ads": { BaseCase.PRIMARY_KEYS: {"id", "updated_time"}, BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, - BaseCase.REPLICATION_KEYS: {"updated_time"} + BaseCase.REPLICATION_KEYS: {"updated_time"}, + BaseCase.API_LIMIT: 100 }, "adcreative": { BaseCase.PRIMARY_KEYS: {"id"}, @@ -81,7 +82,8 @@ def expected_metadata(): "adsets": { BaseCase.PRIMARY_KEYS: {"id", "updated_time"}, BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, - BaseCase.REPLICATION_KEYS: {"updated_time"} + BaseCase.REPLICATION_KEYS: {"updated_time"}, + BaseCase.API_LIMIT: 100 }, "campaigns": { BaseCase.PRIMARY_KEYS: {"id", }, diff --git a/tests/test_client.py b/tests/test_client.py index c313c51c..7fe12a68 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -17,11 +17,11 @@ class TestClient(): # def __init__(self, config): # TODO move to dynamic config model? def __init__(self): # pass in config above and get() from it or hard code? - self.base_url = 'https://graph.facebook.com/' + self.base_url = 'https://graph.facebook.com' self.api_version = 'v18.0' self.account_id = os.getenv('TAP_FACEBOOK_ACCOUNT_ID') self.access_token = os.getenv('TAP_FACEBOOK_ACCESS_TOKEN') - self.account_url = self.base_url + self.api_version +'/act_{}'.format(self.account_id) + self.account_url = f"{self.base_url}/{self.api_version}/act_{self.account_id}" self.stream_endpoint_map = {'ads': '/ads', 'adsets': '/adsets', @@ -37,6 +37,45 @@ def __init__(self): # 'ISSUES_ELECTIONS_POLITICS', # acct unauthorized 'ONLINE_GAMBLING_AND_GAMING'] + self.adset_billing_events = ['APP_INSTALLS', + 'CLICKS', + 'IMPRESSIONS', + 'LINK_CLICKS', + 'NONE', + 'OFFER_CLAIMS', + 'PAGE_LIKES', + 'POST_ENGAGEMENT', + 'THRUPLAY', + 'PURCHASE', + 'LISTING_INTERACTION'] + + self.adset_optimization_goals = ['NONE', + 'APP_INSTALLS', + 'AD_RECALL_LIFT', + 'ENGAGED_USERS', + 'EVENT_RESPONSES', + 'IMPRESSIONS', + 'LEAD_GENERATION', + 'QUALITY_LEAD', + 'LINK_CLICKS', + 'OFFSITE_CONVERSIONS', + 'PAGE_LIKES', + 'POST_ENGAGEMENT', + 'QUALITY_CALL', + 'REACH', + 'LANDING_PAGE_VIEWS', + 'VISIT_INSTAGRAM_PROFILE', + 'VALUE', + 'THRUPLAY', + 'DERIVED_EVENTS', + 'APP_INSTALLS_AND_OFFSITE_CONVERSIONS', + 'CONVERSATIONS', + 'IN_APP_VALUE', + 'MESSAGING_PURCHASE_CONVERSION', + 'MESSAGING_APPOINTMENT_CONVERSION', + 'SUBSCRIBERS', + 'REMINDERS_SET'] + # list of campaign objective values from fb docs below give "Invalid" error via api 18.0 # 'APP_INSTALLS', 'BRAND_AWARENESS', 'CONVERSIONS', 'EVENT_RESPONSES', 'LEAD_GENERATION', # 'LINK_CLICKS', 'MESSAGES', 'OFFER_CLAIMS', 'PAGE_LIKES', 'POST_ENGAGEMENT', @@ -54,13 +93,14 @@ def __init__(self): def get_account_objects(self, stream): assert stream in self.stream_endpoint_map.keys(), \ - f'Endpoint undefiend for specified stream: {stream}' + f'Endpoint undefined for specified stream: {stream}' endpoint = self.stream_endpoint_map[stream] url = self.account_url + endpoint params = {'access_token': self.access_token, 'limit': 100} LOGGER.info(f"Getting url: {url}") response = requests.get(url, params) + response.raise_for_status() LOGGER.info(f"Returning get response: {response}") return response.json() @@ -72,17 +112,46 @@ def create_account_objects(self, stream): LOGGER.info(f"Posting to url: {url}") params = self.generate_post_params(stream) response = requests.post(url, params) + response.raise_for_status() LOGGER.info(f"Returning post response: {response}") return response def generate_post_params(self, stream): - if stream == 'campaigns': + if stream == 'ads': + params = { + 'access_token': self.access_token, + 'name': ''.join(random.choices(string.ascii_letters + string.digits, k=18)), + 'adset_id': 23847656838230058, # TODO pick rand adset_id? + 'creative': str({'creative_id': 23843561378450058}), # TODO pick rand creative_id? + 'status': "PAUSED"} + return params + + elif stream == 'adsets': + # TODO In order to randomize optimization_goal and billing_event the campaign_id + # would need to be examined to determine which goals were supported. Then an option + # could be selected from the available billing events supported by that goal. + params = { + 'access_token': self.access_token, + 'name': ''.join(random.choices(string.ascii_letters + string.digits, k=16)), + 'optimization_goal': 'REACH', + 'billing_event': 'IMPRESSIONS', + 'bid_amount': 2, # TODO random? + 'daily_budget': 1000, # TODO random? tie to parent campaign? + 'campaign_id': 120203241386960059, # TODO pull from campaigns dynamically? + 'targeting': str({'geo_locations': {'countries': ["US"]}, + 'facebook_positions': ["feed"]}), + 'status': "PAUSED", + 'promoted_object': str({'page_id': '453760455405317'})} + return params + + elif stream == 'campaigns': params = { # generate a campaign with random name, ojbective, and ad category 'access_token': self.access_token, 'name': ''.join(random.choices(string.ascii_letters + string.digits, k=15)), 'objective': random.choice(self.campaign_objectives), 'special_ad_categories': random.choice(self.campaign_special_ad_categories)} return params + else: assert False, f"Post params for stream {stream} not implemented / supported" @@ -122,12 +191,12 @@ def generate_post_params(self, stream): # Ad Insights TODO # Empty data list for all 3 AdSet Ids - # AdSet Ids TODO + # AdSet Ids # "data": [{"id": "23847656838230058"}, # {"id": "23847292383400058"}, # {"id": "23843561338600058"}], - # Campaign Ids TODO + # Campaign Ids # "data": [{"id": "23847656838160058"}, # {"id": "23847292383380058"}, # {"id": "23843561338580058"}, diff --git a/tests/test_facebook_pagination.py b/tests/test_facebook_pagination.py index 1e5abadb..30f32764 100644 --- a/tests/test_facebook_pagination.py +++ b/tests/test_facebook_pagination.py @@ -18,7 +18,7 @@ def name(): return "tt_facebook_pagination" def streams_to_test(self): # return self.expected_stream_names() - return {'campaigns'} # TODO WIP, expand to all core streams + return {'adsets', 'campaigns'} # TODO WIP, expand to all core streams def setUp(self): # pylint: disable=invalid-name """ @@ -42,20 +42,22 @@ def setUp(self): # pylint: disable=invalid-name # ensure there is enough data to paginate for stream in self.streams_to_test(): + limit = self.expected_page_size(stream) + response = fb_client.get_account_objects(stream) self.assertGreater(len(response['data']), 0, msg='Failed HTTP get response for stream: {}'.format(stream)) number_of_records = len(response['data']) - if number_of_records > self.expected_page_size(stream): + if number_of_records >= limit and response.get('paging', {}).get('next'): continue LOGGER.info(f"Stream: {stream} - Record count is less than max page size: " f"{self.expected_page_size(stream)}. Posting more records to setUp " "the PaginationTest") - for i in range(self.expected_page_size(stream) - number_of_records + 1): + for i in range(limit - number_of_records + 1): post_response = fb_client.create_account_objects(stream) self.assertEqual(post_response.status_code, 200, msg='Failed HTTP post response for stream: {}'.format(stream)) - LOGGER.info(f"Posted {i + 1} new campaigns, new total: {number_of_records + i + 1}") + LOGGER.info(f"Posted {i + 1} new {stream}, new total: {number_of_records + i + 1}") time.sleep(1) # run initial sync From 0356413236c36418bfcbf27ae1d8cec5e40f50c9 Mon Sep 17 00:00:00 2001 From: btowles Date: Wed, 25 Oct 2023 23:10:02 +0000 Subject: [PATCH 04/10] Support for adcreative, ads, adsets, and campaigns, get only for ads_insights --- tests/base_new_frmwrk.py | 4 +++- tests/test_client.py | 24 ++++++++++++++++++++---- tests/test_facebook_pagination.py | 7 +++++-- 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/tests/base_new_frmwrk.py b/tests/base_new_frmwrk.py index b169d151..48dbf3f3 100644 --- a/tests/base_new_frmwrk.py +++ b/tests/base_new_frmwrk.py @@ -78,6 +78,7 @@ def expected_metadata(): "adcreative": { BaseCase.PRIMARY_KEYS: {"id"}, BaseCase.REPLICATION_METHOD: BaseCase.FULL_TABLE, + BaseCase.API_LIMIT: 100 }, "adsets": { BaseCase.PRIMARY_KEYS: {"id", "updated_time"}, @@ -94,7 +95,8 @@ def expected_metadata(): "ads_insights": { BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start"}, BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, - BaseCase.REPLICATION_KEYS: {"date_start"} + BaseCase.REPLICATION_KEYS: {"date_start"}, + BaseCase.API_LIMIT: 100 }, "ads_insights_age_and_gender": { BaseCase.PRIMARY_KEYS: { diff --git a/tests/test_client.py b/tests/test_client.py index 7fe12a68..005a29b3 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -117,11 +117,22 @@ def create_account_objects(self, stream): return response def generate_post_params(self, stream): - if stream == 'ads': + if stream == 'adcreative': params = { 'access_token': self.access_token, 'name': ''.join(random.choices(string.ascii_letters + string.digits, k=18)), - 'adset_id': 23847656838230058, # TODO pick rand adset_id? + 'object_story_spec': str({'page_id': '453760455405317', + 'link_data': {'link': 'http://fb.me'}})} + return params + + elif stream == 'ads': + params = { + 'access_token': self.access_token, + 'name': ''.join(random.choices(string.ascii_letters + string.digits, k=17)), + # adset is bound to parent campaign_objective, can cause errors posting new ads + # as certain objectives have different requirements. 50 ads per adset max + # adset below can be found under campaign: 120203395323750059 + 'adset_id': 120203403135680059, 'creative': str({'creative_id': 23843561378450058}), # TODO pick rand creative_id? 'status': "PAUSED"} return params @@ -144,6 +155,11 @@ def generate_post_params(self, stream): 'promoted_object': str({'page_id': '453760455405317'})} return params + # elif stream == 'ad_insights': + # params = { + # } + # return params + elif stream == 'campaigns': params = { # generate a campaign with random name, ojbective, and ad category 'access_token': self.access_token, @@ -170,7 +186,7 @@ def generate_post_params(self, stream): # -d "access_token=" \ # "https://graph.facebook.com//act_/ads" - # Ad IDs TODO + # Ad IDs # "data": [{"id": "23843561338620058"}, # {"id": "23847656838300058"}, # {"id": "23847292383430058"}], @@ -206,7 +222,7 @@ def generate_post_params(self, stream): # 'objective': 'OUTCOME_TRAFFIC', # 'special_ad_categories': ['NONE']} - # Creative Ids TODO + # Creative Ids # "data": [{"id": "23850233534140058"}, # {"id": "23850233532300058"}, # {"id": "23850233210620058"}, diff --git a/tests/test_facebook_pagination.py b/tests/test_facebook_pagination.py index 30f32764..97d1d106 100644 --- a/tests/test_facebook_pagination.py +++ b/tests/test_facebook_pagination.py @@ -17,8 +17,8 @@ class FacebookDiscoveryTest(PaginationTest, FacebookBaseTest): def name(): return "tt_facebook_pagination" def streams_to_test(self): - # return self.expected_stream_names() - return {'adsets', 'campaigns'} # TODO WIP, expand to all core streams + # TODO expand beyond core streams? ads_insights empty for account, no post via API + return {'adcreative', 'ads', 'adsets', 'campaigns'} def setUp(self): # pylint: disable=invalid-name """ @@ -43,6 +43,9 @@ def setUp(self): # pylint: disable=invalid-name # ensure there is enough data to paginate for stream in self.streams_to_test(): limit = self.expected_page_size(stream) + if stream == 'ads_insights': + import ipdb; ipdb.set_trace() + 1+1 response = fb_client.get_account_objects(stream) self.assertGreater(len(response['data']), 0, From e497066801979cb4e0a83ab3ff9d2309c91ed671 Mon Sep 17 00:00:00 2001 From: btowles Date: Fri, 27 Oct 2023 17:22:45 +0000 Subject: [PATCH 05/10] Clean up for next review round --- tests/test_client.py | 83 ++++--------------------------- tests/test_facebook_pagination.py | 16 +++--- 2 files changed, 16 insertions(+), 83 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index 005a29b3..fc726406 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -26,7 +26,7 @@ def __init__(self): self.stream_endpoint_map = {'ads': '/ads', 'adsets': '/adsets', 'adcreative': '/adcreatives', - 'ads_insights': '/insights', + 'ads_insights': '/insights', # GET only endpoint 'campaigns': '/campaigns', 'users': '/users',} @@ -155,11 +155,6 @@ def generate_post_params(self, stream): 'promoted_object': str({'page_id': '453760455405317'})} return params - # elif stream == 'ad_insights': - # params = { - # } - # return params - elif stream == 'campaigns': params = { # generate a campaign with random name, ojbective, and ad category 'access_token': self.access_token, @@ -172,75 +167,15 @@ def generate_post_params(self, stream): assert False, f"Post params for stream {stream} not implemented / supported" - # Create multiplue ads at a time async $ get notif when complete - # Make an HTTP POST to: - # https://graph.facebook.com/{API_VERSION}/act_{AD_ACCOUNT_ID}/asyncadrequestsets - - # HTTP to get ads for an account - # GET /v18.0/act_{ad-account-id}/ads HTTP/1.1 - # Host: graph.facebook.com - - # cURL to read all ads from one ad account example - # curl -G \ - # -d "fields=name" \ - # -d "access_token=" \ - # "https://graph.facebook.com//act_/ads" - - # Ad IDs - # "data": [{"id": "23843561338620058"}, - # {"id": "23847656838300058"}, - # {"id": "23847292383430058"}], - # creative = {'asset_feed_spec': {'audios': [{'type': 'random'}]}, - # 'contextual_multi_ads': {'eligibility': ['POST_AD_ENGAGEMENT_FEED', - # 'POST_AD_ENGAGEMENT_SEED_AD', - # 'STANDALONE_FEED'], - # 'enroll_status': 'OPT_IN'}, - # 'degrees_of_freedom_spec': {'degrees_of_freedom_type': 'USER_ENROLLED_NON_DCO', - # 'text_transformation_types': ['TEXT_LIQUIDITY']}, - # 'object_story_spec': {'instagram_actor_id': '2476947555701417', - # 'link_data': {'call_to_action': {'type': 'SIGN_UP'}, - # 'link': 'http://fb.me', - # 'picture': 'https://foo.x.y.net/v/dir/1.png'}, - # 'page_id': '453760455405317'}, - # 'object_type': 'SHARE'} - # Ad Insights TODO - # Empty data list for all 3 AdSet Ids - - # AdSet Ids - # "data": [{"id": "23847656838230058"}, - # {"id": "23847292383400058"}, - # {"id": "23843561338600058"}], - - # Campaign Ids - # "data": [{"id": "23847656838160058"}, - # {"id": "23847292383380058"}, - # {"id": "23843561338580058"}, - # {"id": "120203241386960059"}] # API added campaign, returned on API get, verified - # cam_post_params = {'access_token': token, - # 'name': 'BHT test campaign', - # 'objective': 'OUTCOME_TRAFFIC', - # 'special_ad_categories': ['NONE']} - - # Creative Ids - # "data": [{"id": "23850233534140058"}, - # {"id": "23850233532300058"}, - # {"id": "23850233210620058"}, - # {"id": "23850232986710058"}, - # {"id": "23849554079380058"}, - # {"id": "23849066774220058"}, - # {"id": "23849066252410058"}, - # {"id": "23849063425570058"}, - # {"id": "23847674484290058"}, - # {"id": "23847292410940058"}, - # {"id": "23843561378450058"}], - - # Users - # "data": [{"name": "Stitch IntegrationDev", - # "tasks": ["DRAFT", "ANALYZE", "ADVERTISE", "MANAGE"], - # "id": "113504146635004"}] - - # TODO refactor below this line from jira test client to facebook + # endpoint is "GET" only. We cannot post fake insights data for test. As of Oct 27, 2023 + # data lists for original 3 AdSet Ids, and Ads account as a whole are empty. + # 1 - Can we run enough ads to get enough data to paginate? + # 2 - Can we interact with our own ads? + # if 1 or 2 == True then use setUp to conditionally test ads_insights if there is enough data + + + # TODO refactor or remove below this line from jira test client to facebook # def url(self, path): # if self.is_cloud: # return self.base_url.format(self.cloud_id, path) diff --git a/tests/test_facebook_pagination.py b/tests/test_facebook_pagination.py index 97d1d106..2a3bf3df 100644 --- a/tests/test_facebook_pagination.py +++ b/tests/test_facebook_pagination.py @@ -17,7 +17,7 @@ class FacebookDiscoveryTest(PaginationTest, FacebookBaseTest): def name(): return "tt_facebook_pagination" def streams_to_test(self): - # TODO expand beyond core streams? ads_insights empty for account, no post via API + # TODO ads_insights empty for account, no post via API, spike on generating data return {'adcreative', 'ads', 'adsets', 'campaigns'} def setUp(self): # pylint: disable=invalid-name @@ -43,23 +43,21 @@ def setUp(self): # pylint: disable=invalid-name # ensure there is enough data to paginate for stream in self.streams_to_test(): limit = self.expected_page_size(stream) - if stream == 'ads_insights': - import ipdb; ipdb.set_trace() - 1+1 - response = fb_client.get_account_objects(stream) self.assertGreater(len(response['data']), 0, msg='Failed HTTP get response for stream: {}'.format(stream)) + number_of_records = len(response['data']) if number_of_records >= limit and response.get('paging', {}).get('next'): - continue - LOGGER.info(f"Stream: {stream} - Record count is less than max page size: " - f"{self.expected_page_size(stream)}. Posting more records to setUp " - "the PaginationTest") + continue # stream is ready for test, no need for futher action + + LOGGER.info(f"Stream: {stream} - Record count is less than max page size: {limit}, " + "posting more records to setUp the PaginationTest") for i in range(limit - number_of_records + 1): post_response = fb_client.create_account_objects(stream) self.assertEqual(post_response.status_code, 200, msg='Failed HTTP post response for stream: {}'.format(stream)) + LOGGER.info(f"Posted {i + 1} new {stream}, new total: {number_of_records + i + 1}") time.sleep(1) From 826b50577e71f1e4ce91e7f09f772bce4236ddbd Mon Sep 17 00:00:00 2001 From: btowles Date: Tue, 31 Oct 2023 19:49:33 +0000 Subject: [PATCH 06/10] PR review, clean up for further review --- tests/test_client.py | 111 +----------------------------- tests/test_facebook_pagination.py | 6 +- 2 files changed, 4 insertions(+), 113 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index fc726406..23850f6b 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,22 +1,13 @@ -# import backoff import os import random import requests import string -# from requests.exceptions import HTTPError -# from requests.auth import HTTPBasicAuth - -# TODO try below syncronous interaction or go all http? -# from facebookads.objects import Ad, AdAccount, AdSet, Campaign - from tap_tester.logger import LOGGER class TestClient(): - # def __init__(self, config): # TODO move to dynamic config model? def __init__(self): - # pass in config above and get() from it or hard code? self.base_url = 'https://graph.facebook.com' self.api_version = 'v18.0' self.account_id = os.getenv('TAP_FACEBOOK_ACCOUNT_ID') @@ -76,14 +67,7 @@ def __init__(self): 'SUBSCRIBERS', 'REMINDERS_SET'] - # list of campaign objective values from fb docs below give "Invalid" error via api 18.0 - # 'APP_INSTALLS', 'BRAND_AWARENESS', 'CONVERSIONS', 'EVENT_RESPONSES', 'LEAD_GENERATION', - # 'LINK_CLICKS', 'MESSAGES', 'OFFER_CLAIMS', 'PAGE_LIKES', 'POST_ENGAGEMENT', - # 'PRODUCT_CATALOG_SALES', 'REACH', 'STORE_VISITS', 'VIDEO_VIEWS' - - # LOCAL_AWARENESS gives deprecated error, use REACH (reach is invalid from above) - - # valid and verified objectives listed below, objectives above should be re-mapped to these + # valid and verified objectives listed below, other objectives should be re-mapped to these self.campaign_objectives = ['OUTCOME_APP_PROMOTION', 'OUTCOME_AWARENESS', 'OUTCOME_ENGAGEMENT', @@ -133,7 +117,7 @@ def generate_post_params(self, stream): # as certain objectives have different requirements. 50 ads per adset max # adset below can be found under campaign: 120203395323750059 'adset_id': 120203403135680059, - 'creative': str({'creative_id': 23843561378450058}), # TODO pick rand creative_id? + 'creative': str({'creative_id': 23843561378450058}), 'status': "PAUSED"} return params @@ -148,7 +132,7 @@ def generate_post_params(self, stream): 'billing_event': 'IMPRESSIONS', 'bid_amount': 2, # TODO random? 'daily_budget': 1000, # TODO random? tie to parent campaign? - 'campaign_id': 120203241386960059, # TODO pull from campaigns dynamically? + 'campaign_id': 120203241386960059, 'targeting': str({'geo_locations': {'countries': ["US"]}, 'facebook_positions': ["feed"]}), 'status': "PAUSED", @@ -173,92 +157,3 @@ def generate_post_params(self, stream): # 1 - Can we run enough ads to get enough data to paginate? # 2 - Can we interact with our own ads? # if 1 or 2 == True then use setUp to conditionally test ads_insights if there is enough data - - - # TODO refactor or remove below this line from jira test client to facebook - # def url(self, path): - # if self.is_cloud: - # return self.base_url.format(self.cloud_id, path) - - # # defend against if the base_url does or does not provide https:// - # base_url = self.base_url - # base_url = re.sub('^http[s]?://', '', base_url) - # base_url = 'https://' + base_url - # return base_url.rstrip("/") + "/" + path.lstrip("/") - - # def _headers(self, headers): - # headers = headers.copy() - # if self.user_agent: - # headers["User-Agent"] = self.user_agent - - # if self.is_cloud: - # # Add OAuth Headers - # headers['Accept'] = 'application/json' - # headers['Authorization'] = 'Bearer {}'.format(self.access_token) - - # return headers - - # @backoff.on_exception(backoff.expo, - # (requests.exceptions.ConnectionError, HTTPError), - # jitter=None, - # max_tries=6, - # giveup=lambda e: not should_retry_httperror(e)) - # def send(self, method, path, headers={}, **kwargs): - # if self.is_cloud: - # # OAuth Path - # request = requests.Request(method, - # self.url(path), - # headers=self._headers(headers), - # **kwargs) - # else: - # # Basic Auth Path - # request = requests.Request(method, - # self.url(path), - # auth=self.auth, - # headers=self._headers(headers), - # **kwargs) - # return self.session.send(request.prepare()) - - # @backoff.on_exception(backoff.constant, - # RateLimitException, - # max_tries=10, - # interval=60) - # def request(self, tap_stream_id, *args, **kwargs): - # response = self.send(*args, **kwargs) - # if response.status_code == 429: - # raise RateLimitException() - - # try: - # response.raise_for_status() - # except requests.exceptions.HTTPError as http_error: - # LOGGER.error("Received HTTPError with status code %s, error message response text %s", - # http_error.response.status_code, - # http_error.response.text) - # raise - - # return response.json() - - # def refresh_credentials(self): - # body = {"grant_type": "refresh_token", - # "client_id": self.oauth_client_id, - # "client_secret": self.oauth_client_secret, - # "refresh_token": self.refresh_token} - # try: - # resp = self.session.post("https://auth.atlassian.com/oauth/token", data=body) - # resp.raise_for_status() - # self.access_token = resp.json()['access_token'] - # except Exception as ex: - # error_message = str(ex) - # if resp: - # error_message = error_message + ", Response from Jira: {}".format(resp.text) - # raise Exception(error_message) from ex - # finally: - # LOGGER.info("Starting new login timer") - # self.login_timer = threading.Timer(REFRESH_TOKEN_EXPIRATION_PERIOD, - # self.refresh_credentials) - # self.login_timer.start() - - # def test_credentials_are_authorized(self): - # # Assume that everyone has issues, so we try and hit that endpoint - # self.request("issues", "GET", "/rest/api/2/search", - # params={"maxResults": 1}) diff --git a/tests/test_facebook_pagination.py b/tests/test_facebook_pagination.py index 2a3bf3df..6d023dab 100644 --- a/tests/test_facebook_pagination.py +++ b/tests/test_facebook_pagination.py @@ -44,8 +44,6 @@ def setUp(self): # pylint: disable=invalid-name for stream in self.streams_to_test(): limit = self.expected_page_size(stream) response = fb_client.get_account_objects(stream) - self.assertGreater(len(response['data']), 0, - msg='Failed HTTP get response for stream: {}'.format(stream)) number_of_records = len(response['data']) if number_of_records >= limit and response.get('paging', {}).get('next'): @@ -53,11 +51,9 @@ def setUp(self): # pylint: disable=invalid-name LOGGER.info(f"Stream: {stream} - Record count is less than max page size: {limit}, " "posting more records to setUp the PaginationTest") + for i in range(limit - number_of_records + 1): post_response = fb_client.create_account_objects(stream) - self.assertEqual(post_response.status_code, 200, - msg='Failed HTTP post response for stream: {}'.format(stream)) - LOGGER.info(f"Posted {i + 1} new {stream}, new total: {number_of_records + i + 1}") time.sleep(1) From 01ad519848c9760686772b692d9386125c41d4e2 Mon Sep 17 00:00:00 2001 From: btowles Date: Thu, 9 Nov 2023 21:11:55 +0000 Subject: [PATCH 07/10] PR review comments round 2, pass limit and date range to test client get request --- tests/base_new_frmwrk.py | 22 ++++++++++++++++++++++ tests/test_client.py | 6 ++++-- tests/test_facebook_pagination.py | 8 +++++++- 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/tests/base_new_frmwrk.py b/tests/base_new_frmwrk.py index 48dbf3f3..9160c456 100644 --- a/tests/base_new_frmwrk.py +++ b/tests/base_new_frmwrk.py @@ -1,4 +1,5 @@ import os +from datetime import datetime as dt from datetime import timedelta from tap_tester import connections, menagerie, runner, LOGGER from tap_tester.base_suite_tests.base_case import BaseCase @@ -162,6 +163,27 @@ def setUpClass(cls,logging="Ensuring environment variables are sourced."): if len(missing_envs) != 0: raise Exception("set environment variables") + @staticmethod + def parse_date(date_value): + """ + Pass in string-formatted-datetime, parse the value, and return it as an unformatted datetime object. + """ + date_formats = { + "%Y-%m-%dT%H:%M:%S.%fZ", + "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S.%f+00:00", + "%Y-%m-%dT%H:%M:%S+00:00", + "%Y-%m-%d" + } + for date_format in date_formats: + try: + date_stripped = dt.strptime(date_value, date_format) + return date_stripped + except ValueError: + continue + + raise NotImplementedError("Tests do not account for dates of this format: {}".format(date_value)) + ########################################################################## ### Tap Specific Methods diff --git a/tests/test_client.py b/tests/test_client.py index 23850f6b..db484874 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -75,13 +75,15 @@ def __init__(self): 'OUTCOME_SALES', 'OUTCOME_TRAFFIC'] - def get_account_objects(self, stream): + def get_account_objects(self, stream, limit, time_range): + # time_range defines query start and end dates and should match tap config assert stream in self.stream_endpoint_map.keys(), \ f'Endpoint undefined for specified stream: {stream}' endpoint = self.stream_endpoint_map[stream] url = self.account_url + endpoint params = {'access_token': self.access_token, - 'limit': 100} + 'limit': limit, + 'time_range': str({'since': time_range['since'], 'until': time_range['until']})} LOGGER.info(f"Getting url: {url}") response = requests.get(url, params) response.raise_for_status() diff --git a/tests/test_facebook_pagination.py b/tests/test_facebook_pagination.py index 6d023dab..21b09e59 100644 --- a/tests/test_facebook_pagination.py +++ b/tests/test_facebook_pagination.py @@ -2,6 +2,8 @@ import time import unittest +from datetime import datetime as dt + from tap_tester.base_suite_tests.pagination_test import PaginationTest from tap_tester import connections, runner, menagerie, LOGGER @@ -41,9 +43,13 @@ def setUp(self): # pylint: disable=invalid-name self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs) # ensure there is enough data to paginate + start_date_dt = self.parse_date(self.get_properties()['start_date']) + date_range = {'since': dt.strftime(start_date_dt, "%Y-%m-%d"), + 'until': dt.strftime(dt.now(), "%Y-%m-%d")} + for stream in self.streams_to_test(): limit = self.expected_page_size(stream) - response = fb_client.get_account_objects(stream) + response = fb_client.get_account_objects(stream, limit, date_range) number_of_records = len(response['data']) if number_of_records >= limit and response.get('paging', {}).get('next'): From 6753872182f239fac2cfa3982574c254a725bd39 Mon Sep 17 00:00:00 2001 From: btowles Date: Mon, 13 Nov 2023 16:38:22 +0000 Subject: [PATCH 08/10] Convert times to utc to compare all datetime objects as tz naive --- tests/base_new_frmwrk.py | 51 +++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 14 deletions(-) diff --git a/tests/base_new_frmwrk.py b/tests/base_new_frmwrk.py index 9160c456..e6816587 100644 --- a/tests/base_new_frmwrk.py +++ b/tests/base_new_frmwrk.py @@ -1,6 +1,6 @@ import os from datetime import datetime as dt -from datetime import timedelta +from datetime import timezone as tz from tap_tester import connections, menagerie, runner, LOGGER from tap_tester.base_suite_tests.base_case import BaseCase @@ -107,15 +107,15 @@ def expected_metadata(): BaseCase.REPLICATION_KEYS: {"date_start"} }, "ads_insights_country": { - BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start", "country"}, + BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start", + "country"}, BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, BaseCase.REPLICATION_KEYS: {"date_start"} }, "ads_insights_platform_and_device": { - BaseCase.PRIMARY_KEYS: { - "campaign_id", "adset_id", "ad_id", "date_start", - "publisher_platform", "platform_position", "impression_device" - }, + BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start", + "publisher_platform", "platform_position", + "impression_device"}, BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, BaseCase.REPLICATION_KEYS: {"date_start"} }, @@ -130,7 +130,8 @@ def expected_metadata(): BaseCase.REPLICATION_KEYS: {"date_start"} }, "ads_insights_hourly_advertiser": { - BaseCase.PRIMARY_KEYS: {"hourly_stats_aggregated_by_advertiser_time_zone", "campaign_id", "adset_id", "ad_id", "date_start"}, + BaseCase.PRIMARY_KEYS: {"hourly_stats_aggregated_by_advertiser_time_zone", + "campaign_id", "adset_id", "ad_id", "date_start"}, BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL, BaseCase.REPLICATION_KEYS: {"date_start"} }, @@ -152,8 +153,12 @@ def set_replication_methods(self, conn_id, catalogs, replication_methods): replication_md = [{ "breadcrumb": [], "metadata":{ "selected" : True}}] else: replication_md = [{ "breadcrumb": [], "metadata": { "selected": None}}] - connections.set_non_discoverable_metadata( - conn_id, catalog, menagerie.get_annotated_schema(conn_id, catalog['stream_id']), replication_md) + connections.set_non_discoverable_metadata(conn_id, + catalog, + menagerie.get_annotated_schema( + conn_id, + catalog['stream_id']), + replication_md) @classmethod def setUpClass(cls,logging="Ensuring environment variables are sourced."): @@ -166,23 +171,41 @@ def setUpClass(cls,logging="Ensuring environment variables are sourced."): @staticmethod def parse_date(date_value): """ - Pass in string-formatted-datetime, parse the value, and return it as an unformatted datetime object. + Pass in string-formatted-datetime, parse the value, and return it as an unformatted + datetime object that is time zone naive. """ date_formats = { "%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%SZ", + "%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%dT%H:%M:%S.%f+00:00", "%Y-%m-%dT%H:%M:%S+00:00", "%Y-%m-%d" } for date_format in date_formats: try: - date_stripped = dt.strptime(date_value, date_format) - return date_stripped + date = dt.strptime(date_value, date_format) + if date_format in ["%Y-%m-%dT%H:%M:%S%z", + "%Y-%m-%dT%H:%M:%S.%f+00:00", + "%Y-%m-%dT%H:%M:%S+00:00"]: + # convert a datetime with timezone information to utc + utc = dt(date.year, date.month, date.day, date.hour, date.minute, date.second, + tzinfo=tz.utc) + + if date.tzinfo and hasattr(date.tzinfo, "_offset"): + utc += date.tzinfo._offset + + utc = utc.replace(tzinfo=None) + + return utc + + return date + except ValueError: continue - raise NotImplementedError("Tests do not account for dates of this format: {}".format(date_value)) + raise NotImplementedError("Tests do not account for dates of this format: {}".format( + date_value)) ########################################################################## @@ -201,7 +224,7 @@ def expected_page_size(self, stream=None): page_size = { table: properties[BaseCase.API_LIMIT] for table, properties in self.expected_metadata().items() - if properties.get(BaseCase.API_LIMIT)} # TODO only define API_LIMIT for core streams? + if properties.get(BaseCase.API_LIMIT)} if not stream: return page_size return page_size[stream] From 668e165f1dbd3f1a188f27257845117817bd1e7a Mon Sep 17 00:00:00 2001 From: btowles Date: Mon, 13 Nov 2023 23:03:15 +0000 Subject: [PATCH 09/10] Use base_suite parse_date --- tests/base_new_frmwrk.py | 39 --------------------------------------- 1 file changed, 39 deletions(-) diff --git a/tests/base_new_frmwrk.py b/tests/base_new_frmwrk.py index e6816587..13331baa 100644 --- a/tests/base_new_frmwrk.py +++ b/tests/base_new_frmwrk.py @@ -168,45 +168,6 @@ def setUpClass(cls,logging="Ensuring environment variables are sourced."): if len(missing_envs) != 0: raise Exception("set environment variables") - @staticmethod - def parse_date(date_value): - """ - Pass in string-formatted-datetime, parse the value, and return it as an unformatted - datetime object that is time zone naive. - """ - date_formats = { - "%Y-%m-%dT%H:%M:%S.%fZ", - "%Y-%m-%dT%H:%M:%SZ", - "%Y-%m-%dT%H:%M:%S%z", - "%Y-%m-%dT%H:%M:%S.%f+00:00", - "%Y-%m-%dT%H:%M:%S+00:00", - "%Y-%m-%d" - } - for date_format in date_formats: - try: - date = dt.strptime(date_value, date_format) - if date_format in ["%Y-%m-%dT%H:%M:%S%z", - "%Y-%m-%dT%H:%M:%S.%f+00:00", - "%Y-%m-%dT%H:%M:%S+00:00"]: - # convert a datetime with timezone information to utc - utc = dt(date.year, date.month, date.day, date.hour, date.minute, date.second, - tzinfo=tz.utc) - - if date.tzinfo and hasattr(date.tzinfo, "_offset"): - utc += date.tzinfo._offset - - utc = utc.replace(tzinfo=None) - - return utc - - return date - - except ValueError: - continue - - raise NotImplementedError("Tests do not account for dates of this format: {}".format( - date_value)) - ########################################################################## ### Tap Specific Methods From 5e8ef415f362652c570e9ee18cd98f2c2c7c7347 Mon Sep 17 00:00:00 2001 From: btowles Date: Tue, 14 Nov 2023 19:30:08 +0000 Subject: [PATCH 10/10] Final review comments, add TODO to update get_account_objects(), update BaseCase page_size(), update self.start_date pattern --- tests/base_new_frmwrk.py | 17 ++--------------- tests/test_facebook_pagination.py | 4 ++-- 2 files changed, 4 insertions(+), 17 deletions(-) diff --git a/tests/base_new_frmwrk.py b/tests/base_new_frmwrk.py index 13331baa..cb3a3259 100644 --- a/tests/base_new_frmwrk.py +++ b/tests/base_new_frmwrk.py @@ -40,7 +40,7 @@ class FacebookBaseTest(BaseCase): FULL_TABLE = "FULL_TABLE" BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT00:00:00+00:00" - start_date = "" + start_date = "2021-04-07T00:00:00Z" end_date = "" @staticmethod @@ -57,7 +57,7 @@ def get_properties(self): """Configuration properties required for the tap.""" return { 'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'), - 'start_date' : '2021-04-07T00:00:00Z', + 'start_date' : self.start_date, 'end_date': '2021-04-09T00:00:00Z', 'insights_buffer_days': '1', } @@ -176,16 +176,3 @@ def setUpClass(cls,logging="Ensuring environment variables are sourced."): @staticmethod def is_insight(stream): return stream.startswith('ads_insights') - - def expected_page_size(self, stream=None): - """ - return a dictionary with key of table name - and value as an integer for the page size of the API requests for that stream. - """ - page_size = { - table: properties[BaseCase.API_LIMIT] - for table, properties in self.expected_metadata().items() - if properties.get(BaseCase.API_LIMIT)} - if not stream: - return page_size - return page_size[stream] diff --git a/tests/test_facebook_pagination.py b/tests/test_facebook_pagination.py index 21b09e59..364fca96 100644 --- a/tests/test_facebook_pagination.py +++ b/tests/test_facebook_pagination.py @@ -43,7 +43,7 @@ def setUp(self): # pylint: disable=invalid-name self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs) # ensure there is enough data to paginate - start_date_dt = self.parse_date(self.get_properties()['start_date']) + start_date_dt = self.parse_date(self.start_date) date_range = {'since': dt.strftime(start_date_dt, "%Y-%m-%d"), 'until': dt.strftime(dt.now(), "%Y-%m-%d")} @@ -52,6 +52,7 @@ def setUp(self): # pylint: disable=invalid-name response = fb_client.get_account_objects(stream, limit, date_range) number_of_records = len(response['data']) + # TODO move "if" logic below to client method get_account_objects() if number_of_records >= limit and response.get('paging', {}).get('next'): continue # stream is ready for test, no need for futher action @@ -61,7 +62,6 @@ def setUp(self): # pylint: disable=invalid-name for i in range(limit - number_of_records + 1): post_response = fb_client.create_account_objects(stream) LOGGER.info(f"Posted {i + 1} new {stream}, new total: {number_of_records + i + 1}") - time.sleep(1) # run initial sync PaginationTest.record_count_by_stream = self.run_and_verify_sync_mode(conn_id)