Facebook test client WIP, campaigns stream only #231

Merged Nov 15, 2023 (14 commits)
50 changes: 37 additions & 13 deletions tests/base_new_frmwrk.py
@@ -1,5 +1,6 @@
import os
-from datetime import timedelta
+from datetime import datetime as dt
+from datetime import timezone as tz
from tap_tester import connections, menagerie, runner, LOGGER
from tap_tester.base_suite_tests.base_case import BaseCase

@@ -72,26 +73,31 @@ def expected_metadata():
"ads": {
BaseCase.PRIMARY_KEYS: {"id", "updated_time"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
-BaseCase.REPLICATION_KEYS: {"updated_time"}
+BaseCase.REPLICATION_KEYS: {"updated_time"},
+BaseCase.API_LIMIT: 100
},
"adcreative": {
BaseCase.PRIMARY_KEYS: {"id"},
BaseCase.REPLICATION_METHOD: BaseCase.FULL_TABLE,
+BaseCase.API_LIMIT: 100
},
"adsets": {
BaseCase.PRIMARY_KEYS: {"id", "updated_time"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
-BaseCase.REPLICATION_KEYS: {"updated_time"}
+BaseCase.REPLICATION_KEYS: {"updated_time"},
+BaseCase.API_LIMIT: 100
},
"campaigns": {
BaseCase.PRIMARY_KEYS: {"id", },
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
-BaseCase.REPLICATION_KEYS: {"updated_time"}
+BaseCase.REPLICATION_KEYS: {"updated_time"},
+BaseCase.API_LIMIT: 100
},
"ads_insights": {
BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
-BaseCase.REPLICATION_KEYS: {"date_start"}
+BaseCase.REPLICATION_KEYS: {"date_start"},
+BaseCase.API_LIMIT: 100
},
"ads_insights_age_and_gender": {
BaseCase.PRIMARY_KEYS: {
@@ -101,15 +107,15 @@ def expected_metadata():
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_country": {
-BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start", "country"},
+BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start",
+"country"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_platform_and_device": {
-BaseCase.PRIMARY_KEYS: {
-"campaign_id", "adset_id", "ad_id", "date_start",
-"publisher_platform", "platform_position", "impression_device"
-},
+BaseCase.PRIMARY_KEYS: {"campaign_id", "adset_id", "ad_id", "date_start",
+"publisher_platform", "platform_position",
+"impression_device"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
@@ -124,7 +130,8 @@ def expected_metadata():
BaseCase.REPLICATION_KEYS: {"date_start"}
},
"ads_insights_hourly_advertiser": {
-BaseCase.PRIMARY_KEYS: {"hourly_stats_aggregated_by_advertiser_time_zone", "campaign_id", "adset_id", "ad_id", "date_start"},
+BaseCase.PRIMARY_KEYS: {"hourly_stats_aggregated_by_advertiser_time_zone",
+"campaign_id", "adset_id", "ad_id", "date_start"},
BaseCase.REPLICATION_METHOD: BaseCase.INCREMENTAL,
BaseCase.REPLICATION_KEYS: {"date_start"}
},
@@ -146,8 +153,12 @@ def set_replication_methods(self, conn_id, catalogs, replication_methods):
replication_md = [{ "breadcrumb": [], "metadata":{ "selected" : True}}]
else:
replication_md = [{ "breadcrumb": [], "metadata": { "selected": None}}]
-connections.set_non_discoverable_metadata(
-conn_id, catalog, menagerie.get_annotated_schema(conn_id, catalog['stream_id']), replication_md)
+connections.set_non_discoverable_metadata(conn_id,
+catalog,
+menagerie.get_annotated_schema(
+conn_id,
+catalog['stream_id']),
+replication_md)

@classmethod
def setUpClass(cls,logging="Ensuring environment variables are sourced."):
@@ -165,3 +176,16 @@ def setUpClass(cls,logging="Ensuring environment variables are sourced."):
@staticmethod
def is_insight(stream):
return stream.startswith('ads_insights')

+def expected_page_size(self, stream=None):
+"""
+Return a dict mapping table name to the integer page size of API requests
+for that stream; if a stream is given, return just that stream's page size.
+"""
+page_size = {
+table: properties[BaseCase.API_LIMIT]
+for table, properties in self.expected_metadata().items()
+if properties.get(BaseCase.API_LIMIT)}
+if not stream:
+return page_size
+return page_size[stream]
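
To make the new helper's behavior concrete, here is a standalone sketch of the same dict comprehension; the 'api-limit' key merely stands in for whatever BaseCase.API_LIMIT resolves to (an assumption for illustration):

# Standalone sketch of the expected_page_size logic (key name is illustrative):
metadata = {
    'ads': {'api-limit': 100},
    'campaigns': {'api-limit': 100},
    'ads_insights_country': {},  # no API limit declared yet
}
page_size = {table: props['api-limit']
             for table, props in metadata.items()
             if props.get('api-limit')}
assert page_size == {'ads': 100, 'campaigns': 100}  # unlimited streams drop out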
161 changes: 161 additions & 0 deletions tests/test_client.py
@@ -0,0 +1,161 @@
import os
import random
import requests
import string

from tap_tester.logger import LOGGER


class TestClient():
def __init__(self):
self.base_url = 'https://graph.facebook.com'
self.api_version = 'v18.0'
self.account_id = os.getenv('TAP_FACEBOOK_ACCOUNT_ID')
self.access_token = os.getenv('TAP_FACEBOOK_ACCESS_TOKEN')
self.account_url = f"{self.base_url}/{self.api_version}/act_{self.account_id}"
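# e.g. https://graph.facebook.com/v18.0/act_<ACCOUNT_ID>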

self.stream_endpoint_map = {'ads': '/ads',
'adsets': '/adsets',
'adcreative': '/adcreatives',
'ads_insights': '/insights', # GET only endpoint
'campaigns': '/campaigns',
'users': '/users',}

self.campaign_special_ad_categories = ['NONE',
'EMPLOYMENT',
'HOUSING',
'CREDIT',
# 'ISSUES_ELECTIONS_POLITICS', # acct unauthorized
'ONLINE_GAMBLING_AND_GAMING']

self.adset_billing_events = ['APP_INSTALLS',
'CLICKS',
'IMPRESSIONS',
'LINK_CLICKS',
'NONE',
'OFFER_CLAIMS',
'PAGE_LIKES',
'POST_ENGAGEMENT',
'THRUPLAY',
'PURCHASE',
'LISTING_INTERACTION']

self.adset_optimization_goals = ['NONE',
'APP_INSTALLS',
'AD_RECALL_LIFT',
'ENGAGED_USERS',
'EVENT_RESPONSES',
'IMPRESSIONS',
'LEAD_GENERATION',
'QUALITY_LEAD',
'LINK_CLICKS',
'OFFSITE_CONVERSIONS',
'PAGE_LIKES',
'POST_ENGAGEMENT',
'QUALITY_CALL',
'REACH',
'LANDING_PAGE_VIEWS',
'VISIT_INSTAGRAM_PROFILE',
'VALUE',
'THRUPLAY',
'DERIVED_EVENTS',
'APP_INSTALLS_AND_OFFSITE_CONVERSIONS',
'CONVERSATIONS',
'IN_APP_VALUE',
'MESSAGING_PURCHASE_CONVERSION',
'MESSAGING_APPOINTMENT_CONVERSION',
'SUBSCRIBERS',
'REMINDERS_SET']

# valid and verified objectives listed below; other objectives should be re-mapped to these
self.campaign_objectives = ['OUTCOME_APP_PROMOTION',
'OUTCOME_AWARENESS',
'OUTCOME_ENGAGEMENT',
'OUTCOME_LEADS',
'OUTCOME_SALES',
'OUTCOME_TRAFFIC']

def get_account_objects(self, stream, limit, time_range):
# time_range defines query start and end dates and should match tap config
assert stream in self.stream_endpoint_map.keys(), \
f'Endpoint undefined for specified stream: {stream}'
endpoint = self.stream_endpoint_map[stream]
url = self.account_url + endpoint
params = {'access_token': self.access_token,
'limit': limit,
'time_range': str({'since': time_range['since'], 'until': time_range['until']})}
LOGGER.info(f"Getting url: {url}")
response = requests.get(url, params)
response.raise_for_status()
LOGGER.info(f"Returning get response: {response}")
return response.json()
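
For reference, a successful list GET returns an envelope along these lines (a sketch of the Graph API's documented paging shape; all field values below are illustrative, not taken from the PR):

# Illustrative Graph API list-response shape (values are made up):
example_response = {
    'data': [{'id': '23843000000000000', 'updated_time': '2023-11-01T00:00:00+0000'}],
    'paging': {
        'cursors': {'before': 'BEFORE_CURSOR', 'after': 'AFTER_CURSOR'},
        'next': 'https://graph.facebook.com/v18.0/act_<ACCOUNT_ID>/ads?after=AFTER_CURSOR',
    },
}
# The pagination test's setUp keys off len(response['data']) and
# response.get('paging', {}).get('next').

Note that time_range is serialized with str() above; json.dumps would produce standard JSON if the endpoint ever rejects the single-quoted form.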

def create_account_objects(self, stream):
assert stream in self.stream_endpoint_map.keys(), \
f'Endpoint undefined for specified stream: {stream}'
endpoint = self.stream_endpoint_map[stream]
url = self.account_url + endpoint
LOGGER.info(f"Posting to url: {url}")
params = self.generate_post_params(stream)
response = requests.post(url, params)
response.raise_for_status()
LOGGER.info(f"Returning post response: {response}")
return response

def generate_post_params(self, stream):

Review comment:
This is more than enough for now if we have random pk values. For an all-fields test we would need to expand this, and it could get messy. Might need to think about that more in the future, but it is not necessary now.

Also, something to think about for the future (and maybe for the pagination test) is whether we should store the data that we get and create, to compare against what the sync gets and make sure it is getting the correct data. Not sure if our standard pagination test worries about that, but probably not, since the data is unknown ahead of time. (A hypothetical sketch of this idea appears after the end of this file's diff.)

Author reply:
Noted.

if stream == 'adcreative':
params = {
'access_token': self.access_token,
'name': ''.join(random.choices(string.ascii_letters + string.digits, k=18)),
'object_story_spec': str({'page_id': '453760455405317',
'link_data': {'link': 'http://fb.me'}})}
return params

elif stream == 'ads':
params = {
'access_token': self.access_token,
'name': ''.join(random.choices(string.ascii_letters + string.digits, k=17)),
# adset is bound to parent campaign_objective, can cause errors posting new ads
# as certain objectives have different requirements. 50 ads per adset max
# adset below can be found under campaign: 120203395323750059
'adset_id': 120203403135680059,
'creative': str({'creative_id': 23843561378450058}),
'status': "PAUSED"}
return params

elif stream == 'adsets':
# TODO In order to randomize optimization_goal and billing_event the campaign_id
# would need to be examined to determine which goals were supported. Then an option
# could be selected from the available billing events supported by that goal.
params = {
'access_token': self.access_token,
'name': ''.join(random.choices(string.ascii_letters + string.digits, k=16)),
'optimization_goal': 'REACH',
'billing_event': 'IMPRESSIONS',
'bid_amount': 2, # TODO random?
'daily_budget': 1000, # TODO random? tie to parent campaign?
'campaign_id': 120203241386960059,
'targeting': str({'geo_locations': {'countries': ["US"]},
'facebook_positions': ["feed"]}),
'status': "PAUSED",
'promoted_object': str({'page_id': '453760455405317'})}
return params

elif stream == 'campaigns':
params = { # generate a campaign with random name, objective, and ad category
'access_token': self.access_token,
'name': ''.join(random.choices(string.ascii_letters + string.digits, k=15)),
'objective': random.choice(self.campaign_objectives),
'special_ad_categories': random.choice(self.campaign_special_ad_categories)}
return params

else:
assert False, f"Post params for stream {stream} not implemented / supported"


# Ad Insights TODO
# endpoint is "GET" only. We cannot post fake insights data for tests. As of Oct 27, 2023,
# the data lists for the original 3 AdSet ids, and for the Ads account as a whole, are empty.
# 1 - Can we run enough ads to get enough data to paginate?
# 2 - Can we interact with our own ads?
# if 1 or 2 is true, use setUp to conditionally test ads_insights when there is enough data
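
A hypothetical sketch of the record-tracking idea raised in the generate_post_params review thread above; the names, message shapes, and flow here are assumptions, not part of this PR:

# Hypothetical sketch (not in the PR): remember the ids of objects we create,
# then compare them against what a later sync emits.
created_ids = {}

def track_created(stream, post_response):
    # Graph API object-creation calls return a JSON body containing the new id
    created_ids.setdefault(stream, set()).add(post_response.json()['id'])

# After a sync, a check could look something like:
# synced_ids = {m['data']['id'] for m in synced_records['campaigns']['messages']
#               if m['action'] == 'upsert'}
# assert created_ids['campaigns'] <= synced_ids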
68 changes: 68 additions & 0 deletions tests/test_facebook_pagination.py
@@ -0,0 +1,68 @@
import test_client as tc
import time
import unittest

from datetime import datetime as dt

from tap_tester.base_suite_tests.pagination_test import PaginationTest
from tap_tester import connections, runner, menagerie, LOGGER

from base_new_frmwrk import FacebookBaseTest

fb_client = tc.TestClient()


class FacebookDiscoveryTest(PaginationTest, FacebookBaseTest):
"""Standard Pagination Test"""

@staticmethod
def name():
return "tt_facebook_pagination"

def streams_to_test(self):
# TODO ads_insights empty for account, no post via API, spike on generating data
return {'adcreative', 'ads', 'adsets', 'campaigns'}

def setUp(self): # pylint: disable=invalid-name
"""
Setup for tests in this module.
"""
if PaginationTest.synced_records and PaginationTest.record_count_by_stream:
return

# instantiate connection
conn_id = connections.ensure_connection(self)

# run check mode
found_catalogs = self.run_and_verify_check_mode(conn_id)

# table and field selection
test_catalogs = [catalog for catalog in found_catalogs
if catalog.get('stream_name') in self.streams_to_test()]

# non_selected_fields are none
self.perform_and_verify_table_and_field_selection(conn_id, test_catalogs)

# ensure there is enough data to paginate
start_date_dt = self.parse_date(self.get_properties()['start_date'])
date_range = {'since': dt.strftime(start_date_dt, "%Y-%m-%d"),
'until': dt.strftime(dt.now(), "%Y-%m-%d")}

for stream in self.streams_to_test():
limit = self.expected_page_size(stream)
response = fb_client.get_account_objects(stream, limit, date_range)

number_of_records = len(response['data'])
if number_of_records >= limit and response.get('paging', {}).get('next'):

Review comment:
I would think that for pagination to occur the number of records cannot be equal to the limit. If the limit is 100 and there are 100 records, then it won't go to a second page.

Review comment:
I'm not clear on what response.get('paging', {}).get('next') does?

Author reply:
It appears that the limit sent in the get request controls the number of records returned per page, so I don't expect to get more than 'limit' records back in a single response. I guess we should change this from ">=" to just "==". The response.get('paging', {}).get('next') call verifies that there is another page of data still left to get.

Review comment:
This logic should be in get_account_objects. When the if statement is true we just continue to the next stream. Are we assuming that because there is a next page, we have another record and are over the limit?

continue # stream is ready for test, no need for further action

LOGGER.info(f"Stream: {stream} - Record count is less than max page size: {limit}, "
"posting more records to set up the PaginationTest")

for i in range(limit - number_of_records + 1):
post_response = fb_client.create_account_objects(stream)
LOGGER.info(f"Posted {i + 1} new {stream}, new total: {number_of_records + i + 1}")
time.sleep(1)
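
The top-up loop above posts limit - number_of_records + 1 objects, which leaves the stream strictly over a full page and addresses the ">=" vs "==" question from the thread; a worked example with illustrative numbers:

# Worked example of the top-up count (values are illustrative):
limit, number_of_records = 100, 97
posts_needed = limit - number_of_records + 1   # 4 posts
total = number_of_records + posts_needed       # 101, one more than a full page
assert total > limit                           # guarantees a second page exists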

# run initial sync
PaginationTest.record_count_by_stream = self.run_and_verify_sync_mode(conn_id)
PaginationTest.synced_records = runner.get_records_from_target_output()