WIP: Primary keys #44

Open · wants to merge 2 commits into master
136 changes: 70 additions & 66 deletions tap_bing_ads/__init__.py
@@ -57,6 +57,10 @@
def get_user_agent():
return CONFIG.get('user_agent', DEFAULT_USER_AGENT)

def get_table_key_properties(catalog_item):
mdata = metadata.to_map(catalog_item.metadata)
return metadata.get(mdata, (), 'table-key-properties')

class InvalidDateRangeEnd(Exception):
pass

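The new `get_table_key_properties` reads `table-key-properties` from the root breadcrumb of a catalog entry's metadata. A minimal sketch of that lookup, assuming singer-python's `metadata` helpers and a hypothetical catalog entry:

```python
from singer import metadata

# Catalog metadata as a list of {breadcrumb, metadata} dicts (hypothetical entry).
catalog_metadata = [
    {'breadcrumb': [], 'metadata': {'table-key-properties': ['Id']}},
    {'breadcrumb': ['properties', 'Id'], 'metadata': {'inclusion': 'automatic'}},
]

mdata = metadata.to_map(catalog_metadata)               # keyed by breadcrumb tuples
print(metadata.get(mdata, (), 'table-key-properties'))  # ['Id']
```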
@@ -166,10 +170,10 @@ def get_json_schema(element):
if xml_type in ['dateTime', 'date']:
_format = 'date-time'

schema = {'type': types}
schema = singer.Schema(type=types)

if _format:
schema['format'] = _format
schema.format = _format

return schema

@@ -180,19 +184,12 @@ def get_array_type(array_type):
# complex type
items = xml_type # will be filled in fill_in_nested_types
else:
items = {
'type': json_type
}
items = json_type
items = singer.Schema(type=items)

array_obj = {
'type': ['null', 'object'],
'properties': {}
}
array_obj = singer.Schema(type=['null', 'object'], properties={})

array_obj['properties'][xml_type] = {
'type': ['null', 'array'],
'items': items
}
array_obj.properties[xml_type] = singer.Schema(type=['null', 'array'], items=items)

return array_obj

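For reference, the `singer.Schema` objects built above should serialize back to the same JSON the old dict literals produced. A sketch for a hypothetical `ArrayOfstring` element type:

```python
import singer

items = singer.Schema(type=['null', 'string'])
array_obj = singer.Schema(type=['null', 'object'], properties={})
array_obj.properties['string'] = singer.Schema(type=['null', 'array'], items=items)

print(array_obj.to_dict())
# {'type': ['null', 'object'],
#  'properties': {'string': {'type': ['null', 'array'],
#                            'items': {'type': ['null', 'string']}}}}
# (key order may vary)
```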
@@ -233,21 +230,14 @@ def wsdl_type_to_schema(inherited_types, wsdl_type):
else:
properties[element.name] = get_json_schema(element)

return {
'type': ['null', 'object'],
'additionalProperties': False,
'properties': properties
}
return singer.Schema(type=['null', 'object'], additionalProperties=False, properties=properties)

def combine_object_schemas(schemas):
properties = {}
for schema in schemas:
for prop, prop_schema in schema['properties'].items():
for prop, prop_schema in schema.properties.items():
properties[prop] = prop_schema
return {
'type': ['object'],
'properties': properties
}
return singer.Schema(type=['object'], properties=properties)

def normalize_abstract_types(inherited_types, type_map):
for base_type, types in inherited_types.items():
@@ -261,14 +251,15 @@ def normalize_abstract_types(inherited_types, type_map):
if base_type in TOP_LEVEL_CORE_OBJECTS:
type_map[base_type] = combine_object_schemas(schemas)
else:
type_map[base_type] = {'anyOf': schemas}
type_map[base_type] = singer.Schema(anyOf=schemas)

def fill_in_nested_types(type_map, schema):
if 'properties' in schema:
for prop, descriptor in schema['properties'].items():
schema['properties'][prop] = fill_in_nested_types(type_map, descriptor)
elif 'items' in schema:
schema['items'] = fill_in_nested_types(type_map, schema['items'])
if hasattr(schema, 'properties') and schema.properties:
for prop, descriptor in schema.properties.items():
schema.properties[prop] = fill_in_nested_types(type_map, descriptor)
elif hasattr(schema, 'items') and schema.items:
items = fill_in_nested_types(type_map, schema.items)
schema = singer.Schema(type=schema.type, items=items)
else:
if isinstance(schema, str) and schema in type_map:
return type_map[schema]
@@ -292,33 +283,44 @@ def get_type_map(client):

return type_map

def get_stream_def(stream_name, schema, stream_metadata=None, pks=None, replication_key=None):

def get_stream_def(stream_name, schema, stream_metadata=None, key_properties=None, replication_key=None):

stream_def = {
'tap_stream_id': stream_name,
'stream': stream_name,
'schema': schema
'schema': schema,
'key_properties': key_properties
}

excluded_inclusion_fields = []
if pks:
stream_def['key_properties'] = pks
excluded_inclusion_fields = pks

if key_properties:
excluded_inclusion_fields = [("properties", field) for field in key_properties]
if replication_key:
stream_def['replication_key'] = replication_key
stream_def['replication_method'] = 'INCREMENTAL'
excluded_inclusion_fields += [replication_key]
# replication_method = 'INCREMENTAL'
excluded_inclusion_fields += [("properties", replication_key)]
valid_replication_keys = [replication_key]
else:
stream_def['replication_method'] = 'FULL_TABLE'
valid_replication_keys = None
# replication_method = 'FULL_TABLE'

if stream_metadata:
stream_def['metadata'] = stream_metadata
mdata = stream_metadata
else:
stream_def['metadata'] = list(map(
lambda field: {"metadata": {"inclusion": "available"}, "breadcrumb": ["properties", field]},
(schema['properties'].keys() - excluded_inclusion_fields)))

return stream_def
mdata = metadata.get_standard_metadata(schema=schema.to_dict(),
key_properties=key_properties,
valid_replication_keys=valid_replication_keys)
# Forcing a replication key does not adhere
# to Singer Best Practices
# replication_method=replication_method)
mdata = metadata.to_map(mdata)
for breadcrumb in list(mdata.keys()):
if breadcrumb in excluded_inclusion_fields:
del mdata[breadcrumb]
mdata = metadata.to_list(mdata)
stream_def['metadata'] = mdata

return singer.catalog.CatalogEntry(**stream_def)

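`metadata.get_standard_metadata` marks key properties and valid replication keys as `inclusion: automatic`; those are exactly the breadcrumbs the loop above deletes. A sketch of that flow with a hypothetical schema:

```python
from singer import metadata

schema_dict = {
    'type': 'object',
    'properties': {'Id': {'type': ['null', 'integer']},
                   'LastModifiedTime': {'type': ['null', 'string']}},
}

mdata = metadata.get_standard_metadata(schema=schema_dict,
                                       key_properties=['Id'],
                                       valid_replication_keys=['LastModifiedTime'])
mmap = metadata.to_map(mdata)

print(metadata.get(mmap, (), 'table-key-properties'))         # ['Id']
print(metadata.get(mmap, ('properties', 'Id'), 'inclusion'))  # 'automatic'
```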
def get_core_schema(client, obj):
type_map = get_type_map(client)
@@ -332,19 +334,19 @@ def discover_core_objects():

account_schema = get_core_schema(client, 'AdvertiserAccount')
core_object_streams.append(
get_stream_def('accounts', account_schema, pks=['Id'], replication_key='LastModifiedTime'))
get_stream_def('accounts', account_schema, key_properties=['Id'], replication_key='LastModifiedTime'))

LOGGER.info('Initializing CampaignManagementService client - Loading WSDL')
client = CustomServiceClient('CampaignManagementService')

campaign_schema = get_core_schema(client, 'Campaign')
core_object_streams.append(get_stream_def('campaigns', campaign_schema, pks=['Id']))
core_object_streams.append(get_stream_def('campaigns', campaign_schema, key_properties=['Id']))

ad_group_schema = get_core_schema(client, 'AdGroup')
core_object_streams.append(get_stream_def('ad_groups', ad_group_schema, pks=['Id']))
core_object_streams.append(get_stream_def('ad_groups', ad_group_schema, key_properties=['Id']))

ad_schema = get_core_schema(client, 'Ad')
core_object_streams.append(get_stream_def('ads', ad_schema, pks=['Id']))
core_object_streams.append(get_stream_def('ads', ad_schema, key_properties=['Id']))

return core_object_streams

@@ -371,18 +373,14 @@ def get_report_schema(client, report_name):
else:
col_schema = {'type': ['null', _type]}

properties[column] = col_schema
properties[column] = singer.Schema.from_dict(col_schema)

properties['_sdc_report_datetime'] = {
properties['_sdc_report_datetime'] = singer.Schema.from_dict({
'type': 'string',
'format': 'date-time'
}
})

return {
'properties': properties,
'additionalProperties': False,
'type': 'object'
}
return singer.Schema(type='object', properties=properties, additionalProperties=False)

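`Schema.from_dict` accepts the same JSON-schema dicts the report columns already use, so hand-built dicts and `Schema` objects stay interchangeable. A sketch with hypothetical report columns:

```python
import singer

properties = {
    'Clicks': singer.Schema.from_dict({'type': ['null', 'integer']}),
    '_sdc_report_datetime': singer.Schema.from_dict({'type': 'string',
                                                     'format': 'date-time'}),
}
report_schema = singer.Schema(type='object', properties=properties,
                              additionalProperties=False)
print(report_schema.to_dict())
```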
def metadata_fn(report_name, field, required_fields):
if field in required_fields:
Expand All @@ -408,7 +406,7 @@ def get_report_metadata(report_name, report_schema):

return list(map(
lambda field: metadata_fn(report_name, field, required_fields),
report_schema['properties']))
report_schema.properties))

def discover_reports():
report_streams = []
@@ -448,8 +446,7 @@ def do_discover(account_ids):
LOGGER.info('Discovering reports')
report_streams = discover_reports()

json.dump({'streams': core_object_streams + report_streams}, sys.stdout, indent=2)

singer.catalog.write_catalog(singer.Catalog(core_object_streams + report_streams))

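`singer.catalog.write_catalog` serializes the catalog to stdout, replacing the hand-rolled `json.dump`. A sketch, assuming the `Catalog`/`CatalogEntry` objects this PR now builds:

```python
import singer
from singer.catalog import Catalog, CatalogEntry

entry = CatalogEntry(tap_stream_id='accounts', stream='accounts',
                     schema=singer.Schema(type='object', properties={}),
                     key_properties=['Id'], metadata=[])
singer.catalog.write_catalog(Catalog([entry]))  # emits {"streams": [...]}
```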
def check_for_invalid_selections(prop, mdata, invalid_selections):
field_exclusions = metadata.get(mdata, ('properties', prop), 'fieldExclusions')
@@ -504,7 +501,8 @@ def sync_accounts_stream(account_ids, catalog_item):
LOGGER.info('Initializing CustomerManagementService client - Loading WSDL')
client = CustomServiceClient('CustomerManagementService')
account_schema = get_core_schema(client, 'AdvertiserAccount')
singer.write_schema('accounts', account_schema, ['Id'])
pks = get_table_key_properties(catalog_item) or catalog_item.key_properties
singer.write_schema('accounts', account_schema, pks)

for account_id in account_ids:
client = create_sdk_client('CustomerManagementService', account_id)
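The same fallback repeats in each sync function below: prefer `table-key-properties` metadata, then the entry's top-level `key_properties`. A hypothetical helper that would consolidate it:

```python
# resolve_pks is hypothetical, not part of this PR.
def resolve_pks(catalog_item):
    return get_table_key_properties(catalog_item) or catalog_item.key_properties
```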
@@ -534,7 +532,9 @@ def sync_campaigns(client, account_id, selected_streams):

if 'campaigns' in selected_streams:
selected_fields = get_selected_fields(selected_streams['campaigns'])
singer.write_schema('campaigns', get_core_schema(client, 'Campaign'), ['Id'])
schema = get_core_schema(client, 'Campaign')
pks = get_table_key_properties(selected_streams['campaigns']) or selected_streams['campaigns'].key_properties
singer.write_schema('campaigns', schema, pks)
with metrics.record_counter('campaigns') as counter:
singer.write_records('campaigns',
filter_selected_fields_many(selected_fields, campaigns))
@@ -555,7 +555,9 @@ def sync_ad_groups(client, account_id, campaign_ids, selected_streams):
LOGGER.info('Syncing AdGroups for Account: {}, Campaign: {}'.format(
account_id, campaign_id))
selected_fields = get_selected_fields(selected_streams['ad_groups'])
singer.write_schema('ad_groups', get_core_schema(client, 'AdGroup'), ['Id'])
schema = get_core_schema(client, 'AdGroup')
pks = get_table_key_properties(selected_streams['ad_groups']) or selected_streams['ad_groups'].key_properties
singer.write_schema('ad_groups', schema, pks)
with metrics.record_counter('ad_groups') as counter:
singer.write_records('ad_groups',
filter_selected_fields_many(selected_fields, ad_groups))
@@ -582,7 +584,9 @@ def sync_ads(client, selected_streams, ad_group_ids):

if 'Ad' in response_dict:
selected_fields = get_selected_fields(selected_streams['ads'])
singer.write_schema('ads', get_core_schema(client, 'Ad'), ['Id'])
schema = get_core_schema(client, 'Ad')
pks = get_table_key_properties(selected_streams['ads']) or selected_streams['ads'].key_properties
singer.write_schema('ads', schema, pks)
with metrics.record_counter('ads') as counter:
ads = response_dict['Ad']
singer.write_records('ads', filter_selected_fields_many(selected_fields, ads))
@@ -749,7 +753,8 @@ async def sync_report_interval(client, account_id, report_stream,
report_name = stringcase.pascalcase(report_stream.stream)

report_schema = get_report_schema(client, report_name)
singer.write_schema(report_stream.stream, report_schema, [])
pks = get_table_key_properties(report_stream) or report_stream.key_properties
singer.write_schema(report_stream.stream, report_schema, pks)

report_time = arrow.get().isoformat()

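Note that singer-python's `write_schema` takes `key_properties` as its third positional argument (there is no `pks` keyword), hence the positional call above. A sketch with a hypothetical report stream:

```python
import singer

report_schema = {'type': 'object',
                 'properties': {'AccountId': {'type': ['null', 'integer']}}}
# Third argument is key_properties; reports with no primary keys pass [].
singer.write_schema('ad_performance_report', report_schema, [])
```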
@@ -763,7 +768,6 @@
try:
success, download_url = await poll_report(client, account_id, report_name,
start_date, end_date, request_id)

except Exception as some_error:
LOGGER.info('The request_id %s for %s is invalid, generating a new one',
request_id,