Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(mev-boost-rs): Refactor and Cleaning #196

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/mev/src/cmd/boost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl Command {
info!("configured for `{network}`");

if let Some(config) = config.boost {
Ok(Service::from(network, config).spawn()?.await?)
Ok(Service::from(network, config).spawn()?.await??)
} else {
Err(eyre::eyre!("missing boost config from file provided"))
}
Expand Down
2 changes: 0 additions & 2 deletions mev-boost-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ version.workspace = true
edition = "2021"
license = "MIT OR Apache-2.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { version = "1.0", features = ["full"] }
tracing = "0.1"
Expand Down
15 changes: 15 additions & 0 deletions mev-boost-rs/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use serde::Deserialize;
use std::net::Ipv4Addr;

#[derive(Debug, Deserialize)]
pub struct Config {
pub host: Ipv4Addr,
pub port: u16,
pub relays: Vec<String>,
}

impl Default for Config {
fn default() -> Self {
Self { host: Ipv4Addr::UNSPECIFIED, port: 18550, relays: vec![] }
}
}
4 changes: 3 additions & 1 deletion mev-boost-rs/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
mod config;
mod relay_mux;
mod service;

pub use service::{Config, Service};
pub use config::Config;
pub use service::Service;
39 changes: 14 additions & 25 deletions mev-boost-rs/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,10 @@ use futures::StreamExt;
use mev_rs::{
blinded_block_provider::Server as BlindedBlockProviderServer,
relay::{parse_relay_endpoints, Relay, RelayEndpoint},
Error,
BoostError, Error,
};
use serde::Deserialize;
use std::{future::Future, net::Ipv4Addr, pin::Pin, task::Poll};
use tokio::task::{JoinError, JoinHandle};
use tracing::{error, info};

#[derive(Debug, Deserialize)]
pub struct Config {
pub host: Ipv4Addr,
pub port: u16,
pub relays: Vec<String>,
}

impl Default for Config {
fn default() -> Self {
Self { host: Ipv4Addr::UNSPECIFIED, port: 18550, relays: vec![] }
}
}

pub struct Service {
host: Ipv4Addr,
Expand All @@ -35,7 +20,7 @@ pub struct Service {
}

impl Service {
pub fn from(network: Network, config: Config) -> Self {
pub fn from(network: Network, config: crate::Config) -> Self {
let relays = parse_relay_endpoints(&config.relays);

Self { host: config.host, port: config.port, relays, network }
Expand All @@ -46,12 +31,12 @@ impl Service {
let Self { host, port, relays, network } = self;

if relays.is_empty() {
error!("no valid relays provided; please restart with correct configuration");
tracing::error!("no valid relays provided; please restart with correct configuration");
} else {
let count = relays.len();
info!("configured with {count} relay(s)");
tracing::info!("configured with {count} relay(s)");
for relay in &relays {
info!(%relay, "configured with relay");
tracing::info!(%relay, "configured with relay");
}
}

Expand All @@ -63,14 +48,16 @@ impl Service {
});
let relay_mux = RelayMux::new(relays, context);

// NOTE: cloning is inexpensive as the relay max wraps an Arc inner type
let relay_mux_clone = relay_mux.clone();
let relay_task = tokio::spawn(async move {
let relay_mux = relay_mux_clone;
let slots = clock.stream_slots();

tokio::pin!(slots);

let mut current_epoch = clock.current_epoch().expect("after genesis");
let mut current_epoch =
clock.current_epoch().ok_or(Error::Boost(BoostError::EpochFetchFailure))?;
while let Some(slot) = slots.next().await {
relay_mux.on_slot(slot);

Expand All @@ -80,6 +67,8 @@ impl Service {
current_epoch = epoch;
}
}

Ok(())
});

let server = BlindedBlockProviderServer::new(host, port, relay_mux).spawn();
Expand All @@ -94,19 +83,19 @@ impl Service {
#[pin_project::pin_project]
pub struct ServiceHandle {
#[pin]
relay_mux: JoinHandle<()>,
relay_mux: JoinHandle<Result<(), Error>>,
#[pin]
server: JoinHandle<()>,
server: JoinHandle<Result<(), Error>>,
}

impl Future for ServiceHandle {
type Output = Result<(), JoinError>;
type Output = Result<Result<(), Error>, JoinError>;

fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let relay_mux = this.relay_mux.poll(cx);
if relay_mux.is_ready() {
return relay_mux
return relay_mux;
}
this.server.poll(cx)
}
Expand Down
17 changes: 9 additions & 8 deletions mev-rs/src/blinded_block_provider/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use beacon_api_client::VersionedValue;
use hyper::server::conn::AddrIncoming;
use std::net::{Ipv4Addr, SocketAddr};
use tokio::task::JoinHandle;
use tracing::{error, info, trace};

/// Type alias for the configured axum server
pub type BlockProviderServer = axum::Server<AddrIncoming, IntoMakeService<Router>>;
Expand All @@ -30,7 +29,7 @@ pub(crate) async fn handle_validator_registration<B: BlindedBlockProvider>(
State(builder): State<B>,
Json(mut registrations): Json<Vec<SignedValidatorRegistration>>,
) -> Result<(), Error> {
trace!(count = registrations.len(), "processing validator registrations");
tracing::trace!(count = registrations.len(), "processing validator registrations");
builder.register_validators(&mut registrations).await.map_err(From::from)
}

Expand All @@ -39,7 +38,7 @@ pub(crate) async fn handle_fetch_bid<B: BlindedBlockProvider>(
Path(auction_request): Path<AuctionRequest>,
) -> Result<Json<VersionedValue<SignedBuilderBid>>, Error> {
let signed_bid = builder.fetch_best_bid(&auction_request).await?;
trace!(%auction_request, %signed_bid, "returning bid");
tracing::trace!(%auction_request, %signed_bid, "returning bid");
let version = signed_bid.version();
let response = VersionedValue { version, data: signed_bid, meta: Default::default() };
Ok(Json(response))
Expand All @@ -53,7 +52,7 @@ pub(crate) async fn handle_open_bid<B: BlindedBlockProvider>(
let payload = auction_contents.execution_payload();
let block_hash = payload.block_hash();
let slot = block.message().slot();
trace!(%slot, %block_hash, "returning payload");
tracing::trace!(%slot, %block_hash, "returning payload");
let version = payload.version();
let response = VersionedValue { version, data: auction_contents, meta: Default::default() };
Ok(Json(response))
Expand Down Expand Up @@ -86,14 +85,16 @@ impl<B: BlindedBlockProvider + Clone + Send + Sync + 'static> Server<B> {
}

/// Spawns the server on a new task returning the handle for it
pub fn spawn(&self) -> JoinHandle<()> {
pub fn spawn(&self) -> JoinHandle<Result<(), Error>> {
let server = self.serve();
let address = server.local_addr();
tokio::spawn(async move {
info!("listening at {address}...");
if let Err(err) = server.await {
error!(%err, "error while listening for incoming")
tracing::info!("listening at {address}...");
let result = server.await;
if let Err(ref err) = result {
tracing::error!(%err, "error while listening for incoming")
}
result.map_err(Error::Hyper)
})
}
}
4 changes: 4 additions & 0 deletions mev-rs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ pub enum BoostError {
MissingOpenBid,
#[error("could not register with any relay")]
CouldNotRegister,
#[error("current epoch fetch failure")]
EpochFetchFailure,
#[error("no payload returned for opened bid with block hash {0:?}")]
MissingPayload(Hash32),
}
Expand Down Expand Up @@ -68,6 +70,8 @@ pub enum Error {
Consensus(#[from] ConsensusError),
#[error(transparent)]
Api(#[from] ApiError),
#[error(transparent)]
Hyper(#[from] hyper::Error),
}

#[cfg(feature = "api")]
Expand Down