Skip to content

Commit

Permalink
feat: pregenesis syncronization allows to be stopped and continue in …
Browse files Browse the repository at this point in the history
…next run
  • Loading branch information
joanestebanr committed Aug 22, 2024
1 parent 9aa12d6 commit d59f092
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 86 deletions.
44 changes: 44 additions & 0 deletions etherman/etherman.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ import (
"golang.org/x/crypto/sha3"
)

const (
// ETrogUpgradeVersion is the version of the LxLy upgrade
ETrogUpgradeVersion = 2
)

var (
// Events RollupManager
setBatchFeeSignatureHash = crypto.Keccak256Hash([]byte("SetBatchFee(uint256)"))
Expand Down Expand Up @@ -361,6 +366,26 @@ func (etherMan *Client) VerifyGenBlockNumber(ctx context.Context, genBlockNumber
return true, nil
}

// GetL1BlockUpgradeLxLy It returns the block genesis for LxLy before genesisBlock or error
func (etherMan *Client) GetL1BlockUpgradeLxLy(ctx context.Context, genesisBlock uint64) (uint64, error) {
it, err := etherMan.RollupManager.FilterInitialized(&bind.FilterOpts{
Start: 1,
End: &genesisBlock,
Context: ctx,
})
if err != nil {
return uint64(0), err
}
for it.Next() {
log.Debugf("BlockNumber: %d Topics:Initialized(%d)", it.Event.Raw.BlockNumber, it.Event.Version)
if it.Event.Version == ETrogUpgradeVersion { // 2 is ETROG (LxLy upgrade)
log.Infof("LxLy upgrade found at blockNumber: %d", it.Event.Raw.BlockNumber)
return it.Event.Raw.BlockNumber, nil
}
}
return uint64(0), ErrNotFound
}

// GetForks returns fork information
func (etherMan *Client) GetForks(ctx context.Context, genBlockNumber uint64, lastL1BlockSynced uint64) ([]state.ForkIDInterval, error) {
log.Debug("Getting forkIDs from blockNumber: ", genBlockNumber)
Expand Down Expand Up @@ -497,6 +522,25 @@ func (etherMan *Client) GetRollupInfoByBlockRange(ctx context.Context, fromBlock
return blocks, blocksOrder, nil
}

// GetRollupInfoByBlockRangePreviousRollupGenesis function retrieves the Rollup information that are included in all this ethereum blocks
// but it only retrieves the information from the previous rollup genesis block to the current block.
func (etherMan *Client) GetRollupInfoByBlockRangePreviousRollupGenesis(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]Block, map[common.Hash][]Order, error) {
// Filter query
query := ethereum.FilterQuery{
FromBlock: new(big.Int).SetUint64(fromBlock),
Addresses: []common.Address{etherMan.l1Cfg.GlobalExitRootManagerAddr},
Topics: [][]common.Hash{{updateL1InfoTreeSignatureHash}},
}
if toBlock != nil {
query.ToBlock = new(big.Int).SetUint64(*toBlock)
}
blocks, blocksOrder, err := etherMan.readEvents(ctx, query)
if err != nil {
return nil, nil, err
}
return blocks, blocksOrder, nil
}

// Order contains the event order to let the synchronizer store the information following this order.
type Order struct {
Name EventOrder
Expand Down
8 changes: 7 additions & 1 deletion synchronizer/common/syncinterfaces/etherman.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ type EthermanFullInterface interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*ethTypes.Header, error)
GetRollupInfoByBlockRange(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error)
EthBlockByNumber(ctx context.Context, blockNumber uint64) (*ethTypes.Block, error)
GetLatestBatchNumber() (uint64, error)
GetTrustedSequencerURL() (string, error)
VerifyGenBlockNumber(ctx context.Context, genBlockNumber uint64) (bool, error)
GetLatestVerifiedBatchNum() (uint64, error)
EthermanGetLatestBatchNumber
EthermanPreRollup
}

type EthermanGetLatestBatchNumber interface {
GetLatestBatchNumber() (uint64, error)
}

type EthermanPreRollup interface {
GetL1BlockUpgradeLxLy(ctx context.Context, genesisBlock uint64) (uint64, error)
GetRollupInfoByBlockRangePreviousRollupGenesis(ctx context.Context, fromBlock uint64, toBlock *uint64) ([]etherman.Block, map[common.Hash][]etherman.Order, error)
}
252 changes: 167 additions & 85 deletions synchronizer/synchronizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,104 +241,186 @@ func rollback(ctx context.Context, dbTx pgx.Tx, err error) error {
return err
}

