diff --git a/crates/ark/src/lsp/main_loop.rs b/crates/ark/src/lsp/main_loop.rs index 07d1ca941..19a8780a5 100644 --- a/crates/ark/src/lsp/main_loop.rs +++ b/crates/ark/src/lsp/main_loop.rs @@ -8,6 +8,7 @@ use std::collections::HashMap; use std::future; use std::pin::Pin; +use std::sync::RwLock; use anyhow::anyhow; use futures::StreamExt; @@ -34,17 +35,20 @@ use crate::lsp::state_handlers::ConsoleInputs; pub(crate) type TokioUnboundedSender = tokio::sync::mpsc::UnboundedSender; pub(crate) type TokioUnboundedReceiver = tokio::sync::mpsc::UnboundedReceiver; -// The global instance of the auxiliary event channel, used for sending log -// messages or spawning threads from free functions. Since this is an unbounded -// channel, sending a log message is not async nor blocking. Tokio senders are -// Send and Sync so this global variable can be safely shared across threads. -// -// Note that in case of duplicate LSP sessions (see -// https://github.com/posit-dev/ark/issues/622 and -// https://github.com/posit-dev/positron/issues/5321), it's possible for older -// LSPs to send log messages and tasks to the newer LSPs. -static mut AUXILIARY_EVENT_TX: std::cell::OnceCell> = - std::cell::OnceCell::new(); +/// The global instance of the auxiliary event channel, used for sending log messages or +/// spawning threads from free functions. Since this is an unbounded channel, sending a +/// log message is not async nor blocking. Tokio senders are Send and Sync so this global +/// variable can be safely shared across threads. +/// +/// LSP sessions can be restarted or reconnected at any time, which is why this is an +/// `RwLock`, but we expect that to be very rare. Read locking is not expected to be +/// contentious. +/// +/// Note that in case of duplicate LSP sessions (see +/// https://github.com/posit-dev/ark/issues/622 and +/// https://github.com/posit-dev/positron/issues/5321), it's possible for older +/// LSPs to send log messages and tasks to the newer LSPs. +static AUXILIARY_EVENT_TX: RwLock>> = RwLock::new(None); // This is the syntax for trait aliases until an official one is stabilised. // This alias is for the future of a `JoinHandle>` @@ -400,17 +404,16 @@ impl AuxiliaryState { // Channels for communication with the auxiliary loop let (auxiliary_event_tx, auxiliary_event_rx) = tokio_unbounded_channel::(); - // Set global instance of this channel. This is used for interacting - // with the auxiliary loop (logging messages or spawning a task) from - // free functions. - unsafe { - if let Some(val) = AUXILIARY_EVENT_TX.get_mut() { - // Reset channel if already set. Happens e.g. on reconnection after a refresh. - *val = auxiliary_event_tx; - } else { - // Set channel for the first time - AUXILIARY_EVENT_TX.set(auxiliary_event_tx).unwrap(); - } + // Set global instance of this channel. This is used for interacting with the + // auxiliary loop (logging messages or spawning a task) from free functions. + // Unfortunately this can theoretically be reset at any time, i.e. on reconnection + // after a refresh, which is why we need an RwLock. This is the only place we take + // a write lock though. We panic if we can't access the write lock, as that implies + // the auxiliary loop has gone down and something is very wrong. We hold the lock + // for as short as possible, hence the extra scope. + { + let mut tx = AUXILIARY_EVENT_TX.write().unwrap(); + *tx = Some(auxiliary_event_tx); } // List of pending tasks for which we manage the lifecycle (mainly relay @@ -487,17 +490,30 @@ impl AuxiliaryState { } } -fn auxiliary_tx() -> &'static TokioUnboundedSender { +fn with_auxiliary_tx(f: F) -> T +where + F: FnOnce(&TokioUnboundedSender) -> T, +{ + let auxiliary_event_tx = AUXILIARY_EVENT_TX + .read() + .expect("Can lock auxiliary event sender."); + // If we get here that means the LSP was initialised at least once. The // channel might be closed if the LSP was dropped, but it should exist. - unsafe { AUXILIARY_EVENT_TX.get().unwrap() } + let auxiliary_event_tx = auxiliary_event_tx + .as_ref() + .expect("LSP should have been initialized at least once by now."); + + f(auxiliary_event_tx) } fn send_auxiliary(event: AuxiliaryEvent) { - if let Err(err) = auxiliary_tx().send(event) { - // The error includes the event - log::warn!("LSP is shut down, can't send event:\n{err:?}"); - } + with_auxiliary_tx(|auxiliary_event_tx| { + if let Err(err) = auxiliary_event_tx.send(event) { + // The error includes the event + log::warn!("LSP is shut down, can't send event:\n{err:?}"); + } + }) } /// Send a message to the LSP client. This is non-blocking and treated on a @@ -510,7 +526,9 @@ pub(crate) fn log(level: lsp_types::MessageType, message: String) { // Check that channel is still alive in case the LSP was closed. // If closed, fallthrough. - if let Ok(_) = auxiliary_tx().send(AuxiliaryEvent::Log(level, message.clone())) { + if let Ok(_) = with_auxiliary_tx(|auxiliary_event_tx| { + auxiliary_event_tx.send(AuxiliaryEvent::Log(level, message.clone())) + }) { return; }