diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 00000000..3a26366d --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +edition = "2021" diff --git a/tap_core/src/adapters/rav_storage_adapter.rs b/tap_core/src/adapters/rav_storage_adapter.rs index 9f917fcb..0188a647 100644 --- a/tap_core/src/adapters/rav_storage_adapter.rs +++ b/tap_core/src/adapters/rav_storage_adapter.rs @@ -5,10 +5,10 @@ use async_trait::async_trait; use crate::tap_manager::SignedRAV; -/// `RAVStorageAdapter` defines a trait for storage adapters to handle `SignedRAV` data. +/// `RAVStore` defines a trait for write storage adapters to handle `SignedRAV` data. /// /// This trait is designed to be implemented by users of this library who want to -/// customize the storage behavior of `SignedRAV` data. The error handling is also +/// customize the write storage behavior of `SignedRAV` data. The error handling is also /// customizable by defining an `AdapterError` type, which must implement both `Error` /// and `Debug` from the standard library. /// @@ -18,10 +18,6 @@ use crate::tap_manager::SignedRAV; /// in the storage managed by the adapter. Errors during this operation should be /// captured and returned in the `AdapterError` format. /// -/// The `last_rav` method is designed to fetch the latest `SignedRAV` from the storage. -/// If there is no `SignedRAV` available, it should return `None`. Any errors during -/// this operation should be captured and returned as an `AdapterError`. -/// /// This trait is utilized by [crate::tap_manager], which relies on these /// operations for working with `SignedRAV` data. /// @@ -30,7 +26,7 @@ use crate::tap_manager::SignedRAV; /// For example code see [crate::adapters::rav_storage_adapter_mock] #[async_trait] -pub trait RAVStorageAdapter { +pub trait RAVStore { /// Defines the user-specified error type. /// /// This error type should implement the `Error` and `Debug` traits from the standard library. @@ -42,6 +38,36 @@ pub trait RAVStorageAdapter { /// This method should be implemented to store the most recent validated `SignedRAV` into your chosen storage system. /// Any errors that occur during this process should be captured and returned as an `AdapterError`. async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError>; +} + + +/// `RAVRead` defines a trait for read storage adapters to handle `SignedRAV` data. +/// +/// This trait is designed to be implemented by users of this library who want to +/// customize the read storage behavior of `SignedRAV` data. The error handling is also +/// customizable by defining an `AdapterError` type, which must implement both `Error` +/// and `Debug` from the standard library. +/// +/// # Usage +/// +/// The `last_rav` method is designed to fetch the latest `SignedRAV` from the storage. +/// If there is no `SignedRAV` available, it should return `None`. Any errors during +/// this operation should be captured and returned as an `AdapterError`. +/// +/// This trait is utilized by [crate::tap_manager], which relies on these +/// operations for working with `SignedRAV` data. +/// +/// # Example +/// +/// For example code see [crate::adapters::rav_storage_adapter_mock] + +#[async_trait] +pub trait RAVRead { + /// Defines the user-specified error type. + /// + /// This error type should implement the `Error` and `Debug` traits from the standard library. + /// Errors of this type are returned to the user when an operation fails. + type AdapterError: std::error::Error + std::fmt::Debug + Send + Sync + 'static; /// Retrieves the latest `SignedRAV` from the storage. /// diff --git a/tap_core/src/adapters/receipt_storage_adapter.rs b/tap_core/src/adapters/receipt_storage_adapter.rs index a76ff9ce..962eb8f9 100644 --- a/tap_core/src/adapters/receipt_storage_adapter.rs +++ b/tap_core/src/adapters/receipt_storage_adapter.rs @@ -7,10 +7,10 @@ use async_trait::async_trait; use crate::tap_receipt::ReceivedReceipt; -/// `ReceiptStorageAdapter` defines a trait for storage adapters to manage `ReceivedReceipt` data. +/// `ReceiptStore` defines a trait for write storage adapters to manage `ReceivedReceipt` data. /// /// This trait is designed to be implemented by users of this library who want to -/// customize the storage behavior of `ReceivedReceipt` data. The error handling is also +/// customize the write storage behavior of `ReceivedReceipt` data. The error handling is also /// customizable by defining an `AdapterError` type, which must implement both `Error` /// and `Debug` from the standard library. /// @@ -20,10 +20,6 @@ use crate::tap_receipt::ReceivedReceipt; /// managed by the adapter. It returns a unique receipt_id associated with the stored receipt. /// Any errors during this operation should be captured and returned in the `AdapterError` format. /// -/// The `retrieve_receipts_in_timestamp_range` method should be implemented to fetch all `ReceivedReceipts` -/// within a specific timestamp range from the storage. The returned receipts should be in the form of a vector -/// of tuples where each tuple contains the unique receipt_id and the corresponding `ReceivedReceipt`. -/// /// The `update_receipt_by_id` method is designed to update a specific `ReceivedReceipt` identified by a unique /// receipt_id. Any errors during this operation should be captured and returned as an `AdapterError`. /// @@ -38,7 +34,7 @@ use crate::tap_receipt::ReceivedReceipt; /// For example code see [crate::adapters::receipt_storage_adapter_mock] #[async_trait] -pub trait ReceiptStorageAdapter { +pub trait ReceiptStore { /// Defines the user-specified error type. /// /// This error type should implement the `Error` and `Debug` traits from the standard library. @@ -52,6 +48,50 @@ pub trait ReceiptStorageAdapter { /// this process should be captured and returned as an `AdapterError`. async fn store_receipt(&self, receipt: ReceivedReceipt) -> Result; + /// Updates a specific `ReceivedReceipt` identified by a unique receipt_id. + /// + /// This method should be implemented to update a specific `ReceivedReceipt` identified by a unique + /// receipt_id in your storage system. Any errors that occur during this process should be captured + /// and returned as an `AdapterError`. + async fn update_receipt_by_id( + &self, + receipt_id: u64, + receipt: ReceivedReceipt, + ) -> Result<(), Self::AdapterError>; + + /// Removes all `ReceivedReceipts` within a specific timestamp range from the storage. + /// + /// This method should be implemented to remove all `ReceivedReceipts` within a specific timestamp + /// range from your storage system. Any errors that occur during this process should be captured and + /// returned as an `AdapterError`. + async fn remove_receipts_in_timestamp_range + std::marker::Send>( + &self, + timestamp_ns: R, + ) -> Result<(), Self::AdapterError>; +} + + + +/// `ReceiptRead` defines a trait for read storage adapters to manage `ReceivedReceipt` data. +/// +/// This trait is designed to be implemented by users of this library who want to +/// customize the read storage behavior of `ReceivedReceipt` data. The error handling is also +/// customizable by defining an `AdapterError` type, which must implement both `Error` +/// and `Debug` from the standard library. +/// +/// # Usage +/// +/// The `retrieve_receipts_in_timestamp_range` method should be implemented to fetch all `ReceivedReceipts` +/// within a specific timestamp range from the storage. The returned receipts should be in the form of a vector +/// of tuples where each tuple contains the unique receipt_id and the corresponding `ReceivedReceipt`. +#[async_trait] +pub trait ReceiptRead { + /// Defines the user-specified error type. + /// + /// This error type should implement the `Error` and `Debug` traits from the standard library. + /// Errors of this type are returned to the user when an operation fails. + type AdapterError: std::error::Error + std::fmt::Debug + Send + Sync + 'static; + /// Retrieves all `ReceivedReceipts` within a specific timestamp range. /// /// This method should be implemented to fetch all `ReceivedReceipts` within a specific timestamp range @@ -79,28 +119,11 @@ pub trait ReceiptStorageAdapter { limit: Option, ) -> Result, Self::AdapterError>; - /// Updates a specific `ReceivedReceipt` identified by a unique receipt_id. - /// - /// This method should be implemented to update a specific `ReceivedReceipt` identified by a unique - /// receipt_id in your storage system. Any errors that occur during this process should be captured - /// and returned as an `AdapterError`. - async fn update_receipt_by_id( - &self, - receipt_id: u64, - receipt: ReceivedReceipt, - ) -> Result<(), Self::AdapterError>; - - /// Removes all `ReceivedReceipts` within a specific timestamp range from the storage. - /// - /// This method should be implemented to remove all `ReceivedReceipts` within a specific timestamp - /// range from your storage system. Any errors that occur during this process should be captured and - /// returned as an `AdapterError`. - async fn remove_receipts_in_timestamp_range + std::marker::Send>( - &self, - timestamp_ns: R, - ) -> Result<(), Self::AdapterError>; } + + + /// See [`ReceiptStorageAdapter::retrieve_receipts_in_timestamp_range()`] for details. /// /// WARNING: Will sort the receipts by timestamp using diff --git a/tap_core/src/adapters/test/rav_storage_adapter_mock.rs b/tap_core/src/adapters/test/rav_storage_adapter_mock.rs index 5c471b1d..603af06d 100644 --- a/tap_core/src/adapters/test/rav_storage_adapter_mock.rs +++ b/tap_core/src/adapters/test/rav_storage_adapter_mock.rs @@ -7,7 +7,10 @@ use async_trait::async_trait; use thiserror::Error; use tokio::sync::RwLock; -use crate::{adapters::rav_storage_adapter::RAVStorageAdapter, tap_manager::SignedRAV}; +use crate::{ + adapters::rav_storage_adapter::{RAVRead, RAVStore}, + tap_manager::SignedRAV, +}; /// `RAVStorageAdapterMock` is a mock implementation of the `RAVStorageAdapter` trait. /// @@ -55,7 +58,7 @@ pub enum AdpaterErrorMock { } #[async_trait] -impl RAVStorageAdapter for RAVStorageAdapterMock { +impl RAVStore for RAVStorageAdapterMock { type AdapterError = AdpaterErrorMock; async fn update_last_rav(&self, rav: SignedRAV) -> Result<(), Self::AdapterError> { @@ -63,6 +66,12 @@ impl RAVStorageAdapter for RAVStorageAdapterMock { *rav_storage = Some(rav); Ok(()) } +} + +#[async_trait] +impl RAVRead for RAVStorageAdapterMock { + type AdapterError = AdpaterErrorMock; + async fn last_rav(&self) -> Result, Self::AdapterError> { Ok(self.rav_storage.read().await.clone()) } diff --git a/tap_core/src/adapters/test/rav_storage_adapter_test.rs b/tap_core/src/adapters/test/rav_storage_adapter_test.rs index 1e5b141f..a7234119 100644 --- a/tap_core/src/adapters/test/rav_storage_adapter_test.rs +++ b/tap_core/src/adapters/test/rav_storage_adapter_test.rs @@ -12,8 +12,9 @@ mod rav_storage_adapter_unit_test { use rstest::*; use tokio::sync::RwLock; + use crate::adapters::rav_storage_adapter::RAVRead; use crate::adapters::{ - rav_storage_adapter::RAVStorageAdapter, rav_storage_adapter_mock::RAVStorageAdapterMock, + rav_storage_adapter::RAVStore, rav_storage_adapter_mock::RAVStorageAdapterMock, }; use crate::{ eip_712_signed_message::EIP712SignedMessage, diff --git a/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs b/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs index d48943f1..d4bbe6fe 100644 --- a/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs +++ b/tap_core/src/adapters/test/receipt_storage_adapter_mock.rs @@ -7,7 +7,7 @@ use async_trait::async_trait; use tokio::sync::RwLock; use crate::{ - adapters::receipt_storage_adapter::{safe_truncate_receipts, ReceiptStorageAdapter}, + adapters::receipt_storage_adapter::{safe_truncate_receipts, ReceiptRead, ReceiptStore}, tap_receipt::ReceivedReceipt, }; @@ -84,7 +84,7 @@ pub enum AdapterErrorMock { } #[async_trait] -impl ReceiptStorageAdapter for ReceiptStorageAdapterMock { +impl ReceiptStore for ReceiptStorageAdapterMock { type AdapterError = AdapterErrorMock; async fn store_receipt(&self, receipt: ReceivedReceipt) -> Result { let mut id_pointer = self.unique_id.write().await; @@ -94,28 +94,6 @@ impl ReceiptStorageAdapter for ReceiptStorageAdapterMock { *id_pointer += 1; Ok(id_previous) } - async fn retrieve_receipts_in_timestamp_range + std::marker::Send>( - &self, - timestamp_range_ns: R, - limit: Option, - ) -> Result, Self::AdapterError> { - let receipt_storage = self.receipt_storage.read().await; - let mut receipts_in_range: Vec<(u64, ReceivedReceipt)> = receipt_storage - .iter() - .filter(|(_, rx_receipt)| { - timestamp_range_ns.contains(&rx_receipt.signed_receipt.message.timestamp_ns) - }) - .map(|(&id, rx_receipt)| (id, rx_receipt.clone())) - .collect(); - - if limit.is_some_and(|limit| receipts_in_range.len() > limit as usize) { - safe_truncate_receipts(&mut receipts_in_range, limit.unwrap()); - - Ok(receipts_in_range) - } else { - Ok(receipts_in_range) - } - } async fn update_receipt_by_id( &self, receipt_id: u64, @@ -144,3 +122,30 @@ impl ReceiptStorageAdapter for ReceiptStorageAdapterMock { Ok(()) } } + +#[async_trait] +impl ReceiptRead for ReceiptStorageAdapterMock { + type AdapterError = AdapterErrorMock; + async fn retrieve_receipts_in_timestamp_range + std::marker::Send>( + &self, + timestamp_range_ns: R, + limit: Option, + ) -> Result, Self::AdapterError> { + let receipt_storage = self.receipt_storage.read().await; + let mut receipts_in_range: Vec<(u64, ReceivedReceipt)> = receipt_storage + .iter() + .filter(|(_, rx_receipt)| { + timestamp_range_ns.contains(&rx_receipt.signed_receipt.message.timestamp_ns) + }) + .map(|(&id, rx_receipt)| (id, rx_receipt.clone())) + .collect(); + + if limit.is_some_and(|limit| receipts_in_range.len() > limit as usize) { + safe_truncate_receipts(&mut receipts_in_range, limit.unwrap()); + + Ok(receipts_in_range) + } else { + Ok(receipts_in_range) + } + } +} diff --git a/tap_core/src/adapters/test/receipt_storage_adapter_test.rs b/tap_core/src/adapters/test/receipt_storage_adapter_test.rs index 0eb36355..42468d49 100644 --- a/tap_core/src/adapters/test/receipt_storage_adapter_test.rs +++ b/tap_core/src/adapters/test/receipt_storage_adapter_test.rs @@ -17,7 +17,7 @@ mod receipt_storage_adapter_unit_test { use tokio::sync::RwLock; use crate::adapters::{ - receipt_storage_adapter::ReceiptStorageAdapter, + receipt_storage_adapter::ReceiptStore, receipt_storage_adapter_mock::ReceiptStorageAdapterMock, }; use crate::{ diff --git a/tap_core/src/tap_manager/manager.rs b/tap_core/src/tap_manager/manager.rs index 3cb89d15..f29eb629 100644 --- a/tap_core/src/tap_manager/manager.rs +++ b/tap_core/src/tap_manager/manager.rs @@ -6,21 +6,17 @@ use alloy_sol_types::Eip712Domain; use super::{RAVRequest, SignedRAV, SignedReceipt}; use crate::{ adapters::{ - escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, + escrow_adapter::EscrowAdapter, + rav_storage_adapter::{RAVRead, RAVStore}, receipt_checks_adapter::ReceiptChecksAdapter, - receipt_storage_adapter::ReceiptStorageAdapter, + receipt_storage_adapter::{ReceiptRead, ReceiptStore}, }, receipt_aggregate_voucher::ReceiptAggregateVoucher, tap_receipt::{ReceiptAuditor, ReceiptCheck, ReceivedReceipt}, Error, }; -pub struct Manager< - CA: EscrowAdapter, - RCA: ReceiptChecksAdapter, - RSA: ReceiptStorageAdapter, - RAVSA: RAVStorageAdapter, -> { +pub struct Manager { /// Adapter for RAV CRUD rav_storage_adapter: RAVSA, /// Adapter for receipt CRUD @@ -32,13 +28,7 @@ pub struct Manager< receipt_auditor: ReceiptAuditor, } -impl< - EA: EscrowAdapter, - RCA: ReceiptChecksAdapter, - RSA: ReceiptStorageAdapter, - RAVSA: RAVStorageAdapter, - > Manager -{ +impl Manager { /// Creates new manager with provided `adapters`, any receipts received by this manager /// will complete all `required_checks` before being accepted or declined from RAV. /// `starting_min_timestamp` will be used as min timestamp until the first RAV request is created. @@ -65,51 +55,13 @@ impl< receipt_auditor, } } +} - /// Runs `initial_checks` on `signed_receipt` for initial verification, then stores received receipt. - /// The provided `query_id` will be used as a key when chaecking query appraisal. - /// - /// # Errors - /// - /// Returns [`Error::AdapterError`] if there are any errors while storing receipts - /// - /// Returns [`Error::InvalidStateForRequestedAction`] if the checks requested in `initial_checks` cannot be comleted due to: All other checks must be complete before `CheckAndReserveEscrow` - /// - /// Returns [`Error::InvalidCheckError`] if check in `initial_checks` is not in `required_checks` provided when manager was created - /// - pub async fn verify_and_store_receipt( - &self, - signed_receipt: SignedReceipt, - query_id: u64, - initial_checks: &[ReceiptCheck], - ) -> std::result::Result<(), Error> { - let mut received_receipt = - ReceivedReceipt::new(signed_receipt, query_id, &self.required_checks); - // The receipt id is needed before `perform_checks` can be called on received receipt - // since it is needed for uniqueness check. Since the receipt_id is defined when it is stored - // This function first stores it, then checks it, then updates what was stored. - - let receipt_id = self - .receipt_storage_adapter - .store_receipt(received_receipt.clone()) - .await - .map_err(|err| Error::AdapterError { - source_error: anyhow::Error::new(err), - })?; - - received_receipt - .perform_checks(initial_checks, receipt_id, &self.receipt_auditor) - .await?; - - self.receipt_storage_adapter - .update_receipt_by_id(receipt_id, received_receipt) - .await - .map_err(|err| Error::AdapterError { - source_error: anyhow::Error::new(err), - })?; - Ok(()) - } - +impl Manager +where + RCA: ReceiptChecksAdapter, + RAVSA: RAVStore, +{ /// Verify `signed_rav` matches all values on `expected_rav`, and that `signed_rav` has a valid signer. /// /// # Errors @@ -141,70 +93,12 @@ impl< Ok(()) } +} - /// Removes obsolete receipts from storage. Obsolete receipts are receipts that are older than the last RAV, and - /// therefore already aggregated into the RAV. - /// This function should be called after a new RAV is received to limit the number of receipts stored. - /// No-op if there is no last RAV. - /// - /// # Errors - /// - /// Returns [`Error::AdapterError`] if there are any errors while retrieving last RAV or removing receipts - /// - pub async fn remove_obsolete_receipts(&self) -> Result<(), Error> { - match self.get_previous_rav().await? { - Some(last_rav) => { - self.receipt_storage_adapter - .remove_receipts_in_timestamp_range(..=last_rav.message.timestamp_ns) - .await - .map_err(|err| Error::AdapterError { - source_error: anyhow::Error::new(err), - })?; - Ok(()) - } - None => Ok(()), - } - } - - /// Completes remaining checks on all receipts up to (current time - `timestamp_buffer_ns`). Returns them in - /// two lists (valid receipts and invalid receipts) along with the expected RAV that should be received - /// for aggregating list of valid receipts. - /// - /// Returns [`Error::AggregateOverflow`] if any receipt value causes aggregate value to overflow while generating expected RAV - /// - /// Returns [`Error::AdapterError`] if unable to fetch previous RAV or if unable to fetch previous receipts - /// - /// Returns [`Error::TimestampRangeError`] if the max timestamp of the previous RAV is greater than the min timestamp. Caused by timestamp buffer being too large, or requests coming too soon. - /// - pub async fn create_rav_request( - &self, - timestamp_buffer_ns: u64, - receipts_limit: Option, - ) -> Result { - let previous_rav = self.get_previous_rav().await?; - let min_timestamp_ns = previous_rav - .as_ref() - .map(|rav| rav.message.timestamp_ns + 1) - .unwrap_or(0); - - let (valid_receipts, invalid_receipts) = self - .collect_receipts(timestamp_buffer_ns, min_timestamp_ns, receipts_limit) - .await?; - - let expected_rav = Self::generate_expected_rav(&valid_receipts, previous_rav.clone())?; - - self.receipt_auditor - .update_min_timestamp_ns(expected_rav.timestamp_ns) - .await; - - Ok(RAVRequest { - valid_receipts, - previous_rav, - invalid_receipts, - expected_rav, - }) - } - +impl Manager +where + RAVSA: RAVRead, +{ async fn get_previous_rav(&self) -> Result, Error> { let previous_rav = self.rav_storage_adapter @@ -215,7 +109,15 @@ impl< })?; Ok(previous_rav) } +} +impl Manager +where + EA: EscrowAdapter, + RCA: ReceiptChecksAdapter, + RSA: ReceiptRead, + // RAVSA: RAVRead, +{ async fn collect_receipts( &self, timestamp_buffer_ns: u64, @@ -263,6 +165,53 @@ impl< Ok((accepted_signed_receipts, failed_signed_receipts)) } +} + +impl Manager +where + EA: EscrowAdapter, + RCA: ReceiptChecksAdapter, + RSA: ReceiptRead, + RAVSA: RAVRead, +{ + /// Completes remaining checks on all receipts up to (current time - `timestamp_buffer_ns`). Returns them in + /// two lists (valid receipts and invalid receipts) along with the expected RAV that should be received + /// for aggregating list of valid receipts. + /// + /// Returns [`Error::AggregateOverflow`] if any receipt value causes aggregate value to overflow while generating expected RAV + /// + /// Returns [`Error::AdapterError`] if unable to fetch previous RAV or if unable to fetch previous receipts + /// + /// Returns [`Error::TimestampRangeError`] if the max timestamp of the previous RAV is greater than the min timestamp. Caused by timestamp buffer being too large, or requests coming too soon. + /// + pub async fn create_rav_request( + &self, + timestamp_buffer_ns: u64, + receipts_limit: Option, + ) -> Result { + let previous_rav = self.get_previous_rav().await?; + let min_timestamp_ns = previous_rav + .as_ref() + .map(|rav| rav.message.timestamp_ns + 1) + .unwrap_or(0); + + let (valid_receipts, invalid_receipts) = self + .collect_receipts(timestamp_buffer_ns, min_timestamp_ns, receipts_limit) + .await?; + + let expected_rav = Self::generate_expected_rav(&valid_receipts, previous_rav.clone())?; + + self.receipt_auditor + .update_min_timestamp_ns(expected_rav.timestamp_ns) + .await; + + Ok(RAVRequest { + valid_receipts, + previous_rav, + invalid_receipts, + expected_rav, + }) + } fn generate_expected_rav( receipts: &[SignedReceipt], @@ -276,6 +225,87 @@ impl< } } +impl Manager +where + RSA: ReceiptStore, + RAVSA: RAVRead, +{ + /// Removes obsolete receipts from storage. Obsolete receipts are receipts that are older than the last RAV, and + /// therefore already aggregated into the RAV. + /// This function should be called after a new RAV is received to limit the number of receipts stored. + /// No-op if there is no last RAV. + /// + /// # Errors + /// + /// Returns [`Error::AdapterError`] if there are any errors while retrieving last RAV or removing receipts + /// + pub async fn remove_obsolete_receipts(&self) -> Result<(), Error> { + match self.get_previous_rav().await? { + Some(last_rav) => { + self.receipt_storage_adapter + .remove_receipts_in_timestamp_range(..=last_rav.message.timestamp_ns) + .await + .map_err(|err| Error::AdapterError { + source_error: anyhow::Error::new(err), + })?; + Ok(()) + } + None => Ok(()), + } + } +} + +impl Manager +where + EA: EscrowAdapter, + RCA: ReceiptChecksAdapter, + RSA: ReceiptStore, +{ + /// Runs `initial_checks` on `signed_receipt` for initial verification, then stores received receipt. + /// The provided `query_id` will be used as a key when chaecking query appraisal. + /// + /// # Errors + /// + /// Returns [`Error::AdapterError`] if there are any errors while storing receipts + /// + /// Returns [`Error::InvalidStateForRequestedAction`] if the checks requested in `initial_checks` cannot be comleted due to: All other checks must be complete before `CheckAndReserveEscrow` + /// + /// Returns [`Error::InvalidCheckError`] if check in `initial_checks` is not in `required_checks` provided when manager was created + /// + pub async fn verify_and_store_receipt( + &self, + signed_receipt: SignedReceipt, + query_id: u64, + initial_checks: &[ReceiptCheck], + ) -> std::result::Result<(), Error> { + let mut received_receipt = + ReceivedReceipt::new(signed_receipt, query_id, &self.required_checks); + // The receipt id is needed before `perform_checks` can be called on received receipt + // since it is needed for uniqueness check. Since the receipt_id is defined when it is stored + // This function first stores it, then checks it, then updates what was stored. + + let receipt_id = self + .receipt_storage_adapter + .store_receipt(received_receipt.clone()) + .await + .map_err(|err| Error::AdapterError { + source_error: anyhow::Error::new(err), + })?; + + received_receipt + .perform_checks(initial_checks, receipt_id, &self.receipt_auditor) + .await?; + + self.receipt_storage_adapter + .update_receipt_by_id(receipt_id, received_receipt) + .await + .map_err(|err| Error::AdapterError { + source_error: anyhow::Error::new(err), + })?; + Ok(()) + } +} + #[cfg(test)] #[path = "test/manager_test.rs"] mod manager_test; diff --git a/tap_core/src/tap_manager/test/manager_test.rs b/tap_core/src/tap_manager/test/manager_test.rs index da0043bd..741ea19f 100644 --- a/tap_core/src/tap_manager/test/manager_test.rs +++ b/tap_core/src/tap_manager/test/manager_test.rs @@ -21,7 +21,7 @@ mod manager_unit_test { escrow_adapter_mock::EscrowAdapterMock, rav_storage_adapter_mock::RAVStorageAdapterMock, receipt_checks_adapter_mock::ReceiptChecksAdapterMock, - receipt_storage_adapter::ReceiptStorageAdapter, + receipt_storage_adapter::ReceiptRead, receipt_storage_adapter_mock::ReceiptStorageAdapterMock, }, eip_712_signed_message::EIP712SignedMessage, diff --git a/tap_core/src/tap_receipt/receipt_auditor.rs b/tap_core/src/tap_receipt/receipt_auditor.rs index ea1ffd18..1f910980 100644 --- a/tap_core/src/tap_receipt/receipt_auditor.rs +++ b/tap_core/src/tap_receipt/receipt_auditor.rs @@ -17,14 +17,14 @@ use crate::{ use super::ReceivedReceipt; -pub struct ReceiptAuditor { +pub struct ReceiptAuditor { domain_separator: Eip712Domain, escrow_adapter: EA, receipt_checks_adapter: RCA, min_timestamp_ns: RwLock, } -impl ReceiptAuditor { +impl ReceiptAuditor { pub fn new( domain_separator: Eip712Domain, escrow_adapter: EA, @@ -44,6 +44,72 @@ impl ReceiptAuditor { *self.min_timestamp_ns.write().await = min_timestamp_ns; } + async fn check_timestamp( + &self, + signed_receipt: &EIP712SignedMessage, + ) -> ReceiptResult<()> { + let min_timestamp_ns = *self.min_timestamp_ns.read().await; + if signed_receipt.message.timestamp_ns <= min_timestamp_ns { + return Err(ReceiptError::InvalidTimestamp { + received_timestamp: signed_receipt.message.timestamp_ns, + timestamp_min: min_timestamp_ns, + }); + } + Ok(()) + } + + async fn check_timestamp_batch( + &self, + received_receipts: &mut [ReceivedReceipt], + ) -> Vec> { + let mut results = Vec::new(); + + for received_receipt in received_receipts + .iter_mut() + .filter(|r| r.checks.contains_key(&ReceiptCheck::CheckTimestamp)) + { + if received_receipt.checks[&ReceiptCheck::CheckTimestamp].is_none() { + let signed_receipt = &received_receipt.signed_receipt; + results.push(self.check_timestamp(signed_receipt).await); + } + } + + results + } + + async fn check_uniqueness_batch( + &self, + received_receipts: &mut [ReceivedReceipt], + ) -> Vec> { + let mut results = Vec::new(); + + // If at least one of the receipts in the batch hasn't been checked for uniqueness yet, check the whole batch. + if received_receipts + .iter() + .filter(|r| r.checks.contains_key(&ReceiptCheck::CheckUnique)) + .any(|r| r.checks[&ReceiptCheck::CheckUnique].is_none()) + { + let mut signatures: HashSet = HashSet::new(); + + for received_receipt in received_receipts { + let signature = received_receipt.signed_receipt.signature; + if signatures.insert(signature) { + results.push(Ok(())); + } else { + results.push(Err(ReceiptError::NonUniqueReceipt)); + } + } + } + + results + } +} + +impl ReceiptAuditor +where + EA: EscrowAdapter, + RCA: ReceiptChecksAdapter, +{ pub async fn check( &self, receipt_check: &ReceiptCheck, @@ -81,7 +147,12 @@ impl ReceiptAuditor { } } } +} +impl ReceiptAuditor +where + RCA: ReceiptChecksAdapter, +{ async fn check_uniqueness( &self, signed_receipt: &EIP712SignedMessage, @@ -100,33 +171,6 @@ impl ReceiptAuditor { Ok(()) } - async fn check_uniqueness_batch( - &self, - received_receipts: &mut [ReceivedReceipt], - ) -> Vec> { - let mut results = Vec::new(); - - // If at least one of the receipts in the batch hasn't been checked for uniqueness yet, check the whole batch. - if received_receipts - .iter() - .filter(|r| r.checks.contains_key(&ReceiptCheck::CheckUnique)) - .any(|r| r.checks[&ReceiptCheck::CheckUnique].is_none()) - { - let mut signatures: HashSet = HashSet::new(); - - for received_receipt in received_receipts { - let signature = received_receipt.signed_receipt.signature; - if signatures.insert(signature) { - results.push(Ok(())); - } else { - results.push(Err(ReceiptError::NonUniqueReceipt)); - } - } - } - - results - } - async fn check_allocation_id( &self, signed_receipt: &EIP712SignedMessage, @@ -165,39 +209,6 @@ impl ReceiptAuditor { results } - async fn check_timestamp( - &self, - signed_receipt: &EIP712SignedMessage, - ) -> ReceiptResult<()> { - let min_timestamp_ns = *self.min_timestamp_ns.read().await; - if signed_receipt.message.timestamp_ns <= min_timestamp_ns { - return Err(ReceiptError::InvalidTimestamp { - received_timestamp: signed_receipt.message.timestamp_ns, - timestamp_min: min_timestamp_ns, - }); - } - Ok(()) - } - - async fn check_timestamp_batch( - &self, - received_receipts: &mut [ReceivedReceipt], - ) -> Vec> { - let mut results = Vec::new(); - - for received_receipt in received_receipts - .iter_mut() - .filter(|r| r.checks.contains_key(&ReceiptCheck::CheckTimestamp)) - { - if received_receipt.checks[&ReceiptCheck::CheckTimestamp].is_none() { - let signed_receipt = &received_receipt.signed_receipt; - results.push(self.check_timestamp(signed_receipt).await); - } - } - - results - } - async fn check_value( &self, signed_receipt: &EIP712SignedMessage, @@ -286,6 +297,31 @@ impl ReceiptAuditor { results } + pub async fn check_rav_signature( + &self, + signed_rav: &EIP712SignedMessage, + ) -> Result<()> { + let rav_signer_address = signed_rav.recover_signer(&self.domain_separator)?; + if !self + .receipt_checks_adapter + .is_valid_sender_id(rav_signer_address) + .await + .map_err(|err| Error::AdapterError { + source_error: anyhow::Error::new(err), + })? + { + return Err(Error::InvalidRecoveredSigner { + address: rav_signer_address, + }); + } + Ok(()) + } +} + +impl ReceiptAuditor +where + EA: EscrowAdapter, +{ async fn check_and_reserve_escrow( &self, signed_receipt: &EIP712SignedMessage, @@ -322,24 +358,4 @@ impl ReceiptAuditor { results } - - pub async fn check_rav_signature( - &self, - signed_rav: &EIP712SignedMessage, - ) -> Result<()> { - let rav_signer_address = signed_rav.recover_signer(&self.domain_separator)?; - if !self - .receipt_checks_adapter - .is_valid_sender_id(rav_signer_address) - .await - .map_err(|err| Error::AdapterError { - source_error: anyhow::Error::new(err), - })? - { - return Err(Error::InvalidRecoveredSigner { - address: rav_signer_address, - }); - } - Ok(()) - } } diff --git a/tap_integration_tests/tests/indexer_mock/mod.rs b/tap_integration_tests/tests/indexer_mock/mod.rs index dbf88ccc..94a9e186 100644 --- a/tap_integration_tests/tests/indexer_mock/mod.rs +++ b/tap_integration_tests/tests/indexer_mock/mod.rs @@ -21,9 +21,10 @@ use jsonrpsee::{ use tap_aggregator::jsonrpsee_helpers; use tap_core::{ adapters::{ - escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, + escrow_adapter::EscrowAdapter, + rav_storage_adapter::{RAVRead, RAVStore}, receipt_checks_adapter::ReceiptChecksAdapter, - receipt_storage_adapter::ReceiptStorageAdapter, + receipt_storage_adapter::{ReceiptRead, ReceiptStore}, }, tap_manager::{Manager, SignedRAV, SignedReceipt}, tap_receipt::ReceiptCheck, @@ -52,8 +53,8 @@ pub trait Rpc { pub struct RpcManager< EA: EscrowAdapter + Send + Sync + 'static, // An instance of EscrowAdapter, marked as thread-safe with Send and given 'static lifetime RCA: ReceiptChecksAdapter + Send + Sync + 'static, // An instance of ReceiptChecksAdapter - RSA: ReceiptStorageAdapter + Send + Sync + 'static, // An instance of ReceiptStorageAdapter - RAVSA: RAVStorageAdapter + Send + Sync + 'static, // An instance of RAVStorageAdapter + RSA: ReceiptStore + ReceiptRead + Send + Sync + 'static, // An instance of ReceiptStorageAdapter + RAVSA: RAVStore + RAVRead + Send + Sync + 'static, > { manager: Arc>, // Manager object reference counted with an Arc initial_checks: Vec, // Vector of initial checks to be performed on each request @@ -68,8 +69,8 @@ pub struct RpcManager< impl< EA: EscrowAdapter + Send + Sync + 'static, RCA: ReceiptChecksAdapter + Send + Sync + 'static, - RSA: ReceiptStorageAdapter + Send + Sync + 'static, - RAVSA: RAVStorageAdapter + Send + Sync + 'static, + RSA: ReceiptStore + ReceiptRead + Send + Sync + 'static, + RAVSA: RAVStore + RAVRead + Send + Sync + 'static, > RpcManager { pub fn new( @@ -109,8 +110,8 @@ impl< impl< CA: EscrowAdapter + Send + Sync + 'static, RCA: ReceiptChecksAdapter + Send + Sync + 'static, - RSA: ReceiptStorageAdapter + Send + Sync + 'static, - RAVSA: RAVStorageAdapter + Send + Sync + 'static, + RSA: ReceiptStore + ReceiptRead + Send + Sync + 'static, + RAVSA: RAVStore + RAVRead + Send + Sync + 'static, > RpcServer for RpcManager { async fn request( @@ -165,8 +166,8 @@ impl< pub async fn run_server< CA: EscrowAdapter + Send + Sync + 'static, RCA: ReceiptChecksAdapter + Send + Sync + 'static, - RSA: ReceiptStorageAdapter + Send + Sync + 'static, - RAVSA: RAVStorageAdapter + Send + Sync + 'static, + RSA: ReceiptStore + ReceiptRead + Send + Sync + 'static, + RAVSA: RAVStore + RAVRead + Send + Sync + 'static, >( port: u16, // Port on which the server will listen domain_separator: Eip712Domain, // EIP712 domain separator @@ -209,8 +210,8 @@ pub async fn run_server< async fn request_rav< CA: EscrowAdapter + Send + Sync + 'static, RCA: ReceiptChecksAdapter + Send + Sync + 'static, - RSA: ReceiptStorageAdapter + Send + Sync + 'static, - RAVSA: RAVStorageAdapter + Send + Sync + 'static, + RSA: ReceiptStore + ReceiptRead + Send + Sync + 'static, + RAVSA: RAVStore + RAVRead + Send + Sync + 'static, >( manager: &Arc>, time_stamp_buffer: u64, // Buffer for timestamping, see tap_core for details