Skip to content

Commit

Permalink
Change to safe resume supervision strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
remyers committed Dec 7, 2022
1 parent cc1e298 commit d272226
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -210,35 +210,39 @@ private class SwapMaker(remoteNodeId: PublicKey, shortChannelId: ShortChannelId,
}
}

def createOpeningTx(request: SwapRequest, agreement: SwapAgreement, isInitiator: Boolean): Behavior[SwapCommand] = {
val receivePayment = ReceiveStandardPayment(Some(toMilliSatoshi(Satoshi(request.amount))), Left("send-swap-in"))
val createInvoice = context.spawnAnonymous(CreateInvoiceActor(nodeParams))
createInvoice ! CreateInvoiceActor.CreateInvoice(context.messageAdapter[Bolt11Invoice](InvoiceResponse).toClassic, receivePayment)

receiveSwapMessage[CreateOpeningTxMessages](context, "createOpeningTx") {
case InvoiceResponse(invoice: Bolt11Invoice) => fundOpening(wallet, feeRatePerKw)((request.amount + agreement.premium).sat, makerPubkey(request.swapId), takerPubkey(request, agreement, isInitiator), invoice)
Behaviors.same
case OpeningTxFunded(invoice, fundingResponse) =>
commitOpening(wallet)(request.swapId, invoice, fundingResponse, "swap-in-sender-opening")
Behaviors.same
case OpeningTxCommitted(invoice, openingTxBroadcasted) =>
db.add(SwapData(request, agreement, invoice, openingTxBroadcasted, Maker, isInitiator, remoteNodeId))
awaitClaimPayment(request, agreement, invoice, openingTxBroadcasted, isInitiator)
case OpeningTxFailed(error, None) => swapCanceled(InternalError(request.swapId, s"failed to fund swap open tx, error: $error"))
case OpeningTxFailed(error, Some(r)) => rollback(wallet)(error, r.fundingTx)
Behaviors.same
case RollbackSuccess(error, value) => swapCanceled(InternalError(request.swapId, s"rollback: Success($value), error: $error"))
case RollbackFailure(error, t) => swapCanceled(InternalError(request.swapId, s"rollback exception: $t, error: $error"))
case SwapMessageReceived(_) => Behaviors.same // ignore
case StateTimeout =>
// TODO: are we sure the opening transaction has not yet been committed? should we rollback locked funding outputs?
swapCanceled(InternalError(request.swapId, "timeout during CreateOpeningTx"))
case CancelRequested(replyTo) => replyTo ! SwapError(request.swapId, "Can not cancel swap after opening tx committed.")
Behaviors.same // ignore
case GetStatus(replyTo) => replyTo ! SwapStatus(request.swapId, context.self.toString, "createOpeningTx", request, Some(agreement))
Behaviors.same
def createOpeningTx(request: SwapRequest, agreement: SwapAgreement, isInitiator: Boolean): Behavior[SwapCommand] =
db.find(request.swapId) match {
case Some(s: SwapData) =>
awaitClaimPayment(request, agreement, s.invoice, s.openingTxBroadcasted, isInitiator)
case None =>
val receivePayment = ReceiveStandardPayment(Some(toMilliSatoshi(Satoshi(request.amount))), Left("send-swap-in"))
val createInvoice = context.spawnAnonymous(CreateInvoiceActor(nodeParams))
createInvoice ! CreateInvoiceActor.CreateInvoice(context.messageAdapter[Bolt11Invoice](InvoiceResponse).toClassic, receivePayment)

receiveSwapMessage[CreateOpeningTxMessages](context, "createOpeningTx") {
case InvoiceResponse(invoice: Bolt11Invoice) => fundOpening(wallet, feeRatePerKw)((request.amount + agreement.premium).sat, makerPubkey(request.swapId), takerPubkey(request, agreement, isInitiator), invoice)
Behaviors.same
case OpeningTxFunded(invoice, fundingResponse) =>
commitOpening(wallet)(request.swapId, invoice, fundingResponse, "swap-in-sender-opening")
Behaviors.same
case OpeningTxCommitted(invoice, openingTxBroadcasted) =>
db.add(SwapData(request, agreement, invoice, openingTxBroadcasted, Maker, isInitiator, remoteNodeId))
awaitClaimPayment(request, agreement, invoice, openingTxBroadcasted, isInitiator)
case OpeningTxFailed(error, None) => swapCanceled(InternalError(request.swapId, s"failed to fund swap open tx, error: $error"))
case OpeningTxFailed(error, Some(r)) => rollback(wallet)(error, r.fundingTx)
Behaviors.same
case RollbackSuccess(error, value) => swapCanceled(InternalError(request.swapId, s"rollback: Success($value), error: $error"))
case RollbackFailure(error, t) => swapCanceled(InternalError(request.swapId, s"rollback exception: $t, error: $error"))
case SwapMessageReceived(_) => Behaviors.same // ignore
case StateTimeout =>
// TODO: are we sure the opening transaction has not yet been committed? should we rollback locked funding outputs?
swapCanceled(InternalError(request.swapId, "timeout during CreateOpeningTx"))
case CancelRequested(replyTo) => replyTo ! SwapError(request.swapId, "Can not cancel swap after opening tx committed.")
Behaviors.same // ignore
case GetStatus(replyTo) => replyTo ! SwapStatus(request.swapId, context.self.toString, "createOpeningTx", request, Some(agreement))
Behaviors.same
}
}
}

def awaitClaimPayment(request: SwapRequest, agreement: SwapAgreement, invoice: Bolt11Invoice, openingTxBroadcasted: OpeningTxBroadcasted, isInitiator: Boolean): Behavior[SwapCommand] =
nodeParams.db.payments.getIncomingPayment(invoice.paymentHash) match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,10 @@ private class SwapRegister(context: ActorContext[Command], nodeParams: NodeParam

private def spawnSwap(swapRole: SwapRole, remoteNodeId: PublicKey, scid: String) = {
swapRole match {
case SwapRole.Maker => context.spawn(Behaviors.supervise(SwapMaker(remoteNodeId, nodeParams, watcher, switchboard, wallet, keyManager, db)).onFailure(typed.SupervisorStrategy.stop), "SwapMaker-" + scid)
case SwapRole.Taker => context.spawn(Behaviors.supervise(SwapTaker(remoteNodeId, nodeParams, paymentInitiator, watcher, switchboard, wallet, keyManager, db)).onFailure(typed.SupervisorStrategy.stop), "SwapTaker-" + scid)
// swap maker is safe to resume because an opening transaction will only be funded once
case SwapRole.Maker => context.spawn(Behaviors.supervise(SwapMaker(remoteNodeId, nodeParams, watcher, switchboard, wallet, keyManager, db)).onFailure(typed.SupervisorStrategy.resume), "SwapMaker-" + scid)
// swap taker is safe to resume because a payment will only be sent once
case SwapRole.Taker => context.spawn(Behaviors.supervise(SwapTaker(remoteNodeId, nodeParams, paymentInitiator, watcher, switchboard, wallet, keyManager, db)).onFailure(typed.SupervisorStrategy.resume), "SwapTaker-" + scid)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,26 +260,29 @@ private class SwapTaker(remoteNodeId: PublicKey, shortChannelId: ShortChannelId,
}
}

def validateOpeningTx(request: SwapRequest, agreement: SwapAgreement, openingTxBroadcasted: OpeningTxBroadcasted, openingTx: Transaction, isInitiator: Boolean): Behavior[SwapCommand] = {
Bolt11Invoice.fromString(openingTxBroadcasted.payreq) match {
case Failure(e) => sendCoopClose(request, s"Could not parse payreq: $e")
case Success(invoice) if invoice.amount_opt.isDefined && invoice.amount_opt.get > request.amount.sat.toMilliSatoshi =>
sendCoopClose(request, s"Invoice amount ${invoice.amount_opt.get} > requested on-chain amount ${request.amount.sat.toMilliSatoshi}")
case Success(invoice) if invoice.routingInfo.flatten.exists(hop => hop.shortChannelId != shortChannelId) =>
sendCoopClose(request, s"Channel hop other than $shortChannelId found in invoice hints ${invoice.routingInfo}")
case Success(invoice) if invoice.isExpired() =>
sendCoopClose(request, s"Invoice is expired.")
case Success(invoice) if invoice.minFinalCltvExpiryDelta >= CltvExpiryDelta(claimByCsvDelta.toInt / 2) =>
sendCoopClose(request, s"Invoice min-final-cltv-expiry delta too long.")
case Success(invoice) if validOpeningTx(openingTx, openingTxBroadcasted.scriptOut, (request.amount + agreement.premium).sat, makerPubkey(request, agreement, isInitiator), takerPubkey(request.swapId), invoice.paymentHash) =>
// save restore point before a payment is initiated
db.add(SwapData(request, agreement, invoice, openingTxBroadcasted, Taker, isInitiator, remoteNodeId))
payInvoice(nodeParams)(paymentInitiator, request.swapId, invoice)
payClaimInvoice(request, agreement, openingTxBroadcasted, invoice, isInitiator)
case Success(_) =>
sendCoopClose(request, s"Invalid opening tx: $openingTx")
def validateOpeningTx(request: SwapRequest, agreement: SwapAgreement, openingTxBroadcasted: OpeningTxBroadcasted, openingTx: Transaction, isInitiator: Boolean): Behavior[SwapCommand] =
db.find(request.swapId) match {
case Some(s: SwapData) => payClaimInvoice(request, agreement, openingTxBroadcasted, s.invoice, isInitiator)
case None =>
Bolt11Invoice.fromString(openingTxBroadcasted.payreq) match {
case Failure(e) => sendCoopClose(request, s"Could not parse payreq: $e")
case Success(invoice) if invoice.amount_opt.isDefined && invoice.amount_opt.get > request.amount.sat.toMilliSatoshi =>
sendCoopClose(request, s"Invoice amount ${invoice.amount_opt.get} > requested on-chain amount ${request.amount.sat.toMilliSatoshi}")
case Success(invoice) if invoice.routingInfo.flatten.exists(hop => hop.shortChannelId != shortChannelId) =>
sendCoopClose(request, s"Channel hop other than $shortChannelId found in invoice hints ${invoice.routingInfo}")
case Success(invoice) if invoice.isExpired() =>
sendCoopClose(request, s"Invoice is expired.")
case Success(invoice) if invoice.minFinalCltvExpiryDelta >= CltvExpiryDelta(claimByCsvDelta.toInt / 2) =>
sendCoopClose(request, s"Invoice min-final-cltv-expiry delta too long.")
case Success(invoice) if validOpeningTx(openingTx, openingTxBroadcasted.scriptOut, (request.amount + agreement.premium).sat, makerPubkey(request, agreement, isInitiator), takerPubkey(request.swapId), invoice.paymentHash) =>
// save restore point before a payment is initiated
db.add(SwapData(request, agreement, invoice, openingTxBroadcasted, Taker, isInitiator, remoteNodeId))
payInvoice(nodeParams)(paymentInitiator, request.swapId, invoice)
payClaimInvoice(request, agreement, openingTxBroadcasted, invoice, isInitiator)
case Success(_) =>
sendCoopClose(request, s"Invalid opening tx: $openingTx")
}
}
}

def payClaimInvoice(request: SwapRequest, agreement: SwapAgreement, openingTxBroadcasted: OpeningTxBroadcasted, invoice: Bolt11Invoice, isInitiator: Boolean): Behavior[SwapCommand] = {
watchForPayment(watch = true) // subscribe to payment event notifications
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ case class SwapInReceiverSpec() extends ScalaTestWithActorTestKit(ConfigFactory.

// the swap result has been recorded in the db
assert(db.list().head.result.contains("Coop close offered to peer: Lightning payment not sent."))
db.remove(swapId)
}

test("send cooperative close after a restore with the payment already marked as failed") { f =>
Expand All @@ -153,6 +154,7 @@ case class SwapInReceiverSpec() extends ScalaTestWithActorTestKit(ConfigFactory.

// the swap result has been recorded in the db
assert(db.list().head.result.contains("Coop close offered to peer: Lightning payment failed"))
db.remove(swapId)
}

test("claim by invoice after a restore with the payment already marked as paid") { f =>
Expand Down Expand Up @@ -189,6 +191,7 @@ case class SwapInReceiverSpec() extends ScalaTestWithActorTestKit(ConfigFactory.

// the swap result has been recorded in the db
assert(db.list().head.result.contains("Claimed by paid invoice:"))
db.remove(swapId)
}

test("claim by invoice after a restore with the payment marked as pending and later paid") { f =>
Expand Down Expand Up @@ -233,6 +236,7 @@ case class SwapInReceiverSpec() extends ScalaTestWithActorTestKit(ConfigFactory.

// the swap result has been recorded in the db
assert(db.list().head.result.contains("Claimed by paid invoice:"))
db.remove(swapId)
}

test("happy path for new swap in") { f =>
Expand Down Expand Up @@ -289,6 +293,7 @@ case class SwapInReceiverSpec() extends ScalaTestWithActorTestKit(ConfigFactory.

// the swap result has been recorded in the db
assert(db.list().head.result.contains("Claimed by paid invoice:"))
db.remove(swapId)
}

test("invalid invoice, min_final-cltv-expiry of invoice greater than the claim-by-csv delta") { f =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ case class SwapInSenderSpec() extends ScalaTestWithActorTestKit(ConfigFactory.lo

// the swap result has been recorded in the db
assert(db.list().head.result.contains("Invoice payment received"))
db.remove(swapId)
}

test("happy path for new swap") { f =>
Expand Down Expand Up @@ -177,6 +178,7 @@ case class SwapInSenderSpec() extends ScalaTestWithActorTestKit(ConfigFactory.lo

// the swap result has been recorded in the db
assert(db.list().head.result.contains("Invoice payment received"))
db.remove(swapId)
}

test("claim refund by coop close path from restored swap") { f =>
Expand Down Expand Up @@ -217,6 +219,7 @@ case class SwapInSenderSpec() extends ScalaTestWithActorTestKit(ConfigFactory.lo

// the swap result has been recorded in the db
assert(db.list().head.result.contains("Claimed by coop"))
db.remove(swapId)
}

test("claim refund by csv path from restored swap") { f =>
Expand Down

0 comments on commit d272226

Please sign in to comment.