Skip to content

Commit

Permalink
Add swaps db and update swap actors to use it
Browse files Browse the repository at this point in the history
  • Loading branch information
remyers committed Sep 30, 2022
1 parent 00032d6 commit 1270c07
Show file tree
Hide file tree
Showing 20 changed files with 583 additions and 93 deletions.
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ class Setup(val datadir: File,
txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcher, bitcoinClient)
channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, bitcoinClient, txPublisherFactory)
paymentInitiator = system.actorOf(SimpleSupervisor.props(PaymentInitiator.props(nodeParams, PaymentInitiator.SimplePaymentFactory(nodeParams, router, register)), "payment-initiator", SupervisorStrategy.Restart))
swapRegister = system.spawn(Behaviors.supervise(SwapRegister(nodeParams, paymentInitiator, watcher, register, bitcoinClient)).onFailure(typed.SupervisorStrategy.resume), "swap-register")
swapRegister = system.spawn(Behaviors.supervise(SwapRegister(nodeParams, paymentInitiator, watcher, register, bitcoinClient, nodeParams.db.swaps.restore().toSet)).onFailure(typed.SupervisorStrategy.resume), "swap-register")
peerFactory = Switchboard.SimplePeerFactory(nodeParams, bitcoinClient, channelFactory, swapRegister)

switchboard = system.actorOf(SimpleSupervisor.props(Switchboard.props(nodeParams, peerFactory), "switchboard", SupervisorStrategy.Resume))
Expand Down
5 changes: 5 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/Databases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ trait Databases {
def peers: PeersDb
def payments: PaymentsDb
def pendingCommands: PendingCommandsDb
def swaps: SwapsDb
//@formatter:on
}

Expand All @@ -65,6 +66,7 @@ object Databases extends Logging {
peers: SqlitePeersDb,
payments: SqlitePaymentsDb,
pendingCommands: SqlitePendingCommandsDb,
swaps: SqliteSwapsDb,
private val backupConnection: Connection) extends Databases with FileBackup {
override def backup(backupFile: File): Unit = SqliteUtils.using(backupConnection.createStatement()) {
statement => {
Expand All @@ -83,6 +85,7 @@ object Databases extends Logging {
peers = new SqlitePeersDb(eclairJdbc),
payments = new SqlitePaymentsDb(eclairJdbc),
pendingCommands = new SqlitePendingCommandsDb(eclairJdbc),
swaps = new SqliteSwapsDb(eclairJdbc),
backupConnection = eclairJdbc
)
}
Expand All @@ -95,6 +98,7 @@ object Databases extends Logging {
payments: PgPaymentsDb,
pendingCommands: PgPendingCommandsDb,
dataSource: HikariDataSource,
swaps: PgSwapsDb,
lock: PgLock) extends Databases with ExclusiveLock {
override def obtainExclusiveLock(): Unit = lock.obtainExclusiveLock(dataSource)
}
Expand Down Expand Up @@ -154,6 +158,7 @@ object Databases extends Logging {
peers = new PgPeersDb,
payments = new PgPaymentsDb,
pendingCommands = new PgPendingCommandsDb,
swaps = new PgSwapsDb,
dataSource = ds,
lock = lock)

Expand Down
34 changes: 34 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import fr.acinq.eclair.db.DualDatabases.runAsync
import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.swap.SwapData
import fr.acinq.eclair.swap.SwapEvents.SwapEvent
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAddress, NodeAnnouncement}
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, RealShortChannelId, ShortChannelId, TimestampMilli}
import grizzled.slf4j.Logging
Expand Down Expand Up @@ -39,6 +41,8 @@ case class DualDatabases(primary: Databases, secondary: Databases) extends Datab

override val pendingCommands: PendingCommandsDb = DualPendingCommandsDb(primary.pendingCommands, secondary.pendingCommands)

override val swaps: SwapsDb = DualSwapsDb(primary.swaps, secondary.swaps)

/** if one of the database supports file backup, we use it */
override def backup(backupFile: File): Unit = (primary, secondary) match {
case (f: FileBackup, _) => f.backup(backupFile)
Expand Down Expand Up @@ -388,3 +392,33 @@ case class DualPendingCommandsDb(primary: PendingCommandsDb, secondary: PendingC
primary.listSettlementCommands()
}
}

case class DualSwapsDb(primary: SwapsDb, secondary: SwapsDb) extends SwapsDb {

private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-pending-commands").build()))

override def add(swapData: SwapData): Unit = {
runAsync(secondary.add(swapData))
primary.add(swapData)
}

override def addResult(swapEvent: SwapEvent): Unit = {
runAsync(secondary.addResult(swapEvent))
primary.addResult(swapEvent)
}

override def remove(swapId: String): Unit = {
runAsync(secondary.remove(swapId))
primary.remove(swapId)
}

override def restore(): Seq[SwapData] = {
runAsync(secondary.restore())
primary.restore()
}

override def list(): Seq[SwapData] = {
runAsync(secondary.list())
primary.list()
}
}
81 changes: 81 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/SwapsDb.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2022 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db

