diff --git a/src/main/java/com/conveyal/datatools/common/utils/AWSClientManager.java b/src/main/java/com/conveyal/datatools/common/utils/AWSClientManager.java
new file mode 100644
index 000000000..264241b0f
--- /dev/null
+++ b/src/main/java/com/conveyal/datatools/common/utils/AWSClientManager.java
@@ -0,0 +1,77 @@
+package com.conveyal.datatools.common.utils;
+
+import java.util.HashMap;
+
+import static com.conveyal.datatools.common.utils.AWSUtils.DEFAULT_EXPIRING_AWS_ASSET_VALID_DURATION_MILLIS;
+
+/**
+ * This abstract class provides a framework for managing the creation of AWS clients. Three types of clients are stored
+ * in this class:
+ * 1. A default client to use when not requesting a client using a specific role and/or region
+ * 2. A client to use when using a specific region, but not with a role
+ * 3. A client to use with a specific role and region combination (including null regions)
+ *
+ * The {@link AWSClientManager#getClient(String, String)} method handles the creation and caching of clients based on
+ * the given role and region inputs.
+ */
+public abstract class AWSClientManager<T> {
+    protected final T defaultClient;
+    private HashMap<String, T> nonRoleClientsByRegion = new HashMap<>();
+    private HashMap<String, ExpiringAsset<T>> clientsByRoleAndRegion = new HashMap<>();
+
+    public AWSClientManager (T defaultClient) {
+        this.defaultClient = defaultClient;
+    }
+
+    /**
+     * An abstract method where the implementation will create a client with the specified region, but not with a role.
+     */
+    public abstract T buildDefaultClientWithRegion(String region);
+
+    /**
+     * An abstract method where the implementation will create a client with the specified role and region.
+     */
+    protected abstract T buildCredentialedClientForRoleAndRegion(String role, String region)
+        throws CheckedAWSException;
+
+    /**
+     * Obtain a potentially cached AWS client for the provided role ARN and region. If the role and region are null,
+     * the default AWS client will be used. If just the role is null, a cached client configured for the specified
+     * region will be returned. For clients that require using a role, a client will be obtained (either via a cache or
+     * by creation and then insertion into the cache) that has obtained the proper credentials.
+     */
+    public T getClient(String role, String region) throws CheckedAWSException {
+        // return default client for null region and role
+        if (role == null && region == null) {
+            return defaultClient;
+        }
+
+        // if the role is null, return a potentially cached client configured with the requested region
+        T client;
+        if (role == null) {
+            client = nonRoleClientsByRegion.get(region);
+            if (client == null) {
+                client = buildDefaultClientWithRegion(region);
+                nonRoleClientsByRegion.put(region, client);
+            }
+            return client;
+        }
+
+        // check for the availability of a client already associated with the given role and region
+        String roleRegionKey = makeRoleRegionKey(role, region);
+        ExpiringAsset<T> clientWithRole = clientsByRoleAndRegion.get(roleRegionKey);
+        if (clientWithRole != null && clientWithRole.isActive()) return clientWithRole.asset;
+
+        // Either a new client hasn't been created or it has expired. Create a new client and cache it.
+        T credentialedClientForRoleAndRegion = buildCredentialedClientForRoleAndRegion(role, region);
+        clientsByRoleAndRegion.put(
+            roleRegionKey,
+            new ExpiringAsset<>(credentialedClientForRoleAndRegion, DEFAULT_EXPIRING_AWS_ASSET_VALID_DURATION_MILLIS)
+        );
+        return credentialedClientForRoleAndRegion;
+    }
+
+    private static String makeRoleRegionKey(String role, String region) {
+        return String.format("role=%s,region=%s", role, region);
+    }
+}
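A minimal sketch of how a concrete manager plugs into this framework. The SNS-based subclass below is a hypothetical illustration (SNS is not one of the services this changeset manages) and assumes access to a credentials helper like the getCredentialsForRole method added to AWSUtils below:

```java
// Hypothetical example subclass; not part of this changeset.
private static class SnsClientManagerImpl extends AWSClientManager<AmazonSNS> {
    public SnsClientManagerImpl(AmazonSNS defaultClient) {
        super(defaultClient);
    }

    @Override
    public AmazonSNS buildDefaultClientWithRegion(String region) {
        // region-only client using the application's own credentials
        return AmazonSNSClientBuilder.standard().withRegion(region).build();
    }

    @Override
    protected AmazonSNS buildCredentialedClientForRoleAndRegion(String role, String region)
        throws CheckedAWSException {
        // assume the third-party role, then pin the region if one was given
        AWSStaticCredentialsProvider credentials = getCredentialsForRole(role); // assumed helper
        AmazonSNSClientBuilder builder = AmazonSNSClientBuilder.standard().withCredentials(credentials);
        if (region != null) builder.withRegion(region);
        return builder.build();
    }
}
```

With such a subclass in place, `new SnsClientManagerImpl(defaultClient).getClient(role, region)` transparently returns the default, region-scoped, or role-scoped cached client.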
diff --git a/src/main/java/com/conveyal/datatools/common/utils/AWSUtils.java b/src/main/java/com/conveyal/datatools/common/utils/AWSUtils.java
index d63e9c7e0..ea867b0ee 100644
--- a/src/main/java/com/conveyal/datatools/common/utils/AWSUtils.java
+++ b/src/main/java/com/conveyal/datatools/common/utils/AWSUtils.java
@@ -6,16 +6,24 @@
 import com.amazonaws.auth.AWSSessionCredentials;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicSessionCredentials;
+import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
 import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
+import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.amazonaws.services.ec2.AmazonEC2;
 import com.amazonaws.services.ec2.AmazonEC2Client;
+import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
+import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancing;
+import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancingClient;
+import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancingClientBuilder;
+import com.amazonaws.services.identitymanagement.AmazonIdentityManagement;
+import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClientBuilder;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.conveyal.datatools.manager.DataManager;
-import com.conveyal.datatools.manager.persistence.FeedStore;
+import com.conveyal.datatools.manager.models.OtpServer;
 import org.apache.commons.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,16 +39,69 @@
 import java.io.InputStream;
 import java.net.URL;
 import java.util.Date;
+import java.util.HashMap;
 
 import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt;
 
 /**
- * Created by landon on 8/2/16.
+ * This class mainly has utility functions for obtaining new AWS clients for various AWS services. This class will
+ * create new clients only when they are needed and will refresh the clients in case they have expired. It is expected
+ * that all AWS clients are obtained from this class and not created elsewhere in order to properly manage AWS clients
+ * and to avoid repetition of code that creates clients.
  */
 public class AWSUtils {
     private static final Logger LOG = LoggerFactory.getLogger(AWSUtils.class);
     private static final int REQUEST_TIMEOUT_MSEC = 30 * 1000;
+    public static final long DEFAULT_EXPIRING_AWS_ASSET_VALID_DURATION_MILLIS = 800 * 1000;
+
+    private static final AmazonEC2 DEFAULT_EC2_CLIENT = AmazonEC2Client.builder().build();
+    private static final AmazonElasticLoadBalancing DEFAULT_ELB_CLIENT = AmazonElasticLoadBalancingClient
+        .builder()
+        .build();
+    private static final AmazonIdentityManagement DEFAULT_IAM_CLIENT = AmazonIdentityManagementClientBuilder
+        .defaultClient();
+    private static final AWSCredentialsProvider DEFAULT_S3_CREDENTIALS;
+    private static final AmazonS3 DEFAULT_S3_CLIENT;
+
+    static {
+        // Configure the default S3 client
+        // A temporary variable needs to be used before setting the final variable
+        AmazonS3 tempS3Client = null;
+        AWSCredentialsProvider tempS3CredentialsProvider = null;
+        try {
+            AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
+            String credentialsFile = DataManager.getConfigPropertyAsText(
+                "application.data.s3_credentials_file"
+            );
+            tempS3CredentialsProvider = credentialsFile != null
+                ? new ProfileCredentialsProvider(credentialsFile, "default")
+                : new DefaultAWSCredentialsProviderChain(); // default credentials providers, e.g. IAM role
+            builder.withCredentials(tempS3CredentialsProvider);
+
+            // If region configuration string is provided, use that.
+            // Otherwise defaults to value provided in ~/.aws/config
+            String region = DataManager.getConfigPropertyAsText("application.data.s3_region");
+            if (region != null) {
+                builder.withRegion(region);
+            }
+            tempS3Client = builder.build();
+        } catch (Exception e) {
+            LOG.error("S3 client not initialized correctly. Must provide config property application.data.s3_region or specify region in ~/.aws/config", e);
+        }
+        DEFAULT_S3_CREDENTIALS = tempS3CredentialsProvider;
+        DEFAULT_S3_CLIENT = tempS3Client;
+        if (DEFAULT_S3_CLIENT == null || DEFAULT_S3_CREDENTIALS == null) {
+            throw new IllegalArgumentException("Fatal error initializing the default s3Client");
+        }
+    }
+
+    private static HashMap<String, ExpiringAsset<AWSStaticCredentialsProvider>> credentialsProvidersByRole =
+        new HashMap<>();
+    private static EC2ClientManagerImpl EC2ClientManager = new EC2ClientManagerImpl(DEFAULT_EC2_CLIENT);
+    private static ELBClientManagerImpl ELBClientManager = new ELBClientManagerImpl(DEFAULT_ELB_CLIENT);
+    private static IAMClientManagerImpl IAMClientManager = new IAMClientManagerImpl(DEFAULT_IAM_CLIENT);
+    private static S3ClientManagerImpl S3ClientManager = new S3ClientManagerImpl(DEFAULT_S3_CLIENT);
 
     public static String uploadBranding(Request req, String key) {
         String url;
@@ -79,14 +140,13 @@ public static String uploadBranding(Request req, String key) {
         String keyName = "branding/" + key + extension;
         url = "https://s3.amazonaws.com/" + s3Bucket + "/" + keyName;
         // FIXME: This may need to change during feed store refactor
-        AmazonS3 s3client = FeedStore.s3Client;
-        s3client.putObject(new PutObjectRequest(
+        getDefaultS3Client().putObject(new PutObjectRequest(
                 s3Bucket, keyName, tempFile)
                 // grant public read
                 .withCannedAcl(CannedAccessControlList.PublicRead));
         return url;
-    } catch (AmazonServiceException ase) {
-        logMessageAndHalt(req, 500, "Error uploading file to S3", ase);
+    } catch (AmazonServiceException | CheckedAWSException e) {
+        logMessageAndHalt(req, 500, "Error uploading file to S3", e);
         return null;
     } finally {
         boolean deleted = tempFile.delete();
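Call sites now go through the checked accessor instead of holding a static client reference, which surfaces initialization and credential failures at compile time. A minimal calling sketch (bucket, key, and file path are illustrative):

```java
try {
    AmazonS3 s3 = AWSUtils.getDefaultS3Client();
    s3.putObject("example-bucket", "example-key", new File("/tmp/example.zip"));
} catch (CheckedAWSException e) {
    // the default client could not be obtained (e.g., misconfigured credentials or region)
    LOG.error("Could not obtain S3 client", e);
}
```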
@@ -97,22 +157,57 @@ public static String uploadBranding(Request req, String key) {
     }
 
     /**
-     * Download an object in the selected format from S3, using presigned URLs.
-     * @param s3
+     * Helper for downloading a file using the default S3 client.
+     */
+    public static String downloadFromS3(String bucket, String key, boolean redirect, Request req, Response res) {
+        try {
+            return downloadFromS3(getDefaultS3Client(), bucket, key, redirect, req, res);
+        } catch (CheckedAWSException e) {
+            logMessageAndHalt(req, 500, "Failed to download file from S3.", e);
+            return null;
+        }
+    }
+
+    /**
+     * Given a Spark request, download an object in the selected format from S3, using presigned URLs.
+     *
+     * @param s3 The s3 client to use
      * @param bucket name of the bucket
-     * @param filename both the key and the format
-     * @param redirect
-     * @param res
-     * @return
+     * @param key the object key (which also encodes the file format)
+     * @param redirect whether or not to redirect to the presigned url
+     * @param req The underlying Spark request this came from
+     * @param res The response to write the download info to
      */
-    public static String downloadFromS3(AmazonS3 s3, String bucket, String filename, boolean redirect, Response res){
+    public static String downloadFromS3(
+        AmazonS3 s3,
+        String bucket,
+        String key,
+        boolean redirect,
+        Request req,
+        Response res
+    ) {
+        if (!s3.doesObjectExist(bucket, key)) {
+            logMessageAndHalt(
+                req,
+                500,
+                String.format("Error downloading file from S3. Object s3://%s/%s does not exist.", bucket, key)
+            );
+            return null;
+        }
+
         Date expiration = new Date();
         expiration.setTime(expiration.getTime() + REQUEST_TIMEOUT_MSEC);
-        GeneratePresignedUrlRequest presigned = new GeneratePresignedUrlRequest(bucket, filename);
+        GeneratePresignedUrlRequest presigned = new GeneratePresignedUrlRequest(bucket, key);
         presigned.setExpiration(expiration);
         presigned.setMethod(HttpMethod.GET);
-        URL url = s3.generatePresignedUrl(presigned);
+        URL url;
+        try {
+            url = s3.generatePresignedUrl(presigned);
+        } catch (AmazonServiceException e) {
+            logMessageAndHalt(req, 500, "Failed to download file from S3.", e);
+            return null;
+        }
 
         if (redirect) {
             res.type("text/plain"); // override application/json
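A sketch of how a Spark route can use the new helper; the route path and bucket name are illustrative:

```java
spark.Spark.get("/example/download/:key", (req, res) ->
    // halts with a 500 via logMessageAndHalt if the object does not exist
    downloadFromS3("example-bucket", req.params(":key"), true, req, res)
);
```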
@@ -130,73 +225,166 @@ public static String downloadFromS3(AmazonS3 s3, String bucket, String filename,
      * https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html). The credentials can be
      * then used for creating a temporary S3 or EC2 client.
      */
-    public static AWSStaticCredentialsProvider getCredentialsForRole(
-        String role,
-        String sessionName
-    ) {
+    private static AWSStaticCredentialsProvider getCredentialsForRole(String role) throws CheckedAWSException {
         String roleSessionName = "data-tools-session";
         if (role == null) return null;
-        if (sessionName != null) roleSessionName = String.join("-", roleSessionName, sessionName);
+        // check if an active credentials provider exists for this role
+        ExpiringAsset<AWSStaticCredentialsProvider> session = credentialsProvidersByRole.get(role);
+        if (session != null && session.isActive()) return session.asset;
+        // either a session hasn't been created or an existing one has expired. Create a new session.
         STSAssumeRoleSessionCredentialsProvider sessionProvider = new STSAssumeRoleSessionCredentialsProvider
             .Builder(
                 role,
                 roleSessionName
            )
            .build();
-        AWSSessionCredentials credentials = sessionProvider.getCredentials();
-        return new AWSStaticCredentialsProvider(new BasicSessionCredentials(
-            credentials.getAWSAccessKeyId(), credentials.getAWSSecretKey(),
-            credentials.getSessionToken()));
+        AWSSessionCredentials credentials;
+        try {
+            credentials = sessionProvider.getCredentials();
+        } catch (AmazonServiceException e) {
+            throw new CheckedAWSException(e);
+        }
+        AWSStaticCredentialsProvider credentialsProvider = new AWSStaticCredentialsProvider(
+            new BasicSessionCredentials(
+                credentials.getAWSAccessKeyId(),
+                credentials.getAWSSecretKey(),
+                credentials.getSessionToken()
+            )
+        );
+        // store the credentials provider in a lookup by role for future use
+        credentialsProvidersByRole.put(
+            role,
+            new ExpiringAsset<>(credentialsProvider, DEFAULT_EXPIRING_AWS_ASSET_VALID_DURATION_MILLIS)
+        );
+        return credentialsProvider;
     }
 
-    /**
-     * Shorthand method to obtain an EC2 client for the provided role ARN. If role is null, the default EC2 credentials
-     * will be used.
-     */
-    public static AmazonEC2 getEC2ClientForRole (String role, String region) {
-        AWSStaticCredentialsProvider credentials = getCredentialsForRole(role, "ec2-client");
-        return region == null
-            ? getEC2ClientForCredentials(credentials)
-            : getEC2ClientForCredentials(credentials, region);
+    public static AmazonEC2 getEC2Client(String role, String region) throws CheckedAWSException {
+        return EC2ClientManager.getClient(role, region);
+    }
+
+    public static AmazonEC2 getEC2Client(OtpServer server) throws CheckedAWSException {
+        return getEC2Client(
+            server.role,
+            server.ec2Info == null ? null : server.ec2Info.region
+        );
+    }
+
+    public static AmazonElasticLoadBalancing getELBClient(String role, String region) throws CheckedAWSException {
+        return ELBClientManager.getClient(role, region);
+    }
+
+    public static AmazonIdentityManagement getIAMClient(String role, String region) throws CheckedAWSException {
+        return IAMClientManager.getClient(role, region);
+    }
+
+    public static AmazonS3 getDefaultS3Client() throws CheckedAWSException {
+        return getS3Client(null, null);
+    }
+
+    public static AmazonS3 getS3Client(String role, String region) throws CheckedAWSException {
+        return S3ClientManager.getClient(role, region);
     }
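Within the validity window (800 seconds, roughly 13 minutes), repeated requests for the same role and region reuse the cached client rather than re-assuming the role. The role ARN below is a made-up example:

```java
// first call assumes the role via STS and caches the resulting client
AmazonEC2 ec2 = AWSUtils.getEC2Client("arn:aws:iam::123456789012:role/example-role", "us-east-1");
// a second call within ~13 minutes returns the same cached client
AmazonEC2 cached = AWSUtils.getEC2Client("arn:aws:iam::123456789012:role/example-role", "us-east-1");
```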
     /**
-     * Shorthand method to obtain an EC2 client for the provided credentials. If credentials are null, the default EC2
-     * credentials will be used.
+     * A class that manages the creation of EC2 clients.
      */
-    public static AmazonEC2 getEC2ClientForCredentials (AWSCredentialsProvider credentials) {
-        return AmazonEC2Client.builder().withCredentials(credentials).build();
+    private static class EC2ClientManagerImpl extends AWSClientManager<AmazonEC2> {
+        public EC2ClientManagerImpl(AmazonEC2 defaultClient) {
+            super(defaultClient);
+        }
+
+        @Override
+        public AmazonEC2 buildDefaultClientWithRegion(String region) {
+            return AmazonEC2Client.builder().withRegion(region).build();
+        }
+
+        @Override
+        public AmazonEC2 buildCredentialedClientForRoleAndRegion(
+            String role,
+            String region
+        ) throws CheckedAWSException {
+            AWSStaticCredentialsProvider credentials = getCredentialsForRole(role);
+            AmazonEC2ClientBuilder builder = AmazonEC2Client.builder().withCredentials(credentials);
+            if (region != null) {
+                builder = builder.withRegion(region);
+            }
+            return builder.build();
+        }
     }
 
     /**
-     * Shorthand method to obtain an EC2 client for the provided credentials and region. If credentials are null, the
-     * default EC2 credentials will be used.
+     * A class that manages the creation of ELB clients.
      */
-    public static AmazonEC2 getEC2ClientForCredentials (AWSCredentialsProvider credentials, String region) {
-        return AmazonEC2Client.builder().withCredentials(credentials).withRegion(region).build();
+    private static class ELBClientManagerImpl extends AWSClientManager<AmazonElasticLoadBalancing> {
+        public ELBClientManagerImpl(AmazonElasticLoadBalancing defaultClient) {
+            super(defaultClient);
+        }
+
+        @Override
+        public AmazonElasticLoadBalancing buildDefaultClientWithRegion(String region) {
+            return AmazonElasticLoadBalancingClient.builder().withRegion(region).build();
+        }
+
+        @Override
+        public AmazonElasticLoadBalancing buildCredentialedClientForRoleAndRegion(
+            String role,
+            String region
+        ) throws CheckedAWSException {
+            AWSStaticCredentialsProvider credentials = getCredentialsForRole(role);
+            AmazonElasticLoadBalancingClientBuilder builder = AmazonElasticLoadBalancingClient
+                .builder()
+                .withCredentials(credentials);
+            if (region != null) {
+                builder = builder.withRegion(region);
+            }
+            return builder.build();
+        }
     }
 
     /**
-     * Shorthand method to obtain an S3 client for the provided credentials. If credentials are null, the default EC2
-     * credentials will be used.
+     * A class that manages the creation of IAM clients.
      */
-    public static AmazonS3 getS3ClientForCredentials (AWSCredentialsProvider credentials, String region) {
-        AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
-        if (region != null) builder.withRegion(region);
-        return builder.withCredentials(credentials).build();
+    private static class IAMClientManagerImpl extends AWSClientManager<AmazonIdentityManagement> {
+        public IAMClientManagerImpl(AmazonIdentityManagement defaultClient) {
+            super(defaultClient);
+        }
+
+        @Override
+        public AmazonIdentityManagement buildDefaultClientWithRegion(String region) {
+            // IAM is a global service, so the region is ignored and the default client is returned.
+            return defaultClient;
+        }
+
+        @Override
+        public AmazonIdentityManagement buildCredentialedClientForRoleAndRegion(String role, String region)
+            throws CheckedAWSException {
+            AWSStaticCredentialsProvider credentials = getCredentialsForRole(role);
+            return AmazonIdentityManagementClientBuilder.standard().withCredentials(credentials).build();
+        }
     }
 
     /**
-     * Shorthand method to obtain an S3 client for the provided role ARN. If role is null, the default EC2 credentials
-     * will be used. Similarly, if the region is null, it will be omitted while building the S3 client.
+     * A class that manages the creation of S3 clients.
      */
-    public static AmazonS3 getS3ClientForRole(String role, String region) {
-        AWSStaticCredentialsProvider credentials = getCredentialsForRole(role, "s3-client");
-        return getS3ClientForCredentials(credentials, region);
-    }
+    private static class S3ClientManagerImpl extends AWSClientManager<AmazonS3> {
+        public S3ClientManagerImpl(AmazonS3 defaultClient) {
+            super(defaultClient);
+        }
+
+        @Override
+        public AmazonS3 buildDefaultClientWithRegion(String region) {
+            return AmazonS3ClientBuilder.standard().withCredentials(DEFAULT_S3_CREDENTIALS).withRegion(region).build();
+        }
 
-    /** Shorthand method to obtain an S3 client for the provided role ARN. */
-    public static AmazonS3 getS3ClientForRole(String role) {
-        return getS3ClientForRole(role, null);
+        @Override
+        public AmazonS3 buildCredentialedClientForRoleAndRegion(
+            String role,
+            String region
+        ) throws CheckedAWSException {
+            AWSStaticCredentialsProvider credentials = getCredentialsForRole(role);
+            AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard();
+            if (region != null) builder.withRegion(region);
+            return builder.withCredentials(credentials).build();
+        }
     }
 }
diff --git a/src/main/java/com/conveyal/datatools/common/utils/CheckedAWSException.java b/src/main/java/com/conveyal/datatools/common/utils/CheckedAWSException.java
new file mode 100644
index 000000000..fe7551cd5
--- /dev/null
+++ b/src/main/java/com/conveyal/datatools/common/utils/CheckedAWSException.java
@@ -0,0 +1,21 @@
+package com.conveyal.datatools.common.utils;
+
+import com.amazonaws.AmazonServiceException;
+
+/**
+ * A helper exception class that does not extend RuntimeException, in order to make the compiler properly detect
+ * places where an AWS exception could occur.
+ */
+public class CheckedAWSException extends Exception {
+    public final Exception originalException;
+
+    public CheckedAWSException(String message) {
+        super(message);
+        originalException = null;
+    }
+
+    public CheckedAWSException(AmazonServiceException e) {
+        super(e.getMessage());
+        originalException = e;
+    }
+}
diff --git a/src/main/java/com/conveyal/datatools/common/utils/ExpiringAsset.java b/src/main/java/com/conveyal/datatools/common/utils/ExpiringAsset.java
new file mode 100644
index 000000000..f200386ab
--- /dev/null
+++ b/src/main/java/com/conveyal/datatools/common/utils/ExpiringAsset.java
@@ -0,0 +1,22 @@
+package com.conveyal.datatools.common.utils;
+
+/**
+ * A class that holds another variable and keeps track of whether the variable is still considered to be active (i.e.,
+ * not expired).
+ */
+public class ExpiringAsset<T> {
+    public final T asset;
+    private final long expirationTimeMillis;
+
+    public ExpiringAsset(T asset, long validDurationMillis) {
+        this.asset = asset;
+        this.expirationTimeMillis = System.currentTimeMillis() + validDurationMillis;
+    }
+
+    /**
+     * @return true if the asset hasn't yet expired
+     */
+    public boolean isActive() {
+        return expirationTimeMillis > System.currentTimeMillis();
+    }
+}
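A quick sketch of the intended usage pattern for ExpiringAsset (the one-minute duration and bucket name are illustrative):

```java
ExpiringAsset<AmazonS3> cached = new ExpiringAsset<>(s3Client, 60 * 1000);
// later: reuse the held client only while it is still valid
if (cached.isActive()) {
    cached.asset.doesBucketExistV2("example-bucket");
} else {
    // rebuild the client and wrap it in a fresh ExpiringAsset
}
```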
diff --git a/src/main/java/com/conveyal/datatools/editor/controllers/api/SnapshotController.java b/src/main/java/com/conveyal/datatools/editor/controllers/api/SnapshotController.java
index d1c025177..f48975a7d 100644
--- a/src/main/java/com/conveyal/datatools/editor/controllers/api/SnapshotController.java
+++ b/src/main/java/com/conveyal/datatools/editor/controllers/api/SnapshotController.java
@@ -1,6 +1,9 @@
 package com.conveyal.datatools.editor.controllers.api;
 
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.AmazonS3;
+import com.conveyal.datatools.common.utils.CheckedAWSException;
 import com.conveyal.datatools.common.utils.SparkUtils;
 import com.conveyal.datatools.editor.jobs.CreateSnapshotJob;
 import com.conveyal.datatools.editor.jobs.ExportSnapshotToGTFSJob;
@@ -14,7 +17,6 @@
 import com.conveyal.datatools.manager.models.FeedVersion;
 import com.conveyal.datatools.manager.models.JsonViews;
 import com.conveyal.datatools.manager.models.Snapshot;
-import com.conveyal.datatools.manager.persistence.FeedStore;
 import com.conveyal.datatools.manager.persistence.Persistence;
 import com.conveyal.datatools.manager.utils.json.JsonManager;
 import org.slf4j.Logger;
@@ -26,6 +28,7 @@
 import java.util.Collection;
 
 import static com.conveyal.datatools.common.utils.AWSUtils.downloadFromS3;
+import static com.conveyal.datatools.common.utils.AWSUtils.getDefaultS3Client;
 import static com.conveyal.datatools.common.utils.SparkUtils.downloadFile;
 import static com.conveyal.datatools.common.utils.SparkUtils.formatJobMessage;
 import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt;
@@ -193,16 +196,8 @@ private static Object getSnapshotToken(Request req, Response res) {
         // an actual object to download.
         // FIXME: use new FeedStore.
         if (DataManager.useS3) {
-            if (!FeedStore.s3Client.doesObjectExist(DataManager.feedBucket, key)) {
-                logMessageAndHalt(
-                    req,
-                    500,
-                    String.format("Error downloading snapshot from S3. Object %s does not exist.", key),
-                    new Exception("s3 object does not exist")
-                );
-            }
             // Return presigned download link if using S3.
-            return downloadFromS3(FeedStore.s3Client, DataManager.feedBucket, key, false, res);
+            return downloadFromS3(DataManager.feedBucket, key, false, req, res);
         } else {
             // If not storing on s3, just use the token download method.
             token = new FeedDownloadToken(snapshot);
diff --git a/src/main/java/com/conveyal/datatools/editor/jobs/ExportSnapshotToGTFSJob.java b/src/main/java/com/conveyal/datatools/editor/jobs/ExportSnapshotToGTFSJob.java
index 5aafb3391..5c3804440 100644
--- a/src/main/java/com/conveyal/datatools/editor/jobs/ExportSnapshotToGTFSJob.java
+++ b/src/main/java/com/conveyal/datatools/editor/jobs/ExportSnapshotToGTFSJob.java
@@ -1,11 +1,12 @@
 package com.conveyal.datatools.editor.jobs;
 
+import com.amazonaws.AmazonServiceException;
 import com.conveyal.datatools.common.status.MonitorableJob;
+import com.conveyal.datatools.common.utils.CheckedAWSException;
 import com.conveyal.datatools.manager.DataManager;
 import com.conveyal.datatools.manager.auth.Auth0UserProfile;
 import com.conveyal.datatools.manager.models.FeedVersion;
 import com.conveyal.datatools.manager.models.Snapshot;
-import com.conveyal.datatools.manager.persistence.FeedStore;
 import com.conveyal.gtfs.loader.FeedLoadResult;
 import com.conveyal.gtfs.loader.JdbcGtfsExporter;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -16,6 +17,8 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 
+import static com.conveyal.datatools.common.utils.AWSUtils.getDefaultS3Client;
+
 /**
  * This job will export a database snapshot (i.e., namespace) to a GTFS file. If a feed version is supplied in the
  * constructor, it will assume that the GTFS file is intended for ingestion into Data Tools as a new feed version.
@@ -70,7 +73,12 @@ public void jobLogic() {
         status.update("Writing snapshot to GTFS file", 90);
         if (DataManager.useS3) {
             String s3Key = String.format("%s/%s", bucketPrefix, filename);
-            FeedStore.s3Client.putObject(DataManager.feedBucket, s3Key, tempFile);
+            try {
+                getDefaultS3Client().putObject(DataManager.feedBucket, s3Key, tempFile);
+            } catch (AmazonServiceException | CheckedAWSException e) {
+                status.fail("Failed to upload file to S3", e);
+                return;
+            }
             LOG.info("Storing snapshot GTFS at s3://{}/{}", DataManager.feedBucket, s3Key);
         } else {
             try {
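The pattern above recurs throughout this changeset: obtaining a client can fail with the new checked exception, while the S3 call itself can still throw the SDK's unchecked AmazonServiceException, so call sites catch both. The same shape outside a MonitorableJob (bucket and key are illustrative):

```java
try {
    getDefaultS3Client().putObject("example-bucket", "example-key", tempFile);
} catch (AmazonServiceException | CheckedAWSException e) {
    // CheckedAWSException: the client could not be obtained or refreshed;
    // AmazonServiceException: the putObject call itself failed.
    LOG.error("Upload failed", e);
}
```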
diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java
index d995762b1..c124ed51b 100644
--- a/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java
+++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/DeploymentController.java
@@ -1,17 +1,14 @@
 package com.conveyal.datatools.manager.controllers.api;
 
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.services.ec2.AmazonEC2;
-import com.amazonaws.services.ec2.AmazonEC2Client;
 import com.amazonaws.services.ec2.model.DescribeInstancesRequest;
 import com.amazonaws.services.ec2.model.DescribeInstancesResult;
 import com.amazonaws.services.ec2.model.Filter;
 import com.amazonaws.services.ec2.model.Instance;
 import com.amazonaws.services.ec2.model.Reservation;
-import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3URI;
 import com.conveyal.datatools.common.status.MonitorableJob;
-import com.conveyal.datatools.common.utils.AWSUtils;
+import com.conveyal.datatools.common.utils.CheckedAWSException;
 import com.conveyal.datatools.common.utils.SparkUtils;
 import com.conveyal.datatools.manager.DataManager;
 import com.conveyal.datatools.manager.auth.Auth0UserProfile;
@@ -23,7 +20,6 @@
 import com.conveyal.datatools.manager.models.JsonViews;
 import com.conveyal.datatools.manager.models.OtpServer;
 import com.conveyal.datatools.manager.models.Project;
-import com.conveyal.datatools.manager.persistence.FeedStore;
 import com.conveyal.datatools.manager.persistence.Persistence;
 import com.conveyal.datatools.manager.utils.json.JsonManager;
 import org.bson.Document;
@@ -46,8 +42,8 @@
 import java.util.stream.Collectors;
 
 import static com.conveyal.datatools.common.utils.AWSUtils.downloadFromS3;
+import static com.conveyal.datatools.common.utils.AWSUtils.getS3Client;
 import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt;
-import static com.conveyal.datatools.manager.persistence.FeedStore.getAWSCreds;
 import static spark.Spark.delete;
 import static spark.Spark.get;
 import static spark.Spark.options;
@@ -61,7 +57,6 @@ public class DeploymentController {
     private static final Logger LOG = LoggerFactory.getLogger(DeploymentController.class);
     private static Map<String, DeployJob> deploymentJobsByServer = new HashMap<>();
-    private static final AmazonEC2 ec2 = AmazonEC2Client.builder().build();
 
     /**
      * Gets the deployment specified by the request's id parameter and ensure that user has access to the
@@ -95,11 +90,9 @@ private static Deployment deleteDeployment (Request req, Response res) {
 
     /**
      * HTTP endpoint for downloading a build artifact (e.g., otp build log or Graph.obj) from S3.
      */
-    private static String downloadBuildArtifact (Request req, Response res) {
+    private static String downloadBuildArtifact (Request req, Response res) throws CheckedAWSException {
         Deployment deployment = getDeploymentWithPermissions(req, res);
         DeployJob.DeploySummary summaryToDownload = null;
-        // Default client to use if no role was used during the deployment.
-        AmazonS3 s3Client = FeedStore.s3Client;
         String role = null;
         String region = null;
         String uriString;
@@ -144,12 +137,14 @@ private static String downloadBuildArtifact (Request req, Response res) {
         }
         AmazonS3URI uri = new AmazonS3URI(uriString);
         // Assume the alternative role if needed to download the deploy artifact.
-        if (role != null) {
-            s3Client = AWSUtils.getS3ClientForRole(role, region);
-        } else if (region != null) {
-            s3Client = AWSUtils.getS3ClientForCredentials(getAWSCreds(), region);
-        }
-        return downloadFromS3(s3Client, uri.getBucket(), String.join("/", uri.getKey(), filename), false, res);
+        return downloadFromS3(
+            getS3Client(role, region),
+            uri.getBucket(),
+            String.join("/", uri.getKey(), filename),
+            false,
+            req,
+            res
+        );
     }
 
     /**
@@ -340,7 +335,8 @@ private static Deployment updateDeployment (Request req, Response res) {
      * perhaps two people somehow kicked off a deploy job for the same deployment simultaneously and one of the EC2
      * instances has out-of-date data).
      */
-    private static boolean terminateEC2InstanceForDeployment(Request req, Response res) {
+    private static boolean terminateEC2InstanceForDeployment(Request req, Response res)
+        throws CheckedAWSException {
         Deployment deployment = getDeploymentWithPermissions(req, res);
         String instanceIds = req.queryParams("instanceIds");
         if (instanceIds == null) {
@@ -355,18 +351,12 @@ private static boolean terminateEC2InstanceForDeployment(Request req, Response r
             .collect(Collectors.toList());
-        // Get the target group ARN from the latest deployment. Surround in a try/catch in case of NPEs.
+        // Get the target group ARN from the latest deployment.
         // TODO: Perhaps provide some other way to provide the target group ARN.
-        String targetGroupArn;
-        DeployJob.DeploySummary latest;
-        AWSStaticCredentialsProvider credentials;
-        try {
-            latest = deployment.latest();
-            targetGroupArn = latest.ec2Info.targetGroupArn;
-            // Also, get credentials for role (if exists), which are needed to terminate instances in external AWS account.
-            credentials = AWSUtils.getCredentialsForRole(latest.role, "deregister-instances");
-        } catch (Exception e) {
+        DeployJob.DeploySummary latest = deployment.latest();
+        if (latest == null || latest.ec2Info == null) {
             logMessageAndHalt(req, 400, "Latest deploy job does not exist or is missing target group ARN.");
             return false;
         }
+        String targetGroupArn = latest.ec2Info.targetGroupArn;
         for (String id : idsToTerminate) {
             if (!instanceIdsForDeployment.contains(id)) {
                 logMessageAndHalt(req, HttpStatus.UNAUTHORIZED_401, "It is not permitted to terminate an instance that is not associated with deployment " + deployment.id);
@@ -381,7 +371,7 @@ private static boolean terminateEC2InstanceForDeployment(Request req, Response r
         }
         // If checks are ok, terminate instances.
         boolean success = ServerController.deRegisterAndTerminateInstances(
-            credentials,
+            latest.role,
             targetGroupArn,
             latest.ec2Info.region,
            idsToTerminate
@@ -396,7 +386,10 @@ private static boolean terminateEC2InstanceForDeployment(Request req, Response r
 
     /**
      * HTTP controller to fetch information about provided EC2 machines that power ELBs running a trip planner.
     */
-    private static List<EC2InstanceSummary> fetchEC2InstanceSummaries(Request req, Response res) {
+    private static List<EC2InstanceSummary> fetchEC2InstanceSummaries(
+        Request req,
+        Response res
+    ) throws CheckedAWSException {
         Deployment deployment = getDeploymentWithPermissions(req, res);
         return deployment.retrieveEC2Instances();
     }
@@ -412,7 +405,7 @@ public static List<EC2InstanceSummary> fetchEC2InstanceSummaries(AmazonEC2 ec2Cl
      * Fetch EC2 instances from AWS that match the provided set of filters (e.g., tags, instance ID, or other properties).
      */
     public static List<Instance> fetchEC2Instances(AmazonEC2 ec2Client, Filter... filters) {
-        if (ec2Client == null) ec2Client = ec2;
+        if (ec2Client == null) throw new IllegalArgumentException("Must provide an EC2 client");
         List<Instance> instances = new ArrayList<>();
         DescribeInstancesRequest request = new DescribeInstancesRequest().withFilters(filters);
         DescribeInstancesResult result = ec2Client.describeInstances(request);
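Callers must now supply the client explicitly, typically alongside tag filters; a sketch of a typical call (the tag name and deployment ID are illustrative):

```java
AmazonEC2 ec2Client = getEC2Client(role, region);
List<Instance> instances = DeploymentController.fetchEC2Instances(
    ec2Client,
    new Filter("tag:deploymentId", Collections.singletonList("example-deployment-id"))
);
```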
diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedVersionController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedVersionController.java
index f9f9131aa..bb521d2aa 100644
--- a/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedVersionController.java
+++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/FeedVersionController.java
@@ -1,5 +1,7 @@
 package com.conveyal.datatools.manager.controllers.api;
 
+import com.amazonaws.AmazonServiceException;
+import com.conveyal.datatools.common.utils.CheckedAWSException;
 import com.conveyal.datatools.common.utils.SparkUtils;
 import com.conveyal.datatools.manager.DataManager;
 import com.conveyal.datatools.manager.auth.Auth0UserProfile;
@@ -36,6 +38,7 @@
 import java.util.Set;
 
 import static com.conveyal.datatools.common.utils.AWSUtils.downloadFromS3;
+import static com.conveyal.datatools.common.utils.AWSUtils.getDefaultS3Client;
 import static com.conveyal.datatools.common.utils.SparkUtils.copyRequestStreamIntoFile;
 import static com.conveyal.datatools.common.utils.SparkUtils.downloadFile;
 import static com.conveyal.datatools.common.utils.SparkUtils.formatJobMessage;
@@ -209,7 +212,13 @@ private static Object getDownloadCredentials(Request req, Response res) {
         if (DataManager.useS3) {
             // Return pre-signed download link if using S3.
-            return downloadFromS3(FeedStore.s3Client, DataManager.feedBucket, FeedStore.s3Prefix + version.id, false, res);
+            return downloadFromS3(
+                DataManager.feedBucket,
+                FeedStore.s3Prefix + version.id,
+                false,
+                req,
+                res
+            );
         } else {
             // when feeds are stored locally, single-use download token will still be used
             FeedDownloadToken token = new FeedDownloadToken(version);
diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java
index 23b520734..6c9f9c4a3 100644
--- a/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java
+++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/ProjectController.java
@@ -1,5 +1,7 @@
 package com.conveyal.datatools.manager.controllers.api;
 
+import com.amazonaws.AmazonServiceException;
+import com.conveyal.datatools.common.utils.CheckedAWSException;
 import com.conveyal.datatools.common.utils.Scheduler;
 import com.conveyal.datatools.common.utils.SparkUtils;
 import com.conveyal.datatools.manager.DataManager;
@@ -13,7 +15,6 @@
 import com.conveyal.datatools.manager.models.FeedVersion;
 import com.conveyal.datatools.manager.models.JsonViews;
 import com.conveyal.datatools.manager.models.Project;
-import com.conveyal.datatools.manager.persistence.FeedStore;
 import com.conveyal.datatools.manager.persistence.Persistence;
 import com.conveyal.datatools.manager.utils.json.JsonManager;
 import org.bson.Document;
@@ -29,6 +30,7 @@
 import java.util.stream.Collectors;
 
 import static com.conveyal.datatools.common.utils.AWSUtils.downloadFromS3;
+import static com.conveyal.datatools.common.utils.AWSUtils.getDefaultS3Client;
 import static com.conveyal.datatools.common.utils.SparkUtils.downloadFile;
 import static com.conveyal.datatools.common.utils.SparkUtils.formatJobMessage;
 import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt;
@@ -272,7 +274,7 @@ private static Object getFeedDownloadCredentials(Request req, Response res) {
         if (DataManager.useS3) {
             // Return presigned download link if using S3.
             String key = String.format("project/%s.zip", project.id);
-            return downloadFromS3(FeedStore.s3Client, DataManager.feedBucket, key, false, res);
+            return downloadFromS3(DataManager.feedBucket, key, false, req, res);
         } else {
             // when feeds are stored locally, single-use download token will still be used
             FeedDownloadToken token = new FeedDownloadToken(project);
diff --git a/src/main/java/com/conveyal/datatools/manager/controllers/api/ServerController.java b/src/main/java/com/conveyal/datatools/manager/controllers/api/ServerController.java
index e11f3fc36..f3eae8592 100644
--- a/src/main/java/com/conveyal/datatools/manager/controllers/api/ServerController.java
+++ b/src/main/java/com/conveyal/datatools/manager/controllers/api/ServerController.java
@@ -1,9 +1,7 @@
 package com.conveyal.datatools.manager.controllers.api;
 
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.ec2.AmazonEC2;
-import com.amazonaws.services.ec2.AmazonEC2Client;
-import com.amazonaws.services.ec2.AmazonEC2ClientBuilder;
 import com.amazonaws.services.ec2.model.AmazonEC2Exception;
 import com.amazonaws.services.ec2.model.DescribeImagesRequest;
 import com.amazonaws.services.ec2.model.DescribeImagesResult;
@@ -19,8 +17,6 @@
 import com.amazonaws.services.ec2.model.TerminateInstancesRequest;
 import com.amazonaws.services.ec2.model.TerminateInstancesResult;
 import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancing;
-import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancingClient;
-import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancingClientBuilder;
 import com.amazonaws.services.elasticloadbalancingv2.model.AmazonElasticLoadBalancingException;
 import com.amazonaws.services.elasticloadbalancingv2.model.DeregisterTargetsRequest;
 import com.amazonaws.services.elasticloadbalancingv2.model.DescribeLoadBalancersRequest;
@@ -30,19 +26,16 @@
 import com.amazonaws.services.elasticloadbalancingv2.model.TargetDescription;
 import com.amazonaws.services.elasticloadbalancingv2.model.TargetGroup;
 import com.amazonaws.services.identitymanagement.AmazonIdentityManagement;
-import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClientBuilder;
 import com.amazonaws.services.identitymanagement.model.InstanceProfile;
 import com.amazonaws.services.identitymanagement.model.ListInstanceProfilesResult;
 import com.amazonaws.services.s3.AmazonS3;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
-import com.conveyal.datatools.common.utils.AWSUtils;
+import com.conveyal.datatools.common.utils.CheckedAWSException;
 import com.conveyal.datatools.manager.auth.Auth0UserProfile;
 import com.conveyal.datatools.manager.models.Deployment;
 import com.conveyal.datatools.manager.models.EC2Info;
 import com.conveyal.datatools.manager.models.JsonViews;
 import com.conveyal.datatools.manager.models.OtpServer;
 import com.conveyal.datatools.manager.models.Project;
-import com.conveyal.datatools.manager.persistence.FeedStore;
 import com.conveyal.datatools.manager.persistence.Persistence;
 import com.conveyal.datatools.manager.utils.json.JsonManager;
 import org.eclipse.jetty.http.HttpStatus;
@@ -67,10 +60,13 @@
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
+import static com.conveyal.datatools.common.utils.AWSUtils.getEC2Client;
+import static com.conveyal.datatools.common.utils.AWSUtils.getELBClient;
+import static com.conveyal.datatools.common.utils.AWSUtils.getIAMClient;
+import static com.conveyal.datatools.common.utils.AWSUtils.getS3Client;
 import static com.conveyal.datatools.common.utils.SparkUtils.getPOJOFromRequestBody;
 import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt;
 import static com.conveyal.datatools.manager.models.EC2Info.DEFAULT_INSTANCE_TYPE;
-import static com.conveyal.datatools.manager.persistence.FeedStore.getAWSCreds;
 import static spark.Spark.delete;
 import static spark.Spark.get;
 import static spark.Spark.options;
@@ -84,9 +80,6 @@ public class ServerController {
     private static JsonManager<OtpServer> json = new JsonManager<>(OtpServer.class, JsonViews.UserInterface.class);
     private static final Logger LOG = LoggerFactory.getLogger(ServerController.class);
-    private static final AmazonEC2 ec2 = AmazonEC2Client.builder().build();
-    private static final AmazonIdentityManagement iam = AmazonIdentityManagementClientBuilder.defaultClient();
-    private static final AmazonElasticLoadBalancing elb = AmazonElasticLoadBalancingClient.builder().build();
 
     /**
      * Gets the server specified by the request's id parameter and ensure that user has access to the
@@ -108,7 +101,7 @@ private static OtpServer getServerWithPermissions(Request req, Response res) {
     }
 
     /** HTTP endpoint for deleting an {@link OtpServer}. */
-    private static OtpServer deleteServer(Request req, Response res) {
+    private static OtpServer deleteServer(Request req, Response res) throws CheckedAWSException {
         OtpServer server = getServerWithPermissions(req, res);
         // Ensure that there are no active EC2 instances associated with server. Halt deletion if so.
         List<Instance> activeInstances = server.retrieveEC2Instances().stream()
@@ -122,15 +115,16 @@ private static OtpServer deleteServer(Request req, Response res) {
     }
 
     /** HTTP method for terminating EC2 instances associated with an ELB OTP server. */
-    private static OtpServer terminateEC2InstancesForServer(Request req, Response res) {
+    private static OtpServer terminateEC2InstancesForServer(Request req, Response res) throws CheckedAWSException {
         OtpServer server = getServerWithPermissions(req, res);
         List<Instance> instances = server.retrieveEC2Instances();
         List<String> ids = getIds(instances);
-        AmazonEC2 ec2Client = AWSUtils.getEC2ClientForRole(
-            server.role,
-            server.ec2Info == null ? null : server.ec2Info.region
-        );
-        terminateInstances(ec2Client, ids);
+        try {
+            AmazonEC2 ec2Client = getEC2Client(server);
+            terminateInstances(ec2Client, ids);
+        } catch (AmazonServiceException | CheckedAWSException e) {
+            logMessageAndHalt(req, 500, "Failed to terminate instances!", e);
+        }
         for (Deployment deployment : Deployment.retrieveDeploymentForServerAndRouterId(server.id, null)) {
             Persistence.deployments.updateField(deployment.id, "deployedTo", null);
         }
@@ -145,23 +139,36 @@ public static List<String> getIds (List<Instance> instances) {
     }
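Because terminateInstances now throws the checked exception instead of the SDK's unchecked AmazonEC2Exception, callers choose explicitly how to surface failures; a minimal calling sketch (the instance ID is illustrative):

```java
try {
    ServerController.terminateInstances(ec2Client, "i-0123456789abcdef0");
} catch (CheckedAWSException e) {
    // e.originalException carries the underlying AmazonEC2Exception, if any
    LOG.warn("Could not terminate instance", e);
}
```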
     /** Terminate the list of EC2 instance IDs. */
-    public static TerminateInstancesResult terminateInstances(AmazonEC2 ec2Client, Collection<String> instanceIds) throws AmazonEC2Exception {
+    public static TerminateInstancesResult terminateInstances(
+        AmazonEC2 ec2Client,
+        Collection<String> instanceIds
+    ) throws CheckedAWSException {
         if (instanceIds.size() == 0) {
             LOG.warn("No instance IDs provided in list. Skipping termination request.");
             return null;
         }
         LOG.info("Terminating EC2 instances {}", instanceIds);
         TerminateInstancesRequest request = new TerminateInstancesRequest().withInstanceIds(instanceIds);
-        return ec2Client.terminateInstances(request);
+        try {
+            return ec2Client.terminateInstances(request);
+        } catch (AmazonEC2Exception e) {
+            throw new CheckedAWSException(e);
+        }
     }
 
     /** Convenience method to override terminateInstances. */
-    public static TerminateInstancesResult terminateInstances(AmazonEC2 ec2Client, String... instanceIds) throws AmazonEC2Exception {
+    public static TerminateInstancesResult terminateInstances(
+        AmazonEC2 ec2Client,
+        String... instanceIds
+    ) throws CheckedAWSException {
         return terminateInstances(ec2Client, Arrays.asList(instanceIds));
     }
 
     /** Convenience method to override. */
-    public static TerminateInstancesResult terminateInstances(AmazonEC2 ec2Client, List<Instance> instances) throws AmazonEC2Exception {
+    public static TerminateInstancesResult terminateInstances(
+        AmazonEC2 ec2Client,
+        List<Instance> instances
+    ) throws CheckedAWSException {
         return terminateInstances(ec2Client, getIds(instances));
     }
 
@@ -170,7 +177,7 @@ public static TerminateInstancesResult terminateInstances(AmazonEC2 ec2Client, L
      *
      */
     public static boolean deRegisterAndTerminateInstances(
-        AWSStaticCredentialsProvider credentials,
+        String role,
         String targetGroupArn,
         String region,
         List<String> instanceIds
@@ -183,26 +190,9 @@ public static boolean deRegisterAndTerminateInstances(
         DeregisterTargetsRequest request = new DeregisterTargetsRequest()
             .withTargetGroupArn(targetGroupArn)
             .withTargets(targetDescriptions);
-        AmazonElasticLoadBalancing elbClient = elb;
-        AmazonEC2 ec2Client = ec2;
-        // If OTP Server has role defined/alt credentials, override default AWS clients.
-        if (credentials != null || region != null) {
-            AmazonElasticLoadBalancingClientBuilder elbBuilder = AmazonElasticLoadBalancingClient.builder();
-            AmazonEC2ClientBuilder ec2Builder = AmazonEC2Client.builder();
-            if (credentials != null) {
-                elbBuilder.withCredentials(credentials);
-                ec2Builder.withCredentials(credentials);
-            }
-            if (region != null) {
-                elbBuilder.withRegion(region);
-                ec2Builder.withRegion(region);
-            }
-            elbClient = elbBuilder.build();
-            ec2Client = ec2Builder.build();
-        }
-        elbClient.deregisterTargets(request);
-        ServerController.terminateInstances(ec2Client, instanceIds);
-    } catch (AmazonEC2Exception | AmazonElasticLoadBalancingException e) {
+        getELBClient(role, region).deregisterTargets(request);
+        ServerController.terminateInstances(getEC2Client(role, region), instanceIds);
+    } catch (AmazonServiceException | CheckedAWSException e) {
         LOG.warn("Could not terminate EC2 instances: " + String.join(",", instanceIds), e);
         return false;
     }
@@ -213,7 +203,7 @@ public static boolean deRegisterAndTerminateInstances(
      * Create a new server for the project. All feed sources with a valid latest version are added to the new
      * deployment.
      */
-    private static OtpServer createServer(Request req, Response res) throws IOException {
+    private static OtpServer createServer(Request req, Response res) throws IOException, CheckedAWSException {
         Auth0UserProfile userProfile = req.attribute("user");
         OtpServer newServer = getPOJOFromRequestBody(req, OtpServer.class);
         // If server has no project ID specified, user must be an application admin to create it. Otherwise, they must
@@ -250,7 +240,7 @@ private static List<OtpServer> fetchServers (Request req, Response res) {
 
     /**
      * Update a single OTP server.
      */
-    private static OtpServer updateServer(Request req, Response res) throws IOException {
+    private static OtpServer updateServer(Request req, Response res) throws IOException, CheckedAWSException {
         OtpServer serverToUpdate = getServerWithPermissions(req, res);
         OtpServer updatedServer = getPOJOFromRequestBody(req, OtpServer.class);
         Auth0UserProfile user = req.attribute("user");
@@ -266,21 +256,8 @@ private static OtpServer updateServer(Request req, Response res) throws IOExcept
      * Validate certain fields found in the document representing a server. This also currently modifies the document by
      * removing problematic date fields.
      */
-    private static void validateFields(Request req, OtpServer server) throws HaltException {
-        // Default to standard AWS clients.
-        AmazonEC2 ec2Client = ec2;
-        AmazonIdentityManagement iamClient = iam;
-        AmazonS3 s3Client = FeedStore.s3Client;
+    private static void validateFields(Request req, OtpServer server) throws HaltException, CheckedAWSException {
         try {
-            // Construct credentials if role is provided.
-            AWSStaticCredentialsProvider credentials = AWSUtils.getCredentialsForRole(server.role, "validate");
-            // If alternative credentials exist, override the default AWS clients.
-            if (credentials != null) {
-                // build ec2 client
-                ec2Client = AmazonEC2Client.builder().withCredentials(credentials).build();
-                iamClient = AmazonIdentityManagementClientBuilder.standard().withCredentials(credentials).build();
-                s3Client = AWSUtils.getS3ClientForRole(server.role, null);
-            }
             // Check that projectId is valid.
             if (server.projectId != null) {
                 Project project = Persistence.projects.getById(server.projectId);
@@ -290,22 +267,8 @@ private static void validateFields(Request req, OtpServer server) throws HaltExc
             // If a server's ec2 info object is not null, it must pass a few validation checks on various fields related to
             // AWS. (e.g., target group ARN and instance type).
             if (server.ec2Info != null) {
-                // create custom clients if credentials and or a custom region exist
-                if (server.ec2Info.region != null) {
-                    AmazonEC2ClientBuilder builder = AmazonEC2Client.builder();
-                    if (credentials != null) {
-                        builder.withCredentials(credentials);
-                    }
-                    builder.withRegion(server.ec2Info.region);
-                    ec2Client = builder.build();
-                    if (credentials != null) {
-                        s3Client = AWSUtils.getS3ClientForRole(server.role, server.ec2Info.region);
-                    } else {
-                        s3Client = AWSUtils.getS3ClientForCredentials(getAWSCreds(), server.ec2Info.region);
-                    }
-                }
                 try {
-                    EC2ValidationResult result = validateEC2Config(server, credentials, ec2Client, iamClient);
+                    EC2ValidationResult result = validateEC2Config(server);
                     if (!result.isValid()) {
                         logMessageAndHalt(req, 400, result.getMessage(), result.getException());
                     }
@@ -323,7 +286,7 @@ private static void validateFields(Request req, OtpServer server) throws HaltExc
                     logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Server must contain either internal URL(s) or s3 bucket name.");
                 }
             } else {
-                verifyS3WritePermissions(server, req, s3Client);
+                verifyS3WritePermissions(server, req);
             }
         } catch (Exception e) {
             if (e instanceof HaltException) throw e;
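The validation path funnels each AWS check into a Callable so the round trips can run in parallel; a simplified sketch of that pattern (the two-task list and direct use of invokeAll stand in for the executeValidationTasks helper this class already uses):

```java
List<Callable<EC2ValidationResult>> tasks = new ArrayList<>();
tasks.add(() -> validateInstanceType(server.ec2Info.instanceType));
tasks.add(() -> validateAmiId(server.ec2Info.amiId, ec2Client));
ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
// each Future resolves to one EC2ValidationResult that can be aggregated
List<Future<EC2ValidationResult>> results = pool.invokeAll(tasks);
pool.shutdown();
```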
@@ -332,14 +295,18 @@ private static void validateFields(Request req, OtpServer server) throws HaltExc
     }
 
     /**
-     * Asynchrnously validates all ec2 config of a particular OtpServer instance.
+     * Asynchronously validates all ec2 config of a particular OtpServer instance.
      */
     public static EC2ValidationResult validateEC2Config(
-        OtpServer server,
-        AWSStaticCredentialsProvider credentials,
-        AmazonEC2 ec2Client,
-        AmazonIdentityManagement iamClient
-    ) throws ExecutionException, InterruptedException {
+        OtpServer server
+    ) throws ExecutionException, InterruptedException, CheckedAWSException {
+        String region = null;
+        if (server.ec2Info != null && server.ec2Info.region != null) region = server.ec2Info.region;
+
+        AmazonEC2 ec2Client = getEC2Client(server.role, region);
+        AmazonElasticLoadBalancing elbClient = getELBClient(server.role, region);
+        AmazonIdentityManagement iamClient = getIAMClient(server.role, region);
+
         List<Callable<EC2ValidationResult>> validationTasks = new ArrayList<>();
         validationTasks.add(() -> validateInstanceType(server.ec2Info.instanceType));
         validationTasks.add(() -> validateInstanceType(server.ec2Info.buildInstanceType));
@@ -351,8 +318,8 @@ public static EC2ValidationResult validateEC2Config(
         // add the load balancer task to the end since it can produce aggregate messages
         validationTasks.add(() -> validateTargetGroupLoadBalancerSubnetIdAndSecurityGroup(
             server.ec2Info,
-            credentials,
-            ec2Client
+            ec2Client,
+            elbClient
         ));
 
         return executeValidationTasks(
@@ -362,34 +329,27 @@ public static EC2ValidationResult validateEC2Config(
     }
 
     /**
-     * Verify that application has permission to write to/delete from S3 bucket. We're following the recommended
-     * approach from https://stackoverflow.com/a/17284647/915811, but perhaps there is a way to do this
-     * effectively without incurring AWS costs (although writing/deleting an empty file to S3 is probably
-     * miniscule).
-     * @param s3Bucket
+     * Verify that the application can write to the S3 bucket, either through its own credentials or by assuming the
+     * provided IAM role. We're following the recommended approach from https://stackoverflow.com/a/17284647/915811,
+     * but perhaps there is a way to do this effectively without incurring AWS costs (although the cost of
+     * writing/deleting an empty file to S3 is probably minuscule).
      */
-    private static boolean verifyS3WritePermissions(AmazonS3 s3Client, String s3Bucket, Request req) {
+    private static void verifyS3WritePermissions(OtpServer server, Request req) {
+        String bucket = server.s3Bucket;
         String key = UUID.randomUUID().toString();
         try {
-            s3Client.putObject(s3Bucket, key, File.createTempFile("test", ".zip"));
-            s3Client.deleteObject(s3Bucket, key);
-        } catch (IOException | AmazonS3Exception e) {
-            LOG.warn("S3 client cannot write to bucket: " + s3Bucket, e);
-            return false;
-        }
-        return true;
-    }
-
-    /**
-     * Verify that application can write to S3 bucket either through its own credentials or by assuming the provided IAM
-     * role.
-     */
-    private static void verifyS3WritePermissions(OtpServer server, Request req, AmazonS3 s3Client) {
-        if (!verifyS3WritePermissions(s3Client, server.s3Bucket, req)) {
-            // Else, verify that this application can write to the S3 bucket, which is needed to write the transit bundle
-            // file to S3.
-            String message = "Application cannot write to specified S3 bucket: " + server.s3Bucket;
-            logMessageAndHalt(req, 400, message);
+            String region = null;
+            if (server.ec2Info != null && server.ec2Info.region != null) region = server.ec2Info.region;
+            AmazonS3 client = getS3Client(server.role, region);
+            client.putObject(bucket, key, File.createTempFile("test", ".zip"));
+            client.deleteObject(bucket, key);
+        } catch (Exception e) {
+            logMessageAndHalt(
+                req,
+                400,
+                "Application cannot write to specified S3 bucket: " + server.s3Bucket,
+                e
+            );
         }
     }
 
@@ -414,7 +374,6 @@ private static EC2ValidationResult validateAmiId(String amiId, AmazonEC2 ec2Client) {
 
     /** Determine if AMI ID exists (and is gettable by the application's AWS credentials). */
     public static boolean amiExists(String amiId, AmazonEC2 ec2Client) {
-        if (ec2Client == null) ec2Client = ec2;
         DescribeImagesRequest request = new DescribeImagesRequest().withImageIds(amiId);
         DescribeImagesResult result = ec2Client.describeImages(request);
         // Iterate over AMIs to find a matching ID.
@@ -435,7 +394,6 @@ private static EC2ValidationResult validateGraphBuildReplacementAmiName(
     ) {
         EC2ValidationResult result = new EC2ValidationResult();
         if (!ec2Info.recreateBuildImage) return result;
-        if (ec2Client == null) ec2Client = ec2;
         String buildImageName = ec2Info.buildImageName;
         try {
             DescribeImagesRequest describeImagesRequest = new DescribeImagesRequest()
@@ -588,18 +546,7 @@ private static EC2ValidationResult validateInstanceType(String instanceType) {
      * - https://serverfault.com/a/865422
      * - https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-limits.html
      */
-    private static LoadBalancer getLoadBalancerForTargetGroup (EC2Info ec2Info, AWSStaticCredentialsProvider credentials) {
-        // If alternative credentials exist, use them to assume the role. Otherwise, use default ELB client.
-        AmazonElasticLoadBalancingClientBuilder builder = AmazonElasticLoadBalancingClient.builder();
-        if (credentials != null) {
-            builder.withCredentials(credentials);
-        }
-
-        if (ec2Info.region != null) {
-            builder.withRegion(ec2Info.region);
-        }
-
-        AmazonElasticLoadBalancing elbClient = builder.build();
+    private static LoadBalancer getLoadBalancerForTargetGroup (EC2Info ec2Info, AmazonElasticLoadBalancing elbClient) {
         try {
             DescribeTargetGroupsRequest targetGroupsRequest = new DescribeTargetGroupsRequest()
                 .withTargetGroupArns(ec2Info.targetGroupArn);
@@ -624,8 +571,8 @@ private static LoadBalancer getLoadBalancerForTargetGroup (EC2Info ec2Info, AWSS
      */
     private static EC2ValidationResult validateTargetGroupLoadBalancerSubnetIdAndSecurityGroup(
         EC2Info ec2Info,
-        AWSStaticCredentialsProvider credentials,
-        AmazonEC2 ec2Client
+        AmazonEC2 ec2Client,
+        AmazonElasticLoadBalancing elbClient
     ) throws ExecutionException, InterruptedException {
         EC2ValidationResult result = new EC2ValidationResult();
         if (isEmpty(ec2Info.targetGroupArn)) {
@@ -634,7 +581,7 @@ private static EC2ValidationResult validateTargetGroupLoadBalancerSubnetIdAndSec
         }
         // Get load balancer for target group. This essentially checks that the target group exists and is assigned
         // to a load balancer.
-        LoadBalancer loadBalancer = getLoadBalancerForTargetGroup(ec2Info, credentials);
+        LoadBalancer loadBalancer = getLoadBalancerForTargetGroup(ec2Info, elbClient);
         if (loadBalancer == null) {
             result.setInvalid("Invalid value for Target Group ARN. Could not locate Target Group or Load Balancer.");
Could not locate Target Group or Load Balancer."); return result; diff --git a/src/main/java/com/conveyal/datatools/manager/extensions/ExternalFeedResource.java b/src/main/java/com/conveyal/datatools/manager/extensions/ExternalFeedResource.java index c761449de..0bf909509 100644 --- a/src/main/java/com/conveyal/datatools/manager/extensions/ExternalFeedResource.java +++ b/src/main/java/com/conveyal/datatools/manager/extensions/ExternalFeedResource.java @@ -1,5 +1,6 @@ package com.conveyal.datatools.manager.extensions; +import com.conveyal.datatools.common.utils.CheckedAWSException; import com.conveyal.datatools.manager.models.ExternalFeedSourceProperty; import com.conveyal.datatools.manager.models.FeedSource; import com.conveyal.datatools.manager.models.FeedVersion; @@ -20,5 +21,5 @@ public interface ExternalFeedResource { public void propertyUpdated(ExternalFeedSourceProperty property, String previousValue, String authHeader) throws IOException; - public void feedVersionCreated(FeedVersion feedVersion, String authHeader); + public void feedVersionCreated(FeedVersion feedVersion, String authHeader) throws CheckedAWSException; } diff --git a/src/main/java/com/conveyal/datatools/manager/extensions/mtc/MtcFeedResource.java b/src/main/java/com/conveyal/datatools/manager/extensions/mtc/MtcFeedResource.java index 27112c6e5..211e7f167 100644 --- a/src/main/java/com/conveyal/datatools/manager/extensions/mtc/MtcFeedResource.java +++ b/src/main/java/com/conveyal/datatools/manager/extensions/mtc/MtcFeedResource.java @@ -1,29 +1,28 @@ package com.conveyal.datatools.manager.extensions.mtc; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.PutObjectRequest; +import com.conveyal.datatools.common.utils.CheckedAWSException; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.extensions.ExternalFeedResource; import com.conveyal.datatools.manager.models.ExternalFeedSourceProperty; import com.conveyal.datatools.manager.models.FeedSource; import com.conveyal.datatools.manager.models.FeedVersion; import com.conveyal.datatools.manager.models.Project; -import com.conveyal.datatools.manager.persistence.FeedStore; import com.conveyal.datatools.manager.persistence.Persistence; import com.fasterxml.jackson.databind.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.BufferedReader; import java.io.File; import java.io.IOException; -import java.io.InputStreamReader; import java.io.OutputStreamWriter; -import java.lang.reflect.Field; import java.net.HttpURLConnection; import java.net.MalformedURLException; import java.net.URL; import java.util.Collection; +import static com.conveyal.datatools.common.utils.AWSUtils.getDefaultS3Client; import static com.conveyal.datatools.manager.models.ExternalFeedSourceProperty.constructId; /** @@ -168,7 +167,10 @@ public void propertyUpdated( * When feed version is created/published, write the feed to the shared S3 bucket. */ @Override - public void feedVersionCreated(FeedVersion feedVersion, String authHeader) { + public void feedVersionCreated( + FeedVersion feedVersion, + String authHeader + ) throws AmazonServiceException, CheckedAWSException { if(s3Bucket == null) { LOG.error("Cannot push {} to S3 bucket. 
No bucket name specified.", feedVersion.id); @@ -188,7 +190,7 @@ public void feedVersionCreated(FeedVersion feedVersion, String authHeader) { LOG.info("Pushing to MTC S3 Bucket: s3://{}/{}", s3Bucket, keyName); File file = feedVersion.retrieveGtfsFile(); try { - FeedStore.s3Client.putObject(new PutObjectRequest(s3Bucket, keyName, file)); + getDefaultS3Client().putObject(new PutObjectRequest(s3Bucket, keyName, file)); } catch (Exception e) { LOG.error("Could not upload feed version to s3."); e.printStackTrace(); diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java index 5567e951c..550124092 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/DeployJob.java @@ -1,10 +1,7 @@ package com.conveyal.datatools.manager.jobs; -import com.amazonaws.AmazonClientException; -import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.event.ProgressListener; import com.amazonaws.services.ec2.AmazonEC2; -import com.amazonaws.services.ec2.model.AmazonEC2Exception; import com.amazonaws.services.ec2.model.CreateTagsRequest; import com.amazonaws.services.ec2.model.DescribeInstanceStatusRequest; import com.amazonaws.services.ec2.model.Filter; @@ -15,8 +12,8 @@ import com.amazonaws.services.ec2.model.InstanceType; import com.amazonaws.services.ec2.model.RunInstancesRequest; import com.amazonaws.services.ec2.model.Tag; +import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancing; import com.amazonaws.services.ec2.model.TerminateInstancesResult; -import com.amazonaws.services.identitymanagement.AmazonIdentityManagementClientBuilder; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.AmazonS3URI; import com.amazonaws.services.s3.model.CopyObjectRequest; @@ -47,7 +44,6 @@ import java.util.Scanner; import java.util.Set; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -56,7 +52,7 @@ import com.amazonaws.waiters.Waiter; import com.amazonaws.waiters.WaiterParameters; import com.conveyal.datatools.common.status.MonitorableJob; -import com.conveyal.datatools.common.utils.AWSUtils; +import com.conveyal.datatools.common.utils.CheckedAWSException; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.controllers.api.DeploymentController; @@ -78,6 +74,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.conveyal.datatools.common.utils.AWSUtils.getEC2Client; +import static com.conveyal.datatools.common.utils.AWSUtils.getELBClient; +import static com.conveyal.datatools.common.utils.AWSUtils.getS3Client; import static com.conveyal.datatools.manager.controllers.api.ServerController.getIds; import static com.conveyal.datatools.manager.models.Deployment.DEFAULT_OTP_VERSION; import static com.conveyal.datatools.manager.models.Deployment.DEFAULT_R5_VERSION; @@ -122,7 +121,6 @@ public class DeployJob extends MonitorableJob { private final String s3Bucket; private final int targetCount; private final DeployType deployType; - private final AWSStaticCredentialsProvider credentials; private final String customRegion; // a nonce that is used with otp-runner to verify that status files produced by otp-runner are from this deployment private 
final String nonce = UUID.randomUUID().toString(); @@ -132,8 +130,6 @@ public class DeployJob extends MonitorableJob { private int tasksCompleted = 0; private int totalTasks; - private AmazonEC2 ec2; - private AmazonS3 s3Client; private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z"); /** The deployment to deploy */ @@ -222,15 +218,9 @@ public DeployJob(String jobName, Deployment deployment, Auth0UserProfile owner, // deployment using either a specified bundle or Graph.obj. this.jobRelativePath = bundlePath; } - // CONNECT TO EC2/S3 - credentials = AWSUtils.getCredentialsForRole(otpServer.role, this.jobId); this.customRegion = otpServer.ec2Info != null && otpServer.ec2Info.region != null ? otpServer.ec2Info.region : null; - ec2 = customRegion == null - ? AWSUtils.getEC2ClientForCredentials(credentials) - : AWSUtils.getEC2ClientForCredentials(credentials, customRegion); - s3Client = AWSUtils.getS3ClientForCredentials(credentials, customRegion); } public void jobLogic () { @@ -280,7 +270,7 @@ public void jobLogic () { } try { uploadBundleToS3(); - } catch (AmazonClientException | InterruptedException | IOException e) { + } catch (Exception e) { status.fail(String.format("Error uploading/copying deployment bundle to s3://%s", s3Bucket), e); } @@ -311,17 +301,50 @@ public void jobLogic () { status.completed = true; } + /** + * Obtains an EC2 client from the AWS Utils client manager that is applicable to this deploy job's AWS + * configuration. It is important to obtain the client this way so that it is guaranteed to be valid even when a + * role-based client's session would otherwise have expired. + */ + @JsonIgnore + public AmazonEC2 getEC2ClientForDeployJob() throws CheckedAWSException { + return getEC2Client(otpServer.role, customRegion); + } + + /** + * Obtains an ELB client from the AWS Utils client manager that is applicable to this deploy job's AWS + * configuration. It is important to obtain the client this way so that it is guaranteed to be valid even when a + * role-based client's session would otherwise have expired. + */ + @JsonIgnore + public AmazonElasticLoadBalancing getELBClientForDeployJob() throws CheckedAWSException { + return getELBClient(otpServer.role, customRegion); + } + + /** + * Obtains an S3 client from the AWS Utils client manager that is applicable to this deploy job's AWS + * configuration. It is important to obtain the client this way so that it is guaranteed to be valid even when a + * role-based client's session would otherwise have expired. + */ + @JsonIgnore + public AmazonS3 getS3ClientForDeployJob() throws CheckedAWSException { + return getS3Client(otpServer.role, customRegion); + } + /** * Upload to S3 the transit data bundle zip that contains GTFS zip files, OSM data, and config files. */ - private void uploadBundleToS3() throws InterruptedException, AmazonClientException, IOException { + private void uploadBundleToS3() throws InterruptedException, IOException, CheckedAWSException { AmazonS3URI uri = new AmazonS3URI(getS3BundleURI()); String bucket = uri.getBucket(); status.message = "Uploading bundle to " + getS3BundleURI(); status.uploadingS3 = true; LOG.info("Uploading deployment {} to {}", deployment.name, uri.toString()); // Use Transfer Manager so we can monitor S3 bundle upload progress.
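Note the design choice in the getters above: instead of caching AmazonEC2/AmazonS3 clients in fields at construction time (the pattern the removed lines show), every use re-resolves the client. A hedged illustration of why (bucket, key, and file names are placeholders):

    // Removed pattern: a client built once in the constructor can hold assumed-role
    // credentials whose session expires partway through a long deploy.
    //   s3Client.putObject(bucket, key, file);   // may fail hours later with an expired token
    // New pattern: resolve at call time; the client manager returns its cached client
    // only while the underlying session is still active, otherwise builds a fresh one.
    getS3ClientForDeployJob().putObject(bucket, key, file);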
- TransferManager transferManager = TransferManagerBuilder.standard().withS3Client(s3Client).build(); + TransferManager transferManager = TransferManagerBuilder + .standard() + .withS3Client(getS3ClientForDeployJob()) + .build(); final Upload uploadBundle = transferManager.upload(bucket, uri.getKey(), deploymentTempFile); uploadBundle.addProgressListener( (ProgressListener) progressEvent -> status.percentUploaded = uploadBundle.getProgress().getPercentTransferred() @@ -352,7 +375,7 @@ private void uploadBundleToS3() throws InterruptedException, AmazonClientExcepti // copy to [name]-latest.zip String copyKey = getLatestS3BundleKey(); CopyObjectRequest copyObjRequest = new CopyObjectRequest(bucket, uri.getKey(), uri.getBucket(), copyKey); - s3Client.copyObject(copyObjRequest); + getS3ClientForDeployJob().copyObject(copyObjRequest); LOG.info("Copied to s3://{}/{}", bucket, copyKey); LOG.info("Uploaded to {}", getS3BundleURI()); status.update("Upload to S3 complete.", status.percentComplete + 10); @@ -476,10 +499,6 @@ private String getS3BundleURI() { return joinToS3FolderURI("bundle.zip"); } - public String getCustomRegion() { - return customRegion; - } - private String getLatestS3BundleKey() { String name = StringUtils.getCleanName(deployment.parentProject().name.toLowerCase()); return String.format("%s/%s/%s-latest.zip", bundlePrefix, deployment.projectId, name); @@ -529,26 +548,25 @@ private void replaceEC2Servers() { // issue were encountered after a long graph build). status.message = "Validating AWS config"; try { - ServerController.EC2ValidationResult ec2ValidationResult = ServerController.validateEC2Config( - otpServer, - credentials, - ec2, - credentials == null - ? AmazonIdentityManagementClientBuilder.defaultClient() - : AmazonIdentityManagementClientBuilder.standard().withCredentials(credentials).build() - ); + ServerController.EC2ValidationResult ec2ValidationResult = ServerController.validateEC2Config(otpServer); if (!ec2ValidationResult.isValid()) { status.fail(ec2ValidationResult.getMessage(), ec2ValidationResult.getException()); return; } - } catch (ExecutionException | InterruptedException e) { + } catch (Exception e) { status.fail("An error occurred while validating the ec2 configuration", e); return; } try { // Track any previous instances running for the server we're deploying to in order to de-register and // terminate them later. - List previousInstances = otpServer.retrieveEC2InstanceSummaries(); + List previousInstances = null; + try { + previousInstances = otpServer.retrieveEC2InstanceSummaries(); + } catch (CheckedAWSException e) { + status.fail("Failed to retrieve previously running EC2 instances!", e); + return; + } // Track new instances that should be added to target group once the deploy job is completed. List newInstancesForTargetGroup = new ArrayList<>(); // Initialize recreate build image job and executor in case they're needed below. @@ -664,14 +682,17 @@ private void replaceEC2Servers() { service.awaitTermination(4, TimeUnit.HOURS); } // Check if any of the monitor jobs encountered any errors and terminate the job's associated instance. + int numFailedInstances = 0; for (MonitorServerStatusJob job : remainingServerMonitorJobs) { if (job.status.error) { + numFailedInstances++; String id = job.getInstanceId(); LOG.warn("Error encountered while monitoring server {}. 
Terminating.", id); remainingInstances.removeIf(instance -> instance.getInstanceId().equals(id)); try { - ServerController.terminateInstances(ec2, id); - } catch (AmazonEC2Exception e){ + // terminate instance without failing overall deploy job. That happens later. + ServerController.terminateInstances(getEC2ClientForDeployJob(), id); + } catch (Exception e){ job.status.message = String.format( "%s During job cleanup, the instance was not properly terminated!", job.status.message @@ -685,32 +706,40 @@ private void replaceEC2Servers() { // and the graph loading instance(s) failed to load graph successfully). if (newInstancesForTargetGroup.size() == 0) { status.fail("Job failed because no running instances remain."); - return; - } - String finalMessage = "Server setup is complete!"; - // Get EC2 servers running that are associated with this server. - if (deployType.equals(DeployType.REPLACE)) { - List previousInstanceIds = previousInstances.stream() - .filter(instance -> "running".equals(instance.state.getName())) - .map(instance -> instance.instanceId) - .collect(Collectors.toList()); - // If there were previous instances assigned to the server, deregister/terminate them (now that the new - // instances are up and running). - if (previousInstanceIds.size() > 0) { - boolean previousInstancesTerminated = ServerController.deRegisterAndTerminateInstances( - credentials, - otpServer.ec2Info.targetGroupArn, - customRegion, - previousInstanceIds - ); - // If there was a problem during de-registration/termination, notify via status message. - if (!previousInstancesTerminated) { - finalMessage = String.format("Server setup is complete! (WARNING: Could not terminate previous EC2 instances: %s", previousInstanceIds); + } else { + // Deregister and terminate previous EC2 servers running that were associated with this server. + if (deployType.equals(DeployType.REPLACE)) { + List previousInstanceIds = previousInstances.stream().filter(instance -> "running".equals(instance.state.getName())) + .map(instance -> instance.instanceId).collect(Collectors.toList()); + // If there were previous instances assigned to the server, deregister/terminate them (now that the new + // instances are up and running). + if (previousInstanceIds.size() > 0) { + boolean previousInstancesTerminated = ServerController.deRegisterAndTerminateInstances(otpServer.role, + otpServer.ec2Info.targetGroupArn, + customRegion, + previousInstanceIds + ); + // If there was a problem during de-registration/termination, notify via status message. + if (!previousInstancesTerminated) { + failJobWithAppendedMessage(String.format( + "Server setup is complete! (WARNING: Could not terminate previous EC2 instances: %s", + previousInstanceIds + )); + } } } + if (numFailedInstances > 0) { + failJobWithAppendedMessage(String.format( + "%d instances failed to properly start.", + numFailedInstances + )); + } } - // Wait for a recreate graph build job to complete if one was started + // Wait for a recreate graph build job to complete if one was started. This must always occur even if the + // job has already been failed in order to shutdown the recreateBuildImageExecutor. if (recreateBuildImageExecutor != null) { + // store previous message in case of a previous failure. 
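(The copy below matters because status.update overwrites status.message: if the job has already been failed, the saved text lets the interrupt handler restore the earlier failure message before appending the new one.)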
+ String previousMessage = status.message; status.update("Waiting for recreate graph building image job to complete", 95); while (!recreateBuildImageJob.status.completed) { try { @@ -718,15 +747,20 @@ private void replaceEC2Servers() { Thread.sleep(1000); } catch (InterruptedException e) { recreateBuildImageJob.status.fail("An error occurred with the parent DeployJob", e); - recreateBuildImageExecutor.shutdown(); - status.fail("An error occurred while waiting for the graph build image to be recreated", e); - return; + status.message = previousMessage; + failJobWithAppendedMessage( + "An error occurred while waiting for the graph build image to be recreated", + e + ); + break; } } recreateBuildImageExecutor.shutdown(); } - // Job is complete. - status.completeSuccessfully(finalMessage); + if (!status.error) { + // Job is complete. + status.completeSuccessfully("Server setup is complete!"); + } } catch (Exception e) { LOG.error("Could not deploy to EC2 server", e); status.fail("Could not deploy to EC2 server", e); @@ -760,14 +794,27 @@ private List startEC2Instances(int count, boolean graphAlreadyBuilt) { // Pick proper ami depending on whether graph is being built and what is defined. String amiId = otpServer.ec2Info.getAmiId(graphAlreadyBuilt); // Verify that AMI is correctly defined. - if (amiId == null || !ServerController.amiExists(amiId, ec2)) { - status.fail(String.format( - "AMI ID (%s) is missing or bad. Check the deployment settings or the default value in the app config at %s", - amiId, - AMI_CONFIG_PATH - )); + boolean amiIdValid; + Exception amiCheckException = null; + try { + amiIdValid = amiId != null && ServerController.amiExists(amiId, getEC2ClientForDeployJob()); + } catch (Exception e) { + amiIdValid = false; + amiCheckException = e; + } + + if (!amiIdValid) { + status.fail( + String.format( + "AMI ID (%s) is missing or bad. Check the deployment settings or the default value in the app config at %s", + amiId, + AMI_CONFIG_PATH + ), + amiCheckException + ); return Collections.EMPTY_LIST; } + // Pick proper instance type depending on whether graph is being built and what is defined. String instanceType = otpServer.ec2Info.getInstanceType(graphAlreadyBuilt); // Verify that instance type is correctly defined. @@ -802,8 +849,8 @@ private List startEC2Instances(int count, boolean graphAlreadyBuilt) { try { // attempt to start the instances. Sometimes, AWS does not have enough availability of the desired instance // type and can throw an error at this point. - instances = ec2.runInstances(runInstancesRequest).getReservation().getInstances(); - } catch (AmazonEC2Exception e) { + instances = getEC2ClientForDeployJob().runInstances(runInstancesRequest).getReservation().getInstances(); + } catch (Exception e) { status.fail(String.format("DeployJob failed due to a problem with AWS: %s", e.getMessage()), e); return Collections.EMPTY_LIST; } @@ -813,7 +860,7 @@ private List startEC2Instances(int count, boolean graphAlreadyBuilt) { Set instanceIpAddresses = new HashSet<>(); // Wait so that create tags request does not fail because instances not found. 
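(Context for the wait that follows: the EC2 control plane is eventually consistent, so a CreateTags request issued immediately after RunInstances can fail with an InvalidInstanceID.NotFound error; waiting for instance status to settle closes that window.)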
try { - Waiter waiter = ec2.waiters().instanceStatusOk(); + Waiter waiter = getEC2ClientForDeployJob().waiters().instanceStatusOk(); long beginWaiting = System.currentTimeMillis(); waiter.run(new WaiterParameters<>(new DescribeInstanceStatusRequest().withInstanceIds(instanceIds))); LOG.info("Instance status is OK after {} ms", (System.currentTimeMillis() - beginWaiting)); @@ -826,16 +873,21 @@ private List startEC2Instances(int count, boolean graphAlreadyBuilt) { // initialize. String serverName = String.format("%s %s (%s) %d %s", deployment.r5 ? "r5" : "otp", deployment.name, dateString, serverCounter++, graphAlreadyBuilt ? "clone" : "builder"); LOG.info("Creating tags for new EC2 instance {}", serverName); - ec2.createTags(new CreateTagsRequest() - .withTags(new Tag("Name", serverName)) - .withTags(new Tag("projectId", deployment.projectId)) - .withTags(new Tag("deploymentId", deployment.id)) - .withTags(new Tag("jobId", this.jobId)) - .withTags(new Tag("serverId", otpServer.id)) - .withTags(new Tag("routerId", getRouterId())) - .withTags(new Tag("user", retrieveEmail())) - .withResources(instance.getInstanceId()) - ); + try { + getEC2ClientForDeployJob().createTags(new CreateTagsRequest() + .withTags(new Tag("Name", serverName)) + .withTags(new Tag("projectId", deployment.projectId)) + .withTags(new Tag("deploymentId", deployment.id)) + .withTags(new Tag("jobId", this.jobId)) + .withTags(new Tag("serverId", otpServer.id)) + .withTags(new Tag("routerId", getRouterId())) + .withTags(new Tag("user", retrieveEmail())) + .withResources(instance.getInstanceId()) + ); + } catch (Exception e) { + status.fail("Failed to create tags for instances.", e); + return instances; + } } // Wait up to 10 minutes for IP addresses to be available. TimeTracker ipCheckTracker = new TimeTracker(10, TimeUnit.MINUTES); @@ -849,7 +901,19 @@ private List startEC2Instances(int count, boolean graphAlreadyBuilt) { while (instanceIpAddresses.size() < instances.size()) { LOG.info(ipCheckMessage); // Check that all of the instances have public IPs. - List instancesWithIps = DeploymentController.fetchEC2Instances(ec2, instanceIdFilter); + List instancesWithIps; + try { + instancesWithIps = DeploymentController.fetchEC2Instances( + getEC2ClientForDeployJob(), + instanceIdFilter + ); + } catch (Exception e) { + status.fail( + "Failed while waiting for public IP addresses to be assigned to new instance(s)!", + e + ); + return updatedInstances; + } for (Instance instance : instancesWithIps) { String publicIp = instance.getPublicIpAddress(); // If IP has been found, store the updated instance and IP. @@ -883,7 +947,7 @@ private List startEC2Instances(int count, boolean graphAlreadyBuilt) { private boolean terminateInstances(List instances) { TerminateInstancesResult terminateInstancesResult; try { - terminateInstancesResult = ServerController.terminateInstances(ec2, instances); + terminateInstancesResult = ServerController.terminateInstances(getEC2ClientForDeployJob(), instances); } catch (Exception e) { failJobWithAppendedMessage( "During job cleanup, an instance was not properly terminated!", @@ -895,13 +959,12 @@ private boolean terminateInstances(List instances) { // verify that all instances have terminated boolean allInstancesTerminatedProperly = true; for (InstanceStateChange terminatingInstance : terminateInstancesResult.getTerminatingInstances()) { - // If instance state code is 48 that means it has been terminated. 
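For reference alongside the state-code check below, the EC2 instance lifecycle state codes (per the EC2 API reference) are:

    // 0 = pending, 16 = running, 32 = shutting-down, 48 = terminated,
    // 64 = stopping, 80 = stopped
    // Accepting both 32 and 48 therefore covers "termination in progress"
    // as well as "already terminated".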
- if (terminatingInstance.getCurrentState().getCode() != 48) { - // TODO: determine if the terminateInstanceResult immediately returns the terminated code or if it needs - // to be verified in subsequent DescribeInstanceRequests + // instance state code == 32 means the instance is shutting down (preparing to be terminated). + // instance state code == 48 means it has been terminated. + int instanceStateCode = terminatingInstance.getCurrentState().getCode(); + if (instanceStateCode != 32 && instanceStateCode != 48) { failJobWithAppendedMessage( - String.format("Instance %s failed to properly terminate!", terminatingInstance.getInstanceId()), - null + String.format("Instance %s failed to properly terminate!", terminatingInstance.getInstanceId()) ); allInstancesTerminatedProperly = false; } @@ -909,6 +972,14 @@ private boolean terminateInstances(List<Instance> instances) { return allInstancesTerminatedProperly; } + /** + * Helper for {@link DeployJob#failJobWithAppendedMessage(String, Exception)} that doesn't take an exception + * argument. + */ + private void failJobWithAppendedMessage(String appendedMessage) { + failJobWithAppendedMessage(appendedMessage, null); + } + /** * If the status already has been marked as having errored out, the given message will be appended to the current * message, but the given Exception is not added to the status Exception. Otherwise, the status message is set to the @@ -1074,8 +1145,8 @@ private boolean uploadStringToS3File(String filename, String contents, boolean d if (dryRun) return true; status.message = String.format("uploading %s to S3", filename); try { - s3Client.putObject(s3Bucket, String.format("%s/%s", jobRelativePath, filename), contents); - } catch (RuntimeException e) { + getS3ClientForDeployJob().putObject(s3Bucket, String.format("%s/%s", jobRelativePath, filename), contents); + } catch (Exception e) { status.fail(String.format("Failed to upload file %s", filename), e); return false; } @@ -1162,11 +1233,6 @@ private String getRouterConfigS3Path() { return joinToS3FolderURI(ROUTER_CONFIG_FILENAME); } - @JsonIgnore - public AmazonS3 getS3Client() { - return s3Client; - } - /** * Represents the current status of this job.
*/ diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/FeedUpdater.java b/src/main/java/com/conveyal/datatools/manager/jobs/FeedUpdater.java index 5352d347b..ec80c89c4 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/FeedUpdater.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/FeedUpdater.java @@ -1,8 +1,10 @@ package com.conveyal.datatools.manager.jobs; +import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3Object; import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.conveyal.datatools.common.utils.CheckedAWSException; import com.conveyal.datatools.manager.models.ExternalFeedSourceProperty; import com.conveyal.datatools.manager.models.FeedSource; import com.conveyal.datatools.manager.models.FeedVersion; @@ -28,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static com.conveyal.datatools.common.utils.AWSUtils.getDefaultS3Client; import static com.conveyal.datatools.manager.extensions.mtc.MtcFeedResource.AGENCY_ID_FIELDNAME; import static com.conveyal.datatools.common.utils.Scheduler.schedulerService; import static com.mongodb.client.model.Filters.and; @@ -98,7 +101,13 @@ private Map checkForUpdatedFeeds() { LOG.debug("Checking for feeds on S3."); Map newTags = new HashMap<>(); // iterate over feeds in download_prefix folder and register to (MTC project) - ObjectListing gtfsList = FeedStore.s3Client.listObjects(feedBucket, bucketFolder); + ObjectListing gtfsList = null; + try { + gtfsList = getDefaultS3Client().listObjects(feedBucket, bucketFolder); + } catch (AmazonServiceException | CheckedAWSException e) { + LOG.error("Failed to list S3 Objects", e); + return newTags; + } LOG.debug(eTagForFeed.toString()); for (S3ObjectSummary objSummary : gtfsList.getObjectSummaries()) { @@ -194,10 +203,13 @@ private void updatePublishedFeedVersion(String feedId, FeedSource feedSource) { * Find matching feed version for a feed source based on md5. NOTE: This is no longer in use because MTC's RTD system * does NOT preserve MD5 checksums when moving a file from the "waiting" to "completed" folders on S3. 
*/ - private FeedVersion findMatchingFeedVersion(String keyName, FeedSource feedSource) throws IOException { + private FeedVersion findMatchingFeedVersion( + String keyName, + FeedSource feedSource + ) throws AmazonServiceException, IOException, CheckedAWSException { String filename = keyName.split("/")[1]; String feedId = filename.replace(".zip", ""); - S3Object object = FeedStore.s3Client.getObject(feedBucket, keyName); + S3Object object = getDefaultS3Client().getObject(feedBucket, keyName); InputStream in = object.getObjectContent(); File file = new File(FeedStore.basePath, filename); OutputStream out = new FileOutputStream(file); diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsJob.java index 6b25e2aa4..13cfcc36d 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/MergeFeedsJob.java @@ -1,6 +1,7 @@ package com.conveyal.datatools.manager.jobs; import com.conveyal.datatools.common.status.MonitorableJob; +import com.conveyal.datatools.common.utils.CheckedAWSException; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.gtfsplus.tables.GtfsPlusTable; @@ -8,7 +9,6 @@ import com.conveyal.datatools.manager.models.FeedSource; import com.conveyal.datatools.manager.models.FeedVersion; import com.conveyal.datatools.manager.models.Project; -import com.conveyal.datatools.manager.persistence.FeedStore; import com.conveyal.datatools.manager.persistence.Persistence; import com.conveyal.gtfs.error.NewGTFSError; import com.conveyal.gtfs.error.NewGTFSErrorType; @@ -45,6 +45,7 @@ import java.util.zip.ZipFile; import java.util.zip.ZipOutputStream; +import static com.conveyal.datatools.common.utils.AWSUtils.getDefaultS3Client; import static com.conveyal.datatools.manager.jobs.MergeFeedsType.SERVICE_PERIOD; import static com.conveyal.datatools.manager.jobs.MergeFeedsType.REGIONAL; import static com.conveyal.datatools.manager.models.FeedRetrievalMethod.REGIONAL_MERGE; @@ -212,7 +213,7 @@ public void jobFinished() { * Primary job logic handles collecting and sorting versions, creating a merged table for all versions, and writing * the resulting zip file to storage. */ - @Override public void jobLogic() throws IOException { + @Override public void jobLogic() throws IOException, CheckedAWSException { // Create temp zip file to add merged feed content to. mergedTempFile = File.createTempFile(filename, null); mergedTempFile.deleteOnExit(); @@ -304,7 +305,7 @@ private List collectAndSortFeeds(Set feedVersions) { * Handles writing the GTFS zip file to disk. For REGIONAL merges, this will end up in a project subdirectory on s3. * Otherwise, it will write to a new version. */ - private void storeMergedFeed() throws IOException { + private void storeMergedFeed() throws IOException, CheckedAWSException { if (mergedVersion != null) { // Store the zip file for the merged feed version. 
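(A convention worth noting here: this PR surfaces AWS failures as the checked CheckedAWSException rather than letting unchecked SDK exceptions escape, which is why jobLogic and storeMergedFeed now declare it; the MonitorableJob runner is expected to catch it and fail the job's status.)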
try { @@ -320,7 +321,7 @@ private void storeMergedFeed() throws IOException { // Store the project merged zip locally or on s3 if (DataManager.useS3) { String s3Key = String.join("/", "project", filename); - FeedStore.s3Client.putObject(DataManager.feedBucket, s3Key, mergedTempFile); + getDefaultS3Client().putObject(DataManager.feedBucket, s3Key, mergedTempFile); LOG.info("Storing merged project feed at s3://{}/{}", DataManager.feedBucket, s3Key); } else { diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/MonitorServerStatusJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/MonitorServerStatusJob.java index 12818d3f7..66e76ab80 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/MonitorServerStatusJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/MonitorServerStatusJob.java @@ -1,22 +1,16 @@ package com.conveyal.datatools.manager.jobs; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.services.ec2.AmazonEC2; -import com.amazonaws.services.ec2.model.AmazonEC2Exception; import com.amazonaws.services.ec2.model.DescribeInstancesRequest; import com.amazonaws.services.ec2.model.DescribeInstancesResult; import com.amazonaws.services.ec2.model.Instance; import com.amazonaws.services.ec2.model.Reservation; import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancing; -import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancingClient; -import com.amazonaws.services.elasticloadbalancingv2.AmazonElasticLoadBalancingClientBuilder; import com.amazonaws.services.elasticloadbalancingv2.model.DescribeTargetHealthRequest; import com.amazonaws.services.elasticloadbalancingv2.model.DescribeTargetHealthResult; import com.amazonaws.services.elasticloadbalancingv2.model.RegisterTargetsRequest; import com.amazonaws.services.elasticloadbalancingv2.model.TargetDescription; import com.amazonaws.services.elasticloadbalancingv2.model.TargetHealthDescription; import com.conveyal.datatools.common.status.MonitorableJob; -import com.conveyal.datatools.common.utils.AWSUtils; import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.models.Deployment; import com.conveyal.datatools.manager.models.OtpServer; @@ -50,8 +44,6 @@ public class MonitorServerStatusJob extends MonitorableJob { private final Instance instance; private final boolean graphAlreadyBuilt; private final OtpServer otpServer; - private final AWSStaticCredentialsProvider credentials; - private final AmazonEC2 ec2; private final CloseableHttpClient httpClient = HttpClients.createDefault(); // Delay checks by four seconds to give user-data script time to upload the instance's user data log if part of the // script fails (e.g., uploading or downloading a file). @@ -70,10 +62,6 @@ public MonitorServerStatusJob(Auth0UserProfile owner, DeployJob deployJob, Insta this.instance = instance; this.graphAlreadyBuilt = graphAlreadyBuilt; status.message = "Checking server status..."; - credentials = AWSUtils.getCredentialsForRole(otpServer.role, "monitor-" + instance.getInstanceId()); - ec2 = deployJob.getCustomRegion() == null - ? 
AWSUtils.getEC2ClientForCredentials(credentials) - : AWSUtils.getEC2ClientForCredentials(credentials, deployJob.getCustomRegion()); } @JsonProperty @@ -161,14 +149,12 @@ public void jobLogic() { .withTargetGroupArn(otpServer.ec2Info.targetGroupArn) .withTargets(new TargetDescription().withId(instance.getInstanceId())); boolean targetAddedSuccessfully = false; - // Wait for two minutes for targets to register. TimeTracker registerTargetTracker = new TimeTracker(2, TimeUnit.MINUTES); - AmazonElasticLoadBalancingClientBuilder elbBuilder = AmazonElasticLoadBalancingClient.builder() - .withCredentials(credentials); - AmazonElasticLoadBalancing elbClient = deployJob.getCustomRegion() == null ? - elbBuilder.build() : - elbBuilder.withRegion(deployJob.getCustomRegion()).build(); + // obtain an ELB client suitable for this deploy job. It is important to obtain a client this way to ensure + // that the proper AWS credentials are used and that the client has a valid session if it is obtained from a + // role. + AmazonElasticLoadBalancing elbClient = deployJob.getELBClientForDeployJob(); while (!targetAddedSuccessfully) { // Register target with target group. elbClient.registerTargets(registerTargetsRequest); @@ -248,14 +234,10 @@ private void failJob(String message, Exception e) { * waiting, check the instance health to make sure it is still running. If a user has terminated the instance, the * job should be failed. */ - private void waitAndCheckInstanceHealth(String waitingFor) throws InstanceHealthException { + private void waitAndCheckInstanceHealth(String waitingFor) throws InstanceHealthException, InterruptedException { checkInstanceHealth(1); - try { - LOG.info("Waiting {} seconds for {}", DELAY_SECONDS, waitingFor); - Thread.sleep(1000 * DELAY_SECONDS); - } catch (InterruptedException e) { - e.printStackTrace(); - } + LOG.info("Waiting {} seconds for {}", DELAY_SECONDS, waitingFor); + Thread.sleep(1000 * DELAY_SECONDS); checkInstanceHealth(1); } @@ -266,13 +248,13 @@ private void waitAndCheckInstanceHealth(String waitingFor) throws InstanceHealth * checking the instance health using recursion up to the * ${@link MonitorServerStatusJob#MAX_INSTANCE_HEALTH_RETRIES} value. 
*/ - private void checkInstanceHealth(int attemptNumber) throws InstanceHealthException { + private void checkInstanceHealth(int attemptNumber) throws InstanceHealthException, InterruptedException { DescribeInstancesRequest request = new DescribeInstancesRequest() .withInstanceIds(Collections.singletonList(instance.getInstanceId())); DescribeInstancesResult result; try { - result = ec2.describeInstances(request); - } catch (AmazonEC2Exception e) { + result = deployJob.getEC2ClientForDeployJob().describeInstances(request); + } catch (Exception e) { LOG.warn( "Failed on attempt {}/{} to execute request to obtain instance health!", attemptNumber, @@ -282,12 +264,8 @@ private void checkInstanceHealth(int attemptNumber) throws InstanceHealthExcepti if (attemptNumber > MAX_INSTANCE_HEALTH_RETRIES) { throw new InstanceHealthException("AWS Describe Instances error!"); } - try { - LOG.info("Waiting 5 seconds to try to get instance status again"); - Thread.sleep(5000); - } catch (InterruptedException interruptedException) { - interruptedException.printStackTrace(); - } + LOG.info("Waiting 5 seconds to try to get instance status again"); + Thread.sleep(5000); checkInstanceHealth(attemptNumber + 1); return; } diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/PublishProjectFeedsJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/PublishProjectFeedsJob.java index edf76e498..312537543 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/PublishProjectFeedsJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/PublishProjectFeedsJob.java @@ -1,23 +1,29 @@ package com.conveyal.datatools.manager.jobs; +import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.conveyal.datatools.common.status.MonitorableJob; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.models.FeedVersion; import com.conveyal.datatools.manager.models.Project; -import com.conveyal.datatools.manager.persistence.FeedStore; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; +import static com.conveyal.datatools.common.utils.AWSUtils.getDefaultS3Client; + /** * Publish the latest GTFS files for all public feeds in a project. 
*/ public class PublishProjectFeedsJob extends MonitorableJob { + public static final Logger LOG = LoggerFactory.getLogger(PublishProjectFeedsJob.class); + private Project project; public PublishProjectFeedsJob(Project project, Auth0UserProfile owner) { @@ -62,7 +68,12 @@ public void jobLogic () { } else { // ensure latest feed is written to the s3 public folder - fs.makePublic(); + try { + fs.makePublic(); + } catch (Exception e) { + status.fail("Failed to make GTFS files public on S3", e); + return; + } url = String.join("/", "https://s3.amazonaws.com", DataManager.feedBucket, fs.toPublicKey()); } FeedVersion latest = fs.retrieveLatest(); @@ -91,14 +102,23 @@ public void jobLogic () { try { FileUtils.writeStringToFile(file, output); } catch (IOException e) { + LOG.error("Failed to write string to file", e); e.printStackTrace(); } - FeedStore.s3Client.putObject(DataManager.feedBucket, folder + fileName, file); - FeedStore.s3Client.setObjectAcl(DataManager.feedBucket, folder + fileName, CannedAccessControlList.PublicRead); + try { + AmazonS3 defaultS3Client = getDefaultS3Client(); + defaultS3Client.putObject(DataManager.feedBucket, folder + fileName, file); + defaultS3Client.setObjectAcl(DataManager.feedBucket, folder + fileName, CannedAccessControlList.PublicRead); + } catch (Exception e) { + status.fail("Failed to perform S3 actions", e); + return; + } } @Override public void jobFinished() { - status.completeSuccessfully("Public page updated successfully!"); + if (!status.error) { + status.completeSuccessfully("Public page updated successfully!"); + } } } diff --git a/src/main/java/com/conveyal/datatools/manager/jobs/RecreateBuildImageJob.java b/src/main/java/com/conveyal/datatools/manager/jobs/RecreateBuildImageJob.java index 98fdd323e..17c532fa5 100644 --- a/src/main/java/com/conveyal/datatools/manager/jobs/RecreateBuildImageJob.java +++ b/src/main/java/com/conveyal/datatools/manager/jobs/RecreateBuildImageJob.java @@ -1,8 +1,5 @@ package com.conveyal.datatools.manager.jobs; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.services.ec2.AmazonEC2; -import com.amazonaws.services.ec2.model.AmazonEC2Exception; import com.amazonaws.services.ec2.model.CreateImageRequest; import com.amazonaws.services.ec2.model.CreateImageResult; import com.amazonaws.services.ec2.model.DeregisterImageRequest; @@ -11,7 +8,6 @@ import com.amazonaws.services.ec2.model.Image; import com.amazonaws.services.ec2.model.Instance; import com.conveyal.datatools.common.status.MonitorableJob; -import com.conveyal.datatools.common.utils.AWSUtils; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.auth.Auth0UserProfile; import com.conveyal.datatools.manager.controllers.api.ServerController; @@ -30,9 +26,9 @@ * and has a separate graph build instance type, the EC2 instance will be terminated.
*/ public class RecreateBuildImageJob extends MonitorableJob { - private final AmazonEC2 ec2; private final List graphBuildingInstances; private final OtpServer otpServer; + private final DeployJob parentDeployJob; public RecreateBuildImageJob( DeployJob parentDeployJob, @@ -44,36 +40,47 @@ public RecreateBuildImageJob( String.format("Recreating build image for %s", parentDeployJob.getOtpServer().name), JobType.RECREATE_BUILD_IMAGE ); + this.parentDeployJob = parentDeployJob; this.otpServer = parentDeployJob.getOtpServer(); this.graphBuildingInstances = graphBuildingInstances; - AWSCredentialsProvider credentials = AWSUtils.getCredentialsForRole( - otpServer.role, - "recreate-build-image" - ); - ec2 = parentDeployJob.getCustomRegion() == null - ? AWSUtils.getEC2ClientForCredentials(credentials) - : AWSUtils.getEC2ClientForCredentials(credentials, parentDeployJob.getCustomRegion()); } @Override - public void jobLogic() throws Exception { + public void jobLogic() { status.update("Creating build image", 5); // Create a new image of this instance. CreateImageRequest createImageRequest = new CreateImageRequest() .withInstanceId(graphBuildingInstances.get(0).getInstanceId()) .withName(otpServer.ec2Info.buildImageName) .withDescription(otpServer.ec2Info.buildImageDescription); - CreateImageResult createImageResult = ec2.createImage(createImageRequest); - // Wait for image creation to complete (it can take a few minutes) + CreateImageResult createImageResult = null; + try { + createImageResult = parentDeployJob + .getEC2ClientForDeployJob() + .createImage(createImageRequest); + } catch (Exception e) { + status.fail("Failed to make a request to create a new image!", e); + return; + } + + // Wait for the image to be created (it can take a few minutes). Also, make sure the parent DeployJob hasn't + // failed this job already. + TimeTracker imageCreationTracker = new TimeTracker(1, TimeUnit.HOURS); String createdImageId = createImageResult.getImageId(); status.update("Waiting for graph build image to be created...", 25); boolean imageCreated = false; DescribeImagesRequest describeImagesRequest = new DescribeImagesRequest() .withImageIds(createdImageId); - // wait for the image to be created. Also, make sure the parent DeployJob hasn't failed this job already. - TimeTracker imageCreationTracker = new TimeTracker(1, TimeUnit.HOURS); while (!imageCreated && !status.error) { - DescribeImagesResult describeImagesResult = ec2.describeImages(describeImagesRequest); + DescribeImagesResult describeImagesResult = null; + try { + describeImagesResult = parentDeployJob + .getEC2ClientForDeployJob() + .describeImages(describeImagesRequest); + } catch (Exception e) { + terminateInstanceAndFailWithMessage("Failed to make request to get image creation status!", e); + return; + } for (Image image : describeImagesResult.getImages()) { if (image.getImageId().equals(createdImageId)) { // obtain the image state. @@ -86,16 +93,6 @@ public void jobLogic() throws Exception { ); return; } - // wait 2.5 seconds before making next request - try { - Thread.sleep(2500); - } catch (InterruptedException e) { - terminateInstanceAndFailWithMessage( - "Failed while waiting for graph build image creation to complete!", - e - ); - return; - } } else if (imageState.equals("available")) { // success! Set imageCreated to true. 
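(For orientation in the polling loop below: DescribeImages reports an image state of "pending" while the AMI and its snapshot are being created, "available" on success, and "failed" on error, so the loop keeps polling until it sees a terminal state or the one-hour tracker expires.)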
imageCreated = true; @@ -108,6 +105,16 @@ public void jobLogic() throws Exception { } } } + // wait 2.5 seconds before making next request + try { + Thread.sleep(2500); + } catch (InterruptedException e) { + terminateInstanceAndFailWithMessage( + "Failed while waiting for graph build image creation to complete!", + e + ); + return; + } } // If the parent DeployJob has already failed this job, exit immediately. if (status.error) return; @@ -122,7 +129,12 @@ public void jobLogic() throws Exception { status.message = "Deregistering old build image"; DeregisterImageRequest deregisterImageRequest = new DeregisterImageRequest() .withImageId(graphBuildAmiId); - ec2.deregisterImage(deregisterImageRequest); + try { + parentDeployJob.getEC2ClientForDeployJob().deregisterImage(deregisterImageRequest); + } catch (Exception e) { + terminateInstanceAndFailWithMessage("Failed to deregister previous graph building image!", e); + return; + } } status.update("Updating Server build AMI info", 80); // Update OTP Server info @@ -134,12 +146,13 @@ public void jobLogic() throws Exception { if (otpServer.ec2Info.hasSeparateGraphBuildConfig()) { status.message = "Terminating graph building instance"; try { - ServerController.terminateInstances(ec2, graphBuildingInstances); - } catch (AmazonEC2Exception e) { + ServerController.terminateInstances(parentDeployJob.getEC2ClientForDeployJob(), graphBuildingInstances); + } catch (Exception e) { status.fail( "Graph build image successfully created, but failed to terminate graph building instance!", e ); + return; } } status.completeSuccessfully("Graph build image successfully created!"); @@ -153,7 +166,15 @@ private void terminateInstanceAndFailWithMessage(String message) { * Terminates the graph building instance and fails with the given message and Exception. */ private void terminateInstanceAndFailWithMessage(String message, Exception e) { - ServerController.terminateInstances(ec2, graphBuildingInstances); + try { + ServerController.terminateInstances(parentDeployJob.getEC2ClientForDeployJob(), graphBuildingInstances); + } catch (Exception terminationException) { + status.fail( + String.format("%s Also, the graph building instance failed to terminate!", message), + terminationException + ); + return; + } status.fail(message, e); } } diff --git a/src/main/java/com/conveyal/datatools/manager/models/Deployment.java b/src/main/java/com/conveyal/datatools/manager/models/Deployment.java index b18c61e04..73a13d343 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/Deployment.java +++ b/src/main/java/com/conveyal/datatools/manager/models/Deployment.java @@ -1,7 +1,7 @@ package com.conveyal.datatools.manager.models; import com.amazonaws.services.ec2.model.Filter; -import com.conveyal.datatools.common.utils.AWSUtils; +import com.conveyal.datatools.common.utils.CheckedAWSException; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.controllers.api.DeploymentController; import com.conveyal.datatools.manager.jobs.DeployJob; @@ -48,6 +48,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.conveyal.datatools.common.utils.AWSUtils.getEC2Client; import static com.conveyal.datatools.manager.models.FeedVersion.feedStore; import static com.mongodb.client.model.Filters.and; import static com.mongodb.client.model.Filters.eq; @@ -125,7 +126,7 @@ public List retrieveFeedVersions() { } /** Fetch ec2 instances tagged with this deployment's ID. 
*/ - public List retrieveEC2Instances() { + public List retrieveEC2Instances() throws CheckedAWSException { if (!"true".equals(DataManager.getConfigPropertyAsText("modules.deployment.ec2.enabled"))) return Collections.EMPTY_LIST; Filter deploymentFilter = new Filter("tag:deploymentId", Collections.singletonList(id)); // Check if the latest deployment used alternative credentials/AWS role. @@ -141,7 +142,7 @@ public List retrieveEC2Instances() { } } return DeploymentController.fetchEC2InstanceSummaries( - AWSUtils.getEC2ClientForRole(role, region), + getEC2Client(role, region), deploymentFilter ); } diff --git a/src/main/java/com/conveyal/datatools/manager/models/FeedSource.java b/src/main/java/com/conveyal/datatools/manager/models/FeedSource.java index b3f425b46..24f374cd6 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/FeedSource.java +++ b/src/main/java/com/conveyal/datatools/manager/models/FeedSource.java @@ -1,16 +1,17 @@ package com.conveyal.datatools.manager.models; +import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.ObjectMetadata; import com.conveyal.datatools.common.status.MonitorableJob; +import com.conveyal.datatools.common.utils.CheckedAWSException; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.jobs.NotifyUsersForSubscriptionJob; import com.conveyal.datatools.manager.models.transform.FeedTransformRules; import com.conveyal.datatools.manager.models.transform.FeedTransformation; import com.conveyal.datatools.manager.persistence.FeedStore; import com.conveyal.datatools.manager.persistence.Persistence; -import com.conveyal.datatools.manager.utils.HashUtils; import com.conveyal.gtfs.GTFS; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @@ -33,9 +34,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.stream.Collectors; +import static com.conveyal.datatools.common.utils.AWSUtils.getDefaultS3Client; import static com.conveyal.datatools.manager.utils.StringUtils.getCleanName; import static com.mongodb.client.model.Filters.and; import static com.mongodb.client.model.Filters.eq; @@ -458,7 +459,10 @@ public String toPublicKey() { return "public/" + getCleanName(this.name) + ".zip"; } - public void makePublic() { + /** + * Makes the feed source's latest version have public access on AWS S3. + */ + public void makePublic() throws CheckedAWSException { String sourceKey = FeedStore.s3Prefix + this.id + ".zip"; String publicKey = toPublicKey(); String versionId = this.latestVersionId(); @@ -467,44 +471,49 @@ public void makePublic() { // only deploy to public if storing feeds on s3 (no mechanism for downloading/publishing // them otherwise) if (DataManager.useS3) { - boolean sourceExists = FeedStore.s3Client.doesObjectExist(DataManager.feedBucket, sourceKey); + AmazonS3 defaultS3Client = getDefaultS3Client(); + boolean sourceExists = defaultS3Client.doesObjectExist(DataManager.feedBucket, sourceKey); ObjectMetadata sourceMetadata = sourceExists - ? FeedStore.s3Client.getObjectMetadata(DataManager.feedBucket, sourceKey) + ? 
defaultS3Client.getObjectMetadata(DataManager.feedBucket, sourceKey) : null; - boolean latestExists = FeedStore.s3Client.doesObjectExist(DataManager.feedBucket, latestVersionKey); + boolean latestExists = defaultS3Client.doesObjectExist(DataManager.feedBucket, latestVersionKey); ObjectMetadata latestVersionMetadata = latestExists - ? FeedStore.s3Client.getObjectMetadata(DataManager.feedBucket, latestVersionKey) + ? defaultS3Client.getObjectMetadata(DataManager.feedBucket, latestVersionKey) : null; boolean latestVersionMatchesSource = sourceMetadata != null && latestVersionMetadata != null && sourceMetadata.getETag().equals(latestVersionMetadata.getETag()); if (sourceExists && latestVersionMatchesSource) { LOG.info("copying feed {} to s3 public folder", this); - FeedStore.s3Client.setObjectAcl(DataManager.feedBucket, sourceKey, CannedAccessControlList.PublicRead); - FeedStore.s3Client.copyObject(DataManager.feedBucket, sourceKey, DataManager.feedBucket, publicKey); - FeedStore.s3Client.setObjectAcl(DataManager.feedBucket, publicKey, CannedAccessControlList.PublicRead); + defaultS3Client.setObjectAcl(DataManager.feedBucket, sourceKey, CannedAccessControlList.PublicRead); + defaultS3Client.copyObject(DataManager.feedBucket, sourceKey, DataManager.feedBucket, publicKey); + defaultS3Client.setObjectAcl(DataManager.feedBucket, publicKey, CannedAccessControlList.PublicRead); } else { LOG.warn("Latest feed source {} on s3 at {} does not exist or does not match latest version. Using latest version instead.", this, sourceKey); - if (FeedStore.s3Client.doesObjectExist(DataManager.feedBucket, latestVersionKey)) { + if (defaultS3Client.doesObjectExist(DataManager.feedBucket, latestVersionKey)) { LOG.info("copying feed version {} to s3 public folder", versionId); - FeedStore.s3Client.setObjectAcl(DataManager.feedBucket, latestVersionKey, CannedAccessControlList.PublicRead); - FeedStore.s3Client.copyObject(DataManager.feedBucket, latestVersionKey, DataManager.feedBucket, publicKey); - FeedStore.s3Client.setObjectAcl(DataManager.feedBucket, publicKey, CannedAccessControlList.PublicRead); + defaultS3Client.setObjectAcl(DataManager.feedBucket, latestVersionKey, CannedAccessControlList.PublicRead); + defaultS3Client.copyObject(DataManager.feedBucket, latestVersionKey, DataManager.feedBucket, publicKey); + defaultS3Client.setObjectAcl(DataManager.feedBucket, publicKey, CannedAccessControlList.PublicRead); // also copy latest version to feedStore latest - FeedStore.s3Client.copyObject(DataManager.feedBucket, latestVersionKey, DataManager.feedBucket, sourceKey); + defaultS3Client.copyObject(DataManager.feedBucket, latestVersionKey, DataManager.feedBucket, sourceKey); } } } } - public void makePrivate() { + /** + * Makes the feed source's latest version have private access on AWS S3. 
+ */ + public void makePrivate() throws CheckedAWSException { String sourceKey = FeedStore.s3Prefix + this.id + ".zip"; String publicKey = toPublicKey(); - if (FeedStore.s3Client.doesObjectExist(DataManager.feedBucket, sourceKey)) { + AmazonS3 defaultS3Client = getDefaultS3Client(); + if (defaultS3Client.doesObjectExist(DataManager.feedBucket, sourceKey)) { LOG.info("removing feed {} from s3 public folder", this); - FeedStore.s3Client.setObjectAcl(DataManager.feedBucket, sourceKey, CannedAccessControlList.AuthenticatedRead); - FeedStore.s3Client.deleteObject(DataManager.feedBucket, publicKey); + defaultS3Client.setObjectAcl(DataManager.feedBucket, sourceKey, CannedAccessControlList.AuthenticatedRead); + defaultS3Client.deleteObject(DataManager.feedBucket, publicKey); } } @@ -553,9 +562,10 @@ public void delete() { } // Delete latest copy of feed source on S3. if (DataManager.useS3) { + AmazonS3 defaultS3Client = getDefaultS3Client(); DeleteObjectsRequest delete = new DeleteObjectsRequest(DataManager.feedBucket); delete.withKeys("public/" + this.name + ".zip", FeedStore.s3Prefix + this.id + ".zip"); - FeedStore.s3Client.deleteObjects(delete); + defaultS3Client.deleteObjects(delete); } // Remove all external properties for this feed source. Persistence.externalFeedSourceProperties.removeFiltered(eq("feedSourceId", this.id)); diff --git a/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java b/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java index 77e84b64a..952174a5b 100644 --- a/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java +++ b/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java @@ -2,7 +2,7 @@ import com.amazonaws.services.ec2.model.Filter; import com.amazonaws.services.ec2.model.Instance; -import com.conveyal.datatools.common.utils.AWSUtils; +import com.conveyal.datatools.common.utils.CheckedAWSException; import com.conveyal.datatools.manager.DataManager; import com.conveyal.datatools.manager.controllers.api.DeploymentController; import com.conveyal.datatools.manager.persistence.Persistence; @@ -12,6 +12,8 @@ import java.util.Collections; import java.util.List; +import static com.conveyal.datatools.common.utils.AWSUtils.getEC2Client; + /** * An OtpServer represents a deployment target for deploying transit and OSM data to. This can take the shape of a number * of things: @@ -55,26 +57,20 @@ public OtpServer () {} /** The EC2 instances that are associated with this serverId. */ @JsonProperty("ec2Instances") - public List retrieveEC2InstanceSummaries() { + public List retrieveEC2InstanceSummaries() throws CheckedAWSException { // Prevent calling EC2 method on servers that do not have EC2 info defined because this is a JSON property. 
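(Because retrieveEC2InstanceSummaries is exposed as a @JsonProperty, Jackson invokes it whenever an OtpServer is serialized; the null guard below keeps servers without any EC2 configuration from triggering AWS requests just to render JSON.)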
diff --git a/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java b/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java
index 77e84b64a..952174a5b 100644
--- a/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java
+++ b/src/main/java/com/conveyal/datatools/manager/models/OtpServer.java
@@ -2,7 +2,7 @@
 
 import com.amazonaws.services.ec2.model.Filter;
 import com.amazonaws.services.ec2.model.Instance;
-import com.conveyal.datatools.common.utils.AWSUtils;
+import com.conveyal.datatools.common.utils.CheckedAWSException;
 import com.conveyal.datatools.manager.DataManager;
 import com.conveyal.datatools.manager.controllers.api.DeploymentController;
 import com.conveyal.datatools.manager.persistence.Persistence;
@@ -12,6 +12,8 @@
 import java.util.Collections;
 import java.util.List;
 
+import static com.conveyal.datatools.common.utils.AWSUtils.getEC2Client;
+
 /**
  * An OtpServer represents a deployment target for deploying transit and OSM data to. This can take the shape of a number
  * of things:
@@ -55,26 +57,20 @@ public OtpServer () {}
 
     /** The EC2 instances that are associated with this serverId. */
     @JsonProperty("ec2Instances")
-    public List<EC2InstanceSummary> retrieveEC2InstanceSummaries() {
+    public List<EC2InstanceSummary> retrieveEC2InstanceSummaries() throws CheckedAWSException {
         // Prevent calling EC2 method on servers that do not have EC2 info defined because this is a JSON property.
         if (ec2Info == null) return Collections.EMPTY_LIST;
         Filter serverFilter = new Filter("tag:serverId", Collections.singletonList(id));
-        return DeploymentController.fetchEC2InstanceSummaries(
-            AWSUtils.getEC2ClientForRole(this.role, ec2Info.region),
-            serverFilter
-        );
+        return DeploymentController.fetchEC2InstanceSummaries(getEC2Client(this), serverFilter);
     }
 
-    public List<Instance> retrieveEC2Instances() {
+    public List<Instance> retrieveEC2Instances() throws CheckedAWSException {
         if (
             !"true".equals(DataManager.getConfigPropertyAsText("modules.deployment.ec2.enabled")) ||
                 ec2Info == null
         ) return Collections.EMPTY_LIST;
         Filter serverFilter = new Filter("tag:serverId", Collections.singletonList(id));
-        return DeploymentController.fetchEC2Instances(
-            AWSUtils.getEC2ClientForRole(this.role, ec2Info.region),
-            serverFilter
-        );
+        return DeploymentController.fetchEC2Instances(getEC2Client(this), serverFilter);
     }
 
     @JsonProperty("organizationId")
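Both methods above now delegate role and region plumbing to a getEC2Client(OtpServer) overload in AWSUtils. Judging from the equivalent two-argument call that DeployJobTest makes further down, that overload plausibly reduces to the following sketch; the exact body is an assumption.

// Sketch of the assumed overload in AWSUtils: forward the server's role and
// EC2 region to the role/region-aware factory. The real method body may differ.
public static AmazonEC2 getEC2Client(OtpServer server) throws CheckedAWSException {
    return getEC2Client(
        server.role,
        server.ec2Info == null ? null : server.ec2Info.region
    );
}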
diff --git a/src/main/java/com/conveyal/datatools/manager/persistence/FeedStore.java b/src/main/java/com/conveyal/datatools/manager/persistence/FeedStore.java
index 8ef19f471..d998956ea 100644
--- a/src/main/java/com/conveyal/datatools/manager/persistence/FeedStore.java
+++ b/src/main/java/com/conveyal/datatools/manager/persistence/FeedStore.java
@@ -2,10 +2,6 @@
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectRequest;
@@ -13,7 +9,7 @@
 import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
 import com.amazonaws.services.s3.transfer.Upload;
-import com.conveyal.datatools.common.utils.AWSUtils;
+import com.conveyal.datatools.common.utils.CheckedAWSException;
 import com.conveyal.datatools.manager.DataManager;
 import com.conveyal.datatools.manager.models.FeedSource;
 import com.google.common.io.ByteStreams;
@@ -27,9 +23,8 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
 
+import static com.conveyal.datatools.common.utils.AWSUtils.getDefaultS3Client;
 import static com.conveyal.datatools.manager.DataManager.hasConfigProperty;
 
 /**
@@ -49,10 +44,15 @@ public class FeedStore {
 
     public static final String s3Prefix = "gtfs/";
 
-    // FIXME: this should not be static most likely
-    public static AmazonS3 s3Client;
-    /** An AWS credentials file to use when uploading to S3 */
-    private static final String S3_CREDENTIALS_FILENAME = DataManager.getConfigPropertyAsText("application.data.s3_credentials_file");
+    static {
+        // s3 storage
+        if (DataManager.useS3 || hasConfigProperty("modules.gtfsapi.use_extension")){
+            s3Bucket = DataManager.getConfigPropertyAsText("application.data.gtfs_s3_bucket");
+            if (s3Bucket == null) {
+                throw new IllegalArgumentException("Fatal error initializing s3Bucket");
+            }
+        }
+    }
 
     public FeedStore() {
         this(null);
@@ -70,25 +70,6 @@ public FeedStore(String subdir) {
         path = getPath(pathString);
     }
 
-    static {
-        // s3 storage
-        if (DataManager.useS3 || hasConfigProperty("modules.gtfsapi.use_extension")){
-            s3Bucket = DataManager.getConfigPropertyAsText("application.data.gtfs_s3_bucket");
-            try {
-                // If region configuration string is provided, use that.
-                // Otherwise defaults to value provided in ~/.aws/config
-                String region = DataManager.getConfigPropertyAsText("application.data.s3_region");
-                s3Client = AWSUtils.getS3ClientForCredentials(getAWSCreds(), region);
-            } catch (Exception e) {
-                LOG.error("S3 client not initialized correctly. Must provide config property application.data.s3_region or specify region in ~/.aws/config", e);
-            }
-            // TODO: check for this??
-            if (s3Client == null || s3Bucket == null) {
-                throw new IllegalArgumentException("Fatal error initializing s3Bucket or s3Client");
-            }
-        }
-    }
-
     private static File getPath (String pathString) {
         File path = new File(pathString);
         if (!path.exists() || !path.isDirectory()) {
@@ -98,10 +79,10 @@ private static File getPath (String pathString) {
         return path;
     }
 
-    public void deleteFeed (String id) {
+    public void deleteFeed (String id) throws CheckedAWSException {
         // If the application is using s3 storage, delete the remote copy.
         if (DataManager.useS3){
-            s3Client.deleteObject(s3Bucket, getS3Key(id));
+            getDefaultS3Client().deleteObject(s3Bucket, getS3Key(id));
         }
         // Always delete local copy (whether storing exclusively on local disk or using s3).
         File feed = getLocalFeed(id);
@@ -111,15 +92,6 @@ public void deleteFeed (String id) {
         }
     }
 
-    public static AWSCredentialsProvider getAWSCreds () {
-        if (S3_CREDENTIALS_FILENAME != null) {
-            return new ProfileCredentialsProvider(S3_CREDENTIALS_FILENAME, "default");
-        } else {
-            // default credentials providers, e.g. IAM role
-            return new DefaultAWSCredentialsProviderChain();
-        }
-    }
-
     private static String getS3Key (String id) {
         return s3Prefix + id;
     }
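The getAWSCreds() logic removed here does not simply vanish: the same patch adds ProfileCredentialsProvider and DefaultAWSCredentialsProviderChain imports to AWSUtils, so the file-or-default-chain decision presumably now lives there. A sketch of that choice, assuming the config key is unchanged and using a hypothetical method name:

// Assumed relocation of the removed FeedStore.getAWSCreds() logic into AWSUtils;
// the method name here is hypothetical.
private static AWSCredentialsProvider getDefaultCredentialsProvider() {
    String credentialsFile = DataManager.getConfigPropertyAsText("application.data.s3_credentials_file");
    return credentialsFile != null
        // Use a named credentials file when one is configured...
        ? new ProfileCredentialsProvider(credentialsFile, "default")
        // ...otherwise fall back to the default chain (env vars, IAM role, etc.).
        : new DefaultAWSCredentialsProviderChain();
}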
@@ -145,11 +117,11 @@ public File getFeed (String id) {
         LOG.info("Downloading feed from {}", uri);
         InputStream objectData;
         try {
-            S3Object object = s3Client.getObject(
+            S3Object object = getDefaultS3Client().getObject(
                 new GetObjectRequest(s3Bucket, key));
             objectData = object.getObjectContent();
-        } catch (AmazonServiceException ase) {
-            LOG.error("Error downloading " + uri, ase);
+        } catch (AmazonServiceException | CheckedAWSException e) {
+            LOG.error("Error downloading " + uri, e);
             return null;
         }
@@ -227,7 +199,7 @@ public boolean uploadToS3 (File gtfsFile, String s3FileName, FeedSour
         if (s3Bucket != null) {
             try {
                 LOG.info("Uploading feed {} to S3 from {}", s3FileName, gtfsFile.getAbsolutePath());
-                TransferManager tm = TransferManagerBuilder.standard().withS3Client(s3Client).build();
+                TransferManager tm = TransferManagerBuilder.standard().withS3Client(getDefaultS3Client()).build();
                 PutObjectRequest request = new PutObjectRequest(s3Bucket, getS3Key(s3FileName), gtfsFile);
                 // Subscribe to the event and provide event handler.
                 TLongList transferredBytes = new TLongArrayList();
@@ -267,10 +239,10 @@ public boolean uploadToS3 (File gtfsFile, String s3FileName, FeedSour
                     String copyKey = s3Prefix + feedSource.id + ".zip";
                     CopyObjectRequest copyObjRequest = new CopyObjectRequest(
                         s3Bucket, getS3Key(s3FileName), s3Bucket, copyKey);
-                    s3Client.copyObject(copyObjRequest);
+                    getDefaultS3Client().copyObject(copyObjRequest);
                 }
                 return true;
-            } catch (AmazonServiceException e) {
+            } catch (AmazonServiceException | CheckedAWSException e) {
                 LOG.error("Error uploading feed to S3", e);
                 return false;
             }
diff --git a/src/test/java/com/conveyal/datatools/manager/jobs/DeployJobTest.java b/src/test/java/com/conveyal/datatools/manager/jobs/DeployJobTest.java
index fa52a58fe..d3070969b 100644
--- a/src/test/java/com/conveyal/datatools/manager/jobs/DeployJobTest.java
+++ b/src/test/java/com/conveyal/datatools/manager/jobs/DeployJobTest.java
@@ -1,9 +1,11 @@
 package com.conveyal.datatools.manager.jobs;
 
+import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.ec2.model.Instance;
 import com.conveyal.datatools.DatatoolsTest;
 import com.conveyal.datatools.UnitTest;
 import com.conveyal.datatools.common.utils.AWSUtils;
+import com.conveyal.datatools.common.utils.CheckedAWSException;
 import com.conveyal.datatools.manager.auth.Auth0UserProfile;
 import com.conveyal.datatools.manager.models.Deployment;
 import com.conveyal.datatools.manager.models.EC2Info;
@@ -169,12 +171,12 @@ public void canDeployFromPrebuiltGraph () {
      * RUN_AWS_DEPLOY_JOB_TESTS environment variable is set to "true"
      */
     @AfterClass
-    public static void cleanUp() {
+    public static void cleanUp() throws AmazonServiceException, CheckedAWSException {
         assumeTrue(getBooleanEnvVar("RUN_AWS_DEPLOY_JOB_TESTS"));
         List<Instance> instances = server.retrieveEC2Instances();
         List<String> ids = getIds(instances);
         terminateInstances(
-            AWSUtils.getEC2ClientForRole(server.role, server.ec2Info == null ? null : server.ec2Info.region),
+            AWSUtils.getEC2Client(server.role, server.ec2Info == null ? null : server.ec2Info.region),
             ids
         );
     }
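One behavioral consequence reviewers may want to confirm: since getEC2Client ultimately goes through AWSClientManager.getClient, repeated lookups for the same role and region should hand back the same cached client until the underlying ExpiringAsset lapses. A hypothetical illustration, where the role ARN is a placeholder and no such assertion exists in this patch:

// Hypothetical illustration of the cache contract; not an actual test in this patch.
AmazonEC2 first = AWSUtils.getEC2Client("arn:aws:iam::123456789012:role/data-tools", "us-east-1");
AmazonEC2 second = AWSUtils.getEC2Client("arn:aws:iam::123456789012:role/data-tools", "us-east-1");
// Within DEFAULT_EXPIRING_AWS_ASSET_VALID_DURATION_MILLIS the manager should
// return the same instance rather than re-assuming the role.
assert first == second;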