mirror of
https://github.com/rstudio/shiny.git
synced 2026-01-13 00:48:09 -05:00
* Avoid way too many promise domains being activated Using `captureStackTraces` in wrapForContext is a bad idea, it piles on a new domain every time a handler is bound. * Use captureStackTraces, it means the same thing * Update promises version requirement * Add test for stack trace growth * Simplify stack trace snapshot tests The `category` column isn't a good candidate for snapshot testing, as its contents vary depending on how the package was loaded/installed. During devtools::test() or similar, shiny package code shows up as 'user'. But during CI, it doesn't show up as anything.
253 lines
6.8 KiB
R
253 lines
6.8 KiB
R
processId <- local({
|
|
# pid is not sufficient to uniquely identify a process, because
|
|
# distributed futures span machines which could introduce pid
|
|
# collisions.
|
|
cached <- NULL
|
|
function() {
|
|
if (is.null(cached)) {
|
|
cached <<- rlang::hash(list(
|
|
Sys.info(),
|
|
Sys.time()
|
|
))
|
|
}
|
|
# Sys.getpid() cannot be cached because forked children will
|
|
# then have the same processId as their parents.
|
|
paste(cached, Sys.getpid())
|
|
}
|
|
})
|
|
|
|
#' @include graph.R
|
|
Context <- R6Class(
|
|
'Context',
|
|
portable = FALSE,
|
|
class = FALSE,
|
|
public = list(
|
|
id = character(0),
|
|
.reactId = character(0),
|
|
.reactType = "other",
|
|
.label = character(0), # For debug purposes
|
|
.invalidated = FALSE,
|
|
.invalidateCallbacks = list(),
|
|
.flushCallbacks = list(),
|
|
.domain = NULL,
|
|
.pid = NULL,
|
|
.weak = NULL,
|
|
|
|
initialize = function(
|
|
domain, label='', type='other', prevId='',
|
|
reactId = rLog$noReactId,
|
|
id = .getReactiveEnvironment()$nextId(), # For dummy context
|
|
weak = FALSE
|
|
) {
|
|
id <<- id
|
|
.label <<- label
|
|
.domain <<- domain
|
|
.pid <<- processId()
|
|
.reactId <<- reactId
|
|
.reactType <<- type
|
|
.weak <<- weak
|
|
rLog$createContext(id, label, type, prevId, domain)
|
|
},
|
|
run = function(func) {
|
|
"Run the provided function under this context."
|
|
|
|
promises::with_promise_domain(reactivePromiseDomain(), {
|
|
withReactiveDomain(.domain, {
|
|
captureStackTraces({
|
|
env <- .getReactiveEnvironment()
|
|
rLog$enter(.reactId, id, .reactType, .domain)
|
|
on.exit(rLog$exit(.reactId, id, .reactType, .domain), add = TRUE)
|
|
env$runWith(self, func)
|
|
})
|
|
})
|
|
})
|
|
},
|
|
invalidate = function() {
|
|
"Invalidate this context. It will immediately call the callbacks
|
|
that have been registered with onInvalidate()."
|
|
|
|
if (!identical(.pid, processId())) {
|
|
rlang::abort("Reactive context was created in one process and invalidated from another.")
|
|
}
|
|
|
|
if (.invalidated)
|
|
return()
|
|
.invalidated <<- TRUE
|
|
|
|
rLog$invalidateStart(.reactId, id, .reactType, .domain)
|
|
on.exit(rLog$invalidateEnd(.reactId, id, .reactType, .domain), add = TRUE)
|
|
|
|
lapply(.invalidateCallbacks, function(func) {
|
|
func()
|
|
})
|
|
.invalidateCallbacks <<- list()
|
|
NULL
|
|
},
|
|
onInvalidate = function(func) {
|
|
"Register a function to be called when this context is invalidated.
|
|
If this context is already invalidated, the function is called
|
|
immediately."
|
|
|
|
if (!identical(.pid, processId())) {
|
|
rlang::abort("Reactive context was created in one process and accessed from another.")
|
|
}
|
|
|
|
if (.invalidated)
|
|
func()
|
|
else
|
|
.invalidateCallbacks <<- c(.invalidateCallbacks, func)
|
|
NULL
|
|
},
|
|
addPendingFlush = function(priority) {
|
|
"Tell the reactive environment that this context should be flushed the
|
|
next time flushReact() called."
|
|
.getReactiveEnvironment()$addPendingFlush(self, priority)
|
|
},
|
|
onFlush = function(func) {
|
|
"Register a function to be called when this context is flushed."
|
|
.flushCallbacks <<- c(.flushCallbacks, func)
|
|
},
|
|
executeFlushCallbacks = function() {
|
|
"For internal use only."
|
|
|
|
lapply(.flushCallbacks, function(flushCallback) {
|
|
flushCallback()
|
|
})
|
|
},
|
|
isWeak = function() {
|
|
.weak
|
|
}
|
|
)
|
|
)
|
|
|
|
ReactiveEnvironment <- R6Class(
|
|
'ReactiveEnvironment',
|
|
portable = FALSE,
|
|
class = FALSE,
|
|
public = list(
|
|
.currentContext = NULL,
|
|
.nextId = 0L,
|
|
.pendingFlush = 'PriorityQueue',
|
|
.inFlush = FALSE,
|
|
|
|
initialize = function() {
|
|
.pendingFlush <<- PriorityQueue$new()
|
|
},
|
|
nextId = function() {
|
|
.nextId <<- .nextId + 1L
|
|
return(as.character(.nextId))
|
|
},
|
|
currentContext = function() {
|
|
if (is.null(.currentContext)) {
|
|
if (isTRUE(getOption('shiny.suppressMissingContextError'))) {
|
|
return(getDummyContext())
|
|
} else {
|
|
rlang::abort(c(
|
|
'Operation not allowed without an active reactive context.',
|
|
paste0(
|
|
'You tried to do something that can only be done from inside a ',
|
|
'reactive consumer.'
|
|
)
|
|
))
|
|
}
|
|
}
|
|
return(.currentContext)
|
|
},
|
|
runWith = function(ctx, contextFunc) {
|
|
old.ctx <- .currentContext
|
|
.currentContext <<- ctx
|
|
on.exit(.currentContext <<- old.ctx)
|
|
contextFunc()
|
|
},
|
|
addPendingFlush = function(ctx, priority) {
|
|
.pendingFlush$enqueue(ctx, priority)
|
|
},
|
|
hasPendingFlush = function() {
|
|
return(!.pendingFlush$isEmpty())
|
|
},
|
|
# Returns TRUE if anything was actually called
|
|
flush = function() {
|
|
# If nothing to flush, exit early
|
|
if (!hasPendingFlush()) return(invisible(FALSE))
|
|
# If already in a flush, don't start another one
|
|
if (.inFlush) return(invisible(FALSE))
|
|
.inFlush <<- TRUE
|
|
on.exit({
|
|
.inFlush <<- FALSE
|
|
rLog$idle(domain = NULL)
|
|
})
|
|
|
|
while (hasPendingFlush()) {
|
|
ctx <- .pendingFlush$dequeue()
|
|
ctx$executeFlushCallbacks()
|
|
}
|
|
|
|
invisible(TRUE)
|
|
}
|
|
)
|
|
)
|
|
|
|
.getReactiveEnvironment <- local({
|
|
reactiveEnvironment <- NULL
|
|
function() {
|
|
if (is.null(reactiveEnvironment))
|
|
reactiveEnvironment <<- ReactiveEnvironment$new()
|
|
return(reactiveEnvironment)
|
|
}
|
|
})
|
|
|
|
# Causes any pending invalidations to run. Returns TRUE if any invalidations
|
|
# were pending (i.e. if work was actually done).
|
|
flushReact <- function() {
|
|
return(.getReactiveEnvironment()$flush())
|
|
}
|
|
|
|
# Retrieves the current reactive context, or errors if there is no reactive
|
|
# context active at the moment.
|
|
getCurrentContext <- function() {
|
|
.getReactiveEnvironment()$currentContext()
|
|
}
|
|
hasCurrentContext <- function() {
|
|
!is.null(.getReactiveEnvironment()$.currentContext) ||
|
|
isTRUE(getOption("shiny.suppressMissingContextError"))
|
|
}
|
|
|
|
getDummyContext <- function() {
|
|
Context$new(
|
|
getDefaultReactiveDomain(), '[none]', type = 'isolate',
|
|
id = "Dummy", reactId = rLog$dummyReactId
|
|
)
|
|
}
|
|
|
|
wrapForContext <- function(func, ctx) {
|
|
force(func)
|
|
force(ctx) # may be NULL (in the case of maskReactiveContext())
|
|
|
|
function(...) {
|
|
.getReactiveEnvironment()$runWith(ctx, function() {
|
|
func(...)
|
|
})
|
|
}
|
|
}
|
|
|
|
reactivePromiseDomain <- function() {
|
|
promises::new_promise_domain(
|
|
wrapOnFulfilled = function(onFulfilled) {
|
|
force(onFulfilled)
|
|
|
|
# ctx will be NULL if we're in a maskReactiveContext()
|
|
ctx <- if (hasCurrentContext()) getCurrentContext() else NULL
|
|
|
|
wrapForContext(onFulfilled, ctx)
|
|
},
|
|
wrapOnRejected = function(onRejected) {
|
|
force(onRejected)
|
|
|
|
# ctx will be NULL if we're in a maskReactiveContext()
|
|
ctx <- if (hasCurrentContext()) getCurrentContext() else NULL
|
|
|
|
wrapForContext(onRejected, ctx)
|
|
}
|
|
)
|
|
}
|