Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add configuration to event filtering #5185

Merged
merged 15 commits into from
Jan 30, 2025
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@

- [#5167](https://github.com/ChainSafe/forest/pull/5167) Allow overriding drand configs with environment variables.

- [#4851](https://github.com/ChainSafe/forest/issues/4851) Add support for `FOREST_MAX_FILTER_RESULTS` in `Filecoin.EthGetLogs` RPC method.
Add an `[events]` section to Forest configuration file.

### Changed

### Removed
Expand Down
4 changes: 2 additions & 2 deletions docs/docs/users/reference/env_variables.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ process.
| `FOREST_BLOCK_DELAY_SECS` | positive integer | Depends on the network | 30 | Duration of each tipset epoch |
| `FOREST_PROPAGATION_DELAY_SECS` | positive integer | Depends on the network | 20 | How long to wait for a block to propagate through the network |
| `FOREST_MAX_FILTERS` | integer | 100 | 100 | The maximum number of filters |
| `FOREST_MAX_FILTER_RESULTS` | integer | 10,000 | 10000 | The maximum number of filter results |
| `FOREST_MAX_FILTER_HEIGHT_RANGE` | integer | 2880 | 2880 | The maximum filter height range allowed, a conservative limit of one day |
| `FOREST_MAX_FILTER_RESULTS` | positive integer | 10,000 | 10000 | The maximum number of filter results |
| `FOREST_MAX_FILTER_HEIGHT_RANGE` | positive integer | 2880 | 2880 | The maximum filter height range allowed, a conservative limit of one day |
| `FOREST_STATE_MIGRATION_THREADS` | integer | Depends on the machine. | 3 | The number of threads for state migration thread-pool. Advanced users only. |
| `FOREST_CONFIG_PATH` | string | /$FOREST_HOME/com.ChainSafe.Forest/config.toml | `/patj/to/config.toml` | Forest configuration path. Alternatively supplied via `--config` cli parameter. |
| `RUST_LOG` | string | empty | `debug,forest_libp2p::service=info` | Allows for log level customization. |
Expand Down
2 changes: 0 additions & 2 deletions scripts/tests/api_compare/filter-list
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
!Filecoin.MpoolGetNonce
# CustomCheckFailed in Forest: https://github.com/ChainSafe/forest/actions/runs/9593268587/job/26453560366
!Filecoin.StateReplay
# TODO(elmattic): https://github.com/ChainSafe/forest/issues/4851
!Filecoin.EthGetLogs
# TODO: https://github.com/ChainSafe/forest/issues/4996
!Filecoin.EthGetTransactionReceipt
!Filecoin.EthGetTransactionReceiptLimited
2 changes: 0 additions & 2 deletions scripts/tests/api_compare/filter-list-offline
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@
!Filecoin.EthGetBlockByNumber
!eth_getBlockByNumber
!Filecoin.ChainSetHead
# TODO(elmattic): https://github.com/ChainSafe/forest/issues/4759
!Filecoin.EthGetLogs
# TODO: https://github.com/ChainSafe/forest/issues/4996
!Filecoin.EthGetTransactionReceipt
!Filecoin.EthGetTransactionReceiptLimited
20 changes: 20 additions & 0 deletions src/cli_shared/cli/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

use crate::db::db_engine::DbConfig;
use crate::libp2p::Libp2pConfig;
use crate::shim::clock::ChainEpoch;
use crate::{chain_sync::SyncConfig, networks::NetworkChain};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
Expand Down Expand Up @@ -36,6 +37,24 @@ impl Default for DaemonConfig {
}
}

/// Structure that defines events configuration
#[derive(Deserialize, Serialize, PartialEq, Eq, Debug, Clone)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
pub struct EventsConfig {
#[cfg_attr(test, arbitrary(gen(|g| u32::arbitrary(g) as _)))]
pub max_filter_results: usize,
pub max_filter_height_range: ChainEpoch,
}

impl Default for EventsConfig {
fn default() -> Self {
Self {
max_filter_results: 10000,
max_filter_height_range: 2880,
}
}
}

#[derive(Serialize, Deserialize, PartialEq, Default, Debug, Clone)]
#[cfg_attr(test, derive(derive_quickcheck_arbitrary::Arbitrary))]
#[serde(default)]
Expand All @@ -46,6 +65,7 @@ pub struct Config {
pub network: Libp2pConfig,
pub sync: SyncConfig,
pub daemon: DaemonConfig,
pub events: EventsConfig,
}

impl Config {
Expand Down
4 changes: 3 additions & 1 deletion src/daemon/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,8 @@ pub(super) async fn start(

info!("JSON-RPC endpoint will listen at {rpc_address}");

let eth_event_handler = Arc::new(EthEventHandler::from_config(&config.events));

services.spawn(async move {
start_rpc(
RPCState {
Expand All @@ -408,7 +410,7 @@ pub(super) async fn start(
mpool,
bad_blocks,
sync_state,
eth_event_handler: Arc::new(EthEventHandler::new()),
eth_event_handler,
sync_network_context,
network_name,
start_time,
Expand Down
40 changes: 35 additions & 5 deletions src/rpc/methods/eth/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use super::CollectedEvent;
use super::Predefined;
use crate::blocks::Tipset;
use crate::chain::index::ResolveNullTipset;
use crate::cli_shared::cli::EventsConfig;
use crate::rpc::eth::filter::event::*;
use crate::rpc::eth::filter::mempool::*;
use crate::rpc::eth::filter::tipset::*;
Expand Down Expand Up @@ -59,17 +60,41 @@ pub trait FilterManager {
/// configurations such as the maximum filter height range and maximum filter results.
pub struct EthEventHandler {
filter_store: Option<Arc<dyn FilterStore>>,
max_filter_height_range: ChainEpoch,
pub max_filter_results: usize,
pub max_filter_height_range: ChainEpoch,
event_filter_manager: Option<Arc<EventFilterManager>>,
tipset_filter_manager: Option<Arc<TipSetFilterManager>>,
mempool_filter_manager: Option<Arc<MempoolFilterManager>>,
}

impl EthEventHandler {
pub fn new() -> Self {
let config = EventsConfig::default();
Self::from_config(&config)
}

pub fn from_config(config: &EventsConfig) -> Self {
let max_filters: usize = env_or_default("FOREST_MAX_FILTERS", 100);
let max_filter_results: usize = env_or_default("FOREST_MAX_FILTER_RESULTS", 10000);
let max_filter_height_range: i64 = env_or_default("FOREST_MAX_FILTER_HEIGHT_RANGE", 2880);
let max_filter_results = std::env::var("FOREST_MAX_FILTER_RESULTS")
elmattic marked this conversation as resolved.
Show resolved Hide resolved
.ok()
.and_then(|v| match v.parse::<usize>() {
Ok(u) if u > 0 => Some(u),
_ => {
tracing::warn!("Invalid FOREST_MAX_FILTER_RESULTS value {v}. A positive integer is expected.");
None
}
})
.unwrap_or(config.max_filter_results);
let max_filter_height_range = std::env::var("FOREST_MAX_FILTER_HEIGHT_RANGE")
.ok()
.and_then(|v| match v.parse::<ChainEpoch>() {
Ok(i) if i > 0 => Some(i),
_ => {
tracing::warn!("Invalid FOREST_MAX_FILTER_HEIGHT_RANGE value {v}. A positive integer is expected.");
None
}
})
.unwrap_or(config.max_filter_height_range);
let filter_store: Option<Arc<dyn FilterStore>> =
Some(MemFilterStore::new(max_filters) as Arc<dyn FilterStore>);
let event_filter_manager = Some(EventFilterManager::new(max_filter_results));
Expand All @@ -78,6 +103,7 @@ impl EthEventHandler {

Self {
filter_store,
max_filter_results,
max_filter_height_range,
event_filter_manager,
tipset_filter_manager,
Expand Down Expand Up @@ -257,6 +283,7 @@ impl EthEventHandler {
messages.len() == events.len(),
"Length of messages and events do not match"
);

let mut event_count = 0;
for (i, (message, events)) in messages.iter().zip(events.into_iter()).enumerate() {
for event in events.iter() {
Expand Down Expand Up @@ -320,6 +347,9 @@ impl EthEventHandler {
msg_idx: i as u64,
msg_cid: message.cid(),
};
if collected_events.len() >= ctx.eth_event_handler.max_filter_results {
bail!("filter matches too many events, try a more restricted filter");
}
collected_events.push(ce);
event_count += 1;
}
Expand Down Expand Up @@ -357,7 +387,7 @@ impl EthEventHandler {
*range.end()
};

let num_tipsets = (range.end() - range.start()) as usize + 1;
let num_tipsets = (range.end() - range.start()) + 1;
let max_tipset = ctx.chain_store().chain_index.tipset_by_height(
max_height,
ctx.chain_store().heaviest_tipset(),
Expand All @@ -367,7 +397,7 @@ impl EthEventHandler {
.as_ref()
.clone()
.chain(&ctx.store())
.take(num_tipsets)
.take(num_tipsets as usize)
{
let tipset = Arc::new(tipset);
Self::collect_events(ctx, &tipset, Some(&spec), &mut collected_events).await?;
Expand Down
4 changes: 3 additions & 1 deletion src/tool/offline_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::auth::generate_priv_key;
use crate::chain::ChainStore;
use crate::chain_sync::network_context::SyncNetworkContext;
use crate::chain_sync::{SyncConfig, SyncStage};
use crate::cli_shared::cli::EventsConfig;
use crate::cli_shared::snapshot::TrustedVendor;
use crate::daemon::db_util::{download_to, populate_eth_mappings};
use crate::db::{car::ManyCar, MemoryDB};
Expand Down Expand Up @@ -59,6 +60,7 @@ pub async fn start_offline_server(
db.read_only_files(snapshot_files.iter().cloned())?;
let chain_config = Arc::new(handle_chain_config(&chain)?);
let sync_config = Arc::new(SyncConfig::default());
let events_config = Arc::new(EventsConfig::default());
let genesis_header = read_genesis_header(
genesis.as_deref(),
chain_config.genesis_bytes(&db).await?.as_deref(),
Expand Down Expand Up @@ -136,7 +138,7 @@ pub async fn start_offline_server(
mpool: Arc::new(message_pool),
bad_blocks: Default::default(),
sync_state: Arc::new(parking_lot::RwLock::new(Default::default())),
eth_event_handler: Arc::new(EthEventHandler::new()),
eth_event_handler: Arc::new(EthEventHandler::from_config(&events_config)),
sync_network_context,
network_name,
start_time: chrono::Utc::now(),
Expand Down
56 changes: 56 additions & 0 deletions src/tool/subcommands/api_cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,12 +512,41 @@ enum PolicyOnRejected {
PassWithQuasiIdenticalError,
}

enum SortPolicy {
/// Recursively sorts both arrays and maps in a JSON value.
All,
}

struct RpcTest {
request: rpc::Request,
check_syntax: Arc<dyn Fn(serde_json::Value) -> bool + Send + Sync>,
check_semantics: Arc<dyn Fn(serde_json::Value, serde_json::Value) -> bool + Send + Sync>,
ignore: Option<&'static str>,
policy_on_rejected: PolicyOnRejected,
sort_policy: Option<SortPolicy>,
}

fn sort_json(value: &mut Value) {
match value {
Value::Array(arr) => {
for v in arr.iter_mut() {
sort_json(v);
}
arr.sort_by_key(|a| a.to_string());
}
Value::Object(obj) => {
let mut sorted_map: serde_json::Map<String, Value> = serde_json::Map::new();
let mut keys: Vec<String> = obj.keys().cloned().collect();
keys.sort();
for k in keys {
let mut v = obj.remove(&k).unwrap();
sort_json(&mut v);
sorted_map.insert(k, v);
}
*obj = sorted_map;
}
_ => (),
}
}

/// Duplication between `<method>` and `<method>_raw` is a temporary measure, and
Expand Down Expand Up @@ -546,6 +575,7 @@ impl RpcTest {
check_semantics: Arc::new(|_, _| true),
ignore: None,
policy_on_rejected: PolicyOnRejected::Fail,
sort_policy: None,
}
}
/// Check that an endpoint exists, has the same JSON schema, and do custom
Expand Down Expand Up @@ -591,6 +621,7 @@ impl RpcTest {
}),
ignore: None,
policy_on_rejected: PolicyOnRejected::Fail,
sort_policy: None,
}
}
/// Check that an endpoint exists and that Forest returns exactly the same
Expand All @@ -609,6 +640,11 @@ impl RpcTest {
self
}

fn sort_policy(mut self, policy: SortPolicy) -> Self {
self.sort_policy = Some(policy);
self
}

async fn run(&self, forest: &rpc::Client, lotus: &rpc::Client) -> TestResult {
let forest_resp = forest.call(self.request.clone()).await;
let forest_response = forest_resp.as_ref().map_err(|e| e.to_string()).cloned();
Expand All @@ -619,6 +655,15 @@ impl RpcTest {
(Ok(forest), Ok(lotus))
if (self.check_syntax)(forest.clone()) && (self.check_syntax)(lotus.clone()) =>
{
let (forest, lotus) = if self.sort_policy.is_some() {
let mut sorted_forest = forest.clone();
sort_json(&mut sorted_forest);
let mut sorted_lotus = lotus.clone();
sort_json(&mut sorted_lotus);
(sorted_forest, sorted_lotus)
} else {
(forest, lotus)
};
let forest_status = if (self.check_semantics)(forest, lotus) {
TestSummary::Valid
} else {
Expand Down Expand Up @@ -1625,6 +1670,17 @@ fn eth_tests_with_tipset<DB: Blockstore>(store: &Arc<DB>, shared_tipset: &Tipset
},))
.unwrap(),
),
RpcTest::identity(
EthGetLogs::request((EthFilterSpec {
from_block: Some(format!("0x{:x}", shared_tipset.epoch() - 100)),
to_block: Some(format!("0x{:x}", shared_tipset.epoch())),
address: vec![],
topics: None,
block_hash: None,
},))
.unwrap(),
)
.sort_policy(SortPolicy::All),
RpcTest::identity(EthGetTransactionHashByCid::request((block_cid,)).unwrap()),
RpcTest::identity(
EthTraceBlock::request((BlockNumberOrHash::from_block_number(shared_tipset.epoch()),))
Expand Down
Loading