diff --git a/DESCRIPTION b/DESCRIPTION index 0565d7ef5..701da4ca9 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -132,6 +132,7 @@ Collate: 'deprecated.R' 'devmode.R' 'diagnose.R' + 'extended-task.R' 'fileupload.R' 'graph.R' 'reactives.R' diff --git a/NAMESPACE b/NAMESPACE index d13edda0f..01e6ac997 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -53,6 +53,7 @@ S3method(str,reactivevalues) export("conditionStackTrace<-") export(..stacktraceoff..) export(..stacktraceon..) +export(ExtendedTask) export(HTML) export(MockShinySession) export(NS) diff --git a/R/extended-task.R b/R/extended-task.R new file mode 100644 index 000000000..2d3085771 --- /dev/null +++ b/R/extended-task.R @@ -0,0 +1,203 @@ +#' Task or computation that proceeds in the background +#' +#' @description In normal Shiny reactive code, whenever an observer, calc, or +#' output is busy computing, it blocks the current session from receiving any +#' inputs or attempting to proceed with any other computation related to that +#' session. +#' +#' The `ExtendedTask` class allows you to have an expensive operation that is +#' started by a reactive effect, and whose (eventual) results can be accessed +#' by a regular observer, calc, or output; but during the course of the +#' operation, the current session is completely unblocked, allowing the user +#' to continue using the rest of the app while the operation proceeds in the +#' background. +#' +#' Note that each `ExtendedTask` object does not represent a _single +#' invocation_ of its long-running function. Rather, it's an object that is +#' used to invoke the function with different arguments, keeps track of +#' whether an invocation is in progress, and provides ways to get at the +#' current status or results of the operation. A single `ExtendedTask` object +#' does not permit overlapping invocations: if the `invoke()` method is called +#' before the previous `invoke()` is completed, the new invocation will not +#' begin until the previous invocation has completed. +#' +#' @section `ExtendedTask` versus asynchronous reactives: +#' +#' Shiny has long supported [using +#' \{promises\}](https://rstudio.github.io/promises/articles/promises_06_shiny.html) +#' to write asynchronous observers, calcs, or outputs. You may be wondering +#' what the differences are between those techniques and this class. +#' +#' Asynchronous observers, calcs, and outputs are not--and have never +#' been--designed to let a user start a long-running operation, while keeping +#' that very same (browser) session responsive to other interactions. Instead, +#' they unblock other sessions, so you can take a long-running operation that +#' would normally bring the entire R process to a halt and limit the blocking +#' to just the session that started the operation. (For more details, see the +#' section on ["The Flush +#' Cycle"](https://rstudio.github.io/promises/articles/promises_06_shiny.html#the-flush-cycle).) +#' +#' `ExtendedTask`, on the other hand, invokes an asynchronous function (that +#' is, a function that quickly returns a promise) and allows even that very +#' session to immediately unblock and carry on with other user interactions. +#' +#' @export +ExtendedTask <- R6Class("ExtendedTask", portable = TRUE, + public = list( + #' @description + #' Creates a new `ExtendedTask` object. `ExtendedTask` should generally be + #' created either at the top of a server function, or at the top of a module + #' server function. + #' + #' @param func The long-running operation to execute. This should be an + #' asynchronous function, meaning, it should use the + #' [\{promises\}](https://rstudio.github.io/promises/) package, most + #' likely in conjuction with the + #' [\{future\}](https://rstudio.github.io/promises/articles/promises_04_futures.html) + #' package. It's also important that this logic does not read from any + #' reactive inputs/sources, as inputs may change after the function is + #' invoked; instead, if the function needs to access reactive inputs, it + #' should take parameters and the caller of the `invoke()` method should + #' read reactive inputs and pass them as arguments. + initialize = function(func) { + private$func <- func + private$rv_status <- reactiveVal("initial") + private$rv_running <- reactiveVal(FALSE) + private$rv_value <- reactiveVal(NULL) + private$rv_error <- reactiveVal(NULL) + private$invocation_queue <- fastmap::fastqueue() + }, + #' @description + #' Starts executing the long-running operation. If this `ExtendedTask` is + #' already running (meaning, a previous call to `invoke()` is not yet + #' complete) then enqueues this invocation until after the current + #' invocation, and any already-enqueued invocation, completes. + #' + #' @param ... Parameters to use for this invocation of the underlying + #' function. If reactive inputs are needed by the underlying function, + #' they should be read by the caller of `invoke` and passed in as + #' arguments. + invoke = function(...) { + args <- rlang::dots_list(..., .ignore_empty = "none") + + if ( + private$rv_status() == "running" || + private$invocation_queue$size() > 0 + ) { + private$invocation_queue$add(args) + } else { + private$do_invoke(args) + } + invisible(NULL) + }, + #' @description + #' Returns one of the following values: + #' + #' * `"initial"`: This `ExtendedTask` has not yet been invoked + #' * `"running"`: An invocation is currently running + #' * `"success"`: An invocation completed successfully, and a value can be + #' retrieved via the `result()` method + #' * `"error"`: An invocation completed with an error, which will be + #' re-thrown if you call the `result()` method + #' + #' This is a reactive read that invalidates the caller when the task's + #' status changes. + status = function() { + private$rv_status() + }, + #' @description + #' Attempts to read the results of the most recent invocation. This is a + #' reactive read that invalidates as the task's status changes. + #' + #' The actual behavior differs greatly depending on the current status of + #' the task: + #' + #' * `"initial"`: Throws a silent error (like [`req(FALSE)`][req()]). If + #' this happens during output rendering, the output will be blanked out. + #' * `"running"`: Throws a special silent error that, if it happens during + #' output rendering, makes the output appear "in progress" until further + #' notice. + #' * `"success"`: Returns the return value of the most recent invocation. + #' * `"error"`: Throws whatever error was thrown by the most recent + #' invocation. + #' + #' This method is intended to be called fairly naively by any output or + #' reactive expression that cares about the output--you just have to be + #' aware that if the result isn't ready for whatever reason, processing will + #' stop in much the same way as `req(FALSE)` does, but when the result is + #' ready you'll get invalidated, and when you run again the result should be + #' there. + #' + #' Note that the `result()` method is generally not meant to be used with + #' [observeEvent()], [eventReactive()], [bindEvent()], or [isolate()] as the + #' invalidation will be ignored. + result = function() { + switch (private$rv_status(), + running = req(FALSE, cancelOutput="progress"), + success = if (private$rv_value()$visible) { + private$rv_value()$value + } else { + invisible(private$rv_value()$value) + }, + error = stop(private$rv_error()), + # default case (initial, cancelled) + req(FALSE) + ) + } + ), + private = list( + func = NULL, + # reactive value with "initial"|"running"|"success"|"error" + rv_status = NULL, + rv_value = NULL, + rv_error = NULL, + invocation_queue = NULL, + + do_invoke = function(args) { + private$rv_status("running") + private$rv_value(NULL) + private$rv_error(NULL) + + p <- NULL + tryCatch({ + maskReactiveContext({ + # TODO: Bounce the do.call off of a promise_resolve(), so that the + # call to invoke() always returns immediately? + result <- do.call(private$func, args) + p <- promises::as.promise(result) + }) + }, error = function(e) { + private$on_error(e) + }) + + promises::finally( + promises::then(p, + onFulfilled = function(value, .visible) { + private$on_success(list(value=value, visible=.visible)) + }, + onRejected = function(error) { + private$on_error(error) + } + ), + onFinally = function() { + if (private$invocation_queue$size() > 0) { + private$do_invoke(private$invocation_queue$remove()) + } + } + ) + + + invisible(NULL) + }, + + on_error = function(err) { + private$rv_status("error") + private$rv_error(err) + }, + + on_success = function(value) { + private$rv_status("success") + private$rv_value(value) + } + ) +) diff --git a/R/react.R b/R/react.R index 226583f64..5232486b8 100644 --- a/R/react.R +++ b/R/react.R @@ -219,10 +219,10 @@ getDummyContext <- function() { wrapForContext <- function(func, ctx) { force(func) - force(ctx) + force(ctx) # may be NULL (in the case of maskReactiveContext()) function(...) { - ctx$run(function() { + .getReactiveEnvironment()$runWith(ctx, function() { captureStackTraces( func(...) ) @@ -234,12 +234,18 @@ reactivePromiseDomain <- function() { promises::new_promise_domain( wrapOnFulfilled = function(onFulfilled) { force(onFulfilled) - ctx <- getCurrentContext() + + # ctx will be NULL if we're in a maskReactiveContext() + ctx <- if (hasCurrentContext()) getCurrentContext() else NULL + wrapForContext(onFulfilled, ctx) }, wrapOnRejected = function(onRejected) { force(onRejected) - ctx <- getCurrentContext() + + # ctx will be NULL if we're in a maskReactiveContext() + ctx <- if (hasCurrentContext()) getCurrentContext() else NULL + wrapForContext(onRejected, ctx) } ) diff --git a/man/ExtendedTask.Rd b/man/ExtendedTask.Rd new file mode 100644 index 000000000..6998dd438 --- /dev/null +++ b/man/ExtendedTask.Rd @@ -0,0 +1,182 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/extended-task.R +\name{ExtendedTask} +\alias{ExtendedTask} +\title{Task or computation that proceeds in the background} +\description{ +In normal Shiny reactive code, whenever an observer, calc, or +output is busy computing, it blocks the current session from receiving any +inputs or attempting to proceed with any other computation related to that +session. + +The \code{ExtendedTask} class allows you to have an expensive operation that is +started by a reactive effect, and whose (eventual) results can be accessed +by a regular observer, calc, or output; but during the course of the +operation, the current session is completely unblocked, allowing the user +to continue using the rest of the app while the operation proceeds in the +background. + +Note that each \code{ExtendedTask} object does not represent a \emph{single +invocation} of its long-running function. Rather, it's an object that is +used to invoke the function with different arguments, keeps track of +whether an invocation is in progress, and provides ways to get at the +current status or results of the operation. A single \code{ExtendedTask} object +does not permit overlapping invocations: if the \code{invoke()} method is called +before the previous \code{invoke()} is completed, the new invocation will not +begin until the previous invocation has completed. +} +\section{\code{ExtendedTask} versus asynchronous reactives}{ + + +Shiny has long supported \href{https://rstudio.github.io/promises/articles/promises_06_shiny.html}{using \{promises\}} +to write asynchronous observers, calcs, or outputs. You may be wondering +what the differences are between those techniques and this class. + +Asynchronous observers, calcs, and outputs are not--and have never +been--designed to let a user start a long-running operation, while keeping +that very same (browser) session responsive to other interactions. Instead, +they unblock other sessions, so you can take a long-running operation that +would normally bring the entire R process to a halt and limit the blocking +to just the session that started the operation. (For more details, see the +section on \href{https://rstudio.github.io/promises/articles/promises_06_shiny.html#the-flush-cycle}{"The Flush Cycle"}.) + +\code{ExtendedTask}, on the other hand, invokes an asynchronous function (that +is, a function that quickly returns a promise) and allows even that very +session to immediately unblock and carry on with other user interactions. +} + +\section{Methods}{ +\subsection{Public methods}{ +\itemize{ +\item \href{#method-ExtendedTask-new}{\code{ExtendedTask$new()}} +\item \href{#method-ExtendedTask-invoke}{\code{ExtendedTask$invoke()}} +\item \href{#method-ExtendedTask-status}{\code{ExtendedTask$status()}} +\item \href{#method-ExtendedTask-result}{\code{ExtendedTask$result()}} +\item \href{#method-ExtendedTask-clone}{\code{ExtendedTask$clone()}} +} +} +\if{html}{\out{