diff --git a/.env.example b/.env.example index 565de54..4fda0d9 100644 --- a/.env.example +++ b/.env.example @@ -3,6 +3,4 @@ PRIVATE_KEY= GRAPH_NODE_STATUS_ENDPOINT= REGISTRY_SUBGRAPH= NETWORK_SUBGRAPH= -WAKU_HOST= -WAKU_PORT= -BOOT_NODE_ADDRESSES +GRAPHCAST_NETWORK= diff --git a/src/lib.rs b/src/lib.rs index 9afd362..cc44fad 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,4 @@ use anyhow::anyhow; -use colored::*; use ethers_contract::EthAbiType; use ethers_core::types::transaction::eip712::Eip712; use ethers_derive_eip712::*; @@ -10,10 +9,11 @@ use prost::Message; use serde::{Deserialize, Serialize}; use std::{ collections::HashMap, + fmt::Display, sync::{Arc, Mutex as SyncMutex}, }; use tokio::sync::Mutex as AsyncMutex; -use tracing::{debug, error, info}; +use tracing::{debug, error, info, warn}; use graphcast_sdk::{ graphcast_agent::{ @@ -102,15 +102,14 @@ pub async fn active_allocation_hashes( indexer_address: Option, ) -> Vec { if let Some(addr) = indexer_address { - let allocs = query_network_subgraph(network_subgraph.to_string(), addr) + query_network_subgraph(network_subgraph.to_string(), addr) .await .map_err(|e| -> Vec { error!("Topic generation error: {}", e); [].to_vec() }) .unwrap() - .indexer_allocations(); - allocs + .indexer_allocations() } else { [].to_vec() } @@ -207,9 +206,7 @@ impl Attestation { if base.senders.contains(&address) { Err(anyhow!( "{}", - "There is already an attestation from this address. Skipping..." - .to_string() - .yellow() + "There is already an attestation from this address. Skipping...".to_string() )) } else { let senders = [base.senders.clone(), vec![address]].concat(); @@ -251,13 +248,28 @@ pub fn save_local_attestation( /// in the handler. pub fn attestation_handler( ) -> impl Fn(Result, WakuHandlingError>) { - |msg: Result, WakuHandlingError>| match msg { - Ok(msg) => { + |msg: Result, WakuHandlingError>| { + // TODO: Handle the error case by incrementing a Prometheus "error" counter + if let Ok(msg) = msg { debug!("Received message: {:?}", msg); MESSAGES.get().unwrap().lock().unwrap().push(msg); } - Err(err) => { - error!("{}", err); + } +} + +#[derive(Debug, PartialEq)] +pub enum ComparisonResult { + NotFound(String), + Divergent(String), + Match(String), +} + +impl Display for ComparisonResult { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ComparisonResult::NotFound(s) => write!(f, "NotFound: {}", s), + ComparisonResult::Divergent(s) => write!(f, "Divergent: {}", s), + ComparisonResult::Match(s) => write!(f, "Matched: {}", s), } } } @@ -271,71 +283,76 @@ pub async fn compare_attestations( attestation_block: u64, remote: RemoteAttestationsMap, local: Arc>, -) -> Result { +) -> Result { let local = local.lock().await; + let (ipfs_hash, blocks) = match local.iter().next() { + Some(pair) => pair, + None => { + return Ok(ComparisonResult::NotFound(String::from( + "No local attestation found", + ))) + } + }; + let local_attestation = match blocks.get(&attestation_block) { + Some(attestations) => attestations, + None => { + return Ok(ComparisonResult::NotFound(format!( + "No local attestation found for block {}", + attestation_block + ))) + } + }; - // Iterate & compare - if let Some((ipfs_hash, blocks)) = local.iter().next() { - let attestations = blocks.get(&attestation_block); - match attestations { - Some(local_attestation) => { - let remote_blocks = remote.get(ipfs_hash); - match remote_blocks { - Some(remote_blocks) => { - let remote_attestations = remote_blocks.get(&attestation_block); - - match remote_attestations { - Some(remote_attestations) => { - let mut remote_attestations = remote_attestations.clone(); - - remote_attestations - .sort_by(|a, b| a.stake_weight.partial_cmp(&b.stake_weight).unwrap()); - - info!("Number of nPOI submitted for block {attestation_block}: {:#?}", remote_attestations.len()); - if remote_attestations.len() > 1 { - debug!("Sorted attestations: {:#?}", remote_attestations); - } - - let most_attested_npoi = &remote_attestations.last().unwrap().npoi; - if most_attested_npoi == &local_attestation.npoi { - return Ok(format!( - "POIs match for subgraph {ipfs_hash} on block {attestation_block}!: {most_attested_npoi}" - )); - } else { - return Err(anyhow!(format!( - "POIs don't match for subgraph {ipfs_hash} on block {attestation_block}!" - ) - )); - } - }, - None => { - return Err(anyhow!(format!( - "No record for subgraph {ipfs_hash} on block {attestation_block} found in remote attestations" - ) - )); - } - } - } - None => { - return Err(anyhow!(format!("No attestations for subgraph {ipfs_hash} on block {attestation_block} found in remote attestations store. Continuing...", ))) - } - } - } - None => { - return Err(anyhow!(format!("No attestation for subgraph {ipfs_hash} on block {attestation_block} found in local attestations store. Continuing...", ))) - } + let remote_blocks = match remote.get(ipfs_hash) { + Some(blocks) => blocks, + None => { + return Ok(ComparisonResult::NotFound(format!( + "No remote attestation found for subgraph {}", + ipfs_hash + ))) + } + }; + let remote_attestations = match remote_blocks.get(&attestation_block) { + Some(attestations) => attestations, + None => { + return Ok(ComparisonResult::NotFound(format!( + "No remote attestation found for subgraph {} on block {}", + ipfs_hash, attestation_block + ))) } + }; + + let mut remote_attestations = remote_attestations.clone(); + remote_attestations.sort_by(|a, b| a.stake_weight.partial_cmp(&b.stake_weight).unwrap()); + + info!( + "Number of nPOI submitted for block {}: {:#?}", + attestation_block, + remote_attestations.len() + ); + if remote_attestations.len() > 1 { + warn!( + "More than 1 nPOI found for subgraph {} on block {}. Attestations (sorted): {:#?}", + ipfs_hash, attestation_block, remote_attestations + ); } - Err(anyhow!(format!( - "The comparison did not execute successfully for on block {attestation_block}. Continuing...", - ) - )) + let most_attested_npoi = &remote_attestations.last().unwrap().npoi; + if most_attested_npoi == &local_attestation.npoi { + Ok(ComparisonResult::Match(format!( + "POIs match for subgraph {} on block {}!: {}", + ipfs_hash, attestation_block, most_attested_npoi + ))) + } else { + Ok(ComparisonResult::Divergent(format!( + "POIs don't match for subgraph {} on block {}!", + ipfs_hash, attestation_block + ))) + } } #[cfg(test)] mod tests { - use dotenv::dotenv; use graphcast_sdk::NetworkName; use num_traits::One; @@ -402,70 +419,6 @@ mod tests { ); } - #[tokio::test] - async fn test_process_messages() { - dotenv().ok(); - - const REGISTRY_SUBGRAPH: &str = - "https://api.thegraph.com/subgraphs/name/hopeyen/graphcast-registry-goerli"; - const NETWORK_SUBGRAPH: &str = "https://gateway.testnet.thegraph.com/network"; - - let hash: String = "QmaCRFCJX3f1LACgqZFecDphpxrqMyJw1r2DCBHXmQRYY8".to_string(); - let content: String = - "0xa6008cea5905b8b7811a68132feea7959b623188e2d6ee3c87ead7ae56dd0eae".to_string(); - let nonce: i64 = 1675908856; - let block_number: u64 = 8459496; - let block_hash: String = - "0x2f3ac7506db33d57a58bf3bcd9b2f6a8b04d8566e50f3a3656eb07e763640882".to_string(); - let sig: String = "907f863a74da1c5e42e2dab66eeb3f617ff3d8ace160ef48b298f28bdc6b7140156be33709c8a5ceec8346e9e02601359ad2d45a6e38bce75a7af8d5f7b170881b".to_string(); - let radio_msg = RadioPayloadMessage::new(hash.clone(), content.clone()); - let msg1 = GraphcastMessage::new( - hash.clone(), - Some(radio_msg), - nonce, - NETWORK, - block_number, - block_hash.clone(), - sig, - ) - .expect("Shouldn't get here since the message is purposefully constructed for testing"); - - let parsed = process_messages( - Arc::new(AsyncMutex::new(vec![msg1.clone()])), - REGISTRY_SUBGRAPH, - NETWORK_SUBGRAPH, - ) - .await; - assert!(parsed.is_ok()); - - let content: String = - "0xa6008cea5905b8b7811a68132feea7959b623188e2d6ee3c87ead7ae56dd0eae".to_string(); - let nonce: i64 = 1675908903; - let block_number: u64 = 8459499; - let block_hash: String = - "0xf48f240aa359a5750f5b47e748718b70bb010d234e17ee935d65fd3f1503d3ae".to_string(); - let radio_msg = RadioPayloadMessage::new(hash.clone(), content.clone()); - let sig: String = "907f863a74da1c5e42e2dab66eeb3f617ff3d8ace160ef48b298f28bdc6b7140156be33709c8a5ceec8346e9e02601359ad2d45a6e38bce75a7af8d5f7b170881b".to_string(); - let msg2 = GraphcastMessage::new( - hash, - Some(radio_msg), - nonce, - NETWORK, - block_number, - block_hash.clone(), - sig, - ) - .expect("Shouldn't get here since the message is purposefully constructed for testing"); - - let parsed = process_messages( - Arc::new(AsyncMutex::new(vec![msg1, msg2])), - REGISTRY_SUBGRAPH, - NETWORK_SUBGRAPH, - ) - .await; - assert!(parsed.is_ok()); - } - #[test] fn test_delete_messages() { _ = MESSAGES.set(Arc::new(SyncMutex::new(Vec::new()))); @@ -558,9 +511,7 @@ mod tests { assert!(updated_attestation.is_err()); assert_eq!( updated_attestation.unwrap_err().to_string(), - "There is already an attestation from this address. Skipping..." - .yellow() - .to_string() + "There is already an attestation from this address. Skipping...".to_string() ); } @@ -573,12 +524,10 @@ mod tests { ) .await; - assert!(res.is_err()); + assert!(res.is_ok()); assert_eq!( - res.unwrap_err().to_string(), - "The comparison did not execute successfully for on block 42. Continuing..." - .yellow() - .to_string() + res.unwrap().to_string(), + "NotFound: No local attestation found".to_string() ); } @@ -615,8 +564,11 @@ mod tests { ) .await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().to_string(),"No attestations for subgraph different-awesome-hash on block 42 found in remote attestations store. Continuing...".yellow().to_string()); + assert!(res.is_ok()); + assert_eq!( + res.unwrap().to_string(), + "NotFound: No remote attestation found for subgraph different-awesome-hash".to_string() + ); } #[tokio::test] @@ -638,8 +590,11 @@ mod tests { ) .await; - assert!(res.is_err()); - assert_eq!(res.unwrap_err().to_string(),"No attestation for subgraph my-awesome-hash on block 42 found in local attestations store. Continuing...".yellow().to_string()); + assert!(res.is_ok()); + assert_eq!( + res.unwrap().to_string(), + "NotFound: No local attestation found for block 42".to_string() + ); } #[tokio::test] @@ -678,7 +633,9 @@ mod tests { assert!(res.is_ok()); assert_eq!( res.unwrap(), - "POIs match for subgraph my-awesome-hash on block 42!: awesome-npoi".to_string() + ComparisonResult::Match( + "POIs match for subgraph my-awesome-hash on block 42!: awesome-npoi".to_string() + ) ); } } diff --git a/src/main.rs b/src/main.rs index 6104944..8242e10 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,8 +14,8 @@ use num_bigint::BigUint; use num_traits::Zero; use poi_radio::{ active_allocation_hashes, attestation_handler, compare_attestations, comparison_trigger, - process_messages, save_local_attestation, Attestation, BlockClock, LocalAttestationsMap, - RadioPayloadMessage, GRAPHCAST_AGENT, MESSAGES, + process_messages, save_local_attestation, Attestation, BlockClock, ComparisonResult, + LocalAttestationsMap, RadioPayloadMessage, GRAPHCAST_AGENT, MESSAGES, }; use std::collections::HashMap; use std::env; @@ -212,7 +212,7 @@ async fn main() { ) .await; - debug!( + info!( "{} {} {} {} {} {} {} {}", "🔗 Message block: ".cyan(), message_block, @@ -228,7 +228,7 @@ async fn main() { block_clock.current_block = latest_block.number; if Utc::now().timestamp() >= comparison_trigger { - debug!("{}", "Comparing attestations"); + info!("{}", "Comparing attestations"); trace!("{}{:?}", "Messages: ", MESSAGES); let msgs = MESSAGES.get().unwrap().lock().unwrap().to_vec(); @@ -240,15 +240,16 @@ async fn main() { .await; match remote_attestations { Ok(remote_attestations) => { - match compare_attestations( + let comparison_result = compare_attestations( compare_block, - remote_attestations, + remote_attestations.clone(), Arc::clone(&local_attestations), ) - .await - { - Ok(msg) => { - debug!("{}", msg.green().bold()); + .await; + + match comparison_result { + Ok(ComparisonResult::Match(msg)) => { + info!("{}", msg.green().bold()); // Only clear the ones matching identifier and block number MESSAGES.get().unwrap().lock().unwrap().retain(|msg| { msg.block_number != compare_block @@ -256,14 +257,25 @@ async fn main() { }); debug!("Messages left: {:#?}", MESSAGES); } - Err(err) => { - error!("{}", err); + Ok(ComparisonResult::NotFound(m)) => { + warn!("{}", m); + MESSAGES.get().unwrap().lock().unwrap().retain(|msg| { + msg.block_number != compare_block + || msg.identifier != id.clone() + }); + debug!("Messages left: {:#?}", MESSAGES); + } + Ok(ComparisonResult::Divergent(m)) => { + error!("{}", m); MESSAGES.get().unwrap().lock().unwrap().retain(|msg| { msg.block_number != compare_block || msg.identifier != id.clone() }); debug!("Messages left: {:#?}", MESSAGES); } + Err(e) => { + error!("An error occured while comparing attestations: {}", e); + } } } Err(err) => {