Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
claravanstaden committed Jan 23, 2025
1 parent 77005a9 commit cd99588
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 41 deletions.
71 changes: 71 additions & 0 deletions relayer/relays/execution/event_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package execution

import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/snowfork/snowbridge/relayer/chain/ethereum"
"github.com/snowfork/snowbridge/relayer/config"
"github.com/snowfork/snowbridge/relayer/contracts"
assert "github.com/stretchr/testify/require"
"testing"
)

func TestHexToBytes(t *testing.T) {
config := SourceConfig{
Ethereum: config.EthereumConfig{
Endpoint: "ws://127.0.0.1:8546/",
},
Contracts: ContractsConfig{
Gateway: "0xb8ea8cb425d85536b158d661da1ef0895bb92f1d",
},
}
ethconn := ethereum.NewConnection(&config.Ethereum, nil)

err := ethconn.Connect(context.Background())
assert.NoError(t, err)

address := common.HexToAddress(config.Contracts.Gateway)
contract, err := contracts.NewGateway(address, ethconn.Client())
assert.NoError(t, err)

start := uint64(0)
end := uint64(1000)
opts := bind.FilterOpts{
Start: start,
End: &end,
Context: context.Background(),
}
iter, err := contract.FilterOutboundMessageAccepted(&opts)
assert.NoError(t, err)

var events []*contracts.GatewayOutboundMessageAccepted
done := false

for {
more := iter.Next()
if !more {
err = iter.Error()
assert.NoError(t, err)
break
}
if iter.Event.Nonce >= start {
events = append(events, iter.Event)
}

if iter.Event.Nonce == start && opts.Start != 0 {
// This iteration of findEventsWithFilter contains the last nonce we are interested in,
// although the nonces might not be ordered in ascending order in the iterator. So there might be more
// nonces that need to be appended (and we need to keep looping until "more" is false, even though we
// already have found the oldest nonce.
done = true
}
}

if done {
iter.Close()
}

fmt.Printf("events: %v", events)
}
99 changes: 60 additions & 39 deletions relayer/relays/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,24 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
return fmt.Errorf("get last block number: %w", err)
}

log.WithFields(log.Fields{
"blockNumber": blockNumber,
}).Info("block number is")

