Skip to content

Commit

Permalink
Adding logic to max_file_in_manifest
Browse files Browse the repository at this point in the history
  • Loading branch information
henrihem committed Apr 29, 2022
1 parent 3e6123a commit 4384e21
Showing 1 changed file with 66 additions and 16 deletions.
82 changes: 66 additions & 16 deletions adenotifier/notifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,36 @@ def search_manifests(source_system_name: str, source_entity_name: str, state: st

return manifest_ids

def search_manifest_entries(source_system_name: str, source_entity_name: str, base_url: str, notify_api_key: str, notify_api_key_secret: str, manifest_id: str):
"""Searches manifest entries from ADE Notify API.
Args:
source_system_name (str): Source system name defined in ADE source entity.
source_entity_name (str): ADE source entity name.
base_url (str): ADE Notify API base url, e.g. https://external-api.{environment}.datahub.{tenant}.saas.agiledataengine.com:443/notify-api.
notify_api_key (str): ADE Notify API key.
notify_api_key_secret (str): ADE Notify API key secret.
manifest_id (str): Manifest ID that will be used to search manifest entries
Returns:
List [dict] of manifest entries.
"""
session = requests.Session()
session.auth = (notify_api_key, notify_api_key_secret)
session.headers.update({"Content-Type": "application/json"})
session.mount('https://', HTTPAdapter(max_retries=Retry(total=3, status_forcelist=[429, 500, 502, 503, 504], backoff_factor=2))) # HTTP request retry settings.
request_url = "{0}/tenants/local/installations/local/environments/local/source-systems/{1}/source-entities/{2}/manifests/{3}/entries"\
.format(base_url, source_system_name, source_entity_name, manifest_id)

try:
response = session.get(request_url)
except Exception as e:
raise Exception(e)

manifest_entries = response.json()

return manifest_entries

def parse_batch(file_url: str, regexp: str):
"""Parses batch number from given file url with given regular expression.
Expand Down Expand Up @@ -92,27 +122,47 @@ def add_to_manifest(file_url: str, source: object, base_url: str, notify_api_key
notify_api_key_secret = notify_api_key_secret
)

# Set optional manifest attributes if configured in data source.
if ('columns' in source['manifest_parameters']):
manifest.columns = source['manifest_parameters']['columns']
if ('compression' in source['manifest_parameters']):
manifest.compression = source['manifest_parameters']['compression']
if ('delim' in source['manifest_parameters']):
manifest.delim = source['manifest_parameters']['delim']
if ('fullscanned' in source['manifest_parameters']):
manifest.fullscanned = source['manifest_parameters']['fullscanned']
if ('skiph' in source['manifest_parameters']):
manifest.skiph = source['manifest_parameters']['skiph']

if (open_manifests == []):
# Create a new manifest if open manifests are not found.
# Set optional manifest attributes if configured in data source.
if ('columns' in source['manifest_parameters']):
manifest.columns = source['manifest_parameters']['columns']
if ('compression' in source['manifest_parameters']):
manifest.compression = source['manifest_parameters']['compression']
if ('delim' in source['manifest_parameters']):
manifest.delim = source['manifest_parameters']['delim']
if ('fullscanned' in source['manifest_parameters']):
manifest.fullscanned = source['manifest_parameters']['fullscanned']
if ('skiph' in source['manifest_parameters']):
manifest.skiph = source['manifest_parameters']['skiph']

manifest.create()
logging.info('Manifest created: {0}'.format(manifest.id))

else:
# Use latest existing manifest if open manifests are found.
manifest.fetch_manifest(open_manifests[-1])
logging.info('Using open manifest: {0}'.format(manifest.id))
if ('max_files_in_manifest' in source['attributes']):
manifest.fetch_manifest(open_manifests[-1])

manifest_entries = search_manifest_entries(
source_system_name = source['attributes']['ade_source_system'],
source_entity_name = source['attributes']['ade_source_entity'],
base_url = base_url,
notify_api_key = notify_api_key,
notify_api_key_secret = notify_api_key_secret,
manifest_id = manifest.id[0]
)

if (len(manifest_entries) >= source['attributes']['max_files_in_manifest']):
# Create a new manifest if current manifest has already reached max files limit
manifest.create()
logging.info('Manifest created: {0}'.format(manifest.id))
else:
# Use latest existing manifest if open manifests are found and max_files_in_manifest not reached.
manifest.fetch_manifest(open_manifests[-1])
logging.info('Using open manifest: {0}'.format(manifest.id))
else:
# Use latest existing manifest if open manifests are found.
manifest.fetch_manifest(open_manifests[-1])
logging.info('Using open manifest: {0}'.format(manifest.id))

if ('path_replace' in source['attributes'] and 'path_replace_with' in source['attributes']):
# Modify manifest entry file url if configured.
Expand Down

0 comments on commit 4384e21

Please sign in to comment.