Skip to content

Commit

Permalink
Remove more stopping machinery
Browse files Browse the repository at this point in the history
  • Loading branch information
piersy committed Sep 17, 2024
1 parent 90dbc6c commit e6dd516
Show file tree
Hide file tree
Showing 6 changed files with 16 additions and 43 deletions.
6 changes: 0 additions & 6 deletions consensus/istanbul/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,12 +554,6 @@ func (sb *Backend) Commit(proposal istanbul.Proposal, aggregatedSeal types.Istan
}
}
sb.onNewConsensusBlock(block, result.Receipts, result.Logs, result.State)

nextBlockNum := new(big.Int).Add(block.Number(), big.NewInt(1))
if sb.chain.Config().IsL2Migration(nextBlockNum) {
sb.logger.Info("The next block is the L2 migration block, stopping announce protocol and closing istanbul backend", "currentBlock", block.NumberU64(), "hash", block.Hash(), "nextBlock", nextBlockNum)
sb.StopAnnouncing()
}
return nil
}

Expand Down
5 changes: 1 addition & 4 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,7 @@ func (bc *BlockChain) Stop() {
triedb := bc.stateCache.TrieDB()
triedb.SaveCache(bc.cacheConfig.TrieCleanJournal)
}
log.Info("Blockchain stopped", "number", bc.CurrentBlock().NumberU64(), "hash", bc.CurrentBlock().Hash())
log.Info("Blockchain stopped")
}

// StopInsert interrupts all insertion methods, causing them to return
Expand Down Expand Up @@ -1870,9 +1870,6 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals bool) (int, er
// If the chain is terminating, stop processing blocks
if bc.insertStopped() {
log.Debug("Abort during block processing")
if bc.Config().IsL2Migration(block.Number()) {
err = errInsertionInterrupted
}
break
}
// If the header is a banned one, straight out abort
Expand Down
7 changes: 3 additions & 4 deletions eth/fetcher/block_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ var (
bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/block/filter/bodies/out", nil)
)

// ErrTerminated indicates that the fetcher's peer connection has been terminated.
var ErrTerminated = errors.New("terminated")
var errTerminated = errors.New("terminated")

// HeaderRetrievalFn is a callback type for retrieving a header from the local chain.
type HeaderRetrievalFn func(common.Hash) *types.Header
Expand Down Expand Up @@ -255,7 +254,7 @@ func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time
case f.notify <- block:
return nil
case <-f.quit:
return ErrTerminated
return errTerminated
}
}

Expand All @@ -269,7 +268,7 @@ func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {
case f.inject <- op:
return nil
case <-f.quit:
return ErrTerminated
return errTerminated
}
}

Expand Down
6 changes: 3 additions & 3 deletions eth/fetcher/tx_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error {
case f.notify <- announce:
return nil
case <-f.quit:
return ErrTerminated
return errTerminated
}
}

Expand Down Expand Up @@ -316,7 +316,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool)
case f.cleanup <- &txDelivery{origin: peer, hashes: added, direct: direct}:
return nil
case <-f.quit:
return ErrTerminated
return errTerminated
}
}

Expand All @@ -327,7 +327,7 @@ func (f *TxFetcher) Drop(peer string) error {
case f.drop <- &txDrop{peer: peer}:
return nil
case <-f.quit:
return ErrTerminated
return errTerminated
}
}

Expand Down
4 changes: 1 addition & 3 deletions eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,7 @@ func (h *handler) unregisterPeer(id string) *ethPeer {
log.Error("Peer removal from downloader failed", "peer", id, "err", err)
}
if err := h.txFetcher.Drop(id); err != nil {
if !errors.Is(err, fetcher.ErrTerminated) {
log.Error("Peer removal from tx fetcher failed", "peer", id, "err", err)
}
log.Error("Peer removal from tx fetcher failed", "peer", id, "err", err)
}
if handler, ok := h.chain.Engine().(consensus.Handler); ok {
handler.UnregisterPeer(peer, peer.Peer.Server == h.proxyServer)
Expand Down
31 changes: 8 additions & 23 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ package miner

import (
"context"
"math/big"
"sync"
"sync/atomic"
"time"

"github.com/celo-org/celo-blockchain/common"
"github.com/celo-org/celo-blockchain/consensus"
istanbulBackend "github.com/celo-org/celo-blockchain/consensus/istanbul/backend"
"github.com/celo-org/celo-blockchain/core"
"github.com/celo-org/celo-blockchain/core/state"
"github.com/celo-org/celo-blockchain/core/types"
Expand Down Expand Up @@ -241,12 +239,10 @@ func (w *worker) start() {
func (w *worker) stop() {
atomic.StoreInt32(&w.running, 0)

if istanbul, ok := w.engine.(*istanbulBackend.Backend); ok {
if istanbul.IsValidating() {
err := istanbul.StopValidating()
if err != nil {
log.Error("Error while calling engine.StopValidating", "err", err)
}
if istanbul, ok := w.engine.(consensus.Istanbul); ok {
err := istanbul.StopValidating()
if err != nil {
log.Error("Error while calling engine.StopValidating", "err", err)
}
}
}
Expand Down Expand Up @@ -396,19 +392,10 @@ func (w *worker) mainLoop() {
defer wg.Wait()
txsCh := make(chan core.NewTxsEvent, txChanSize)

generateNewBlock := func(nextBlockNum *big.Int) {
generateNewBlock := func() {
if cancel != nil {
cancel()
}

if w.chainConfig.IsL2Migration(nextBlockNum) {
if w.isRunning() {
log.Info("The next block is the L2 migration block, stopping block construction", "currentBlock", w.chain.CurrentBlock().NumberU64(), "hash", w.chain.CurrentBlock().Hash(), "nextBlock", nextBlockNum.Uint64())
w.stop()
}
return
}

wg.Wait()
taskCtx, cancel = context.WithCancel(context.Background())
wg.Add(1)
Expand All @@ -434,12 +421,10 @@ func (w *worker) mainLoop() {
for {
select {
case <-w.startCh:
nextBlockNum := new(big.Int).Add(w.chain.CurrentBlock().Number(), big.NewInt(1))
generateNewBlock(nextBlockNum)
generateNewBlock()

case ev := <-w.chainHeadCh:
nextBlockNum := new(big.Int).Add(ev.Block.Number(), big.NewInt(1))
generateNewBlock(nextBlockNum)
case <-w.chainHeadCh:
generateNewBlock()

case ev := <-w.txsCh:
// Drain tx sub channel as a validator,
Expand Down

0 comments on commit e6dd516

Please sign in to comment.