Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Beacon API events to Electra #14855

Merged
merged 7 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions api/server/structs/conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,15 @@ func (a *AttestationElectra) ToConsensus() (*eth.AttestationElectra, error) {
}, nil
}

func SingleAttFromConsensus(a *eth.SingleAttestation) *SingleAttestation {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a comment here but something i forgot why we did it, why do we not check if a.Data is not nil? do we do that somewhere else? i forgot.

return &SingleAttestation{
CommitteeIndex: fmt.Sprintf("%d", a.CommitteeId),
AttesterIndex: fmt.Sprintf("%d", a.AttesterIndex),
Data: AttDataFromConsensus(a.Data),
Signature: hexutil.Encode(a.Signature),
}
}

func (a *SingleAttestation) ToConsensus() (*eth.SingleAttestation, error) {
ci, err := strconv.ParseUint(a.CommitteeIndex, 10, 64)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion beacon-chain/core/feed/operation/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const (

// AttesterSlashingReceived is sent after an attester slashing is received from gossip or rpc
AttesterSlashingReceived = 8

// SingleAttReceived is sent after a single attestation object is received from gossip or rpc
SingleAttReceived = 9
)

// UnAggregatedAttReceivedData is the data sent with UnaggregatedAttReceived events.
Expand All @@ -43,7 +46,7 @@ type UnAggregatedAttReceivedData struct {
// AggregatedAttReceivedData is the data sent with AggregatedAttReceived events.
type AggregatedAttReceivedData struct {
// Attestation is the aggregated attestation object.
Attestation *ethpb.AggregateAttestationAndProof
Attestation ethpb.AggregateAttAndProof
}

// ExitReceivedData is the data sent with ExitReceived events.
Expand Down Expand Up @@ -77,3 +80,8 @@ type ProposerSlashingReceivedData struct {
type AttesterSlashingReceivedData struct {
AttesterSlashing ethpb.AttSlashing
}

// SingleAttReceivedData is the data sent with SingleAttReceived events.
type SingleAttReceivedData struct {
Attestation ethpb.Att
}
7 changes: 7 additions & 0 deletions beacon-chain/monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ func (s *Service) monitorRoutine(stateChannel chan *feed.Event, stateSub event.S
} else {
s.processAggregatedAttestation(s.ctx, data.Attestation)
}
case operation.SingleAttReceived:
data, ok := e.Data.(*operation.SingleAttReceivedData)
if !ok {
log.Error("Event feed data is not of type *operation.SingleAttReceivedData")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i wonder why we don't print the Type of the data here for old ones, maybe that would be useful

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it because we already do that once in the marshal reader?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure you are looking at the correct file? I don't see any marshal reader in the monitor

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops then maybe we should print the type somewhere.

} else {
s.processUnaggregatedAttestation(s.ctx, data.Attestation)
}
case operation.ExitReceived:
data, ok := e.Data.(*operation.ExitReceivedData)
if !ok {
Expand Down
14 changes: 7 additions & 7 deletions beacon-chain/rpc/eth/beacon/handlers_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,13 @@ func (s *Server) handleAttestationsElectra(
}

for i, singleAtt := range validAttestations {
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did we need to move this?

Copy link
Contributor Author

@rkapka rkapka Jan 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that we send a single attestation and not an Electra one. Previously we were sending att where att := singleAtt.ToAttestationElectra(committee)

Type: operation.SingleAttReceived,
Data: &operation.SingleAttReceivedData{
Attestation: singleAtt,
rkapka marked this conversation as resolved.
Show resolved Hide resolved
},
})

targetState, err := s.AttestationStateFetcher.AttestationTargetState(ctx, singleAtt.Data.Target)
if err != nil {
return nil, nil, errors.Wrap(err, "could not get target state for attestation")
Expand All @@ -330,13 +337,6 @@ func (s *Server) handleAttestationsElectra(
}
att := singleAtt.ToAttestationElectra(committee)

s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})

wantedEpoch := slots.ToEpoch(att.Data.Slot)
vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
if err != nil {
Expand Down
35 changes: 30 additions & 5 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
BlockTopic = "block"
// AttestationTopic represents a new submitted attestation event topic.
AttestationTopic = "attestation"
// SingleAttestationTopic represents a new submitted single attestation event topic.
SingleAttestationTopic = "single_attestation"
// VoluntaryExitTopic represents a new performed voluntary exit event topic.
VoluntaryExitTopic = "voluntary_exit"
// FinalizedCheckpointTopic represents a new finalized checkpoint event topic.
Expand Down Expand Up @@ -92,6 +94,7 @@ type lazyReader func() io.Reader
var opsFeedEventTopics = map[feed.EventType]string{
operation.AggregatedAttReceived: AttestationTopic,
operation.UnaggregatedAttReceived: AttestationTopic,
operation.SingleAttReceived: SingleAttestationTopic,
operation.ExitReceived: VoluntaryExitTopic,
operation.SyncCommitteeContributionReceived: SyncCommitteeContributionTopic,
operation.BLSToExecutionChangeReceived: BLSToExecutionChangeTopic,
Expand Down Expand Up @@ -403,7 +406,7 @@ func (es *eventStreamer) writeOutbox(ctx context.Context, w *streamingResponseWr
func jsonMarshalReader(name string, v any) io.Reader {
d, err := json.Marshal(v)
if err != nil {
log.WithError(err).WithField("type_name", fmt.Sprintf("%T", v)).Error("Could not marshal event data.")
log.WithError(err).WithField("type_name", fmt.Sprintf("%T", v)).Error("Could not marshal event data")
return nil
}
return bytes.NewBufferString("event: " + name + "\ndata: " + string(d) + "\n\n")
Expand All @@ -415,6 +418,8 @@ func topicForEvent(event *feed.Event) string {
return AttestationTopic
case *operation.UnAggregatedAttReceivedData:
return AttestationTopic
case *operation.SingleAttReceivedData:
return SingleAttestationTopic
case *operation.ExitReceivedData:
return VoluntaryExitTopic
case *operation.SyncCommitteeContributionReceivedData:
Expand Down Expand Up @@ -464,10 +469,20 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
return jsonMarshalReader(eventName, structs.HeadEventFromV1(v))
}, nil
case *operation.AggregatedAttReceivedData:
return func() io.Reader {
att := structs.AttFromConsensus(v.Attestation.Aggregate)
return jsonMarshalReader(eventName, att)
}, nil
switch att := v.Attestation.AggregateVal().(type) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this ever panic? v.Attestation being nil?

case *eth.Attestation:
return func() io.Reader {
att := structs.AttFromConsensus(att)
return jsonMarshalReader(eventName, att)
}, nil
case *eth.AttestationElectra:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shoot this must have been missed...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change in beacon-chain/sync/validate_aggregate_proof.go had to be introduced for this to have any effect anyway

return func() io.Reader {
att := structs.AttElectraFromConsensus(att)
return jsonMarshalReader(eventName, att)
}, nil
default:
return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of AggregatedAttReceivedData", v.Attestation)
}
case *operation.UnAggregatedAttReceivedData:
switch att := v.Attestation.(type) {
case *eth.Attestation:
Expand All @@ -483,6 +498,16 @@ func (s *Server) lazyReaderForEvent(ctx context.Context, event *feed.Event, topi
default:
return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of UnAggregatedAttReceivedData", v.Attestation)
}
case *operation.SingleAttReceivedData:
switch att := v.Attestation.(type) {
case *eth.SingleAttestation:
return func() io.Reader {
att := structs.SingleAttFromConsensus(att)
return jsonMarshalReader(eventName, att)
}, nil
default:
return nil, errors.Wrapf(errUnhandledEventData, "Unexpected type %T for the .Attestation field of SingleAttReceivedData", v.Attestation)
}
case *operation.ExitReceivedData:
return func() io.Reader {
return jsonMarshalReader(eventName, structs.SignedExitFromConsensus(v.Exit))
Expand Down
37 changes: 22 additions & 15 deletions beacon-chain/rpc/eth/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func (tr *topicRequest) testHttpRequest(ctx context.Context, _ *testing.T) *http
func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
topics, err := newTopicRequest([]string{
AttestationTopic,
SingleAttestationTopic,
VoluntaryExitTopic,
SyncCommitteeContributionTopic,
BLSToExecutionChangeTopic,
Expand All @@ -123,13 +124,13 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
vblob := blocks.NewVerifiedROBlob(ro)

return topics, []*feed.Event{
&feed.Event{
{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: util.HydrateAttestation(&eth.Attestation{}),
},
},
&feed.Event{
{
Type: operation.AggregatedAttReceived,
Data: &operation.AggregatedAttReceivedData{
Attestation: &eth.AggregateAttestationAndProof{
Expand All @@ -139,7 +140,13 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.SingleAttReceived,
Data: &operation.SingleAttReceivedData{
Attestation: util.HydrateSingleAttestation(&eth.SingleAttestation{}),
},
},
{
Type: operation.ExitReceived,
Data: &operation.ExitReceivedData{
Exit: &eth.SignedVoluntaryExit{
Expand All @@ -151,7 +158,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.SyncCommitteeContributionReceived,
Data: &operation.SyncCommitteeContributionReceivedData{
Contribution: &eth.SignedContributionAndProof{
Expand All @@ -170,7 +177,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.BLSToExecutionChangeReceived,
Data: &operation.BLSToExecutionChangeReceivedData{
Change: &eth.SignedBLSToExecutionChange{
Expand All @@ -183,13 +190,13 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.BlobSidecarReceived,
Data: &operation.BlobSidecarReceivedData{
Blob: &vblob,
},
},
&feed.Event{
{
Type: operation.AttesterSlashingReceived,
Data: &operation.AttesterSlashingReceivedData{
AttesterSlashing: &eth.AttesterSlashing{
Expand Down Expand Up @@ -222,7 +229,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.AttesterSlashingReceived,
Data: &operation.AttesterSlashingReceivedData{
AttesterSlashing: &eth.AttesterSlashingElectra{
Expand Down Expand Up @@ -255,7 +262,7 @@ func operationEventsFixtures(t *testing.T) (*topicRequest, []*feed.Event) {
},
},
},
&feed.Event{
{
Type: operation.ProposerSlashingReceived,
Data: &operation.ProposerSlashingReceivedData{
ProposerSlashing: &eth.ProposerSlashing{
Expand Down Expand Up @@ -367,7 +374,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
b, err := blocks.NewSignedBeaconBlock(util.HydrateSignedBeaconBlock(&eth.SignedBeaconBlock{}))
require.NoError(t, err)
events := []*feed.Event{
&feed.Event{
{
Type: statefeed.BlockProcessed,
Data: &statefeed.BlockProcessedData{
Slot: 0,
Expand All @@ -377,7 +384,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
Optimistic: false,
},
},
&feed.Event{
{
Type: statefeed.NewHead,
Data: &ethpb.EventHead{
Slot: 0,
Expand All @@ -389,7 +396,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
ExecutionOptimistic: false,
},
},
&feed.Event{
{
Type: statefeed.Reorg,
Data: &ethpb.EventChainReorg{
Slot: 0,
Expand All @@ -402,7 +409,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
ExecutionOptimistic: false,
},
},
&feed.Event{
{
Type: statefeed.FinalizedCheckpoint,
Data: &ethpb.EventFinalizedCheckpoint{
Block: make([]byte, 32),
Expand Down Expand Up @@ -525,7 +532,7 @@ func TestStreamEvents_OperationsEvents(t *testing.T) {
request := topics.testHttpRequest(testSync.ctx, t)
w := NewStreamingResponseWriterRecorder(testSync.ctx)
events := []*feed.Event{
&feed.Event{
{
Type: statefeed.PayloadAttributes,
Data: payloadattribute.EventData{
ProposerIndex: 0,
Expand Down Expand Up @@ -577,7 +584,7 @@ func TestStuckReaderScenarios(t *testing.T) {

func wedgedWriterTestCase(t *testing.T, queueDepth func([]*feed.Event) int) {
topics, events := operationEventsFixtures(t)
require.Equal(t, 9, len(events))
require.Equal(t, 10, len(events))

// set eventFeedDepth to a number lower than the events we intend to send to force the server to drop the reader.
stn := mockChain.NewEventFeedWrapper()
Expand Down
Loading
Loading