From fedc74cb2d8711f5654123a2cfb24b3e889d5bc5 Mon Sep 17 00:00:00 2001 From: Davis Vaughan Date: Mon, 27 Jan 2025 09:39:42 -0500 Subject: [PATCH] Set the client side SUB IOPub socket subscription *before* we connect (#673) --- .../amalthea/src/fixtures/dummy_frontend.rs | 13 ++---- crates/amalthea/src/socket/socket.rs | 46 ++++++++----------- 2 files changed, 22 insertions(+), 37 deletions(-) diff --git a/crates/amalthea/src/fixtures/dummy_frontend.rs b/crates/amalthea/src/fixtures/dummy_frontend.rs index 48af08d17..de083abfa 100644 --- a/crates/amalthea/src/fixtures/dummy_frontend.rs +++ b/crates/amalthea/src/fixtures/dummy_frontend.rs @@ -182,15 +182,10 @@ impl DummyFrontend { ) .unwrap(); - // Subscribe to IOPub! Server's XPUB socket will receive a notification of - // our subscription with `subscription`, then will publish an IOPub `Welcome` - // message, sending back our `subscription`. - iopub_socket.subscribe(b"").unwrap(); - - // Immediately block until we've received the IOPub welcome message. - // This confirms that we've fully subscribed and avoids dropping any - // of the initial IOPub messages that a server may send if we start - // perform requests immediately. + // Immediately block until we've received the IOPub welcome message from the XPUB + // server side socket. This confirms that we've fully subscribed and avoids + // dropping any of the initial IOPub messages that a server may send if we start + // to perform requests immediately (in particular, busy/idle messages). // https://github.com/posit-dev/ark/pull/577 assert_matches!(Self::recv(&iopub_socket), Message::Welcome(data) => { assert_eq!(data.content.subscription, String::from("")); diff --git a/crates/amalthea/src/socket/socket.rs b/crates/amalthea/src/socket/socket.rs index 0ffb812b8..a2626391a 100644 --- a/crates/amalthea/src/socket/socket.rs +++ b/crates/amalthea/src/socket/socket.rs @@ -62,6 +62,24 @@ impl Socket { .map_err(|err| Error::CreateSocketFailed(name.clone(), err))?; } + if name == "IOPub" && kind == zmq::SocketType::SUB { + // For the client side of IOPub (in tests and eventually kallichore), we need + // to subscribe our SUB to messages from the XPUB on the server side. We use + // `""` to subscribe to all message types, there is no reason to filter any + // out. It is very important that we subscribe BEFORE we `connect()`. If we + // don't subscribe first, then the XPUB on the server side can come online + // first and processes our `connect()` before we've actually subscribed, which + // causes the welcome message the XPUB sends us to get dropped, preventing us + // from correctly starting up, because we block until we've received that + // welcome message. In the link below, you can see proof that zmq only sends + // the welcome message out when it processes our `connect()` call, so if we + // aren't subscribed by that point, we miss it. + // https://github.com/zeromq/libzmq/blob/34f7fa22022bed9e0e390ed3580a1c83ac4a2834/src/xpub.cpp#L56-L65 + socket + .set_subscribe(b"") + .map_err(|err| Error::CreateSocketFailed(name.clone(), err))?; + } + // If this is a debug build, set `ZMQ_ROUTER_MANDATORY` on all `ROUTER` // sockets, so that we get errors instead of silent message drops for // unroutable messages. @@ -196,32 +214,4 @@ impl Socket { pub fn has_incoming_data(&self) -> zmq::Result { self.poll_incoming(0) } - - /// Subscribes a SUB socket to messages from an XPUB socket. - /// - /// Use `b""` to subscribe to all messages. - /// - /// Note that this needs to be called *after* the socket connection is - /// established on both ends. - pub fn subscribe(&self, subscription: &[u8]) -> Result<(), Error> { - let socket_type = match self.socket.get_socket_type() { - Ok(socket_type) => socket_type, - Err(err) => return Err(Error::ZmqError(self.name.clone(), err)), - }; - - if socket_type != zmq::SocketType::SUB { - return Err(crate::anyhow!( - "Can't subscribe on a non-SUB socket. This socket is a {socket_type:?}." - )); - } - - // Currently, all SUB sockets subscribe to all topics; in theory - // frontends could subscribe selectively, but in practice all known - // Jupyter frontends subscribe to all topics and just ignore topics - // they don't recognize. - match self.socket.set_subscribe(subscription) { - Ok(_) => Ok(()), - Err(err) => Err(Error::ZmqError(self.name.clone(), err)), - } - } }