Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: TFHE operation events for coprocessor #269

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fhevm-engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[workspace]
resolver = "2"
members = ["coprocessor", "executor", "fhevm-engine-common"]
members = ["coprocessor", "executor", "fhevm-engine-common", "listener"]

[workspace.package]
authors = ["Zama"]
Expand Down
22 changes: 22 additions & 0 deletions fhevm-engine/listener/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "listener"
version = "0.0.1"
edition = "2021"

[[bin]]
path = "src/bin/main.rs"
name = "listen"
test = false
bench = false

[dependencies]
# workspace dependencies
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: These are both crates.io and workspace dependencies, maybe could split to separate sections.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes all versions will go to the root Cargo if no conflict and be pinned.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

postponed because no CI test to see if I introduce issues

alloy = { version = "=0.9.2", features = ["contract", "json", "providers", "provider-ws", "pubsub", "rpc-types", "sol-types"] }
alloy-rpc-types = "=0.9.2"
alloy-sol-types = "=0.8.19"
alloy-primitives = "=0.8.19"
alloy-provider = "=0.9.2"
clap = { workspace = true }
futures-util = "=0.3"
serde = { workspace = true }
tokio = { workspace = true }
99 changes: 99 additions & 0 deletions fhevm-engine/listener/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# Listener

The listener primary role is to observe the block chain execution and extend that execution off the chain.

## How

Our contracts actively emits events that forms the trace of a symbolic execution. These events can be observed via the blockchain node pubsub events feature.

## Command-line

WIP

## Events in FHEVM

### Blockchain Events
> Status: in progress
Blockchain events are used export the symbolic execution of TFHE operations from a blockchain node configured to accept pubsub requests.
A listener subscribe to the blockchain node and converts the events to a TFHE workload in a database.

There are 3 types of events related to:
- TFHE operations
- ACL, can be used to preprocess ciphertext for certain use case
- Public and User Decryption

### Database Events
> Status: proposal
Database events are used to hint the scheduler to dispath workload and to notice workload completion.

> https://stackoverflow.com/questions/56747634/how-do-i-use-the-postgres-crate-to-receive-table-modification-events-from-postgr

### Decryption Events
> Status: in progress

### Overview FHEVM
> **_NOTE:_** Listener and scheduler could be in the same service.**
```mermaid
sequenceDiagram
participant BC App Node
participant Listener
participant Scheduler
participant DB
participant Coprocessor

Listener-->>BC App Node: Subscribe Contract Events
Scheduler-->>DB: Subscribe Computations Insertions/Status<br/>(proposal)

loop Block Execution - Symbolic Operations
Note over BC App Node: Solidity traces a Symbolic Sequence
Note over BC App Node: TFHEExecutor contract
Note over BC App Node: ACL contract
end

Note over BC App Node: End of Block Execution (MAYBE)

BC App Node-)Listener: TFHE Operations Events
BC App Node-)Listener: ACL Events

Listener->>DB: Insert TFHE Operations
DB-)Scheduler: Notice TFHE Operations Insertions<br/>(proposal)
Scheduler-)Coprocessor: THFE Operation Workload
BC App Node-)Listener: Decryption Events

loop FHE Computation
Coprocessor -->> DB: Read Operands Ciphertexts
Note over Coprocessor: TFHE Computation
Coprocessor -->> DB: Write Result Ciphertext
Coprocessor-->>DB: Mark TFHE Operation as Done
end
DB-)Scheduler: Notice TFHE Operations Status<br/>(proposal)
```

### Overview Relayer (maybe incorrect to be refined)

```mermaid
sequenceDiagram
participant Relayer
participant Listener
participant Scheduler
participant DB
participant Coprocessor

Note over Listener: THEFE Operations Events
Note over Listener: Decryption Events

Listener->>DB: Insert TFHE Operations
Listener->>Relayer: Decryption Workload
DB-)Scheduler: Notice TFHE Operations Insertions<br/>(proposal)
Scheduler-)Coprocessor: THEFE Operation Workload

loop FHE Computation
Coprocessor -->> DB: Read Operands Ciphertexts
Note over Coprocessor: TFHE Computation
Coprocessor -->> DB: Write Result Ciphertexts
Coprocessor-->>DB: TFHE Operation Done
end
DB-)Scheduler: Notice TFHE Operations Status<br/>(proposal)
Scheduler-)Relayer: Notice Ciphertext ready for decryption
```

215 changes: 215 additions & 0 deletions fhevm-engine/listener/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
use futures_util::stream::StreamExt;
use std::str::FromStr;
use std::time::Duration;

use alloy::primitives::Address;
use alloy::providers::{Provider, ProviderBuilder, RootProvider, WsConnect};
use alloy::pubsub::{PubSubFrontend, SubscriptionStream};
use alloy::rpc::types::{BlockNumberOrTag, Filter};
use alloy_rpc_types::Log;
use alloy_sol_types::SolEventInterface;

use clap::Parser;

use listener::contracts::{AclContract, TfheContract};

const DEFAULT_BLOCK: BlockNumberOrTag = BlockNumberOrTag::Latest;
const DEFAULT_CATCHUP: u64 = 5;

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Args {
#[arg(long, default_value = "ws://0.0.0.0:8746")]
pub url: String,

#[arg(long, default_value = "false")]
pub ignore_tfhe_events: bool,

#[arg(long, default_value = "false")]
pub ignore_acl_events: bool,

#[arg(long, default_value = None)]
pub acl_contract_address: Option<String>,

#[arg(long, default_value = None)]
pub tfhe_contract_address: Option<String>,

#[arg(long, default_value = None)]
pub database_url: Option<String>,

#[arg(long, default_value = None, help = "Can be negative from last block", allow_hyphen_values = true)]
pub start_at_block: Option<i64>,

#[arg(long, default_value = None)]
pub end_at_block: Option<u64>,
}

