diff --git a/pom.xml b/pom.xml index 06021fcf..35357393 100644 --- a/pom.xml +++ b/pom.xml @@ -120,7 +120,6 @@ org.yaml snakeyaml - 2.0 net.imglib2 @@ -131,20 +130,17 @@ jackson-dataformat-msgpack 0.9.5 - - org.apposed - appose - 0.1.1-SNAPSHOT - - net.java.dev.jna - jna + jna-platform 5.13.0 + + + org.apache.commons + commons-compress + 1.24.0 + + diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/Appose.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/Appose.java new file mode 100644 index 00000000..d8f1f362 --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/Appose.java @@ -0,0 +1,182 @@ +/*- + * #%L + * Appose: multi-language interprocess cooperation with shared memory. + * %% + * Copyright (C) 2023 Appose developers. + * %% + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +package io.bioimage.modelrunner.apposed.appose; + +import java.io.File; + +/** + * Appose is a library for interprocess cooperation with shared memory. The + * guiding principles are simplicity and efficiency. + *

+ * Appose was written to enable easy execution of Python-based deep + * learning from Java without copying tensors, but its utility extends + * beyond that. The steps for using Appose are: + *

+ *
    + *
  1. Build an {@link Environment} with the dependencies you need.
  2. + *
  3. Create a {@link Service} linked to a worker, which runs in its + * own process.
  4. + *
  5. Execute scripts on the worker by launching {@link Service.Task + * Tasks}.
  6. + *
  7. Receive status updates from the task asynchronously + * {@link Service.Task#listen via callbacks}.
  8. + *
+ *

Examples

+ * + *

+ * Here is a very simple example written in Java: + *

+ * + *
{@code
+ * Environment env = Appose.conda("/path/to/environment.yml").build();
+ * Service python = env.python();
+ * Task task = python.task("""
+ *     5 + 6
+ *     """);
+ * task.start().waitFor();
+ * Object result = task.outputs.get("result");
+ * assertEquals(11, result);
+ * }
+ *

+ * And here is an example using a few more of Appose's features: + *

+ * + *
{@code
+ * Environment env = Appose.conda("/path/to/environment.yml").build();
+ * Service python = env.python();
+ * Task golden_ratio = python.task("""
+ *     # Approximate the golden ratio using the Fibonacci sequence.
+ *     previous = 0
+ *     current = 1
+ *     for i in range(iterations):
+ *         if task.cancel_requested:
+ *             task.cancel()
+ *             break
+ *         task.status(current=i, maximum=iterations)
+ *         v = current
+ *         current += previous
+ *         previous = v
+ *     task.outputs["numer"] = current
+ *     task.outputs["denom"] = previous
+ *     """);
+ * task.listen(event -> {
+ *     switch (event.responseType) {
+ *         case UPDATE:
+ *             System.out.println("Progress: " + task.current + "/" + task.maximum);
+ *             break;
+ *         case COMPLETION:
+ *             long numer = (Long) task.outputs["numer"];
+ *             long denom = (Long) task.outputs["denom"];
+ *             double ratio = (double) numer / denom;
+ *             System.out.println("Task complete. Result: " + numer + "/" + denom + " =~ " + ratio);
+ *             break;
+ *         case CANCELATION:
+ *             System.out.println("Task canceled");
+ *             break;
+ *         case FAILURE:
+ *             System.out.println("Task failed: " + task.error);
+ *             break;
+ *     }
+ * });
+ * task.start();
+ * Thread.sleep(1000);
+ * if (!task.status.isFinished()) {
+ *     // Task is taking too long; request a cancelation.
+ *     task.cancel();
+ * }
+ * task.waitFor();
+ * }
+ *

+ * Of course, the above examples could have been done all in Java. But hopefully + * they hint at the possibilities of easy cross-language integration. + *

+ *

Workers

+ *

+ * A worker is a separate process created by Appose to do asynchronous + * computation on behalf of the calling process. The calling process interacts + * with a worker via its associated {@link Service}. + *

+ *

+ * Appose comes with built-in support for two worker implementations: + * {@code python_worker} to run Python scripts, and {@link GroovyWorker} + * to run Groovy scripts. These workers can be created easily by invoking + * the {@link Environment#python} and {@link Environment#groovy} methods + * respectively. + *

+ *

+ * But Appose is compatible with any program that abides by the + * Appose worker process contract: + *

+ *
    + *
  1. The worker must accept requests in Appose's request format on its + * standard input (stdin) stream.
  2. + *
  3. The worker must issue responses in Appose's response format on its + * standard output (stdout) stream.
  4. + *
+ *

+ * TODO - write up the request and response formats in detail here! + * JSON, one line per request/response. + *

+ * + * @author Curtis Rueden + */ +public class Appose { + + public static Builder base(File directory) { + return new Builder().base(directory); + } + + public static Builder base(String directory) { + return base(new File(directory)); + } + + public static Builder java(String vendor, String version) { + return new Builder().java(vendor, version); + } + + public static Builder conda(File environmentYaml) { + return new Builder().conda(environmentYaml); + } + + public static Environment system() { + return system(new File(".")); + } + + public static Environment system(File directory) { + return new Builder().base(directory).useSystemPath().build(); + } + + public static Environment system(String directory) { + return system(new File(directory)); + } +} diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/Builder.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/Builder.java new file mode 100644 index 00000000..82bbbaf2 --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/Builder.java @@ -0,0 +1,112 @@ +/*- + * #%L + * Appose: multi-language interprocess cooperation with shared memory. + * %% + * Copyright (C) 2023 Appose developers. + * %% + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +package io.bioimage.modelrunner.apposed.appose; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.nio.file.Paths; + +import org.apache.commons.compress.archivers.ArchiveException; + +public class Builder { + + + public Environment build() { + String base = baseDir.getPath(); + boolean useSystemPath = systemPath; + + // TODO Build the thing!~ + // Hash the state to make a base directory name. + // - Construct conda environment from condaEnvironmentYaml. + // - Download and unpack JVM of the given vendor+version. + // - Populate ${baseDirectory}/jars with Maven artifacts? + + try { + Conda conda = new Conda(Conda.BASE_PATH); + String envName = "appose"; + if (conda.getEnvironmentNames().contains( envName )) { + // TODO: Should we update it? For now, we just use it. + } + else { + conda.create(envName, "-f", condaEnvironmentYaml.getAbsolutePath()); + } + } catch (IOException e) { + throw new RuntimeException(e); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ArchiveException e) { + throw new RuntimeException(e); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + + return new Environment() { + @Override public String base() { return base; } + @Override public boolean useSystemPath() { return useSystemPath; } + }; + } + + // -- Configuration -- + + private boolean systemPath; + + public Builder useSystemPath() { + systemPath = true; + return this; + } + + private File baseDir; + + public Builder base(File directory) { + baseDir = directory; + return this; + } + + // -- Conda -- + + private File condaEnvironmentYaml; + + public Builder conda(File environmentYaml) { + this.condaEnvironmentYaml = environmentYaml; + return this; + } + + // -- Java -- + + private String javaVendor; + private String javaVersion; + + public Builder java(String vendor, String version) { + this.javaVendor = vendor; + this.javaVersion = version; + return this; + } +} diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/Conda.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/Conda.java new file mode 100644 index 00000000..53c7c693 --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/Conda.java @@ -0,0 +1,892 @@ +/******************************************************************************* + * Copyright (C) 2021, Ko Sugawara + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + ******************************************************************************/ +package io.bioimage.modelrunner.apposed.appose; + +import org.apache.commons.compress.archivers.ArchiveException; +import io.bioimage.modelrunner.apposed.appose.CondaException.EnvironmentExistsException; +import io.bioimage.modelrunner.system.PlatformDetection; + +import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.Selector; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +/** + * Conda environment manager, implemented by delegating to micromamba. + * + * @author Ko Sugawara + * @author Curtis Rueden + */ +public class Conda { + + final String pythonCommand = PlatformDetection.isWindows() ? "python.exe" : "bin/python"; + + final String condaCommand; + + private String envName = DEFAULT_ENVIRONMENT_NAME; + + private final String rootdir; + + private final String envsdir; + + public final static String DEFAULT_ENVIRONMENT_NAME = "base"; + + private final static String CONDA_RELATIVE_PATH = PlatformDetection.isWindows() ? + File.separator + "Library" + File.separator + "bin" + File.separator + "micromamba.exe" + : File.separator + "bin" + File.separator + "micromamba"; + + final public static String BASE_PATH = Paths.get(System.getProperty("user.home"), ".local", "share", "appose", "micromamba").toString(); + + final public static String ENVS_NAME = "envs"; + + public final static String MICROMAMBA_URL = + "https://micro.mamba.pm/api/micromamba/" + microMambaPlatform() + "/latest"; + + public final static String ERR_STREAM_UUUID = UUID.randomUUID().toString(); + + private static String microMambaPlatform() { + String osName = System.getProperty("os.name"); + if (osName.startsWith("Windows")) osName = "Windows"; + String osArch = System.getProperty("os.arch"); + switch (osName + "|" + osArch) { + case "Linux|amd64": return "linux-64"; + case "Linux|aarch64": return "linux-aarch64"; + case "Linux|ppc64le": return "linux-ppc64le"; + case "Mac OS X|x86_64": return "osx-64"; + case "Mac OS X|aarch64": return "osx-arm64"; + case "Windows|amd64": return "win-64"; + default: return null; + } + } + + /** + * Returns a {@link ProcessBuilder} with the working directory specified in the + * constructor. + * + * @param isInheritIO + * Sets the source and destination for subprocess standard I/O to be + * the same as those of the current Java process. + * @return The {@link ProcessBuilder} with the working directory specified in + * the constructor. + */ + private ProcessBuilder getBuilder( final boolean isInheritIO ) + { + final ProcessBuilder builder = new ProcessBuilder().directory( new File( rootdir ) ); + if ( isInheritIO ) + builder.inheritIO(); + return builder; + } + + /** + * Create a new Conda object. The root dir for the Micromamba installation + * will be /user/.local/share/appose/micromamba. + * If there is no directory found at the specified + * path, Miniconda will be automatically installed in the path. It is expected + * that the Conda installation has executable commands as shown below: + * + *
+	 * CONDA_ROOT
+	 * ├── condabin
+	 * │   ├── conda(.bat)
+	 * │   ... 
+	 * ├── envs
+	 * │   ├── your_env
+	 * │   │   ├── python(.exe)
+	 * 
+ * + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + * @throws ArchiveException + * @throws URISyntaxException + */ + public Conda() throws IOException, InterruptedException, ArchiveException, URISyntaxException + { + this(BASE_PATH); + } + + /** + * Create a new Conda object. The root dir for Conda installation can be + * specified as {@code String}. If there is no directory found at the specified + * path, Miniconda will be automatically installed in the path. It is expected + * that the Conda installation has executable commands as shown below: + * + *
+	 * CONDA_ROOT
+	 * ├── condabin
+	 * │   ├── conda(.bat)
+	 * │   ... 
+	 * ├── envs
+	 * │   ├── your_env
+	 * │   │   ├── python(.exe)
+	 * 
+ * + * @param rootdir + * The root dir for Conda installation. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + * @throws ArchiveException + * @throws URISyntaxException + */ + public Conda( final String rootdir ) throws IOException, InterruptedException, ArchiveException, URISyntaxException + { + if (rootdir == null) + this.rootdir = BASE_PATH; + else + this.rootdir = rootdir; + this.condaCommand = this.rootdir + CONDA_RELATIVE_PATH; + this.envsdir = this.rootdir + File.separator + "envs"; + if ( Files.notExists( Paths.get( condaCommand ) ) ) + { + + final File tempFile = File.createTempFile( "miniconda", ".tar.bz2" ); + tempFile.deleteOnExit(); + URL website = MambaInstallerUtils.redirectedURL(new URL(MICROMAMBA_URL)); + ReadableByteChannel rbc = Channels.newChannel(website.openStream()); + try (FileOutputStream fos = new FileOutputStream(tempFile)) { + fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + } + final File tempTarFile = File.createTempFile( "miniconda", ".tar" ); + tempTarFile.deleteOnExit(); + MambaInstallerUtils.unBZip2(tempFile, tempTarFile); + File mambaBaseDir = new File(rootdir); + if (!mambaBaseDir.isDirectory() && !mambaBaseDir.mkdirs()) + throw new IOException("Failed to create Micromamba default directory " + mambaBaseDir.getParentFile().getAbsolutePath()); + MambaInstallerUtils.unTar(tempTarFile, mambaBaseDir); + if (!(new File(envsdir)).isDirectory() && !new File(envsdir).mkdirs()) + throw new IOException("Failed to create Micromamba default envs directory " + envsdir); + + } + + // The following command will throw an exception if Conda does not work as + // expected. + boolean executableSet = new File(condaCommand).setExecutable(true); + if (!executableSet) + throw new IOException("Cannot set file as executable due to missing permissions, " + + "please do it manually: " + condaCommand); + + // The following command will throw an exception if Conda does not work as + // expected. + getVersion(); + } + + public String getEnvsDir() { + return this.envsdir; + } + + /** + * Returns {@code \{"cmd.exe", "/c"\}} for Windows and an empty list for + * Mac/Linux. + * + * @return {@code \{"cmd.exe", "/c"\}} for Windows and an empty list for + * Mac/Linux. + * @throws IOException + */ + private List< String > getBaseCommand() + { + final List< String > cmd = new ArrayList<>(); + if ( PlatformDetection.isWindows() ) + cmd.addAll( Arrays.asList( "cmd.exe", "/c" ) ); + return cmd; + } + + /** + * Run {@code conda update} in the activated environment. A list of packages to + * be updated and extra parameters can be specified as {@code args}. + * + * @param args + * The list of packages to be updated and extra parameters as + * {@code String...}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void update( final String... args ) throws IOException, InterruptedException + { + updateIn( envName, args ); + } + + /** + * Run {@code conda update} in the specified environment. A list of packages to + * update and extra parameters can be specified as {@code args}. + * + * @param envName + * The environment name to be used for the update command. + * @param args + * The list of packages to be updated and extra parameters as + * {@code String...}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void updateIn( final String envName, final String... args ) throws IOException, InterruptedException + { + final List< String > cmd = new ArrayList<>( Arrays.asList( "update", "-y", "-n", envName ) ); + cmd.addAll( Arrays.asList( args ) ); + runConda( cmd.stream().toArray( String[]::new ) ); + } + + /** + * Run {@code conda create} to create a conda environment defined by the input environment yaml file. + * + * @param envName + * The environment name to be created. + * @param envYaml + * The environment yaml file containing the information required to build it + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void createWithYaml( final String envName, final String envYaml ) throws IOException, InterruptedException + { + createWithYaml(envName, envYaml, false); + } + + /** + * Run {@code conda create} to create a conda environment defined by the input environment yaml file. + * + * @param envName + * The environment name to be created. + * @param envYaml + * The environment yaml file containing the information required to build it + * @param consumer + * String consumer that keeps track of the environment creation + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void createWithYaml( final String envName, final String envYaml, Consumer consumer ) throws IOException, InterruptedException + { + createWithYaml(envName, envYaml, false, consumer); + } + + /** + * Run {@code conda create} to create a conda environment defined by the input environment yaml file. + * + * @param envName + * The environment name to be created. + * @param envYaml + * The environment yaml file containing the information required to build it + * @param envName + * The environment name to be created. + * @param isForceCreation + * Force creation of the environment if {@code true}. If this value + * is {@code false} and an environment with the specified name + * already exists, throw an {@link EnvironmentExistsException}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void createWithYaml( final String envName, final String envYaml, final boolean isForceCreation ) throws IOException, InterruptedException + { + if ( !isForceCreation && getEnvironmentNames().contains( envName ) ) + throw new EnvironmentExistsException(); + runConda( "env", "create", "--prefix", + envsdir + File.separator + envName, "-f", envYaml, "-y" ); + } + + /** + * Run {@code conda create} to create a conda environment defined by the input environment yaml file. + * + * @param envName + * The environment name to be created. + * @param envYaml + * The environment yaml file containing the information required to build it + * @param envName + * The environment name to be created. + * @param isForceCreation + * Force creation of the environment if {@code true}. If this value + * is {@code false} and an environment with the specified name + * already exists, throw an {@link EnvironmentExistsException}. + * @param consumer + * String consumer that keeps track of the environment creation + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void createWithYaml( final String envName, final String envYaml, final boolean isForceCreation, Consumer consumer) throws IOException, InterruptedException + { + if ( !isForceCreation && getEnvironmentNames().contains( envName ) ) + throw new EnvironmentExistsException(); + runConda(consumer, "env", "create", "--prefix", + envsdir + File.separator + envName, "-f", envYaml, "-y", "-vv" ); + } + + /** + * Run {@code conda create} to create an empty conda environment. + * + * @param envName + * The environment name to be created. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void create( final String envName ) throws IOException, InterruptedException + { + create( envName, false ); + } + + /** + * Run {@code conda create} to create an empty conda environment. + * + * @param envName + * The environment name to be created. + * @param isForceCreation + * Force creation of the environment if {@code true}. If this value + * is {@code false} and an environment with the specified name + * already exists, throw an {@link EnvironmentExistsException}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void create( final String envName, final boolean isForceCreation ) throws IOException, InterruptedException + { + if ( !isForceCreation && getEnvironmentNames().contains( envName ) ) + throw new EnvironmentExistsException(); + runConda( "create", "-y", "-p", envsdir + File.separator + envName ); + } + + /** + * Run {@code conda create} to create a new conda environment with a list of + * specified packages. + * + * @param envName + * The environment name to be created. + * @param args + * The list of packages to be installed on environment creation and + * extra parameters as {@code String...}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void create( final String envName, final String... args ) throws IOException, InterruptedException + { + create( envName, false, args ); + } + + /** + * Run {@code conda create} to create a new conda environment with a list of + * specified packages. + * + * @param envName + * The environment name to be created. + * @param isForceCreation + * Force creation of the environment if {@code true}. If this value + * is {@code false} and an environment with the specified name + * already exists, throw an {@link EnvironmentExistsException}. + * @param args + * The list of packages to be installed on environment creation and + * extra parameters as {@code String...}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void create( final String envName, final boolean isForceCreation, final String... args ) throws IOException, InterruptedException + { + if ( !isForceCreation && getEnvironmentNames().contains( envName ) ) + throw new EnvironmentExistsException(); + final List< String > cmd = new ArrayList<>( Arrays.asList( "env", "create", "--force", "-p", envsdir + File.separator + envName ) ); + cmd.addAll( Arrays.asList( args ) ); + runConda( cmd.stream().toArray( String[]::new ) ); + } + + /** + * This method works as if the user runs {@code conda activate envName}. This + * method internally calls {@link Conda#setEnvName(String)}. + * + * @param envName + * The environment name to be activated. + * @throws IOException + * If an I/O error occurs. + */ + public void activate( final String envName ) throws IOException + { + if ( getEnvironmentNames().contains( envName ) ) + setEnvName( envName ); + else + throw new IllegalArgumentException( "environment: " + envName + " not found." ); + } + + /** + * This method works as if the user runs {@code conda deactivate}. This method + * internally sets the {@code envName} to {@code base}. + */ + public void deactivate() + { + setEnvName( DEFAULT_ENVIRONMENT_NAME ); + } + + /** + * This method is used by {@code Conda#activate(String)} and + * {@code Conda#deactivate()}. This method is kept private since it is not + * expected to call this method directory. + * + * @param envName + * The environment name to be set. + */ + private void setEnvName( final String envName ) + { + this.envName = envName; + } + + /** + * Returns the active environment name. + * + * @return The active environment name. + * + */ + public String getEnvName() + { + return envName; + } + + /** + * Run {@code conda install} in the activated environment. A list of packages to + * install and extra parameters can be specified as {@code args}. + * + * @param args + * The list of packages to be installed and extra parameters as + * {@code String...}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void install( final String... args ) throws IOException, InterruptedException + { + installIn( envName, args ); + } + + /** + * Run {@code conda install} in the specified environment. A list of packages to + * install and extra parameters can be specified as {@code args}. + * + * @param envName + * The environment name to be used for the install command. + * @param args + * The list of packages to be installed and extra parameters as + * {@code String...}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void installIn( final String envName, final String... args ) throws IOException, InterruptedException + { + final List< String > cmd = new ArrayList<>( Arrays.asList( "install", "-y", "-n", envName ) ); + cmd.addAll( Arrays.asList( args ) ); + runConda( cmd.stream().toArray( String[]::new ) ); + } + + /** + * Run {@code pip install} in the activated environment. A list of packages to + * install and extra parameters can be specified as {@code args}. + * + * @param args + * The list of packages to be installed and extra parameters as + * {@code String...}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void pipInstall( final String... args ) throws IOException, InterruptedException + { + pipInstallIn( envName, args ); + } + + /** + * Run {@code pip install} in the specified environment. A list of packages to + * install and extra parameters can be specified as {@code args}. + * + * @param envName + * The environment name to be used for the install command. + * @param args + * The list of packages to be installed and extra parameters as + * {@code String...}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void pipInstallIn( final String envName, final String... args ) throws IOException, InterruptedException + { + final List< String > cmd = new ArrayList<>( Arrays.asList( "-m", "pip", "install" ) ); + cmd.addAll( Arrays.asList( args ) ); + runPythonIn( envName, cmd.stream().toArray( String[]::new ) ); + } + + /** + * Run a Python command in the activated environment. This method automatically + * sets environment variables associated with the activated environment. In + * Windows, this method also sets the {@code PATH} environment variable so that + * the specified environment runs as expected. + * + * @param args + * One or more arguments for the Python command. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void runPython( final String... args ) throws IOException, InterruptedException + { + runPythonIn( envName, args ); + } + + /** + * Run a Python command in the specified environment. This method automatically + * sets environment variables associated with the specified environment. In + * Windows, this method also sets the {@code PATH} environment variable so that + * the specified environment runs as expected. + * + * @param envName + * The environment name used to run the Python command. + * @param args + * One or more arguments for the Python command. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void runPythonIn( final String envName, final String... args ) throws IOException, InterruptedException + { + final List< String > cmd = getBaseCommand(); + if ( envName.equals( DEFAULT_ENVIRONMENT_NAME ) ) + cmd.add( pythonCommand ); + else + cmd.add( Paths.get( "envs", envName, pythonCommand ).toString() ); + cmd.addAll( Arrays.asList( args ) ); + final ProcessBuilder builder = getBuilder( true ); + if ( PlatformDetection.isWindows() ) + { + final Map< String, String > envs = builder.environment(); + final String envDir = Paths.get( rootdir, "envs", envName ).toString(); + envs.put( "Path", envDir + ";" + envs.get( "Path" ) ); + envs.put( "Path", Paths.get( envDir, "Scripts" ).toString() + ";" + envs.get( "Path" ) ); + envs.put( "Path", Paths.get( envDir, "Library" ).toString() + ";" + envs.get( "Path" ) ); + envs.put( "Path", Paths.get( envDir, "Library", "Bin" ).toString() + ";" + envs.get( "Path" ) ); + } + // TODO find way to get env vars in micromamba builder.environment().putAll( getEnvironmentVariables( envName ) ); + if ( builder.command( cmd ).start().waitFor() != 0 ) + throw new RuntimeException(); + } + + /** + * Returns Conda version as a {@code String}. + * + * @return The Conda version as a {@code String}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public String getVersion() throws IOException, InterruptedException + { + final List< String > cmd = getBaseCommand(); + cmd.addAll( Arrays.asList( condaCommand, "--version" ) ); + final Process process = getBuilder( false ).command( cmd ).start(); + if ( process.waitFor() != 0 ) + throw new RuntimeException(); + return new BufferedReader( new InputStreamReader( process.getInputStream() ) ).readLine(); + } + + /** + * Run a Conda command with one or more arguments. + * + * @param consumer + * String consumer that receives the Strings that the process prints to the console + * @param args + * One or more arguments for the Conda command. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void runConda(Consumer consumer, final String... args ) throws RuntimeException, IOException, InterruptedException + { + SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss"); + + final List< String > cmd = getBaseCommand(); + cmd.add( condaCommand ); + cmd.addAll( Arrays.asList( args ) ); + + ProcessBuilder builder = getBuilder(false).command(cmd); + Process process = builder.start(); + // Use separate threads to read each stream to avoid a deadlock. + consumer.accept(sdf.format(Calendar.getInstance().getTime()) + " -- STARTING INSTALLATION" + System.lineSeparator()); + long updatePeriod = 300; + Thread outputThread = new Thread(() -> { + try ( + InputStream inputStream = process.getInputStream(); + InputStream errStream = process.getErrorStream(); + ){ + byte[] buffer = new byte[1024]; // Buffer size can be adjusted + StringBuilder processBuff = new StringBuilder(); + StringBuilder errBuff = new StringBuilder(); + String processChunk = ""; + String errChunk = ""; + int newLineIndex; + long t0 = System.currentTimeMillis(); + while (process.isAlive() || inputStream.available() > 0) { + if (inputStream.available() > 0) { + processBuff.append(new String(buffer, 0, inputStream.read(buffer))); + while ((newLineIndex = processBuff.indexOf(System.lineSeparator())) != -1) { + processChunk += sdf.format(Calendar.getInstance().getTime()) + " -- " + + processBuff.substring(0, newLineIndex + 1).trim() + System.lineSeparator(); + processBuff.delete(0, newLineIndex + 1); + } + } + if (errStream.available() > 0) { + errBuff.append(new String(buffer, 0, errStream.read(buffer))); + while ((newLineIndex = errBuff.indexOf(System.lineSeparator())) != -1) { + errChunk += ERR_STREAM_UUUID + errBuff.substring(0, newLineIndex + 1).trim() + System.lineSeparator(); + errBuff.delete(0, newLineIndex + 1); + } + } + // Sleep for a bit to avoid busy waiting + Thread.sleep(60); + if (System.currentTimeMillis() - t0 > updatePeriod) { + // TODO decide what to do with the err stream consumer.accept(errChunk.equals("") ? null : errChunk); + consumer.accept(processChunk); + processChunk = ""; + errChunk = ""; + t0 = System.currentTimeMillis(); + } + } + if (inputStream.available() > 0) { + processBuff.append(new String(buffer, 0, inputStream.read(buffer))); + processChunk += sdf.format(Calendar.getInstance().getTime()) + " -- " + processBuff.toString().trim(); + } + if (errStream.available() > 0) { + errBuff.append(new String(buffer, 0, errStream.read(buffer))); + errChunk += ERR_STREAM_UUUID + errBuff.toString().trim(); + } + consumer.accept(errChunk); + consumer.accept(processChunk + System.lineSeparator() + + sdf.format(Calendar.getInstance().getTime()) + " -- TERMINATED INSTALLATION"); + } catch (IOException | InterruptedException e) { + e.printStackTrace(); + } + }); + // Start reading threads + outputThread.start(); + int processResult = process.waitFor(); + // Wait for all output to be read + outputThread.join(); + if (processResult != 0) + throw new RuntimeException("Error executing the following command: " + builder.command()); + } + + /** + * Run a Conda command with one or more arguments. + * + * @param args + * One or more arguments for the Conda command. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public void runConda(final String... args ) throws RuntimeException, IOException, InterruptedException + { + final List< String > cmd = getBaseCommand(); + cmd.add( condaCommand ); + cmd.addAll( Arrays.asList( args ) ); + if ( getBuilder( true ).command( cmd ).start().waitFor() != 0 ) + throw new RuntimeException(); + } + + /** + * Returns environment variables associated with the activated environment as + * {@code Map< String, String >}. + * + * @return The environment variables as {@code Map< String, String >}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + /* TODO find equivalent in mamba + public Map< String, String > getEnvironmentVariables() throws IOException, InterruptedException + { + return getEnvironmentVariables( envName ); + } + */ + + /** + * Returns environment variables associated with the specified environment as + * {@code Map< String, String >}. + * + * @param envName + * The environment name used to run the Python command. + * @return The environment variables as {@code Map< String, String >}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + /** + * TODO find equivalent in mamba + public Map< String, String > getEnvironmentVariables( final String envName ) throws IOException, InterruptedException + { + final List< String > cmd = getBaseCommand(); + cmd.addAll( Arrays.asList( condaCommand, "env", "config", "vars", "list", "-n", envName ) ); + final Process process = getBuilder( false ).command( cmd ).start(); + if ( process.waitFor() != 0 ) + throw new RuntimeException(); + final Map< String, String > map = new HashMap<>(); + try (final BufferedReader reader = new BufferedReader( new InputStreamReader( process.getInputStream() ) )) + { + String line; + + while ( ( line = reader.readLine() ) != null ) + { + final String[] keyVal = line.split( " = " ); + map.put( keyVal[ 0 ], keyVal[ 1 ] ); + } + } + return map; + } + */ + + /** + * Returns a list of the Conda environment names as {@code List< String >}. + * + * @return The list of the Conda environment names as {@code List< String >}. + * @throws IOException + * If an I/O error occurs. + * @throws InterruptedException + * If the current thread is interrupted by another thread while it + * is waiting, then the wait is ended and an InterruptedException is + * thrown. + */ + public List< String > getEnvironmentNames() throws IOException + { + final List< String > envs = new ArrayList<>( Arrays.asList( DEFAULT_ENVIRONMENT_NAME ) ); + envs.addAll( Files.list( Paths.get( envsdir ) ) + .map( p -> p.getFileName().toString() ) + .filter( p -> !p.startsWith( "." ) ) + .collect( Collectors.toList() ) ); + return envs; + } + + public static boolean checkDependenciesInEnv(String envDir, List dependencies) { + if (!(new File(envDir).isDirectory())) + return false; + Builder env = new Builder().conda(new File(envDir)); + // TODO run conda list -p /full/path/to/env + return false; + } + + public boolean checkEnvFromYamlExists(String envYaml) { + if (envYaml == null || new File(envYaml).isFile() == false + || (envYaml.endsWith(".yaml") && envYaml.endsWith(".yml"))) { + return false; + } + // TODO parse yaml without adding deps + return false; + } + +} diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/CondaException.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/CondaException.java new file mode 100644 index 00000000..2dc738b8 --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/CondaException.java @@ -0,0 +1,96 @@ +package io.bioimage.modelrunner.apposed.appose; + +public class CondaException +{ + + public static class EnvironmentExistsException extends RuntimeException + { + private static final long serialVersionUID = -1625119813967214783L; + + /** + * Constructs a new exception with {@code null} as its detail message. The cause + * is not initialized, and may subsequently be initialized by a call to + * {@link #initCause}. + */ + public EnvironmentExistsException() + { + super(); + } + + /** + * Constructs a new exception with the specified detail message. The cause is + * not initialized, and may subsequently be initialized by a call to + * {@link #initCause}. + * + * @param msg + * the detail message. The detail message is saved for later + * retrieval by the {@link #getMessage()} method. + */ + public EnvironmentExistsException( String msg ) + { + super( msg ); + } + + /** + * Constructs a new exception with the specified detail message and cause. + *

+ * Note that the detail message associated with {@code cause} is not + * automatically incorporated in this exception's detail message. + * + * @param message + * the detail message (which is saved for later retrieval by the + * {@link #getMessage()} method). + * @param cause + * the cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is permitted, + * and indicates that the cause is nonexistent or unknown.) + * @since 1.4 + */ + public EnvironmentExistsException( String message, Throwable cause ) + { + super( message, cause ); + } + + /** + * Constructs a new exception with the specified cause and a detail message of + * (cause==null ? null : cause.toString()) (which typically contains + * the class and detail message of cause). This constructor is useful + * for exceptions that are little more than wrappers for other throwables (for + * example, {@link java.security.PrivilegedActionException}). + * + * @param cause + * the cause (which is saved for later retrieval by the + * {@link #getCause()} method). (A null value is permitted, + * and indicates that the cause is nonexistent or unknown.) + * @since 1.4 + */ + public EnvironmentExistsException( Throwable cause ) + { + super( cause ); + } + + /** + * Constructs a new exception with the specified detail message, cause, + * suppression enabled or disabled, and writable stack trace enabled or + * disabled. + * + * @param message + * the detail message. + * @param cause + * the cause. (A {@code null} value is permitted, and indicates that + * the cause is nonexistent or unknown.) + * @param enableSuppression + * whether or not suppression is enabled or disabled + * @param writableStackTrace + * whether or not the stack trace should be writable + * @since 1.7 + */ + protected EnvironmentExistsException( String message, Throwable cause, + boolean enableSuppression, + boolean writableStackTrace ) + { + super( message, cause, enableSuppression, writableStackTrace ); + } + } + +} diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/Environment.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/Environment.java new file mode 100644 index 00000000..d7da2da8 --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/Environment.java @@ -0,0 +1,143 @@ +/*- + * #%L + * Appose: multi-language interprocess cooperation with shared memory. + * %% + * Copyright (C) 2023 Appose developers. + * %% + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +package io.bioimage.modelrunner.apposed.appose; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; + +public interface Environment { + + default String base() { return "."; } + default boolean useSystemPath() { return false; } + + /** + * Creates a Python script service. + *

+ * This is a high level way to create a service, enabling execution of + * Python scripts asynchronously on its linked process running a + * {@code python_worker}. + *

+ * + * @return The newly created service. + * @see #groovy To create a service for Groovy script execution. + * @throws IOException If something goes wrong starting the worker process. + */ + default Service python() throws IOException { + List pythonExes = Arrays.asList( + "python", "python.exe", + "bin/python", "bin/python.exe" + ); + return service(pythonExes, "-c", + "import appose.python_worker; appose.python_worker.main()"); + } + + default Service java(String mainClass, String... jvmArgs) + throws IOException + { + return java(mainClass, Collections.emptyList(), jvmArgs); + } + + default Service java(String mainClass, List classPath, + String... jvmArgs) throws IOException + { + // Collect classpath elements into a set, to avoid duplicate entries. + Set cp = new LinkedHashSet<>(); + + // Ensure that the classpath includes Appose and its dependencies. + // NB: This list must match Appose's dependencies in pom.xml! + List> apposeDeps = Arrays.asList(// + com.sun.jna.Pointer.class, // ------------------> com.sun.jna:jna + com.sun.jna.platform.linux.LibRT.class, // -----> com.sun.jna:jna-platform + com.sun.jna.platform.win32.Kernel32.class // ---> com.sun.jna:jna-platform + ); + for (Class depClass : apposeDeps) { + File location = FilePaths.location(depClass); + if (location != null) cp.add(location.getCanonicalPath()); + } + + // Append any explicitly requested classpath elements. + cp.addAll(classPath); + + // Build up the service arguments. + List args = new ArrayList<>(); + args.add("-cp"); + args.add(String.join(File.pathSeparator, cp)); + args.addAll(Arrays.asList(jvmArgs)); + args.add(mainClass); + + // Create the service. + List javaExes = Arrays.asList( + "java", "java.exe", + "bin/java", "bin/java.exe", + "jre/bin/java", "jre/bin/java.exe" + ); + return service(javaExes, args.toArray(new String[0])); + } + + /** + * Creates a service with the given command line arguments. + *

+ * This is a low level way to create a service. It assumes the + * specified executable conforms to the {@link Appose Appose worker process + * contract}, meaning it accepts requests on stdin and produces responses on + * stdout, both formatted according to Appose's assumptions. + *

+ * + * @param exes List of executables to try for launching the worker process. + * @param args Command line arguments to pass to the worker process + * (e.g. {"-v", "--enable-everything"}. + * @return The newly created service. + * @see #groovy To create a service for Groovy script execution. + * @see #python() To create a service for Python script execution. + * @throws IOException If something goes wrong starting the worker process. + */ + default Service service(List exes, String... args) throws IOException { + if (args.length == 0) throw new IllegalArgumentException("No executable given"); + + List dirs = useSystemPath() // + ? Arrays.asList(System.getenv("PATH").split(File.pathSeparator)) // + : Arrays.asList(base()); + + File exeFile = FilePaths.findExe(dirs, exes); + if (exeFile == null) throw new IllegalArgumentException("No executables found amongst candidates: " + exes); + + String[] allArgs = new String[args.length + 1]; + System.arraycopy(args, 0, allArgs, 1, args.length); + allArgs[0] = exeFile.getCanonicalPath(); + + return new Service(new File(base()), allArgs); + } +} diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/FilePaths.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/FilePaths.java new file mode 100644 index 00000000..3d39124a --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/FilePaths.java @@ -0,0 +1,81 @@ +/*- + * #%L + * Appose: multi-language interprocess cooperation with shared memory. + * %% + * Copyright (C) 2023 Appose developers. + * %% + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +package io.bioimage.modelrunner.apposed.appose; + +import java.io.File; +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.List; + +/** + * Utility methods for working with file paths. + */ +public final class FilePaths { + + private FilePaths() { + // Prevent instantiation of utility class. + } + + /** + * Gets the path to the JAR file containing the given class. Technically + * speaking, it might not actually be a JAR file, it might be a raw class + * file, or even something weirder... But for our purposes, we'll just + * assume it's going to be something you can put onto a classpath. + * + * @param c The class whose file path should be discerned. + * @return File path of the JAR file containing the given class. + */ + public static File location(Class c) { + try { + return new File(c.getProtectionDomain().getCodeSource().getLocation().toURI()); + } + catch (URISyntaxException exc) { + return null; + } + } + + public static File findExe(List dirs, List exes) { + for (String exe : exes) { + File exeFile = new File(exe); + if (exeFile.isAbsolute()) { + // Candidate is an absolute path; check it directly. + if (exeFile.canExecute()) return exeFile; + } + else { + // Candidate is a relative path; check beneath each given directory. + for (String dir : dirs) { + File f = Paths.get(dir, exe).toFile(); + if (f.canExecute()) return f; + } + } + } + return null; + } +} diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/MambaInstallerUtils.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/MambaInstallerUtils.java new file mode 100644 index 00000000..92150eb8 --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/MambaInstallerUtils.java @@ -0,0 +1,189 @@ +/*- + * #%L + * Appose: multi-language interprocess cooperation with shared memory. + * %% + * Copyright (C) 2023 Appose developers. + * %% + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +package io.bioimage.modelrunner.apposed.appose; + +import java.io.BufferedInputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; + +import org.apache.commons.compress.archivers.ArchiveException; +import org.apache.commons.compress.archivers.ArchiveStreamFactory; +import org.apache.commons.compress.archivers.tar.TarArchiveEntry; +import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.compress.utils.IOUtils; + +/** + * Utility methods unzip bzip2 files + */ +public final class MambaInstallerUtils { + + private MambaInstallerUtils() { + // Prevent instantiation of utility class. + } + + /** + * DEcompress a bzip2 file into a new file. + * The method is needed because Micromamba is distributed as a .tr.bz2 file and + * many distributions do not have tools readily available to extract the required files + * @param source + * .bzip2 file + * @param destination + * destination folder where the contents of the file are going to be decompressed + * @throws FileNotFoundException if the .bzip2 file is not found or does not exist + * @throws IOException if the source file already exists or there is any error with the decompression + */ + public static void unBZip2(File source, File destination) throws FileNotFoundException, IOException { + try ( + BZip2CompressorInputStream input = new BZip2CompressorInputStream(new BufferedInputStream(new FileInputStream(source))); + FileOutputStream output = new FileOutputStream(destination); + ) { + IOUtils.copy(input, output); + } + } + + /** Untar an input file into an output file. + + * The output file is created in the output folder, having the same name + * as the input file, minus the '.tar' extension. + * + * @param inputFile the input .tar file + * @param outputDir the output directory file. + * @throws IOException + * @throws FileNotFoundException + * @throws ArchiveException + */ + public static void unTar(final File inputFile, final File outputDir) throws FileNotFoundException, IOException, ArchiveException { + + try ( + InputStream is = new FileInputStream(inputFile); + TarArchiveInputStream debInputStream = (TarArchiveInputStream) new ArchiveStreamFactory().createArchiveInputStream("tar", is); + ) { + TarArchiveEntry entry = null; + while ((entry = (TarArchiveEntry)debInputStream.getNextEntry()) != null) { + final File outputFile = new File(outputDir, entry.getName()); + if (entry.isDirectory()) { + if (!outputFile.exists()) { + if (!outputFile.mkdirs()) { + throw new IllegalStateException(String.format("Couldn't create directory %s.", outputFile.getAbsolutePath())); + } + } + } else { + if (!outputFile.getParentFile().exists()) { + if (!outputFile.getParentFile().mkdirs()) + throw new IOException("Failed to create directory " + outputFile.getParentFile().getAbsolutePath()); + } + try (OutputStream outputFileStream = new FileOutputStream(outputFile)) { + IOUtils.copy(debInputStream, outputFileStream); + } + } + } + } + + } + + public static void main(String[] args) throws FileNotFoundException, IOException, ArchiveException, URISyntaxException { + String url = Conda.MICROMAMBA_URL; + final File tempFile = File.createTempFile( "miniconda", ".tar.bz2" ); + tempFile.deleteOnExit(); + URL website = MambaInstallerUtils.redirectedURL(new URL(url)); + ReadableByteChannel rbc = Channels.newChannel(website.openStream()); + try (FileOutputStream fos = new FileOutputStream(tempFile)) { + long transferred = fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); + System.out.print(tempFile.length()); + } + String tarPath = "C:\\Users\\angel\\OneDrive\\Documentos\\pasteur\\git\\micromamba-1.5.1-1.tar"; + String mambaPath = "C:\\Users\\angel\\OneDrive\\Documentos\\pasteur\\git\\mamba"; + unBZip2(new File("C:\\Users\\angel\\OneDrive\\Documentos\\pasteur\\git\\micromamba-1.5.1-1.tar.bz2"), + new File(tarPath)); + unTar(new File(tarPath), new File(mambaPath)); + } + + /** + * This method shuold be used when we get the following response codes from + * a {@link HttpURLConnection}: + * - {@link HttpURLConnection#HTTP_MOVED_TEMP} + * - {@link HttpURLConnection#HTTP_MOVED_PERM} + * - {@link HttpURLConnection#HTTP_SEE_OTHER} + * + * If that is not the response code or the connection does not work, the url + * returned will be the same as the provided. + * If the method is used corretly, it will return the URL to which the original URL + * has been redirected + * @param url + * original url. Connecting to that url must give a 301, 302 or 303 response code + * @param conn + * connection to the url + * @return the redirected url + * @throws MalformedURLException + * @throws URISyntaxException + */ + public static URL redirectedURL(URL url) throws MalformedURLException, URISyntaxException { + int statusCode; + HttpURLConnection conn; + try { + conn = (HttpURLConnection) url.openConnection(); + statusCode = conn.getResponseCode(); + } catch (IOException ex) { + return url; + } + if (statusCode < 300 || statusCode > 308) + return url; + String newURL = conn.getHeaderField("Location"); + try { + return redirectedURL(new URL(newURL)); + } catch (MalformedURLException ex) { + } + try { + if (newURL.startsWith("//")) + return redirectedURL(new URL("http:" + newURL)); + else + throw new MalformedURLException(); + } catch (MalformedURLException ex) { + } + URI uri = url.toURI(); + String scheme = uri.getScheme(); + String host = uri.getHost(); + String mainDomain = scheme + "://" + host; + return redirectedURL(new URL(mainDomain + newURL)); + } +} diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/Service.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/Service.java new file mode 100644 index 00000000..ff203f44 --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/Service.java @@ -0,0 +1,397 @@ +/*- + * #%L + * Appose: multi-language interprocess cooperation with shared memory. + * %% + * Copyright (C) 2023 Appose developers. + * %% + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +package io.bioimage.modelrunner.apposed.appose; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Consumer; + +import io.bioimage.modelrunner.apposed.appose.TaskEvent; + +/** + * An Appose *service* provides access to a linked Appose *worker* running in a + * different process. Using the service, programs create Appose {@link Task}s + * that run asynchronously in the worker process, which notifies the service of + * updates via communication over pipes (stdin and stdout). + */ +public class Service implements AutoCloseable { + + private static int serviceCount = 0; + + private final File cwd; + private final String[] args; + private final Map tasks = new ConcurrentHashMap<>(); + private final int serviceID; + + private Process process; + private PrintWriter stdin; + private Thread stdoutThread; + private Thread stderrThread; + private Thread monitorThread; + + private Consumer debugListener; + + public Service(File cwd, String... args) { + this.cwd = cwd; + this.args = args.clone(); + serviceID = serviceCount++; + } + + /** + * Registers a callback function to receive messages + * describing current service/worker activity. + * + * @param debugListener A function that accepts a single string argument. + */ + public void debug(Consumer debugListener) { + this.debugListener = debugListener; + } + + /** + * Launches the worker process associated with this service. + * + * @return This service object, for chaining method calls (typically with {@link #task}). + * @throws IOException If the process fails to execute; see {@link ProcessBuilder#start()}. + */ + public Service start() throws IOException { + if (process != null) { + // Already started. + return this; + } + + String prefix = "Appose-Service-" + serviceID; + ProcessBuilder pb = new ProcessBuilder(args).directory(cwd); + process = pb.start(); + stdin = new PrintWriter(process.getOutputStream()); + stdoutThread = new Thread(this::stdoutLoop, prefix + "-Stdout"); + stderrThread = new Thread(this::stderrLoop, prefix + "-Stderr"); + monitorThread = new Thread(this::monitorLoop, prefix + "-Monitor"); + stderrThread.start(); + stdoutThread.start(); + monitorThread.start(); + return this; + } + + /** + * Creates a new task, passing the given script to the worker for execution. + * + * @param script The script for the worker to execute in its environment. + * @return The newly created {@link Task} object tracking the execution. + * @throws IOException If something goes wrong communicating with the worker. + */ + public Task task(String script) throws IOException { + return task(script, null); + } + + /** + * Creates a new task, passing the given script to the worker for execution. + * + * @param script The script for the worker to execute in its environment. + * @param inputs Optional list of key/value pairs to feed into the script as inputs. + * @return The newly created {@link Task} object tracking the execution. + * @throws IOException If something goes wrong communicating with the worker. + */ + public Task task(String script, Map inputs) throws IOException { + start(); + return new Task(script, inputs); + } + + /** Closes the worker process's input stream, in order to shut it down. */ + @Override + public void close() { + stdin.close(); + } + + /** Input loop processing lines from the worker stdout stream. */ + private void stdoutLoop() { + BufferedReader stdout = new BufferedReader(new InputStreamReader(process.getInputStream())); + while (true) { + String line; + try { + line = stdout.readLine(); + } + catch (IOException exc) { + // Something went wrong reading the line. Panic! + debugService(Types.stackTrace(exc)); + break; + } + + if (line == null) { + debugService(""); + return; + } + try { + Map response = Types.decode(line); + debugService(line); // Echo the line to the debug listener. + Object uuid = response.get("task"); + if (uuid == null) { + debugService("Invalid service message:" + line); + continue; + } + Task task = tasks.get(uuid.toString()); + if (task == null) { + debugService("No such task: " + uuid); + continue; + } + task.handle(response); + } + catch (Exception exc) { + // Something went wrong decoding the line of JSON. + // Skip it and keep going, but log it first. + debugService(String.format(" %s", line)); + } + } + } + + /** Input loop processing lines from the worker stderr stream. */ + private void stderrLoop() { + BufferedReader stderr = new BufferedReader(new InputStreamReader(process.getErrorStream())); + try { + while (true) { + String line = stderr.readLine(); + if (line == null) { + debugService(""); + return; + } + debugWorker(line); + } + } + catch (IOException exc) { + debugWorker(Types.stackTrace(exc)); + } + } + + private void monitorLoop() { + // Wait until the worker process terminates. + while (process.isAlive()) { + try { + Thread.sleep(50); + } + catch (InterruptedException exc) { + debugService(Types.stackTrace(exc)); + } + } + + // Do some sanity checks. + int exitCode = process.exitValue(); + if (exitCode != 0) debugService(""); + int taskCount = tasks.size(); + if (taskCount > 0) debugService(""); + + // Notify any remaining tasks about the process crash. + tasks.values().forEach(Task::crash); + tasks.clear(); + } + + private void debugService(String message) { debug("SERVICE", message); } + private void debugWorker(String message) { debug("WORKER", message); } + + /** + * Passes a message to the listener registered + * via the {@link #debug(Consumer)} method. + */ + private void debug(String prefix, String message) { + if (debugListener == null) return; + debugListener.accept("[" + prefix + "-" + serviceID + "] " + message); + } + + public enum TaskStatus { + INITIAL, QUEUED, RUNNING, COMPLETE, CANCELED, FAILED, CRASHED; + + /** + * @return true iff status is {@link #COMPLETE}, {@link #CANCELED}, {@link #FAILED}, or {@link #CRASHED}. + */ + public boolean isFinished() { + return this == COMPLETE || this == CANCELED || this == FAILED || this == CRASHED; + } + } + + public enum RequestType { + EXECUTE, CANCEL + } + + public enum ResponseType { + LAUNCH, UPDATE, COMPLETION, CANCELATION, FAILURE, CRASH + } + + /** + * An Appose *task* is an asynchronous operation performed by its associated + * Appose {@link Service}. It is analogous to a {@code Future}. + */ + public class Task { + + public final String uuid = UUID.randomUUID().toString(); + public final String script; + private final Map mInputs = new HashMap<>(); + private final Map mOutputs = new HashMap<>(); + public final Map inputs = Collections.unmodifiableMap(mInputs); + public final Map outputs = Collections.unmodifiableMap(mOutputs); + + public TaskStatus status = TaskStatus.INITIAL; + public String message; + public long current; + public long maximum = 1; + public String error; + + private final List> listeners = new ArrayList<>(); + + public Task(String script, Map inputs) { + this.script = script; + if (inputs != null) mInputs.putAll(inputs); + tasks.put(uuid, this); + } + + public synchronized Task start() { + if (status != TaskStatus.INITIAL) throw new IllegalStateException(); + status = TaskStatus.QUEUED; + + Map args = new HashMap<>(); + args.put("script", script); + args.put("inputs", inputs); + request(RequestType.EXECUTE, args); + + return this; + } + + /** + * Registers a listener to be notified of updates to the task. + * + * @param listener Function to invoke in response to task status updates. + */ + public synchronized void listen(Consumer listener) { + if (status != TaskStatus.INITIAL) { + throw new IllegalStateException("Task is not in the INITIAL state"); + } + listeners.add(listener); + } + + public synchronized void waitFor() throws InterruptedException { + if (status == TaskStatus.INITIAL) start(); + if (status != TaskStatus.QUEUED && status != TaskStatus.RUNNING) return; + wait(); + } + + /** Sends a task cancelation request to the worker process. */ + public void cancel() { + request(RequestType.CANCEL, null); + } + + /** Sends a request to the worker process. */ + private void request(RequestType requestType, Map args) { + Map request = new HashMap<>(); + request.put("task", uuid); + request.put("requestType", requestType.toString()); + if (args != null) request.putAll(args); + String encoded = Types.encode(request); + + stdin.println(encoded); + // NB: Flush is necessary to ensure worker receives the data! + stdin.flush(); + debugService(encoded); + } + + @SuppressWarnings("hiding") + private void handle(Map response) { + String maybeResponseType = (String) response.get("responseType"); + if (maybeResponseType == null) { + debugService("Message type not specified"); + return; + } + ResponseType responseType = ResponseType.valueOf(maybeResponseType); + + switch (responseType) { + case LAUNCH: + status = TaskStatus.RUNNING; + break; + case UPDATE: + message = (String) response.get("message"); + Number current = (Number) response.get("current"); + Number maximum = (Number) response.get("maximum"); + if (current != null) this.current = current.longValue(); + if (maximum != null) this.maximum = maximum.longValue(); + break; + case COMPLETION: + tasks.remove(uuid); + status = TaskStatus.COMPLETE; + @SuppressWarnings({ "rawtypes", "unchecked" }) + Map outputs = (Map) response.get("outputs"); + if (outputs != null) mOutputs.putAll(outputs); + break; + case CANCELATION: + tasks.remove(uuid); + status = TaskStatus.CANCELED; + break; + case FAILURE: + tasks.remove(uuid); + status = TaskStatus.FAILED; + Object error = response.get("error"); + this.error = error == null ? null : error.toString(); + break; + default: + debugService("Invalid service message type: " + responseType); + return; + } + + TaskEvent event = new TaskEvent(this, responseType); + listeners.forEach(l -> l.accept(event)); + + if (status.isFinished()) { + synchronized (this) { + notifyAll(); + } + } + } + + private void crash() { + TaskEvent event = new TaskEvent(this, ResponseType.CRASH); + status = TaskStatus.CRASHED; + listeners.forEach(l -> l.accept(event)); + synchronized (this) { + notifyAll(); + } + } + + @Override + public String toString() { + return String.format("uuid=%s, status=%s, message=%s, current=%d, maximum=%d, error=%s", + uuid, status, message, current, maximum, error); + } + } +} diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/SharedMemory.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/SharedMemory.java new file mode 100644 index 00000000..6783e3d5 --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/SharedMemory.java @@ -0,0 +1,346 @@ +/*- + * #%L + * Appose: multi-language interprocess cooperation with shared memory. + * %% + * Copyright (C) 2023 Appose developers. + * %% + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +package io.bioimage.modelrunner.apposed.appose; + +import com.sun.jna.Pointer; +import com.sun.jna.platform.linux.LibRT; +import com.sun.jna.platform.unix.LibCUtil; +import com.sun.jna.platform.win32.Kernel32; +import com.sun.jna.platform.win32.WinBase; +import com.sun.jna.platform.win32.WinError; +import com.sun.jna.platform.win32.WinNT; + +/** + * Unfinished port of Python's + * {@code multiprocess.shared_memory.SharedMemory} class. + *

+ * Original source code in Python can be found here. + *

+ */ +public class SharedMemory { + + private static final boolean USE_POSIX = + System.getProperty("os.name").indexOf("Windows") < 0; + + public static final int O_RDONLY = 0; + public static final int O_WRONLY = 1; + public static final int O_RDWR = 2; + public static final int O_NONBLOCK = 4; + public static final int O_APPEND = 8; + public static final int O_SHLOCK = 16; + public static final int O_EXLOCK = 32; + public static final int O_ASYNC = 64; + public static final int O_SYNC = 128; + public static final int O_FSYNC = 128; + public static final int O_NOFOLLOW = 256; + public static final int O_CREAT = 512; + public static final int O_TRUNC = 1024; + public static final int O_EXCL = 2048; + + public static final int O_ACCMODE = 3; + public static final int O_NDELAY = 4; + + public static final int O_EVTONLY = 32768; + public static final int O_NOCTTY = 131072; + public static final int O_DIRECTORY = 1048576; + public static final int O_SYMLINK = 2097152; + public static final int O_DSYNC = 4194304; + public static final int O_CLOEXEC = 16777216; + public static final int O_NOFOLLOW_ANY = 536870912; + + private static final int O_CREX = O_CREAT | O_EXCL; + + /** FreeBSD (and perhaps other BSDs) limit names to 14 characters. */ + private static final int SHM_SAFE_NAME_LENGTH = 14; + + /** Shared memory block name prefix. */ + private static final String SHM_NAME_PREFIX = USE_POSIX ? "/psm_" : "wnsm_"; + + public static class MemoryView { + public MemoryView(Pointer mmap) { + throw new UnsupportedOperationException("Unimplemented"); + } + + public void release() { + throw new UnsupportedOperationException("Unimplemented"); + } + } + + private static String token_hex(long nbytes) { + StringBuilder sb = new StringBuilder(); + for (int b=0; b= 1 && s.length() <= 2; + if (s.length() == 1) sb.append("0"); + sb.append(s); + } + return sb.toString(); + } + + private static long sizeFromFileDescriptor(int fd) { + throw new UnsupportedOperationException("Unimplemented"); + } + + private Pointer mmap(int i, long size) { + String tagName = null; // FIXME + return mmap(i, size, tagName); + } + + private Pointer mmap(int i, long size, String tagName) { + throw new UnsupportedOperationException("Unimplemented"); + } + + private void osClose(int fd) { + throw new UnsupportedOperationException("Unimplemented"); + } + + private void register(String name) { + throw new UnsupportedOperationException("Unimplemented"); + } + + private static void unregister(String name) { + throw new UnsupportedOperationException("Unimplemented"); + } + + /** Creates a random filename for the shared memory object. */ + private static String make_filename() { + // number of random bytes to use for name + long nbytes = (SHM_SAFE_NAME_LENGTH - SHM_NAME_PREFIX.length()) / 2; + assert nbytes >= 2; // 'SHM_NAME_PREFIX too long' + String name = SHM_NAME_PREFIX + token_hex(nbytes); + assert name.length() <= SHM_SAFE_NAME_LENGTH; + return name; + } + + private String name; + private long size; + private int fd = -1; + private Pointer mmap; + private MemoryView buf; + private int flags = O_RDWR; + private int mode = 0600; + private boolean prepend_leading_slash = USE_POSIX; + + public SharedMemory(String name, boolean create, long size) { + // NB: Would be great to use LArray for this instead. But it + // doesn't support an equivalent of Python's shared_memory: + // https://github.com/xerial/larray/issues/78 + + if (size < 0) { + throw new IllegalArgumentException("'size' must be a positive integer"); + } + if (create) { + this.flags = O_CREX | O_RDWR; + if (size == 0) { + throw new IllegalArgumentException("'size' must be a positive number different from zero"); + } + } + if (name == null && (this.flags & O_EXCL) != 0) { + throw new IllegalArgumentException("'name' can only be null if create=true"); + } + + if (USE_POSIX) { + // POSIX Shared Memory + + if (name == null) { + while (true) { + name = make_filename(); + this.fd = LibRT.INSTANCE.shm_open( + name, + this.flags, + this.mode + ); + this.name = name; + break; + } + } + else { + name = this.prepend_leading_slash ? "/" + name : name; + this.fd = LibRT.INSTANCE.shm_open( + name, + this.flags, + mode + ); + this.name = name; + } + try { + if (create && size != 0) { + LibCUtil.ftruncate(this.fd, size); + } + size = sizeFromFileDescriptor(this.fd); + //LibCUtil.mmap(Pointer addr, long length, int prot, int flags, int fd, long offset); + this.mmap = mmap(this.fd, size); + } + finally { + this.unlink(); + } + + register(this.name); + } + else { + // Windows Named Shared Memory + + if (create) { + while (true) { + String temp_name = name == null ? make_filename() : name; + // Create and reserve shared memory block with this name + // until it can be attached to by mmap. + Kernel32.HANDLE h_map = Kernel32.INSTANCE.CreateFileMapping( + WinBase.INVALID_HANDLE_VALUE, + null, + WinNT.PAGE_READWRITE, + (int) ((size >> 32) & 0xFFFFFFFF), + (int) (size & 0xFFFFFFFF), + temp_name + ); + try { + int last_error_code = Kernel32.INSTANCE.GetLastError(); + if (last_error_code == WinError.ERROR_ALREADY_EXISTS) { + if (name != null) { + throw new RuntimeException("File already exists: " + name); + } + continue; + } + //LibCUtil.mmap(Pointer addr, long length, int prot, int flags, int fd, long offset); + this.mmap = mmap(-1, size, temp_name); + } + finally { + Kernel32.INSTANCE.CloseHandle(h_map); + } + this.name = temp_name; + break; + } + } + else { + this.name = name; + // Dynamically determine the existing named shared memory + // block's size which is likely a multiple of mmap.PAGESIZE. + Kernel32.HANDLE h_map = Kernel32.INSTANCE.OpenFileMapping( + WinBase.FILE_MAP_READ, + false, + name + ); + Pointer p_buf; + try { + p_buf = Kernel32.INSTANCE.MapViewOfFile( + h_map, + WinBase.FILE_MAP_READ, + 0, + 0, + 0 + ); + } + finally { + Kernel32.INSTANCE.CloseHandle(h_map); + } + try { + //SIZE_T size = Kernel32.INSTANCE.VirtualQueryEx(HANDLE hProcess, Pointer lpAddress, MEMORY_BASIC_INFORMATION lpBuffer, SIZE_T dwLength); + if (getClass() == getClass()) throw new UnsupportedOperationException(); + //size = Kernel32.INSTANCE.VirtualQuerySize(p_buf); + } + finally { + Kernel32.INSTANCE.UnmapViewOfFile(p_buf); + } + //LibCUtil.mmap(Pointer addr, long length, int prot, int flags, int fd, long offset); + this.mmap = mmap(-1, size, name); + } + } + + this.size = size; + this.buf = new MemoryView(this.mmap); + } + + /** + * A memoryview of contents of the shared memory block. + * + * @return The memoryview buffer. + */ + public MemoryView buf() { + return this.buf; + } + + /** + * Unique name that identifies the shared memory block. + * + * @return The name of the shared memory. + */ + public String name() { + String reported_name = this.name; + if (USE_POSIX && this.prepend_leading_slash) { + if (this.name.startsWith("/")) { + reported_name = this.name.substring(1); + } + } + return reported_name; + } + + /** + * Size in bytes. + * + * @return The length in bytes of the shared memory. + */ + public long size() { + return this.size; + } + + /** + * Closes access to the shared memory from this instance but does + * not destroy the shared memory block. + */ + public void close() { + if (this.buf != null) { + this.buf.release(); + this.buf = null; + } + if (this.mmap != null ){ + //this._mmap.close(); + this.mmap = null; + } + if (USE_POSIX && this.fd >= 0) { + osClose(this.fd); + this.fd = -1; + } + } + + /** + * Requests that the underlying shared memory block be destroyed. + * In order to ensure proper cleanup of resources, unlink should be + * called once (and only once) across all processes which have access + * to the shared memory block. + */ + public void unlink() { + if (USE_POSIX && this.name != null) { + LibRT.INSTANCE.shm_unlink(this.name); + unregister(this.name); + } + } + +} diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/TaskEvent.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/TaskEvent.java new file mode 100644 index 00000000..bf1172bb --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/TaskEvent.java @@ -0,0 +1,48 @@ +/*- + * #%L + * Appose: multi-language interprocess cooperation with shared memory. + * %% + * Copyright (C) 2023 Appose developers. + * %% + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * #L% + */ + +package io.bioimage.modelrunner.apposed.appose; + +import io.bioimage.modelrunner.apposed.appose.Service.ResponseType; + +public class TaskEvent { + + public final Service.Task task; + public final ResponseType responseType; + + public TaskEvent(Service.Task task, ResponseType responseType) { + this.task = task; + this.responseType = responseType; + } + + @Override + public String toString() { + return String.format("[%s] %s", responseType, task); + } +} diff --git a/src/main/java/io/bioimage/modelrunner/apposed/appose/Types.java b/src/main/java/io/bioimage/modelrunner/apposed/appose/Types.java new file mode 100644 index 00000000..f38bb6eb --- /dev/null +++ b/src/main/java/io/bioimage/modelrunner/apposed/appose/Types.java @@ -0,0 +1,64 @@ +/*- + * #%L + * Appose: multi-language interprocess cooperation with shared memory. + * %% + * Copyright (C) 2023 Appose developers. + * %% + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + * #L% + */ +package io.bioimage.modelrunner.apposed.appose; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.Map; + +import com.google.gson.Gson; +import com.google.gson.reflect.TypeToken; + +public final class Types { + + private Types() { + // NB: Prevent instantiation of utility class. + } + + public static String encode(Map data) { + Gson gson = new Gson(); + return gson.toJson(data); + } + + @SuppressWarnings("unchecked") + public static Map decode(String json) { + Gson gson = new Gson(); + Type mapType = new TypeToken>() {}.getType(); + return gson.fromJson(json, mapType); + } + + /** Dumps the given exception, including stack trace, to a string. */ + public static String stackTrace(Throwable t) { + StringWriter sw = new StringWriter(); + t.printStackTrace(new PrintWriter(sw)); + return sw.toString(); + } +} diff --git a/src/main/java/io/bioimage/modelrunner/runmode/RunMode.java b/src/main/java/io/bioimage/modelrunner/runmode/RunMode.java index ff66bad6..e62b0f39 100644 --- a/src/main/java/io/bioimage/modelrunner/runmode/RunMode.java +++ b/src/main/java/io/bioimage/modelrunner/runmode/RunMode.java @@ -30,13 +30,12 @@ import java.util.UUID; import java.util.stream.IntStream; -import org.apposed.appose.Appose; -import org.apposed.appose.Environment; -import org.apposed.appose.Service; -import org.apposed.appose.Service.Task; +import io.bioimage.modelrunner.apposed.appose.Appose; +import io.bioimage.modelrunner.apposed.appose.Environment; +import io.bioimage.modelrunner.apposed.appose.Service; +import io.bioimage.modelrunner.apposed.appose.Service.Task; import io.bioimage.modelrunner.runmode.ops.OpInterface; -import io.bioimage.modelrunner.system.PlatformDetection; import io.bioimage.modelrunner.tensor.shm.SharedMemoryArray; import io.bioimage.modelrunner.tensor.shm.SharedMemoryFile; import io.bioimage.modelrunner.tensor.Tensor; diff --git a/src/main/java/io/bioimage/modelrunner/transformations/PythonTransformation.java b/src/main/java/io/bioimage/modelrunner/transformations/PythonTransformation.java index 817a72e1..54928cb8 100644 --- a/src/main/java/io/bioimage/modelrunner/transformations/PythonTransformation.java +++ b/src/main/java/io/bioimage/modelrunner/transformations/PythonTransformation.java @@ -33,7 +33,7 @@ import java.util.stream.Collectors; import org.apache.commons.compress.archivers.ArchiveException; -import org.apposed.appose.Conda; +import io.bioimage.modelrunner.apposed.appose.Conda; import io.bioimage.modelrunner.numpy.DecodeNumpy; import io.bioimage.modelrunner.runmode.RunMode;