Skip to content

Commit

Permalink
Facebook test client WIP, campaigns stream only (#231)
Browse files Browse the repository at this point in the history
* Facebook test client WIP, campaigns stream only

* Add pagination test for campaigns stream, update new base file and client

* First review comments, add adsets stream, start work on ads

* Support for adcreative, ads, adsets, and campaigns, get only for ads_insights

* Clean up for next review round

* PR review, clean up for further review

* PR review comments round 2, pass limit and date range to test client get request

* Convert times to utc to compare all datetime objects as tz naive

* Use base_suite parse_date

* Final review comments, add TODO to update get_account_objects(), update BaseCase page_size(), update self.start_date pattern
  • Loading branch information
bhtowles authored Nov 15, 2023
1 parent d9f361e commit add18f1
Show file tree
Hide file tree
Showing 3 changed files with 255 additions and 15 deletions.
41 changes: 26 additions & 15 deletions tests/base_new_frmwrk.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
from datetime import timedelta
from datetime import datetime as dt
from datetime import timezone as tz
from tap_tester import connections, menagerie, runner, LOGGER
from tap_tester.base_suite_tests.base_case import BaseCase

Expand Down Expand Up @@ -39,7 +40,7 @@ class FacebookBaseTest(BaseCase):
FULL_TABLE = "FULL_TABLE"
BOOKMARK_COMPARISON_FORMAT = "%Y-%m-%dT00:00:00+00:00"

start_date = ""
start_date = "2021-04-07T00:00:00Z"
end_date = ""

@staticmethod
Expand All @@ -56,7 +57,7 @@ def get_properties(self):
"""Configuration properties required for the tap."""
return {
'account_id': os.getenv('TAP_FACEBOOK_ACCOUNT_ID'),
'start_date' : '2021-04-07T00:00:00Z',
'start_date' : self.start_date,
'end_date': '2021-04-09T00:00:00Z',
'insights_buffer_days': '1',
}
Expand All @@ -72,26 +73,31 @@ def expected_metadata():
"ads": {
BaseCase.PRIMARY_KEYS: {"id", "updated_time"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updated_time"}
BaseCase.REPLICATION_KEYS: {"updated_time"},
BaseCase.API_LIMIT: 100
},
"adcreative": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.FULL_TABLE,
BaseCase.API_LIMIT: 100
},
"adsets": {
BaseCase.PRIMARY_KEYS: {"id", "updated_time"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updated_time"}
BaseCase.REPLICATION_KEYS: {"updated_time"},
BaseCase.API_LIMIT: 100
},
"campaigns": {
BaseCase.PRIMARY_KEYS: {"id", },
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"updated_time"}
BaseCase.REPLICATION_KEYS: {"updated_time"},
BaseCase.API_LIMIT: 100
},
"ads_insights": {
BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
BaseCase.REPLICATION_KEYS: {"date_start"},
BaseCase.API_LIMIT: 100
},
"ads_insights_age_and_gender": {
BaseCase.PRIMARY_KEYS: {
Expand All @@ -101,15 +107,15 @@ def expected_metadata():
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_country": {
BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start", "country"},
BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start",
"country"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_platform_and_device": {
BaseCase.PRIMARY_KEYS: {
"campaign_id", "adset_id", "ad_id", "date_start",
"publisher_platform", "platform_position", "impression_device"
},
BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start",
"publisher_platform", "platform_position",
"impression_device"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
Expand All @@ -124,7 +130,8 @@ def expected_metadata():
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_hourly_advertiser": {
BaseCase.PRIMARY_KEYS: {"hourly_stats_aggregated_by_advertiser_time_zone", "campaign_id", "adset_id", "ad_id", "date_start"},
BaseCase.PRIMARY_KEYS: {"hourly_stats_aggregated_by_advertiser_time_zone",
"campaign_id", "adset_id", "ad_id", "date_start"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
Expand All @@ -146,8 +153,12 @@ def set_replication_methods(self, conn_id, catalogs, replication_methods):
replication_md = [{ "breadcrumb": [], "metadata":{ "selected" : True}}]
else:
replication_md = [{ "breadcrumb": [], "metadata": { "selected": None}}]
connections.set_non_discoverable_metadata(
conn_id, catalog, menagerie.get_annotated_schema(conn_id, catalog['stream_id']), replication_md)
connections.set_non_discoverable_metadata(conn_id,
catalog,
menagerie.get_annotated_schema(
conn_id,
catalog['stream_id']),
replication_md)

@classmethod
def setUpClass(cls,logging="Ensuring environment variables are sourced."):
Expand Down
161 changes: 161 additions & 0 deletions tests/test_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
import os
import random
import requests
import string

from tap_tester.logger import LOGGER


class TestClient():
def __init__(self):
self.base_url = 'https://graph.facebook.com'
self.api_version = 'v18.0'
self.account_id = os.getenv('TAP_FACEBOOK_ACCOUNT_ID')
self.access_token = os.getenv('TAP_FACEBOOK_ACCESS_TOKEN')
self.account_url = f"{self.base_url}/{self.api_version}/act_{self.account_id}"

self.stream_endpoint_map = {'ads': '/ads',
'adsets': '/adsets',
'adcreative': '/adcreatives',
'ads_insights': '/insights', # GET only endpoint
'campaigns': '/campaigns',
'users': '/users',}

self.campaign_special_ad_categories = ['NONE',
'EMPLOYMENT',
'HOUSING',
'CREDIT',
# 'ISSUES_ELECTIONS_POLITICS', # acct unauthorized
'ONLINE_GAMBLING_AND_GAMING']

self.adset_billing_events = ['APP_INSTALLS',
'CLICKS',
'IMPRESSIONS',
'LINK_CLICKS',
'NONE',
'OFFER_CLAIMS',
'PAGE_LIKES',
'POST_ENGAGEMENT',
'THRUPLAY',
'PURCHASE',
'LISTING_INTERACTION']

self.adset_optimization_goals = ['NONE',
'APP_INSTALLS',
'AD_RECALL_LIFT',
'ENGAGED_USERS',
'EVENT_RESPONSES',
'IMPRESSIONS',
'LEAD_GENERATION',
'QUALITY_LEAD',
'LINK_CLICKS',
'OFFSITE_CONVERSIONS',
'PAGE_LIKES',
'POST_ENGAGEMENT',
'QUALITY_CALL',
'REACH',
'LANDING_PAGE_VIEWS',
'VISIT_INSTAGRAM_PROFILE',
'VALUE',
'THRUPLAY',
'DERIVED_EVENTS',
'APP_INSTALLS_AND_OFFSITE_CONVERSIONS',
'CONVERSATIONS',
'IN_APP_VALUE',
'MESSAGING_PURCHASE_CONVERSION',
'MESSAGING_APPOINTMENT_CONVERSION',
'SUBSCRIBERS',
'REMINDERS_SET']

# valid and verified objectives listed below, other objectives should be re-mapped to these
self.campaign_objectives = ['OUTCOME_APP_PROMOTION',
'OUTCOME_AWARENESS',
'OUTCOME_ENGAGEMENT',
'OUTCOME_LEADS',
'OUTCOME_SALES',
'OUTCOME_TRAFFIC']

def get_account_objects(self, stream, limit, time_range):
# time_range defines query start and end dates and should match tap config
assert stream in self.stream_endpoint_map.keys(), \
f'Endpoint undefined for specified stream: {stream}'
endpoint = self.stream_endpoint_map[stream]
url = self.account_url + endpoint
params = {'access_token': self.access_token,
'limit': limit,
'time_range': str({'since': time_range['since'], 'until': time_range['until']})}
LOGGER.info(f"Getting url: {url}")
response = requests.get(url, params)
response.raise_for_status()
LOGGER.info(f"Returning get response: {response}")
return response.json()

def create_account_objects(self, stream):
assert stream in self.stream_endpoint_map.keys(), \
f'Endpoint undefined for specified stream: {stream}'
endpoint = self.stream_endpoint_map[stream]
url = self.account_url + endpoint
LOGGER.info(f"Posting to url: {url}")
params = self.generate_post_params(stream)
response = requests.post(url, params)
response.raise_for_status()
LOGGER.info(f"Returning post response: {response}")
return response

def generate_post_params(self, stream):
if stream == 'adcreative':
params = {
'access_token': self.access_token,
'name': ''.join(random.choices(string.ascii_letters + string.digits, k=18)),
'object_story_spec': str({'page_id': '453760455405317',
'link_data': {'link': 'http://fb.me'}})}
return params

elif stream == 'ads':
params = {
'access_token': self.access_token,
'name': ''.join(random.choices(string.ascii_letters + string.digits, k=17)),
# adset is bound to parent campaign_objective, can cause errors posting new ads
# as certain objectives have different requirements. 50 ads per adset max
# adset below can be found under campaign: 120203395323750059
'adset_id': 120203403135680059,
'creative': str({'creative_id': 23843561378450058}),
'status': "PAUSED"}
return params

elif stream == 'adsets':
# TODO In order to randomize optimization_goal and billing_event the campaign_id
# would need to be examined to determine which goals were supported. Then an option
# could be selected from the available billing events supported by that goal.
params = {
'access_token': self.access_token,
'name': ''.join(random.choices(string.ascii_letters + string.digits, k=16)),
'optimization_goal': 'REACH',
'billing_event': 'IMPRESSIONS',
'bid_amount': 2, # TODO random?
'daily_budget': 1000, # TODO random? tie to parent campaign?
'campaign_id': 120203241386960059,
'targeting': str({'geo_locations': {'countries': ["US"]},
'facebook_positions': ["feed"]}),
'status': "PAUSED",
'promoted_object': str({'page_id': '453760455405317'})}
return params

elif stream == 'campaigns':
params = { # generate a campaign with random name, ojbective, and ad category
'access_token': self.access_token,
'name': ''.join(random.choices(string.ascii_letters + string.digits, k=15)),
'objective': random.choice(self.campaign_objectives),
'special_ad_categories': random.choice(self.campaign_special_ad_categories)}
return params

else:
assert False, f"Post params for stream {stream} not implemented / supported"


# Ad Insights TODO
# endpoint is "GET" only. We cannot post fake insights data for test. As of Oct 27, 2023
# data lists for original 3 AdSet Ids, and Ads account as a whole are empty.
# 1 - Can we run enough ads to get enough data to paginate?
# 2 - Can we interact with our own ads?
# if 1 or 2 == True then use setUp to conditionally test ads_insights if there is enough data
68 changes: 68 additions & 0 deletions tests/test_facebook_pagination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import test_client as tc
import time
import unittest

from datetime import datetime as dt

from tap_tester.base_suite_tests.pagination_test import PaginationTest
from tap_tester import connections, runner, menagerie, LOGGER

from base_new_frmwrk import FacebookBaseTest

fb_client = tc.TestClient()


class FacebookDiscoveryTest(PaginationTest, FacebookBaseTest):
"""Standard Pagination Test"""

@staticmethod
def name():
return "tt_facebook_pagination"
def streams_to_test(self):
# TODO ads_insights empty for account, no post via API, spike on generating data
return {'adcreative', 'ads', 'adsets', 'campaigns'}

def setUp(self): # pylint: disable=invalid-name
"""
Setup for tests in this module.
"""
if PaginationTest.synced_records and PaginationTest.record_count_by_stream:
return

# instantiate connection
conn_id = connections.ensure_connection(self)

# run check mode
found_catalogs = self.run_and_verify_check_mode(conn_id)

# table and field selection
test_catalogs = [catalog for catalog in found_catalogs
if catalog.get('stream_name') in self.streams_to_test()]

# non_selected_fields are none
self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs)

# ensure there is enough data to paginate
start_date_dt = self.parse_date(self.start_date)
date_range = {'since': dt.strftime(start_date_dt, "%Y-%m-%d"),
'until': dt.strftime(dt.now(), "%Y-%m-%d")}

for stream in self.streams_to_test():
limit = self.expected_page_size(stream)
response = fb_client.get_account_objects(stream, limit, date_range)

number_of_records = len(response['data'])
# TODO move "if" logic below to client method get_account_objects()
if number_of_records >= limit and response.get('paging', {}).get('next'):
continue # stream is ready for test, no need for futher action

LOGGER.info(f"Stream: {stream} - Record count is less than max page size: {limit}, "
"posting more records to setUp the PaginationTest")

for i in range(limit - number_of_records + 1):
post_response = fb_client.create_account_objects(stream)
LOGGER.info(f"Posted {i + 1} new {stream}, new total: {number_of_records + i + 1}")

# run initial sync
PaginationTest.record_count_by_stream = self.run_and_verify_sync_mode(conn_id)
PaginationTest.synced_records = runner.get_records_from_target_output()

0 comments on commit add18f1

Please sign in to comment.