feat: refactor #7

Open
wants to merge 39 commits into base: main
Changes from 14 commits

Commits (39)
a33d1b3
refactor
jakevc Jan 24, 2025
c0e6018
no keys
jakevc Jan 24, 2025
6a8af10
rm container image
jakevc Jan 24, 2025
56e65ba
build job definition
jakevc Jan 24, 2025
0c30eac
update submit job def
jakevc Jan 24, 2025
4a4bccf
update internal method
jakevc Jan 24, 2025
7bc9d2a
move command up
jakevc Jan 24, 2025
bca0dd0
bump deps
jakevc Jan 24, 2025
acac896
fixes
jakevc Jan 25, 2025
8447d07
settings tweak
jakevc Jan 25, 2025
4940ead
authors
jakevc Jan 25, 2025
9370e67
Merge branch 'main' into feat--refactor
jakevc Jan 25, 2025
ce595f5
format
jakevc Jan 25, 2025
21b882f
executor setting type causes circular import
jakevc Jan 25, 2025
6e6916d
tweak
jakevc Jan 25, 2025
86dfe89
tweak
jakevc Jan 25, 2025
459580e
update WorkflowError msg
jakevc Jan 26, 2025
e1b1bb0
tuple
jakevc Jan 26, 2025
3c69a03
todo
jakevc Jan 26, 2025
bed0327
terraform
jakevc Jan 26, 2025
14afb48
debug
jakevc Jan 27, 2025
8c1d65d
tf
jakevc Jan 27, 2025
b0865a7
vpc_id
jakevc Jan 27, 2025
9dfa6e7
tf
jakevc Jan 27, 2025
504d884
tf
jakevc Jan 27, 2025
a17d9b6
docker cmd
jakevc Jan 28, 2025
b914564
its alive
jakevc Jan 28, 2025
df9fec3
refactor
jakevc Jan 29, 2025
0cc969e
tweak
jakevc Jan 29, 2025
ac92dfd
log
jakevc Jan 29, 2025
48c9f36
mock success
jakevc Jan 29, 2025
f59fe23
error hangling
jakevc Jan 29, 2025
8e52d3b
env
jakevc Jan 29, 2025
460e422
Update snakemake_executor_plugin_aws_batch/__init__.py
jakevc Jan 29, 2025
9756fc7
Update .github/workflows/ci_mocked_api.yml
jakevc Jan 29, 2025
aa1a1f0
OIDC
jakevc Jan 29, 2025
93fab0b
black
jakevc Jan 29, 2025
5da9542
secret
jakevc Jan 29, 2025
0ddf9bf
MinioLocal
jakevc Feb 3, 2025
9 changes: 9 additions & 0 deletions .vscode/settings.json
@@ -0,0 +1,9 @@
{
"black-formatter.interpreter": [
"./.venv/bin/python"
],
"flake8.interpreter": [
"./.venv/bin/python"
],
"files.saveConflictResolution": "overwriteFileOnDisk"
}
6 changes: 3 additions & 3 deletions pyproject.toml
@@ -10,9 +10,9 @@ keywords = ["snakemake", "plugin", "executor", "aws-batch"]

[tool.poetry.dependencies]
python = "^3.11"
snakemake-interface-common = "^1.15.0"
snakemake-interface-executor-plugins = "^9.0.0"
boto3 = "^1.33.11"
snakemake-interface-common = "^1.17.4"
snakemake-interface-executor-plugins = "^9.3.2"
boto3 = "^1.36.5"

[tool.poetry.group.dev.dependencies]
black = "^23.11.0"
276 changes: 33 additions & 243 deletions snakemake_executor_plugin_aws_batch/__init__.py
@@ -1,13 +1,16 @@
__author__ = "Jake VanCampen, Johannes Köster"
__copyright__ = "Copyright 2025, Snakemake community"
__email__ = "[email protected]"
__license__ = "MIT"