func (s *ClientSynchronizer) isGenesisProcessed(ctx context.Context, dbTx pgx.Tx) (bool, *state.Block, error) {
lastEthBlockSynced, err := s.state.GetLastBlock(ctx, dbTx)
if err != nil && errors.Is(err, state.ErrStateNotSynchronized) {
return false, lastEthBlockSynced, nil
}

if lastEthBlockSynced.BlockNumber >= s.genesis.RollupBlockNumber {
log.Infof("Genesis block processed. Last block synced: %d >= genesis %d", lastEthBlockSynced.BlockNumber, s.genesis.RollupBlockNumber)
return true, lastEthBlockSynced, nil
}
log.Warnf("Genesis block not processed yet. Last block synced: %d < genesis %d", lastEthBlockSynced.BlockNumber, s.genesis.RollupBlockNumber)
return false, lastEthBlockSynced, nil
}

// getStartingL1Block find if need to update and if yes the starting point:
// bool -> need to process blocks
// uint64 -> first block to synchronize
// error -> error
// 1. First try to get last block on DB, if there are could be fully synced or pending blocks
// 2. If DB is empty the LxLy upgrade block as starting point
func (s *ClientSynchronizer) getStartingL1Block(ctx context.Context, genesisBlockNumber, rollupManagerBlockNumber uint64, dbTx pgx.Tx) (bool, uint64, error) {
lastBlock, err := s.state.GetLastBlock(ctx, dbTx)
if err != nil && errors.Is(err, state.ErrStateNotSynchronized) {
// No block on DB
upgradeLxLyBlockNumber := rollupManagerBlockNumber
if upgradeLxLyBlockNumber == 0 {
upgradeLxLyBlockNumber, err = s.etherMan.GetL1BlockUpgradeLxLy(ctx, genesisBlockNumber)
if err != nil && errors.Is(err, etherman.ErrNotFound) {
log.Infof("sync pregenesis: LxLy upgrade not detected before genesis block %d, it'll be sync as usual. Nothing to do yet", genesisBlockNumber)
return false, 0, nil
} else if err != nil {
log.Errorf("sync pregenesis: error getting LxLy upgrade block. Error: %v", err)
return false, 0, err
}
}
if rollupManagerBlockNumber >= genesisBlockNumber {
log.Infof("sync pregenesis: rollupManagerBlockNumber>=genesisBlockNumber (%d>=%d). Nothing in pregenesis", rollupManagerBlockNumber, genesisBlockNumber)
return false, 0, nil
}
log.Infof("sync pregenesis: No block on DB, starting from LxLy upgrade block (rollupManagerBlockNumber) %d", upgradeLxLyBlockNumber)
return true, upgradeLxLyBlockNumber, nil
} else if err != nil {
log.Errorf("Error getting last Block on DB err:%v", err)
return false, 0, err
}
if lastBlock.BlockNumber >= genesisBlockNumber-1 {
log.Warnf("sync pregenesis: Last block processed is %d, which is greater or equal than the previous genesis block %d", lastBlock, genesisBlockNumber)
return false, 0, nil
}
log.Infof("sync pregenesis: Continue processing pre-genesis blocks, last block processed on DB is %d", lastBlock.BlockNumber)
return true, lastBlock.BlockNumber, nil
}

func (s *ClientSynchronizer) synchronizePreGenesisRollupEvents(ctx context.Context) error {
// Sync events from RollupManager that happen before rollup creation
startTime := time.Now()
log.Info("synchronizing events from RollupManager that happen before rollup creation")
needToUpdate, fromBlock, err := s.getStartingL1Block(ctx, s.genesis.RollupBlockNumber, s.genesis.RollupManagerBlockNumber, nil)
if err != nil {
log.Errorf("sync pregenesis: error getting starting L1 block. Error: %v", err)
return err
}
if !needToUpdate {
log.Infof("sync pregenesis: No need to process blocks before the genesis block %d", s.genesis.RollupBlockNumber)
return nil
}
toBlockFinal := s.genesis.RollupBlockNumber - 1
log.Infof("sync pregenesis: starting syncing pre genesis LxLy events from block %d to block %d (total %d blocks)",
fromBlock, toBlockFinal, toBlockFinal-fromBlock+1)
for i := fromBlock; true; i += s.cfg.SyncChunkSize {
toBlock := min(i+s.cfg.SyncChunkSize-1, toBlockFinal)
blocks, order, err := s.etherMan.GetRollupInfoByBlockRangePreviousRollupGenesis(s.ctx, i, &toBlock)
if err != nil {
log.Error("sync pregenesis: error getting rollupInfoByBlockRange before rollup genesis: ", err)
return err
}
err = s.ProcessBlockRange(blocks, order)
if err != nil {
log.Error("sync pregenesis: error processing blocks before the genesis: ", err)
return err
}
if toBlock == toBlockFinal {
break
}
}
elapsedTime := time.Since(startTime)
log.Infof("sync pregenesis: sync L1InfoTree finish: from %d to %d total_block %d done in %s", fromBlock, toBlockFinal, toBlockFinal-fromBlock+1, &elapsedTime)
return nil
}

