Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core/state): async trie prefetching #76

Merged
merged 9 commits into from
Nov 26, 2024
4 changes: 2 additions & 2 deletions core/state/statedb.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,13 @@ func New(root common.Hash, db Database, snaps *snapshot.Tree) (*StateDB, error)
// StartPrefetcher initializes a new trie prefetcher to pull in nodes from the
// state trie concurrently while the state is mutated so that when we reach the
// commit phase, most of the needed data is already hot.
func (s *StateDB) StartPrefetcher(namespace string) {
func (s *StateDB) StartPrefetcher(namespace string, opts ...PrefetcherOption) {
if s.prefetcher != nil {
s.prefetcher.close()
s.prefetcher = nil
}
if s.snap != nil {
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace)
s.prefetcher = newTriePrefetcher(s.db, s.originalRoot, namespace, opts...)
}
}

Expand Down
48 changes: 40 additions & 8 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync"

"github.com/ava-labs/libevm/common"
"github.com/ava-labs/libevm/libevm/options"
"github.com/ava-labs/libevm/log"
"github.com/ava-labs/libevm/metrics"
)
Expand Down Expand Up @@ -49,9 +50,11 @@ type triePrefetcher struct {
storageDupMeter metrics.Meter
storageSkipMeter metrics.Meter
storageWasteMeter metrics.Meter

options []PrefetcherOption
}

