Skip to content

Commit

Permalink
Update deps, correct fallback to DAS, improve gas logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Ferret-san committed Jun 4, 2024
1 parent 798c13c commit a9e30f7
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 65 deletions.
61 changes: 34 additions & 27 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1207,38 +1207,45 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context) (bool, error)
if config.DisableCelestiaFallbackStoreDataOnChain && config.DisableCelestiaFallbackStoreDataOnDAS {
return false, errors.New("unable to post batch to Celestia and fallback storing data on chain and das is disabled")
}
log.Warn("Falling back to storing data on chain", "err", err)
} else {
sequencerMsg = celestiaMsg
}
}
if config.DisableCelestiaFallbackStoreDataOnDAS {
log.Warn("Falling back to storing data on chain ", "err", err)
} else {
log.Warn("Falling back to storing data on DAC ", "err", err)

if b.daWriter != nil {
if b.celestiaWriter != nil && config.DisableCelestiaFallbackStoreDataOnDAS {
return false, errors.New("found Celestia DA enabled and DAS, but fallbacks to DAS aredisabled")
}
if !b.redisLock.AttemptLock(ctx) {
return false, errAttemptLockFailed
}
}

gotNonce, gotMeta, err := b.dataPoster.GetNextNonceAndMeta(ctx)
if err != nil {
return false, err
}
if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) {
return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce)
}
// We nest the anytrust logic here for now as using this fork liekly means your primary DA is Celestia
// and the Anytrust DAC is instead used as a fallback
if b.daWriter != nil {
if config.DisableCelestiaFallbackStoreDataOnDAS {
return false, errors.New("found Celestia DA enabled and DAS, but fallbacks to DAS are disabled")
}
if !b.redisLock.AttemptLock(ctx) {
return false, errAttemptLockFailed
}

gotNonce, gotMeta, err := b.dataPoster.GetNextNonceAndMeta(ctx)
if err != nil {
return false, err
}
if nonce != gotNonce || !bytes.Equal(batchPositionBytes, gotMeta) {
return false, fmt.Errorf("%w: nonce changed from %d to %d while creating batch", storage.ErrStorageRace, nonce, gotNonce)
}

cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
if errors.Is(err, das.BatchToDasFailed) {
if config.DisableDasFallbackStoreDataOnChain {
return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
cert, err := b.daWriter.Store(ctx, sequencerMsg, uint64(time.Now().Add(config.DASRetentionPeriod).Unix()), []byte{}) // b.daWriter will append signature if enabled
if errors.Is(err, das.BatchToDasFailed) {
if config.DisableDasFallbackStoreDataOnChain {
return false, errors.New("unable to batch to DAS and fallback storing data on chain is disabled")
}
log.Warn("Falling back to storing data on chain", "err", err)
} else if err != nil {
return false, err
} else {
sequencerMsg = das.Serialize(cert)
}
}
log.Warn("Falling back to storing data on chain", "err", err)
} else if err != nil {
return false, err
} else {
sequencerMsg = das.Serialize(cert)
sequencerMsg = celestiaMsg
}
}

Expand Down
4 changes: 3 additions & 1 deletion arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,9 @@ func createNodeImpl(
}
} else if l2Config.ArbitrumChainParams.DataAvailabilityCommittee {
return nil, errors.New("a data availability service is required for this chain, but it was not configured")
} else if config.Celestia.Enable {
}