func (s *ClientSynchronizer) processGenesis() (*state.Block, error) {
log.Info("State is empty, verifying genesis block")
valid, err := s.etherMan.VerifyGenBlockNumber(s.ctx, s.genesis.RollupBlockNumber)
if err != nil {
log.Error("error checking genesis block number. Error: ", err)
return nil, err
} else if !valid {
log.Error("genesis Block number configured is not valid. It is required the block number where the PolygonZkEVM smc was deployed")
return nil, fmt.Errorf("genesis Block number configured is not valid. It is required the block number where the PolygonZkEVM smc was deployed")
}
//TODO: ???
err = s.synchronizePreGenesisRollupEvents(s.ctx)
if err != nil {
log.Error("error synchronizing pre genesis events: ", err)
return nil, err
}

header, err := s.etherMan.HeaderByNumber(s.ctx, big.NewInt(0).SetUint64(s.genesis.RollupBlockNumber))
if err != nil {
log.Errorf("error getting l1 block header for block %d. Error: %v", s.genesis.RollupBlockNumber, err)
return nil, err
}
log.Info("synchronizing rollup creation block")
lastEthBlockSynced := &state.Block{
BlockNumber: header.Number.Uint64(),
BlockHash: header.Hash(),
ParentHash: header.ParentHash,
ReceivedAt: time.Unix(int64(header.Time), 0),
}
dbTx, err := s.state.BeginStateTransaction(s.ctx)
if err != nil {
log.Errorf("error creating db transaction to get latest block. Error: %v", err)
return nil, err
}
genesisRoot, err := s.state.SetGenesis(s.ctx, *lastEthBlockSynced, s.genesis, stateMetrics.SynchronizerCallerLabel, dbTx)
if err != nil {
log.Error("error setting genesis: ", err)
return nil, rollback(s.ctx, dbTx, err)
}
err = s.RequestAndProcessRollupGenesisBlock(dbTx, lastEthBlockSynced)
if err != nil {
log.Error("error processing Rollup genesis block: ", err)
return nil, rollback(s.ctx, dbTx, err)
}

if genesisRoot != s.genesis.Root {
log.Errorf("Calculated newRoot should be %s instead of %s", s.genesis.Root.String(), genesisRoot.String())
return nil, rollback(s.ctx, dbTx, fmt.Errorf("calculated newRoot should be %s instead of %s", s.genesis.Root.String(), genesisRoot.String()))
}
// Waiting for the flushID to be stored
err = s.checkFlushID(dbTx)
if err != nil {
log.Error("error checking genesis flushID: ", err)
return nil, rollback(s.ctx, dbTx, err)
}
if err := dbTx.Commit(s.ctx); err != nil {
log.Errorf("error genesis committing dbTx, err: %v", err)
return nil, rollback(s.ctx, dbTx, err)
}
log.Info("Genesis root matches! Stored genesis blocks.")
return lastEthBlockSynced, nil
}

