diff --git a/NAMESPACE b/NAMESPACE index ac9ce731..c81c72fa 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -9,6 +9,7 @@ export(exists_loop) export(global_loop) export(later) export(loop_empty) +export(loop_queue_length) export(next_op_secs) export(run_now) export(with_loop) diff --git a/R/RcppExports.R b/R/RcppExports.R index ec9f790b..9d3c3249 100644 --- a/R/RcppExports.R +++ b/R/RcppExports.R @@ -49,6 +49,10 @@ ensureInitialized <- function() { invisible(.Call('_later_ensureInitialized', PACKAGE = 'later')) } +queueLength <- function(loop_id) { + .Call('_later_queueLength', PACKAGE = 'later', loop_id) +} + execLater <- function(callback, delaySecs, loop_id) { .Call('_later_execLater', PACKAGE = 'later', callback, delaySecs, loop_id) } diff --git a/R/later.R b/R/later.R index 91dcd32f..7b8f0d76 100644 --- a/R/later.R +++ b/R/later.R @@ -314,6 +314,17 @@ loop_empty <- function(loop = current_loop()) { idle(loop$id) } +#' Check how many callbacks are queued in a later loop +#' +#' Returns the number of callbacks that are scheduled to execute in the present +#' or future. +#' +#' @inheritParams create_loop +#' @export +loop_queue_length <- function(loop = current_loop()) { + queueLength(loop$id) +} + #' Relative time to next scheduled operation #' #' Returns the duration between now and the earliest operation that is currently diff --git a/man/loop_queue_length.Rd b/man/loop_queue_length.Rd new file mode 100644 index 00000000..4d0879ef --- /dev/null +++ b/man/loop_queue_length.Rd @@ -0,0 +1,15 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/later.R +\name{loop_queue_length} +\alias{loop_queue_length} +\title{Check how many callbacks are queued in a later loop} +\usage{ +loop_queue_length(loop = current_loop()) +} +\arguments{ +\item{loop}{A handle to an event loop.} +} +\description{ +Returns the number of callbacks that are scheduled to execute in the present +or future. +} diff --git a/src/RcppExports.cpp b/src/RcppExports.cpp index fa436559..38000f72 100644 --- a/src/RcppExports.cpp +++ b/src/RcppExports.cpp @@ -134,6 +134,17 @@ BEGIN_RCPP return R_NilValue; END_RCPP } +// queueLength +size_t queueLength(int loop_id); +RcppExport SEXP _later_queueLength(SEXP loop_idSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< int >::type loop_id(loop_idSEXP); + rcpp_result_gen = Rcpp::wrap(queueLength(loop_id)); + return rcpp_result_gen; +END_RCPP +} // execLater std::string execLater(Rcpp::Function callback, double delaySecs, int loop_id); RcppExport SEXP _later_execLater(SEXP callbackSEXP, SEXP delaySecsSEXP, SEXP loop_idSEXP) { diff --git a/src/callback_registry.cpp b/src/callback_registry.cpp index 0357ee0f..946e4c27 100644 --- a/src/callback_registry.cpp +++ b/src/callback_registry.cpp @@ -343,6 +343,11 @@ bool CallbackRegistry::empty() const { return this->queue.empty(); } +size_t CallbackRegistry::queueLength() const { + Guard guard(mutex); + return this->queue.size(); +} + // Returns true if the smallest timestamp exists and is not in the future. bool CallbackRegistry::due(const Timestamp& time, bool recursive) const { ASSERT_MAIN_THREAD() diff --git a/src/callback_registry.h b/src/callback_registry.h index cfabf7c0..16a2fae9 100644 --- a/src/callback_registry.h +++ b/src/callback_registry.h @@ -138,6 +138,9 @@ class CallbackRegistry { // Is the registry completely empty? bool empty() const; + // How many callbacks are currently queued? + size_t queueLength() const; + // Is anything ready to execute? bool due(const Timestamp& time = Timestamp(), bool recursive = true) const; diff --git a/src/init.c b/src/init.c index a43162a0..b6818e70 100644 --- a/src/init.c +++ b/src/init.c @@ -12,6 +12,7 @@ Check these declarations against the C/Fortran source code. extern SEXP _later_ensureInitialized(); extern SEXP _later_execCallbacks(SEXP, SEXP, SEXP); extern SEXP _later_idle(SEXP); +extern SEXP _later_queueLength(SEXP); extern SEXP _later_execLater(SEXP, SEXP, SEXP); extern SEXP _later_cancel(SEXP, SEXP); extern SEXP _later_nextOpSecs(SEXP); @@ -29,6 +30,7 @@ static const R_CallMethodDef CallEntries[] = { {"_later_ensureInitialized", (DL_FUNC) &_later_ensureInitialized, 0}, {"_later_execCallbacks", (DL_FUNC) &_later_execCallbacks, 3}, {"_later_idle", (DL_FUNC) &_later_idle, 1}, + {"_later_queueLength", (DL_FUNC) &_later_queueLength, 1}, {"_later_execLater", (DL_FUNC) &_later_execLater, 3}, {"_later_cancel", (DL_FUNC) &_later_cancel, 2}, {"_later_nextOpSecs", (DL_FUNC) &_later_nextOpSecs, 1}, diff --git a/src/later.cpp b/src/later.cpp index 7ad2c9f7..5fa980f1 100644 --- a/src/later.cpp +++ b/src/later.cpp @@ -298,6 +298,16 @@ void ensureInitialized() { initialized = true; } +// [[Rcpp::export]] +size_t queueLength(int loop_id) { + ASSERT_MAIN_THREAD() + shared_ptr registry = callbackRegistryTable.getRegistry(loop_id); + if (registry == nullptr) { + Rf_error("CallbackRegistry does not exist."); + } + return registry->queueLength(); +} + // [[Rcpp::export]] std::string execLater(Rcpp::Function callback, double delaySecs, int loop_id) { ASSERT_MAIN_THREAD() diff --git a/tests/testthat/test-private-loops.R b/tests/testthat/test-private-loops.R index 48d511dc..89e9faee 100644 --- a/tests/testthat/test-private-loops.R +++ b/tests/testthat/test-private-loops.R @@ -421,3 +421,25 @@ test_that("list_queue", { q <- list_queue(l) expect_equal(length(q), 0) }) + + +describe("Queue length with nested loops", { + x <- 0 + later(~{x <<- 1}, 0) + expect_equal(loop_queue_length(), 1) + with_temp_loop({ + expect_equal(loop_queue_length(), 0) + + later(~{x <<- 2}) + expect_equal(loop_queue_length(), 1) + + run_now() + expect_identical(x, 2) + expect_equal(loop_queue_length(), 0) + expect_equal(loop_queue_length(loop = global_loop()), 1) + + run_now(loop = global_loop()) + expect_equal(loop_queue_length(loop = global_loop()), 0) + expect_identical(x, 1) + }) +}) \ No newline at end of file