Changes to flush cycle to support async

- Moved (in|de)crementBusyCount calls out of Context and into Observer
- decrementBusyCount is (effectively) deferred for async observers until
  the async operation is complete
- invalidateLater didn't force(session), almost certainly was buggy
- invalidateLater, reactiveTimer, and manageInputs all now use a new
  session$cycleStartAction, which delays their effect until observers
  (including async ones) are done executing
This commit is contained in:
Joe Cheng
2017-10-11 11:50:10 -07:00
parent 2314f63424
commit 2602dc15b0
8 changed files with 137 additions and 31 deletions

View File

@@ -63,6 +63,7 @@ Depends:
R (>= 3.0.2),
methods
Remotes:
r-lib/later,
rstudio/promises
Imports:
utils,
@@ -75,7 +76,7 @@ Imports:
htmltools (>= 0.3.5),
R6 (>= 2.0),
sourcetools,
later,
later (>= 0.5.0.9001),
promises,
tools
Suggests:

View File

@@ -55,9 +55,6 @@ Context <- R6Class(
addPendingFlush = function(priority) {
"Tell the reactive environment that this context should be flushed the
next time flushReact() called."
if (!is.null(.domain)) {
.domain$incrementBusyCount()
}
.getReactiveEnvironment()$addPendingFlush(self, priority)
},
onFlush = function(func) {
@@ -67,12 +64,6 @@ Context <- R6Class(
executeFlushCallbacks = function() {
"For internal use only."
on.exit({
if (!is.null(.domain)) {
.domain$decrementBusyCount()
}
}, add = TRUE)
lapply(.flushCallbacks, function(flushCallback) {
flushCallback()
})

View File

@@ -1026,6 +1026,9 @@ registerDebugHook("observerFunc", environment(), label)
continue <- function() {
ctx$addPendingFlush(.priority)
if (!is.null(.domain)) {
.domain$incrementBusyCount()
}
}
if (.suspended == FALSE)
@@ -1035,9 +1038,22 @@ registerDebugHook("observerFunc", environment(), label)
})
ctx$onFlush(function() {
if (!is.null(.domain)) {
on.exit(.domain$decrementBusyCount(), add = TRUE)
}
tryCatch({
if (!.destroyed)
shinyCallingHandlers(run())
if (!.destroyed) {
result <- shinyCallingHandlers(run())
if (!is.null(.domain)) {
if (promises::is.promise(result)) {
# If this observer is async, it's necessary to maintain the busy
# count until the async operation is complete
.domain$incrementBusyCount()
finally(result, .domain$decrementBusyCount)
}
}
}
}, error = function(e) {
printError(e)
@@ -1475,13 +1491,21 @@ reactiveTimer <- function(intervalMs=1000, session = getDefaultReactiveDomain())
#' }
#' @export
invalidateLater <- function(millis, session = getDefaultReactiveDomain()) {
force(session)
ctx <- .getReactiveEnvironment()$currentContext()
timerCallbacks$schedule(millis, function() {
# Quit if the session is closed
if (!is.null(session) && session$isClosed()) {
if (is.null(session)) {
ctx$invalidate()
return(invisible())
}
ctx$invalidate()
if (!session$isClosed()) {
session$cycleStartAction(function() {
ctx$invalidate()
})
}
invisible()
})
invisible()
}

View File

@@ -309,6 +309,10 @@ createAppHandlers <- function(httpHandlers, serverFuncSource) {
# The HTTP_GUID, if it exists, is for Shiny Server reporting purposes
shinysession$startTiming(ws$request$HTTP_GUID)
shinysession$requestFlush()
# Make httpuv return control to Shiny quickly, instead of waiting
# for the usual timeout
httpuv::interrupt()
})
})
}
@@ -423,7 +427,12 @@ startApp <- function(appObj, port, host, quiet) {
# Run an application that was created by \code{\link{startApp}}. This
# function should normally be called in a \code{while(TRUE)} loop.
serviceApp <- function() {
later::run_now() || timerCallbacks$executeElapsed()
later::run_now()
flushReact()
flushPendingSessions()
timerCallbacks$executeElapsed()
flushReact()
flushPendingSessions()

View File

@@ -421,6 +421,7 @@ ShinySession <- R6Class(
invalidatedOutputValues = 'Map',
invalidatedOutputErrors = 'Map',
inputMessageQueue = list(), # A list of inputMessages to send when flushed
cycleStartActionQueue = list(), # A list of actions to perform to start a cycle
.outputs = list(), # Keeps track of all the output observer objects
.outputOptions = list(), # Options for each of the output observer objects
progressKeys = 'character',
@@ -731,7 +732,7 @@ ShinySession <- R6Class(
} else if (identical(format, "rds")) {
tmpfile <- tempfile("shinytest", fileext = ".rds")
saveRDS(values, tmpfile)
on.exit(unlink(tmpfile))
on.exit(unlink(tmpfile), add = TRUE)
content <- readBin(tmpfile, "raw", n = file.info(tmpfile)$size)
httpResponse(200, "application/octet-stream", content)
@@ -755,6 +756,15 @@ ShinySession <- R6Class(
getSnapshotPreprocessInput = function(name) {
fun <- private$.input$getMeta(name, "shiny.snapshot.preprocess")
fun %OR% identity
},
# See cycleStartAction
startCycle = function() {
if (length(private$cycleStartActionQueue) > 0) {
head <- private$cycleStartActionQueue[[1L]]
private$cycleStartActionQueue <- private$cycleStartActionQueue[-1L]
head()
}
}
),
public = list(
@@ -1218,6 +1228,9 @@ ShinySession <- R6Class(
}
},
flushOutput = function() {
if (private$busyCount > 0)
return()
appsNeedingFlush$remove(self$token)
if (self$isClosed())
@@ -1244,7 +1257,7 @@ ShinySession <- R6Class(
on.exit({
# ..stacktraceon matches with the top-level ..stacktraceoff..
private$flushedCallbacks$invoke(..stacktraceon = TRUE)
})
}, add = TRUE)
if (!hasPendingUpdates()) {
# Normally, if there are no updates, simply return without sending
@@ -1275,6 +1288,18 @@ ShinySession <- R6Class(
inputMessages = inputMessages
)
},
# Schedule an action to execute not (necessarily) now, but when no observers
# that belong to this session are busy executing. This helps prevent (but
# does not guarantee) inputs and reactive values from changing underneath
# async observers as they run.
cycleStartAction = function(callback) {
private$cycleStartActionQueue <- c(private$cycleStartActionQueue, list(callback))
# If no observers are running in this session, we're safe to proceed.
# Otherwise, startCycle() will be called later, via decrementBusyCount().
if (private$busyCount == 0L) {
private$startCycle()
}
},
showProgress = function(id) {
'Send a message to the client that recalculation of the output identified
by \\code{id} is in progress. There is currently no mechanism for
@@ -1819,24 +1844,26 @@ ShinySession <- R6Class(
},
# Set the normal and client data input variables
manageInputs = function(data) {
force(data)
self$cycleStartAction(function() {
private$inputReceivedCallbacks$invoke(data)
private$inputReceivedCallbacks$invoke(data)
data_names <- names(data)
data_names <- names(data)
# Separate normal input variables from client data input variables
clientdata_idx <- grepl("^.clientdata_", data_names)
# Separate normal input variables from client data input variables
clientdata_idx <- grepl("^.clientdata_", data_names)
# Set normal (non-clientData) input values
private$.input$mset(data[data_names[!clientdata_idx]])
# Set normal (non-clientData) input values
private$.input$mset(data[data_names[!clientdata_idx]])
# Strip off .clientdata_ from clientdata input names, and set values
input_clientdata <- data[data_names[clientdata_idx]]
names(input_clientdata) <- sub("^.clientdata_", "",
names(input_clientdata))
private$.clientData$mset(input_clientdata)
# Strip off .clientdata_ from clientdata input names, and set values
input_clientdata <- data[data_names[clientdata_idx]]
names(input_clientdata) <- sub("^.clientdata_", "",
names(input_clientdata))
private$.clientData$mset(input_clientdata)
self$manageHiddenOutputs()
self$manageHiddenOutputs()
})
},
outputOptions = function(name, ...) {
# If no name supplied, return the list of options for all outputs
@@ -1880,6 +1907,19 @@ ShinySession <- R6Class(
private$busyCount <- private$busyCount - 1L
if (private$busyCount == 0L) {
private$sendMessage(busy = "idle")
self$requestFlush()
# We defer the call to startCycle() using later(), to defend against
# cycles where we continually call startCycle which causes an observer
# to fire which calls startCycle which causes an observer to fire...
#
# It's OK for these cycles to occur, but we must return control to the
# event loop between iterations (or at least sometimes) in order to not
# make the whole Shiny app go unresponsive.
later::later(function() {
if (private$busyCount == 0L) {
private$startCycle()
}
})
}
}
),

View File

@@ -1581,3 +1581,7 @@ Mutable <- R6Class("Mutable",
get = function() { private$value }
)
)
is.not.null <- function(x) {
!is.null(x)
}

View File

@@ -3,6 +3,7 @@
## Error handling/debugging
- [ ] ..stacktraceon../..stacktraceoff.. and stack traces in general
- [ ] long stack traces
- [ ] options(shiny.error) should work in promise handlers
## Render functions
- [ ] Non-async render functions should have their code all execute on the current tick. Otherwise order of execution will be surprising if they have side effects and explicit priorities.

36
manualtests/async/timer.R Normal file
View File

@@ -0,0 +1,36 @@
library(shiny)
library(future)
library(promises)
library(magrittr)
plan(multisession)
ui <- fluidPage(
p("This app tests that ", tags$code("invalidateLater()"), " calls are held until async operations are complete."),
tags$ol(
tags$li("You should see the number below increasing by 1, every 2 seconds."),
tags$li("The output should be semi-transparent (i.e. recalculating state) continuously."),
tags$li("You should see the word 'Flushed' in the R console, every 2 seconds.")
),
verbatimTextOutput("out")
)
server <- function(input, output, session) {
value <- reactiveVal(0L)
observe({
invalidateLater(100)
isolate({ value(value() + 1L) })
})
session$onFlushed(function() {
print("Flushed")
}, once = FALSE)
output$out <- renderText({
future(Sys.sleep(2)) %...>%
{ value() }
})
}
shinyApp(ui, server)