diff --git a/adenotifier/notifier.py b/adenotifier/notifier.py index d2e334f..aff27d3 100644 --- a/adenotifier/notifier.py +++ b/adenotifier/notifier.py @@ -4,6 +4,7 @@ from requests.adapters import HTTPAdapter from urllib3.util import Retry from .manifest import Manifest +from typing import List, Set, Dict, Tuple, Optional def search_manifests(source_system_name: str, source_entity_name: str, base_url: str, notify_api_key: str, notify_api_key_secret: str, state: str): """Searches manifests from ADE Notify API. @@ -165,6 +166,70 @@ def add_to_manifest(file_url: str, source: object, base_url: str, notify_api_key notify_manifests(source, base_url, notify_api_key, notify_api_key_secret) return +def add_multiple_entries_to_manifest(entries: List[dict], source: object, base_url: str, notify_api_key: str, notify_api_key_secret: str): + """Utilizes Manifest class and other functions to add the given file_url to a manifest for the given configured data source. + + Args: + entries (list): Source file url. + source (object): Data source configuration JSON object. See notifier documentation for format & required attributes. + base_url (str): ADE Notify API base url, e.g. https://external-api.{environment}.datahub.{tenant}.saas.agiledataengine.com:443/notify-api. + notify_api_key (str): ADE Notify API key. + notify_api_key_secret (str): ADE Notify API key secret. + + """ + # Initialize a manifest object with mandatory attributes. + manifest = Manifest( + base_url = base_url, + source_system_name = source['attributes']['ade_source_system'], + source_entity_name = source['attributes']['ade_source_entity'], + format = source['manifest_parameters']['format'], + notify_api_key = notify_api_key, + notify_api_key_secret = notify_api_key_secret + ) + + # Set optional manifest attributes if configured in data source. + if ('columns' in source['manifest_parameters']): + manifest.columns = source['manifest_parameters']['columns'] + if ('compression' in source['manifest_parameters']): + manifest.compression = source['manifest_parameters']['compression'] + if ('delim' in source['manifest_parameters']): + manifest.delim = source['manifest_parameters']['delim'] + if ('fullscanned' in source['manifest_parameters']): + manifest.fullscanned = source['manifest_parameters']['fullscanned'] + if ('skiph' in source['manifest_parameters']): + manifest.skiph = source['manifest_parameters']['skiph'] + + # Create a new manifest. + manifest.create() + logging.info('Manifest created: {0}'.format(manifest.id)) + + if ('path_replace' in source['attributes'] and 'path_replace_with' in source['attributes']): + # Modify manifest entry file url if configured. + for entry in entries: + entry['sourceFile'] = entry['sourceFile'].replace( + source['attributes']['path_replace'], source['attributes']['path_replace_with']) + + if ('batch_from_file_path_regex' in source['attributes']): + # Parse entry specific batch number from file name if configured. + try: + for entry_batch in entries: + entry_batch['batch'] = parse_batch(entry['sourceFile'], source['attributes']['batch_from_file_path_regex']) + logging.info('Batch: {0}'.format(batch)) + except Exception as e: + batch = None + logging.warning('Batch parsing failed:\n{0}'.format(e)) + else: + batch = None + + # Add entry to manifest. + manifest.add_entries(entries) + logging.info('Added entries: {0}'.format(entries)) + + manifest.notify(manifest.id) + + return + + def notify_manifests(source: object, base_url: str, notify_api_key: str, notify_api_key_secret: str): """Utilizes Manifest class and other functions to notify all open manifests for the given configured data source.