diff --git a/Cargo.lock b/Cargo.lock index 3dfb9adc1..03bd40c90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -628,12 +628,15 @@ dependencies = [ "log", "nix 0.27.1", "oci-spec", + "prost-types 0.11.9", "protobuf 3.2.0", "serde", "serde_json", + "sha256", "tempfile", "thiserror", "tokio", + "tokio-stream", "ttrpc", "ttrpc-codegen", "wat", diff --git a/crates/containerd-shim-wasm/Cargo.toml b/crates/containerd-shim-wasm/Cargo.toml index 4a6f15b66..5e2ea2a96 100644 --- a/crates/containerd-shim-wasm/Cargo.toml +++ b/crates/containerd-shim-wasm/Cargo.toml @@ -31,6 +31,9 @@ ttrpc = { workspace = true } wat = { workspace = true } tokio = { version = "1.34.0", features = [ "full" ] } futures = { version = "0.3.29" } +tokio-stream = { version = "0.1" } +prost-types = "0.11" # should match version in containerd-shim +sha256 = "1.4.0" [target.'cfg(unix)'.dependencies] caps = "0.5" diff --git a/crates/containerd-shim-wasm/src/container/context.rs b/crates/containerd-shim-wasm/src/container/context.rs index ae6c5cc47..4619cec39 100644 --- a/crates/containerd-shim-wasm/src/container/context.rs +++ b/crates/containerd-shim-wasm/src/container/context.rs @@ -42,6 +42,7 @@ pub enum Source<'a> { // and they will be included in this array, e.g., a `toml` file with the // runtime configuration. Oci(&'a [WasmLayer]), + Precompiled(&'a WasmLayer), } /// The entrypoint for a WASI module / component. @@ -79,6 +80,8 @@ impl RuntimeContext for WasiContext<'_> { let source = if self.wasm_layers.is_empty() { Source::File(PathBuf::from(path)) + } else if self.wasm_layers.len() == 1 && self.wasm_layers[0].precompiled { + Source::Precompiled(&self.wasm_layers[0]) } else { Source::Oci(self.wasm_layers) }; @@ -337,6 +340,7 @@ mod tests { wasm_layers: &[WasmLayer { layer: vec![], config: Descriptor::new(oci_spec::image::MediaType::Other("".to_string()), 10, ""), + precompiled: false, }], platform: &Platform::default(), }; diff --git a/crates/containerd-shim-wasm/src/container/engine.rs b/crates/containerd-shim-wasm/src/container/engine.rs index 77f10cc3f..2b9119364 100644 --- a/crates/containerd-shim-wasm/src/container/engine.rs +++ b/crates/containerd-shim-wasm/src/container/engine.rs @@ -1,7 +1,7 @@ use std::fs::File; use std::io::Read; -use anyhow::{Context, Result}; +use anyhow::{bail, Context, Result}; use super::Source; use crate::container::{PathResolve, RuntimeContext}; @@ -26,6 +26,7 @@ pub trait Engine: Clone + Send + Sync + 'static { let path = match source { Source::File(path) => path, Source::Oci(_) => return Ok(()), + Source::Precompiled(_) => return Ok(()), }; path.resolve_in_path_or_cwd() @@ -52,4 +53,14 @@ pub trait Engine: Clone + Send + Sync + 'static { fn supported_layers_types() -> &'static [&'static str] { &["application/vnd.bytecodealliance.wasm.component.layer.v0+wasm"] } + + /// Precomiple a module + fn precompile(&self, _layers: &[Vec]) -> Result> { + bail!("precompilation not supported for this runtime") + } + + /// Precomiple a module + fn can_precompile() -> bool { + false + } } diff --git a/crates/containerd-shim-wasm/src/sandbox/containerd.rs b/crates/containerd-shim-wasm/src/sandbox/containerd.rs index 8a5d2782b..190b23490 100644 --- a/crates/containerd-shim-wasm/src/sandbox/containerd.rs +++ b/crates/containerd-shim-wasm/src/sandbox/containerd.rs @@ -1,19 +1,29 @@ #![cfg(unix)] +use std::collections::HashMap; use std::path::Path; use containerd_client; use containerd_client::services::v1::containers_client::ContainersClient; use containerd_client::services::v1::content_client::ContentClient; use containerd_client::services::v1::images_client::ImagesClient; -use containerd_client::services::v1::{GetContainerRequest, GetImageRequest, ReadContentRequest}; +use containerd_client::services::v1::{ + Container, DeleteContentRequest, GetContainerRequest, GetImageRequest, Image, Info, + InfoRequest, ReadContentRequest, UpdateImageRequest, UpdateRequest, WriteAction, + WriteContentRequest, +}; use containerd_client::tonic::transport::Channel; use containerd_client::{tonic, with_namespace}; use futures::TryStreamExt; use oci_spec::image::{Arch, ImageManifest, MediaType, Platform}; +use prost_types::FieldMask; +use sha256::digest; use tokio::runtime::Runtime; -use tonic::Request; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Code, Request}; +use crate::container::Engine; use crate::sandbox::error::{Error as ShimError, Result}; use crate::sandbox::oci::{self, WasmLayer}; @@ -62,12 +72,162 @@ impl Client { }) } - pub fn get_image_content_sha(&self, image_name: impl ToString) -> Result { + // used in tests to clean up content + #[allow(dead_code)] + fn delete_content(&self, digest: impl ToString) -> Result<()> { + self.rt.block_on(async { + let req = DeleteContentRequest { + digest: digest.to_string(), + }; + let req = with_namespace!(req, self.namespace); + ContentClient::new(self.inner.clone()) + .delete(req) + .await + .map_err(|err| ShimError::Containerd(err.to_string()))?; + Ok(()) + }) + } + + pub fn save_content(&self, data: Vec) -> Result { + self.rt.block_on(async { + // create a channel to feed the stream; only sending one message at a time so we can set this to one + let (tx, rx) = mpsc::channel(1); + + let len = data.len() as i64; + let expected = digest(data.clone()); + let expected = format!("sha256:{}", expected); + let mut client = ContentClient::new(self.inner.clone()); + let r#ref = "test".to_string(); + + // Send Stat action to containerd to let it know that we are going to write content + // if the content is already there, it will return early with AlreadyExists + let req = WriteContentRequest { + r#ref: r#ref.clone(), + action: WriteAction::Stat.into(), + total: len, + expected: expected.clone(), + ..Default::default() + }; + tx.send(req) + .await + .map_err(|err| ShimError::Containerd(err.to_string()))?; + let request_stream = ReceiverStream::new(rx); + let request_stream = with_namespace!(request_stream, self.namespace); + let mut response_stream = match client.write(request_stream).await { + Ok(response_stream) => response_stream.into_inner(), + Err(e) if e.code() == Code::AlreadyExists => { + log::info!("content already exists {}", expected.clone().to_string()); + return Ok(expected); + } + Err(e) => return Err(ShimError::Containerd(e.to_string())), + }; + let response = response_stream + .message() + .await + .map_err(|e| ShimError::Containerd(e.to_string()))? + .ok_or_else(|| { + ShimError::Containerd(format!( + "no response received after write request for {}", + expected.clone() + )) + })?; + + // Write and commit at same time + let mut labels = HashMap::new(); + labels.insert("runwasi.io/precompiled".to_string(), "".to_string()); + let commit_request = WriteContentRequest { + action: WriteAction::Commit.into(), + total: len, + offset: response.offset, + expected: expected.clone(), + labels, + data, + ..Default::default() + }; + tx.send(commit_request) + .await + .map_err(|err| ShimError::Containerd(err.to_string()))?; + let response = response_stream + .message() + .await + .map_err(|e| ShimError::Containerd(e.to_string()))? + .ok_or_else(|| { + ShimError::Containerd(format!( + "no response received after write request for {}", + expected.clone() + )) + })?; + + // client should validate that all bytes were written and that the digest matches + if response.offset != len { + return Err(ShimError::Containerd(format!( + "failed to write all bytes, expected {} got {}", + len, response.offset + ))); + } + if response.digest != expected { + return Err(ShimError::Containerd(format!( + "unexpected digest, expected {} got {}", + expected, response.digest + ))); + } + Ok(response.digest) + }) + } + + pub fn get_info(&self, content_digest: String) -> Result { + self.rt.block_on(async { + let req = InfoRequest { + digest: content_digest.clone(), + }; + let req = with_namespace!(req, self.namespace); + let info = ContentClient::new(self.inner.clone()) + .info(req) + .await + .map_err(|err| ShimError::Containerd(err.to_string()))? + .into_inner() + .info + .ok_or_else(|| { + ShimError::Containerd(format!( + "failed to get info for content {}", + content_digest + )) + })?; + Ok(info) + }) + } + + pub fn update_info(&self, info: Info) -> Result { + self.rt.block_on(async { + let req = UpdateRequest { + info: Some(info.clone()), + update_mask: Some(FieldMask { + paths: vec!["labels".to_string()], + }), + }; + let req = with_namespace!(req, self.namespace); + let info = ContentClient::new(self.inner.clone()) + .update(req) + .await + .map_err(|err| ShimError::Containerd(err.to_string()))? + .into_inner() + .info + .ok_or_else(|| { + ShimError::Containerd(format!( + "failed to update info for content {}", + info.digest + )) + })?; + Ok(info) + }) + } + + pub fn get_image(&self, image_name: impl ToString) -> Result { self.rt.block_on(async { let name = image_name.to_string(); let req = GetImageRequest { name }; let req = with_namespace!(req, self.namespace); - let digest = ImagesClient::new(self.inner.clone()) + let image = ImagesClient::new(self.inner.clone()) .get(req) .await .map_err(|err| ShimError::Containerd(err.to_string()))? @@ -75,28 +235,57 @@ impl Client { .image .ok_or_else(|| { ShimError::Containerd(format!( - "failed to get image content sha for image {}", + "failed to get image for image {}", image_name.to_string() )) - })? - .target + })?; + Ok(image) + }) + } + + pub fn update_image(&self, image: Image) -> Result { + self.rt.block_on(async { + let req = UpdateImageRequest { + image: Some(image.clone()), + update_mask: Some(FieldMask { + paths: vec!["labels".to_string()], + }), + }; + let req = with_namespace!(req, self.namespace); + let image = ImagesClient::new(self.inner.clone()) + .update(req) + .await + .map_err(|err| ShimError::Containerd(err.to_string()))? + .into_inner() + .image .ok_or_else(|| { - ShimError::Containerd(format!( - "failed to get image content sha for image {}", - image_name.to_string() - )) - })? - .digest; - Ok(digest) + ShimError::Containerd(format!("failed to update image {}", image.name)) + })?; + Ok(image) }) } - pub fn get_image(&self, container_name: impl ToString) -> Result { + pub fn extract_image_content_sha(&self, image: &Image) -> Result { + let digest = image + .target + .as_ref() + .ok_or_else(|| { + ShimError::Containerd(format!( + "failed to get image content sha for image {}", + image.name + )) + })? + .digest + .clone(); + Ok(digest) + } + + pub fn get_container(&self, container_name: impl ToString) -> Result { self.rt.block_on(async { let id = container_name.to_string(); let req = GetContainerRequest { id }; let req = with_namespace!(req, self.namespace); - let image = ContainersClient::new(self.inner.clone()) + let container = ContainersClient::new(self.inner.clone()) .get(req) .await .map_err(|err| ShimError::Containerd(err.to_string()))? @@ -107,23 +296,23 @@ impl Client { "failed to get image for container {}", container_name.to_string() )) - })? - .image; - Ok(image) + })?; + Ok(container) }) } // load module will query the containerd store to find an image that has an OS of type 'wasm' // If found it continues to parse the manifest and return the layers that contains the WASM modules // and possibly other configuration layers. - pub fn load_modules( + pub fn load_modules( &self, containerd_id: impl ToString, - supported_layer_types: &[&str], + engine: T, ) -> Result<(Vec, Platform)> { - let image_name = self.get_image(containerd_id.to_string())?; - let digest = self.get_image_content_sha(image_name)?; - let manifest = self.read_content(digest)?; + let container = self.get_container(containerd_id.to_string())?; + let mut image = self.get_image(container.image)?; + let digest = self.extract_image_content_sha(&image)?; + let manifest = self.read_content(digest.clone())?; let manifest = manifest.as_slice(); let manifest = ImageManifest::from_reader(manifest)?; @@ -137,23 +326,113 @@ impl Client { log::info!("manifest is not in WASM OCI image format"); return Ok((vec![], platform)); }; + log::info!("found manifest with WASM OCI image format."); + let label = format!("runwasi.io/precompiled/{}", T::name()); + match image.labels.get(&label) { + Some(precompile_digest) if T::can_precompile() => { + log::info!("found precompiled image"); + let precompiled = self.read_content(precompile_digest)?; + Ok(( + vec![WasmLayer { + config: image_config_descriptor.clone(), + layer: precompiled, + precompiled: true, + }], + platform, + )) + } + None if T::can_precompile() => { + log::info!("precompiling module"); + let layers = manifest + .layers() + .iter() + .filter(|x| is_wasm_layer(x.media_type(), T::supported_layers_types())) + .map(|config| self.read_content(config.digest())) + .collect::>>()?; + + log::debug!("precompile complete and saving content"); + let precompiled = engine.precompile(layers.as_slice())?; + let precompile_digest = self.save_content(precompiled.clone())?; + + log::debug!("updating image with compiled content digest"); + image.labels.insert( + "runwasi.io/precompiled".to_string(), + precompile_digest.clone(), + ); + self.update_image(image)?; + + log::debug!("updating content with precompile digest to avoid garbage collection"); + let mut image_content = self.get_info(digest.clone())?; + image_content.labels.insert( + "containerd.io/gc.ref.content.precompile".to_string(), + precompile_digest.clone(), + ); + self.update_info(image_content)?; - let layers = manifest - .layers() - .iter() - .filter(|x| is_wasm_layer(x.media_type(), supported_layer_types)) - .map(|config| { - self.read_content(config.digest()).map(|module| WasmLayer { - config: config.clone(), - layer: module, - }) - }) - .collect::>>()?; - Ok((layers, platform)) + Ok(( + vec![WasmLayer { + config: image_config_descriptor.clone(), + layer: precompiled, + precompiled: true, + }], + platform, + )) + } + _ => { + log::info!("using module from OCI layers"); + let layers = manifest + .layers() + .iter() + .filter(|x| is_wasm_layer(x.media_type(), T::supported_layers_types())) + .map(|config| { + self.read_content(config.digest()).map(|module| WasmLayer { + config: config.clone(), + layer: module, + precompiled: false, + }) + }) + .collect::>>()?; + Ok((layers, platform)) + } + } } } fn is_wasm_layer(media_type: &MediaType, supported_layer_types: &[&str]) -> bool { supported_layer_types.contains(&media_type.to_string().as_str()) } + +#[cfg(test)] +mod tests { + use std::path::PathBuf; + + use super::*; + + #[test] + fn test_save_content() { + let path = PathBuf::from("/run/containerd/containerd.sock"); + let path = path.to_str().unwrap(); + let client = Client::connect(path, "test-ns").unwrap(); + let data = b"hello world".to_vec(); + + let expected = digest(data.clone()); + let expected = format!("sha256:{}", expected); + + let returned = client.save_content(data).unwrap(); + assert_eq!(expected, returned); + + let data = client.read_content(returned).unwrap(); + assert_eq!(data, b"hello world"); + + // a second call should be successful since it already exists + let returned = client.save_content(data).unwrap(); + assert_eq!(expected, returned); + + client.delete_content(expected.clone()).unwrap(); + + client + .read_content(expected) + .expect_err("content should not exist"); + } +} diff --git a/crates/containerd-shim-wasm/src/sandbox/oci.rs b/crates/containerd-shim-wasm/src/sandbox/oci.rs index e06f50dec..b5c7d6342 100644 --- a/crates/containerd-shim-wasm/src/sandbox/oci.rs +++ b/crates/containerd-shim-wasm/src/sandbox/oci.rs @@ -16,6 +16,7 @@ use super::error::Result; pub struct WasmLayer { pub config: Descriptor, pub layer: Vec, + pub precompiled: bool, } fn parse_env(envs: &[String]) -> HashMap { diff --git a/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs b/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs index 8b228066f..b8142d1f0 100644 --- a/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs +++ b/crates/containerd-shim-wasm/src/sys/unix/container/instance.rs @@ -45,7 +45,7 @@ impl SandboxInstance for Instance { // check if container is OCI image with wasm layers and attempt to read the module let (modules, platform) = containerd::Client::connect(cfg.get_containerd_address(), &namespace)? - .load_modules(&id, E::supported_layers_types()) + .load_modules(&id, engine.clone()) .unwrap_or_else(|e| { log::warn!("Error obtaining wasm layers for container {id}. Will attempt to use files inside container image. Error: {e}"); (vec![], Platform::default()) diff --git a/crates/containerd-shim-wasmedge/src/instance.rs b/crates/containerd-shim-wasmedge/src/instance.rs index e8c33151e..3541237e4 100644 --- a/crates/containerd-shim-wasmedge/src/instance.rs +++ b/crates/containerd-shim-wasmedge/src/instance.rs @@ -75,6 +75,9 @@ impl Engine for WasmEdgeEngine { Source::Oci(_modules) => { bail!("only a single module is supported when using images with OCI layers") } + Source::Precompiled(_) => { + bail!("precompiled modules not support at this time") + } }; stdio.redirect()?; diff --git a/crates/containerd-shim-wasmer/src/instance.rs b/crates/containerd-shim-wasmer/src/instance.rs index 7f1702cd0..5b3cd8c23 100644 --- a/crates/containerd-shim-wasmer/src/instance.rs +++ b/crates/containerd-shim-wasmer/src/instance.rs @@ -51,6 +51,9 @@ impl Engine for WasmerEngine { Source::Oci(_modules) => { bail!("only a single module is supported when using images with OCI layers") } + Source::Precompiled(_) => { + bail!("precompiled modules not support at this time") + } }; let runtime = tokio::runtime::Builder::new_multi_thread() diff --git a/crates/containerd-shim-wasmtime/src/instance.rs b/crates/containerd-shim-wasmtime/src/instance.rs index e415819fc..346feb56e 100644 --- a/crates/containerd-shim-wasmtime/src/instance.rs +++ b/crates/containerd-shim-wasmtime/src/instance.rs @@ -3,7 +3,7 @@ use containerd_shim_wasm::container::{ Engine, Entrypoint, Instance, PathResolve, RuntimeContext, Source, Stdio, }; use wasi_common::I32Exit; -use wasmtime::{Linker, Module, Store}; +use wasmtime::{Linker, Module, Precompiled, Store}; use wasmtime_wasi::{Dir, WasiCtxBuilder}; pub type WasmtimeInstance = Instance; @@ -19,6 +19,8 @@ impl Engine for WasmtimeEngine { } fn run_wasi(&self, ctx: &impl RuntimeContext, stdio: Stdio) -> Result { + let _config = wasmtime::Config::new(); + log::info!("setting up wasi"); let root_path = Dir::from_std_file(std::fs::File::open("/")?); let envs: Vec<_> = std::env::vars().collect(); @@ -30,8 +32,6 @@ impl Engine for WasmtimeEngine { .inherit_stdio() .preopened_dir(root_path, "/")?; - stdio.redirect()?; - log::info!("building wasi context"); let wctx = wasi_builder.build(); @@ -60,6 +60,10 @@ impl Engine for WasmtimeEngine { Source::Oci(_modules) => { bail!("only a single module is supported when using images with OCI layers") } + Source::Precompiled(module) => { + log::info!("loading precompiled module"); + load_precompiled(&self.engine, &module.layer)? + } }; let mut linker = Linker::new(&self.engine); @@ -76,7 +80,7 @@ impl Engine for WasmtimeEngine { .context("module does not have a WASI start function")?; log::debug!("running with start function {func:?}"); - + stdio.redirect()?; let status = start_func.call(&mut store, &[], &mut []); let status = status.map(|_| 0).or_else(|err| { match err.downcast_ref::() { @@ -91,4 +95,30 @@ impl Engine for WasmtimeEngine { Ok(status) } + + fn precompile(&self, layers: &[Vec]) -> Result> { + match layers { + [layer] => self.engine.precompile_module(layer), + _ => bail!("only a single module is supported when when precompiling"), + } + } + + fn can_precompile() -> bool { + true + } +} + +fn load_precompiled(engine: &wasmtime::Engine, bytes: &Vec) -> Result { + match engine.detect_precompiled(bytes) { + Some(Precompiled::Module) => { + log::info!("using precompiled module"); + unsafe { Module::deserialize(engine, bytes) } + } + Some(Precompiled::Component) => { + bail!("components not supported") + } + None => { + bail!("invalid precompiled module") + } + } }