// Sync function will read the last state synced and will continue from that point.
// Sync() will read blockchain events to detect rollup updates
func (s *ClientSynchronizer) Sync() error {
startInitialization := time.Now()
// If there is no lastEthereumBlock means that sync from the beginning is necessary. If not, it continues from the retrieved ethereum block
// Get the latest synced block. If there is no block on db, use genesis block
log.Info("Sync started")
dbTx, err := s.state.BeginStateTransaction(s.ctx)

genesisDone, lastEthBlockSynced, err := s.isGenesisProcessed(s.ctx, nil)
if err != nil {
log.Errorf("error creating db transaction to get latest block. Error: %v", err)
log.Errorf("error checking if genesis is processed. Error: %v", err)
return err
}
lastEthBlockSynced, err := s.state.GetLastBlock(s.ctx, dbTx)
if err != nil {
if errors.Is(err, state.ErrStateNotSynchronized) {
log.Info("State is empty, verifying genesis block")
valid, err := s.etherMan.VerifyGenBlockNumber(s.ctx, s.genesis.RollupBlockNumber)
if err != nil {
log.Error("error checking genesis block number. Error: ", err)
return rollback(s.ctx, dbTx, err)
} else if !valid {
log.Error("genesis Block number configured is not valid. It is required the block number where the PolygonZkEVM smc was deployed")
return rollback(s.ctx, dbTx, fmt.Errorf("genesis Block number configured is not valid. It is required the block number where the PolygonZkEVM smc was deployed"))
}

// Sync events from RollupManager that happen before rollup creation
log.Info("synchronizing events from RollupManager that happen before rollup creation")
for i := s.genesis.RollupManagerBlockNumber; true; i += s.cfg.SyncChunkSize {
toBlock := min(i+s.cfg.SyncChunkSize-1, s.genesis.RollupBlockNumber-1)
blocks, order, err := s.etherMan.GetRollupInfoByBlockRange(s.ctx, i, &toBlock)
if err != nil {
log.Error("error getting rollupInfoByBlockRange before rollup genesis: ", err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state. RollbackErr: %v, err: %s", rollbackErr, err.Error())
return rollbackErr
}
return err
}
err = s.ProcessBlockRange(blocks, order)
if err != nil {
log.Error("error processing blocks before the genesis: ", err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state. RollbackErr: %v, err: %s", rollbackErr, err.Error())
return rollbackErr
}
return err
}
if toBlock == s.genesis.RollupBlockNumber-1 {
break
}
}

header, err := s.etherMan.HeaderByNumber(s.ctx, big.NewInt(0).SetUint64(s.genesis.RollupBlockNumber))
if err != nil {
log.Errorf("error getting l1 block header for block %d. Error: %v", s.genesis.RollupBlockNumber, err)
return rollback(s.ctx, dbTx, err)
}
log.Info("synchronizing rollup creation block")
lastEthBlockSynced = &state.Block{
BlockNumber: header.Number.Uint64(),
BlockHash: header.Hash(),
ParentHash: header.ParentHash,
ReceivedAt: time.Unix(int64(header.Time), 0),
}
genesisRoot, err := s.state.SetGenesis(s.ctx, *lastEthBlockSynced, s.genesis, stateMetrics.SynchronizerCallerLabel, dbTx)
if err != nil {
log.Error("error setting genesis: ", err)
return rollback(s.ctx, dbTx, err)
}
err = s.RequestAndProcessRollupGenesisBlock(dbTx, lastEthBlockSynced)
if err != nil {
log.Error("error processing Rollup genesis block: ", err)
return rollback(s.ctx, dbTx, err)
}

if genesisRoot != s.genesis.Root {
log.Errorf("Calculated newRoot should be %s instead of %s", s.genesis.Root.String(), genesisRoot.String())
return rollback(s.ctx, dbTx, fmt.Errorf("calculated newRoot should be %s instead of %s", s.genesis.Root.String(), genesisRoot.String()))
}
// Waiting for the flushID to be stored
err = s.checkFlushID(dbTx)
if err != nil {
log.Error("error checking genesis flushID: ", err)
return rollback(s.ctx, dbTx, err)
}
log.Debug("Genesis root matches!")
} else {
log.Error("unexpected error getting the latest ethereum block. Error: ", err)
rollbackErr := dbTx.Rollback(s.ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state. RollbackErr: %v, err: %s", rollbackErr, err.Error())
return rollbackErr
}
if !genesisDone {
lastEthBlockSynced, err = s.processGenesis()
if err != nil {
log.Errorf("error processing genesis. Error: %v", err)
return err
}
}

dbTx, err := s.state.BeginStateTransaction(s.ctx)
if err != nil {
log.Errorf("error creating db transaction to get latest block. Error: %v", err)
return err
}

initBatchNumber, err := s.state.GetLastBatchNumber(s.ctx, dbTx)
if err != nil {
log.Error("error getting latest batchNumber synced. Error: ", err)
Expand Down

0 comments on commit d59f092

Please sign in to comment.