Skip to content

Commit

Permalink
Set the client side SUB IOPub socket subscription *before* we connect
Browse files Browse the repository at this point in the history
  • Loading branch information
DavisVaughan committed Jan 23, 2025
1 parent 8a957a4 commit c7cfe6b
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 37 deletions.
13 changes: 4 additions & 9 deletions crates/amalthea/src/fixtures/dummy_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,10 @@ impl DummyFrontend {
)
.unwrap();

// Subscribe to IOPub! Server's XPUB socket will receive a notification of
// our subscription with `subscription`, then will publish an IOPub `Welcome`
// message, sending back our `subscription`.
iopub_socket.subscribe(b"").unwrap();

// Immediately block until we've received the IOPub welcome message.
// This confirms that we've fully subscribed and avoids dropping any
// of the initial IOPub messages that a server may send if we start
// perform requests immediately.
// Immediately block until we've received the IOPub welcome message from the XPUB
// server side socket. This confirms that we've fully subscribed and avoids
// dropping any of the initial IOPub messages that a server may send if we start
// to perform requests immediately (in particular, busy/idle messages).
// https://github.com/posit-dev/ark/pull/577
assert_matches!(Self::recv(&iopub_socket), Message::Welcome(data) => {
assert_eq!(data.content.subscription, String::from(""));
Expand Down
46 changes: 18 additions & 28 deletions crates/amalthea/src/socket/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,24 @@ impl Socket {
.map_err(|err| Error::CreateSocketFailed(name.clone(), err))?;
}

if name == "IOPub" && kind == zmq::SocketType::SUB {
// For the client side of IOPub (in tests and eventually kallichore), we need
// to subscribe our SUB to messages from the XPUB on the server side. We use
// `""` to subscribe to all message types, there is no reason to filter any
// out. It is very important that we subscribe BEFORE we `connect()`. If we
// don't subscribe first, then the XPUB on the server side can come online
// first and processes our `connect()` before we've actually subscribed, which
// causes the welcome message the XPUB sends us to get dropped, preventing us
// from correctly starting up, because we block until we've received that
// welcome message. In the link below, you can see proof that zmq only sends
// the welcome message out when it processes our `connect()` call, so if we
// aren't subscribed by that point, we miss it.
// https://github.com/zeromq/libzmq/blob/34f7fa22022bed9e0e390ed3580a1c83ac4a2834/src/xpub.cpp#L56-L65
socket
.set_subscribe(b"")
.map_err(|err| Error::CreateSocketFailed(name.clone(), err))?;
}

// If this is a debug build, set `ZMQ_ROUTER_MANDATORY` on all `ROUTER`
// sockets, so that we get errors instead of silent message drops for
// unroutable messages.
Expand Down Expand Up @@ -196,32 +214,4 @@ impl Socket {
pub fn has_incoming_data(&self) -> zmq::Result<bool> {
self.poll_incoming(0)
}

/// Subscribes a SUB socket to messages from an XPUB socket.
///
/// Use `b""` to subscribe to all messages.
///
/// Note that this needs to be called *after* the socket connection is
/// established on both ends.
pub fn subscribe(&self, subscription: &[u8]) -> Result<(), Error> {
let socket_type = match self.socket.get_socket_type() {
Ok(socket_type) => socket_type,
Err(err) => return Err(Error::ZmqError(self.name.clone(), err)),
};

if socket_type != zmq::SocketType::SUB {
return Err(crate::anyhow!(
"Can't subscribe on a non-SUB socket. This socket is a {socket_type:?}."
));
}

// Currently, all SUB sockets subscribe to all topics; in theory
// frontends could subscribe selectively, but in practice all known
// Jupyter frontends subscribe to all topics and just ignore topics
// they don't recognize.
match self.socket.set_subscribe(subscription) {
Ok(_) => Ok(()),
Err(err) => Err(Error::ZmqError(self.name.clone(), err)),
}
}
}

0 comments on commit c7cfe6b

Please sign in to comment.