forked from ObolNetwork/charon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinterfaces.go
320 lines (265 loc) · 16 KB
/
interfaces.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
// Copyright © 2022-2024 Obol Labs Inc. Licensed under the terms of a Business Source License 1.1
package core
import (
"context"
eth2api "github.com/attestantio/go-eth2-client/api"
"github.com/attestantio/go-eth2-client/spec/altair"
eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
)
// Scheduler triggers the start of a duty workflow.
type Scheduler interface {
// SubscribeDuties subscribes a callback function for triggered duties.
SubscribeDuties(func(context.Context, Duty, DutyDefinitionSet) error)
// SubscribeSlots subscribes a callback function for triggered slots.
SubscribeSlots(func(context.Context, Slot) error)
// GetDutyDefinition returns the definition set for a duty if already resolved.
GetDutyDefinition(context.Context, Duty) (DutyDefinitionSet, error)
}
// Fetcher fetches proposed unsigned duty data.
type Fetcher interface {
// Fetch triggers fetching of a proposed duty data set.
Fetch(context.Context, Duty, DutyDefinitionSet) error
// Subscribe registers a callback for proposed unsigned duty data sets.
Subscribe(func(context.Context, Duty, UnsignedDataSet) error)
// RegisterAggSigDB registers a function to get resolved aggregated
// signed data from the AggSigDB (e.g., randao reveals).
RegisterAggSigDB(func(context.Context, Duty, PubKey) (SignedData, error))
// RegisterAwaitAttData registers a function to get attestation data from DutyDB.
RegisterAwaitAttData(func(ctx context.Context, slot uint64, commIdx uint64) (*eth2p0.AttestationData, error))
}
// DutyDB persists unsigned duty data sets and makes it available for querying. It also acts as slashing database.
type DutyDB interface {
// Store stores the unsigned duty data set.
Store(context.Context, Duty, UnsignedDataSet) error
// AwaitProposal blocks and returns the proposed beacon block
// for the slot when available.
AwaitProposal(ctx context.Context, slot uint64) (*eth2api.VersionedProposal, error)
// AwaitAttestation blocks and returns the attestation data
// for the slot and committee index when available.
AwaitAttestation(ctx context.Context, slot, commIdx uint64) (*eth2p0.AttestationData, error)
// PubKeyByAttestation returns the validator PubKey for the provided attestation data
// slot, committee index and validator committee index. This allows mapping of attestation
// data response to validator.
PubKeyByAttestation(ctx context.Context, slot, commIdx, valCommIdx uint64) (PubKey, error)
// AwaitAggAttestation blocks and returns the aggregated attestation for the slot
// and attestation when available.
AwaitAggAttestation(ctx context.Context, slot uint64, attestationRoot eth2p0.Root) (*eth2p0.Attestation, error)
// AwaitSyncContribution blocks and returns the sync committee contribution data for the slot and
// the subcommittee and the beacon block root when available.
AwaitSyncContribution(ctx context.Context, slot, subcommIdx uint64, beaconBlockRoot eth2p0.Root) (*altair.SyncCommitteeContribution, error)
}
// Consensus comes to consensus on proposed duty data.
type Consensus interface {
// Participate run the duty's consensus instance without a proposed value (if Propose not called yet).
Participate(context.Context, Duty) error
// Propose provides the consensus instance proposed value (and run it if Participate not called yet).
Propose(context.Context, Duty, UnsignedDataSet) error
// Subscribe registers a callback for resolved (reached consensus) duty unsigned data set.
Subscribe(func(context.Context, Duty, UnsignedDataSet) error)
}
// ValidatorAPI provides a beacon node API to validator clients. It serves duty data from the DutyDB and stores partial signed data in the ParSigDB.
type ValidatorAPI interface {
// RegisterAwaitProposal registers a function to query unsigned beacon block proposals by providing the slot.
RegisterAwaitProposal(func(ctx context.Context, slot uint64) (*eth2api.VersionedProposal, error))
// RegisterAwaitAttestation registers a function to query attestation data.
RegisterAwaitAttestation(func(ctx context.Context, slot, commIdx uint64) (*eth2p0.AttestationData, error))
// RegisterAwaitSyncContribution registers a function to query sync contribution data.
RegisterAwaitSyncContribution(func(ctx context.Context, slot, subcommIdx uint64, beaconBlockRoot eth2p0.Root) (*altair.SyncCommitteeContribution, error))
// RegisterPubKeyByAttestation registers a function to query validator by attestation.
RegisterPubKeyByAttestation(func(ctx context.Context, slot, commIdx, valCommIdx uint64) (PubKey, error))
// RegisterGetDutyDefinition registers a function to query duty definitions.
RegisterGetDutyDefinition(func(context.Context, Duty) (DutyDefinitionSet, error))
// RegisterAwaitAggAttestation registers a function to query aggregated attestation.
RegisterAwaitAggAttestation(fn func(ctx context.Context, slot uint64, attestationDataRoot eth2p0.Root) (*eth2p0.Attestation, error))
// RegisterAwaitAggSigDB registers a function to query aggregated signed data from aggSigDB.
RegisterAwaitAggSigDB(func(context.Context, Duty, PubKey) (SignedData, error))
// Subscribe registers a function to store partially signed data sets.
Subscribe(func(context.Context, Duty, ParSignedDataSet) error)
}
// ParSigDB persists partial signatures and sends them to the
// partial signature exchange and aggregation.
type ParSigDB interface {
// StoreInternal stores an internally received partially signed duty data set.
StoreInternal(context.Context, Duty, ParSignedDataSet) error
// StoreExternal stores an externally received partially signed duty data set.
StoreExternal(context.Context, Duty, ParSignedDataSet) error
// SubscribeInternal registers a callback when an internal
// partially signed duty set is stored.
SubscribeInternal(func(context.Context, Duty, ParSignedDataSet) error)
// SubscribeThreshold registers a callback when *threshold*
// partially signed duty is reached for the set of DVs.
SubscribeThreshold(func(context.Context, Duty, map[PubKey][]ParSignedData) error)
}
// ParSigEx exchanges partially signed duty data sets.
type ParSigEx interface {
// Broadcast broadcasts the partially signed duty data set to all peers.
Broadcast(context.Context, Duty, ParSignedDataSet) error
// Subscribe registers a callback when a partially signed duty set
// is received from a peer.
Subscribe(func(context.Context, Duty, ParSignedDataSet) error)
}
// SigAgg aggregates threshold partial signatures.
type SigAgg interface {
// Aggregate aggregates the partially signed duty datas for the set of DVs.
Aggregate(context.Context, Duty, map[PubKey][]ParSignedData) error
// Subscribe registers a callback for aggregated signed duty data set.
Subscribe(func(context.Context, Duty, SignedDataSet) error)
}
// AggSigDB persists aggregated signed duty data.
type AggSigDB interface {
// Store stores aggregated signed duty data set.
Store(context context.Context, duty Duty, data SignedDataSet) error
// Await blocks and returns the aggregated signed duty data when available.
Await(context context.Context, duty Duty, pubKey PubKey) (SignedData, error)
// Run runs AggSigDB lifecycle until context is cancelled.
Run(context context.Context)
}
// Broadcaster broadcasts aggregated signed duty data set to the beacon node.
type Broadcaster interface {
Broadcast(context.Context, Duty, SignedDataSet) error
}
// InclusionChecker checks whether submitted duties have been included on-chain.
// TODO(corver): Merge this with tracker below as a compose multi tracker.
type InclusionChecker interface {
// Submitted is called when a duty set has been submitted.
Submitted(Duty, SignedDataSet) error
}
// Tracker sends core component events for further analysis and instrumentation.
type Tracker interface {
// FetcherFetched sends Fetcher component's events to tracker.
FetcherFetched(Duty, DutyDefinitionSet, error)
// ConsensusProposed sends Consensus component's events to tracker.
ConsensusProposed(Duty, UnsignedDataSet, error)
// DutyDBStored sends DutyDB component's store events to tracker.
DutyDBStored(Duty, UnsignedDataSet, error)
// ParSigDBStoredInternal sends ParSigDB component's store internal events to tracker.
ParSigDBStoredInternal(Duty, ParSignedDataSet, error)
// ParSigExBroadcasted sends ParSigEx component's broadcast events to tracker.
ParSigExBroadcasted(Duty, ParSignedDataSet, error)
// ParSigDBStoredExternal sends ParSigDB component's store external events to tracker.
ParSigDBStoredExternal(Duty, ParSignedDataSet, error)
// SigAggAggregated sends SigAgg component's aggregate events to tracker.
SigAggAggregated(Duty, map[PubKey][]ParSignedData, error)
// AggSigDBStored sends AggSigDB component's store events to tracker.
AggSigDBStored(Duty, SignedDataSet, error)
// BroadcasterBroadcast sends Broadcaster component's broadcast events to tracker.
BroadcasterBroadcast(Duty, SignedDataSet, error)
// InclusionChecked sends InclusionChecker component's check events to tracker.
InclusionChecked(Duty, PubKey, SignedData, error)
}
// wireFuncs defines the core workflow components as a list of input and output functions
// instead as interfaces, since functions are easier to wrap than interfaces.
type wireFuncs struct {
SchedulerSubscribeDuties func(func(context.Context, Duty, DutyDefinitionSet) error)
SchedulerSubscribeSlots func(func(context.Context, Slot) error)
SchedulerGetDutyDefinition func(context.Context, Duty) (DutyDefinitionSet, error)
FetcherFetch func(context.Context, Duty, DutyDefinitionSet) error
FetcherSubscribe func(func(context.Context, Duty, UnsignedDataSet) error)
FetcherRegisterAggSigDB func(func(context.Context, Duty, PubKey) (SignedData, error))
FetcherRegisterAwaitAttData func(func(ctx context.Context, slot uint64, commIdx uint64) (*eth2p0.AttestationData, error))
ConsensusParticipate func(context.Context, Duty) error
ConsensusPropose func(context.Context, Duty, UnsignedDataSet) error
ConsensusSubscribe func(func(context.Context, Duty, UnsignedDataSet) error)
DutyDBStore func(context.Context, Duty, UnsignedDataSet) error
DutyDBAwaitProposal func(ctx context.Context, slot uint64) (*eth2api.VersionedProposal, error)
DutyDBAwaitAttestation func(ctx context.Context, slot, commIdx uint64) (*eth2p0.AttestationData, error)
DutyDBPubKeyByAttestation func(ctx context.Context, slot, commIdx, valCommIdx uint64) (PubKey, error)
DutyDBAwaitAggAttestation func(ctx context.Context, slot uint64, attestationRoot eth2p0.Root) (*eth2p0.Attestation, error)
DutyDBAwaitSyncContribution func(ctx context.Context, slot, subcommIdx uint64, beaconBlockRoot eth2p0.Root) (*altair.SyncCommitteeContribution, error)
VAPIRegisterAwaitAttestation func(func(ctx context.Context, slot, commIdx uint64) (*eth2p0.AttestationData, error))
VAPIRegisterAwaitSyncContribution func(func(ctx context.Context, slot, subcommIdx uint64, beaconBlockRoot eth2p0.Root) (*altair.SyncCommitteeContribution, error))
VAPIRegisterAwaitProposal func(func(ctx context.Context, slot uint64) (*eth2api.VersionedProposal, error))
VAPIRegisterGetDutyDefinition func(func(context.Context, Duty) (DutyDefinitionSet, error))
VAPIRegisterPubKeyByAttestation func(func(ctx context.Context, slot, commIdx, valCommIdx uint64) (PubKey, error))
VAPIRegisterAwaitAggAttestation func(func(ctx context.Context, slot uint64, attestationRoot eth2p0.Root) (*eth2p0.Attestation, error))
VAPIRegisterAwaitAggSigDB func(func(context.Context, Duty, PubKey) (SignedData, error))
VAPISubscribe func(func(context.Context, Duty, ParSignedDataSet) error)
ParSigDBStoreInternal func(context.Context, Duty, ParSignedDataSet) error
ParSigDBStoreExternal func(context.Context, Duty, ParSignedDataSet) error
ParSigDBSubscribeInternal func(func(context.Context, Duty, ParSignedDataSet) error)
ParSigDBSubscribeThreshold func(func(context.Context, Duty, map[PubKey][]ParSignedData) error)
ParSigExBroadcast func(context.Context, Duty, ParSignedDataSet) error
ParSigExSubscribe func(func(context.Context, Duty, ParSignedDataSet) error)
SigAggAggregate func(context.Context, Duty, map[PubKey][]ParSignedData) error
SigAggSubscribe func(func(context.Context, Duty, SignedDataSet) error)
AggSigDBStore func(context.Context, Duty, SignedDataSet) error
AggSigDBAwait func(context.Context, Duty, PubKey) (SignedData, error)
BroadcasterBroadcast func(context.Context, Duty, SignedDataSet) error
}
// WireOption defines a functional option to configure wiring.
type WireOption func(*wireFuncs)
// Wire wires the workflow components together.
func Wire(sched Scheduler,
fetch Fetcher,
cons Consensus,
dutyDB DutyDB,
vapi ValidatorAPI,
parSigDB ParSigDB,
parSigEx ParSigEx,
sigAgg SigAgg,
aggSigDB AggSigDB,
bcast Broadcaster,
opts ...WireOption,
) {
w := wireFuncs{
SchedulerSubscribeDuties: sched.SubscribeDuties,
SchedulerSubscribeSlots: sched.SubscribeSlots,
SchedulerGetDutyDefinition: sched.GetDutyDefinition,
FetcherFetch: fetch.Fetch,
FetcherSubscribe: fetch.Subscribe,
FetcherRegisterAggSigDB: fetch.RegisterAggSigDB,
FetcherRegisterAwaitAttData: fetch.RegisterAwaitAttData,
ConsensusParticipate: cons.Participate,
ConsensusPropose: cons.Propose,
ConsensusSubscribe: cons.Subscribe,
DutyDBStore: dutyDB.Store,
DutyDBAwaitAttestation: dutyDB.AwaitAttestation,
DutyDBAwaitProposal: dutyDB.AwaitProposal,
DutyDBPubKeyByAttestation: dutyDB.PubKeyByAttestation,
DutyDBAwaitAggAttestation: dutyDB.AwaitAggAttestation,
DutyDBAwaitSyncContribution: dutyDB.AwaitSyncContribution,
VAPIRegisterAwaitProposal: vapi.RegisterAwaitProposal,
VAPIRegisterAwaitAttestation: vapi.RegisterAwaitAttestation,
VAPIRegisterAwaitSyncContribution: vapi.RegisterAwaitSyncContribution,
VAPIRegisterGetDutyDefinition: vapi.RegisterGetDutyDefinition,
VAPIRegisterPubKeyByAttestation: vapi.RegisterPubKeyByAttestation,
VAPIRegisterAwaitAggAttestation: vapi.RegisterAwaitAggAttestation,
VAPIRegisterAwaitAggSigDB: vapi.RegisterAwaitAggSigDB,
VAPISubscribe: vapi.Subscribe,
ParSigDBStoreInternal: parSigDB.StoreInternal,
ParSigDBStoreExternal: parSigDB.StoreExternal,
ParSigDBSubscribeInternal: parSigDB.SubscribeInternal,
ParSigDBSubscribeThreshold: parSigDB.SubscribeThreshold,
ParSigExBroadcast: parSigEx.Broadcast,
ParSigExSubscribe: parSigEx.Subscribe,
SigAggAggregate: sigAgg.Aggregate,
SigAggSubscribe: sigAgg.Subscribe,
AggSigDBStore: aggSigDB.Store,
AggSigDBAwait: aggSigDB.Await,
BroadcasterBroadcast: bcast.Broadcast,
}
for _, opt := range opts {
opt(&w)
}
w.SchedulerSubscribeDuties(w.FetcherFetch)
w.SchedulerSubscribeDuties(func(ctx context.Context, duty Duty, _ DutyDefinitionSet) error {
return w.ConsensusParticipate(ctx, duty)
})
w.FetcherSubscribe(w.ConsensusPropose)
w.FetcherRegisterAggSigDB(w.AggSigDBAwait)
w.FetcherRegisterAwaitAttData(w.DutyDBAwaitAttestation)
w.ConsensusSubscribe(w.DutyDBStore)
w.VAPIRegisterAwaitProposal(w.DutyDBAwaitProposal)
w.VAPIRegisterAwaitAttestation(w.DutyDBAwaitAttestation)
w.VAPIRegisterAwaitSyncContribution(w.DutyDBAwaitSyncContribution)
w.VAPIRegisterGetDutyDefinition(w.SchedulerGetDutyDefinition)
w.VAPIRegisterPubKeyByAttestation(w.DutyDBPubKeyByAttestation)
w.VAPIRegisterAwaitAggAttestation(w.DutyDBAwaitAggAttestation)
w.VAPIRegisterAwaitAggSigDB(w.AggSigDBAwait)
w.VAPISubscribe(w.ParSigDBStoreInternal)
w.ParSigDBSubscribeInternal(w.ParSigExBroadcast)
w.ParSigExSubscribe(w.ParSigDBStoreExternal)
w.ParSigDBSubscribeThreshold(w.SigAggAggregate)
w.SigAggSubscribe(w.AggSigDBStore)
w.SigAggSubscribe(w.BroadcasterBroadcast)
}