Skip to content

Commit

Permalink
Activate peerDAS at electra. (#14734)
Browse files Browse the repository at this point in the history
  • Loading branch information
nalepae authored Dec 27, 2024
1 parent f882bd2 commit 859ac00
Show file tree
Hide file tree
Showing 32 changed files with 513 additions and 406 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/core/time/slot_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func HigherEqualThanAltairVersionAndEpoch(s state.BeaconState, e primitives.Epoc

// PeerDASIsActive checks whether peerDAS is active at the provided slot.
func PeerDASIsActive(slot primitives.Slot) bool {
return params.PeerDASEnabled() && slots.ToEpoch(slot) >= params.BeaconConfig().Eip7594ForkEpoch
return params.PeerDASEnabled() && slots.ToEpoch(slot) >= params.BeaconConfig().ElectraForkEpoch
}

// CanUpgradeToAltair returns true if the input `slot` can upgrade to Altair.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/das/availability_columns.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(
// ignore their response and decrease their peer score.
roDataColumns, err := entry.filterColumns(blockRoot, blockCommitments)
if err != nil {
return errors.Wrap(err, "incomplete BlobSidecar batch")
return errors.Wrap(err, "incomplete DataColumnSidecar batch")
}

// Create verified RO data columns from RO data columns.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/das/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func (e *cacheEntry) filterColumns(root [32]byte, commitmentsArray *safeCommitme
commitmentsCount := commitmentsArray.count()
sidecars := make([]blocks.RODataColumn, 0, commitmentsCount)

for i := uint64(0); i < fieldparams.NumberOfColumns; i++ {
// Skip if we arleady store this data column.
for i := range uint64(fieldparams.NumberOfColumns) {
// Skip if we already store this data column.
if e.diskSummary.HasIndex(i) {
continue
}
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/p2p/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,10 @@ func (s *Service) RefreshPersistentSubnets() {

isBitSUpToDate := bytes.Equal(bitS, inRecordBitS) && bytes.Equal(bitS, currentBitSInMetadata)

// Compare current epoch with EIP-7594 fork epoch.
eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch
// Compare current epoch with the Electra fork epoch.
electraForkEpoch := params.BeaconConfig().ElectraForkEpoch

if currentEpoch < eip7594ForkEpoch {
if currentEpoch < electraForkEpoch {
// Altair behaviour.
if metadataVersion == version.Altair && isBitVUpToDate && isBitSUpToDate {
// Nothing to do, return early.
Expand Down
8 changes: 4 additions & 4 deletions beacon-chain/p2p/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestStartDiscV5_DiscoverAllPeers(t *testing.T) {
func TestCreateLocalNode(t *testing.T) {
params.SetupTestConfigCleanup(t)
cfg := params.BeaconConfig()
cfg.Eip7594ForkEpoch = 1
cfg.ElectraForkEpoch = 1
params.OverrideBeaconConfig(cfg)
testCases := []struct {
name string
Expand Down Expand Up @@ -626,7 +626,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {

const (
altairForkEpoch = 5
eip7594ForkEpoch = 10
electraForkEpoch = 10
)

custodySubnetCount := params.BeaconConfig().CustodyRequirement
Expand All @@ -635,7 +635,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
defaultCfg := params.BeaconConfig()
cfg := defaultCfg.Copy()
cfg.AltairForkEpoch = altairForkEpoch
cfg.Eip7594ForkEpoch = eip7594ForkEpoch
cfg.ElectraForkEpoch = electraForkEpoch
params.OverrideBeaconConfig(cfg)

// Compute the number of seconds per epoch.
Expand Down Expand Up @@ -706,7 +706,7 @@ func TestRefreshPersistentSubnets(t *testing.T) {
},
{
name: "PeerDAS",
epochSinceGenesis: eip7594ForkEpoch,
epochSinceGenesis: electraForkEpoch,
checks: []check{
{
pingCount: 0,
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/p2p/rpc_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func TopicFromMessage(msg string, epoch primitives.Epoch) (string, error) {
}

// Check if the message is to be updated in peerDAS.
isPeerDAS := epoch >= params.BeaconConfig().Eip7594ForkEpoch
isPeerDAS := epoch >= params.BeaconConfig().ElectraForkEpoch
if isPeerDAS && peerDASMapping[msg] {
version = SchemaVersionV3
}
Expand Down
5 changes: 1 addition & 4 deletions beacon-chain/rpc/eth/config/handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func TestGetSpec(t *testing.T) {
config.DenebForkEpoch = 105
config.ElectraForkVersion = []byte("ElectraForkVersion")
config.ElectraForkEpoch = 107
config.Eip7594ForkEpoch = 109
config.BLSWithdrawalPrefixByte = byte('b')
config.ETH1AddressWithdrawalPrefixByte = byte('c')
config.GenesisDelay = 24
Expand Down Expand Up @@ -190,7 +189,7 @@ func TestGetSpec(t *testing.T) {
data, ok := resp.Data.(map[string]interface{})
require.Equal(t, true, ok)

assert.Equal(t, 156, len(data))
assert.Equal(t, 155, len(data))
for k, v := range data {
t.Run(k, func(t *testing.T) {
switch k {
Expand Down Expand Up @@ -268,8 +267,6 @@ func TestGetSpec(t *testing.T) {
assert.Equal(t, "0x"+hex.EncodeToString([]byte("ElectraForkVersion")), v)
case "ELECTRA_FORK_EPOCH":
assert.Equal(t, "107", v)
case "EIP7594_FORK_EPOCH":
assert.Equal(t, "109", v)
case "MIN_ANCHOR_POW_BLOCK_DIFFICULTY":
assert.Equal(t, "1000", v)
case "BLS_WITHDRAWAL_PREFIX":
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/rpc/lookup/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,12 +471,12 @@ func (p *BeaconDbBlocker) Blobs(ctx context.Context, id string, indices map[uint
blockSlot := b.Block().Slot()

// Get the first peerDAS epoch.
eip7594ForkEpoch := params.BeaconConfig().Eip7594ForkEpoch
electraForkEpoch := params.BeaconConfig().ElectraForkEpoch

// Compute the first peerDAS slot.
peerDASStartSlot := primitives.Slot(math.MaxUint64)
if eip7594ForkEpoch != primitives.Epoch(math.MaxUint64) {
peerDASStartSlot, err = slots.EpochStart(eip7594ForkEpoch)
if electraForkEpoch != primitives.Epoch(math.MaxUint64) {
peerDASStartSlot, err = slots.EpochStart(electraForkEpoch)
if err != nil {
return nil, &core.RpcError{Err: errors.Wrap(err, "could not calculate peerDAS start slot"), Reason: core.Internal}
}
Expand Down
7 changes: 4 additions & 3 deletions beacon-chain/sync/block_batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ func (bb *blockRangeBatcher) next(ctx context.Context, stream libp2pcore.Stream)
if !more {
return blockBatch{}, false
}
if err := bb.limiter.validateRequest(stream, bb.size); err != nil {
return blockBatch{err: errors.Wrap(err, "throttled by rate limiter")}, false
}
// TODO: Uncomment out of devnet.
// if err := bb.limiter.validateRequest(stream, bb.size); err != nil {
// return blockBatch{err: errors.Wrap(err, "throttled by rate limiter")}, false
// }

// Wait for the ticker before doing anything expensive, unless this is the first batch.
if bb.ticker != nil && bb.current != nil {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/data_columns_sampling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func setupDataColumnSamplerTest(t *testing.T, blobCount uint64) (*dataSamplerTes
ctx: context.Background(),
p2pSvc: p2pSvc,
peers: []*p2ptest.TestP2P{},
ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb},
ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Electra},
chainSvc: chainSvc,
blockProcessedData: blockProcessedData,
blobs: blobs,
Expand Down
49 changes: 35 additions & 14 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package initialsync
import (
"context"
"fmt"
"math"
"slices"
"sort"
"strings"
Expand All @@ -12,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
Expand Down Expand Up @@ -337,19 +337,33 @@ func (f *blocksFetcher) handleRequest(ctx context.Context, start primitives.Slot
return response
}

if coreTime.PeerDASIsActive(start) {
response.err = f.fetchDataColumnsFromPeers(ctx, response.bwb, nil, delay, batchSize)
return response
// Compute the first electra slot.
firstElectraSlot, err := slots.EpochStart(params.BeaconConfig().ElectraForkEpoch)
if err != nil {
firstElectraSlot = math.MaxUint64
}

if err := f.fetchBlobsFromPeer(ctx, response.bwb, response.pid, peers); err != nil {
// Find the first block with a slot greater than or equal to the first electra slot.
// (Blocks are sorted by slot)
firstElectraIndex := sort.Search(len(response.bwb), func(i int) bool {
return response.bwb[i].Block.Block().Slot() >= firstElectraSlot
})

preElectraBwbs := response.bwb[:firstElectraIndex]
postElectraBwbs := response.bwb[firstElectraIndex:]

// Fetch blobs.
if err := f.fetchBlobsFromPeer(ctx, preElectraBwbs, response.pid, peers); err != nil {
response.err = err
return response
}

// Fetch data columns.
response.err = f.fetchDataColumnsFromPeers(ctx, postElectraBwbs, nil, delay, batchSize)
return response
}

// fetchBlocksFromPeer fetches blocks from a single randomly selected peer.
// fetchBlocksFromPeer fetches blocks from a single randomly selected peer, sorted by slot.
func (f *blocksFetcher) fetchBlocksFromPeer(
ctx context.Context,
start primitives.Slot, count uint64,
Expand All @@ -369,20 +383,19 @@ func (f *blocksFetcher) fetchBlocksFromPeer(
// peers are dialed first.
peers = append(bestPeers, peers...)
peers = dedupPeers(peers)
for i := 0; i < len(peers); i++ {
p := peers[i]
blocks, err := f.requestBlocks(ctx, req, p)
for _, peer := range peers {
blocks, err := f.requestBlocks(ctx, req, peer)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("Could not request blocks by range from peer")
log.WithField("peer", peer).WithError(err).Debug("Could not request blocks by range from peer")
continue
}
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(p)
f.p2p.Peers().Scorers().BlockProviderScorer().Touch(peer)
robs, err := sortedBlockWithVerifiedBlobSlice(blocks)
if err != nil {
log.WithField("peer", p).WithError(err).Debug("invalid BeaconBlocksByRange response")
log.WithField("peer", peer).WithError(err).Debug("invalid BeaconBlocksByRange response")
continue
}
return robs, p, err
return robs, peer, err
}
return nil, "", errNoPeersAvailable
}
Expand Down Expand Up @@ -565,6 +578,10 @@ func missingCommitError(root [32]byte, slot primitives.Slot, missing [][]byte) e
// fetchBlobsFromPeer fetches blocks from a single randomly selected peer.
// This function mutates the input `bwb` argument.
func (f *blocksFetcher) fetchBlobsFromPeer(ctx context.Context, bwb []blocks.BlockWithROBlobs, pid peer.ID, peers []peer.ID) error {
if len(bwb) == 0 {
return nil
}

ctx, span := trace.StartSpan(ctx, "initialsync.fetchBlobsFromPeer")
defer span.End()
if slots.ToEpoch(f.clock.CurrentSlot()) < params.BeaconConfig().DenebForkEpoch {
Expand Down Expand Up @@ -936,7 +953,7 @@ type bwbsMissingColumns struct {
// fetchDataColumnsFromPeers looks at the blocks in `bwb` and retrieves all
// data columns for with the block has blob commitments, and for which our store is missing data columns
// we should custody.
// This function mutates `bwb` by adding the retrieved data columns.
// This function mutates `bwbs` by adding the retrieved data columns.
// Prerequisite: bwb is sorted by slot.
func (f *blocksFetcher) fetchDataColumnsFromPeers(
ctx context.Context,
Expand All @@ -951,6 +968,10 @@ func (f *blocksFetcher) fetchDataColumnsFromPeers(
maxAllowedStall = 5 // Number of trials before giving up.
)

if len(bwbs) == 0 {
return nil
}

// Generate random identifier.
identifier := f.rand.Intn(maxIdentifier)

Expand Down
Loading

0 comments on commit 859ac00

Please sign in to comment.