import fr.acinq.eclair.payment.Bolt11Invoice
import fr.acinq.eclair.swap.SwapEvents.SwapEvent
import fr.acinq.eclair.swap.SwapRole.Maker
import fr.acinq.eclair.swap.{SwapData, SwapRole}
import fr.acinq.eclair.wire.protocol._
import org.json4s.jackson.JsonMethods.{compact, parse, render}
import org.json4s.jackson.Serialization

import java.sql.{PreparedStatement, ResultSet}

trait SwapsDb {

def add(swapData: SwapData): Unit

def addResult(swapEvent: SwapEvent): Unit

def remove(swapId: String): Unit

def restore(): Seq[SwapData]

def list(): Seq[SwapData]

}

object SwapsDb {
import fr.acinq.eclair.json.JsonSerializers.formats

def setSwapData(statement: PreparedStatement, swapData: SwapData): Unit = {
statement.setString(1, swapData.request.swapId)
statement.setString(2, Serialization.write(swapData.request))
statement.setString(3, Serialization.write(swapData.agreement))
statement.setString(4, swapData.invoice.toString)
statement.setString(5, Serialization.write(swapData.openingTxBroadcasted))
statement.setInt(6, swapData.swapRole.id)
statement.setBoolean(7, swapData.isInitiator)
statement.setString(8, "")
}

def getSwapData(rs: ResultSet): SwapData = {
val isInitiator = rs.getBoolean("is_initiator")
val isMaker = SwapRole(rs.getInt("swap_role")) == Maker
val request_json = rs.getString("request")
val agreement_json = rs.getString("agreement")
val openingTxBroadcasted_json = rs.getString("opening_tx_broadcasted")
val (request, agreement) = (isInitiator, isMaker) match {
case (true, true) => (Serialization.read[SwapInRequest](compact(render(parse(request_json).camelizeKeys))),
Serialization.read[SwapInAgreement](compact(render(parse(agreement_json).camelizeKeys))))
case (false, false) => (Serialization.read[SwapInRequest](compact(render(parse(request_json).camelizeKeys))),
Serialization.read[SwapInAgreement](compact(render(parse(agreement_json).camelizeKeys))))
case (true, false) => (Serialization.read[SwapOutRequest](compact(render(parse(request_json).camelizeKeys))),
Serialization.read[SwapOutAgreement](compact(render(parse(agreement_json).camelizeKeys))))
case (false, true) => (Serialization.read[SwapOutRequest](compact(render(parse(request_json).camelizeKeys))),
Serialization.read[SwapOutAgreement](compact(render(parse(agreement_json).camelizeKeys))))
}
SwapData(
request,
agreement,
Bolt11Invoice.fromString(rs.getString("invoice")).get,
Serialization.read[OpeningTxBroadcasted](compact(render(parse(openingTxBroadcasted_json).camelizeKeys))),
SwapRole(rs.getInt("swap_role")),
rs.getBoolean("is_initiator"))
}
}
100 changes: 100 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgSwapsDb.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Copyright 2022 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db.pg

import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.SwapsDb
import fr.acinq.eclair.db.SwapsDb.{getSwapData, setSwapData}
import fr.acinq.eclair.db.pg.PgUtils.PgLock.NoLock.withLock
import fr.acinq.eclair.swap.SwapData
import fr.acinq.eclair.swap.SwapEvents.SwapEvent
import grizzled.slf4j.Logging

import javax.sql.DataSource

object PgSwapsDb {
val DB_NAME = "swaps"
val CURRENT_VERSION = 1
}

class PgSwapsDb(implicit ds: DataSource) extends SwapsDb with Logging {

import PgUtils._
import ExtendedResultSet._
import PgSwapsDb._

inTransaction { pg =>
using(pg.createStatement(), inTransaction = true) { statement =>
getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE swaps (swap_id TEXT NOT NULL PRIMARY KEY, request TEXT NOT NULL, agreement TEXT NOT NULL, invoice TEXT NOT NULL, opening_tx_broadcasted TEXT NOT NULL, swap_role BIGINT NOT NULL, is_initiator BOOLEAN NOT NULL, result TEXT NOT NULL)")
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}
}

override def add(swapData: SwapData): Unit = withMetrics("swaps/add", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement(
"""INSERT INTO swaps (swap_id, request, agreement, invoice, opening_tx_broadcasted, swap_role, is_initiator, result)
VALUES (?, ?::JSON, ?::JSON, ?, ?::JSON, ?, ?, ?) ON CONFLICT (swap_id) DO NOTHING""")) { statement =>
setSwapData(statement, swapData)
statement.executeUpdate()
}
}
}

override def addResult(swapEvent: SwapEvent): Unit = withMetrics("swaps/add_result", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("UPDATE swaps SET result=? WHERE swap_id=?")) { statement =>
statement.setString(1, swapEvent.toString)
statement.setString(2, swapEvent.swapId)
statement.executeUpdate()
}
}
}

override def remove(swapId: String): Unit = withMetrics("swaps/remove", DbBackends.Postgres) {
withLock { pg =>
using(pg.prepareStatement("DELETE FROM swaps WHERE swap_id=?")) { statement =>
statement.setString(1, swapId)
statement.executeUpdate()
}
}
}

