From 2de0c9ca165f373b5111163eb871a60f5cef023b Mon Sep 17 00:00:00 2001
From: jgaff
Date: Tue, 10 Nov 2020 21:03:37 +0000
Subject: [PATCH 1/2] Stop feedstock backup, attempt to address GDrive errors again

---
 mdf_connect_server/config/dev.py          |  18 ++-
 mdf_connect_server/processor/processor.py |  19 +--
 mdf_connect_server/utils/utils.py         | 153 ++++++++++++----------
 3 files changed, 95 insertions(+), 95 deletions(-)

diff --git a/mdf_connect_server/config/dev.py b/mdf_connect_server/config/dev.py
index 56ea394..b505d21 100644
--- a/mdf_connect_server/config/dev.py
+++ b/mdf_connect_server/config/dev.py
@@ -16,21 +16,19 @@
     "LOCAL_EP": "ca7550ad-55a9-4762-b558-8f2b15049039",
 
     # Disable backups
-    "BACKUP_EP": False,
+    #"BACKUP_EP": False,
 
     # Petrel
-    # "BACKUP_EP": "e38ee745-6d04-11e5-ba46-22000b92c6ec",
-    # "BACKUP_PATH": "/MDF/mdf_connect/dev/data/",
-    # "BACKUP_HOST": "https://e38ee745-6d04-11e5-ba46-22000b92c6ec.e.globus.org",
-    # "BACKUP_FEEDSTOCK": "/MDF/mdf_connect/dev/feedstock/",
+    "BACKUP_EP": "e38ee745-6d04-11e5-ba46-22000b92c6ec",
+    "BACKUP_PATH": "/MDF/mdf_connect/dev/data/",
+    "BACKUP_HOST": "https://e38ee745-6d04-11e5-ba46-22000b92c6ec.e.globus.org",
+    "BACKUP_FEEDSTOCK": "/MDF/mdf_connect/dev/feedstock/",
 
     # NCSA
     # "BACKUP_EP": "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec",
-    "BACKUP_PATH": "/mdf_connect/dev/data/",
-    "BACKUP_HOST": "https://data.materialsdatafacility.org",
-    "BACKUP_FEEDSTOCK": "/mdf_connect/dev/feedstock/",
-
-    # "GDRIVE_EP": "6ab13202-7c99-4e44-b0ff-04b8fbd77c97",
+    #"BACKUP_PATH": "/mdf_connect/dev/data/",
+    #"BACKUP_HOST": "https://data.materialsdatafacility.org",
+    #"BACKUP_FEEDSTOCK": "/mdf_connect/dev/feedstock/",
 
     "DEFAULT_CLEANUP": True,
     "FINAL_CLEANUP": True,
diff --git a/mdf_connect_server/processor/processor.py b/mdf_connect_server/processor/processor.py
index 9cb2de3..20e081f 100644
--- a/mdf_connect_server/processor/processor.py
+++ b/mdf_connect_server/processor/processor.py
@@ -517,23 +517,8 @@ def submission_driver(metadata, sub_conf, source_id, access_token, user_id):
             utils.complete_submission(source_id)
             return
 
-        # Back up feedstock
-        source_feed_loc = "globus://{}{}".format(CONFIG["LOCAL_EP"], feedstock_file)
-        backup_feed_loc = "globus://{}{}".format(CONFIG["BACKUP_EP"],
-                                                 os.path.join(CONFIG["BACKUP_FEEDSTOCK"],
-                                                              source_id + "_final.json"))
-        try:
-            feed_backup_res = list(utils.backup_data(mdf_transfer_client, source_feed_loc,
-                                                     backup_feed_loc, acl=None))[-1]
-            if not feed_backup_res.get(backup_feed_loc, {}).get("success"):
-                raise ValueError(feed_backup_res.get(backup_feed_loc, {}).get("error"))
-        except Exception as e:
-            utils.update_status(source_id, "ingest_search", "R",
-                                text=("Feedstock backup failed: {}".format(str(e))),
-                                except_on_fail=True)
-        else:
-            utils.update_status(source_id, "ingest_search", "S", except_on_fail=True)
-            os.remove(feedstock_file)
+        utils.update_status(source_id, "ingest_search", "S", except_on_fail=True)
+        os.remove(feedstock_file)
         service_res["mdf_search"] = "This dataset was ingested to MDF Search."
 
         # Move files to data_destinations
