Skip to content

Commit

Permalink
Node API to migrate lock files (#598)
Browse files Browse the repository at this point in the history
* Introduce a API to migrate a lock file

* Do not expose the api for now

* Make clippy happy

* Add entry to CHANGELOG.md
  • Loading branch information
adzialocha authored Nov 18, 2023
1 parent ce3c5eb commit 8e21b6a
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 2 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Introduce a `migrate` API on the Node to allow publishing data programmatically [#598](https://github.com/p2panda/aquadoggo/pull/598)

## [0.6.0]

### Added
Expand Down
45 changes: 45 additions & 0 deletions aquadoggo/src/api/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::{bail, Result};

use crate::api::{migrate, LockFile};
use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;

/// Interface to interact with the node in a programmatic, "low-level" way.
#[derive(Debug)]
pub struct NodeInterface {
context: Context,
tx: ServiceSender,
}

impl NodeInterface {
pub fn new(context: Context, tx: ServiceSender) -> Self {
Self { context, tx }
}

pub async fn migrate(&self, lock_file: LockFile) -> Result<bool> {
let committed_operations = migrate(
&self.context.store,
&self.context.schema_provider,
lock_file,
)
.await?;

let did_migration_happen = !committed_operations.is_empty();

// Send new operations from migration on service communication bus, this will arrive
// eventually at the materializer service
for operation_id in committed_operations {
if self
.tx
.send(ServiceMessage::NewOperation(operation_id))
.is_err()
{
bail!("Failed to inform materialization service about migration");
}
}

Ok(did_migration_happen)
}
}
89 changes: 89 additions & 0 deletions aquadoggo/src/api/lock_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::Result;
use p2panda_rs::entry::EncodedEntry;
use p2panda_rs::hash::Hash;
use p2panda_rs::operation::EncodedOperation;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

/// Serializable format holding encoded and signed p2panda operations and entries.
///
/// ```toml
/// version = 1
///
/// [[commits]]
/// entry_hash = "..."
/// entry = "..."
/// operation = "..."
///
/// [[commits]]
/// entry_hash = "..."
/// entry = "..."
/// operation = "..."
///
/// # ...
/// ```
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct LockFile {
/// Version of this lock file.
pub version: LockFileVersion,

/// List of commits holding the signed operation and entry data.
pub commits: Option<Vec<Commit>>,
}

/// Known versions of lock file format.
#[derive(Debug, Clone)]
pub enum LockFileVersion {
V1,
}

impl LockFileVersion {
/// Returns the operation version encoded as u64.
pub fn as_u64(&self) -> u64 {
match self {
LockFileVersion::V1 => 1,
}
}
}

impl Serialize for LockFileVersion {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_u64(self.as_u64())
}
}

impl<'de> Deserialize<'de> for LockFileVersion {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let version = u64::deserialize(deserializer)?;

match version {
1 => Ok(LockFileVersion::V1),
_ => Err(serde::de::Error::custom(format!(
"unsupported lock file version {}",
version
))),
}
}
}

/// Single commit with encoded entry and operation pair.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Commit {
/// Hash of the entry.
pub entry_hash: Hash,

/// Encoded and signed p2panda entry.
pub entry: EncodedEntry,

/// Encoded p2panda operation.
pub operation: EncodedOperation,
}
77 changes: 77 additions & 0 deletions aquadoggo/src/api/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::{anyhow, bail, Context as AnyhowContext, Result};
use p2panda_rs::api::publish;
use p2panda_rs::entry::decode::decode_entry;
use p2panda_rs::entry::traits::{AsEncodedEntry, AsEntry};
use p2panda_rs::operation::decode::decode_operation;
use p2panda_rs::operation::traits::Schematic;
use p2panda_rs::operation::OperationId;
use p2panda_rs::storage_provider::traits::{EntryStore, LogStore, OperationStore};

use crate::api::LockFile;
use crate::schema::SchemaProvider;

