Skip to content

Commit

Permalink
Merge pull request #871 from starius/sweepbatcher-fixes
Browse files Browse the repository at this point in the history
sweepbatcher: fix some minor bugs
  • Loading branch information
starius authored Jan 17, 2025
2 parents 7cfceb7 + 6efd010 commit 188e567
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,6 @@ type batch struct {
// currentHeight is the current block height.
currentHeight int32

// blockEpochChan is the channel over which block epoch notifications
// are received.
blockEpochChan chan int32

// spendChan is the channel over which spend notifications are received.
spendChan chan *chainntnfs.SpendDetail

Expand Down Expand Up @@ -362,7 +358,6 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
id: -1,
state: Open,
sweeps: make(map[lntypes.Hash]sweep),
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -407,7 +402,6 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -626,6 +620,16 @@ func (b *batch) Run(ctx context.Context) error {
return err
}

// Set currentHeight here, because it may be needed in monitorSpend.
select {
case b.currentHeight = <-blockChan:
b.log.Debugf("initial height for the batch is %v",
b.currentHeight)

case <-runCtx.Done():
return runCtx.Err()
}

// If a primary sweep exists we immediately start monitoring for its
// spend.
if b.primarySweepID != lntypes.ZeroHash {
Expand All @@ -642,9 +646,9 @@ func (b *batch) Run(ctx context.Context) error {
skipBefore := clock.Now().Add(b.cfg.initialDelay)

// initialDelayChan is a timer which fires upon initial delay end.
// If initialDelay is 0, it does not fire to prevent race with
// blockChan which also fires immediately with current tip. Such a race
// may result in double publishing if batchPublishDelay is also 0.
// If initialDelay is set to 0, it will not trigger to avoid setting up
// timerChan twice, which could lead to double publishing if
// batchPublishDelay is also 0.
var initialDelayChan <-chan time.Time
if b.cfg.initialDelay > 0 {
initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
Expand All @@ -653,9 +657,10 @@ func (b *batch) Run(ctx context.Context) error {
// We use a timer in order to not publish new transactions at the same
// time as the block epoch notification. This is done to prevent
// unnecessary transaction publishments when a spend is detected on that
// block. This timer starts after new block arrives or initialDelay
// block. This timer starts after new block arrives (including the
// current tip which we read from blockChan above) or when initialDelay
// completes.
var timerChan <-chan time.Time
timerChan := clock.TickAfter(b.cfg.batchPublishDelay)

b.log.Infof("started, primary %x, total sweeps %v",
b.primarySweepID[0:6], len(b.sweeps))
Expand Down Expand Up @@ -1872,7 +1877,10 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
func getFeePortionForSweep(spendTx *wire.MsgTx, numSweeps int,
totalSweptAmt btcutil.Amount) (btcutil.Amount, btcutil.Amount) {

totalFee := int64(totalSweptAmt) - spendTx.TxOut[0].Value
totalFee := int64(totalSweptAmt)
if len(spendTx.TxOut) > 0 {
totalFee -= spendTx.TxOut[0].Value
}
feePortionPerSweep := totalFee / int64(numSweeps)
roundingDiff := totalFee - (int64(numSweeps) * feePortionPerSweep)

Expand Down Expand Up @@ -1900,7 +1908,11 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
notifyList = make([]sweep, 0, len(b.sweeps))
)
b.batchTxid = &txHash
b.batchPkScript = spendTx.TxOut[0].PkScript
if len(spendTx.TxOut) > 0 {
b.batchPkScript = spendTx.TxOut[0].PkScript
} else {
b.log.Warnf("transaction %v has no outputs", txHash)
}

// As a previous version of the batch transaction may get confirmed,
// which does not contain the latest sweeps, we need to detect the
Expand Down

0 comments on commit 188e567

Please sign in to comment.