Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sweepbatcher: fix some minor bugs #871

Merged
merged 3 commits into from
Jan 17, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions sweepbatcher/sweep_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,6 @@ type batch struct {
// currentHeight is the current block height.
currentHeight int32

// blockEpochChan is the channel over which block epoch notifications
// are received.
blockEpochChan chan int32

// spendChan is the channel over which spend notifications are received.
spendChan chan *chainntnfs.SpendDetail

Expand Down Expand Up @@ -362,7 +358,6 @@ func NewBatch(cfg batchConfig, bk batchKit) *batch {
id: -1,
state: Open,
sweeps: make(map[lntypes.Hash]sweep),
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -407,7 +402,6 @@ func NewBatchFromDB(cfg batchConfig, bk batchKit) (*batch, error) {
state: bk.state,
primarySweepID: bk.primaryID,
sweeps: bk.sweeps,
blockEpochChan: make(chan int32),
spendChan: make(chan *chainntnfs.SpendDetail),
confChan: make(chan *chainntnfs.TxConfirmation, 1),
reorgChan: make(chan struct{}, 1),
Expand Down Expand Up @@ -626,6 +620,16 @@ func (b *batch) Run(ctx context.Context) error {
return err
}

// Set currentHeight here, because it may be needed in monitorSpend.
select {
case b.currentHeight = <-blockChan:
b.log.Debugf("initial height for the batch is %v",
b.currentHeight)

case <-runCtx.Done():
return runCtx.Err()
}

// If a primary sweep exists we immediately start monitoring for its
// spend.
if b.primarySweepID != lntypes.ZeroHash {
Expand All @@ -642,9 +646,9 @@ func (b *batch) Run(ctx context.Context) error {
skipBefore := clock.Now().Add(b.cfg.initialDelay)

// initialDelayChan is a timer which fires upon initial delay end.
// If initialDelay is 0, it does not fire to prevent race with
// blockChan which also fires immediately with current tip. Such a race
// may result in double publishing if batchPublishDelay is also 0.
// If initialDelay is set to 0, it will not trigger to avoid setting up
// timerChan twice, which could lead to double publishing if
// batchPublishDelay is also 0.
var initialDelayChan <-chan time.Time
if b.cfg.initialDelay > 0 {
initialDelayChan = clock.TickAfter(b.cfg.initialDelay)
Expand All @@ -653,9 +657,10 @@ func (b *batch) Run(ctx context.Context) error {
// We use a timer in order to not publish new transactions at the same
// time as the block epoch notification. This is done to prevent
// unnecessary transaction publishments when a spend is detected on that
// block. This timer starts after new block arrives or initialDelay
// block. This timer starts after new block arrives (including the
// current tip which we read from blockChan above) or when initialDelay
// completes.
var timerChan <-chan time.Time
timerChan := clock.TickAfter(b.cfg.batchPublishDelay)

b.log.Infof("started, primary %x, total sweeps %v",
b.primarySweepID[0:6], len(b.sweeps))
Expand Down Expand Up @@ -1872,7 +1877,10 @@ func (b *batch) monitorConfirmations(ctx context.Context) error {
func getFeePortionForSweep(spendTx *wire.MsgTx, numSweeps int,
totalSweptAmt btcutil.Amount) (btcutil.Amount, btcutil.Amount) {

totalFee := int64(totalSweptAmt) - spendTx.TxOut[0].Value
totalFee := int64(totalSweptAmt)
if len(spendTx.TxOut) > 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is no TxOut then totalSweptAmt would be considered as totalFee which doesn't seem right. Should we handle an error in an else clause?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there are no outputs, it means the entire amount was used for miner fees, so the calculation is correct.

That said, this scenario is undesirable, and the user should be warned if it occurs. The function is invoked from handleSpend, which already logs a warning in such cases.

For this situation to arise, the sweeping side would need to be offline (not sweeping) for the entire period, and the other side would have to intentionally choose to burn the coin instead of collecting it. While this is highly unlikely, it is technically possible.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the clarification.

totalFee -= spendTx.TxOut[0].Value
}
feePortionPerSweep := totalFee / int64(numSweeps)
roundingDiff := totalFee - (int64(numSweeps) * feePortionPerSweep)

Expand Down Expand Up @@ -1900,7 +1908,11 @@ func (b *batch) handleSpend(ctx context.Context, spendTx *wire.MsgTx) error {
notifyList = make([]sweep, 0, len(b.sweeps))
)
b.batchTxid = &txHash
b.batchPkScript = spendTx.TxOut[0].PkScript
if len(spendTx.TxOut) > 0 {
b.batchPkScript = spendTx.TxOut[0].PkScript
} else {
b.log.Warnf("transaction %v has no outputs", txHash)
bhandras marked this conversation as resolved.
Show resolved Hide resolved
}

// As a previous version of the batch transaction may get confirmed,
// which does not contain the latest sweeps, we need to detect the
Expand Down
Loading