/// Utility method to publish multiple operations and entries in the node database.
///
/// Returns a list of operation ids which have been committed during this migration process. Can be
/// empty if no migration was required.
pub async fn migrate<S>(
store: &S,
schema_provider: &SchemaProvider,
lock_file: LockFile,
) -> Result<Vec<OperationId>>
where
S: OperationStore + EntryStore + LogStore,
{
let mut committed_operations = Vec::new();
let commits = lock_file.commits.unwrap_or_default();

for commit in commits {
let planned_entry = decode_entry(&commit.entry)
.context("Invalid entry encoding encountered in lock file")?;

let existing_entry = store
.get_entry_at_seq_num(
planned_entry.public_key(),
planned_entry.log_id(),
planned_entry.seq_num(),
)
.await
.context("Internal database error occurred while retrieving entry")?;

// Check if node already knows about this commit
match existing_entry {
Some(existing_entry) => {
// Check its integrity with our lock file by comparing entry hashes
if existing_entry.hash() != commit.entry_hash {
bail!("Integrity check failed when comparing planned and existing commit")
}
}
None => {
// .. otherwise publish the planned commit!
let plain_operation = decode_operation(&commit.operation)
.context("Invalid operation encoding encountered in lock file")?;

let schema = schema_provider
.get(plain_operation.schema_id())
.await
.ok_or_else(|| anyhow!("Could not migrate commit with unknown schema id"))?;

publish(
store,
&schema,
&commit.entry,
&plain_operation,
&commit.operation,
)
.await
.context("Internal database error occurred while publishing migration commit")?;

committed_operations.push(commit.entry_hash.into());
}
};
}

Ok(committed_operations)
}
10 changes: 10 additions & 0 deletions aquadoggo/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

#[allow(clippy::module_inception)]
mod api;
mod lock_file;
mod migration;

pub use api::NodeInterface;
pub use lock_file::LockFile;
pub use migration::migrate;
2 changes: 2 additions & 0 deletions aquadoggo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
unused_qualifications
)]
#![allow(clippy::uninlined_format_args)]
mod api;
mod bus;
mod config;
mod context;
Expand All @@ -31,6 +32,7 @@ mod test_utils;
#[cfg(test)]
mod tests;

pub use crate::api::LockFile;
pub use crate::config::{AllowList, Configuration};
pub use crate::network::NetworkConfiguration;
pub use node::Node;
Expand Down
7 changes: 7 additions & 0 deletions aquadoggo/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ where
rx_ready
}

/// Returns a sender for allowing communication on the service bus.
///
/// Can also be used to subscribe to the bus.
pub fn get_sender(&self) -> Sender<M> {
self.tx.clone()
}

/// Future which resolves as soon as a service returned an error, panicked or stopped.
pub async fn on_exit(&self) {
self.exit_handle.clone().await;
Expand Down
21 changes: 19 additions & 2 deletions aquadoggo/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use anyhow::Result;
use p2panda_rs::identity::KeyPair;

use crate::api::NodeInterface;
use crate::bus::ServiceMessage;
use crate::config::Configuration;
use crate::context::Context;
Expand All @@ -14,6 +15,7 @@ use crate::materializer::materializer_service;
use crate::network::network_service;
use crate::replication::replication_service;
use crate::schema::SchemaProvider;
use crate::LockFile;

/// Capacity of the internal broadcast channel used to communicate between services.
const SERVICE_BUS_CAPACITY: usize = 512_000;
Expand All @@ -40,6 +42,7 @@ async fn initialize_db(config: &Configuration) -> Result<Pool> {
pub struct Node {
pool: Pool,
manager: ServiceManager<Context, ServiceMessage>,
api: NodeInterface,
}

impl Node {
Expand All @@ -65,7 +68,7 @@ impl Node {
// Create service manager with shared data between services
let context = Context::new(store, key_pair, config, schema_provider);
let mut manager =
ServiceManager::<Context, ServiceMessage>::new(SERVICE_BUS_CAPACITY, context);
ServiceManager::<Context, ServiceMessage>::new(SERVICE_BUS_CAPACITY, context.clone());

// Start materializer service
if manager
Expand Down Expand Up @@ -95,7 +98,11 @@ impl Node {
panic!("Failed starting replication service");
}

Self { pool, manager }
// Create a low-level interface which can be exposed so developers can interact with the
// internal store and service bus
let api = NodeInterface::new(context, manager.get_sender());

Self { pool, manager, api }
}

/// This future resolves when at least one system service stopped.
Expand All @@ -114,4 +121,14 @@ impl Node {
// Close connection pool
self.pool.close().await;
}

/// Utility method to publish multiple operations and entries in the node database.
///
/// This method is especially useful for migrating schemas or seeding initial data. Lock files
/// with schema data can be created with the p2panda command line tool `fishy`.
///
/// Returns `true` if migration took place or `false` if no migration was required.
pub async fn migrate(&self, lock_file: LockFile) -> Result<bool> {
self.api.migrate(lock_file).await
}
}

0 comments on commit 8e21b6a

Please sign in to comment.