Skip to content

Commit

Permalink
feat: batching (#343)
Browse files Browse the repository at this point in the history
* batching init

* refactor config

* fix commit aggregation

* unto test changes

* fix poller loop

* snap batching
  • Loading branch information
LexLuthr authored Jan 6, 2025
1 parent 65eae8d commit d0e5f5e
Show file tree
Hide file tree
Showing 16 changed files with 1,506 additions and 498 deletions.
15 changes: 10 additions & 5 deletions cmd/curio/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
si := dependencies.Si
bstore := dependencies.Bstore
machine := dependencies.ListenAddr
prover := dependencies.Prover
var activeTasks []harmonytask.TaskInterface

sender, sendTask := message.NewSender(full, full, db)
Expand Down Expand Up @@ -195,7 +196,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
cfg.Subsystems.EnableUpdateSubmit

if hasAnySealingTask {
sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore, machine)
sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore, machine, prover)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -243,14 +244,18 @@ func addSealingTasks(
ctx context.Context, hasAnySealingTask bool, db *harmonydb.DB, full api.Chain, sender *message.Sender,
as *multictladdr.MultiAddressSelector, cfg *config.CurioConfig, slrLazy *lazy.Lazy[*ffi.SealCalls],
asyncParams func() func() (bool, error), si paths.SectorIndex, stor *paths.Remote,
bstore curiochain.CurioBlockstore, machineHostPort string) ([]harmonytask.TaskInterface, error) {
bstore curiochain.CurioBlockstore, machineHostPort string, prover storiface.Prover) ([]harmonytask.TaskInterface, error) {
var activeTasks []harmonytask.TaskInterface
// Sealing / Snap

var sp *seal.SealPoller
var slr *ffi.SealCalls
var err error
if hasAnySealingTask {
sp = seal.NewPoller(db, full)
sp, err = seal.NewPoller(db, full, cfg)
if err != nil {
return nil, xerrors.Errorf("creating seal poller: %w", err)
}
go sp.RunPoller(ctx)

slr = must.One(slrLazy.Val())
Expand Down Expand Up @@ -303,7 +308,7 @@ func addSealingTasks(
}

if cfg.Subsystems.EnableSendPrecommitMsg {
precommitTask := seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee, cfg.Fees.CollateralFromMinerBalance, cfg.Fees.DisableCollateralFallback)
precommitTask := seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg)
activeTasks = append(activeTasks, precommitTask)
}
if cfg.Subsystems.EnablePoRepProof {
Expand All @@ -321,7 +326,7 @@ func addSealingTasks(
}
}
if cfg.Subsystems.EnableSendCommitMsg {
commitTask := seal.NewSubmitCommitTask(sp, db, full, sender, as, cfg)
commitTask := seal.NewSubmitCommitTask(sp, db, full, sender, as, cfg, prover)
activeTasks = append(activeTasks, commitTask)
}

Expand Down
92 changes: 92 additions & 0 deletions deps/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

74 changes: 74 additions & 0 deletions deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package config
import (
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/lotus/chain/types"
)

Expand All @@ -27,6 +30,10 @@ func DefaultCurioConfig() *CurioConfig {
Base: types.MustParseFIL("0"),
PerSector: types.MustParseFIL("0.03"), // enough for 6 agg and 1nFIL base fee
},
MaxUpdateBatchGasFee: BatchFeeConfig{
Base: types.MustParseFIL("0"),
PerSector: types.MustParseFIL("0.03"),
},

MaxTerminateGasFee: types.MustParseFIL("0.5"),
MaxWindowPoStGasFee: types.MustParseFIL("5"),
Expand Down Expand Up @@ -70,6 +77,23 @@ func DefaultCurioConfig() *CurioConfig {
AlertManagerURL: "http://localhost:9093/api/v2/alerts",
},
},
Batching: CurioBatchingConfig{
PreCommit: PreCommitBatchingConfig{
BaseFeeThreshold: types.MustParseFIL("0.005"),
Timeout: Duration(4 * time.Hour),
Slack: Duration(6 * time.Hour),
},
Commit: CommitBatchingConfig{
BaseFeeThreshold: types.MustParseFIL("0.005"),
Timeout: Duration(1 * time.Hour),
Slack: Duration(1 * time.Hour),
},
Update: UpdateBatchingConfig{
BaseFeeThreshold: types.MustParseFIL("0.005"),
Timeout: Duration(1 * time.Hour),
Slack: Duration(1 * time.Hour),
},
},
}
}

Expand All @@ -85,6 +109,7 @@ type CurioConfig struct {
Seal CurioSealConfig
Apis ApisConfig
Alerting CurioAlertingConfig
Batching CurioBatchingConfig
}

