Skip to content

Commit

Permalink
Add flag to CLI that offloads processing to Batch
Browse files Browse the repository at this point in the history
  • Loading branch information
rbreslow committed May 17, 2022
1 parent bdcfc73 commit 4b002fd
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 4 deletions.
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ services:
build: ./src/image_deid_etl
env_file: .env
environment:
- AWS_JOB_QUEUE=${AWS_JOB_QUEUE:-queueImageDeidEtl}
- AWS_JOB_DEFINITION=${AWS_JOB_DEFINITION:-jobImageDeidEtl}
- DATABASE_URL=postgresql://image-deid-etl:image-deid-etl@database:5432/image-deid-etl
- IMAGE_DEID_ETL_LOG_LEVEL=INFO
volumes:
Expand Down
45 changes: 41 additions & 4 deletions src/image_deid_etl/image_deid_etl/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import os
import sys
import tempfile
import time

import boto3
import flywheel
from sqlalchemy.exc import IntegrityError

Expand Down Expand Up @@ -111,6 +113,35 @@ def validate(args) -> int:


def run(args) -> int:
if args.batch:
batch = boto3.client("batch")

aws_job_queue = os.getenv("AWS_JOB_QUEUE")
if aws_job_queue is None:
raise ImproperlyConfigured("You must supply a value for AWS_JOB_QUEUE.")

aws_job_definition = os.getenv("AWS_JOB_DEFINITION")
if aws_job_definition is None:
raise ImproperlyConfigured(
"You must supply a value for AWS_JOB_DEFINITION."
)

for uuid in args.uuid:
response = batch.submit_job(
jobName=f"ProcessStudy_{uuid}",
jobQueue=aws_job_queue,
jobDefinition=aws_job_definition,
containerOverrides={"command": ["image-deid-etl", "run", uuid]},
)

region = batch.meta.region_name
job_id = response["jobId"]
url = f"https://console.aws.amazon.com/batch/home?region={region}#jobs/detail/{job_id}"

logger.info(f"Job started! View here:\n{url}")

return 0

local_path = f"{args.program}/{args.site}/"

for uuid in args.uuid:
Expand Down Expand Up @@ -159,8 +190,8 @@ def run(args) -> int:


def upload2fw(args) -> int:
# This is a hack so that the Flywheel CLI can consume credentials
# from the environment.
# This is a hack so that the Flywheel CLI can consume credentials from the
# environment.
with tempfile.TemporaryDirectory() as flywheel_user_home:
logger.info(f"Writing fake Flywheel CLI credentials to {flywheel_user_home}...")
# The Flywheel CLI will look for its config directory at this path.
Expand All @@ -174,8 +205,9 @@ def upload2fw(args) -> int:
source_path = f"{args.program}/{args.site}/NIfTIs/"

if not os.path.exists(source_path):
# It appears that this can happen when sub_mapping is empty.
raise FileNotFoundError(f"{source_path} directory does not exist.")
raise FileNotFoundError(
f"{source_path} directory does not exist. Is sub_mapping empty?"
)

for fw_project in next(os.walk(source_path))[1]: # for each project dir
proj_path = os.path.join(source_path, fw_project)
Expand Down Expand Up @@ -264,6 +296,11 @@ def main() -> int:
help="download images and run deidentification",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser_run.add_argument(
"--batch",
action="store_true",
help="skip local processing and submit job(s) to AWS Batch",
)
parser_run.add_argument(
"--skip-modalities",
nargs="*",
Expand Down

0 comments on commit 4b002fd

Please sign in to comment.