Skip to content

Commit

Permalink
Merge pull request #1 from henrihem/main
Browse files Browse the repository at this point in the history
add_multiple_entries_to_manifest
  • Loading branch information
henrihem authored Aug 16, 2022
2 parents 27b6464 + 0a70437 commit b852a39
Showing 1 changed file with 65 additions and 0 deletions.
65 changes: 65 additions & 0 deletions adenotifier/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from requests.adapters import HTTPAdapter
from urllib3.util import Retry
from .manifest import Manifest
from typing import List, Set, Dict, Tuple, Optional

def search_manifests(source_system_name: str, source_entity_name: str, base_url: str, notify_api_key: str, notify_api_key_secret: str, state: str):
"""Searches manifests from ADE Notify API.
Expand Down Expand Up @@ -165,6 +166,70 @@ def add_to_manifest(file_url: str, source: object, base_url: str, notify_api_key
notify_manifests(source, base_url, notify_api_key, notify_api_key_secret)
return

def add_multiple_entries_to_manifest(entries: List[dict], source: object, base_url: str, notify_api_key: str, notify_api_key_secret: str):
"""Utilizes Manifest class and other functions to add the given file_url to a manifest for the given configured data source.
Args:
entries (list): Source file url.
source (object): Data source configuration JSON object. See notifier documentation for format & required attributes.
base_url (str): ADE Notify API base url, e.g. https://external-api.{environment}.datahub.{tenant}.saas.agiledataengine.com:443/notify-api.
notify_api_key (str): ADE Notify API key.
notify_api_key_secret (str): ADE Notify API key secret.
"""
# Initialize a manifest object with mandatory attributes.
manifest = Manifest(
base_url = base_url,
source_system_name = source['attributes']['ade_source_system'],
source_entity_name = source['attributes']['ade_source_entity'],
format = source['manifest_parameters']['format'],
notify_api_key = notify_api_key,
notify_api_key_secret = notify_api_key_secret
)

# Set optional manifest attributes if configured in data source.
if ('columns' in source['manifest_parameters']):
manifest.columns = source['manifest_parameters']['columns']
if ('compression' in source['manifest_parameters']):
manifest.compression = source['manifest_parameters']['compression']
if ('delim' in source['manifest_parameters']):
manifest.delim = source['manifest_parameters']['delim']
if ('fullscanned' in source['manifest_parameters']):
manifest.fullscanned = source['manifest_parameters']['fullscanned']
if ('skiph' in source['manifest_parameters']):
manifest.skiph = source['manifest_parameters']['skiph']

# Create a new manifest.
manifest.create()
logging.info('Manifest created: {0}'.format(manifest.id))

if ('path_replace' in source['attributes'] and 'path_replace_with' in source['attributes']):
# Modify manifest entry file url if configured.
for entry in entries:
entry['sourceFile'] = entry['sourceFile'].replace(
source['attributes']['path_replace'], source['attributes']['path_replace_with'])

if ('batch_from_file_path_regex' in source['attributes']):
# Parse entry specific batch number from file name if configured.
try:
for entry_batch in entries:
entry_batch['batch'] = parse_batch(entry['sourceFile'], source['attributes']['batch_from_file_path_regex'])
logging.info('Batch: {0}'.format(batch))
except Exception as e:
batch = None
logging.warning('Batch parsing failed:\n{0}'.format(e))
else:
batch = None

# Add entry to manifest.
manifest.add_entries(entries)
logging.info('Added entries: {0}'.format(entries))

manifest.notify(manifest.id)

return


def notify_manifests(source: object, base_url: str, notify_api_key: str, notify_api_key_secret: str):
"""Utilizes Manifest class and other functions to notify all open manifests for the given configured data source.
Expand Down

0 comments on commit b852a39

Please sign in to comment.