// TODO: to merge with Levent works
struct InfiniteLogIter {
url: String,
contract_addresses: Vec<Address>,
stream: Option<SubscriptionStream<Log>>,
provider: Option<RootProvider<PubSubFrontend>>, // required to maintain the stream
last_seen_block: Option<u64>,
start_at_block: Option<i64>,
end_at_block: Option<u64>,
}

impl InfiniteLogIter {
fn new(args: &Args) -> Self {
let mut contract_addresses = vec![];
if let Some(acl_contract_address) = &args.acl_contract_address {
contract_addresses.push(Address::from_str(acl_contract_address).unwrap());
};
if let Some(tfhe_contract_address) = &args.tfhe_contract_address {
contract_addresses.push(Address::from_str(tfhe_contract_address).unwrap());
};
Self {
url: args.url.clone(),
contract_addresses: contract_addresses,
stream: None,
provider: None,
last_seen_block: None,
start_at_block: args.start_at_block,
end_at_block: args.end_at_block,
}
}

async fn catchup_block_from(
&self,
provider: &RootProvider<PubSubFrontend>,
) -> BlockNumberOrTag {
if let Some(last_seen_block) = self.last_seen_block {
return BlockNumberOrTag::Number(last_seen_block - 1);
}
if let Some(start_at_block) = self.start_at_block {
if start_at_block >= 0 {
return BlockNumberOrTag::Number(start_at_block.try_into().unwrap());
}
}
let Ok(last_block) = provider.get_block_number().await else {
return BlockNumberOrTag::Earliest; // should not happend
};
let catch_size = if let Some(start_at_block) = self.start_at_block {
(-start_at_block).try_into().unwrap()
} else {
DEFAULT_CATCHUP
};
return BlockNumberOrTag::Number(last_block - catch_size.min(last_block));
}

async fn new_log_stream(&mut self, not_initialized: bool) {
let mut retry = 20;
loop {
let ws = WsConnect::new(&self.url);
match ProviderBuilder::new().on_ws(ws).await {
Ok(provider) => {
let catch_up_from = self.catchup_block_from(&provider).await;
if not_initialized {
eprintln!("Catchup from {:?}", catch_up_from);
}
let mut filter = Filter::new().from_block(catch_up_from);
if let Some(end_at_block) = self.end_at_block {
filter = filter.to_block(BlockNumberOrTag::Number(end_at_block));
// inclusive
}
if !self.contract_addresses.is_empty() {
filter = filter.address(self.contract_addresses.clone())
}
eprintln!("Listening on {}", &self.url);
eprintln!("Contracts {:?}", &self.contract_addresses);
self.stream = Some(
provider
.subscribe_logs(&filter)
.await
.expect("BLA2")
.into_stream(),
);
self.provider = Some(provider);
return;
}
Err(err) => {
let delay = if not_initialized {
if retry == 0 {
panic!("Cannot connect to {} due to {err}.", &self.url)
}
5
} else {
1
};
if not_initialized {
eprintln!("Cannot connect to {} due to {err}. Will retry in {delay} secs, {retry} times.", &self.url);
} else {
eprintln!("Cannot connect to {} due to {err}. Will retry in {delay} secs, indefinitively.", &self.url);
}
retry -= 1;
tokio::time::sleep(Duration::from_secs(delay)).await;
}
}
}
}

async fn next(&mut self) -> Option<Log> {
let mut not_initialized = true;
loop {
let Some(stream) = &mut self.stream else {
self.new_log_stream(not_initialized).await;
not_initialized = false;
continue;
};
let Some(log) = stream.next().await else {
// the stream ends, could be a restart of the full node, or just a temporary gap
self.stream = None;
if let (Some(end_at_block), Some(last_seen_block)) =
(self.end_at_block, self.last_seen_block)
{
if end_at_block == last_seen_block {
eprintln!("Nothing to read, reached end of block range");
return None;
}
}
eprintln!("Nothing to read, retrying");
tokio::time::sleep(Duration::from_secs(1)).await;
continue;
};
return Some(log);
}
}
}

#[tokio::main]
async fn main() -> () {
let args = Args::parse();
let mut log_iter = InfiniteLogIter::new(&args);
if let Some(acl_contract_address) = &args.acl_contract_address {
if let Err(err) = Address::from_str(acl_contract_address) {
panic!("Invalid acl contract address: {err}");
};
};
if let Some(tfhe_contract_address) = &args.tfhe_contract_address {
if let Err(err) = Address::from_str(tfhe_contract_address) {
panic!("Invalid tfhe contract address: {err}");
};
}

log_iter.new_log_stream(true).await;
while let Some(log) = log_iter.next().await {
if let Some(block_number) = log.block_number {
eprintln!("Event at block: {}", { block_number });
log_iter.last_seen_block = Some(block_number);
}
if !args.ignore_tfhe_events {
if let Ok(event) = TfheContract::TfheContractEvents::decode_log(&log.inner, true) {
// TODO: filter on contract address if known
println!("TFHE {event:#?}");
continue;
}
}
if !args.ignore_tfhe_events {
if let Ok(event) = AclContract::AclContractEvents::decode_log(&log.inner, true) {
println!("ACL {event:#?}");
continue;
}
}
}
}
Loading