if config.Celestia.Enable {
celestiaService, err := celestia.NewCelestiaDA(&config.Celestia, nil)
if err != nil {
return nil, err
Expand Down
11 changes: 3 additions & 8 deletions cmd/replay/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,17 +319,12 @@ func main() {
panic(fmt.Sprintf("Error opening state db: %v", err.Error()))
}

readMessage := func(arbChainParams params.ArbitrumChainParams) *arbostypes.MessageWithMetadata {
readMessage := func() *arbostypes.MessageWithMetadata {
var delayedMessagesRead uint64
if lastBlockHeader != nil {
delayedMessagesRead = lastBlockHeader.Nonce.Uint64()
}

// TODO: consider removing this panic
if arbChainParams.DataAvailabilityCommittee && arbChainParams.CelestiaDA {
panic(fmt.Sprintf("Error Multiple DA providers enabled: DAC is %v and CelestiaDA is %v", arbChainParams.DataAvailabilityCommittee, arbChainParams.CelestiaDA))
}

backend := WavmInbox{}
var keysetValidationMode = arbstate.KeysetPanicIfInvalid
if backend.GetPositionWithinMessage() > 0 {
Expand Down Expand Up @@ -394,7 +389,7 @@ func main() {
// need to add Celestia or just "ExternalDA" as an option to the ArbitrumChainParams
// for now we hard code Cthis to treu and hardcode Celestia in `readMessage`
// to test the integration
message := readMessage(chainConfig.ArbitrumChainParams)
message := readMessage()

chainContext := WavmChainContext{}
batchFetcher := func(batchNum uint64) ([]byte, error) {
Expand All @@ -408,7 +403,7 @@ func main() {
} else {
// Initialize ArbOS with this init message and create the genesis block.

message := readMessage(params.ArbitrumChainParams{})
message := readMessage()

initMessage, err := message.Message.ParseInitMessage()
if err != nil {
Expand Down
93 changes: 65 additions & 28 deletions das/celestia/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"math/big"
"strings"
"time"

"github.com/spf13/pflag"
Expand All @@ -28,20 +29,29 @@ import (

type DAConfig struct {
Enable bool `koanf:"enable"`
GasPrice float64 `koanf:"gas-price"`
Rpc string `koanf:"rpc"`
NamespaceId string `koanf:"namespace-id"`
AuthToken string `koanf:"auth-token"`
GasPrice float64 `koanf:"gas-price" reload:"hot"`
GasMultiplier float64 `koanf:"gas-multiplier" reload:"hot"`
Rpc string `koanf:"rpc" reload:"hot"`
NamespaceId string `koanf:"namespace-id" `
AuthToken string `koanf:"auth-token" reload:"hot"`
NoopWriter bool `koanf:"noop-writer" reload:"hot"`
ValidatorConfig *ValidatorConfig `koanf:"validator-config"`
}

type ValidatorConfig struct {
TendermintRPC string `koanf:"tendermint-rpc"`
EthClient string `koanf:"eth-rpc"`
TendermintRPC string `koanf:"tendermint-rpc" reload:"hot"`
EthClient string `koanf:"eth-rpc" reload:"hot"`
BlobstreamAddr string `koanf:"blobstream"`
}

var (
// ErrTxTimedout is the error message returned by the DA when mempool is congested
ErrTxTimedout = errors.New("timed out waiting for tx to be included in a block")

// ErrTxAlreadyInMempool is the error message returned by the DA when tx is already in mempool
ErrTxAlreadyInMempool = errors.New("tx already in mempool")
)

// CelestiaMessageHeaderFlag indicates that this data is a Blob Pointer
// which will be used to retrieve data from Celestia
const CelestiaMessageHeaderFlag byte = 0x63
Expand Down Expand Up @@ -162,42 +172,69 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
return nil, err
}

commitment, err := blob.CreateCommitment(dataBlob)
if err != nil {
log.Warn("Error creating commitment", "err", err)
return nil, err
}
height := uint64(0)
submitted := false
// this will trigger node to use the default gas price from celestia app
gasPrice := -1.0
for !submitted {
height, err = c.Client.Blob.Submit(ctx, []*blob.Blob{dataBlob}, gasPrice)
if err != nil {
switch {
case strings.Contains(err.Error(), ErrTxTimedout.Error()), strings.Contains(err.Error(), ErrTxAlreadyInMempool.Error()):
log.Warn("Failed to submit blob, bumping gas price and retrying...", "err", err)
if gasPrice == -1.0 {
gasPrice = c.Cfg.GasPrice
} else {
gasPrice = gasPrice * c.Cfg.GasMultiplier
}
continue
default:
log.Warn("Blob Submission error", "err", err)
return nil, err
}
}

height, err := c.Client.Blob.Submit(ctx, []*blob.Blob{dataBlob}, openrpc.GasPrice(c.Cfg.GasPrice))
if err != nil {
log.Warn("Blob Submission error", "err", err)
return nil, err
}
if height == 0 {
log.Warn("Unexpected height from blob response", "height", height)
return nil, errors.New("unexpected response code")
if height == 0 {
log.Warn("Unexpected height from blob response", "height", height)
return nil, errors.New("unexpected response code")
}

submitted = true
}

proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, commitment)
proofs, err := c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
log.Warn("Error retrieving proof", "err", err)
return nil, err
}

included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, commitment)
proofRetries := 0
for proofs == nil {
log.Warn("Retrieved empty proof from GetProof, fetching again...", "proofRetries", proofRetries)
time.Sleep(time.Millisecond * 100)
proofs, err = c.Client.Blob.GetProof(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
log.Warn("Error retrieving proof", "err", err)
return nil, err
}
proofRetries++
}

included, err := c.Client.Blob.Included(ctx, height, *c.Namespace, proofs, dataBlob.Commitment)
if err != nil || !included {
log.Warn("Error checking for inclusion", "err", err, "proof", proofs)
return nil, err
}
log.Info("Succesfully posted blob", "height", height, "commitment", hex.EncodeToString(commitment))
log.Info("Succesfully posted blob", "height", height, "commitment", hex.EncodeToString(dataBlob.Commitment))

// we fetch the blob so that we can get the correct start index in the square
blob, err := c.Client.Blob.Get(ctx, height, *c.Namespace, commitment)
dataBlob, err = c.Client.Blob.Get(ctx, height, *c.Namespace, dataBlob.Commitment)
if err != nil {
return nil, err
}
if blob.Index <= 0 {
log.Warn("Unexpected index from blob response", "index", blob.Index)

if dataBlob.Index() <= 0 {
log.Warn("Unexpected index from blob response", "index", dataBlob.Index())
return nil, errors.New("unexpected response code")
}

Expand All @@ -213,7 +250,7 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
}

txCommitment, dataRoot := [32]byte{}, [32]byte{}
copy(txCommitment[:], commitment)
copy(txCommitment[:], dataBlob.Commitment)

copy(dataRoot[:], header.DataHash)

Expand All @@ -222,12 +259,12 @@ func (c *CelestiaDA) Store(ctx context.Context, message []byte) ([]byte, error)
// ODS size
odsSize := squareSize / 2

blobIndex := uint64(blob.Index)
blobIndex := uint64(dataBlob.Index())
// startRow
startRow := blobIndex / squareSize
if odsSize*startRow > blobIndex {
// return an empty batch
return nil, fmt.Errorf("storing Celestia information, odsSize*startRow=%v was larger than blobIndex=%v", odsSize*startRow, blob.Index)
return nil, fmt.Errorf("storing Celestia information, odsSize*startRow=%v was larger than blobIndex=%v", odsSize*startRow, dataBlob.Index())
}
startIndexOds := blobIndex - odsSize*startRow
blobPointer := types.BlobPointer{
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.10
github.com/aws/aws-sdk-go-v2/service/s3 v1.26.9
github.com/cavaliergopher/grab/v3 v3.0.1
github.com/celestiaorg/celestia-openrpc v0.4.0-rc.1
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240603174346-256ddd020a0a
github.com/celestiaorg/nmt v0.20.0
github.com/celestiaorg/rsmt2d v0.11.0
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593
Expand Down Expand Up @@ -85,6 +85,8 @@ require (
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect
github.com/celestiaorg/go-fraud v0.2.0 // indirect
github.com/celestiaorg/go-header v0.4.1 // indirect
github.com/celestiaorg/go-square v1.0.1 // indirect
github.com/celestiaorg/go-square/merkle v0.0.0-20240429192549-dea967e1533b // indirect
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/ceramicnetwork/go-dag-jose v0.1.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,20 @@ github.com/celestiaorg/celestia-core v1.29.0-tm-v0.34.29 h1:Fd7ymPUzExPGNl2gZw4i
github.com/celestiaorg/celestia-core v1.29.0-tm-v0.34.29/go.mod h1:xrICN0PBhp3AdTaZ8q4wS5Jvi32V02HNjaC2EsWiEKk=
github.com/celestiaorg/celestia-openrpc v0.4.0-rc.1 h1:CLhcfNP4496pg0aptcgHJubNXoY97PMHF0sDWx4HRrg=
github.com/celestiaorg/celestia-openrpc v0.4.0-rc.1/go.mod h1:+2xwD+PBy76D2XOAwDbkuNVUSAvwUFV54cQqMFBA1s0=
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240530213251-6ed9977848e1 h1:QTIYNnZfdh5yGFkmB+XNWtKy1q336iiYP3WDKZwkEe0=
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240530213251-6ed9977848e1/go.mod h1:7kEhBB4KZh4vw3v5pMuMocNgYOk8uOpFZTo0cNpRjXc=
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240603174244-9787dffd2ad6 h1:5fph1ybOZAFJqLvYmTayBY0EgvzerxgbmLHUpAu1z5I=
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240603174244-9787dffd2ad6/go.mod h1:7kEhBB4KZh4vw3v5pMuMocNgYOk8uOpFZTo0cNpRjXc=
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240603174346-256ddd020a0a h1:T9vsMQvvYvA/1FObT0eqL0J+XJJx/wG6Jzf+DH+IFxc=
github.com/celestiaorg/celestia-openrpc v0.4.1-0.20240603174346-256ddd020a0a/go.mod h1:7kEhBB4KZh4vw3v5pMuMocNgYOk8uOpFZTo0cNpRjXc=
github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I=
github.com/celestiaorg/go-fraud v0.2.0/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc=
github.com/celestiaorg/go-header v0.4.1 h1:bjbUcKDnhrJJ9EoE7vtPpgleNLVjc2S+cB4/qe8nQmo=
github.com/celestiaorg/go-header v0.4.1/go.mod h1:H8xhnDLDLbkpwmWPhCaZyTnIV3dlVxBHPnxNXS2Qu6c=
github.com/celestiaorg/go-square v1.0.1 h1:LEG1zrw4i03VBMElQF8GAbKYgh1bT1uGzWxasU2ePuo=
github.com/celestiaorg/go-square v1.0.1/go.mod h1:XMv5SGCeGSkynW2OOsedugaW/rQlvzxGzWGxTKsyYOU=
github.com/celestiaorg/go-square/merkle v0.0.0-20240429192549-dea967e1533b h1:jo6M4RJnr33sQC/TTraP5gA6ZgbFO/QqzX8e/lIQC7Q=
github.com/celestiaorg/go-square/merkle v0.0.0-20240429192549-dea967e1533b/go.mod h1:86qIYnEhmn/hfW+xvw98NOI3zGaDEB3x8JGjYo2FqLs=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc=
github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4/go.mod h1:fzuHnhzj1pUygGz+1ZkB3uQbEUL4htqCGJ4Qs2LwMZA=
github.com/celestiaorg/nmt v0.20.0 h1:9i7ultZ8Wv5ytt8ZRaxKQ5KOOMo4A2K2T/aPGjIlSas=
Expand Down

0 comments on commit a9e30f7

Please sign in to comment.