override def restore(): Seq[SwapData] = withMetrics("swaps/restore", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("SELECT swap_id, request, agreement, invoice, opening_tx_broadcasted, swap_role, is_initiator, result FROM swaps WHERE result=?")) { statement =>
statement.setString(1, "")
statement.executeQuery().map(rs => getSwapData(rs)).toSeq
}
}
}

override def list(): Seq[SwapData] = withMetrics("swaps/list", DbBackends.Postgres) {
inTransaction { pg =>
using(pg.prepareStatement("SELECT request, agreement, invoice, opening_tx_broadcasted, swap_role, is_initiator, result FROM swaps")) { statement =>
statement.executeQuery().map(rs => getSwapData(rs)).toSeq
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright 2022 ACINQ SAS
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package fr.acinq.eclair.db.sqlite

import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.SwapsDb
import fr.acinq.eclair.db.SwapsDb.{getSwapData, setSwapData}
import fr.acinq.eclair.swap.SwapData
import fr.acinq.eclair.swap.SwapEvents.SwapEvent
import grizzled.slf4j.Logging

import java.sql.Connection

object SqliteSwapsDb {
val DB_NAME = "swaps"
val CURRENT_VERSION = 1
}

class SqliteSwapsDb (val sqlite: Connection) extends SwapsDb with Logging {

import SqliteUtils._
import ExtendedResultSet._
import SqliteSwapsDb._

using(sqlite.createStatement(), inTransaction = true) { statement =>
getVersion(statement, DB_NAME) match {
case None =>
statement.executeUpdate("CREATE TABLE swaps (swap_id STRING NOT NULL PRIMARY KEY, request STRING NOT NULL, agreement STRING NOT NULL, invoice STRING NOT NULL, opening_tx_broadcasted STRING NOT NULL, swap_role INTEGER NOT NULL, is_initiator BOOLEAN NOT NULL, result STRING NOT NULL)")
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
}
setVersion(statement, DB_NAME, CURRENT_VERSION)
}

override def add(swapData: SwapData): Unit = withMetrics("swaps/add", DbBackends.Sqlite) {
using(sqlite.prepareStatement(
"""INSERT INTO swaps (swap_id, request, agreement, invoice, opening_tx_broadcasted, swap_role, is_initiator, result)
VALUES (?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT (swap_id) DO NOTHING""")) { statement =>
setSwapData(statement, swapData)
statement.executeUpdate()
}
}

override def addResult(swapEvent: SwapEvent): Unit = withMetrics("swaps/add_result", DbBackends.Sqlite) {
using(sqlite.prepareStatement("UPDATE swaps SET result=? WHERE swap_id=?")) { statement =>
statement.setString(1, swapEvent.toString)
statement.setString(2, swapEvent.swapId)
statement.executeUpdate()
}
}

override def remove(swapId: String): Unit = withMetrics("swaps/remove", DbBackends.Sqlite) {
using(sqlite.prepareStatement("DELETE FROM swaps WHERE swap_id=?")) { statement =>
statement.setString(1, swapId)
statement.executeUpdate()
}
}

override def restore(): Seq[SwapData] = withMetrics("swaps/restore", DbBackends.Sqlite) {
using(sqlite.prepareStatement("SELECT swap_id, request, agreement, invoice, opening_tx_broadcasted, swap_role, is_initiator, result FROM swaps WHERE result=?")) { statement =>
statement.setString(1, "")
statement.executeQuery().map(rs => getSwapData(rs)).toSeq
}
}

override def list(): Seq[SwapData] = withMetrics("swaps/list", DbBackends.Sqlite) {
using(sqlite.prepareStatement("SELECT swap_id, request, agreement, invoice, opening_tx_broadcasted, swap_role, is_initiator, result FROM swaps")) { statement =>
statement.executeQuery().map(rs => getSwapData(rs)).toSeq
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ object SwapCommands {
// @formatter:off
case class StartSwapInSender(amount: Satoshi, swapId: String, shortChannelId: ShortChannelId) extends SwapCommand
case class StartSwapOutReceiver(request: SwapOutRequest) extends SwapCommand
case class RestoreSwapMaker(swapData: SwapData) extends SwapCommand
case class RestoreSwap(swapData: SwapData) extends SwapCommand
case object AbortSwap extends SwapCommand

sealed trait CreateSwapMessages extends SwapCommand
Expand Down Expand Up @@ -77,7 +77,6 @@ object SwapCommands {
// @formatter:off
case class StartSwapInReceiver(request: SwapInRequest) extends SwapCommand
case class StartSwapOutSender(amount: Satoshi, swapId: String, shortChannelId: ShortChannelId) extends SwapCommand
case class RestoreSwapTaker(swapData: SwapData) extends SwapCommand

sealed trait SendAgreementMessages extends SwapCommand
sealed trait AwaitFeePaymentMessages extends SwapCommand
Expand Down
Loading

0 comments on commit 1270c07

Please sign in to comment.