From 79daa183214b884124a079026a8e6dfdcee8429a Mon Sep 17 00:00:00 2001 From: Daniel Firth Date: Wed, 3 Apr 2024 08:56:17 +0000 Subject: [PATCH] Switch to versioned protocol for ouroboros networking layer. Hydra now uses a versioned protocol for handshaking. In the event of a node attempting to connect using a different version of the networking protocol, a `HandshakeFailure` event will be recorded in the logs and sent as a server output on the API. --- CHANGELOG.md | 17 ++- hydra-node/hydra-node.cabal | 1 + hydra-node/json-schemas/api.yaml | 89 ++++++++++++ hydra-node/json-schemas/logs.yaml | 44 +++--- hydra-node/src/Hydra/API/ServerOutput.hs | 9 +- hydra-node/src/Hydra/HeadLogic.hs | 18 ++- hydra-node/src/Hydra/Network/Message.hs | 37 ++++- hydra-node/src/Hydra/Network/Ouroboros.hs | 130 ++++++++++++++---- .../Network/Ouroboros/VersionedProtocol.hs | 58 ++++++++ hydra-node/src/Hydra/Node/Network.hs | 26 +++- hydra-node/test/Hydra/HeadLogicSpec.hs | 1 + hydra-node/test/Hydra/NetworkSpec.hs | 82 +++++++++-- 12 files changed, 438 insertions(+), 74 deletions(-) create mode 100644 hydra-node/src/Hydra/Network/Ouroboros/VersionedProtocol.hs diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fce5863888..38f8685887d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,12 @@ changes. ## [0.17.0] - UNRELEASED +- **BREAKING** `hydra-node` `/commit` enpoint now also accepts a _blueprint/draft_ + transaction together with the `UTxO` which is spent in this transaction. `hydra-node` can + still be used like before if the provided `UTxO` is at public key address. In order to spend + from a script `UTxO`, and also unlock more involved use-cases, users need to provide additional + unsigned transaction that correctly specifies required data (like redeemers, validity ranges etc.) + - Add `GET /snapshot/utxo` API endpoint to query confirmed UTxO set on demand. - Always responds with the last confirmed UTxO @@ -17,13 +23,10 @@ changes. - `hydra-node` logs will now report `NetworkEvents` to distinguish between `ConnectivityEvent`s and `ReceivedMessage`s on the network. -## [0.17.0] - UNRELEASED - -- **BREAKING** `hydra-node` `/commit` enpoint now also accepts a _blueprint/draft_ - transaction together with the `UTxO` which is spent in this transaction. `hydra-node` can - still be used like before if the provided `UTxO` is at public key address. In order to spend - from a script `UTxO`, and also unlock more involved use-cases, users need to provide additional - unsigned transaction that correctly specifies required data (like redeemers, validity ranges etc.) +- Hydra now uses a versioned protocol for handshaking. In the event of a node + attempting to connect using a different version of the networking protocol, a + `HandshakeFailure` event will be recorded in the logs and sent as a server + output on the API. ## [0.16.0] - 2024-04-03 diff --git a/hydra-node/hydra-node.cabal b/hydra-node/hydra-node.cabal index ac0435a97d6..588888a1a72 100644 --- a/hydra-node/hydra-node.cabal +++ b/hydra-node/hydra-node.cabal @@ -95,6 +95,7 @@ library Hydra.Network.Ouroboros.Client Hydra.Network.Ouroboros.Server Hydra.Network.Ouroboros.Type + Hydra.Network.Ouroboros.VersionedProtocol Hydra.Network.Reliability Hydra.Node Hydra.Node.InputQueue diff --git a/hydra-node/json-schemas/api.yaml b/hydra-node/json-schemas/api.yaml index 56703f3ed3c..33bfbe6993b 100644 --- a/hydra-node/json-schemas/api.yaml +++ b/hydra-node/json-schemas/api.yaml @@ -74,6 +74,7 @@ channels: - $ref: "api.yaml#/components/messages/Greetings" - $ref: "api.yaml#/components/messages/PeerConnected" - $ref: "api.yaml#/components/messages/PeerDisconnected" + - $ref: "api.yaml#/components/messages/PeerHandshakeFailure" - $ref: "api.yaml#/components/messages/HeadIsInitializing" - $ref: "api.yaml#/components/messages/Committed" - $ref: "api.yaml#/components/messages/HeadIsOpen" @@ -333,6 +334,13 @@ components: payload: $ref: "api.yaml#/components/schemas/PeerDisconnected" + PeerHandshakeFailure: + title: PeerHandshakeFailure + description: | + A peer has failed to negotiate a protocol. + payload: + $ref: "api.yaml#/components/schemas/PeerHandshakeFailure" + HeadIsInitializing: title: HeadIsInitializing description: | @@ -494,6 +502,7 @@ components: - $ref: "api.yaml#/components/schemas/Greetings" - $ref: "api.yaml#/components/schemas/PeerConnected" - $ref: "api.yaml#/components/schemas/PeerDisconnected" + - $ref: "api.yaml#/components/schemas/PeerHandshakeFailure" - $ref: "api.yaml#/components/schemas/HeadIsInitializing" - $ref: "api.yaml#/components/schemas/Committed" - $ref: "api.yaml#/components/schemas/HeadIsOpen" @@ -571,6 +580,34 @@ components: timestamp: $ref: "api.yaml#/components/schemas/UTCTime" + PeerHandshakeFailure: + type: object + required: + - tag + - remoteHost + - ourVersion + - theirVersions + - seq + - timestamp + properties: + tag: + type: string + enum: ["PeerHandshakeFailure"] + remoteHost: + type: object + $ref: "api.yaml#/components/schemas/IP" + ourVersion: + type: integer + minimum: 0 + theirVersions: + type: array + items: + type: integer + seq: + $ref: "api.yaml#/components/schemas/SequenceNumber" + timestamp: + $ref: "api.yaml#/components/schemas/UTCTime" + HeadIsInitializing: type: object required: @@ -1976,3 +2013,55 @@ components: # NOTE: We don't want anyone to depend on this! ChainState: {} + + IP: + type: object + oneOf: + - title: IPv4 + type: object + properties: + tag: + type: string + enum: ["IPv4"] + ipv4: + type: string + - title: IPv6 + type: string + properties: + tag: + type: string + enum: ["IPv6"] + ipv6: + type: string + + MkHydraVersionedProtocolNumber: + type: object + required: + - hydraVersionedProtocolNumber + properties: + hydraVersionedProtocolNumber: + type: integer + minimum: 0 + + KnownHydraVersions: + oneOf: + - title: NoKnownHydraVersions + required: + - tag + properties: + tag: + type: string + enum: ["NoKnownHydraVersions"] + - title: KnownHydraVersions + type: object + required: + - tag + - fromKnownHydraVersions + properties: + tag: + type: string + enum: ["KnownHydraVersions"] + fromKnownHydraVersions: + type: array + items: + $ref: "api.yaml#/components/schemas/MkHydraVersionedProtocolNumber" diff --git a/hydra-node/json-schemas/logs.yaml b/hydra-node/json-schemas/logs.yaml index 9a8adea5e64..15b4859d300 100644 --- a/hydra-node/json-schemas/logs.yaml +++ b/hydra-node/json-schemas/logs.yaml @@ -1645,6 +1645,26 @@ definitions: nodeId: type: string + - title: HandshakeFailure + type: object + additionalProperties: false + required: + - tag + - remoteHost + - ourVersion + - theirVersions + properties: + tag: + type: string + enum: ["HandshakeFailure"] + remoteHost: + type: object + $ref: "api.yaml#/components/schemas/IP" + ourVersion: + $ref: "api.yaml#/components/schemas/MkHydraVersionedProtocolNumber" + theirVersions: + $ref: "api.yaml#/components/schemas/KnownHydraVersions" + - title: ReceivedMessage type: object additionalProperties: false @@ -2110,26 +2130,6 @@ definitions: items: $ref: "api.yaml#/components/schemas/TxId" - IP: - type: object - oneOf: - - title: IPv4 - type: object - properties: - tag: - type: string - enum: ["IPv4"] - ipv4: - type: string - - title: IPv6 - type: string - properties: - tag: - type: string - enum: ["IPv6"] - ipv6: - type: string - RunOptions: type: object required: @@ -2164,7 +2164,7 @@ definitions: type: string host: type: object - $ref: "logs.yaml#/definitions/IP" + $ref: "api.yaml#/components/schemas/IP" port: type: integer peers: @@ -2172,7 +2172,7 @@ definitions: items: $ref: "api.yaml#/components/schemas/Peer" apiHost: - $ref: "logs.yaml#/definitions/IP" + $ref: "api.yaml#/components/schemas/IP" apiPort: type: integer monitoringPort: diff --git a/hydra-node/src/Hydra/API/ServerOutput.hs b/hydra-node/src/Hydra/API/ServerOutput.hs index a7856b6a0bc..c980a9fcf27 100644 --- a/hydra-node/src/Hydra/API/ServerOutput.hs +++ b/hydra-node/src/Hydra/API/ServerOutput.hs @@ -15,7 +15,7 @@ import Hydra.Crypto (MultiSignature) import Hydra.HeadId (HeadId) import Hydra.HeadLogic.State (HeadState) import Hydra.Ledger (IsTx, UTxOType, ValidationError) -import Hydra.Network (NodeId) +import Hydra.Network (Host, NodeId) import Hydra.OnChainId (OnChainId) import Hydra.Party (Party) import Hydra.Prelude hiding (seq) @@ -53,6 +53,11 @@ instance IsChainState tx => FromJSON (TimedServerOutput tx) where data ServerOutput tx = PeerConnected {peer :: NodeId} | PeerDisconnected {peer :: NodeId} + | PeerHandshakeFailure + { remoteHost :: Host + , ourVersion :: Int + , theirVersions :: [Int] + } | HeadIsInitializing {headId :: HeadId, parties :: [Party]} | Committed {headId :: HeadId, party :: Party, utxo :: UTxOType tx} | HeadIsOpen {headId :: HeadId, utxo :: UTxOType tx} @@ -129,6 +134,7 @@ instance shrink = \case PeerConnected p -> PeerConnected <$> shrink p PeerDisconnected p -> PeerDisconnected <$> shrink p + PeerHandshakeFailure rh ov tv -> PeerHandshakeFailure <$> shrink rh <*> shrink ov <*> shrink tv HeadIsInitializing headId xs -> HeadIsInitializing <$> shrink headId <*> shrink xs Committed headId p u -> Committed <$> shrink headId <*> shrink p <*> shrink u HeadIsOpen headId u -> HeadIsOpen <$> shrink headId <*> shrink u @@ -178,6 +184,7 @@ prepareServerOutput ServerOutputConfig{utxoInSnapshot} response = case output response of PeerConnected{} -> encodedResponse PeerDisconnected{} -> encodedResponse + PeerHandshakeFailure{} -> encodedResponse HeadIsInitializing{} -> encodedResponse Committed{} -> encodedResponse HeadIsOpen{} -> encodedResponse diff --git a/hydra-node/src/Hydra/HeadLogic.hs b/hydra-node/src/Hydra/HeadLogic.hs index a156af84686..f655fd8633b 100644 --- a/hydra-node/src/Hydra/HeadLogic.hs +++ b/hydra-node/src/Hydra/HeadLogic.hs @@ -88,7 +88,7 @@ import Hydra.Ledger ( applyTransactions, txId, ) -import Hydra.Network.Message (Connectivity (..), Message (..), NetworkEvent (..)) +import Hydra.Network.Message (Connectivity (..), HydraVersionedProtocolNumber (..), KnownHydraVersions (..), Message (..), NetworkEvent (..)) import Hydra.OnChainId (OnChainId) import Hydra.Party (Party (vkey)) import Hydra.Snapshot (ConfirmedSnapshot (..), Snapshot (..), SnapshotNumber, getSnapshot) @@ -102,6 +102,22 @@ onConnectionEvent = \case causes [ClientEffect (ServerOutput.PeerConnected nodeId)] Disconnected{nodeId} -> causes [ClientEffect (ServerOutput.PeerDisconnected nodeId)] + HandshakeFailure{remoteHost, ourVersion, theirVersions} -> + causes + [ ClientEffect + ( ServerOutput.PeerHandshakeFailure + { remoteHost + , ourVersion = getVersion ourVersion + , theirVersions = getKnownVersions theirVersions + } + ) + ] + where + getVersion MkHydraVersionedProtocolNumber{hydraVersionedProtocolNumber} = hydraVersionedProtocolNumber + + getKnownVersions = \case + NoKnownHydraVersions -> [] + KnownHydraVersions{fromKnownHydraVersions} -> getVersion <$> fromKnownHydraVersions -- * The Coordinated Head protocol diff --git a/hydra-node/src/Hydra/Network/Message.hs b/hydra-node/src/Hydra/Network/Message.hs index 0e0210a43ca..116382979ee 100644 --- a/hydra-node/src/Hydra/Network/Message.hs +++ b/hydra-node/src/Hydra/Network/Message.hs @@ -8,7 +8,7 @@ import Cardano.Binary (serialize') import Cardano.Crypto.Util (SignableRepresentation, getSignableRepresentation) import Hydra.Crypto (Signature) import Hydra.Ledger (IsTx (TxIdType), UTxOType) -import Hydra.Network (NodeId) +import Hydra.Network (Host, NodeId) import Hydra.Party (Party) import Hydra.Snapshot (Snapshot, SnapshotNumber) @@ -21,9 +21,44 @@ data NetworkEvent msg instance Arbitrary msg => Arbitrary (NetworkEvent msg) where arbitrary = genericArbitrary +type HydraVersionedProtocolNumber :: Type +newtype HydraVersionedProtocolNumber = MkHydraVersionedProtocolNumber {hydraVersionedProtocolNumber :: Int} + deriving stock (Eq, Show, Generic, Ord) + deriving anyclass (ToJSON, FromJSON) + +instance Arbitrary HydraVersionedProtocolNumber where + arbitrary = genericArbitrary + +type KnownHydraVersions :: Type +data KnownHydraVersions + = KnownHydraVersions {fromKnownHydraVersions :: [HydraVersionedProtocolNumber]} + | NoKnownHydraVersions + deriving stock (Eq, Show, Generic) + deriving anyclass (ToJSON, FromJSON) + +instance Arbitrary KnownHydraVersions where + arbitrary = genericArbitrary + +type HydraHandshakeRefused :: Type +data HydraHandshakeRefused = HydraHandshakeRefused + { remoteHost :: Host + , ourVersion :: HydraVersionedProtocolNumber + , theirVersions :: KnownHydraVersions + } + deriving stock (Eq, Show, Generic) + deriving anyclass (ToJSON, FromJSON) + +instance Arbitrary HydraHandshakeRefused where + arbitrary = genericArbitrary + data Connectivity = Connected {nodeId :: NodeId} | Disconnected {nodeId :: NodeId} + | HandshakeFailure + { remoteHost :: Host + , ourVersion :: HydraVersionedProtocolNumber + , theirVersions :: KnownHydraVersions + } deriving stock (Generic, Eq, Show) deriving anyclass (ToJSON, FromJSON) diff --git a/hydra-node/src/Hydra/Network/Ouroboros.hs b/hydra-node/src/Hydra/Network/Ouroboros.hs index e2c56da2ad4..b69a0b1bbdb 100644 --- a/hydra-node/src/Hydra/Network/Ouroboros.hs +++ b/hydra-node/src/Hydra/Network/Ouroboros.hs @@ -2,15 +2,19 @@ -- This implements a dumb 'FireForget' protocol and maintains one connection to each peer. -- Contrary to other protocols implemented in Ouroboros, this is a push-based protocol. module Hydra.Network.Ouroboros ( - withOuroborosNetwork, withIOManager, - TraceOuroborosNetwork, - WithHost, module Hydra.Network, - encodeTraceSendRecvFireForget, + module Hydra.Network.Ouroboros, + module Hydra.Network.Ouroboros.VersionedProtocol, ) where import Control.Monad.Class.MonadAsync (wait) +import Hydra.Network.Ouroboros.VersionedProtocol ( + HydraNetworkConfig (..), + HydraVersionedProtocolData (..), + hydraVersionedProtocolCodec, + hydraVersionedProtocolDataCodec, + ) import Hydra.Prelude import Codec.CBOR.Term (Term) @@ -27,7 +31,8 @@ import Data.Aeson (object, withObject, (.:), (.=)) import Data.Aeson qualified as Aeson import Data.Aeson.Types qualified as Aeson import Data.Map.Strict as Map -import Hydra.Logging (Tracer, nullTracer) +import Data.Text qualified as T +import Hydra.Logging (Tracer (..), nullTracer) import Hydra.Network ( Host (..), Network (..), @@ -35,6 +40,11 @@ import Hydra.Network ( NetworkComponent, PortNumber, ) +import Hydra.Network.Message ( + HydraHandshakeRefused (..), + HydraVersionedProtocolNumber (..), + KnownHydraVersions (..), + ) import Hydra.Network.Ouroboros.Client as FireForget ( FireForgetClient (..), fireForgetClientPeer, @@ -53,9 +63,13 @@ import Network.Mux.Compat ( ) import Network.Socket ( AddrInfo (addrAddress), + NameInfoFlag (..), SockAddr, + Socket, defaultHints, getAddrInfo, + getNameInfo, + getPeerName, ) import Network.TypedProtocol.Codec ( AnyMessageAndAgency (..), @@ -69,7 +83,7 @@ import Ouroboros.Network.ErrorPolicy ( WithAddr (WithAddr), nullErrorPolicies, ) -import Ouroboros.Network.IOManager (withIOManager) +import Ouroboros.Network.IOManager (IOManager, withIOManager) import Ouroboros.Network.Mux ( MiniProtocol ( MiniProtocol, @@ -86,15 +100,9 @@ import Ouroboros.Network.Mux ( RunMiniProtocol (..), mkMiniProtocolCbFromPeer, ) -import Ouroboros.Network.Protocol.Handshake.Codec (noTimeLimitsHandshake) -import Ouroboros.Network.Protocol.Handshake.Type (Handshake, Message (..), RefuseReason (..)) -import Ouroboros.Network.Protocol.Handshake.Unversioned ( - UnversionedProtocol, - unversionedHandshakeCodec, - unversionedProtocol, - unversionedProtocolDataCodec, - ) -import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, queryVersion) +import Ouroboros.Network.Protocol.Handshake.Codec (codecHandshake, noTimeLimitsHandshake) +import Ouroboros.Network.Protocol.Handshake.Type (Handshake, HandshakeProtocolError (..), Message (..), RefuseReason (..)) +import Ouroboros.Network.Protocol.Handshake.Version (acceptableVersion, queryVersion, simpleSingletonVersions) import Ouroboros.Network.Server.Socket (AcceptedConnectionsLimit (AcceptedConnectionsLimit)) import Ouroboros.Network.Snocket (makeSocketBearer, socketSnocket) import Ouroboros.Network.Socket ( @@ -122,10 +130,10 @@ withOuroborosNetwork :: (ToCBOR outbound, FromCBOR outbound) => (ToCBOR inbound, FromCBOR inbound) => Tracer IO (WithHost (TraceOuroborosNetwork outbound)) -> - Host -> - [Host] -> + HydraNetworkConfig -> + (HydraHandshakeRefused -> IO ()) -> NetworkComponent IO inbound outbound () -withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do +withOuroborosNetwork tracer HydraNetworkConfig{protocolVersion, localHost, remoteHosts} handshakeCallback networkCallback between = do bchan <- newBroadcastTChanIO let newBroadcastChannel = atomically $ dupTChan bchan -- NOTE: There should only be one `IOManager` instance per process. Should we @@ -139,12 +147,34 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do { broadcast = atomically . writeTChan bchan } where + resolveSockAddr :: Host -> IO SockAddr resolveSockAddr Host{hostname, port} = do is <- getAddrInfo (Just defaultHints) (Just $ toString hostname) (Just $ show port) case is of (info : _) -> pure $ addrAddress info _ -> error "getAdrrInfo failed.. do proper error handling" + getHost :: SockAddr -> IO Host + getHost sockAddr = do + (mHost, mPort) <- getNameInfo [NI_NUMERICHOST, NI_NUMERICSERV] True True sockAddr + maybe (error "getNameInfo failed.. do proper error handling") pure $ do + host <- T.pack <$> mHost + port <- readMaybe =<< mPort + pure $ Host host port + + connect :: + IOManager -> + IO t -> + ( t -> + OuroborosApplicationWithMinimalCtx + InitiatorMode + SockAddr + LByteString + IO + () + Void + ) -> + IO Void connect iomgr newBroadcastChannel app = do -- REVIEW(SN): move outside to have this information available? networkState <- newNetworkMutableState @@ -158,8 +188,33 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do (contramap (WithHost localHost . TraceErrorPolicy) tracer) networkState (subscriptionParams localAddr remoteAddrs) - (actualConnect iomgr newBroadcastChannel app) + ( \sock -> + actualConnect iomgr newBroadcastChannel app sock `catch` \e -> do + host <- getHost =<< getPeerName sock + onHandshakeError host e + ) + + onHandshakeError :: Host -> HandshakeProtocolError HydraVersionedProtocolNumber -> IO () + onHandshakeError remoteHost = \case + HandshakeError (VersionMismatch theirVersions _) -> do + handshakeCallback + HydraHandshakeRefused + { ourVersion = protocolVersion + , theirVersions = KnownHydraVersions theirVersions + , remoteHost + } + _ -> + handshakeCallback + HydraHandshakeRefused + { ourVersion = protocolVersion + , theirVersions = NoKnownHydraVersions + , remoteHost + } + subscriptionParams :: + SockAddr -> + [SockAddr] -> + SubscriptionParams a IPSubscriptionTarget subscriptionParams localAddr remoteAddrs = SubscriptionParams { spLocalAddresses = LocalAddresses (Just localAddr) Nothing Nothing @@ -168,24 +223,36 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do , spSubscriptionTarget = IPSubscriptionTarget remoteAddrs (length remoteAddrs) } + actualConnect :: + IOManager -> + IO t -> + (t -> OuroborosApplicationWithMinimalCtx 'InitiatorMode SockAddr LByteString IO () Void) -> + Socket -> + IO () actualConnect iomgr newBroadcastChannel app sn = do chan <- newBroadcastChannel connectToNodeSocket iomgr - unversionedHandshakeCodec + (codecHandshake hydraVersionedProtocolCodec) noTimeLimitsHandshake - unversionedProtocolDataCodec + hydraVersionedProtocolDataCodec networkConnectTracers (HandshakeCallbacks acceptableVersion queryVersion) - (unversionedProtocol (app chan)) + (simpleSingletonVersions protocolVersion MkHydraVersionedProtocolData (app chan)) sn where + networkConnectTracers :: NetworkConnectTracers SockAddr HydraVersionedProtocolNumber networkConnectTracers = NetworkConnectTracers { nctMuxTracer = nullTracer - , nctHandshakeTracer = nullTracer + , nctHandshakeTracer = contramap (WithHost localHost . TraceHandshake) tracer } + withServerListening :: + IOManager -> + OuroborosApplicationWithMinimalCtx 'ResponderMode SockAddr LByteString IO a b -> + IO b -> + IO () withServerListening iomgr app continuation = do networkState <- newNetworkMutableState localAddr <- resolveSockAddr localHost @@ -199,25 +266,28 @@ withOuroborosNetwork tracer localHost remoteHosts networkCallback between = do networkState (AcceptedConnectionsLimit maxBound maxBound 0) localAddr - unversionedHandshakeCodec + (codecHandshake hydraVersionedProtocolCodec) noTimeLimitsHandshake - unversionedProtocolDataCodec + hydraVersionedProtocolDataCodec (HandshakeCallbacks acceptableVersion queryVersion) - (unversionedProtocol (SomeResponderApplication app)) + (simpleSingletonVersions protocolVersion MkHydraVersionedProtocolData (SomeResponderApplication app)) nullErrorPolicies $ \_addr serverAsync -> do race_ (wait serverAsync) continuation where + notConfigureSocket :: a -> b -> IO () notConfigureSocket _ _ = pure () + networkServerTracers :: NetworkServerTracers SockAddr HydraVersionedProtocolNumber networkServerTracers = NetworkServerTracers { nstMuxTracer = nullTracer - , nstHandshakeTracer = nullTracer + , nstHandshakeTracer = contramap (WithHost localHost . TraceHandshake) tracer , nstErrorPolicyTracer = contramap (WithHost localHost . TraceErrorPolicy) tracer , nstAcceptPolicyTracer = contramap (WithHost localHost . TraceAcceptPolicy) tracer } + onIOException :: IOException -> IO () onIOException ioException = throwIO $ NetworkServerListenException @@ -307,7 +377,7 @@ data TraceOuroborosNetwork msg = TraceSubscriptions (WithIPList (SubscriptionTrace SockAddr)) | TraceErrorPolicy (WithAddr SockAddr ErrorPolicyTrace) | TraceAcceptPolicy AcceptConnectionsPolicyTrace - | TraceHandshake (WithMuxBearer (ConnectionId SockAddr) (TraceSendRecv (Handshake UnversionedProtocol CBOR.Term))) + | TraceHandshake (WithMuxBearer (ConnectionId SockAddr) (TraceSendRecv (Handshake HydraVersionedProtocolNumber CBOR.Term))) | TraceSendRecv (TraceSendRecv (FireForget msg)) -- NOTE: cardano-node would have orphan ToObject instances for most of these @@ -346,7 +416,7 @@ encodeWithAddr (WithAddr addr ev) = ] encodeTraceSendRecvHandshake :: - WithMuxBearer (ConnectionId SockAddr) (TraceSendRecv (Handshake UnversionedProtocol CBOR.Term)) -> + WithMuxBearer (ConnectionId SockAddr) (TraceSendRecv (Handshake HydraVersionedProtocolNumber CBOR.Term)) -> [Aeson.Pair] encodeTraceSendRecvHandshake = \case WithMuxBearer peerId (TraceSendMsg (AnyMessageAndAgency agency msg)) -> @@ -363,7 +433,7 @@ encodeTraceSendRecvHandshake = \case ++ encodeMsg msg where encodeMsg :: - Message (Handshake UnversionedProtocol Term) from to -> + Message (Handshake HydraVersionedProtocolNumber Term) from to -> [Aeson.Pair] encodeMsg = \case MsgProposeVersions versions -> diff --git a/hydra-node/src/Hydra/Network/Ouroboros/VersionedProtocol.hs b/hydra-node/src/Hydra/Network/Ouroboros/VersionedProtocol.hs new file mode 100644 index 00000000000..ddddde723d3 --- /dev/null +++ b/hydra-node/src/Hydra/Network/Ouroboros/VersionedProtocol.hs @@ -0,0 +1,58 @@ +module Hydra.Network.Ouroboros.VersionedProtocol where + +import Hydra.Prelude + +import Codec.CBOR.Term qualified as CBOR +import Data.Text qualified as T +import Hydra.Network (Host (..)) +import Hydra.Network.Message (HydraVersionedProtocolNumber (..)) +import Network.TypedProtocol.Pipelined () +import Ouroboros.Network.CodecCBORTerm (CodecCBORTerm (..)) +import Ouroboros.Network.Protocol.Handshake.Codec (VersionDataCodec, cborTermVersionDataCodec) +import Ouroboros.Network.Protocol.Handshake.Version (Accept (..), Acceptable, Queryable, acceptableVersion, queryVersion) + +hydraVersionedProtocolCodec :: CodecCBORTerm (String, Maybe Int) HydraVersionedProtocolNumber +hydraVersionedProtocolCodec = CodecCBORTerm{encodeTerm, decodeTerm} + where + encodeTerm :: HydraVersionedProtocolNumber -> CBOR.Term + encodeTerm x = CBOR.TInt $ hydraVersionedProtocolNumber x + + decodeTerm :: CBOR.Term -> Either (String, Maybe Int) HydraVersionedProtocolNumber + decodeTerm (CBOR.TInt x) = Right $ MkHydraVersionedProtocolNumber x + decodeTerm _ = Left ("unknown tag", Nothing) + +type HydraVersionedProtocolData :: Type +data HydraVersionedProtocolData = MkHydraVersionedProtocolData + deriving stock (Eq, Show, Generic, Ord) + +instance Acceptable HydraVersionedProtocolData where + acceptableVersion + MkHydraVersionedProtocolData + MkHydraVersionedProtocolData = Accept MkHydraVersionedProtocolData + +instance Queryable HydraVersionedProtocolData where + queryVersion MkHydraVersionedProtocolData = False + +hydraVersionedProtocolDataCodec :: + VersionDataCodec + CBOR.Term + HydraVersionedProtocolNumber + HydraVersionedProtocolData +hydraVersionedProtocolDataCodec = + cborTermVersionDataCodec + (const CodecCBORTerm{encodeTerm, decodeTerm}) + where + encodeTerm :: HydraVersionedProtocolData -> CBOR.Term + encodeTerm MkHydraVersionedProtocolData = CBOR.TNull + + decodeTerm :: CBOR.Term -> Either Text HydraVersionedProtocolData + decodeTerm CBOR.TNull = Right MkHydraVersionedProtocolData + decodeTerm t = Left $ T.pack $ "unexpected term: " ++ show t + +type HydraNetworkConfig :: Type +data HydraNetworkConfig = HydraNetworkConfig + { protocolVersion :: HydraVersionedProtocolNumber + , localHost :: Host + , remoteHosts :: [Host] + } + deriving stock (Eq, Show, Generic) diff --git a/hydra-node/src/Hydra/Node/Network.hs b/hydra-node/src/Hydra/Node/Network.hs index 48ee338f326..e54a337016b 100644 --- a/hydra-node/src/Hydra/Node/Network.hs +++ b/hydra-node/src/Hydra/Node/Network.hs @@ -79,8 +79,14 @@ import Hydra.Logging.Messages (HydraLog (..)) import Hydra.Network (Host (..), IP, NetworkComponent, NodeId, PortNumber) import Hydra.Network.Authenticate (Authenticated (..), Signed, withAuthentication) import Hydra.Network.Heartbeat (Heartbeat (..), withHeartbeat) -import Hydra.Network.Message (Connectivity, Message, NetworkEvent (..)) -import Hydra.Network.Ouroboros (TraceOuroborosNetwork, WithHost, withOuroborosNetwork) +import Hydra.Network.Message ( + Connectivity (..), + HydraHandshakeRefused (..), + HydraVersionedProtocolNumber (..), + Message, + NetworkEvent (..), + ) +import Hydra.Network.Ouroboros (HydraNetworkConfig (..), TraceOuroborosNetwork, WithHost, withOuroborosNetwork) import Hydra.Network.Reliability (MessagePersistence, ReliableMsg, mkMessagePersistence, withReliability) import Hydra.Node (HydraNodeLog (..)) import Hydra.Node.ParameterMismatch (ParamMismatch (..), ParameterMismatch (..)) @@ -110,6 +116,9 @@ data NetworkConfiguration m = NetworkConfiguration -- ^ This node's id. } +currentHydraVersionedProtocol :: HydraVersionedProtocolNumber +currentHydraVersionedProtocol = MkHydraVersionedProtocolNumber 1 + -- | Starts the network layer of a node, passing configured `Network` to its continuation. withNetwork :: forall tx. @@ -121,7 +130,7 @@ withNetwork :: -- | Produces a `NetworkComponent` that can send `msg` and consumes `Authenticated` @msg@. NetworkComponent IO (NetworkEvent (Message tx)) (Message tx) () withNetwork tracer configuration callback action = do - let localhost = Host{hostname = show host, port} + let localHost = Host{hostname = show host, port} me = deriveParty signingKey numberOfParties = length $ me : otherParties messagePersistence <- configureMessagePersistence (contramap Node tracer) persistenceDir numberOfParties @@ -130,7 +139,16 @@ withNetwork tracer configuration callback action = do withFlipHeartbeats $ withReliability (contramap Reliability tracer) messagePersistence me otherParties $ withAuthentication (contramap Authentication tracer) signingKey otherParties $ - withOuroborosNetwork (contramap Network tracer) localhost peers + withOuroborosNetwork + (contramap Network tracer) + HydraNetworkConfig + { protocolVersion = currentHydraVersionedProtocol + , localHost + , remoteHosts = peers + } + ( \HydraHandshakeRefused{remoteHost, ourVersion, theirVersions} -> + callback . ConnectivityEvent $ HandshakeFailure{remoteHost, ourVersion, theirVersions} + ) withHeartbeat nodeId reliability (callback . mapHeartbeat) $ \network -> action network diff --git a/hydra-node/test/Hydra/HeadLogicSpec.hs b/hydra-node/test/Hydra/HeadLogicSpec.hs index 47520f410e0..396fe75e363 100644 --- a/hydra-node/test/Hydra/HeadLogicSpec.hs +++ b/hydra-node/test/Hydra/HeadLogicSpec.hs @@ -448,6 +448,7 @@ spec = let input = connectivityChanged ttl connectivityMessage let outcome = update bobEnv ledger headState input stateChanges outcome `shouldBe` [] + outcome `hasEffectSatisfying` \case ClientEffect{} -> True; _ -> False prop "ignores abortTx of another head" $ \otherHeadId -> do let abortOtherHead = observeTx $ OnAbortTx{headId = otherHeadId} diff --git a/hydra-node/test/Hydra/NetworkSpec.hs b/hydra-node/test/Hydra/NetworkSpec.hs index 6353d57072c..e101c9e8ae4 100644 --- a/hydra-node/test/Hydra/NetworkSpec.hs +++ b/hydra-node/test/Hydra/NetworkSpec.hs @@ -8,12 +8,16 @@ import Test.Hydra.Prelude import Codec.CBOR.Read (deserialiseFromBytes) import Codec.CBOR.Write (toLazyByteString) -import Control.Concurrent.Class.MonadSTM (newTQueue, readTQueue, writeTQueue) +import Control.Concurrent.Class.MonadSTM (modifyTVar', newTQueue, newTVarIO, readTQueue, readTVarIO, writeTQueue) import Hydra.Ledger.Simple (SimpleTx (..)) import Hydra.Logging (nullTracer, showLogsOnFailure) import Hydra.Network (Host (..), Network) -import Hydra.Network.Message (Message (..)) -import Hydra.Network.Ouroboros (broadcast, withOuroborosNetwork) +import Hydra.Network.Message ( + HydraHandshakeRefused (..), + HydraVersionedProtocolNumber (..), + Message (..), + ) +import Hydra.Network.Ouroboros (HydraNetworkConfig (..), broadcast, withOuroborosNetwork) import Hydra.Network.Reliability (MessagePersistence (..)) import Hydra.Node.Network (configureMessagePersistence) import Hydra.Node.ParameterMismatch (ParameterMismatch) @@ -34,20 +38,82 @@ spec = do received <- atomically newTQueue showLogsOnFailure "NetworkSpec" $ \tracer -> failAfter 30 $ do [port1, port2] <- fmap fromIntegral <$> randomUnusedTCPPorts 2 - withOuroborosNetwork tracer (Host lo port1) [Host lo port2] (const @_ @Integer $ pure ()) $ \hn1 -> - withOuroborosNetwork @Integer tracer (Host lo port2) [Host lo port1] (atomically . writeTQueue received) $ \_ -> do + let node1Config = + HydraNetworkConfig + { protocolVersion = MkHydraVersionedProtocolNumber 0 + , localHost = Host lo port1 + , remoteHosts = [Host lo port2] + } + node2Config = + HydraNetworkConfig + { protocolVersion = MkHydraVersionedProtocolNumber 0 + , localHost = Host lo port2 + , remoteHosts = [Host lo port1] + } + withOuroborosNetwork tracer node1Config (const $ pure ()) (const @_ @Integer $ pure ()) $ \hn1 -> + withOuroborosNetwork @Integer tracer node2Config (const $ pure ()) (atomically . writeTQueue received) $ \_ -> do withNodeBroadcastingForever hn1 1 $ atomically (readTQueue received) `shouldReturn` 1 + it "handshake failures should call the handshakeCallback" $ do + received <- atomically newTQueue + showLogsOnFailure "NetworkSpec" $ \tracer -> failAfter 30 $ do + [port1, port2] <- fmap fromIntegral <$> randomUnusedTCPPorts 2 + let node1Config = + HydraNetworkConfig + { protocolVersion = MkHydraVersionedProtocolNumber 0 + , localHost = Host lo port1 + , remoteHosts = [Host lo port2] + } + node2Config = + HydraNetworkConfig + { protocolVersion = MkHydraVersionedProtocolNumber 1 + , localHost = Host lo port2 + , remoteHosts = [Host lo port1] + } + createHandshakeCallback :: IO (HydraHandshakeRefused -> IO (), IO [Host]) + createHandshakeCallback = do + x <- newTVarIO [] + let f (HydraHandshakeRefused{remoteHost}) = atomically $ modifyTVar' x (remoteHost :) + let g = readTVarIO x + pure (f, g) + + (handshakeCallback1, getHandshakeFailures1) <- createHandshakeCallback + (handshakeCallback2, getHandshakeFailures2) <- createHandshakeCallback + + withOuroborosNetwork @Integer @() tracer node1Config handshakeCallback1 (const @_ @Integer $ pure ()) $ \_ -> + withOuroborosNetwork @Integer tracer node2Config handshakeCallback2 (atomically . writeTQueue received) $ \_ -> do + threadDelay 0.1 + getHandshakeFailures1 `shouldReturn` [Host lo port2] + getHandshakeFailures2 `shouldReturn` [Host lo port1] + it "broadcasts messages between 3 connected peers" $ do node1received <- atomically newTQueue node2received <- atomically newTQueue node3received <- atomically newTQueue showLogsOnFailure "NetworkSpec" $ \tracer -> failAfter 30 $ do [port1, port2, port3] <- fmap fromIntegral <$> randomUnusedTCPPorts 3 - withOuroborosNetwork @Integer tracer (Host lo port1) [Host lo port2, Host lo port3] (atomically . writeTQueue node1received) $ \hn1 -> - withOuroborosNetwork tracer (Host lo port2) [Host lo port1, Host lo port3] (atomically . writeTQueue node2received) $ \hn2 -> do - withOuroborosNetwork tracer (Host lo port3) [Host lo port1, Host lo port2] (atomically . writeTQueue node3received) $ \hn3 -> do + let node1Config = + HydraNetworkConfig + { protocolVersion = MkHydraVersionedProtocolNumber 0 + , localHost = Host lo port1 + , remoteHosts = [Host lo port2, Host lo port3] + } + node2Config = + HydraNetworkConfig + { protocolVersion = MkHydraVersionedProtocolNumber 0 + , localHost = Host lo port2 + , remoteHosts = [Host lo port1, Host lo port3] + } + node3Config = + HydraNetworkConfig + { protocolVersion = MkHydraVersionedProtocolNumber 0 + , localHost = Host lo port3 + , remoteHosts = [Host lo port2, Host lo port1] + } + withOuroborosNetwork @Integer tracer node1Config (const $ pure ()) (atomically . writeTQueue node1received) $ \hn1 -> + withOuroborosNetwork tracer node2Config (const $ pure ()) (atomically . writeTQueue node2received) $ \hn2 -> do + withOuroborosNetwork tracer node3Config (const $ pure ()) (atomically . writeTQueue node3received) $ \hn3 -> do withNodesBroadcastingForever [(hn1, 1), (hn2, 2), (hn3, 3)] $ assertAllnodesReceivedMessagesFromAllOtherNodes [(node1received, 1), (node2received, 2), (node3received, 3)]