Skip to content

Commit

Permalink
Update to use a defined API via an actor that monitors for timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
remyers committed Jan 9, 2023
1 parent 1ffd76b commit 7da270d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 50 deletions.
2 changes: 1 addition & 1 deletion channel-interceptor/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<manifestEntries>
<Main-Class>fr.acinq.eclair.plugins.channelinterceptor.ChannelInterceptorPlugin</Main-Class>
<Main-Class>fr.acinq.eclair.plugins.channelinterceptor.OpenChannelInterceptorPlugin</Main-Class>
</manifestEntries>
</transformer>
</transformers>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,9 @@
package fr.acinq.eclair.plugins.channelinterceptor

import akka.actor.typed.Behavior
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import fr.acinq.eclair.channel.ChannelConfig
import fr.acinq.eclair.io.OpenChannelReceived
import fr.acinq.eclair.io.Peer.{OutgoingMessage, SpawnChannelNonInitiator}
import akka.actor.typed.scaladsl.Behaviors
import fr.acinq.eclair.wire.protocol.{Error, OpenDualFundedChannel}
import fr.acinq.eclair.{AcceptOpenChannel, InterceptOpenChannelReceived, RejectOpenChannel}

/**
* Intercept OpenChannel and OpenDualFundedChannel messages received by the node. Respond to the peer
Expand All @@ -33,36 +30,26 @@ import fr.acinq.eclair.wire.protocol.{Error, OpenDualFundedChannel}

object OpenChannelInterceptor {

def apply(): Behavior[Command] = {
def apply(): Behavior[InterceptOpenChannelReceived] = {
Behaviors.setup {
context => new OpenChannelInterceptor(context).start()
_ => new OpenChannelInterceptor().start()
}
}

// @formatter:off
sealed trait Command
// @formatter:on

private case class WrappedOpenChannelReceived(openChannelReceived: OpenChannelReceived) extends Command

}

private class OpenChannelInterceptor(context: ActorContext[OpenChannelInterceptor.Command]) {

import OpenChannelInterceptor._
private class OpenChannelInterceptor() {

def start(): Behavior[Command] = {
context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[OpenChannelReceived](WrappedOpenChannelReceived))
private def start(): Behavior[InterceptOpenChannelReceived] = {

Behaviors.receiveMessage {
case WrappedOpenChannelReceived(wo) =>
wo.peer ! (wo.open match {
o: InterceptOpenChannelReceived =>
o.replyTo ! (o.open match {
case Left(_) =>
// example: accept all single funded open channel requests
SpawnChannelNonInitiator(wo.open, ChannelConfig.standard, wo.channelType, wo.localParams)
AcceptOpenChannel(o.temporaryChannelId, o.localParams, o.fundingAmount_opt)
case Right(o: OpenDualFundedChannel) =>
// example: fail all dual funded open channel requests
OutgoingMessage(Error(o.temporaryChannelId, "dual funded channel request rejected"), wo.connectionInfo.peerConnection)
RejectOpenChannel(o.temporaryChannelId, Error(o.temporaryChannelId, "dual funded channels are not supported"))
})
Behaviors.same
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,33 @@ import akka.actor.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.ClassicActorSystemOps
import akka.actor.typed.{ActorRef, SupervisorStrategy}
import fr.acinq.eclair.InterceptedMessageType.InterceptOpenChannel
import fr.acinq.eclair.{InterceptMessagePlugin, InterceptedMessageType, Kit, NodeParams, Plugin, PluginParams, Setup}
import fr.acinq.eclair.{InterceptOpenChannelPlugin, InterceptOpenChannelReceived, Kit, NodeParams, Plugin, PluginParams, Setup}
import grizzled.slf4j.Logging

/**
* Intercept OpenChannel messages received by the node and respond by continuing the process
* of accepting the request, potentially with different local parameters, or failing the request.
*/