diff --git a/mdf_connect_server/utils/utils.py b/mdf_connect_server/utils/utils.py
index 86525ff..6d8508c 100644
--- a/mdf_connect_server/utils/utils.py
+++ b/mdf_connect_server/utils/utils.py
@@ -630,79 +630,90 @@ def download_data(transfer_client, source_loc, local_ep, local_path,
         source_loc = [source_loc]
 
     # Download data locally
+    logger.debug("Sources: {}".format(source_loc))
     for raw_loc in source_loc:
         location = old_normalize_globus_uri(raw_loc)
+        logger.debug("Location: {}".format(location))
         loc_info = urllib.parse.urlparse(location)
         # Globus Transfer
         if loc_info.scheme == "globus":
-            # Use admin_client for GDrive Transfers
-            # User doesn't need permissions on MDF GDrive, we have those
-            # For all other cases use user's TC
-            tc = admin_client if (loc_info.netloc == CONFIG["GDRIVE_EP"]
-                                  and admin_client is not None) else transfer_client
-            if filename:
-                transfer_path = os.path.join(local_path, filename)
-            else:
-                transfer_path = local_path
-            # Check that data not already in place
-            if not (loc_info.netloc == local_ep
-                    and loc_info.path == transfer_path):
-                try:
-                    if admin_client is not None:
-                        # Edit ACL to allow pull
-                        acl_rule = {
-                            "DATA_TYPE": "access",
-                            "principal_type": "identity",
-                            "principal": user_id,
-                            "path": local_path,
-                            "permissions": "rw"
-                        }
-                        try:
-                            acl_res = admin_client.add_endpoint_acl_rule(local_ep, acl_rule).data
-                        except Exception as e:
-                            logger.error("ACL rule creation exception for '{}': {}"
-                                         .format(acl_rule, repr(e)))
-                            raise ValueError("Internal permissions error.")
-                        if not acl_res.get("code") == "Created":
-                            logger.error("Unable to create ACL rule '{}': {}"
-                                         .format(acl_rule, acl_res))
-                            raise ValueError("Internal permissions error.")
-                    else:
-                        acl_res = None
-
-                    # Transfer locally
-                    transfer = mdf_toolbox.custom_transfer(
-                        tc, loc_info.netloc, local_ep,
-                        [(loc_info.path, transfer_path)],
-                        interval=CONFIG["TRANSFER_PING_INTERVAL"],
-                        inactivity_time=CONFIG["TRANSFER_DEADLINE"], notify=False)
-                    for event in transfer:
-                        if not event["success"]:
-                            logger.info("Transfer is_error: {} - {}"
-                                        .format(event.get("code", "No code found"),
-                                                event.get("description", "No description found")))
-                            yield {
-                                "success": False,
-                                "error": "{} - {}".format(event.get("code", "No code found"),
-                                                          event.get("description",
-                                                                    "No description found"))
+            try:
+                # Use admin_client for GDrive Transfers
+                # User doesn't need permissions on MDF GDrive, we have those
+                # For all other cases use user's TC
+                tc = admin_client if (loc_info.netloc == CONFIG["GDRIVE_EP"]
+                                      and admin_client is not None) else transfer_client
+                if filename:
+                    transfer_path = os.path.join(local_path, filename)
+                else:
+                    transfer_path = local_path
+                # Check that data not already in place
+                if not (loc_info.netloc == local_ep
+                        and loc_info.path == transfer_path):
+                    try:
+                        if admin_client is not None:
+                            # Edit ACL to allow pull
+                            acl_rule = {
+                                "DATA_TYPE": "access",
+                                "principal_type": "identity",
+                                "principal": user_id,
+                                "path": local_path,
+                                "permissions": "rw"
                             }
-                    if not event["success"]:
-                        logger.error("Transfer failed: {}".format(event))
-                        raise ValueError(event)
-                finally:
-                    if acl_res is not None:
-                        try:
-                            acl_del = admin_client.delete_endpoint_acl_rule(
-                                local_ep, acl_res["access_id"])
-                        except Exception as e:
-                            logger.critical("ACL rule deletion exception for '{}': {}"
-                                            .format(acl_res, repr(e)))
-                            raise ValueError("Internal permissions error.")
-                        if not acl_del.get("code") == "Deleted":
-                            logger.critical("Unable to delete ACL rule '{}': {}"
-                                            .format(acl_res, acl_del))
-                            raise ValueError("Internal permissions error.")
+                            try:
+                                acl_res = admin_client.add_endpoint_acl_rule(local_ep, acl_rule).data
+                            except Exception as e:
+                                logger.error("ACL rule creation exception for '{}': {}"
+                                             .format(acl_rule, repr(e)))
+                                raise ValueError("Internal permissions error.")
+                            if not acl_res.get("code") == "Created":
+                                logger.error("Unable to create ACL rule '{}': {}"
+                                             .format(acl_rule, acl_res))
+                                raise ValueError("Internal permissions error.")
+                        else:
+                            acl_res = None
+
+                        # Transfer locally
+                        logger.debug("BEFORE TRANSFER")
+                        logger.warning("Transferring:\nSource EP: {}\nDest EP: {}\nSource path: {}\nDest path: {}".format(loc_info.netloc, local_ep, loc_info.path, transfer_path))
+                        transfer = mdf_toolbox.custom_transfer(
+                            tc, loc_info.netloc, local_ep,
+                            [(loc_info.path, transfer_path)],
+                            interval=CONFIG["TRANSFER_PING_INTERVAL"],
+                            inactivity_time=CONFIG["TRANSFER_DEADLINE"], notify=False)
+                        for event in transfer:
+                            if not event["success"]:
+                                logger.info("Transfer is_error: {} - {}"
+                                            .format(event.get("code", "No code found"),
+                                                    event.get("description", "No description found")))
+                                yield {
+                                    "success": False,
+                                    "error": "{} - {}".format(event.get("code", "No code found"),
+                                                              event.get("description",
+                                                                        "No description found"))
+                                }
+                        if not event["success"]:
+                            logger.error("Transfer failed: {}".format(event))
+                            raise ValueError(event)
+                    finally:
+                        if acl_res is not None:
+                            try:
+                                acl_del = admin_client.delete_endpoint_acl_rule(
+                                    local_ep, acl_res["access_id"])
+                            except Exception as e:
+                                logger.critical("ACL rule deletion exception for '{}': {}"
+                                                .format(acl_res, repr(e)))
+                                raise ValueError("Internal permissions error.")
+                            if not acl_del.get("code") == "Deleted":
+                                logger.critical("Unable to delete ACL rule '{}': {}"
+                                                .format(acl_res, acl_del))
+                                raise ValueError("Internal permissions error.")
+            except Exception as e:
+                # this is all temporary
+                import traceback
+                logger.critical("\nError {}: {}\n\nStacktrace: {}\n".format(str(e), repr(e),
+                                traceback.format_exception(type(e), e, e.__traceback__)))
+                raise
         # HTTP(S)
         elif loc_info.scheme.startswith("http"):
             # Get default filename and extension
@@ -820,6 +831,11 @@ def backup_data(transfer_client, storage_loc, backup_locs, acl=None,
         raise ValueError("Storage location '{}' (from '{}') is not a Globus Endpoint and cannot "
                          "be directly published from or backed up from"
                          .format(norm_store, storage_loc))
+    # If the storage is the MDF GDrive EP, the data_client and data_user aren't necessary
+    # because the MDF client has read access already.
+    elif storage_info.netloc == CONFIG["GDRIVE_EP"]:
+        data_client = None
+        data_user = None
 
     for backup in backup_locs:
         error = ""
@@ -1392,7 +1408,8 @@ def cancel_submission(source_id, wait=True):
         old_status_code = old_read_table("status", source_id)["status"]["code"]
         new_status_code = old_status_code.replace("z", "X").replace("W", "X") \
                                          .replace("T", "X").replace("P", "X")
-        update_res = modify_status_entry(source_id, {"code": new_status_code})
+        update_res = modify_status_entry(source_id, {"code": new_status_code,
+                                                     "active": False})
         if not update_res["success"]:
             return {
                 "success": False,

From 43b2890c5240a6b79e82a646bdd279ac720cc388 Mon Sep 17 00:00:00 2001
From: jgaff
Date: Tue, 10 Nov 2020 21:09:10 +0000
Subject: [PATCH 2/2] Revert temporary testing code

---
 mdf_connect_server/config/dev.py  |  16 ++--
 mdf_connect_server/utils/utils.py | 145 ++++++++++++++----------------
 2 files changed, 75 insertions(+), 86 deletions(-)

diff --git a/mdf_connect_server/config/dev.py b/mdf_connect_server/config/dev.py
index b505d21..3b5b060 100644
--- a/mdf_connect_server/config/dev.py
+++ b/mdf_connect_server/config/dev.py
@@ -16,19 +16,19 @@
     "LOCAL_EP": "ca7550ad-55a9-4762-b558-8f2b15049039",
 
     # Disable backups
-    #"BACKUP_EP": False,
+    "BACKUP_EP": False,
 
     # Petrel
-    "BACKUP_EP": "e38ee745-6d04-11e5-ba46-22000b92c6ec",
-    "BACKUP_PATH": "/MDF/mdf_connect/dev/data/",
-    "BACKUP_HOST": "https://e38ee745-6d04-11e5-ba46-22000b92c6ec.e.globus.org",
-    "BACKUP_FEEDSTOCK": "/MDF/mdf_connect/dev/feedstock/",
+    # "BACKUP_EP": "e38ee745-6d04-11e5-ba46-22000b92c6ec",
+    # "BACKUP_PATH": "/MDF/mdf_connect/dev/data/",
+    # "BACKUP_HOST": "https://e38ee745-6d04-11e5-ba46-22000b92c6ec.e.globus.org",
+    # "BACKUP_FEEDSTOCK": "/MDF/mdf_connect/dev/feedstock/",
 
     # NCSA
     # "BACKUP_EP": "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec",
-    #"BACKUP_PATH": "/mdf_connect/dev/data/",
-    #"BACKUP_HOST": "https://data.materialsdatafacility.org",
-    #"BACKUP_FEEDSTOCK": "/mdf_connect/dev/feedstock/",
+    "BACKUP_PATH": "/mdf_connect/dev/data/",
+    "BACKUP_HOST": "https://data.materialsdatafacility.org",
+    "BACKUP_FEEDSTOCK": "/mdf_connect/dev/feedstock/",
 
     "DEFAULT_CLEANUP": True,
     "FINAL_CLEANUP": True,
diff --git a/mdf_connect_server/utils/utils.py b/mdf_connect_server/utils/utils.py
index 6d8508c..d62fd67 100644
--- a/mdf_connect_server/utils/utils.py
+++ b/mdf_connect_server/utils/utils.py
@@ -630,90 +630,79 @@ def download_data(transfer_client, source_loc, local_ep, local_path,
         source_loc = [source_loc]
 
     # Download data locally
-    logger.debug("Sources: {}".format(source_loc))
     for raw_loc in source_loc:
         location = old_normalize_globus_uri(raw_loc)
-        logger.debug("Location: {}".format(location))
         loc_info = urllib.parse.urlparse(location)
         # Globus Transfer
         if loc_info.scheme == "globus":
-            try:
-                # Use admin_client for GDrive Transfers
-                # User doesn't need permissions on MDF GDrive, we have those
-                # For all other cases use user's TC
-                tc = admin_client if (loc_info.netloc == CONFIG["GDRIVE_EP"]
-                                      and admin_client is not None) else transfer_client
-                if filename:
-                    transfer_path = os.path.join(local_path, filename)
-                else:
-                    transfer_path = local_path
-                # Check that data not already in place
-                if not (loc_info.netloc == local_ep
-                        and loc_info.path == transfer_path):
-                    try:
-                        if admin_client is not None:
-                            # Edit ACL to allow pull
-                            acl_rule = {
-                                "DATA_TYPE": "access",
-                                "principal_type": "identity",
-                                "principal": user_id,
-                                "path": local_path,
-                                "permissions": "rw"
-                            }
-                            try:
-                                acl_res = admin_client.add_endpoint_acl_rule(local_ep, acl_rule).data
-                            except Exception as e:
-                                logger.error("ACL rule creation exception for '{}': {}"
-                                             .format(acl_rule, repr(e)))
-                                raise ValueError("Internal permissions error.")
-                            if not acl_res.get("code") == "Created":
-                                logger.error("Unable to create ACL rule '{}': {}"
-                                             .format(acl_rule, acl_res))
-                                raise ValueError("Internal permissions error.")
-                        else:
-                            acl_res = None
-
-                        # Transfer locally
-                        logger.debug("BEFORE TRANSFER")
-                        logger.warning("Transferring:\nSource EP: {}\nDest EP: {}\nSource path: {}\nDest path: {}".format(loc_info.netloc, local_ep, loc_info.path, transfer_path))
-                        transfer = mdf_toolbox.custom_transfer(
-                            tc, loc_info.netloc, local_ep,
-                            [(loc_info.path, transfer_path)],
-                            interval=CONFIG["TRANSFER_PING_INTERVAL"],
-                            inactivity_time=CONFIG["TRANSFER_DEADLINE"], notify=False)
-                        for event in transfer:
-                            if not event["success"]:
-                                logger.info("Transfer is_error: {} - {}"
-                                            .format(event.get("code", "No code found"),
-                                                    event.get("description", "No description found")))
-                                yield {
-                                    "success": False,
-                                    "error": "{} - {}".format(event.get("code", "No code found"),
-                                                              event.get("description",
-                                                                        "No description found"))
-                                }
+            # Use admin_client for GDrive Transfers
+            # User doesn't need permissions on MDF GDrive, we have those
+            # For all other cases use user's TC
+            tc = admin_client if (loc_info.netloc == CONFIG["GDRIVE_EP"]
+                                  and admin_client is not None) else transfer_client
+            if filename:
+                transfer_path = os.path.join(local_path, filename)
+            else:
+                transfer_path = local_path
+            # Check that data not already in place
+            if not (loc_info.netloc == local_ep
+                    and loc_info.path == transfer_path):
+                try:
+                    if admin_client is not None:
+                        # Edit ACL to allow pull
+                        acl_rule = {
+                            "DATA_TYPE": "access",
+                            "principal_type": "identity",
+                            "principal": user_id,
+                            "path": local_path,
+                            "permissions": "rw"
+                        }
+                        try:
+                            acl_res = admin_client.add_endpoint_acl_rule(local_ep, acl_rule).data
+                        except Exception as e:
+                            logger.error("ACL rule creation exception for '{}': {}"
+                                         .format(acl_rule, repr(e)))
+                            raise ValueError("Internal permissions error.")
+                        if not acl_res.get("code") == "Created":
+                            logger.error("Unable to create ACL rule '{}': {}"
+                                         .format(acl_rule, acl_res))
+                            raise ValueError("Internal permissions error.")
+                    else:
+                        acl_res = None
+
+                    # Transfer locally
+                    transfer = mdf_toolbox.custom_transfer(
+                        tc, loc_info.netloc, local_ep,
+                        [(loc_info.path, transfer_path)],
+                        interval=CONFIG["TRANSFER_PING_INTERVAL"],
+                        inactivity_time=CONFIG["TRANSFER_DEADLINE"], notify=False)
+                    for event in transfer:
                         if not event["success"]:
-                            logger.error("Transfer failed: {}".format(event))
-                            raise ValueError(event)
-                    finally:
-                        if acl_res is not None:
-                            try:
-                                acl_del = admin_client.delete_endpoint_acl_rule(
-                                    local_ep, acl_res["access_id"])
-                            except Exception as e:
-                                logger.critical("ACL rule deletion exception for '{}': {}"
-                                                .format(acl_res, repr(e)))
-                                raise ValueError("Internal permissions error.")
-                            if not acl_del.get("code") == "Deleted":
-                                logger.critical("Unable to delete ACL rule '{}': {}"
-                                                .format(acl_res, acl_del))
-                                raise ValueError("Internal permissions error.")
-            except Exception as e:
-                # this is all temporary
-                import traceback
-                logger.critical("\nError {}: {}\n\nStacktrace: {}\n".format(str(e), repr(e),
-                                traceback.format_exception(type(e), e, e.__traceback__)))
-                raise
+                            logger.info("Transfer is_error: {} - {}"
+                                        .format(event.get("code", "No code found"),
+                                                event.get("description", "No description found")))
+                            yield {
+                                "success": False,
found"), + event.get("description", + "No description found")) + } + if not event["success"]: + logger.error("Transfer failed: {}".format(event)) + raise ValueError(event) + finally: + if acl_res is not None: + try: + acl_del = admin_client.delete_endpoint_acl_rule( + local_ep, acl_res["access_id"]) + except Exception as e: + logger.critical("ACL rule deletion exception for '{}': {}" + .format(acl_res, repr(e))) + raise ValueError("Internal permissions error.") + if not acl_del.get("code") == "Deleted": + logger.critical("Unable to delete ACL rule '{}': {}" + .format(acl_res, acl_del)) + raise ValueError("Internal permissions error.") # HTTP(S) elif loc_info.scheme.startswith("http"): # Get default filename and extension