Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node API to migrate lock files #598

Merged
merged 4 commits into from
Nov 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Introduce a `migrate` API on the Node to allow publishing data programmatically [#598](https://github.com/p2panda/aquadoggo/pull/598)

## [0.6.0]

### Added
Expand Down
45 changes: 45 additions & 0 deletions aquadoggo/src/api/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::{bail, Result};

use crate::api::{migrate, LockFile};
use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;

/// Interface to interact with the node in a programmatic, "low-level" way.
#[derive(Debug)]
pub struct NodeInterface {
context: Context,
tx: ServiceSender,
}

impl NodeInterface {
pub fn new(context: Context, tx: ServiceSender) -> Self {
Self { context, tx }
}

pub async fn migrate(&self, lock_file: LockFile) -> Result<bool> {
let committed_operations = migrate(
&self.context.store,
&self.context.schema_provider,
lock_file,
)
.await?;

let did_migration_happen = !committed_operations.is_empty();

// Send new operations from migration on service communication bus, this will arrive
// eventually at the materializer service
for operation_id in committed_operations {
if self
.tx
.send(ServiceMessage::NewOperation(operation_id))
.is_err()
{
bail!("Failed to inform materialization service about migration");
}
}

Ok(did_migration_happen)
}
}
89 changes: 89 additions & 0 deletions aquadoggo/src/api/lock_file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::Result;
use p2panda_rs::entry::EncodedEntry;
use p2panda_rs::hash::Hash;
use p2panda_rs::operation::EncodedOperation;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

/// Serializable format holding encoded and signed p2panda operations and entries.
///
/// ```toml
/// version = 1
///
/// [[commits]]
/// entry_hash = "..."
/// entry = "..."
/// operation = "..."
///
/// [[commits]]
/// entry_hash = "..."
/// entry = "..."
/// operation = "..."
///
/// # ...
/// ```
#[derive(Debug, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct LockFile {
/// Version of this lock file.
pub version: LockFileVersion,

/// List of commits holding the signed operation and entry data.
pub commits: Option<Vec<Commit>>,
}

/// Known versions of lock file format.
#[derive(Debug, Clone)]
pub enum LockFileVersion {
V1,
}

impl LockFileVersion {
/// Returns the operation version encoded as u64.
pub fn as_u64(&self) -> u64 {
match self {
LockFileVersion::V1 => 1,
}
}
}

impl Serialize for LockFileVersion {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_u64(self.as_u64())
}
}

impl<'de> Deserialize<'de> for LockFileVersion {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let version = u64::deserialize(deserializer)?;

match version {
1 => Ok(LockFileVersion::V1),
_ => Err(serde::de::Error::custom(format!(
"unsupported lock file version {}",
version
))),
}
}
}

/// Single commit with encoded entry and operation pair.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Commit {
/// Hash of the entry.
pub entry_hash: Hash,

/// Encoded and signed p2panda entry.
pub entry: EncodedEntry,

/// Encoded p2panda operation.
pub operation: EncodedOperation,
}
77 changes: 77 additions & 0 deletions aquadoggo/src/api/migration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use anyhow::{anyhow, bail, Context as AnyhowContext, Result};
use p2panda_rs::api::publish;
use p2panda_rs::entry::decode::decode_entry;
use p2panda_rs::entry::traits::{AsEncodedEntry, AsEntry};
use p2panda_rs::operation::decode::decode_operation;
use p2panda_rs::operation::traits::Schematic;
use p2panda_rs::operation::OperationId;
use p2panda_rs::storage_provider::traits::{EntryStore, LogStore, OperationStore};

use crate::api::LockFile;
use crate::schema::SchemaProvider;

