mirror of
https://github.com/rstudio/shiny.git
synced 2026-04-07 03:00:20 -04:00
Add priority levels to observers
Observers can now take priority levels, which allow the user to control the order of execution. Note that reactive expressions do not have priority levels; since they are lazily evaluated, it wouldn't make any sense to speak of priorities. Another commit will be needed to add an API for changing the priorities of outputs (probably in outputOptions?).
This commit is contained in:
@@ -31,6 +31,7 @@ URL: http://www.rstudio.com/shiny/
|
||||
BugReports: https://github.com/rstudio/shiny/issues
|
||||
Collate:
|
||||
'map.R'
|
||||
'priorityqueue.R'
|
||||
'utils.R'
|
||||
'tar.R'
|
||||
'timer.R'
|
||||
|
||||
84
R/priorityqueue.R
Normal file
84
R/priorityqueue.R
Normal file
@@ -0,0 +1,84 @@
|
||||
# "...like a regular queue or stack data structure, but where additionally each
|
||||
# element has a "priority" associated with it. In a priority queue, an element
|
||||
# with high priority is served before an element with low priority. If two
|
||||
# elements have the same priority, they are served according to their order in
|
||||
# the queue." (http://en.wikipedia.org/wiki/Priority_queue)
|
||||
|
||||
PriorityQueue <- setRefClass(
|
||||
'PriorityQueue',
|
||||
fields = list(
|
||||
# Keys are priorities, values are subqueues (implemented as list)
|
||||
.itemsByPriority = 'Map',
|
||||
# Sorted vector (largest first)
|
||||
.priorities = 'integer'
|
||||
),
|
||||
methods = list(
|
||||
# Enqueue an item, with the given priority level (must be integer). Higher
|
||||
# priority numbers are dequeued earlier than lower.
|
||||
#
|
||||
# We insist on integers over numerics to avoid problems that may arise from
|
||||
# loss of precision that is introduced when converting a numeric to a string
|
||||
# (which is performed when accessing the .itemsByPriority map).
|
||||
enqueue = function(item, priority) {
|
||||
if (!(priority %in% .priorities)) {
|
||||
.priorities <<- c(.priorities, priority)
|
||||
.priorities <<- sort(.priorities, decreasing=TRUE)
|
||||
.itemsByPriority$set(.key(priority), list(item))
|
||||
} else {
|
||||
.itemsByPriority$set(
|
||||
.key(priority),
|
||||
c(.itemsByPriority$get(.key(priority)), item)
|
||||
)
|
||||
}
|
||||
return(invisible())
|
||||
},
|
||||
# Retrieve a single item by 1) priority number (highest first) and then 2)
|
||||
# insertion order (first in, first out). If there are no items to be
|
||||
# dequeued, then NULL is returned. If it is necessary to distinguish between
|
||||
# a NULL value and the empty case, call isEmpty() before dequeue().
|
||||
dequeue = function() {
|
||||
if (length(.priorities) == 0)
|
||||
return(NULL)
|
||||
|
||||
maxPriority <- .priorities[[1]]
|
||||
items <- .itemsByPriority$get(.key(maxPriority))
|
||||
firstItem <- items[[1]]
|
||||
if (length(items) == 1) {
|
||||
# This is the last item at this priority. Remove both the list and the
|
||||
# priority level.
|
||||
.itemsByPriority$remove(.key(maxPriority))
|
||||
.priorities <<- .priorities[-1]
|
||||
} else {
|
||||
# There are still items at this priority. Remove the current item from
|
||||
# the list, and save it.
|
||||
items <- items[-1]
|
||||
.itemsByPriority$set(.key(maxPriority), items)
|
||||
}
|
||||
return(firstItem)
|
||||
},
|
||||
# Returns TRUE if no items are in the queue, otherwise FALSE.
|
||||
isEmpty = function() {
|
||||
length(.priorities) == 0
|
||||
},
|
||||
# Translates a priority integer to a character that is suitable for using as
|
||||
# a key.
|
||||
.key = function(priority) {
|
||||
as.character(priority)
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
# pq <- PriorityQueue$new()
|
||||
# pq$enqueue('a', 1L)
|
||||
# pq$enqueue('b', 1L)
|
||||
# pq$enqueue('c', 1L)
|
||||
# pq$enqueue('A', 2L)
|
||||
# pq$enqueue('B', 2L)
|
||||
# pq$enqueue('C', 2L)
|
||||
# pq$enqueue('d', 1L)
|
||||
# pq$enqueue('e', 1L)
|
||||
# pq$enqueue('f', 1L)
|
||||
# pq$enqueue('D', 2L)
|
||||
# pq$enqueue('E', 2L)
|
||||
# pq$enqueue('F', 2L)
|
||||
# # Expect ABCDEFabcdef
|
||||
17
R/react.R
17
R/react.R
@@ -42,10 +42,10 @@ Context <- setRefClass(
|
||||
.invalidateCallbacks <<- c(.invalidateCallbacks, func)
|
||||
NULL
|
||||
},
|
||||
addPendingFlush = function() {
|
||||
addPendingFlush = function(priority) {
|
||||
"Tell the reactive environment that this context should be flushed the
|
||||
next time flushReact() called."
|
||||
.getReactiveEnvironment()$addPendingFlush(.self)
|
||||
.getReactiveEnvironment()$addPendingFlush(.self, priority)
|
||||
},
|
||||
onFlush = function(func) {
|
||||
"Register a function to be called when this context is flushed."
|
||||
@@ -71,14 +71,14 @@ ReactiveEnvironment <- setRefClass(
|
||||
fields = list(
|
||||
.currentContext = 'ANY',
|
||||
.nextId = 'integer',
|
||||
.pendingFlush = 'list',
|
||||
.pendingFlush = 'PriorityQueue',
|
||||
.inFlush = 'logical'
|
||||
),
|
||||
methods = list(
|
||||
initialize = function() {
|
||||
.currentContext <<- NULL
|
||||
.nextId <<- 0L
|
||||
.pendingFlush <<- list()
|
||||
.pendingFlush <<- PriorityQueue$new()
|
||||
.inFlush <<- FALSE
|
||||
},
|
||||
nextId = function() {
|
||||
@@ -98,8 +98,8 @@ ReactiveEnvironment <- setRefClass(
|
||||
on.exit(.currentContext <<- old.ctx)
|
||||
func()
|
||||
},
|
||||
addPendingFlush = function(ctx) {
|
||||
.pendingFlush <<- c(ctx, .pendingFlush)
|
||||
addPendingFlush = function(ctx, priority) {
|
||||
.pendingFlush$enqueue(ctx, priority)
|
||||
},
|
||||
flush = function() {
|
||||
# If already in a flush, don't start another one
|
||||
@@ -107,9 +107,8 @@ ReactiveEnvironment <- setRefClass(
|
||||
.inFlush <<- TRUE
|
||||
on.exit(.inFlush <<- FALSE)
|
||||
|
||||
while (length(.pendingFlush) > 0) {
|
||||
ctx <- .pendingFlush[[1]]
|
||||
.pendingFlush <<- .pendingFlush[-1]
|
||||
while (!.pendingFlush$isEmpty()) {
|
||||
ctx <- .pendingFlush$dequeue()
|
||||
ctx$executeFlushCallbacks()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -391,19 +391,21 @@ Observer <- setRefClass(
|
||||
fields = list(
|
||||
.func = 'function',
|
||||
.label = 'character',
|
||||
.priority = 'numeric',
|
||||
.invalidateCallbacks = 'list',
|
||||
.execCount = 'integer',
|
||||
.onResume = 'function',
|
||||
.suspended = 'logical'
|
||||
),
|
||||
methods = list(
|
||||
initialize = function(func, label, suspended = FALSE) {
|
||||
initialize = function(func, label, suspended = FALSE, priority = 0L) {
|
||||
if (length(formals(func)) > 0)
|
||||
stop("Can't make an observer from a function that takes parameters; ",
|
||||
"only functions without parameters can be reactive.")
|
||||
|
||||
.func <<- func
|
||||
.label <<- label
|
||||
.priority <<- priority
|
||||
.execCount <<- 0L
|
||||
.suspended <<- suspended
|
||||
.onResume <<- function() NULL
|
||||
@@ -421,7 +423,7 @@ Observer <- setRefClass(
|
||||
})
|
||||
|
||||
continue <- function() {
|
||||
ctx$addPendingFlush()
|
||||
ctx$addPendingFlush(.priority)
|
||||
}
|
||||
|
||||
if (.suspended == FALSE)
|
||||
@@ -445,6 +447,13 @@ Observer <- setRefClass(
|
||||
"Register a function to run when this observer is invalidated"
|
||||
.invalidateCallbacks <<- c(.invalidateCallbacks, func)
|
||||
},
|
||||
setPriority = function(priority = 0L) {
|
||||
"Change the observer's priority. Note that if the observer is currently
|
||||
invalidated, then the change in priority will not take effect until the
|
||||
next invalidation--unless the observer is also currently suspended, in
|
||||
which case the priority change will be effective upon resume."
|
||||
.priority <<- priority
|
||||
},
|
||||
suspend = function() {
|
||||
"Causes this observer to stop scheduling flushes (re-executions) in
|
||||
response to invalidations. If the observer was invalidated prior to this
|
||||
@@ -493,6 +502,10 @@ Observer <- setRefClass(
|
||||
#' @param label A label for the observer, useful for debugging.
|
||||
#' @param suspended If \code{TRUE}, start the observer in a suspended state.
|
||||
#' If \code{FALSE} (the default), start in a non-suspended state.
|
||||
#' @param priority An integer that controls the priority with which this
|
||||
#' observer should be executed. An observer with a given priority level will
|
||||
#' always execute sooner than all observers with a lower priority level.
|
||||
#' Positive, negative, and zero values are allowed.
|
||||
#'
|
||||
#' @examples
|
||||
#' values <- reactiveValues(A=1)
|
||||
@@ -514,13 +527,14 @@ Observer <- setRefClass(
|
||||
#'
|
||||
#' @export
|
||||
observe <- function(x, env=parent.frame(), quoted=FALSE, label=NULL,
|
||||
suspended=FALSE) {
|
||||
suspended=FALSE, priority=0L) {
|
||||
|
||||
fun <- exprToFunction(x, env, quoted)
|
||||
if (is.null(label))
|
||||
label <- deparse(body(fun))
|
||||
|
||||
invisible(Observer$new(fun, label=label, suspended=suspended))
|
||||
invisible(Observer$new(
|
||||
fun, label=label, suspended=suspended, priority=priority))
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@@ -650,3 +650,16 @@ test_that("observe() accepts quoted and unquoted expressions", {
|
||||
flushReact()
|
||||
expect_identical(parent.env(inside_env), this_env)
|
||||
})
|
||||
|
||||
test_that("Observer priorities are respected", {
|
||||
results <- c()
|
||||
observe(results <<- c(results, 10), priority=10L)
|
||||
observe(results <<- c(results, 30), priority=30L)
|
||||
observe(results <<- c(results, 20), priority=20L)
|
||||
observe(results <<- c(results, 21), priority=20L)
|
||||
observe(results <<- c(results, 22), priority=20L)
|
||||
|
||||
flushReact()
|
||||
|
||||
expect_identical(results, c(30, 20, 21, 22, 10))
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user