Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

upload bundles directly after fetching #44

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions node/celestia.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (c *CelestiaClient) PublishBundle(blocks Bundle) (*CelestiaPointer, float64
// gas price is defined by each node operator. 0.003 is a good default to be accepted
gasPrice := c.GasPrice()

if c.gasPriceIncreasePercent != nil {
if c.gasPriceIncreasePercent.Int64() > 0 {
apiPrice := gasPrice
gasPrice *= 1 + float64(c.gasPriceIncreasePercent.Int64())/100
c.logger.Info("Gas price increased", "percent", c.gasPriceIncreasePercent, "old_gas_price", apiPrice, "new_gas_price", gasPrice)
Expand All @@ -166,10 +166,17 @@ func (c *CelestiaClient) PublishBundle(blocks Bundle) (*CelestiaPointer, float64
}

// Increase gas price by 20% if the transaction fails
gasPrice *= 1.2
fee = int64(gasPrice * float64(gasLimit))

c.logger.Warn("Failed to submit blob, retrying after delay", "delay", c.retryDelay, "attempt", i+1, "fee", fee, "gas_limit", gasLimit, "gas_price", gasPrice, "error", err)
newGasPrice := gasPrice * 1.2
fee = int64(newGasPrice * float64(gasLimit))

c.logger.Warn("Failed to submit blob, retrying after delay",
"delay", c.retryDelay,
"attempt", i+1,
"fee", fee,
"gas_limit", gasLimit,
"old_gas_price", gasPrice,
"new_gas_price", newGasPrice,
"error", err)

i++

Expand Down
8 changes: 1 addition & 7 deletions rollup/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (r *Rollup) GetInfo() (*RollupInfo, error) {
return nil, fmt.Errorf("failed to get rollup head: %w", err)
}

r.Opts.Logger.Info("Latest Rollup Head", "head", latestRollupHead)
r.Opts.Logger.Info("Latest Rollup Head", "head", latestRollupHead.L2Height)

// hash latest rollup head
latestRollupHash, err := r.Ethereum.HashHeader(&latestRollupHead)
Expand Down Expand Up @@ -88,12 +88,6 @@ type RollupBlockInfo struct {
BundleSize uint64 `pretty:"Bundle Size"`
*canonicalStateChainContract.CanonicalStateChainHeader `pretty:"Header"`

DataAvailability struct {
CelestiaHeight uint64 `pretty:"Celestia Height"`
CelestiaShareStart uint64 `pretty:"Shares Start"`
CelestiaShareLen uint64 `pretty:"Shares"`
} `pretty:"Data Availability"`

Distance struct {
FromLatestInEpochs uint64 `pretty:"From Latest Epoch"`
FromLatestInL1height uint64 `pretty:"From Latest L1 Height"`
Expand Down
66 changes: 42 additions & 24 deletions rollup/rollup.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func (r *Rollup) CreateNextBlock() (*Block, error) {
fetchTarget := head.L2Height + blocksToFetch
fetchStart := head.L2Height + 1

bundles, err := r.fetchBundles(fetchStart, fetchTarget)
bundles, pointers, err := r.fetchBundles(fetchStart, fetchTarget)
if err != nil {
return nil, fmt.Errorf("createNextBlock: Failed to fetch bundles: %w", err)
}
Expand All @@ -106,27 +106,16 @@ func (r *Rollup) CreateNextBlock() (*Block, error) {
return nil, fmt.Errorf("createNextBlock: Failed to validate bundles: %w", err)
}

// 7. upload the bundle to celestia
r.Opts.Logger.Info("Publishing bundles to Celestia", "bundles", len(bundles), "bundles_size", fetchStart-head.L2Height-1, "ll_height", llHeight, "ll_epoch", epoch)
pointers := make([]canonicalStateChainContract.CanonicalStateChainCelestiaPointer, 0)
for i, bundle := range bundles {
pointer, gasPrice, err := r.Celestia.PublishBundle(*bundle)
if err != nil {
return nil, fmt.Errorf("createNextBlock: Failed to publish bundle: %w", err)
}
r.Opts.Logger.Debug("Published bundle to Celestia", "gas_price", gasPrice, "bundle", i, "bundle_size", bundle.Size(), "celestia_tx", pointer.TxHash.Hex())
pointers = append(pointers, canonicalStateChainContract.CanonicalStateChainCelestiaPointer{
Height: pointer.Height,
ShareStart: big.NewInt(int64(pointer.ShareStart)),
ShareLen: uint16(pointer.ShareLen),
})
if len(bundles) == 0 {
return nil, fmt.Errorf("createNextBlock: No bundles to publish")
}

// Delay between publishing bundles to Celestia to mitigate 'incorrect account sequence' errors
time.Sleep(20 * time.Second)
if len(pointers) == 0 {
return nil, fmt.Errorf("createNextBlock: No pointers to publish")
}

if len(bundles) == 0 {
return nil, fmt.Errorf("createNextBlock: No bundles to publish")
if len(bundles) != len(pointers) {
return nil, fmt.Errorf("createNextBlock: Bundles and pointers are not the same length")
}

// 8. create the rollup header
Expand Down Expand Up @@ -343,30 +332,59 @@ func (r *Rollup) GetBlockByHash(hash common.Hash) (*Block, error) {
return &Block{CanonicalStateChainHeader: &header, Bundles: bundles}, nil
}

func (r *Rollup) fetchBundles(fetchStart, fetchTarget uint64) ([]*node.Bundle, error) {
func (r *Rollup) fetchBundles(fetchStart, fetchTarget uint64) ([]*node.Bundle, []canonicalStateChainContract.CanonicalStateChainCelestiaPointer, error) {
bundles := make([]*node.Bundle, 0)
pointers := make([]canonicalStateChainContract.CanonicalStateChainCelestiaPointer, 0)

for fetchStart < fetchTarget && uint64(len(bundles)) < r.Opts.BundleCount {
// Calculate the range of blocks to fetch
from := fetchStart
to := fetchStart + r.Opts.BundleSize - 1

// Ensure we don't fetch more than the fetchTarget
if to > fetchTarget {
to = fetchTarget
}

l := r.Opts.Logger.With("bundle_num", len(bundles), "from", from, "to", to)

// Fetch the next bundle from the layer two network
l.Info("Fetching bundle...")
bundle, err := r.fetchBundle(from, to)
if err != nil {
r.Opts.Logger.Error("Failed to fetch bundle", "from", from, "to", to, "error", err)
} else {
bundles = append(bundles, bundle)
l.Error("Failed to fetch bundle, retrying", "error", err)
continue
}
l = l.With("bundle_size", bundle.Size(), "actual_to", bundle.Blocks[len(bundle.Blocks)-1].Number())
l.Info("Bundle fetched successfully!")

// Publish the bundle to the data availability layer (Celestia)
l.Info("Publishing bundle to Celestia...")
pointer, gasPrice, err := r.Celestia.PublishBundle(*bundle)
if err != nil {
l.Error("Failed to publish bundle, will refetch and retry")
continue
}
l.Info("Bundle published successfully!", "gas_price", gasPrice, "celestia_tx", pointer.TxHash.Hex())

// If successfully fetched and published the bundle, add it to the list of bundles and pointers
bundles = append(bundles, bundle)

pointers = append(pointers, canonicalStateChainContract.CanonicalStateChainCelestiaPointer{
Height: pointer.Height,
ShareStart: big.NewInt(int64(pointer.ShareStart)),
ShareLen: uint16(pointer.ShareLen),
})

// Update the fetchStart to the next block to begin fetching from
fetchStart = bundle.Blocks[len(bundle.Blocks)-1].Number().Uint64() + 1
}

return bundles, nil
return bundles, pointers, nil
}

func (r *Rollup) fetchBundle(from, to uint64) (*node.Bundle, error) {

if r.Opts.Store {
bundle, err := r.Node.Store.GetBundle(from, to)
if err == nil {
Expand Down
Loading