diff --git a/aquadoggo/src/api/api.rs b/aquadoggo/src/api/api.rs new file mode 100644 index 000000000..247c72bda --- /dev/null +++ b/aquadoggo/src/api/api.rs @@ -0,0 +1,45 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use anyhow::{bail, Result}; + +use crate::api::{migrate, LockFile}; +use crate::bus::{ServiceMessage, ServiceSender}; +use crate::context::Context; + +/// Interface to interact with the node in a programmatic, "low-level" way. +#[derive(Debug)] +pub struct NodeInterface { + context: Context, + tx: ServiceSender, +} + +impl NodeInterface { + pub fn new(context: Context, tx: ServiceSender) -> Self { + Self { context, tx } + } + + pub async fn migrate(&self, lock_file: LockFile) -> Result { + let committed_operations = migrate( + &self.context.store, + &self.context.schema_provider, + lock_file, + ) + .await?; + + let did_migration_happen = !committed_operations.is_empty(); + + // Send new operations from migration on service communication bus, this will arrive + // eventually at the materializer service + for operation_id in committed_operations { + if self + .tx + .send(ServiceMessage::NewOperation(operation_id)) + .is_err() + { + bail!("Failed to inform materialization service about migration"); + } + } + + Ok(did_migration_happen) + } +} diff --git a/aquadoggo/src/api/lock_file.rs b/aquadoggo/src/api/lock_file.rs new file mode 100644 index 000000000..44993c648 --- /dev/null +++ b/aquadoggo/src/api/lock_file.rs @@ -0,0 +1,89 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use anyhow::Result; +use p2panda_rs::entry::EncodedEntry; +use p2panda_rs::hash::Hash; +use p2panda_rs::operation::EncodedOperation; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +/// Serializable format holding encoded and signed p2panda operations and entries. +/// +/// ```toml +/// version = 1 +/// +/// [[commits]] +/// entry_hash = "..." +/// entry = "..." +/// operation = "..." +/// +/// [[commits]] +/// entry_hash = "..." +/// entry = "..." +/// operation = "..." +/// +/// # ... +/// ``` +#[derive(Debug, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct LockFile { + /// Version of this lock file. + pub version: LockFileVersion, + + /// List of commits holding the signed operation and entry data. + pub commits: Option>, +} + +/// Known versions of lock file format. +#[derive(Debug, Clone)] +pub enum LockFileVersion { + V1, +} + +impl LockFileVersion { + /// Returns the operation version encoded as u64. + pub fn as_u64(&self) -> u64 { + match self { + LockFileVersion::V1 => 1, + } + } +} + +impl Serialize for LockFileVersion { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_u64(self.as_u64()) + } +} + +impl<'de> Deserialize<'de> for LockFileVersion { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + let version = u64::deserialize(deserializer)?; + + match version { + 1 => Ok(LockFileVersion::V1), + _ => Err(serde::de::Error::custom(format!( + "unsupported lock file version {}", + version + ))), + } + } +} + +/// Single commit with encoded entry and operation pair. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(deny_unknown_fields)] +pub struct Commit { + /// Hash of the entry. + pub entry_hash: Hash, + + /// Encoded and signed p2panda entry. + pub entry: EncodedEntry, + + /// Encoded p2panda operation. + pub operation: EncodedOperation, +} diff --git a/aquadoggo/src/api/migration.rs b/aquadoggo/src/api/migration.rs new file mode 100644 index 000000000..5a11267f0 --- /dev/null +++ b/aquadoggo/src/api/migration.rs @@ -0,0 +1,77 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use anyhow::{anyhow, bail, Context as AnyhowContext, Result}; +use p2panda_rs::api::publish; +use p2panda_rs::entry::decode::decode_entry; +use p2panda_rs::entry::traits::{AsEncodedEntry, AsEntry}; +use p2panda_rs::operation::decode::decode_operation; +use p2panda_rs::operation::traits::Schematic; +use p2panda_rs::operation::OperationId; +use p2panda_rs::storage_provider::traits::{EntryStore, LogStore, OperationStore}; + +use crate::api::LockFile; +use crate::schema::SchemaProvider; + +/// Utility method to publish multiple operations and entries in the node database. +/// +/// Returns a list of operation ids which have been committed during this migration process. Can be +/// empty if no migration was required. +pub async fn migrate( + store: &S, + schema_provider: &SchemaProvider, + lock_file: LockFile, +) -> Result> +where + S: OperationStore + EntryStore + LogStore, +{ + let mut committed_operations = Vec::new(); + let commits = lock_file.commits.unwrap_or(Vec::new()); + + for commit in commits { + let planned_entry = decode_entry(&commit.entry) + .context("Invalid entry encoding encountered in lock file")?; + + let existing_entry = store + .get_entry_at_seq_num( + planned_entry.public_key(), + planned_entry.log_id(), + planned_entry.seq_num(), + ) + .await + .context("Internal database error occurred while retrieving entry")?; + + // Check if node already knows about this commit + match existing_entry { + Some(existing_entry) => { + // Check its integrity with our lock file by comparing entry hashes + if existing_entry.hash() != commit.entry_hash { + bail!("Integrity check failed when comparing planned and existing commit") + } + } + None => { + // .. otherwise publish the planned commit! + let plain_operation = decode_operation(&commit.operation) + .context("Invalid operation encoding encountered in lock file")?; + + let schema = schema_provider + .get(plain_operation.schema_id()) + .await + .ok_or_else(|| anyhow!("Could not migrate commit with unknown schema id"))?; + + publish( + store, + &schema, + &commit.entry, + &plain_operation, + &commit.operation, + ) + .await + .context("Internal database error occurred while publishing migration commit")?; + + committed_operations.push(commit.entry_hash.into()); + } + }; + } + + Ok(committed_operations) +} diff --git a/aquadoggo/src/api/mod.rs b/aquadoggo/src/api/mod.rs new file mode 100644 index 000000000..3ecdeb4b1 --- /dev/null +++ b/aquadoggo/src/api/mod.rs @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +mod api; +mod lock_file; +mod migration; + +pub use api::NodeInterface; +pub use lock_file::LockFile; +pub use migration::migrate; diff --git a/aquadoggo/src/lib.rs b/aquadoggo/src/lib.rs index c7dc93919..7af6cf617 100644 --- a/aquadoggo/src/lib.rs +++ b/aquadoggo/src/lib.rs @@ -12,6 +12,7 @@ unused_qualifications )] #![allow(clippy::uninlined_format_args)] +mod api; mod bus; mod config; mod context; @@ -31,6 +32,7 @@ mod test_utils; #[cfg(test)] mod tests; +pub use crate::api::LockFile; pub use crate::config::{AllowList, Configuration}; pub use crate::network::NetworkConfiguration; pub use node::Node; diff --git a/aquadoggo/src/manager.rs b/aquadoggo/src/manager.rs index 16da4106f..9fa6b813b 100644 --- a/aquadoggo/src/manager.rs +++ b/aquadoggo/src/manager.rs @@ -206,6 +206,13 @@ where rx_ready } + /// Returns a sender for allowing communication on the service bus. + /// + /// Can also be used to subscribe to the bus. + pub fn get_sender(&self) -> Sender { + return self.tx.clone(); + } + /// Future which resolves as soon as a service returned an error, panicked or stopped. pub async fn on_exit(&self) { self.exit_handle.clone().await; diff --git a/aquadoggo/src/node.rs b/aquadoggo/src/node.rs index 3bd70c151..0d059fbf4 100644 --- a/aquadoggo/src/node.rs +++ b/aquadoggo/src/node.rs @@ -3,6 +3,7 @@ use anyhow::Result; use p2panda_rs::identity::KeyPair; +use crate::api::NodeInterface; use crate::bus::ServiceMessage; use crate::config::Configuration; use crate::context::Context; @@ -14,6 +15,7 @@ use crate::materializer::materializer_service; use crate::network::network_service; use crate::replication::replication_service; use crate::schema::SchemaProvider; +use crate::LockFile; /// Capacity of the internal broadcast channel used to communicate between services. const SERVICE_BUS_CAPACITY: usize = 512_000; @@ -38,8 +40,17 @@ async fn initialize_db(config: &Configuration) -> Result { /// Main runtime managing the p2panda node process. #[allow(missing_debug_implementations)] pub struct Node { + /// SQL database connection pool. pool: Pool, + + /// Service manager with communication bus. manager: ServiceManager, + + /// Interface to interact with the node in a programmatic, "low-level" way. + /// + /// Through this interface access to the internal store or service communication bus is + /// possible. + pub api: NodeInterface, } impl Node { @@ -65,7 +76,7 @@ impl Node { // Create service manager with shared data between services let context = Context::new(store, key_pair, config, schema_provider); let mut manager = - ServiceManager::::new(SERVICE_BUS_CAPACITY, context); + ServiceManager::::new(SERVICE_BUS_CAPACITY, context.clone()); // Start materializer service if manager @@ -95,7 +106,11 @@ impl Node { panic!("Failed starting replication service"); } - Self { pool, manager } + // Create a low-level interface which can be exposed so developers can interact with the + // internal store and service bus + let api = NodeInterface::new(context, manager.get_sender()); + + Self { pool, manager, api } } /// This future resolves when at least one system service stopped. @@ -114,4 +129,14 @@ impl Node { // Close connection pool self.pool.close().await; } + + /// Utility method to publish multiple operations and entries in the node database. + /// + /// This method is especially useful for migrating schemas or seeding initial data. Lock files + /// with schema data can be created with the p2panda command line tool `fishy`. + /// + /// Returns `true` if migration took place or `false` if no migration was required. + pub async fn migrate(&self, lock_file: LockFile) -> Result { + self.api.migrate(lock_file).await + } }