feat(services): reworked Madara services for better cancellation control #405
Conversation
Force-pushed from 09de91d to c2c7655
README.md (Outdated)
| `madara_gatewayDisable`   | Disables the feeder gateway |
| `madara_gatewayEnable`    | Enables the feeder gateway  |
| `madara_gatewayRestart`   | Restarts the feeder gateway |
| `madara_telemetryDisable` | Disables node telemetry     |
what is this used for?
These are supposed to be endpoints a node admin can use in case they need to disable a specific service, for example for maintenance reasons. I am going to refactor this into a single endpoint though, as per @jbcaron's suggestion.
README.md (Outdated)
> database.

### Warp Update

Warp update requires an already synchronized _local_ node with a working
we may want to be a bit clearer that this is the source database that we're setting up here
Yep, will add some clarifying information!
--full `# This also works with other types of nodes` \
--network mainnet \
--warp-update-sender \
--l1-sync-disabled `# We disable sync, for testing purposes` \
"replace this argument with your --l1-endpoint parameter"
Will add more info about this. The goal for this demonstration, though, is to stop on sync, else the sender node would keep synchronizing and the update would take forever.
README.md (Outdated)
Suppose you are an RPC service provider and your node is also running an RPC
server and exposing it to your clients: if you have to shut it down or restart
it for the duration of a migration, this will result in downtime for your service
and added complexity in setting up redundancies.
well you can just resync a new node, i think it makes more sense to talk about sequencers here
Yea fair, though this is still a bit faster than that due to being on a local network. It would also be quite easy to make this a lot faster "in the future ™️" by computing the state root once the migration has completed, instead of at each block, which is what we do on a normal sync. Something to keep in mind.
pub struct TelemetryEvent {
    verbosity: VerbosityLevel,
    message: serde_json::Value,
}

#[derive(Debug, Clone)]
pub struct TelemetryHandle(Option<Arc<mpsc::Sender<TelemetryEvent>>>);
#[repr(transparent)]
pub struct TelemetryHandle(tokio::sync::broadcast::Sender<TelemetryEvent>);
ah! why the change?
So! We need services to be able to be restarted. This does not work if we store `TelemetryHandle` as an `Option` and just call `std::mem::take` on it. Similarly, we cannot use a `tokio::sync::mpsc`, as we need to be able to re-create the receiver on each service start (the service future takes ownership of it).
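A minimal sketch of that idea (not the exact Madara code; the constructor shape and channel capacity are assumptions): with a broadcast channel the handle only stores the sender, and each (re)start of the telemetry service can obtain a fresh receiver via `subscribe()`, so nothing ever has to be taken out of the handle.

use tokio::sync::broadcast;

// Sketch only: the real TelemetryEvent carries a verbosity level and a
// serde_json::Value message, elided here to keep the example self-contained.
#[derive(Debug, Clone)]
pub struct TelemetryEvent;

#[derive(Clone)]
pub struct TelemetryHandle(broadcast::Sender<TelemetryEvent>);

impl TelemetryHandle {
    pub fn new(capacity: usize) -> Self {
        let (tx, _rx) = broadcast::channel(capacity);
        Self(tx)
    }

    // Each service start gets its own receiver, which the service future can
    // own outright; the handle itself is never consumed.
    pub fn subscribe(&self) -> broadcast::Receiver<TelemetryEvent> {
        self.0.subscribe()
    }
}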
use tokio::task::JoinSet;

pub const SERVICE_COUNT: usize = 8;
i think this shouldn't be here btw, it should be in main.rs
if an appchain wants to add a service by importing all of our crates and just have their own main.rs which starts and hooks services differently (or uses other services), this won't work for them
Well, they'll have bigger problems due to the size limit of using an `std::sync::atomic::AtomicU8`. I assume you mean having the service id behind a trait so appchains can create their own services?
Well, this was a bit of a refactor, but `MadaraServiceId` is now only a blanket implementation of `ServiceId` for our services, with `PowerOfTwo` being the actual backing type used to identify services. Appchains can create and register their own services by implementing the `Service` and `ServiceId` traits on their own structs, and they are free to use any service id enum they want; all relevant methods now only require an `impl ServiceId`.

As for `SERVICE_COUNT`, it has been renamed to `SERVICE_COUNT_MAX` to better reflect what it actually represents and has been increased to 64.
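To make this concrete, here is a hedged sketch of what registering an appchain-specific service id might look like; the enum variants and trait signature below are assumptions for illustration, only the `Service`/`ServiceId` trait names and the `PowerOfTwo` backing type come from the discussion above.

// Sketch only: the real PowerOfTwo type and ServiceId signature may differ.
#[derive(Clone, Copy, PartialEq, Eq)]
pub enum PowerOfTwo {
    P0,
    P1,
    P2,
    // ... up to the 64 ids allowed by SERVICE_COUNT_MAX
}

pub trait ServiceId {
    fn svc_id(&self) -> PowerOfTwo;
}

// A hypothetical appchain service registering its own id, without touching
// the MadaraServiceId enum shipped with the node.
pub struct MyAppchainIndexer;

impl ServiceId for MyAppchainIndexer {
    fn svc_id(&self) -> PowerOfTwo {
        PowerOfTwo::P2 // must not collide with the built-in Madara services
    }
}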
// been canceled or this service was deactivated
let res = tokio::select! {
    svc = rx.recv() => svc.ok(),
    _ = token_global.cancelled() => break,
why isn't `token_global` a parent of `token_local`? you shouldn't have to await both
We have to await both in case the `token_local` is cancelled but the `token_global` is not.
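A hedged sketch of the distinction (not the actual loop): the two tokens trigger different behaviour, so one cannot simply be awaited through the other.

// Sketch only: token_global ends the loop entirely, while token_local only
// interrupts the current wait (this particular service changed state), so
// both cancellation futures have to be polled in the same select!.
loop {
    tokio::select! {
        _ = token_global.cancelled() => break,   // full node shutdown
        _ = token_local.cancelled() => continue, // only this service was toggled
        _svc = rx.recv() => { /* handle the service status update */ }
    }
}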
///
/// Used as part of [ServiceContext::service_subscribe].
#[derive(Clone, Copy)]
pub struct ServiceTransport {
why is this called transport?
Just a name I came up with.
while !ctx.is_cancelled() {
    interval.tick().await;
    gas_price_worker_once(eth_client, &l1_gas_provider, gas_price_poll_ms).await?;
}
Currently, cancellation is only checked before waiting for the interval tick. If `ctx.is_cancelled()` becomes true during `interval.tick().await`, the worker cannot be canceled immediately, potentially delaying the shutdown. It would be better to modify this code to allow cancellation even during the tick wait.
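A hedged sketch of what such a change could look like; the `ctx.cancelled()` future used here is an assumption, only `ctx.is_cancelled()` appears in the code above.

// Sketch only: race the tick against cancellation so a shutdown request is
// observed immediately instead of after the current polling interval.
loop {
    tokio::select! {
        _ = interval.tick() => {
            gas_price_worker_once(eth_client, &l1_gas_provider, gas_price_poll_ms).await?;
        }
        _ = ctx.cancelled() => break,
    }
}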
Good catch, will be adding that!
This has been fixed!
listen_and_update_state(eth_client, backend, &eth_client.l1_block_metrics, chain_id, ctx)
let event_filter = eth_client.l1_core_contract.event_filter::<StarknetCoreContract::LogStateUpdate>();

let mut event_stream = event_filter
same
This has been fixed.
crates/client/eth/src/sync.rs (Outdated)
if !gas_price_sync_disabled {
    tokio::try_join!(
        state_update_worker(&backend, &eth_client, ctx.clone()),
        gas_price_worker(&eth_client, l1_gas_provider, gas_price_poll_ms, ctx.clone()),
        sync(&backend, &eth_client, &chain_id, mempool, ctx.clone())
    )?;
} else {
    tokio::try_join!(
        state_update_worker(&backend, &eth_client, ctx.clone()),
        sync(&backend, &eth_client, &chain_id, mempool, ctx.clone())
    )?;
}
Suggested change:
let mut join_set = JoinSet::new();
join_set.spawn(state_update_worker(&backend, &eth_client, ctx.clone()));
join_set.spawn(sync(&backend, &eth_client, &chain_id, mempool, ctx.clone()));
if !gas_price_sync_disabled {
    join_set.spawn(gas_price_worker(&eth_client, l1_gas_provider, gas_price_poll_ms, ctx.clone()));
}
while let Some(result) = join_set.join_next().await {
    result??;
}
This has been fixed.
crates/client/sync/src/fetch/mod.rs (Outdated)
@@ -105,17 +107,27 @@ pub async fn l2_fetch_task(

let mut interval = tokio::time::interval(sync_polling_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
while wait_or_graceful_shutdown(interval.tick(), &ctx).await.is_some() {
while !ctx.is_cancelled() {
    interval.tick().await;
same
this has been fixed.
crates/client/sync/src/l2.rs (Outdated)
@@ -190,17 +197,20 @@ async fn l2_pending_block_task(

let mut interval = tokio::time::interval(pending_block_poll_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
while wait_or_graceful_shutdown(interval.tick(), &ctx).await.is_some() {
while !ctx.is_cancelled() {
    interval.tick().await;
same
This has been fixed.
This allows appchains to implement new services simply by implementing the new `ServiceId` trait (in addition to the existing `Service` trait).
You could have resolved #387 at the same time
works extremely fast locally
Force-pushed from 9d71153 to 47be1c5
Force-pushed from 780776f to 34a51f5
Force-pushed from 34a51f5 to 780776f
okay
Pull Request type
Please add the labels corresponding to the type of changes your PR introduces:
What is the current behavior?
The current Madara service architecture is limited and only allows services to be started at node startup. This is especially a problem if we wish to achieve zero downtime on warp updates.
What is the new behavior?
Significantly overhauls the Madara service architecture. Not much will be said in this PR as this has already been extensively documented in service.rs. You can also find more information about how this impacts warp update in the README.
Some services also had to be updated to make them compatible with the new architecture. This mostly meant supporting service restarts. You can check out the RpcService for example, as it has been quite significantly refactored.
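For context, a rough sketch of what "supporting restarts" means in practice; the trait signature and the example service below are assumptions for illustration, not the actual Madara definitions:

// Sketch only: per-run state is created inside `start`, so the node can stop
// the service and call `start` again later without rebuilding the struct.
// `ServiceContext` is the context type introduced by this PR (import elided).
#[async_trait::async_trait]
pub trait Service: Send + Sync {
    async fn start(&mut self, ctx: ServiceContext) -> anyhow::Result<()>;
}

pub struct ExampleService; // hypothetical service used only for illustration

#[async_trait::async_trait]
impl Service for ExampleService {
    async fn start(&mut self, ctx: ServiceContext) -> anyhow::Result<()> {
        // Re-create anything tied to a single run here (listeners, channel
        // receivers, spawned tasks), then run until the context is cancelled.
        while !ctx.is_cancelled() {
            // ... await one unit of work ...
        }
        Ok(())
    }
}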
Does this introduce a breaking change?
Yes. Some changes were made to CLI args for the sake of cleanup:
- `--sync-disabled` is now called `--l1-sync-disabled` to more accurately reflect its function.

Other information
I am not sure how much I like my handling of SERVICE_GRACE_PERIOD. On the one hand, this is handy to avoid node stalls in case of mistakes on our behalf and to guarantee `SIGINT` and `SIGTERM` always work; on the other hand, it can still lead to forceful cancellations (this is by design). Let me know if you think this trade-off is worth it.
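For reference, a hedged sketch of the trade-off being described (the duration value and function names here are assumptions, not the actual implementation): after a shutdown signal, services get a bounded grace period to finish cleanly, after which anything still running is cancelled forcefully so SIGINT/SIGTERM can never hang the node.

use std::time::Duration;
use tokio::{task::JoinSet, time::timeout};

// Value assumed for illustration; the real SERVICE_GRACE_PERIOD may differ.
const SERVICE_GRACE_PERIOD: Duration = Duration::from_secs(10);

async fn shutdown_services(mut join_set: JoinSet<anyhow::Result<()>>) {
    // Let services wind down on their own within the grace period...
    let graceful = timeout(SERVICE_GRACE_PERIOD, async {
        while join_set.join_next().await.is_some() {}
    })
    .await;

    // ...then abort anything still running (the forceful cancellation
    // mentioned above, by design).
    if graceful.is_err() {
        join_set.abort_all();
    }
}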