/// Utility method to publish multiple operations and entries in the node database.
///
/// Returns a list of operation ids which have been committed during this migration process. Can be
/// empty if no migration was required.
pub async fn migrate<S>(
store: &S,
schema_provider: &SchemaProvider,
lock_file: LockFile,
) -> Result<Vec<OperationId>>
where
S: OperationStore + EntryStore + LogStore,
{
let mut committed_operations = Vec::new();
let commits = lock_file.commits.unwrap_or_default();

for commit in commits {
let planned_entry = decode_entry(&commit.entry)
.context("Invalid entry encoding encountered in lock file")?;

let existing_entry = store
.get_entry_at_seq_num(
planned_entry.public_key(),
planned_entry.log_id(),
planned_entry.seq_num(),
)
.await
.context("Internal database error occurred while retrieving entry")?;

// Check if node already knows about this commit
match existing_entry {
Some(existing_entry) => {
// Check its integrity with our lock file by comparing entry hashes
if existing_entry.hash() != commit.entry_hash {
bail!("Integrity check failed when comparing planned and existing commit")
}
}
None => {
// .. otherwise publish the planned commit!
let plain_operation = decode_operation(&commit.operation)
.context("Invalid operation encoding encountered in lock file")?;

let schema = schema_provider
.get(plain_operation.schema_id())
.await
.ok_or_else(|| anyhow!("Could not migrate commit with unknown schema id"))?;

publish(
store,
&schema,
&commit.entry,
&plain_operation,
&commit.operation,
)
.await
.context("Internal database error occurred while publishing migration commit")?;

committed_operations.push(commit.entry_hash.into());
}
};
}

Ok(committed_operations)
}
10 changes: 10 additions & 0 deletions aquadoggo/src/api/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

#[allow(clippy::module_inception)]
mod api;
mod lock_file;
mod migration;

pub use api::NodeInterface;
pub use lock_file::LockFile;
pub use migration::migrate;
2 changes: 2 additions & 0 deletions aquadoggo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
unused_qualifications
)]
#![allow(clippy::uninlined_format_args)]
mod api;
mod bus;
mod config;
mod context;
Expand All @@ -31,6 +32,7 @@ mod test_utils;
#[cfg(test)]
mod tests;

pub use crate::api::LockFile;
pub use crate::config::{AllowList, Configuration};
pub use crate::network::NetworkConfiguration;
pub use node::Node;
Expand Down
7 changes: 7 additions & 0 deletions aquadoggo/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,13 @@ where
rx_ready
}

/// Returns a sender for allowing communication on the service bus.
///
/// Can also be used to subscribe to the bus.
pub fn get_sender(&self) -> Sender<M> {
self.tx.clone()
}

/// Future which resolves as soon as a service returned an error, panicked or stopped.
pub async fn on_exit(&self) {
self.exit_handle.clone().await;
Expand Down
21 changes: 19 additions & 2 deletions aquadoggo/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use anyhow::Result;
use p2panda_rs::identity::KeyPair;

use crate::api::NodeInterface;
use crate::bus::ServiceMessage;
use crate::config::Configuration;
use crate::context::Context;
Expand All @@ -14,6 +15,7 @@ use crate::materializer::materializer_service;
use crate::network::network_service;
use crate::replication::replication_service;
use crate::schema::SchemaProvider;
use crate::LockFile;

/// Capacity of the internal broadcast channel used to communicate between services.
const SERVICE_BUS_CAPACITY: usize = 512_000;
Expand All @@ -40,6 +42,7 @@ async fn initialize_db(config: &Configuration) -> Result<Pool> {
pub struct Node {
pool: Pool,
manager: ServiceManager<Context, ServiceMessage>,
api: NodeInterface,
}

impl Node {
Expand All @@ -65,7 +68,7 @@ impl Node {
// Create service manager with shared data between services
let context = Context::new(store, key_pair, config, schema_provider);
let mut manager =
ServiceManager::<Context, ServiceMessage>::new(SERVICE_BUS_CAPACITY, context);
ServiceManager::<Context, ServiceMessage>::new(SERVICE_BUS_CAPACITY, context.clone());

// Start materializer service
if manager
Expand Down Expand Up @@ -95,7 +98,11 @@ impl Node {
panic!("Failed starting replication service");
}

Self { pool, manager }
// Create a low-level interface which can be exposed so developers can interact with the
// internal store and service bus
let api = NodeInterface::new(context, manager.get_sender());

Self { pool, manager, api }
}

/// This future resolves when at least one system service stopped.
Expand All @@ -114,4 +121,14 @@ impl Node {
// Close connection pool
self.pool.close().await;
}

/// Utility method to publish multiple operations and entries in the node database.
///
/// This method is especially useful for migrating schemas or seeding initial data. Lock files
/// with schema data can be created with the p2panda command line tool `fishy`.
///
/// Returns `true` if migration took place or `false` if no migration was required.
pub async fn migrate(&self, lock_file: LockFile) -> Result<bool> {
self.api.migrate(lock_file).await
}
}