Skip to content

Commit

Permalink
Use an RwLock for AUXILIARY_EVENT_TX (#668)
Browse files Browse the repository at this point in the history
* Use an `RwLock` for `AUXILIARY_EVENT_TX`

We expect it to be write locked very rarely, so it should still be pretty cheap to get access to this safely

* Panic if we can't access the aux sender
  • Loading branch information
DavisVaughan authored Jan 23, 2025
1 parent ceaf332 commit 794cb85
Showing 1 changed file with 47 additions and 29 deletions.
76 changes: 47 additions & 29 deletions crates/ark/src/lsp/main_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use std::collections::HashMap;
use std::future;
use std::pin::Pin;
use std::sync::RwLock;

use anyhow::anyhow;
use futures::StreamExt;
Expand All @@ -34,17 +35,20 @@ use crate::lsp::state_handlers::ConsoleInputs;
pub(crate) type TokioUnboundedSender<T> = tokio::sync::mpsc::UnboundedSender<T>;
pub(crate) type TokioUnboundedReceiver<T> = tokio::sync::mpsc::UnboundedReceiver<T>;

// The global instance of the auxiliary event channel, used for sending log
// messages or spawning threads from free functions. Since this is an unbounded
// channel, sending a log message is not async nor blocking. Tokio senders are
// Send and Sync so this global variable can be safely shared across threads.
//
// Note that in case of duplicate LSP sessions (see
// https://github.com/posit-dev/ark/issues/622 and
// https://github.com/posit-dev/positron/issues/5321), it's possible for older
// LSPs to send log messages and tasks to the newer LSPs.
static mut AUXILIARY_EVENT_TX: std::cell::OnceCell<TokioUnboundedSender<AuxiliaryEvent>> =
std::cell::OnceCell::new();
/// The global instance of the auxiliary event channel, used for sending log messages or
/// spawning threads from free functions. Since this is an unbounded channel, sending a
/// log message is not async nor blocking. Tokio senders are Send and Sync so this global
/// variable can be safely shared across threads.
///
/// LSP sessions can be restarted or reconnected at any time, which is why this is an
/// `RwLock`, but we expect that to be very rare. Read locking is not expected to be
/// contentious.
///
/// Note that in case of duplicate LSP sessions (see
/// https://github.com/posit-dev/ark/issues/622 and
/// https://github.com/posit-dev/positron/issues/5321), it's possible for older
/// LSPs to send log messages and tasks to the newer LSPs.
static AUXILIARY_EVENT_TX: RwLock<Option<TokioUnboundedSender<AuxiliaryEvent>>> = RwLock::new(None);

// This is the syntax for trait aliases until an official one is stabilised.
// This alias is for the future of a `JoinHandle<anyhow::Result<T>>`
Expand Down Expand Up @@ -400,17 +404,16 @@ impl AuxiliaryState {
// Channels for communication with the auxiliary loop
let (auxiliary_event_tx, auxiliary_event_rx) = tokio_unbounded_channel::<AuxiliaryEvent>();

// Set global instance of this channel. This is used for interacting
// with the auxiliary loop (logging messages or spawning a task) from
// free functions.
unsafe {
if let Some(val) = AUXILIARY_EVENT_TX.get_mut() {
// Reset channel if already set. Happens e.g. on reconnection after a refresh.
*val = auxiliary_event_tx;
} else {
// Set channel for the first time
AUXILIARY_EVENT_TX.set(auxiliary_event_tx).unwrap();
}
// Set global instance of this channel. This is used for interacting with the
// auxiliary loop (logging messages or spawning a task) from free functions.
// Unfortunately this can theoretically be reset at any time, i.e. on reconnection
// after a refresh, which is why we need an RwLock. This is the only place we take
// a write lock though. We panic if we can't access the write lock, as that implies
// the auxiliary loop has gone down and something is very wrong. We hold the lock
// for as short as possible, hence the extra scope.
{
let mut tx = AUXILIARY_EVENT_TX.write().unwrap();
*tx = Some(auxiliary_event_tx);
}

// List of pending tasks for which we manage the lifecycle (mainly relay
Expand Down Expand Up @@ -487,17 +490,30 @@ impl AuxiliaryState {
}
}

fn auxiliary_tx() -> &'static TokioUnboundedSender<AuxiliaryEvent> {
fn with_auxiliary_tx<F, T>(f: F) -> T
where
F: FnOnce(&TokioUnboundedSender<AuxiliaryEvent>) -> T,
{
let auxiliary_event_tx = AUXILIARY_EVENT_TX
.read()
.expect("Can lock auxiliary event sender.");

// If we get here that means the LSP was initialised at least once. The
// channel might be closed if the LSP was dropped, but it should exist.
unsafe { AUXILIARY_EVENT_TX.get().unwrap() }
let auxiliary_event_tx = auxiliary_event_tx
.as_ref()
.expect("LSP should have been initialized at least once by now.");

f(auxiliary_event_tx)
}

fn send_auxiliary(event: AuxiliaryEvent) {
if let Err(err) = auxiliary_tx().send(event) {
// The error includes the event
log::warn!("LSP is shut down, can't send event:\n{err:?}");
}
with_auxiliary_tx(|auxiliary_event_tx| {
if let Err(err) = auxiliary_event_tx.send(event) {
// The error includes the event
log::warn!("LSP is shut down, can't send event:\n{err:?}");
}
})
}

/// Send a message to the LSP client. This is non-blocking and treated on a
Expand All @@ -510,7 +526,9 @@ pub(crate) fn log(level: lsp_types::MessageType, message: String) {

// Check that channel is still alive in case the LSP was closed.
// If closed, fallthrough.
if let Ok(_) = auxiliary_tx().send(AuxiliaryEvent::Log(level, message.clone())) {
if let Ok(_) = with_auxiliary_tx(|auxiliary_event_tx| {
auxiliary_event_tx.send(AuxiliaryEvent::Log(level, message.clone()))
}) {
return;
}

Expand Down

0 comments on commit 794cb85

Please sign in to comment.