for _, paraNonce := range paraNonces {
events, err := r.findEvents(ctx, blockNumber, paraNonce+1)
log.WithFields(log.Fields{
"nonce": paraNonce,
}).Info("Finding events for nonce")
events, err := r.findEvents(ctx, blockNumber, paraNonce)
if err != nil {
return fmt.Errorf("find events: %w", err)
}

log.WithFields(log.Fields{
"events": events,
"paraNonce": paraNonce,
}).Info("Found events for nonce")

for _, ev := range events {
err := r.waitAndSend(ctx, ev)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
Expand All @@ -170,7 +182,7 @@ func (r *Relay) writeToParachain(ctx context.Context, proof scale.ProofPayload,

// There is already a valid finalized header on-chain that can prove the message
if proof.FinalizedPayload == nil {
err := r.writer.WriteToParachainAndWatch(ctx, "EthereumInboundQueue.submit", inboundMsg)
err := r.writer.WriteToParachainAndWatch(ctx, "EthereumInboundQueueV2.submit", inboundMsg)
if err != nil {
return fmt.Errorf("submit message to inbound queue: %w", err)
}
Expand All @@ -184,7 +196,7 @@ func (r *Relay) writeToParachain(ctx context.Context, proof scale.ProofPayload,
"message_slot": proof.HeaderPayload.Header.Slot,
}).Debug("Batching finalized header update with message")

extrinsics := []string{"EthereumBeaconClient.submit", "EthereumInboundQueue.submit"}
extrinsics := []string{"EthereumBeaconClient.submit", "EthereumInboundQueueV2.submit"}
payloads := []interface{}{proof.FinalizedPayload.Payload, inboundMsg}
// Batch the finalized header update with the inbound message
err := r.writer.BatchCall(ctx, extrinsics, payloads)
Expand All @@ -196,39 +208,33 @@ func (r *Relay) writeToParachain(ctx context.Context, proof scale.ProofPayload,
}

func (r *Relay) fetchUnprocessedParachainNonces(latest uint64) ([]uint64, error) {
log.WithField("latest", latest).Info("latest nonce is")
unprocessedNonces := []uint64{}
startKey, err := types.CreateStorageKey(r.paraconn.Metadata(), "EthereumInboundQueue", "NonceBitmap", nil)
if err != nil {
return unprocessedNonces, fmt.Errorf("create storage key for EthereumInboundQueue.NonceBitmap: %w", err)
}
latestBucket := latest / 128

for b := uint64(0); b <= latestBucket; b++ {
encodedBucket, _ := types.EncodeToBytes(types.NewU64(b))
bucketKey, _ := types.CreateStorageKey(
r.paraconn.Metadata(),
"EthereumInboundQueueV2",
"NonceBitmap",
encodedBucket,
nil,
)

blockHash, err := r.paraconn.API().RPC.Chain.GetBlockHashLatest()
if err != nil {
return unprocessedNonces, fmt.Errorf("fetch latest parachain block hash: %w", err)
}

keys, err := r.paraconn.API().RPC.State.GetKeysPaged(startKey, 100, nil, blockHash)
if err != nil {
return unprocessedNonces, fmt.Errorf("fetch storage EthereumInboundQueue.NonceBitmap keys: %w", err)
}
for _, key := range keys {
var value types.U128
ok, err := r.paraconn.API().RPC.State.GetStorageLatest(key, &value)
ok, err := r.paraconn.API().RPC.State.GetStorageLatest(bucketKey, &value)
if err != nil {
return unprocessedNonces, fmt.Errorf("fetch latest parachain value: %w", err)
return nil, fmt.Errorf("failed to read bucket %d: %w", b, err)
}
if !ok || value.Int.Cmp(big.NewInt(0)) == 0 {
continue // Skip empty buckets
}

unprocessedNonces = extractUnprocessedNonces(value, latest)

if len(unprocessedNonces) > 0 && unprocessedNonces[len(unprocessedNonces)-1] >= latest {
break
// "Missing" means the chain doesn't store it => it's 0
if !ok {
value = types.NewU128(*big.NewInt(0))
}

fmt.Printf("Unprocessed nonces for bucket %s: %v\n", key.Hex(), unprocessedNonces)
// Now parse bits from value...
bucketNonces := extractUnprocessedNonces(value, latest, b)
unprocessedNonces = append(unprocessedNonces, bucketNonces...)
}

log.WithFields(logrus.Fields{
Expand All @@ -243,15 +249,15 @@ func (r *Relay) isParachainNonceSet(index uint64) (bool, error) {
bitPosition := index % 128

encodedBucket, err := types.EncodeToBytes(types.NewU64(bucket))
bucketKey, err := types.CreateStorageKey(r.paraconn.Metadata(), "EthereumInboundQueue", "NonceBitmap", encodedBucket)
bucketKey, err := types.CreateStorageKey(r.paraconn.Metadata(), "EthereumInboundQueueV2", "NonceBitmap", encodedBucket)
if err != nil {
return false, fmt.Errorf("create storage key for EthereumInboundQueue.NonceBitmap: %w", err)
return false, fmt.Errorf("create storage key for EthereumInboundQueueV2.NonceBitmap: %w", err)
}

var bucketValue types.U128
ok, err := r.paraconn.API().RPC.State.GetStorageLatest(bucketKey, &bucketValue)
if err != nil {
return false, fmt.Errorf("fetch storage EthereumInboundQueue.NonceBitmap keys: %w", err)
return false, fmt.Errorf("fetch storage EthereumInboundQueueV2.NonceBitmap keys: %w", err)
}
if !ok {
return false, fmt.Errorf("bucket does not exist: %w", err)
Expand All @@ -265,19 +271,28 @@ func checkBitState(bucketValue types.U128, bitPosition uint64) bool {
return new(big.Int).And(bucketValue.Int, mask).Cmp(big.NewInt(0)) != 0
}

func extractUnprocessedNonces(bitmap types.U128, latest uint64) []uint64 {
func extractUnprocessedNonces(bitmap types.U128, latest uint64, bucketIndex uint64) []uint64 {
var unprocessed []uint64
// Each bucket covers 128 nonces
baseNonce := bucketIndex * 128

for i := 0; i < 128; i++ {
if uint64(i) > latest {
break // Stop processing if index exceeds `latest`
nonce := baseNonce + uint64(i)
// Ignore nonce 0 since valid nonces start at 1
if nonce < 1 {
continue
}

// If we've passed the latest nonce to consider, stop checking further bits.
if nonce > latest {
break
}
// Check if bit `i` is unset (meaning unprocessed).
mask := new(big.Int).Lsh(big.NewInt(1), uint(i))
bit := new(big.Int).And(bitmap.Int, mask)
if bit.Cmp(big.NewInt(0)) == 0 {
unprocessed = append(unprocessed, uint64(i))
if new(big.Int).And(bitmap.Int, mask).Cmp(big.NewInt(0)) == 0 {
unprocessed = append(unprocessed, nonce)
}
}

return unprocessed
}

Expand Down Expand Up @@ -535,7 +550,13 @@ func (r *Relay) isInFinalizedBlock(ctx context.Context, event *contracts.Gateway
return fmt.Errorf("get block header: %w", err)
}

return r.beaconHeader.CheckHeaderFinalized(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
err = r.beaconHeader.CheckHeaderFinalized(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
if err != nil {
log.Info("Message is not in a finalized block")
} else {
log.Info("Message is in a finalized block")
}
return err
}

func (r *Relay) UnprocessedNonces() {
Expand Down
2 changes: 1 addition & 1 deletion smoketest/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub const ETHEREUM_ADDRESS: [u8; 20] = hex!("90A987B944Cb1dCcE5564e5FDeCD7a54D3d
// The deployment addresses of the following contracts are stable in our E2E env, unless we modify
// the order in contracts are deployed in DeployScript.sol.
pub const DEFAULT_GATEWAY_PROXY_CONTRACT: [u8; 20] =
hex!("8cf6147918a5cbb672703f879f385036f8793a24");
hex!("b8ea8cb425d85536b158d661da1ef0895bb92f1d");
pub const DEFAULT_WETH_CONTRACT: [u8; 20] = hex!("774667629726ec1FaBEbCEc0D9139bD1C8f72a23");
pub const AGENT_EXECUTOR_CONTRACT: [u8; 20] = hex!("Fc97A6197dc90bef6bbEFD672742Ed75E9768553");

Expand Down
2 changes: 1 addition & 1 deletion smoketest/tests/register_token_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn register_token_v2() {
let weth = weth9::WETH9::new(weth_addr, ethereum_client.clone());

let receipt = gateway
.v_2_register_token(weth.address(), 1_000_000_000u128)
.v_2_register_token(weth.address(), 0, 1_000_000_000u128, 1_500_000_000u128)
.value(3_000_000_000_u128)
.send()
.await
Expand Down

0 comments on commit cd99588

Please sign in to comment.