Stop feedstock backup, attempt to address GDrive errors again
jgaff committed Nov 10, 2020
1 parent (1d52010), commit 2de0c9c
Showing 3 changed files with 95 additions and 95 deletions.
18 changes: 8 additions & 10 deletions mdf_connect_server/config/dev.py
@@ -16,21 +16,19 @@
"LOCAL_EP": "ca7550ad-55a9-4762-b558-8f2b15049039",

# Disable backups
"BACKUP_EP": False,
#"BACKUP_EP": False,

# Petrel
# "BACKUP_EP": "e38ee745-6d04-11e5-ba46-22000b92c6ec",
# "BACKUP_PATH": "/MDF/mdf_connect/dev/data/",
# "BACKUP_HOST": "https://e38ee745-6d04-11e5-ba46-22000b92c6ec.e.globus.org",
# "BACKUP_FEEDSTOCK": "/MDF/mdf_connect/dev/feedstock/",
"BACKUP_EP": "e38ee745-6d04-11e5-ba46-22000b92c6ec",
"BACKUP_PATH": "/MDF/mdf_connect/dev/data/",
"BACKUP_HOST": "https://e38ee745-6d04-11e5-ba46-22000b92c6ec.e.globus.org",
"BACKUP_FEEDSTOCK": "/MDF/mdf_connect/dev/feedstock/",

# NCSA
# "BACKUP_EP": "82f1b5c6-6e9b-11e5-ba47-22000b92c6ec",
"BACKUP_PATH": "/mdf_connect/dev/data/",
"BACKUP_HOST": "https://data.materialsdatafacility.org",
"BACKUP_FEEDSTOCK": "/mdf_connect/dev/feedstock/",

# "GDRIVE_EP": "6ab13202-7c99-4e44-b0ff-04b8fbd77c97",
#"BACKUP_PATH": "/mdf_connect/dev/data/",
#"BACKUP_HOST": "https://data.materialsdatafacility.org",
#"BACKUP_FEEDSTOCK": "/mdf_connect/dev/feedstock/",

"DEFAULT_CLEANUP": True,
"FINAL_CLEANUP": True,
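Note on the config change above: after this commit the dev deployment's data backups point at the Petrel endpoint (the uncommented BACKUP_* block), while the NCSA entries and the blanket "BACKUP_EP": False switch are left commented out. A minimal sketch of how a caller might honor that switch; the maybe_backup helper is hypothetical and not part of this commit, and the backup generator is passed in rather than imported:

import os

def maybe_backup(backup_data, transfer_client, storage_loc, source_id, config):
    # Hypothetical helper: skip backups entirely when BACKUP_EP is unset
    # or explicitly set to False in the config.
    backup_ep = config.get("BACKUP_EP")
    if not backup_ep:
        return None
    backup_loc = "globus://{}{}".format(
        backup_ep, os.path.join(config["BACKUP_PATH"], source_id))
    # The backup generator yields progress events; the last one carries the result.
    return list(backup_data(transfer_client, storage_loc, backup_loc))[-1]
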
19 changes: 2 additions & 17 deletions mdf_connect_server/processor/processor.py
@@ -517,23 +517,8 @@ def submission_driver(metadata, sub_conf, source_id, access_token, user_id):
utils.complete_submission(source_id)
return

# Back up feedstock
source_feed_loc = "globus://{}{}".format(CONFIG["LOCAL_EP"], feedstock_file)
backup_feed_loc = "globus://{}{}".format(CONFIG["BACKUP_EP"],
os.path.join(CONFIG["BACKUP_FEEDSTOCK"],
source_id + "_final.json"))
try:
feed_backup_res = list(utils.backup_data(mdf_transfer_client, source_feed_loc,
backup_feed_loc, acl=None))[-1]
if not feed_backup_res.get(backup_feed_loc, {}).get("success"):
raise ValueError(feed_backup_res.get(backup_feed_loc, {}).get("error"))
except Exception as e:
utils.update_status(source_id, "ingest_search", "R",
text=("Feedstock backup failed: {}".format(str(e))),
except_on_fail=True)
else:
utils.update_status(source_id, "ingest_search", "S", except_on_fail=True)
os.remove(feedstock_file)
utils.update_status(source_id, "ingest_search", "S", except_on_fail=True)
os.remove(feedstock_file)
service_res["mdf_search"] = "This dataset was ingested to MDF Search."

# Move files to data_destinations
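Note on the processor change above: the feedstock backup block is removed entirely, so a successful Search ingest now immediately records success and deletes the local feedstock copy. A condensed, hypothetical view of the resulting step (the names come from the diff; the wrapper function itself is not part of the commit, and utils/os are imported at module level in processor.py):

import os

def finish_search_ingest(source_id, feedstock_file, service_res):
    # After this commit the feedstock file is no longer backed up anywhere;
    # the submission just records the Search ingest and cleans up locally.
    utils.update_status(source_id, "ingest_search", "S", except_on_fail=True)
    os.remove(feedstock_file)
    service_res["mdf_search"] = "This dataset was ingested to MDF Search."
    return service_res
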
153 changes: 85 additions & 68 deletions mdf_connect_server/utils/utils.py
@@ -630,79 +630,90 @@ def download_data(transfer_client, source_loc, local_ep, local_path,
source_loc = [source_loc]

# Download data locally
logger.debug("Sources: {}".format(source_loc))
for raw_loc in source_loc:
location = old_normalize_globus_uri(raw_loc)
logger.debug("Location: {}".format(location))
loc_info = urllib.parse.urlparse(location)
# Globus Transfer
if loc_info.scheme == "globus":
# Use admin_client for GDrive Transfers
# User doesn't need permissions on MDF GDrive, we have those
# For all other cases use user's TC
tc = admin_client if (loc_info.netloc == CONFIG["GDRIVE_EP"]
and admin_client is not None) else transfer_client
if filename:
transfer_path = os.path.join(local_path, filename)
else:
transfer_path = local_path
# Check that data not already in place
if not (loc_info.netloc == local_ep
and loc_info.path == transfer_path):
try:
if admin_client is not None:
# Edit ACL to allow pull
acl_rule = {
"DATA_TYPE": "access",
"principal_type": "identity",
"principal": user_id,
"path": local_path,
"permissions": "rw"
}
try:
acl_res = admin_client.add_endpoint_acl_rule(local_ep, acl_rule).data
except Exception as e:
logger.error("ACL rule creation exception for '{}': {}"
.format(acl_rule, repr(e)))
raise ValueError("Internal permissions error.")
if not acl_res.get("code") == "Created":
logger.error("Unable to create ACL rule '{}': {}"
.format(acl_rule, acl_res))
raise ValueError("Internal permissions error.")
else:
acl_res = None

# Transfer locally
transfer = mdf_toolbox.custom_transfer(
tc, loc_info.netloc, local_ep,
[(loc_info.path, transfer_path)],
interval=CONFIG["TRANSFER_PING_INTERVAL"],
inactivity_time=CONFIG["TRANSFER_DEADLINE"], notify=False)
for event in transfer:
if not event["success"]:
logger.info("Transfer is_error: {} - {}"
.format(event.get("code", "No code found"),
event.get("description", "No description found")))
yield {
"success": False,
"error": "{} - {}".format(event.get("code", "No code found"),
event.get("description",
"No description found"))
try:
# Use admin_client for GDrive Transfers
# User doesn't need permissions on MDF GDrive, we have those
# For all other cases use user's TC
tc = admin_client if (loc_info.netloc == CONFIG["GDRIVE_EP"]
and admin_client is not None) else transfer_client
if filename:
transfer_path = os.path.join(local_path, filename)
else:
transfer_path = local_path
# Check that data not already in place
if not (loc_info.netloc == local_ep
and loc_info.path == transfer_path):
try:
if admin_client is not None:
# Edit ACL to allow pull
acl_rule = {
"DATA_TYPE": "access",
"principal_type": "identity",
"principal": user_id,
"path": local_path,
"permissions": "rw"
}
if not event["success"]:
logger.error("Transfer failed: {}".format(event))
raise ValueError(event)
finally:
if acl_res is not None:
try:
acl_del = admin_client.delete_endpoint_acl_rule(
local_ep, acl_res["access_id"])
except Exception as e:
logger.critical("ACL rule deletion exception for '{}': {}"
.format(acl_res, repr(e)))
raise ValueError("Internal permissions error.")
if not acl_del.get("code") == "Deleted":
logger.critical("Unable to delete ACL rule '{}': {}"
.format(acl_res, acl_del))
raise ValueError("Internal permissions error.")
try:
acl_res = admin_client.add_endpoint_acl_rule(local_ep, acl_rule).data
except Exception as e:
logger.error("ACL rule creation exception for '{}': {}"
.format(acl_rule, repr(e)))
raise ValueError("Internal permissions error.")
if not acl_res.get("code") == "Created":
logger.error("Unable to create ACL rule '{}': {}"
.format(acl_rule, acl_res))
raise ValueError("Internal permissions error.")
else:
acl_res = None

# Transfer locally
logger.debug("BEFORE TRANSFER")
logger.warning("Transferring:\nSource EP: {}\nDest EP: {}\nSource path: {}\nDest path: {}".format(loc_info.netloc, local_ep, loc_info.path, transfer_path))
transfer = mdf_toolbox.custom_transfer(
tc, loc_info.netloc, local_ep,
[(loc_info.path, transfer_path)],
interval=CONFIG["TRANSFER_PING_INTERVAL"],
inactivity_time=CONFIG["TRANSFER_DEADLINE"], notify=False)
for event in transfer:
if not event["success"]:
logger.info("Transfer is_error: {} - {}"
.format(event.get("code", "No code found"),
event.get("description", "No description found")))
yield {
"success": False,
"error": "{} - {}".format(event.get("code", "No code found"),
event.get("description",
"No description found"))
}
if not event["success"]:
logger.error("Transfer failed: {}".format(event))
raise ValueError(event)
finally:
if acl_res is not None:
try:
acl_del = admin_client.delete_endpoint_acl_rule(
local_ep, acl_res["access_id"])
except Exception as e:
logger.critical("ACL rule deletion exception for '{}': {}"
.format(acl_res, repr(e)))
raise ValueError("Internal permissions error.")
if not acl_del.get("code") == "Deleted":
logger.critical("Unable to delete ACL rule '{}': {}"
.format(acl_res, acl_del))
raise ValueError("Internal permissions error.")
except Exception as e:
# this is all temporary
import traceback
logger.critical("\nError {}: {}\n\nStacktrace: {}\n".format(str(e), repr(e),
traceback.format_exception(type(e), e, e.__traceback__)))
raise
# HTTP(S)
elif loc_info.scheme.startswith("http"):
# Get default filename and extension
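Note on the download_data change above: the whole Globus branch is now wrapped in a broad try/except that logs the full traceback and re-raises, and the in-code comment marks this as temporary debugging for the GDrive transfer errors. A standalone sketch of that diagnostic pattern, with run_globus_transfer() as a hypothetical stand-in for the transfer branch:

import logging
import traceback

logger = logging.getLogger(__name__)

def run_globus_transfer():
    # Hypothetical stand-in for the Globus transfer branch above.
    raise RuntimeError("simulated transfer failure")

try:
    run_globus_transfer()
except Exception as e:
    # Log the full traceback (format_exception returns a list of lines),
    # then re-raise so the caller still sees the original exception.
    logger.critical("\nError {}: {}\n\nStacktrace: {}\n".format(
        str(e), repr(e),
        traceback.format_exception(type(e), e, e.__traceback__)))
    raise
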
@@ -820,6 +831,11 @@ def backup_data(transfer_client, storage_loc, backup_locs, acl=None,
raise ValueError("Storage location '{}' (from '{}') is not a Globus Endpoint and cannot "
"be directly published from or backed up from"
.format(norm_store, storage_loc))
# If the storage is the MDF GDrive EP, the data_client and data_user aren't necessary
# because the MDF client has read access already.
elif storage_info.netloc == CONFIG["GDRIVE_EP"]:
data_client = None
data_user = None

for backup in backup_locs:
error = ""
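Note on the backup_data change above: when the normalized storage location is the MDF Google Drive endpoint, data_client and data_user are cleared because the MDF client already has read access there. A minimal, hypothetical helper illustrating the same decision; the GDRIVE_EP value is a placeholder, the real one lives in the config:

import urllib.parse

CONFIG = {"GDRIVE_EP": "<mdf-gdrive-endpoint-uuid>"}  # placeholder value

def pick_backup_clients(norm_store, user_client, user_id):
    # Decide which Transfer client/identity to use for the backup source.
    storage_info = urllib.parse.urlparse(norm_store)
    if storage_info.scheme != "globus":
        raise ValueError("Storage location '{}' is not a Globus Endpoint".format(norm_store))
    if storage_info.netloc == CONFIG["GDRIVE_EP"]:
        # MDF's own client already has read access on the MDF GDrive EP,
        # so no per-user client or identity is needed.
        return None, None
    return user_client, user_id
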
@@ -1392,7 +1408,8 @@ def cancel_submission(source_id, wait=True):
old_status_code = old_read_table("status", source_id)["status"]["code"]
new_status_code = old_status_code.replace("z", "X").replace("W", "X") \
.replace("T", "X").replace("P", "X")
update_res = modify_status_entry(source_id, {"code": new_status_code})
update_res = modify_status_entry(source_id, {"code": new_status_code,
"active": False})
if not update_res["success"]:
return {
"success": False,
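Note on the cancel_submission change above: the status update now also marks the entry inactive in the same write that remaps the z/W/T/P step markers to X. A small worked example of that remapping, using a made-up status code string:

old_status_code = "SSzWTP"  # example only; real codes come from the status table
new_status_code = (old_status_code.replace("z", "X").replace("W", "X")
                   .replace("T", "X").replace("P", "X"))
update = {"code": new_status_code, "active": False}
print(update)  # {'code': 'SSXXXX', 'active': False}
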