func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePrefetcher {
func newTriePrefetcher(db Database, root common.Hash, namespace string, opts ...PrefetcherOption) *triePrefetcher {
prefix := triePrefetchMetricsPrefix + namespace
p := &triePrefetcher{
db: db,
Expand All @@ -67,6 +70,8 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
storageDupMeter: metrics.GetOrRegisterMeter(prefix+"/storage/dup", nil),
storageSkipMeter: metrics.GetOrRegisterMeter(prefix+"/storage/skip", nil),
storageWasteMeter: metrics.GetOrRegisterMeter(prefix+"/storage/waste", nil),

options: opts,
}
return p
}
Expand Down Expand Up @@ -99,6 +104,7 @@ func (p *triePrefetcher) close() {
}
}
}
p.releaseWorkerPools()
// Clear out all fetchers (will crash on a second call, deliberate)
p.fetchers = nil
}
Expand All @@ -122,6 +128,8 @@ func (p *triePrefetcher) copy() *triePrefetcher {
storageDupMeter: p.storageDupMeter,
storageSkipMeter: p.storageSkipMeter,
storageWasteMeter: p.storageWasteMeter,

options: p.options,
}
// If the prefetcher is already a copy, duplicate the data
if p.fetches != nil {
Expand Down Expand Up @@ -150,7 +158,7 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, addr comm
id := p.trieID(owner, root)
fetcher := p.fetchers[id]
if fetcher == nil {
fetcher = newSubfetcher(p.db, p.root, owner, root, addr)
fetcher = newSubfetcher(p.db, p.root, owner, root, addr, p.options...)
p.fetchers[id] = fetcher
}
fetcher.schedule(keys)
Expand Down Expand Up @@ -226,11 +234,13 @@ type subfetcher struct {
seen map[string]struct{} // Tracks the entries already loaded
dups int // Number of duplicate preload tasks
used [][]byte // Tracks the entries used in the end

pool *subfetcherPool
}

// newSubfetcher creates a goroutine to prefetch state items belonging to a
// particular root hash.
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address) *subfetcher {
func newSubfetcher(db Database, state common.Hash, owner common.Hash, root common.Hash, addr common.Address, opts ...PrefetcherOption) *subfetcher {
sf := &subfetcher{
db: db,
state: state,
Expand All @@ -243,6 +253,7 @@ func newSubfetcher(db Database, state common.Hash, owner common.Hash, root commo
copy: make(chan chan Trie),
seen: make(map[string]struct{}),
}
options.As[prefetcherConfig](opts...).applyTo(sf)
go sf.loop()
return sf
}
Expand Down Expand Up @@ -294,7 +305,10 @@ func (sf *subfetcher) abort() {
// out of tasks or its underlying trie is retrieved for committing.
func (sf *subfetcher) loop() {
// No matter how the loop stops, signal anyone waiting that it's terminated
defer close(sf.term)
defer func() {
sf.pool.wait()
close(sf.term)
}()

// Start by opening the trie and stop processing if it fails
if sf.owner == (common.Hash{}) {
Expand Down Expand Up @@ -344,9 +358,9 @@ func (sf *subfetcher) loop() {
sf.dups++
} else {
if len(task) == common.AddressLength {
sf.trie.GetAccount(common.BytesToAddress(task))
sf.pool.GetAccount(common.BytesToAddress(task))
} else {
sf.trie.GetStorage(sf.addr, task)
sf.pool.GetStorage(sf.addr, task)
}
sf.seen[string(task)] = struct{}{}
}
Expand All @@ -358,8 +372,26 @@ func (sf *subfetcher) loop() {
ch <- sf.db.CopyTrie(sf.trie)

case <-sf.stop:
// Termination is requested, abort and leave remaining tasks
return
//libevm:start
//
// This is copied, with alteration, from ethereum/go-ethereum#29519
// and can be deleted once we update to include that change.

// Termination is requested, abort if no more tasks are pending. If
// there are some, exhaust them first.
sf.lock.Lock()
done := len(sf.tasks) == 0
sf.lock.Unlock()

if done {
return
}

select {
case sf.wake <- struct{}{}:
default:
}
//libevm:end
}
}
}
126 changes: 126 additions & 0 deletions core/state/trie_prefetcher.libevm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
// Copyright 2024 the libevm authors.
//
// The libevm additions to go-ethereum are free software: you can redistribute
// them and/or modify them under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The libevm additions are distributed in the hope that they will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see
// <http://www.gnu.org/licenses/>.

package state

import (
"github.com/ava-labs/libevm/common"
"github.com/ava-labs/libevm/libevm/options"
"github.com/ava-labs/libevm/libevm/sync"
"github.com/ava-labs/libevm/log"
)

// A PrefetcherOption configures behaviour of trie prefetching.
type PrefetcherOption = options.Option[prefetcherConfig]

type prefetcherConfig struct {
newWorkers func() WorkerPool
}

// A WorkerPool executes functions asynchronously. Done() is called to signal
// that the pool is no longer needed and that Execute() is guaranteed to not be
// called again.
type WorkerPool interface {
Execute(func())
Done()
}

// WithWorkerPools configures trie prefetching to execute asynchronously. The
// provided constructor is called once for each trie being fetched but it MAY
// return the same pool.
func WithWorkerPools(ctor func() WorkerPool) PrefetcherOption {
return options.Func[prefetcherConfig](func(c *prefetcherConfig) {
c.newWorkers = ctor
})
}

type subfetcherPool struct {
workers WorkerPool
tries sync.Pool[Trie]
wg sync.WaitGroup
}

// applyTo configures the [subfetcher] to use a [WorkerPool] if one was provided
// with a [PrefetcherOption].
func (c *prefetcherConfig) applyTo(sf *subfetcher) {
sf.pool = &subfetcherPool{
tries: sync.Pool[Trie]{
// Although the workers may be shared between all subfetchers, each
// MUST have its own Trie pool.
New: func() Trie {
return sf.db.CopyTrie(sf.trie)
},
},
}
if c.newWorkers != nil {
sf.pool.workers = c.newWorkers()
}
}

// releaseWorkerPools calls Done() on all [WorkerPool]s. This MUST only be
// called after [subfetcher.abort] returns on ALL fetchers as a pool is allowed
// to be shared between them. This is because we guarantee in the public API
// that no further calls will be made to Execute() after a call to Done().
func (p *triePrefetcher) releaseWorkerPools() {
for _, f := range p.fetchers {
if w := f.pool.workers; w != nil {
w.Done()
}
}
}

func (p *subfetcherPool) wait() {
p.wg.Wait()
}

// execute runs the provided function with a copy of the subfetcher's Trie.
// Copies are stored in a [sync.Pool] to reduce creation overhead. If p was
// configured with a [WorkerPool] then it is used for function execution,
// otherwise `fn` is just called directly.
func (p *subfetcherPool) execute(fn func(Trie)) {
p.wg.Add(1)
do := func() {
t := p.tries.Get()
fn(t)
p.tries.Put(t)
p.wg.Done()
}

if w := p.workers; w != nil {
w.Execute(do)
} else {
do()
}
}

// GetAccount optimistically pre-fetches an account, dropping the returned value
// and logging errors. See [subfetcherPool.execute] re worker pools.
func (p *subfetcherPool) GetAccount(addr common.Address) {
p.execute(func(t Trie) {
if _, err := t.GetAccount(addr); err != nil {
log.Error("account prefetching failed", "address", addr, "err", err)
}
})
}

// GetStorage is the storage equivalent of [subfetcherPool.GetAccount].
func (p *subfetcherPool) GetStorage(addr common.Address, key []byte) {
p.execute(func(t Trie) {
if _, err := t.GetStorage(addr, key); err != nil {
log.Error("storage prefetching failed", "address", addr, "key", key, "err", err)
}
})
}
80 changes: 80 additions & 0 deletions core/state/trie_prefetcher.libevm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2024 the libevm authors.
//
// The libevm additions to go-ethereum are free software: you can redistribute
// them and/or modify them under the terms of the GNU Lesser General Public License
// as published by the Free Software Foundation, either version 3 of the License,
// or (at your option) any later version.
//
// The libevm additions are distributed in the hope that they will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see
// <http://www.gnu.org/licenses/>.

package state

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/ava-labs/libevm/common"
)

type synchronisingWorkerPool struct {
t *testing.T
executed, unblock chan struct{}
done bool
preconditionsToStopPrefetcher int
}

var _ WorkerPool = (*synchronisingWorkerPool)(nil)

func (p *synchronisingWorkerPool) Execute(fn func()) {
fn()
select {
case <-p.executed:
default:
close(p.executed)
}

<-p.unblock
assert.False(p.t, p.done, "Done() called before Execute() returns")
p.preconditionsToStopPrefetcher++
}

func (p *synchronisingWorkerPool) Done() {
p.done = true
p.preconditionsToStopPrefetcher++
}

func TestStopPrefetcherWaitsOnWorkers(t *testing.T) {
pool := &synchronisingWorkerPool{
t: t,
executed: make(chan struct{}),
unblock: make(chan struct{}),
}
opt := WithWorkerPools(func() WorkerPool { return pool })

db := filledStateDB()
db.prefetcher = newTriePrefetcher(db.db, db.originalRoot, "", opt)
db.prefetcher.prefetch(common.Hash{}, common.Hash{}, common.Address{}, [][]byte{{}})

go func() {
<-pool.executed
// Sleep otherwise there is a small chance that we close pool.unblock
// between db.StopPrefetcher() returning and the assertion.
time.Sleep(time.Second)
close(pool.unblock)
}()

<-pool.executed
db.StopPrefetcher()
// If this errors then either Execute() hadn't returned or Done() wasn't
// called.
assert.Equalf(t, 2, pool.preconditionsToStopPrefetcher, "%T.StopPrefetcher() returned early", db)
}
Loading