from dataclasses import dataclass, field
from pprint import pformat
import boto3
import uuid
import heapq
import botocore
import shlex
import time
import threading
from typing import List, Generator, Optional
from typing import Union
from pprint import pformat
from typing import List, AsyncGenerator, Optional
from snakemake_executor_plugin_aws_batch.batch_client import BatchClient
from snakemake_executor_plugin_aws_batch.batch_job_builder import BatchJobBuilder
from snakemake_executor_plugin_aws_batch.batch_descriptor import BatchJobDescriber
from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor
from snakemake_interface_executor_plugins.settings import (
@@ -28,16 +31,6 @@
# of None or anything else that makes sense in your case.
@dataclass
class ExecutorSettings(ExecutorSettingsBase):
access_key_id: Optional[int] = field(
default=None,
metadata={"help": "AWS access key id", "env_var": True, "required": False},
repr=False,
)
access_key: Optional[int] = field(
default=None,
metadata={"help": "AWS access key", "env_var": True, "required": False},
repr=False,
)
region: Optional[int] = field(
default=None,
metadata={
@@ -46,29 +39,6 @@ class ExecutorSettings(ExecutorSettingsBase):
"required": True,
},
)
fsap_id: Optional[str] = (
field(
default=None,
metadata={
"help": (
"The fsap id of the EFS instance you want to use that "
"is shared with your local environment"
),
"env_var": False,
"required": False,
},
),
)
efs_project_path: Optional[str] = (
field(
default=None,
metadata={
"help": "The EFS path that contains the project Snakemake is running",
"env_var": False,
"required": False,
},
),
)
job_queue: Optional[str] = field(
default=None,
metadata={
@@ -77,10 +47,10 @@ class ExecutorSettings(ExecutorSettingsBase):
"required": True,
},
)
execution_role: Optional[str] = field(
job_role: Optional[str] = field(
default=None,
metadata={
"help": "The AWS execution role ARN that is used for running the tasks",
"help": "The AWS job role ARN that is used for running the tasks",
"env_var": True,
"required": True,
},
@@ -140,58 +110,17 @@ class ExecutorSettings(ExecutorSettingsBase):
# Implementation of your executor
class Executor(RemoteExecutor):
def __post_init__(self):
# set snakemake/snakemake container image
# snakemake/snakemake:latest container image
self.container_image = self.workflow.remote_execution_settings.container_image

# access executor specific settings
self.settings: ExecutorSettings = self.workflow.executor_settings
self.settings = self.workflow.executor_settings
self.logger.debug(f"ExecutorSettings: {pformat(self.settings, indent=2)}")

# keep track of job definitions
self.created_job_defs = list()
self.mount_path = None
self._describer = BatchJobDescriber()

# init batch client
try:
self.batch_client = (
boto3.Session().client( # Session() needed for thread safety
"batch",
aws_access_key_id=self.settings.access_key_id,
aws_secret_access_key=self.settings.access_key,
region_name=self.settings.region,
config=botocore.config.Config(
retries={"max_attempts": 5, "mode": "standard"}
),
)
)
except Exception as e:
raise WorkflowError(e)

# TODO:
# def _prepare_mounts(self):
# """
# Prepare the "volumes" and "mountPoints" for the Batch job definition,
# assembling the in-container filesystem with the shared working directory,
# read-only input files, and command/stdout/stderr files.
# """

# # EFS mount point
# volumes = [
# {
# "name": "efs",
# "efsVolumeConfiguration": {
# "fileSystemId": self.fs_id,
# "transitEncryption": "ENABLED",
# },
# }
# ]
# volumes[0]["efsVolumeConfiguration"]["authorizationConfig"] = {
# "accessPointId": self.fsap_id
# }
# mount_points = [{"containerPath": self.mount_path, "sourceVolume": "efs"}]

# return volumes, mount_points
self.batch_client = BatchClient(region_name=self.settings.region)

def run_job(self, job: JobExecutorInterface):
# Implement here how to run a job.
@@ -204,88 +133,22 @@ def run_job(self, job: JobExecutorInterface):
# If required, make sure to pass the job's id to the job_info object, as keyword
# argument 'external_job_id'.

# set job name
job_uuid = str(uuid.uuid4())
job_name = f"snakejob-{job.name}-{job_uuid}"

# set job definition name
job_definition_name = f"snakejob-def-{job.name}-{job_uuid}"
job_definition_type = "container"

# get job resources or default
vcpu = str(job.resources.get("_cores", str(1)))
mem = str(job.resources.get("mem_mb", str(2048)))

# job definition container properties
container_properties = {
"command": ["snakemake"],
"image": self.container_image,
# fargate required privileged False
"privileged": False,
"resourceRequirements": [
# resource requirements have to be compatible
# see: https://docs.aws.amazon.com/batch/latest/APIReference/API_ResourceRequirement.html # noqa
{"type": "VCPU", "value": vcpu},
{"type": "MEMORY", "value": mem},
],
"networkConfiguration": {
"assignPublicIp": "ENABLED",
},
"executionRoleArn": self.settings.execution_role,
}

# TODO: or not todo ?
# (
# container_properties["volumes"],
# container_properties["mountPoints"],
# ) = self._prepare_mounts()

# register the job definition
tags = self.settings.tags if isinstance(self.settings.tags, dict) else dict()
try:
job_def = self.batch_client.register_job_definition(
jobDefinitionName=job_definition_name,
type=job_definition_type,
containerProperties=container_properties,
platformCapabilities=["FARGATE"],
tags=tags,
)
self.created_job_defs.append(job_def)
except Exception as e:
raise WorkflowError(e)

job_command = self._generate_snakemake_command(job)
remote_command = f"/bin/bash -c {shlex.quote(self.format_job_exec(job))}"
self.logger.debug(f"Remote command: {remote_command}")

# configure job parameters
job_params = {
"jobName": job_name,
"jobQueue": self.settings.job_queue,
"jobDefinition": "{}:{}".format(
job_def["jobDefinitionName"], job_def["revision"]
),
"containerOverrides": {
"command": job_command,
"resourceRequirements": [
{"type": "VCPU", "value": vcpu},
{"type": "MEMORY", "value": mem},
],
},
}

if self.settings.tags:
job_params["tags"] = self.settings.tags

if self.settings.task_timeout is not None:
job_params["timeout"] = {
"attemptDurationSeconds": self.settings.task_timeout
}

# submit the job
try:
submitted = self.batch_client.submit_job(**job_params)
job_definition = BatchJobBuilder(
logger=self.logger,
job=job,
container_image=self.container_image,
settings=self.settings,
job_command=remote_command,
batch_client=self.batch_client,
)
job_info = job_definition.submit()
self.logger.debug(
"AWS Batch job submitted with queue {}, jobId {} and tags {}".format(
self.settings.job_queue, submitted["jobId"], self.settings.tags
self.settings.job_queue, job_info["jobId"], self.settings.tags
)
)
except Exception as e:
@@ -294,22 +157,17 @@ def run_job(self, job: JobExecutorInterface):
self.report_job_submission(
SubmittedJobInfo(
job=job,
external_jobid=submitted["jobId"],
external_jobid=job_info["jobId"],
aux={
"jobs_params": job_params,
"job_def_arn": job_def["jobDefinitionArn"],
"jobs_params": job_info["job_params"],
"job_def_arn": job_definition["jobDefinitionArn"],
},
)
)

def _generate_snakemake_command(self, job: JobExecutorInterface) -> str:
"""generates the snakemake command for the job"""
exec_job = self.format_job_exec(job)
return ["sh", "-c", shlex.quote(exec_job)]

async def check_active_jobs(
self, active_jobs: List[SubmittedJobInfo]
) -> Generator[SubmittedJobInfo, None, None]:
) -> AsyncGenerator[SubmittedJobInfo, None]:
# Check the status of active jobs.

# You have to iterate over the given list active_jobs.
Expand Down Expand Up @@ -342,7 +200,7 @@ async def check_active_jobs(
else:
yield job

def _get_job_status(self, job: SubmittedJobInfo) -> (int, Optional[str]):
def _get_job_status(self, job: SubmittedJobInfo) -> Union[int, Optional[str]]:
"""poll for Batch job success or failure

returns exits code and failure information if exit code is not 0
@@ -451,71 +309,3 @@ def cancel_jobs(self, active_jobs: List[SubmittedJobInfo]):
for j in active_jobs:
self._terminate_job(j)
self._deregister_job(j)


class BatchJobDescriber:
"""
This singleton class handles calling the AWS Batch DescribeJobs API with up to 100
job IDs per request, then dispensing each job description to the thread interested
in it. This helps avoid AWS API request rate limits when tracking concurrent jobs.
"""

JOBS_PER_REQUEST = 100 # maximum jobs per DescribeJob request

def __init__(self):
self.lock = threading.Lock()
self.last_request_time = 0
self.job_queue = []
self.jobs = {}

def describe(self, aws, job_id, period):
"""get the latest Batch job description"""
while True:
with self.lock:
if job_id not in self.jobs:
# register new job to be described ASAP
heapq.heappush(self.job_queue, (0.0, job_id))
self.jobs[job_id] = None
# update as many job descriptions as possible
self._update(aws, period)
# return the desired job description if we have it
desc = self.jobs[job_id]
if desc:
return desc
# otherwise wait (outside the lock) and try again
time.sleep(period / 4)

def unsubscribe(self, job_id):
"""unsubscribe from job_id when no longer interested"""
with self.lock:
if job_id in self.jobs:
del self.jobs[job_id]

def _update(self, aws, period):
# if enough time has passed since our last DescribeJobs request
if time.time() - self.last_request_time >= period:
# take the N least-recently described jobs
job_ids = set()
assert self.job_queue
while self.job_queue and len(job_ids) < self.JOBS_PER_REQUEST:
job_id = heapq.heappop(self.job_queue)[1]
assert job_id not in job_ids
if job_id in self.jobs:
job_ids.add(job_id)
if not job_ids:
return
# describe them
try:
job_descs = aws.describe_jobs(jobs=list(job_ids))
finally:
# always: bump last_request_time and re-enqueue these jobs
self.last_request_time = time.time()
for job_id in job_ids:
heapq.heappush(self.job_queue, (self.last_request_time, job_id))
# update self.jobs with the new descriptions
for job_desc in job_descs["jobs"]:
job_ids.remove(job_desc["jobId"])
self.jobs[job_desc["jobId"]] = job_desc
assert (
not job_ids
), "AWS Batch DescribeJobs didn't return all expected results"