Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shop info to the records #73

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions tap_shopify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@

REQUIRED_CONFIG_KEYS = ["shop", "api_key"]
LOGGER = singer.get_logger()
SDC_KEYS = {'id': 'integer', 'name': 'string', 'myshopify_domain': 'string'}


def initialize_shopify_client():
api_key = Context.config['api_key']
shop = Context.config['shop']
version = '2020-07'
session = shopify.Session(shop, version, api_key)
shopify.ShopifyResource.activate_session(session)
return shopify.Shop.current().attributes


def get_abs_path(path):
return os.path.join(os.path.dirname(os.path.realpath(__file__)), path)
Expand Down Expand Up @@ -68,6 +72,13 @@ def load_schema_references():

return refs


def add_synthetic_keys_to_schema(schema):
for k in SDC_KEYS:
schema['properties']['_sdc_shop_' + k] = {'type': SDC_KEYS[k]}
return schema


def discover():
raw_schemas = load_schemas()
streams = []
Expand All @@ -79,12 +90,14 @@ def discover():

stream = Context.stream_objects[schema_name]()

catalog_schema = add_synthetic_keys_to_schema(singer.resolve_schema_references(schema, refs))

# create and add catalog entry
catalog_entry = {
'stream': schema_name,
'tap_stream_id': schema_name,
'schema': singer.resolve_schema_references(schema, refs),
'metadata' : get_discovery_metadata(stream, schema),
'schema': catalog_schema,
'metadata': get_discovery_metadata(stream, schema),
'key_properties': stream.key_properties,
'replication_key': stream.replication_key,
'replication_method': stream.replication_method
Expand All @@ -107,7 +120,8 @@ def shuffle_streams(stream_name):
Context.catalog["streams"] = top_half + bottom_half

def sync():
initialize_shopify_client()
shop_attributes = initialize_shopify_client()
sdc_fields = {"_sdc_shop_" + x: shop_attributes[x] for x in SDC_KEYS}

# Emit all schemas first so we have them for child streams
for stream in Context.catalog["streams"]:
Expand Down Expand Up @@ -144,7 +158,7 @@ def sync():
extraction_time = singer.utils.now()
record_schema = catalog_entry['schema']
record_metadata = metadata.to_map(catalog_entry['metadata'])
rec = transformer.transform(rec, record_schema, record_metadata)
rec = transformer.transform({**rec, **sdc_fields}, record_schema, record_metadata)
singer.write_record(stream_id,
rec,
time_extracted=extraction_time)
Expand Down