class ChannelInterceptorPlugin extends Plugin with Logging {
var pluginKit: ChannelInterceptorKit = _
class OpenChannelInterceptorPlugin extends Plugin with Logging {
var pluginKit: OpenChannelInterceptorKit = _

override def params: PluginParams = new InterceptMessagePlugin {
override def params: PluginParams = new InterceptOpenChannelPlugin {
// @formatter:off
override def name: String = "ChannelInterceptorPlugin"
override def canIntercept: Set[InterceptedMessageType.Value] = Set(InterceptOpenChannel)
override def name: String = "OpenChannelInterceptorPlugin"
// @formatter:on
override def getOpenChannelInterceptor: ActorRef[InterceptOpenChannelReceived] = {
pluginKit.openChannelInterceptor
}
}

override def onSetup(setup: Setup): Unit = {
}

override def onKit(kit: Kit): Unit = {
val openChannelInterceptor = kit.system.spawn(Behaviors.supervise(OpenChannelInterceptor()).onFailure(SupervisorStrategy.restart), "open-channel-interceptor")
pluginKit = ChannelInterceptorKit(kit.nodeParams, kit.system, openChannelInterceptor)
pluginKit = OpenChannelInterceptorKit(kit.nodeParams, kit.system, openChannelInterceptor)
}
}

case class ChannelInterceptorKit(nodeParams: NodeParams, system: ActorSystem, openChannelInterceptor: ActorRef[OpenChannelInterceptor.Command])
case class OpenChannelInterceptorKit(nodeParams: NodeParams, system: ActorSystem, openChannelInterceptor: ActorRef[InterceptOpenChannelReceived])
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@
package fr.acinq.eclair.plugins.channelinterceptor

import akka.actor.testkit.typed.scaladsl.{ScalaTestWithActorTestKit, TestProbe}
import akka.actor.typed.eventstream.EventStream
import akka.actor.typed.scaladsl.adapter.TypedActorRefOps
import com.typesafe.config.ConfigFactory
import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Crypto, DeterministicWallet, SatoshiLong}
import fr.acinq.eclair.TestConstants.{Alice, Bob}
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.{ChannelConfig, ChannelFlags, ChannelTypes, LocalParams}
import fr.acinq.eclair.io.Peer.{OutgoingMessage, SpawnChannelNonInitiator}
import fr.acinq.eclair.io.{ConnectionInfo, OpenChannelReceived}
import fr.acinq.eclair.wire.protocol.{Init, NodeAddress, OpenChannel, OpenDualFundedChannel}
import fr.acinq.eclair.{CltvExpiryDelta, Features, MilliSatoshiLong, UInt64, randomKey}
import fr.acinq.eclair.channel.{ChannelFlags, LocalParams}
import fr.acinq.eclair.wire.protocol.{OpenChannel, OpenDualFundedChannel}
import fr.acinq.eclair.{AcceptOpenChannel, CltvExpiryDelta, Features, InterceptOpenChannelReceived, InterceptOpenChannelResponse, MilliSatoshiLong, RejectOpenChannel, UInt64, randomKey}
import org.scalatest.funsuite.AnyFunSuiteLike
import scodec.bits.ByteVector

Expand All @@ -37,20 +32,21 @@ class OpenChannelInterceptorSpec extends ScalaTestWithActorTestKit(ConfigFactory
val publicKey: Crypto.PublicKey = PrivateKey(ByteVector32.One).publicKey

test("should intercept and respond to OpenChannelReceived events") {
val openChannel = OpenChannel(ByteVector32.Zeroes, ByteVector32.Zeroes, 0 sat, 0 msat, 1 sat, UInt64(1), 1 sat, 1 msat, FeeratePerKw(1 sat), CltvExpiryDelta(1), 1, publicKey, publicKey, publicKey, publicKey, publicKey, publicKey, ChannelFlags.Private)
val openDualChannel = OpenDualFundedChannel(ByteVector32.Zeroes, ByteVector32.One, FeeratePerKw(1 sat), FeeratePerKw(1 sat), 0 sat, 0 sat, UInt64(0), 0 msat, CltvExpiryDelta(155), 0, 0, publicKey, publicKey, publicKey, publicKey, publicKey, publicKey, ChannelFlags(true))
val fundingAmount = 1 sat
val temporaryChannelId = ByteVector32.Zeroes
val openChannel = OpenChannel(ByteVector32.Zeroes, temporaryChannelId, fundingAmount, 0 msat, 1 sat, UInt64(1), 1 sat, 1 msat, FeeratePerKw(1 sat), CltvExpiryDelta(1), 1, publicKey, publicKey, publicKey, publicKey, publicKey, publicKey, ChannelFlags.Private)
val openDualChannel = OpenDualFundedChannel(ByteVector32.Zeroes, temporaryChannelId, FeeratePerKw(1 sat), FeeratePerKw(1 sat), fundingAmount, 0 sat, UInt64(0), 0 msat, CltvExpiryDelta(155), 0, 0, publicKey, publicKey, publicKey, publicKey, publicKey, publicKey, ChannelFlags(true))
val localParams = LocalParams(randomKey().publicKey, DeterministicWallet.KeyPath(Seq(42L)), 1 sat, Long.MaxValue.msat, Some(500 sat), 1 msat, CltvExpiryDelta(144), 50, isInitiator = false, ByteVector.empty, None, Features.empty)

testKit.spawn(OpenChannelInterceptor())
val peerProbe = TestProbe[Any]()
val connectionInfo = ConnectionInfo(NodeAddress.fromParts("1.2.3.4", 9735).get, peerProbe.ref.toClassic, Init(Alice.nodeParams.features.initFeatures()), Init(Bob.nodeParams.features.initFeatures()))
val openChannelInterceptor = testKit.spawn(OpenChannelInterceptor())
val peerProbe = TestProbe[InterceptOpenChannelResponse]()

// approve and continue single funded open channel
testKit.system.eventStream ! EventStream.Publish(OpenChannelReceived(peerProbe.ref.toClassic, Left(openChannel), ChannelTypes.Standard(), localParams, connectionInfo))
assert(peerProbe.expectMessageType[SpawnChannelNonInitiator] == SpawnChannelNonInitiator(Left(openChannel), ChannelConfig.standard, ChannelTypes.Standard(), localParams))
openChannelInterceptor ! InterceptOpenChannelReceived(peerProbe.ref, Left(openChannel), temporaryChannelId, localParams, fundingAmount)
assert(peerProbe.expectMessageType[AcceptOpenChannel] == AcceptOpenChannel(temporaryChannelId, localParams))

// fail request to open dual funded channel
testKit.system.eventStream ! EventStream.Publish(OpenChannelReceived(peerProbe.ref.toClassic, Right(openDualChannel), ChannelTypes.Standard(), localParams, connectionInfo))
peerProbe.expectMessageType[OutgoingMessage]
openChannelInterceptor ! InterceptOpenChannelReceived(peerProbe.ref, Right(openDualChannel), temporaryChannelId, localParams, fundingAmount)
peerProbe.expectMessageType[RejectOpenChannel]
}
}

0 comments on commit 7da270d

Please sign in to comment.