Skip to content

Commit

Permalink
Merge pull request #288 from ibi-group/mtc-20200310
Browse files Browse the repository at this point in the history
Mtc 20200310
  • Loading branch information
landonreed authored Jul 15, 2020
2 parents 4922ce5 + eff5b60 commit d2df2bb
Show file tree
Hide file tree
Showing 53 changed files with 1,706 additions and 548 deletions.
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@
<scope>test</scope>
</dependency>

<!-- Used for loading/fetching/writing GTFS entities. gtfs-lib also provides access to:
<!-- Used for loading/fetching/validating/writing GTFS entities. gtfs-lib also provides access to:
- commons-io - generic utilities
- AWS S3 SDK - putting/getting objects into/out of S3.
-->
Expand All @@ -266,7 +266,7 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>
<version>3.5.0</version>
<version>3.6.4</version>
</dependency>

<!-- Miscellaneous utilities -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public abstract class MonitorableJob implements Runnable, Serializable {

public enum JobType {
UNKNOWN_TYPE,
ARBITRARY_FEED_TRANSFORM,
BUILD_TRANSPORT_NETWORK,
CREATE_FEEDVERSION_FROM_SNAPSHOT,
// **** Legacy snapshot jobs
Expand Down Expand Up @@ -216,6 +217,11 @@ public void addNextJob(MonitorableJob ...jobs) {
}
}

/** Convenience wrapper for a {@link List} of jobs. */
public void addNextJob(List<MonitorableJob> jobs) {
for (MonitorableJob job : jobs) addNextJob(job);
}

/**
* Represents the current status of this job.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ private static String createSnapshot (Request req, Response res) throws IOExcept
req.queryParamOrDefault("publishNewVersion", Boolean.FALSE.toString())
);
FeedSource feedSource = FeedVersionController.requestFeedSourceById(req, Actions.EDIT, "feedId");
// Take fields from request body for creating snapshot.
// Take fields from request body for creating snapshot (i.e., feedId/feedSourceId, name, comment).
Snapshot snapshot = json.read(req.body());
// Ensure feed source ID and snapshotOf namespace is correct
snapshot.feedSourceId = feedSource.id;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.conveyal.datatools.manager.DataManager;
import com.conveyal.datatools.manager.auth.Auth0UserProfile;
import com.conveyal.datatools.manager.models.FeedSource;
import com.conveyal.datatools.manager.models.FeedVersion;
import com.conveyal.datatools.manager.models.Snapshot;
import com.conveyal.datatools.manager.persistence.Persistence;
import com.conveyal.gtfs.loader.FeedLoadResult;
Expand Down Expand Up @@ -56,9 +57,17 @@
*/
public class CreateSnapshotJob extends MonitorableJob {
private static final Logger LOG = LoggerFactory.getLogger(CreateSnapshotJob.class);
private final String namespace;
/** The namespace to snapshot. (Note: namespace resulting from snapshot can be found at {@link Snapshot#namespace} */
private String namespace;
/** Whether to update working buffer for the feed source to the newly created snapshot namespace. */
private final boolean updateBuffer;
/** Whether to persist the snapshot in the Snapshots collection. */
private final boolean storeSnapshot;
/**
* Whether to preserve the existing editor buffer as its own snapshot. This is essentially a shorthand for creating
* a snapshot and then separately loading something new into the buffer (if used with updateBuffer). It can also be
* thought of as an autosave.
*/
private final boolean preserveBuffer;
private Snapshot snapshot;
private FeedSource feedSource;
Expand All @@ -73,13 +82,29 @@ public CreateSnapshotJob(Auth0UserProfile owner, Snapshot snapshot, boolean upda
status.update( "Initializing...", 0);
}

public CreateSnapshotJob(Auth0UserProfile owner, Snapshot snapshot) {
super(owner, "Creating snapshot for " + snapshot.feedSourceId, JobType.CREATE_SNAPSHOT);
this.snapshot = snapshot;
this.updateBuffer = false;
this.storeSnapshot = true;
this.preserveBuffer = false;
status.update( "Initializing...", 0);
}

    /** ID of the feed source being snapshotted; exposed via {@code @JsonProperty} for serialized job status. */
    @JsonProperty
    public String getFeedSourceId () {
        return snapshot.feedSourceId;
    }

@Override
public void jobLogic() {
// Special case where snapshot was created when a feed version was transformed by DbTransformations (the
// snapshot contains the transformed feed). Because the jobs are queued up before the feed has been processed,
// the namespace will not exist for the feed version until this jobLogic is actually run.
if (namespace == null && snapshot.feedVersionId != null) {
FeedVersion feedVersion = Persistence.feedVersions.getById(snapshot.feedVersionId);
this.namespace = feedVersion.namespace;
}
// Get count of snapshots to set new version number.
feedSource = Persistence.feedSources.getById(snapshot.feedSourceId);
// Update job name to use feed source name (rather than ID).
Expand Down Expand Up @@ -108,8 +133,9 @@ public void jobFinished () {
if (preserveBuffer) {
// Preserve the existing buffer as a snapshot if requested. This is essentially a shorthand for creating
// a snapshot and then separately loading something new into the buffer. It can be thought of as an
// autosave. FIXME: the buffer would still exist even if not "preserved" here. Should it be deleted if
// requester opts to not preserve it?
// autosave.
// FIXME: the buffer would still exist even if not "preserved" here. Should it be deleted if
// requester opts to not preserve it?
if (feedSource.editorNamespace == null) {
LOG.error("Cannot preserve snapshot with null namespace for feed source {}", feedSource.id);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@
import java.io.FileInputStream;
import java.io.IOException;

/**
* This job will export a database snapshot (i.e., namespace) to a GTFS file. If a feed version is supplied in the
* constructor, it will assume that the GTFS file is intended for ingestion into Data Tools as a new feed version.
*/
public class ExportSnapshotToGTFSJob extends MonitorableJob {

private static final Logger LOG = LoggerFactory.getLogger(ExportSnapshotToGTFSJob.class);
private final Snapshot snapshot;
private final String feedVersionId;
private final FeedVersion feedVersion;
private File tempFile;

public ExportSnapshotToGTFSJob(Auth0UserProfile owner, Snapshot snapshot, String feedVersionId) {
public ExportSnapshotToGTFSJob(Auth0UserProfile owner, Snapshot snapshot, FeedVersion feedVersion) {
super(owner, "Exporting snapshot " + snapshot.name, JobType.EXPORT_SNAPSHOT_TO_GTFS);
this.snapshot = snapshot;
this.feedVersionId = feedVersionId;
this.feedVersion = feedVersion;
status.update("Starting database snapshot...", 10);
}

Expand All @@ -41,6 +45,9 @@ public Snapshot getSnapshot () {

@Override
public void jobLogic() {
// Determine if storing/publishing new feed version for snapshot. If not, all we're doing is writing the
// snapshot to a GTFS file.
boolean isNewVersion = feedVersion != null;
try {
tempFile = File.createTempFile("snapshot", "zip");
} catch (IOException e) {
Expand All @@ -56,18 +63,19 @@ public void jobLogic() {
}

// Override snapshot ID if exporting feed for use as new feed version.
String filename = feedVersionId != null ? feedVersionId : snapshot.id + ".zip";
String bucketPrefix = feedVersionId != null ? "gtfs" : "snapshots";
String filename = isNewVersion ? feedVersion.id : snapshot.id + ".zip";
String bucketPrefix = isNewVersion ? "gtfs" : "snapshots";
// FIXME: replace with use of refactored FeedStore.
// Store the project merged zip locally or on s3
// Store the GTFS zip locally or on s3.
status.update("Writing snapshot to GTFS file", 90);
if (DataManager.useS3) {
String s3Key = String.format("%s/%s", bucketPrefix, filename);
FeedStore.s3Client.putObject(DataManager.feedBucket, s3Key, tempFile);
LOG.info("Storing snapshot GTFS at s3://{}/{}", DataManager.feedBucket, s3Key);
} else {
try {
FeedVersion.feedStore.newFeed(filename, new FileInputStream(tempFile), null);
File gtfsFile = FeedVersion.feedStore.newFeed(filename, new FileInputStream(tempFile), null);
if (isNewVersion) feedVersion.assignGtfsFileAttributes(gtfsFile);
} catch (IOException e) {
status.fail(String.format("Could not store feed for snapshot %s", snapshot.id), e);
}
Expand All @@ -80,7 +88,10 @@ public void jobFinished () {
// Delete snapshot temp file.
if (tempFile != null) {
LOG.info("Deleting temporary GTFS file for exported snapshot at {}", tempFile.getAbsolutePath());
tempFile.delete();
boolean deleted = tempFile.delete();
if (!deleted) {
LOG.warn("Temp file {} not deleted. This may contribute to storage space shortages.", tempFile.getAbsolutePath());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.conveyal.datatools.manager.jobs.ValidateFeedJob;
import com.conveyal.datatools.manager.models.Deployment;
import com.conveyal.datatools.manager.models.ExternalFeedSourceProperty;
import com.conveyal.datatools.manager.models.FeedRetrievalMethod;
import com.conveyal.datatools.manager.models.FeedSource;
import com.conveyal.datatools.manager.models.FeedVersion;
import com.conveyal.datatools.manager.models.JsonViews;
Expand Down Expand Up @@ -286,13 +287,13 @@ private static void loadLegacyFeedSource (JsonNode node) {
feedSource.name = name;
switch(node.findValue("retrievalMethod").asText()) {
case "FETCHED_AUTOMATICALLY":
feedSource.retrievalMethod = FeedSource.FeedRetrievalMethod.FETCHED_AUTOMATICALLY;
feedSource.retrievalMethod = FeedRetrievalMethod.FETCHED_AUTOMATICALLY;
break;
case "MANUALLY_UPLOADED":
feedSource.retrievalMethod = FeedSource.FeedRetrievalMethod.MANUALLY_UPLOADED;
feedSource.retrievalMethod = FeedRetrievalMethod.MANUALLY_UPLOADED;
break;
case "PRODUCED_IN_HOUSE":
feedSource.retrievalMethod = FeedSource.FeedRetrievalMethod.PRODUCED_IN_HOUSE;
feedSource.retrievalMethod = FeedRetrievalMethod.PRODUCED_IN_HOUSE;
break;
}
feedSource.snapshotVersion = node.findValue("snapshotVersion").asText();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.conveyal.datatools.manager.jobs.FetchSingleFeedJob;
import com.conveyal.datatools.manager.jobs.NotifyUsersForSubscriptionJob;
import com.conveyal.datatools.manager.models.ExternalFeedSourceProperty;
import com.conveyal.datatools.manager.models.FeedRetrievalMethod;
import com.conveyal.datatools.manager.models.FeedSource;
import com.conveyal.datatools.manager.models.JsonViews;
import com.conveyal.datatools.manager.models.Project;
Expand All @@ -23,8 +24,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import static com.conveyal.datatools.common.utils.SparkUtils.formatJobMessage;
import static com.conveyal.datatools.common.utils.SparkUtils.getPOJOFromRequestBody;
Expand Down Expand Up @@ -86,6 +91,7 @@ private static Collection<FeedSource> getProjectFeedSources(Request req, Respons
private static FeedSource createFeedSource(Request req, Response res) throws IOException {
Auth0UserProfile userProfile = req.attribute("user");
FeedSource newFeedSource = getPOJOFromRequestBody(req, FeedSource.class);
validate(req, newFeedSource);
boolean allowedToCreateFeedSource = userProfile.canAdministerProject(newFeedSource.projectId);
if (!allowedToCreateFeedSource) {
logMessageAndHalt(req, 403, "User not allowed to create feed source");
Expand Down Expand Up @@ -113,6 +119,35 @@ private static FeedSource createFeedSource(Request req, Response res) throws IOE
}
}

/**
* Check that updated or new feedSource object is valid. This method should be called before a feedSource is
* persisted to the database.
* TODO: Determine if other checks ought to be applied here.
*/
private static boolean validate(Request req, FeedSource feedSource) {
if (feedSource.name == null || feedSource.name.isEmpty()) {
logMessageAndHalt(req, HttpStatus.BAD_REQUEST_400, "Feed source name must be provided");
return false;
}
// Collect all retrieval methods found in tranform rules into a list.
List<FeedRetrievalMethod> retrievalMethods = feedSource.transformRules.stream()
.map(rule -> rule.retrievalMethods)
.flatMap(Collection::stream)
.collect(Collectors.toList());
Set<FeedRetrievalMethod> retrievalMethodSet = new HashSet<>(retrievalMethods);
if (retrievalMethods.size() > retrievalMethodSet.size()) {
// Explicitly check that the list of retrieval methods is not larger than the set (i.e., that there are no
// duplicates).
logMessageAndHalt(
req,
HttpStatus.BAD_REQUEST_400,
"Retrieval methods cannot be defined more than once in transformation rules."
);
return false;
}
return true;
}

/**
* Spark HTTP endpoint to update a feed source. Note: at one point this endpoint accepted a JSON object
* representing a single field to update for the feed source, but it now requires that the JSON body represent all
Expand All @@ -123,6 +158,7 @@ private static FeedSource updateFeedSource(Request req, Response res) throws IOE
String feedSourceId = req.params("id");
FeedSource formerFeedSource = requestFeedSourceById(req, Actions.MANAGE);
FeedSource updatedFeedSource = getPOJOFromRequestBody(req, FeedSource.class);
validate(req, updatedFeedSource);
// Feed source previously had a URL, but it has been changed. In this case, we reset the last fetched timestamp.
if (formerFeedSource.url != null && !formerFeedSource.url.equals(updatedFeedSource.url)) {
LOG.info("Feed source fetch URL has been modified. Resetting lastFetched value from {} to {}", formerFeedSource.lastFetched, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import com.conveyal.datatools.manager.jobs.MergeFeedsType;
import com.conveyal.datatools.manager.jobs.ProcessSingleFeedJob;
import com.conveyal.datatools.manager.models.FeedDownloadToken;
import com.conveyal.datatools.manager.models.FeedRetrievalMethod;
import com.conveyal.datatools.manager.models.FeedSource;
import com.conveyal.datatools.manager.models.FeedVersion;
import com.conveyal.datatools.manager.models.JsonViews;
import com.conveyal.datatools.manager.models.Snapshot;
import com.conveyal.datatools.manager.persistence.FeedStore;
import com.conveyal.datatools.manager.persistence.Persistence;
import com.conveyal.datatools.manager.utils.HashUtils;
import com.conveyal.datatools.manager.utils.json.JsonManager;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -98,29 +98,18 @@ private static String createFeedVersionViaUpload(Request req, Response res) {
Auth0UserProfile userProfile = req.attribute("user");
FeedSource feedSource = requestFeedSourceById(req, Actions.MANAGE);
FeedVersion latestVersion = feedSource.retrieveLatest();
FeedVersion newFeedVersion = new FeedVersion(feedSource);
newFeedVersion.retrievalMethod = FeedSource.FeedRetrievalMethod.MANUALLY_UPLOADED;


// FIXME: Make the creation of new GTFS files generic to handle other feed creation methods, including fetching
// by URL and loading from the editor.
File newGtfsFile = new File(DataManager.getConfigPropertyAsText("application.data.gtfs"), newFeedVersion.id);
FeedVersion newFeedVersion = new FeedVersion(feedSource, FeedRetrievalMethod.MANUALLY_UPLOADED);
// Get path to GTFS file for storage.
File newGtfsFile = FeedVersion.feedStore.getFeedFile(newFeedVersion.id);
copyRequestStreamIntoFile(req, newGtfsFile);
// Set last modified based on value of query param. This is determined/supplied by the client
// request because this data gets lost in the uploadStream otherwise.
Long lastModified = req.queryParams("lastModified") != null
? Long.valueOf(req.queryParams("lastModified"))
: null;
if (lastModified != null) {
newGtfsFile.setLastModified(lastModified);
newFeedVersion.fileTimestamp = lastModified;
}
LOG.info("Last modified: {}", new Date(newGtfsFile.lastModified()));
newFeedVersion.assignGtfsFileAttributes(newGtfsFile, lastModified);

// TODO: fix FeedVersion.hash() call when called in this context. Nothing gets hashed because the file has not been saved yet.
// newFeedVersion.hash();
newFeedVersion.fileSize = newGtfsFile.length();
newFeedVersion.hash = HashUtils.hashFile(newGtfsFile);
LOG.info("Last modified: {}", new Date(newGtfsFile.lastModified()));

// Check that the hashes of the feeds don't match, i.e. that the feed has changed since the last version.
// (as long as there is a latest version, i.e. the feed source is not completely new)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import com.conveyal.datatools.manager.models.FeedVersion;
import com.conveyal.datatools.manager.persistence.FeedStore;
import com.conveyal.datatools.manager.persistence.Persistence;
import com.conveyal.datatools.manager.utils.HashUtils;
import com.conveyal.datatools.manager.utils.json.JsonUtil;
import com.fasterxml.jackson.databind.JsonNode;
import org.eclipse.jetty.http.HttpStatus;
Expand All @@ -33,6 +32,7 @@
import static com.conveyal.datatools.common.utils.SparkUtils.formatJobMessage;
import static com.conveyal.datatools.common.utils.SparkUtils.copyRequestStreamIntoFile;
import static com.conveyal.datatools.common.utils.SparkUtils.logMessageAndHalt;
import static com.conveyal.datatools.manager.models.FeedRetrievalMethod.PRODUCED_IN_HOUSE_GTFS_PLUS;
import static spark.Spark.delete;
import static spark.Spark.get;
import static spark.Spark.post;
Expand Down Expand Up @@ -61,7 +61,7 @@ public class GtfsPlusController {
*/
private static Boolean uploadGtfsPlusFile (Request req, Response res) {
String feedVersionId = req.params("versionid");
File newGtfsFile = new File(gtfsPlusStore.getPathToFeed(feedVersionId));
File newGtfsFile = gtfsPlusStore.getFeedFile(feedVersionId);
copyRequestStreamIntoFile(req, newGtfsFile);
return true;
}
Expand Down Expand Up @@ -222,8 +222,8 @@ private static String publishGtfsPlusFile(Request req, Response res) {
} catch (IOException e) {
logMessageAndHalt(req, 500, "Error creating combined GTFS/GTFS+ file", e);
}

FeedVersion newFeedVersion = new FeedVersion(feedVersion.parentFeedSource());
// Create a new feed version to represent the published GTFS+.
FeedVersion newFeedVersion = new FeedVersion(feedVersion.parentFeedSource(), PRODUCED_IN_HOUSE_GTFS_PLUS);
File newGtfsFile = null;
try {
newGtfsFile = newFeedVersion.newGtfsFile(new FileInputStream(newFeed));
Expand All @@ -236,9 +236,6 @@ private static String publishGtfsPlusFile(Request req, Response res) {
return null;
}
newFeedVersion.originNamespace = feedVersion.namespace;
newFeedVersion.fileTimestamp = newGtfsFile.lastModified();
newFeedVersion.fileSize = newGtfsFile.length();
newFeedVersion.hash = HashUtils.hashFile(newGtfsFile);

// Must be handled by executor because it takes a long time.
ProcessSingleFeedJob processSingleFeedJob = new ProcessSingleFeedJob(newFeedVersion, profile, true);
Expand Down
Loading

0 comments on commit d2df2bb

Please sign in to comment.