func DefaultDefaultMaxFee() types.FIL {
Expand All @@ -96,6 +121,10 @@ type BatchFeeConfig struct {
PerSector types.FIL
}

func (b *BatchFeeConfig) FeeForSectors(nSectors int) abi.TokenAmount {
return big.Add(big.Int(b.Base), big.Mul(big.NewInt(int64(nSectors)), big.Int(b.PerSector)))
}

type CurioSubsystemsConfig struct {
// EnableWindowPost enables window post to be executed on this curio instance. Each machine in the cluster
// with WindowPoSt enabled will also participate in the window post scheduler. It is possible to have multiple
Expand Down Expand Up @@ -288,6 +317,7 @@ type CurioFees struct {
// maxBatchFee = maxBase + maxPerSector * nSectors
MaxPreCommitBatchGasFee BatchFeeConfig
MaxCommitBatchGasFee BatchFeeConfig
MaxUpdateBatchGasFee BatchFeeConfig

MaxTerminateGasFee types.FIL
// WindowPoSt is a high-value operation, so the default fee should be high.
Expand Down Expand Up @@ -500,3 +530,47 @@ type ApisConfig struct {
// Chain API auth secret for the Curio nodes to use.
StorageRPCSecret string
}

type CurioBatchingConfig struct {
// Precommit Batching configuration
PreCommit PreCommitBatchingConfig

// Commit batching configuration
Commit CommitBatchingConfig

// Snap Deals batching configuration
Update UpdateBatchingConfig
}

type PreCommitBatchingConfig struct {
// Base fee value below which we should try to send Precommit messages immediately
BaseFeeThreshold types.FIL

// Maximum amount of time any given sector in the batch can wait for the batch to accumulate
Timeout Duration

// Time buffer for forceful batch submission before sectors/deal in batch would start expiring
Slack Duration
}

type CommitBatchingConfig struct {
// Base fee value below which we should try to send Commit messages immediately
BaseFeeThreshold types.FIL

// Maximum amount of time any given sector in the batch can wait for the batch to accumulate
Timeout Duration

// Time buffer for forceful batch submission before sectors/deals in batch would start expiring
Slack Duration
}

type UpdateBatchingConfig struct {
// Base fee value below which we should try to send Commit messages immediately
BaseFeeThreshold types.FIL

// Maximum amount of time any given sector in the batch can wait for the batch to accumulate
Timeout Duration

// Time buffer for forceful batch submission before sectors/deals in batch would start expiring
Slack Duration
}
5 changes: 5 additions & 0 deletions deps/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ type Deps struct {
ListenAddr string
Name string
Alert *alertmanager.AlertNow
Prover storiface.Prover
}

const (
Expand Down Expand Up @@ -348,6 +349,10 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`,
deps.Name = cctx.String("name")
}

if deps.Prover == nil {
deps.Prover = ffiwrapper.ProofProver
}

return nil
}

Expand Down
57 changes: 57 additions & 0 deletions documentation/en/configuration/default-curio-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,13 @@ description: The default curio configuration
# type: types.FIL
#PerSector = "0.03 FIL"

[Fees.MaxUpdateBatchGasFee]
# type: types.FIL
#Base = "0 FIL"

# type: types.FIL
#PerSector = "0.03 FIL"


[[Addresses]]
#PreCommitControl = []
Expand Down Expand Up @@ -502,4 +509,54 @@ description: The default curio configuration
# type: string
#WebHookURL = ""


[Batching]
[Batching.PreCommit]
# Base fee value below which we should try to send Precommit messages immediately
#
# type: types.FIL
#BaseFeeThreshold = "0.005 FIL"

# Maximum amount of time any given sector in the batch can wait for the batch to accumulate
#
# type: Duration
#Timeout = "4h0m0s"

# Time buffer for forceful batch submission before sectors/deal in batch would start expiring
#
# type: Duration
#Slack = "6h0m0s"

[Batching.Commit]
# Base fee value below which we should try to send Commit messages immediately
#
# type: types.FIL
#BaseFeeThreshold = "0.005 FIL"

# Maximum amount of time any given sector in the batch can wait for the batch to accumulate
#
# type: Duration
#Timeout = "1h0m0s"

# Time buffer for forceful batch submission before sectors/deals in batch would start expiring
#
# type: Duration
#Slack = "1h0m0s"

[Batching.Update]
# Base fee value below which we should try to send Commit messages immediately
#
# type: types.FIL
#BaseFeeThreshold = "0.005 FIL"

# Maximum amount of time any given sector in the batch can wait for the batch to accumulate
#
# type: Duration
#Timeout = "1h0m0s"

# Time buffer for forceful batch submission before sectors/deals in batch would start expiring
#
# type: Duration
#Slack = "1h0m0s"

```
Loading

0 comments on commit d0e5f5e

Please sign in to comment.