Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add selected-by-default #8

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ check_prereqs:
bash -c 'dpkg -l libsnappy-dev >/dev/null 2>&1'

test: check_prereqs
pylint tap_heap --disable too-few-public-methods,missing-docstring,protected-access,no-else-return
pylint tap_heap --disable too-few-public-methods,missing-docstring,protected-access
python -m unittest discover
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
install_requires=[
'boto3==1.9.57',
'singer-encodings==0.0.3',
'singer-python==5.1.5',
'singer-python==5.5.1',
'python-snappy==0.5.3',
'fastavro==0.21.8'
],
Expand Down
15 changes: 14 additions & 1 deletion tap_heap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

def do_discover(config):
LOGGER.info("Starting discover")
streams = discover_streams(config['bucket'])

streams = discover_streams(config['bucket'], config.get('selected_by_default', False))
if not streams:
raise Exception("No streams found")
catalog = {"streams": streams}
Expand All @@ -31,12 +32,24 @@ def stream_is_selected(mdata):
return mdata.get((), {}).get('selected', False)


def convert_selected_by_default_metadata(catalog):
    """Promote ``selected-by-default`` metadata to an explicit ``selected: True``.

    Walks every metadata entry of every stream in ``catalog['streams']``. An
    entry whose ``metadata`` dict has a truthy ``selected-by-default`` value
    and no ``selected`` value (missing or ``None``) gets ``selected`` set to
    ``True``. An explicit ``selected`` value, including ``False``, is left
    untouched.

    The catalog is modified in place; nothing is returned.
    """
    for stream in catalog['streams']:
        # A stream with no 'metadata' list (or with it set to None) has
        # nothing to convert; iterating None would raise TypeError.
        for entry in stream.get('metadata') or []:
            # Same dict object as entry['metadata'] when present, so the
            # assignment below updates the catalog in place.
            entry_metadata = entry.get('metadata') or {}
            if (entry_metadata.get('selected-by-default', False)
                    and entry_metadata.get('selected') is None):
                entry_metadata['selected'] = True


def do_sync(config, catalog, state):
LOGGER.info('Starting sync.')

bucket = config['bucket']
merged_manifests = manifest.generate_merged_manifests(bucket)

# Convert all selected-by-default metadata into selected: True
convert_selected_by_default_metadata(catalog)

for stream in catalog['streams']:
stream_name = stream['tap_stream_id']
mdata = metadata.to_map(stream['metadata'])
Expand Down
22 changes: 15 additions & 7 deletions tap_heap/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,45 @@
from tap_heap import manifest
from tap_heap.schema import generate_fake_schema

def discover_streams(bucket):
def discover_streams(bucket, selected_by_default=False):
streams = []

merged_manifests = manifest.generate_merged_manifests(bucket)
for table_name, manifest_table in merged_manifests.items():
schema = generate_fake_schema(manifest_table)
streams.append({'stream': table_name, 'tap_stream_id': table_name,
'schema': schema, 'metadata': load_metadata(table_name, schema)})
streams.append({'stream': table_name,
'tap_stream_id': table_name,
'schema': schema,
'metadata': load_metadata(table_name, schema, selected_by_default)})

return streams


def get_key_properties(table_name):
if table_name == 'user_migrations':
return ['from_user_id']
elif table_name == 'users':
if table_name == 'users':
return ['user_id']
else:
return ['event_id']
return ['event_id']


def load_metadata(table_name, schema):
def load_metadata(table_name, schema, selected_by_default=False):
# Builds the metadata for one table and returns it via metadata.to_list():
# the table's key properties at the empty breadcrumb, plus an 'inclusion'
# value for every field found under schema['properties'].
# When selected_by_default is true, 'selected-by-default': True is written too.
mdata = metadata.new()

key_properties = get_key_properties(table_name)
mdata = metadata.write(mdata, (), 'table-key-properties', key_properties)
# Table-level flag, written at the empty breadcrumb.
if selected_by_default:
mdata = metadata.write(mdata, (), 'selected-by-default', True)

# A schema without a 'properties' key yields no field-level entries.
for field_name in schema.get('properties', {}).keys():
# Key-property fields are marked 'automatic'; every other field 'available'.
if field_name in key_properties:
mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'automatic')
else:
mdata = metadata.write(mdata, ('properties', field_name), 'inclusion', 'available')
# NOTE(review): indentation was lost in this rendering, so it is unclear
# whether this check is nested under the `else` above (non-key fields only)
# or runs for every field - confirm against the real file.
if selected_by_default:
mdata = metadata.write(mdata,
('properties', field_name),
'selected-by-default',
True)

return metadata.to_list(mdata)