Skip to content
This repository has been archived by the owner on Jul 20, 2023. It is now read-only.

Commit

Permalink
refactor: improve compare attestations logs
Browse files Browse the repository at this point in the history
  • Loading branch information
pete-eiger authored and hopeyen committed Mar 23, 2023
1 parent a687e51 commit e7d3bf0
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 160 deletions.
4 changes: 1 addition & 3 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,4 @@ PRIVATE_KEY=
GRAPH_NODE_STATUS_ENDPOINT=
REGISTRY_SUBGRAPH=
NETWORK_SUBGRAPH=
WAKU_HOST=
WAKU_PORT=
BOOT_NODE_ADDRESSES
GRAPHCAST_NETWORK=
247 changes: 102 additions & 145 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::anyhow;
use colored::*;
use ethers_contract::EthAbiType;
use ethers_core::types::transaction::eip712::Eip712;
use ethers_derive_eip712::*;
Expand All @@ -10,10 +9,11 @@ use prost::Message;
use serde::{Deserialize, Serialize};
use std::{
collections::HashMap,
fmt::Display,
sync::{Arc, Mutex as SyncMutex},
};
use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};

use graphcast_sdk::{
graphcast_agent::{
Expand Down Expand Up @@ -102,15 +102,14 @@ pub async fn active_allocation_hashes(
indexer_address: Option<String>,
) -> Vec<String> {
if let Some(addr) = indexer_address {
let allocs = query_network_subgraph(network_subgraph.to_string(), addr)
query_network_subgraph(network_subgraph.to_string(), addr)
.await
.map_err(|e| -> Vec<String> {
error!("Topic generation error: {}", e);
[].to_vec()
})
.unwrap()
.indexer_allocations();
allocs
.indexer_allocations()
} else {
[].to_vec()
}
Expand Down Expand Up @@ -207,9 +206,7 @@ impl Attestation {
if base.senders.contains(&address) {
Err(anyhow!(
"{}",
"There is already an attestation from this address. Skipping..."
.to_string()
.yellow()
"There is already an attestation from this address. Skipping...".to_string()
))
} else {
let senders = [base.senders.clone(), vec![address]].concat();
Expand Down Expand Up @@ -251,13 +248,28 @@ pub fn save_local_attestation(
/// in the handler.
pub fn attestation_handler(
) -> impl Fn(Result<GraphcastMessage<RadioPayloadMessage>, WakuHandlingError>) {
|msg: Result<GraphcastMessage<RadioPayloadMessage>, WakuHandlingError>| match msg {
Ok(msg) => {
|msg: Result<GraphcastMessage<RadioPayloadMessage>, WakuHandlingError>| {
// TODO: Handle the error case by incrementing a Prometheus "error" counter
if let Ok(msg) = msg {
debug!("Received message: {:?}", msg);
MESSAGES.get().unwrap().lock().unwrap().push(msg);
}
Err(err) => {
error!("{}", err);
}
}

#[derive(Debug, PartialEq)]
pub enum ComparisonResult {
NotFound(String),
Divergent(String),
Match(String),
}

impl Display for ComparisonResult {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
ComparisonResult::NotFound(s) => write!(f, "NotFound: {}", s),
ComparisonResult::Divergent(s) => write!(f, "Divergent: {}", s),
ComparisonResult::Match(s) => write!(f, "Matched: {}", s),
}
}
}
Expand All @@ -271,71 +283,76 @@ pub async fn compare_attestations(
attestation_block: u64,
remote: RemoteAttestationsMap,
local: Arc<AsyncMutex<LocalAttestationsMap>>,
) -> Result<String, anyhow::Error> {
) -> Result<ComparisonResult, anyhow::Error> {
let local = local.lock().await;
let (ipfs_hash, blocks) = match local.iter().next() {
Some(pair) => pair,
None => {
return Ok(ComparisonResult::NotFound(String::from(
"No local attestation found",
)))
}
};
let local_attestation = match blocks.get(&attestation_block) {
Some(attestations) => attestations,
None => {
return Ok(ComparisonResult::NotFound(format!(
"No local attestation found for block {}",
attestation_block
)))
}
};

// Iterate & compare
if let Some((ipfs_hash, blocks)) = local.iter().next() {
let attestations = blocks.get(&attestation_block);
match attestations {
Some(local_attestation) => {
let remote_blocks = remote.get(ipfs_hash);
match remote_blocks {
Some(remote_blocks) => {
let remote_attestations = remote_blocks.get(&attestation_block);

match remote_attestations {
Some(remote_attestations) => {
let mut remote_attestations = remote_attestations.clone();

remote_attestations
.sort_by(|a, b| a.stake_weight.partial_cmp(&b.stake_weight).unwrap());

info!("Number of nPOI submitted for block {attestation_block}: {:#?}", remote_attestations.len());
if remote_attestations.len() > 1 {
debug!("Sorted attestations: {:#?}", remote_attestations);
}

let most_attested_npoi = &remote_attestations.last().unwrap().npoi;
if most_attested_npoi == &local_attestation.npoi {
return Ok(format!(
"POIs match for subgraph {ipfs_hash} on block {attestation_block}!: {most_attested_npoi}"
));
} else {
return Err(anyhow!(format!(
"POIs don't match for subgraph {ipfs_hash} on block {attestation_block}!"
)
));
}
},
None => {
return Err(anyhow!(format!(
"No record for subgraph {ipfs_hash} on block {attestation_block} found in remote attestations"
)
));
}
}
}
None => {
return Err(anyhow!(format!("No attestations for subgraph {ipfs_hash} on block {attestation_block} found in remote attestations store. Continuing...", )))
}
}
}
None => {
return Err(anyhow!(format!("No attestation for subgraph {ipfs_hash} on block {attestation_block} found in local attestations store. Continuing...", )))
}
let remote_blocks = match remote.get(ipfs_hash) {
Some(blocks) => blocks,
None => {
return Ok(ComparisonResult::NotFound(format!(
"No remote attestation found for subgraph {}",
ipfs_hash
)))
}
};
let remote_attestations = match remote_blocks.get(&attestation_block) {
Some(attestations) => attestations,
None => {
return Ok(ComparisonResult::NotFound(format!(
"No remote attestation found for subgraph {} on block {}",
ipfs_hash, attestation_block
)))
}
};

let mut remote_attestations = remote_attestations.clone();
remote_attestations.sort_by(|a, b| a.stake_weight.partial_cmp(&b.stake_weight).unwrap());

info!(
"Number of nPOI submitted for block {}: {:#?}",
attestation_block,
remote_attestations.len()
);
if remote_attestations.len() > 1 {
warn!(
"More than 1 nPOI found for subgraph {} on block {}. Attestations (sorted): {:#?}",
ipfs_hash, attestation_block, remote_attestations
);
}

Err(anyhow!(format!(
"The comparison did not execute successfully for on block {attestation_block}. Continuing...",
)
))
let most_attested_npoi = &remote_attestations.last().unwrap().npoi;
if most_attested_npoi == &local_attestation.npoi {
Ok(ComparisonResult::Match(format!(
"POIs match for subgraph {} on block {}!: {}",
ipfs_hash, attestation_block, most_attested_npoi
)))
} else {
Ok(ComparisonResult::Divergent(format!(
"POIs don't match for subgraph {} on block {}!",
ipfs_hash, attestation_block
)))
}
}

#[cfg(test)]
mod tests {
use dotenv::dotenv;
use graphcast_sdk::NetworkName;
use num_traits::One;

Expand Down Expand Up @@ -402,70 +419,6 @@ mod tests {
);
}

#[tokio::test]
async fn test_process_messages() {
dotenv().ok();

const REGISTRY_SUBGRAPH: &str =
"https://api.thegraph.com/subgraphs/name/hopeyen/graphcast-registry-goerli";
const NETWORK_SUBGRAPH: &str = "https://gateway.testnet.thegraph.com/network";

let hash: String = "QmaCRFCJX3f1LACgqZFecDphpxrqMyJw1r2DCBHXmQRYY8".to_string();
let content: String =
"0xa6008cea5905b8b7811a68132feea7959b623188e2d6ee3c87ead7ae56dd0eae".to_string();
let nonce: i64 = 1675908856;
let block_number: u64 = 8459496;
let block_hash: String =
"0x2f3ac7506db33d57a58bf3bcd9b2f6a8b04d8566e50f3a3656eb07e763640882".to_string();
let sig: String = "907f863a74da1c5e42e2dab66eeb3f617ff3d8ace160ef48b298f28bdc6b7140156be33709c8a5ceec8346e9e02601359ad2d45a6e38bce75a7af8d5f7b170881b".to_string();
let radio_msg = RadioPayloadMessage::new(hash.clone(), content.clone());
let msg1 = GraphcastMessage::new(
hash.clone(),
Some(radio_msg),
nonce,
NETWORK,
block_number,
block_hash.clone(),
sig,
)
.expect("Shouldn't get here since the message is purposefully constructed for testing");

let parsed = process_messages(
Arc::new(AsyncMutex::new(vec![msg1.clone()])),
REGISTRY_SUBGRAPH,
NETWORK_SUBGRAPH,
)
.await;
assert!(parsed.is_ok());

let content: String =
"0xa6008cea5905b8b7811a68132feea7959b623188e2d6ee3c87ead7ae56dd0eae".to_string();
let nonce: i64 = 1675908903;
let block_number: u64 = 8459499;
let block_hash: String =
"0xf48f240aa359a5750f5b47e748718b70bb010d234e17ee935d65fd3f1503d3ae".to_string();
let radio_msg = RadioPayloadMessage::new(hash.clone(), content.clone());
let sig: String = "907f863a74da1c5e42e2dab66eeb3f617ff3d8ace160ef48b298f28bdc6b7140156be33709c8a5ceec8346e9e02601359ad2d45a6e38bce75a7af8d5f7b170881b".to_string();
let msg2 = GraphcastMessage::new(
hash,
Some(radio_msg),
nonce,
NETWORK,
block_number,
block_hash.clone(),
sig,
)
.expect("Shouldn't get here since the message is purposefully constructed for testing");

let parsed = process_messages(
Arc::new(AsyncMutex::new(vec![msg1, msg2])),
REGISTRY_SUBGRAPH,
NETWORK_SUBGRAPH,
)
.await;
assert!(parsed.is_ok());
}

#[test]
fn test_delete_messages() {
_ = MESSAGES.set(Arc::new(SyncMutex::new(Vec::new())));
Expand Down Expand Up @@ -558,9 +511,7 @@ mod tests {
assert!(updated_attestation.is_err());
assert_eq!(
updated_attestation.unwrap_err().to_string(),
"There is already an attestation from this address. Skipping..."
.yellow()
.to_string()
"There is already an attestation from this address. Skipping...".to_string()
);
}

Expand All @@ -573,12 +524,10 @@ mod tests {
)
.await;

assert!(res.is_err());
assert!(res.is_ok());
assert_eq!(
res.unwrap_err().to_string(),
"The comparison did not execute successfully for on block 42. Continuing..."
.yellow()
.to_string()
res.unwrap().to_string(),
"NotFound: No local attestation found".to_string()
);
}

Expand Down Expand Up @@ -615,8 +564,11 @@ mod tests {
)
.await;

assert!(res.is_err());
assert_eq!(res.unwrap_err().to_string(),"No attestations for subgraph different-awesome-hash on block 42 found in remote attestations store. Continuing...".yellow().to_string());
assert!(res.is_ok());
assert_eq!(
res.unwrap().to_string(),
"NotFound: No remote attestation found for subgraph different-awesome-hash".to_string()
);
}

#[tokio::test]
Expand All @@ -638,8 +590,11 @@ mod tests {
)
.await;

assert!(res.is_err());
assert_eq!(res.unwrap_err().to_string(),"No attestation for subgraph my-awesome-hash on block 42 found in local attestations store. Continuing...".yellow().to_string());
assert!(res.is_ok());
assert_eq!(
res.unwrap().to_string(),
"NotFound: No local attestation found for block 42".to_string()
);
}

#[tokio::test]
Expand Down Expand Up @@ -678,7 +633,9 @@ mod tests {
assert!(res.is_ok());
assert_eq!(
res.unwrap(),
"POIs match for subgraph my-awesome-hash on block 42!: awesome-npoi".to_string()
ComparisonResult::Match(
"POIs match for subgraph my-awesome-hash on block 42!: awesome-npoi".to_string()
)
);
}
}
Loading

0 comments on commit e7d3bf0

Please sign in to comment.