Title: | Progress Bar for Parallel Tasks |
---|---|
Description: | A simple interface in the form of R6 classes for executing tasks in parallel, tracking their progress, and displaying accurate progress bars. |
Authors: | Mihai Constantin [aut, cre] |
Maintainer: | Mihai Constantin <[email protected]> |
License: | MIT + file LICENSE |
Version: | 1.2.1 |
Built: | 2024-11-17 22:20:21 UTC |
Source: | https://github.com/mihaiconstantin/parabar |
This is a concrete implementation of the abstract class Backend that implements the BackendService interface. This backend executes tasks in parallel asynchronously (i.e., without blocking the main R session) on a parallel::makeCluster() cluster created in a background R session.
parabar::BackendService -> parabar::Backend -> AsyncBackend
task_state
A list of logical values indicating the state of the task execution. See the TaskState class for more information on how the statuses are determined. The following statuses are available:
task_not_started: Indicates whether the backend is free. TRUE signifies that no task has been started and the backend is free to deploy.
task_is_running: Indicates whether a task is currently running on the backend.
task_is_completed: Indicates whether a task has finished executing. TRUE signifies that the output of the task has not been fetched. Calling the method get_output() will move the output from the background R session to the main R session. Once the output has been fetched, the backend is free to deploy another task.
session_state
A list of logical values indicating the state of the background session managing the cluster. See the SessionState class for more information on the available statuses. The following statuses are available:
session_is_starting: Indicates whether the session is starting.
session_is_idle: Indicates whether the session is idle.
session_is_busy: Indicates whether the session is busy. A session is busy when a task is running or when the output of a task has not been fetched into the main R session. See the task_state field.
session_is_finished: Indicates whether the session was closed.
new()
Create a new AsyncBackend object.
AsyncBackend$new()
An object of class AsyncBackend.
start()
Start the backend.
AsyncBackend$start(specification)
specification
An object of class Specification that contains the backend configuration.
This method returns void. The resulting backend must be stored in the .cluster private field on the Backend abstract class, and accessible to any concrete backend implementations via the active binding cluster.
stop()
Stop the backend.
AsyncBackend$stop()
This method returns void.
clear()
Remove all objects from the backend. This function is equivalent to calling rm(list = ls(all.names = TRUE)) on each node in the backend.
AsyncBackend$clear()
This method returns void.
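To make the stated equivalence concrete, the following minimal sketch runs the same rm(list = ls(all.names = TRUE)) pattern against a scratch environment in plain base R. The scratch environment and the variable names are assumptions made for illustration; on a real backend the call targets the global environment of each node.

```r
# A scratch environment standing in for the global environment of one node
# (hypothetical, for illustration only).
scratch <- new.env()

# Populate it with a regular and a hidden (dot-prefixed) variable.
assign("visible", 1, envir = scratch)
assign(".hidden", 2, envir = scratch)

# Record what is there before clearing.
before <- ls(scratch, all.names = TRUE)

# Remove everything, including the hidden variable.
rm(list = ls(scratch, all.names = TRUE), envir = scratch)

# Record what is left after clearing.
after <- ls(scratch, all.names = TRUE)
```

Because all.names = TRUE is used, dot-prefixed variables are removed as well, so nothing is left in the environment afterwards.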
peek()
Inspect the backend for variables available in the .GlobalEnv.
AsyncBackend$peek()
This method returns a list of character vectors, where each element corresponds to a node in the backend. The character vectors contain the names of the variables available in the .GlobalEnv on each node.
export()
Export variables from a given environment to the backend.
AsyncBackend$export(variables, environment)
variables
A character vector of variable names to export.
environment
An environment object from which to export the variables.
This method returns void.
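As a rough analogy using only the parallel package, exporting a variable by name and then listing the global environment of each node could be sketched as below. This is an assumption about what exporting and peeking amount to conceptually, not a description of the parabar internals, and it runs on a plain local cluster rather than on a parabar backend.

```r
# Start a small local cluster with two nodes.
cluster <- parallel::makeCluster(2, type = "PSOCK")

# A variable living in the current environment.
name <- "parabar"

# Export the variable, by name, from the current environment to every node.
parallel::clusterExport(cluster, varlist = "name", envir = environment())

# List the variables in the global environment of each node.
variables <- parallel::clusterEvalQ(cluster, ls(envir = .GlobalEnv))

# Release the nodes when done.
parallel::stopCluster(cluster)
```

The result is a list with one character vector per node, which mirrors the shape described for the peek() method.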
evaluate()
Evaluate an arbitrary expression on the backend.
AsyncBackend$evaluate(expression)
expression
An unquoted expression to evaluate on the backend.
This method returns the result of the expression evaluation.
sapply()
Run a task on the backend akin to parallel::parSapply().
AsyncBackend$sapply(x, fun, ...)
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
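For comparison, the plain parallel::parSapply() call that this method is described as being akin to can be sketched as follows. The snippet runs synchronously on a local cluster and only illustrates the shape of the result; it is not meant to show how the asynchronous backend is implemented.

```r
# Start a small local cluster with two nodes.
cluster <- parallel::makeCluster(2)

# Apply a function to each element of the input, in parallel.
output <- parallel::parSapply(cluster, X = 1:4, FUN = function(x) x + 1)

# Release the nodes when done.
parallel::stopCluster(cluster)
```

Unlike this call, the sapply() method documented above returns nothing directly; the result is retrieved afterwards via get_output().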
lapply()
Run a task on the backend akin to parallel::parLapply().
AsyncBackend$lapply(x, fun, ...)
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
apply()
Run a task on the backend akin to parallel::parApply().
AsyncBackend$apply(x, margin, fun, ...)
x
An array to pass to the fun function.
margin
A numeric vector indicating the dimensions of x over which the fun function should be applied. For example, for a matrix, margin = 1 indicates applying fun row-wise, margin = 2 indicates applying fun column-wise, and margin = c(1, 2) indicates applying fun element-wise. Named dimensions are also possible depending on x. See parallel::parApply() and base::apply() for more details.
fun
A function to apply to x according to the margin.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
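The meaning of the margin argument can be illustrated with base::apply(), which the documentation above points to for details. The matrix below is a made-up example; the same margin values are expected to carry over to the parallel variant.

```r
# A small 2 x 3 matrix, filled column by column.
m <- matrix(1:6, nrow = 2)

# margin = 1: apply the function to each row.
row_sums <- apply(m, 1, sum)

# margin = 2: apply the function to each column.
column_sums <- apply(m, 2, sum)

# margin = c(1, 2): apply the function to each element.
doubled <- apply(m, c(1, 2), function(value) value * 2)
```

Here row_sums has one entry per row, column_sums has one entry per column, and doubled keeps the dimensions of m.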
get_output()
Get the output of the task execution.
AsyncBackend$get_output(wait = FALSE)
wait
A logical value indicating whether to wait for the task to finish executing before fetching the results. Defaults to FALSE. See the Details section for more information.
This method fetches the output of the task execution after calling a task method such as sapply(). It returns the output and immediately removes it from the backend. Subsequent calls to this method will throw an error if no additional tasks have been executed in the meantime. This method should be called after the execution of a task.
If wait = TRUE, the method will block the main process until the backend finishes executing the task and the results are available. If wait = FALSE, the method will immediately attempt to fetch the results from the background R session, and throw an error if the task is still running.
A vector, matrix, or list of the same length as x, containing the results of fun. The output format differs based on the specific operation employed. Check out the documentation for the apply operations of parallel::parallel for more information.
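One possible way to avoid the error raised while a task is still running is to poll the task_state field before fetching the output. The sketch below assumes the parabar package is installed and that task_state exposes the task_is_running element as a plain logical, as described in the field documentation above; the polling interval is an arbitrary choice.

```r
# Only run the sketch when the package is available.
if (requireNamespace("parabar", quietly = TRUE)) {
    # Configure a small `psock` cluster.
    specification <- parabar::Specification$new()
    specification$set_cores(cores = 2)
    specification$set_type(type = "psock")

    # Create and start an asynchronous backend.
    backend <- parabar::AsyncBackend$new()
    backend$start(specification)

    # Launch a short task; the main session is not blocked.
    backend$sapply(x = 1:4, fun = function(x) {
        Sys.sleep(0.25)
        x + 1
    })

    # Poll the task state until the task is no longer running.
    while (backend$task_state$task_is_running) {
        Sys.sleep(0.1)
    }

    # Fetch the output; `wait = TRUE` is kept as a safety net.
    output <- backend$get_output(wait = TRUE)

    # Stop the backend.
    backend$stop()
}
```

In between polls the main session is free to do other work, which is the main reason to prefer this over calling get_output(wait = TRUE) right away.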
clone()
The objects of this class are cloneable with this method.
AsyncBackend$clone(deep = FALSE)
deep
Whether to make a deep clone.
BackendService, Backend, SyncBackend, ProgressTrackingContext, and TaskState.
    # Create a specification object.
    specification <- Specification$new()

    # Set the number of cores.
    specification$set_cores(cores = 2)

    # Set the cluster type.
    specification$set_type(type = "psock")

    # Create an asynchronous backend object.
    backend <- AsyncBackend$new()

    # Start the cluster on the backend.
    backend$start(specification)

    # Check if there is anything on the backend.
    backend$peek()

    # Create a dummy variable.
    name <- "parabar"

    # Export the variable to the backend.
    backend$export("name")

    # Remove variable from current environment.
    rm(name)

    # Run an expression on the backend, using the exported variable `name`.
    backend$evaluate({
        # Print the name.
        print(paste0("Hello, ", name, "!"))
    })

    # Run a task in parallel (i.e., approx. 2.5 seconds).
    backend$sapply(
        x = 1:10,
        fun = function(x) {
            # Sleep a bit.
            Sys.sleep(0.5)

            # Compute something.
            output <- x + 1

            # Return the result.
            return(output)
        }
    )

    # Right now the main process is free and the task is executing on a `psock`
    # cluster started in a background `R` session.

    # Trying to get the output immediately will throw an error, indicating that the
    # task is still running.
    try(backend$get_output())

    # However, we can block the main process and wait for the task to complete
    # before fetching the results.
    backend$get_output(wait = TRUE)

    # Clear the backend.
    backend$clear()

    # Check that there is nothing on the cluster.
    backend$peek()

    # Stop the backend.
    backend$stop()

    # Check that the backend is not active.
    backend$active
This is an abstract class that serves as a base class for all concrete backend implementations. It defines the common properties that all concrete backends require.
This class cannot be instantiated. It needs to be extended by concrete subclasses that implement the pure virtual methods. Instances of concrete backend implementations can be conveniently obtained using the BackendFactory class.
parabar::BackendService -> Backend
cluster
The cluster object used by the backend. For SyncBackend objects, this is a cluster object created by parallel::makeCluster(). For AsyncBackend objects, this is a permanent R session created by callr::r_session that contains the parallel::makeCluster() cluster object.
supports_progress
A boolean value indicating whether the backend implementation supports progress tracking.
active
A boolean value indicating whether the backend implementation has an active cluster.
parabar::BackendService$apply()
parabar::BackendService$clear()
parabar::BackendService$evaluate()
parabar::BackendService$export()
parabar::BackendService$get_output()
parabar::BackendService$lapply()
parabar::BackendService$peek()
parabar::BackendService$sapply()
parabar::BackendService$start()
parabar::BackendService$stop()
new()
Create a new Backend object.
Backend$new()
Instantiating this class will throw an error.
clone()
The objects of this class are cloneable with this method.
Backend$clone(deep = FALSE)
deep
Whether to make a deep clone.
BackendService, SyncBackend, AsyncBackend, BackendFactory, and Context.
This class is a factory that provides concrete implementations of the Backend abstract class.
get()
Obtain a concrete implementation of the abstract Backend class of the specified type.
BackendFactory$get(type)
type
A character string specifying the type of the Backend to instantiate. Possible values are "sync" and "async". See the Details section for more information.
When type = "sync", a SyncBackend instance is created and returned. When type = "async", an AsyncBackend instance is provided instead.
A concrete implementation of the class Backend. It throws an error if the requested backend type is not supported.
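The documented error for an unsupported type can be handled with tryCatch(). The sketch below assumes the parabar package is installed; the "unknown" type is a hypothetical value chosen only because it is not one of the two documented types.

```r
# Only run the sketch when the package is available.
if (requireNamespace("parabar", quietly = TRUE)) {
    # Create a backend factory.
    backend_factory <- parabar::BackendFactory$new()

    # A supported type yields a concrete backend instance.
    backend <- backend_factory$get("sync")
    is_sync <- inherits(backend, "SyncBackend")

    # An unsupported (hypothetical) type throws; capture the message instead.
    message_text <- tryCatch(
        backend_factory$get("unknown"),
        error = function(e) conditionMessage(e)
    )
}
```

Capturing the condition message this way lets calling code fall back to a supported type instead of stopping.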
clone()
The objects of this class are cloneable with this method.
BackendFactory$clone(deep = FALSE)
deep
Whether to make a deep clone.
BackendService, Backend, SyncBackend, AsyncBackend, and ContextFactory.
    # Create a backend factory.
    backend_factory <- BackendFactory$new()

    # Get a synchronous backend instance.
    backend <- backend_factory$get("sync")

    # Check the class of the backend instance.
    class(backend)

    # Get an asynchronous backend instance.
    backend <- backend_factory$get("async")

    # Check the class of the backend instance.
    class(backend)
This is an interface that defines the operations available on a Backend implementation. Backend implementations and the Context class must implement this interface.
new()
Create a new BackendService object.
BackendService$new()
Instantiating this class will throw an error.
start()
Start the backend.
BackendService$start(specification)
specification
An object of class Specification that contains the backend configuration.
This method returns void. The resulting backend must be stored in the .cluster private field on the Backend abstract class, and accessible to any concrete backend implementations via the active binding cluster.
stop()
Stop the backend.
BackendService$stop()
This method returns void.
clear()
Remove all objects from the backend. This function is equivalent to calling rm(list = ls(all.names = TRUE)) on each node in the backend.
BackendService$clear()
This method is run by default when the backend is started.
This method returns void.
peek()
Inspect the backend for variables available in the .GlobalEnv.
BackendService$peek()
This method returns a list of character vectors, where each element corresponds to a node in the backend. The character vectors contain the names of the variables available in the .GlobalEnv on each node.
export()
Export variables from a given environment to the backend.
BackendService$export(variables, environment)
variables
A character vector of variable names to export.
environment
An environment object from which to export the variables.
This method returns void.
evaluate()
Evaluate an arbitrary expression on the backend.
BackendService$evaluate(expression)
expression
An unquoted expression to evaluate on the backend.
This method returns the result of the expression evaluation.
sapply()
Run a task on the backend akin to parallel::parSapply().
BackendService$sapply(x, fun, ...)
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
lapply()
Run a task on the backend akin to parallel::parLapply().
BackendService$lapply(x, fun, ...)
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
apply()
Run a task on the backend akin to parallel::parApply().
BackendService$apply(x, margin, fun, ...)
x
An array to pass to the fun function.
margin
A numeric vector indicating the dimensions of x over which the fun function should be applied. For example, for a matrix, margin = 1 indicates applying fun row-wise, margin = 2 indicates applying fun column-wise, and margin = c(1, 2) indicates applying fun element-wise. Named dimensions are also possible depending on x. See parallel::parApply() and base::apply() for more details.
fun
A function to apply to x according to the margin.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
get_output()
Get the output of the task execution.
BackendService$get_output(...)
...
Additional optional arguments that may be used by concrete implementations.
This method fetches the output of the task execution after calling the sapply() method. It returns the output and immediately removes it from the backend. Therefore, subsequent calls to this method are not advised. This method should be called after the execution of a task.
A vector, matrix, or list of the same length as x, containing the results of fun. The output format differs based on the specific operation employed. Check out the documentation for the apply operations of parallel::parallel for more information.
clone()
The objects of this class are cloneable with this method.
BackendService$clone(deep = FALSE)
deep
Whether to make a deep clone.
Backend, SyncBackend, AsyncBackend, and Context.
This is an abstract class that defines the pure virtual methods a concrete bar must implement.
This class cannot be instantiated. It needs to be extended by concrete subclasses that implement the pure virtual methods. Instances of concrete bar implementations can be conveniently obtained using the BarFactory class.
engine
The bar engine.
new()
Create a new Bar object.
Bar$new()
Instantiating this class will throw an error.
create()
Create a progress bar.
Bar$create(total, initial, ...)
total
The total number of times the progress bar should tick.
initial
The starting point of the progress bar.
...
Additional arguments for the bar creation. See the Details section for more information.
The optional ... named arguments depend on the specific concrete implementation (i.e., BasicBar or ModernBar).
This method returns void. The resulting bar is stored in the private field .bar, accessible via the active binding engine.
update()
Update the progress bar.
Bar$update(current)
current
The position the progress bar should be at (e.g., 30 out of 100), usually the index in a loop.
terminate()
Terminate the progress bar.
Bar$terminate()
clone()
The objects of this class are cloneable with this method.
Bar$clone(deep = FALSE)
deep
Whether to make a deep clone.
BasicBar, ModernBar, and BarFactory.
This class is a factory that provides concrete implementations of the Bar abstract class.
get()
Obtain a concrete implementation of the abstract Bar class of the specified type.
BarFactory$get(type)
type
A character string specifying the type of the Bar to instantiate. Possible values are "modern" and "basic". See the Details section for more information.
When type = "modern", a ModernBar instance is created and returned. When type = "basic", a BasicBar instance is provided instead.
A concrete implementation of the class Bar. It throws an error if the requested bar type is not supported.
clone()
The objects of this class are cloneable with this method.
BarFactory$clone(deep = FALSE)
deep
Whether to make a deep clone.
    # Create a bar factory.
    bar_factory <- BarFactory$new()

    # Get a modern bar instance.
    bar <- bar_factory$get("modern")

    # Check the class of the bar instance.
    class(bar)

    # Get a basic bar instance.
    bar <- bar_factory$get("basic")

    # Check the class of the bar instance.
    class(bar)
This is a concrete implementation of the abstract class Bar that uses utils::txtProgressBar() as the engine for the progress bar.
parabar::Bar -> BasicBar
new()
Create a new BasicBar object.
BasicBar$new()
An object of class BasicBar.
create()
Create a progress bar.
BasicBar$create(total, initial, ...)
total
The total number of times the progress bar should tick.
initial
The starting point of the progress bar.
...
Additional arguments for the bar creation passed to utils::txtProgressBar().
This method returns void. The resulting bar is stored in the private field .bar, accessible via the active binding engine. Both the private field and the active binding are defined in the super class Bar.
update()
Update the progress bar by calling utils::setTxtProgressBar().
BasicBar$update(current)
current
The position the progress bar should be at (e.g., 30 out of 100), usually the index in a loop.
terminate()
Terminate the progress bar by calling base::close() on the private field .bar.
BasicBar$terminate()
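The three utils and base calls named in the methods above can be exercised directly, without the class wrapper. The sketch below is a plain base R illustration; mapping the total argument to max and using style = 3 are assumptions made for the example, not a statement about how BasicBar passes its arguments.

```r
# The number of ticks to be performed.
total <- 5

# Create the engine: a text progress bar.
bar <- utils::txtProgressBar(min = 0, max = total, initial = 0, style = 3)

# Advance the bar once per iteration.
for (i in seq_len(total)) {
    utils::setTxtProgressBar(bar, i)
}

# Read back the final position of the bar.
final <- utils::getTxtProgressBar(bar)

# Close the bar.
close(bar)
```

After the loop, the bar position equals the total, which is the state in which a bar would normally be terminated.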
clone()
The objects of this class are cloneable with this method.
BasicBar$clone(deep = FALSE)
deep
Whether to make a deep clone.
Bar, ModernBar, and BarFactory.
    # Create a basic bar instance.
    bar <- BasicBar$new()

    # Specify the number of ticks to be performed.
    total <- 100

    # Create the progress bar.
    bar$create(total = total, initial = 0)

    # Use the progress bar.
    for (i in 1:total) {
        # Sleep a bit.
        Sys.sleep(0.02)

        # Update the progress bar.
        bar$update(i)
    }

    # Terminate the progress bar.
    bar$terminate()
This function can be used to clear a backend created by start_backend().
clear(backend)
backend | An object of class Backend. |
This function is a convenience wrapper around the lower-level API of parabar aimed at developers. More specifically, this function calls the clear method on the provided backend instance.
The function returns void. It throws an error if the value provided for the backend argument is not an instance of class Backend.
start_backend(), peek(), export(), evaluate(), configure_bar(), par_sapply(), par_lapply(), par_apply(), stop_backend(), and BackendService.
    # Create an asynchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

    # Check that the backend is active.
    backend$active

    # Check if there is anything on the backend.
    peek(backend)

    # Create a dummy variable.
    name <- "parabar"

    # Export the `name` variable in the current environment to the backend.
    export(backend, "name", environment())

    # Remove the dummy variable from the current environment.
    rm(name)

    # Check the backend to see that the variable has been exported.
    peek(backend)

    # Run an expression on the backend.
    # Note that the symbols in the expression are resolved on the backend.
    evaluate(backend, {
        # Print the name.
        print(paste0("Hello, ", name, "!"))
    })

    # Clear the backend.
    clear(backend)

    # Check that there is nothing on the backend.
    peek(backend)

    # Use a basic progress bar (i.e., see `parabar::Bar`).
    configure_bar(type = "basic", style = 3)

    # Run a task in parallel (i.e., approx. 1.25 seconds).
    output <- par_sapply(backend, x = 1:10, fun = function(x) {
        # Sleep a bit.
        Sys.sleep(0.25)

        # Compute and return.
        return(x + 1)
    })

    # Print the output.
    print(output)

    # Stop the backend.
    stop_backend(backend)

    # Check that the backend is not active.
    backend$active
This function can be used to conveniently configure the progress bar by adjusting the progress_bar_config field of the Options instance in the base::.Options list.
configure_bar(type = "modern", ...)
type |
A character string specifying the type of progress bar to be used
with compatible |
... |
A list of named arguments used to configure the progress bar. See the Details section for more information. |
The optional ... named arguments depend on the type of progress bar being configured. When type = "modern", the ... take the named arguments of the progress::progress_bar class. When type = "basic", the ... take the named arguments of the utils::txtProgressBar() built-in function. See the Examples section for a demonstration.
The function returns void. It throws an error if the requested bar type is not supported.
progress::progress_bar, utils::txtProgressBar(), set_default_options(), get_option(), and set_option().
    # Set the default package options.
    set_default_options()

    # Get the progress bar type from options.
    get_option("progress_bar_type")

    # Get the progress bar configuration from options.
    get_option("progress_bar_config")

    # Adjust the format of the `modern` progress bar.
    configure_bar(type = "modern", format = "[:bar] :percent")

    # Check that the configuration has been updated in the options.
    get_option("progress_bar_config")

    # Change to and adjust the style of the `basic` progress bar.
    configure_bar(type = "basic", style = 3)

    # Check that the configuration has been updated in the options.
    get_option("progress_bar_type")
    get_option("progress_bar_config")
This class represents the base context for interacting with Backend implementations via the BackendService interface.
This class is a vanilla wrapper around a Backend implementation. It registers a backend instance and forwards all BackendService method calls to the backend instance. Subclasses can override any of the BackendService methods to decorate the backend instance with additional functionality (e.g., see the ProgressTrackingContext class for an example).
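The forwarding and decorating idea can be sketched in plain base R with closures, independently of the actual R6 classes. All names below (make_backend, make_context, make_counting_context) are hypothetical and exist only for this illustration; the toy backend runs sequentially and merely mimics the store-then-fetch behavior of the output.

```r
# A toy backend: stores the task output and hands it over on request.
make_backend <- function() {
    output <- NULL
    list(
        sapply = function(x, fun, ...) {
            output <<- base::sapply(x, fun, ...)
            invisible(NULL)
        },
        get_output = function(...) {
            result <- output
            output <<- NULL
            result
        }
    )
}

# A vanilla context: forwards every call to the registered backend.
make_context <- function(backend) {
    list(
        sapply = function(x, fun, ...) backend$sapply(x, fun, ...),
        get_output = function(...) backend$get_output(...)
    )
}

# A decorated context: overrides one operation to add behavior around it.
make_counting_context <- function(backend) {
    context <- make_context(backend)
    calls <- 0
    context$sapply <- function(x, fun, ...) {
        calls <<- calls + 1
        backend$sapply(x, fun, ...)
    }
    context$calls <- function() calls
    context
}

# Use the decorated context exactly like the backend itself.
context <- make_counting_context(make_backend())
context$sapply(1:3, function(x) x + 1)
result <- context$get_output()
```

The caller only ever talks to the context, so extra behavior (here a call counter, in the package a progress bar) can be added without changing the backend.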
parabar::BackendService -> Context
backend
The Backend object registered with the context.
new()
Create a new Context object.
Context$new()
An object of class Context.
set_backend()
Set the backend instance to be used by the context.
Context$set_backend(backend)
backend
An object of class Backend that implements the BackendService interface.
start()
Start the backend.
Context$start(specification)
specification
An object of class Specification that contains the backend configuration.
This method returns void. The resulting backend must be stored in the .cluster private field on the Backend abstract class, and accessible to any concrete backend implementations via the active binding cluster.
stop()
Stop the backend.
Context$stop()
This method returns void.
clear()
Remove all objects from the backend. This function is equivalent to calling rm(list = ls(all.names = TRUE)) on each node in the backend.
Context$clear()
This method returns void.
peek()
Inspect the backend for variables available in the .GlobalEnv.
Context$peek()
This method returns a list of character vectors, where each element corresponds to a node in the backend. The character vectors contain the names of the variables available in the .GlobalEnv on each node.
export()
Export variables from a given environment to the backend.
Context$export(variables, environment)
variables
A character vector of variable names to export.
environment
An environment object from which to export the variables. Defaults to the parent frame.
This method returns void.
evaluate()
Evaluate an arbitrary expression on the backend.
Context$evaluate(expression)
expression
An unquoted expression to evaluate on the backend.
This method returns the result of the expression evaluation.
sapply()
Run a task on the backend akin to parallel::parSapply().
Context$sapply(x, fun, ...)
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
lapply()
Run a task on the backend akin to parallel::parLapply().
Context$lapply(x, fun, ...)
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
apply()
Run a task on the backend akin to parallel::parApply().
Context$apply(x, margin, fun, ...)
x
An array to pass to the fun function.
margin
A numeric vector indicating the dimensions of x over which the fun function should be applied. For example, for a matrix, margin = 1 indicates applying fun row-wise, margin = 2 indicates applying fun column-wise, and margin = c(1, 2) indicates applying fun element-wise. Named dimensions are also possible depending on x. See parallel::parApply() and base::apply() for more details.
fun
A function to apply to x according to the margin.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
get_output()
Get the output of the task execution.
Context$get_output(...)
...
Additional arguments to pass to the backend registered with the context. This is useful for backends that require additional arguments to fetch the output (e.g., AsyncBackend$get_output(wait = TRUE)).
This method fetches the output of the task execution after calling the sapply() method. It returns the output and immediately removes it from the backend. Therefore, subsequent calls to this method are not advised. This method should be called after the execution of a task.
A vector, matrix, or list of the same length as x, containing the results of fun. The output format differs based on the specific operation employed. Check out the documentation for the apply operations of parallel::parallel for more information.
clone()
The objects of this class are cloneable with this method.
Context$clone(deep = FALSE)
deep
Whether to make a deep clone.
ProgressTrackingContext, BackendService, Backend, and SyncBackend.
    # Define a task to run in parallel.
    task <- function(x, y) {
        # Sleep a bit.
        Sys.sleep(0.25)

        # Return the result of a computation.
        return(x + y)
    }

    # Create a specification object.
    specification <- Specification$new()

    # Set the number of cores.
    specification$set_cores(cores = 2)

    # Set the cluster type.
    specification$set_type(type = "psock")

    # Create a backend factory.
    backend_factory <- BackendFactory$new()

    # Get a synchronous backend instance.
    backend <- backend_factory$get("sync")

    # Create a base context object.
    context <- Context$new()

    # Register the backend with the context.
    context$set_backend(backend)

    # From now on, all backend operations are intercepted by the context.

    # Start the backend.
    context$start(specification)

    # Run a task in parallel (i.e., approx. 1.25 seconds).
    context$sapply(x = 1:10, fun = task, y = 10)

    # Get the task output.
    context$get_output()

    # Close the backend.
    context$stop()

    # Get an asynchronous backend instance.
    backend <- backend_factory$get("async")

    # Register the backend with the same context object.
    context$set_backend(backend)

    # Start the backend reusing the specification object.
    context$start(specification)

    # Run a task in parallel (i.e., approx. 1.25 seconds).
    context$sapply(x = 1:10, fun = task, y = 10)

    # Get the task output.
    backend$get_output(wait = TRUE)

    # Close the backend.
    context$stop()
This class is a factory that provides instances of the Context
class.
get()
Obtain instances of the Context
class.
ContextFactory$get(type)
type
A character string specifying the type of the
Context
to instantiate. Possible values are "regular"
and "progress"
. See the Details section for more information.
When type = "regular"
a Context
instance is created
and returned. When type = "progress"
a
ProgressTrackingContext
instance is provided instead.
An object of type Context
. It throws an error if the
requested context type
is not supported.
clone()
The objects of this class are cloneable with this method.
ContextFactory$clone(deep = FALSE)
deep
Whether to make a deep clone.
Context
, ProgressTrackingContext
,
BackendService
, and Backend
# Create a context factory.
context_factory <- ContextFactory$new()

# Get a regular context instance.
context <- context_factory$get("regular")

# Check the class of the context instance.
class(context)

# Get a progress context instance.
context <- context_factory$get("progress")
class(context)
This function can be used to evaluate an arbitrary base::expression()
on a
backend
created by start_backend()
.
evaluate(backend, expression)
backend |
An object of class |
expression |
An unquoted expression to evaluate on the backend. |
This function is a convenience wrapper around the lower-level API of
parabar
aimed at developers. More specifically, this function
calls the evaluate
method on the provided
backend
instance.
This method returns the result of the expression evaluation. It throws an
error if the value provided for the backend
argument is not an instance of
class Backend
.
start_backend()
, peek()
, export()
,
clear()
, configure_bar()
, par_sapply()
,
par_lapply()
, par_apply()
, stop_backend()
,
and BackendService
.
# Create an asynchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

# Check that the backend is active.
backend$active

# Check if there is anything on the backend.
peek(backend)

# Create a dummy variable.
name <- "parabar"

# Export the `name` variable in the current environment to the backend.
export(backend, "name", environment())

# Remove the dummy variable from the current environment.
rm(name)

# Check the backend to see that the variable has been exported.
peek(backend)

# Run an expression on the backend.
# Note that the symbols in the expression are resolved on the backend.
evaluate(backend, {
    # Print the name.
    print(paste0("Hello, ", name, "!"))
})

# Clear the backend.
clear(backend)

# Check that there is nothing on the backend.
peek(backend)

# Use a basic progress bar (i.e., see `parabar::Bar`).
configure_bar(type = "basic", style = 3)

# Run a task in parallel (i.e., approx. 1.25 seconds).
output <- par_sapply(backend, x = 1:10, fun = function(x) {
    # Sleep a bit.
    Sys.sleep(0.25)

    # Compute and return.
    return(x + 1)
})

# Print the output.
print(output)

# Stop the backend.
stop_backend(backend)

# Check that the backend is not active.
backend$active
This class contains static methods for throwing exceptions with informative messages.
Exception$abstract_class_not_instantiable(object)
Exception for instantiating abstract classes or interfaces.
Exception$method_not_implemented()
Exception for calling methods without an implementation.
Exception$feature_not_developed()
Exception for running into things not yet developed.
Exception$not_enough_cores()
Exception for requesting more cores than available on the machine.
Exception$cluster_active()
Exception for attempting to start a cluster while another one is active.
Exception$cluster_not_active()
Exception for attempting to stop a cluster while not active.
Exception$async_task_not_started()
Exception for reading results while an asynchronous task has not yet started.
Exception$async_task_running()
Exception for reading results while an asynchronous task is running.
Exception$async_task_completed()
Exception for reading results while a completed asynchronous task has unread results.
Exception$async_task_error(error)
Exception for errors while running an asynchronous task.
Exception$stop_busy_backend_not_allowed()
Exception for stopping a busy backend without intent.
Exception$temporary_file_creation_failed()
Exception for failing to create a temporary file.
Exception$type_not_assignable(actual, expected)
Exception for when providing incorrect object types.
Exception$unknown_package_option(option)
Exception for when requesting unknown package options.
Exception$primitive_as_task_not_allowed()
Exception for when decorating primitive functions with progress tracking.
Exception$array_margins_not_compatible(actual, allowed)
Exception for using improper margins in the BackendService$apply
operation.
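The pattern described above, a collection of static methods that each throw an error with an informative message, can be sketched in base R. This is a hypothetical illustration rather than the package implementation: the `Thrower` list, its messages, and the `safe_start()` caller are invented names used only to show how such errors might be raised and caught.

```r
# Hypothetical stand-in for a class with static exception methods.
# The names and messages below are illustrative, not taken from `parabar`.
Thrower <- list(
    cluster_active = function() {
        stop("A cluster is already active.", call. = FALSE)
    },
    type_not_assignable = function(actual, expected) {
        stop(
            sprintf("Type '%s' is not assignable to type '%s'.", actual, expected),
            call. = FALSE
        )
    }
)

# Hypothetical caller that refuses to start a second cluster.
safe_start <- function(active) {
    if (active) Thrower$cluster_active()
    "started"
}

# Capture the error message instead of halting the session.
message_caught <- tryCatch(
    safe_start(active = TRUE),
    error = function(e) conditionMessage(e)
)

# The same caller succeeds when no cluster is active.
result_ok <- safe_start(active = FALSE)
```

Keeping the messages in one place means every caller reports the same failure in the same words, which is presumably the motivation for a dedicated exception class.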
This function can be used to export objects to a
backend
created by start_backend()
.
export(backend, variables, environment)
backend |
An object of class |
variables |
A character vector of variable names to export to the backend. |
environment |
An environment from which to export the variables. If no
environment is provided, the |
This function is a convenience wrapper around the lower-level API of
parabar
aimed at developers. More specifically, this function
calls the export
method on the provided
backend
instance.
The function returns void. It throws an error if the value provided for the
backend
argument is not an instance of class Backend
.
start_backend()
, peek()
, evaluate()
,
clear()
, configure_bar()
, par_sapply()
,
par_lapply()
, par_apply()
, stop_backend()
,
and BackendService
.
# Create an asynchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

# Check that the backend is active.
backend$active

# Check if there is anything on the backend.
peek(backend)

# Create a dummy variable.
name <- "parabar"

# Export the `name` variable in the current environment to the backend.
export(backend, "name", environment())

# Remove the dummy variable from the current environment.
rm(name)

# Check the backend to see that the variable has been exported.
peek(backend)

# Run an expression on the backend.
# Note that the symbols in the expression are resolved on the backend.
evaluate(backend, {
    # Print the name.
    print(paste0("Hello, ", name, "!"))
})

# Clear the backend.
clear(backend)

# Check that there is nothing on the backend.
peek(backend)

# Use a basic progress bar (i.e., see `parabar::Bar`).
configure_bar(type = "basic", style = 3)

# Run a task in parallel (i.e., approx. 1.25 seconds).
output <- par_sapply(backend, x = 1:10, fun = function(x) {
    # Sleep a bit.
    Sys.sleep(0.25)

    # Compute and return.
    return(x + 1)
})

# Print the output.
print(output)

# Stop the backend.
stop_backend(backend)

# Check that the backend is not active.
backend$active
The get_option()
function is a helper for retrieving the value of
parabar
options
. If the
option
requested is not available in the session
base::.Options
list, the corresponding default value set by the
Options
R6::R6
class is returned instead.
The set_option()
function is a helper for setting
parabar
options
. The function adjusts the
fields of the Options
instance stored in the base::.Options
list. If no Options
instance is present in the
base::.Options
list, a new one is created.
The set_default_options()
function is used to set the default
options
values for the parabar
package. The
function is automatically called at package load and the entry created can be
retrieved via getOption("parabar")
. Specific package
options
can be retrieved using the helper function
get_option()
.
get_option(option)
set_option(option, value)
set_default_options()
option |
A character string representing the name of the option to
retrieve or adjust. See the public fields of |
value |
The value to set the |
The get_option()
function returns the value of the requested
option
present in the base::.Options
list, or its
corresponding default value (i.e., see Options
). If the
requested option
is not known, an error is thrown.
The set_option()
function returns void. It throws an error if the
requested option
to be adjusted is not known.
The set_default_options()
function returns void. The
options
set can be consulted via the base::.Options
list. See the Options
R6::R6
class for more information on
the default values set by this function.
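The fallback behavior described above (return the session value when present, the default otherwise, and fail for unknown names) can be sketched with base R alone. This is a simplified illustration under stated assumptions: the `defaults` list, the `example.` option prefix, and the `get_with_default()` helper are hypothetical, and the sketch stores one session option per name instead of a single `Options` instance as the package does.

```r
# Hypothetical defaults, standing in for the fields of an options object.
defaults <- list(progress_track = TRUE, progress_wait = 0.1)

# Hypothetical helper: session value if set, default otherwise, error if unknown.
get_with_default <- function(option) {
    if (!option %in% names(defaults)) {
        stop(sprintf("Unknown option '%s'.", option), call. = FALSE)
    }
    getOption(paste0("example.", option), default = defaults[[option]])
}

# Nothing is set in the session yet, so the default is returned.
before <- get_with_default("progress_track")

# Override the value in the session options.
old <- options(example.progress_track = FALSE)
after <- get_with_default("progress_track")

# Restore the previous state, which removes the override.
options(old)
restored <- get_with_default("progress_track")

# Requesting an unknown option raises an error.
unknown <- tryCatch(
    get_with_default("not_an_option"),
    error = function(e) conditionMessage(e)
)
```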
Options
, set_default_options()
, base::options()
,
and base::getOption()
.
# Get the status of progress tracking.
get_option("progress_track")

# Set the status of progress tracking to `FALSE`.
set_option("progress_track", FALSE)

# Get the status of progress tracking again.
get_option("progress_track")

# Restore default options.
set_default_options()

# Get the status of progress tracking yet again.
get_option("progress_track")
This class contains static helper methods.
Helper$get_class_name(object)
Helper for getting the class of a given object.
Helper$is_of_class(object, class)
Check if an object is of a certain class.
Helper$get_option(option)
Get package option, or corresponding default value.
Helper$set_option(option, value)
Set package option.
Helper$check_object_type(object, expected_type)
Check the type of a given object.
Helper$check_array_margins(margins, dimensions)
Helper to check array margins for the BackendService$apply
operation.
The logo is generated by make_logo()
and displayed on package
attach for interactive R
sessions.
LOGO
An object of class character
containing the ASCII
logo.
print(LOGO)
This function is meant for generating or updating the logo. Its output is
the value stored in the LOGO
constant.
make_logo(
    template = "./inst/assets/logo/parabar-logo.txt",
    version = c(1, 0, 0)
)
template |
A character string representing the path to the logo template. |
version |
A numerical vector of three positive integers representing the version of the package to append to the logo. |
The ASCII logo.
## Not run:
# Generate the logo.
logo <- make_logo()

# Print the logo.
cat(logo)
## End(Not run)
This is a concrete implementation of the abstract class Bar
using the progress::progress_bar
as engine for the progress bar.
parabar::Bar
-> ModernBar
new()
Create a new ModernBar
object.
ModernBar$new()
An object of class ModernBar
.
create()
Create a progress bar.
ModernBar$create(total, initial, ...)
total
The total number of times the progress bar should tick.
initial
The starting point of the progress bar.
...
Additional arguments for the bar creation passed to
progress::progress_bar$new()
.
This method returns void. The resulting bar is stored in the private
field .bar
, accessible via the active binding engine
. Both the
private field and the active binding are defined in the super class
Bar
.
update()
Update the progress bar by calling
progress::progress_bar$update()
.
ModernBar$update(current)
current
The position the progress bar should be at (e.g., 30 out of 100), usually the index in a loop.
terminate()
Terminate the progress bar by calling
progress::progress_bar$terminate()
.
ModernBar$terminate()
clone()
The objects of this class are cloneable with this method.
ModernBar$clone(deep = FALSE)
deep
Whether to make a deep clone.
Bar
, BasicBar
, and BarFactory
.
# Create a modern bar instance.
bar <- ModernBar$new()

# Specify the number of ticks to be performed.
total <- 100

# Create the progress bar.
bar$create(total = total, initial = 0)

# Use the progress bar.
for (i in 1:total) {
    # Sleep a bit.
    Sys.sleep(0.02)

    # Update the progress bar.
    bar$update(i)
}

# Terminate the progress bar.
bar$terminate()
This class holds public fields that represent the package
options
used to configure the default behavior of the
functionality parabar
provides.
An instance of this class is automatically created and stored in the session
base::.Options
at load time. This instance can be accessed and changed
via getOption("parabar")
. Specific package
options
can be retrieved using the helper function
get_option()
.
progress_track
A logical value indicating whether progress
tracking should be enabled (i.e., TRUE
) or disabled (i.e.,
FALSE
) globally for compatible backends. The default value is
TRUE
.
progress_timeout
A numeric value indicating the timeout (i.e.,
in seconds) between subsequent checks of the log file for new
progress records. The default value is 0.001
.
progress_wait
A numeric value indicating the approximate
duration (i.e., in seconds) to wait between progress bar updates
before checking if the task has finished (i.e., possibly with an
error). The default value is 0.1
.
progress_bar_type
A character string indicating the default
bar type to use with compatible backends. Possible values are
"modern"
(the default) or "basic"
.
progress_bar_config
A list of lists containing the default bar configuration for each supported bar engine. Elements of these lists represent arguments for the corresponding bar engines. Currently, the supported bar engines are:
modern
: The progress::progress_bar
engine, with the following
default configuration:
show_after = 0
format = "> completed :current out of :total tasks [:percent] [:elapsed]"
basic
: The utils::txtProgressBar
engine, with no default
configuration.
stop_forceful
A logical value indicating whether to allow
stopping an asynchronous backend forcefully (i.e., TRUE
), or not
(i.e., FALSE
). When stopping forcefully, the backend is terminated
without waiting for running tasks to finish or for the results to
be read into the main R
session. The default value is FALSE
.
progress_log_path
A character string indicating the path to
the log file used to track the execution progress of a running task.
The default value is a temporary file generated by
base::tempfile()
. Calling this active binding repeatedly will
yield different temporary file paths. Fixing the path to a specific
value is possible by setting this active binding to a character
string representing the desired path. Setting this active binding to
NULL
will reset it to the default value (i.e., yielding different
temporary file paths).
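The default behavior of `progress_log_path` follows from how `base::tempfile()` works: each call returns a new path and creates no file. The sketch below uses only base R. The record format written to the file (one line per finished task) is an assumption made for illustration, not the format the package uses.

```r
# Two consecutive calls to `tempfile()` give two distinct paths.
path_one <- tempfile()
path_two <- tempfile()

# The paths differ, and neither file exists until something writes to it.
paths_differ <- !identical(path_one, path_two)
exists_before <- file.exists(path_one)

# Writing progress records creates the file (assumed: one line per finished task).
cat("task done\n", file = path_one, append = TRUE)
cat("task done\n", file = path_one, append = TRUE)

# Counting the lines gives the number of records written so far.
records <- length(readLines(path_one))

# Clean up the temporary file.
unlink(path_one)
```

Fixing the path to a known location, as the field description allows, makes it possible to inspect the log from outside the session that writes it.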
get_option()
, set_option()
, and
set_default_options()
.
# Set the default package options (i.e., automatically set at load time).
set_default_options()

# First, get the options instance from the session options.
parabar <- getOption("parabar")

# Then, disable progress tracking.
parabar$progress_track <- FALSE

# Check that the change was applied (i.e., `progress_track: FALSE`).
getOption("parabar")

# To restore defaults, set the default options again.
set_default_options()

# Check that the change was applied (i.e., `progress_track: TRUE`).
getOption("parabar")

# We can also use the built-in helpers to get and set options more conveniently.

# Get the progress tracking option.
get_option("progress_track")

# Set the progress tracking option to `FALSE`.
set_option("progress_track", FALSE)

# Check that the change was applied (i.e., `progress_track: FALSE`).
get_option("progress_track")

# Get a temporary file for logging the progress.
get_option("progress_log_path")

# Fix the logging file path.
set_option("progress_log_path", "./progress.log")

# Check that the logging path change was applied.
get_option("progress_log_path")

# Restore the logging path to the default behavior.
set_option("progress_log_path", NULL)

# Check that the logging path change was applied.
get_option("progress_log_path")

# Restore the defaults.
set_default_options()
This function can be used to run a task in parallel. The task is executed in
parallel on the specified backend, similar to parallel::parApply()
. If
backend = NULL
, the task is executed sequentially using base::apply()
.
See the Details section for more information on how this function works.
par_apply(backend = NULL, x, margin, fun, ...)
backend |
An object of class |
x |
An array to pass to the |
margin |
A numeric vector indicating the dimensions of |
fun |
A function to apply to |
... |
Additional arguments to pass to the |
This function uses the UserApiConsumer
class that acts like an
interface for the developer API of the parabar
package.
The dimensions of the output vary according to the margin
argument. Consult
the documentation of base::apply()
for a detailed explanation on how the
output is structured.
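Since the output structure follows `base::apply()`, the effect of the `margin` argument can be checked sequentially before involving a backend. The sketch below uses only base R on a small matrix. The variable names are illustrative.

```r
# A 3 x 4 matrix filled column-wise with the integers 1 to 12.
x <- matrix(1:12, nrow = 3, ncol = 4)

# Margin 1 applies the function to each row: one result per row.
by_rows <- apply(x, 1, sum)

# Margin 2 applies the function to each column: one result per column.
by_cols <- apply(x, 2, sum)

# Margins c(1, 2) apply the function to each element: same shape as `x`.
by_cells <- apply(x, c(1, 2), function(value) value * 2)
```

Here `by_rows` has length 3, `by_cols` has length 4, and `by_cells` is again a 3 x 4 matrix.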
start_backend()
, peek()
, export()
,
evaluate()
, clear()
, configure_bar()
,
par_sapply()
, par_lapply()
, stop_backend()
,
set_option()
, get_option()
, Options
,
UserApiConsumer
, and BackendService
.
# Define a simple task.
task <- function(x) {
    # Perform computations.
    Sys.sleep(0.01)

    # Return the result.
    mean(x)
}

# Define a matrix for the task.
x <- matrix(rnorm(100^2, mean = 10, sd = 0.5), nrow = 100, ncol = 100)

# Start an asynchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

# Run a task in parallel over the rows of `x`.
results <- par_apply(backend, x = x, margin = 1, fun = task)

# Run a task in parallel over the columns of `x`.
results <- par_apply(backend, x = x, margin = 2, fun = task)

# The task can also be run over all elements of `x` using `margin = c(1, 2)`.
# Improper dimensions will throw an error.
try(par_apply(backend, x = x, margin = c(1, 2, 3), fun = task))

# Disable progress tracking.
set_option("progress_track", FALSE)

# Run a task in parallel.
results <- par_apply(backend, x = x, margin = 1, fun = task)

# Enable progress tracking.
set_option("progress_track", TRUE)

# Change the progress bar options.
configure_bar(type = "modern", format = "[:bar] :percent")

# Run a task in parallel.
results <- par_apply(backend, x = x, margin = 1, fun = task)

# Stop the backend.
stop_backend(backend)

# Start a synchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "sync")

# Run a task in parallel.
results <- par_apply(backend, x = x, margin = 1, fun = task)

# Disable progress tracking to remove the warning that progress is not supported.
set_option("progress_track", FALSE)

# Run a task in parallel.
results <- par_apply(backend, x = x, margin = 1, fun = task)

# Stop the backend.
stop_backend(backend)

# Run the task using the `base::apply` (i.e., non-parallel).
results <- par_apply(NULL, x = x, margin = 1, fun = task)
This function can be used to run a task in parallel. The task is executed in
parallel on the specified backend, similar to parallel::parLapply()
. If
backend = NULL
, the task is executed sequentially using base::lapply()
.
See the Details section for more information on how this function works.
par_lapply(backend = NULL, x, fun, ...)
backend |
An object of class |
x |
An atomic vector or list to pass to the |
fun |
A function to apply to each element of |
... |
Additional arguments to pass to the |
This function uses the UserApiConsumer
class that acts like an
interface for the developer API of the parabar
package.
A list of the same length as x
containing the results of the fun
. The
output format resembles that of base::lapply()
.
start_backend()
, peek()
, export()
,
evaluate()
, clear()
, configure_bar()
,
par_sapply()
, par_apply()
, stop_backend()
,
set_option()
, get_option()
, Options
,
UserApiConsumer
, and BackendService
.
# Define a simple task.
task <- function(x) {
    # Perform computations.
    Sys.sleep(0.01)

    # Return the result.
    return(x + 1)
}

# Start an asynchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

# Run a task in parallel.
results <- par_lapply(backend, x = 1:300, fun = task)

# Disable progress tracking.
set_option("progress_track", FALSE)

# Run a task in parallel.
results <- par_lapply(backend, x = 1:300, fun = task)

# Enable progress tracking.
set_option("progress_track", TRUE)

# Change the progress bar options.
configure_bar(type = "modern", format = "[:bar] :percent")

# Run a task in parallel.
results <- par_lapply(backend, x = 1:300, fun = task)

# Stop the backend.
stop_backend(backend)

# Start a synchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "sync")

# Run a task in parallel.
results <- par_lapply(backend, x = 1:300, fun = task)

# Disable progress tracking to remove the warning that progress is not supported.
set_option("progress_track", FALSE)

# Run a task in parallel.
results <- par_lapply(backend, x = 1:300, fun = task)

# Stop the backend.
stop_backend(backend)

# Run the task using the `base::lapply` (i.e., non-parallel).
results <- par_lapply(NULL, x = 1:300, fun = task)
This function can be used to run a task in parallel. The task is executed in
parallel on the specified backend, similar to parallel::parSapply()
. If
backend = NULL
, the task is executed sequentially using base::sapply()
.
See the Details section for more information on how this function works.
par_sapply(backend = NULL, x, fun, ...)
backend |
An object of class |
x |
An atomic vector or list to pass to the |
fun |
A function to apply to each element of |
... |
Additional arguments to pass to the |
This function uses the UserApiConsumer
class that acts like an
interface for the developer API of the parabar
package.
A vector of the same length as x
containing the results of the fun
. The
output format resembles that of base::sapply()
.
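Because the output format resembles that of `base::sapply()`, it may help to recall how `sapply()` differs from `lapply()` on the same input. The sketch below runs sequentially with base R only. Whether `par_sapply()` reproduces every simplification case of `base::sapply()` is an assumption not confirmed here.

```r
# A simple task returning one value per element.
task <- function(x) x + 1

# `lapply()` always returns a list with one entry per element.
as_list <- lapply(1:3, task)

# `sapply()` simplifies equal-length scalar results into a vector.
as_vector <- sapply(1:3, task)

# When each result has length two, `sapply()` simplifies to a matrix
# with one column per input element.
as_matrix <- sapply(1:3, function(x) c(x, x^2))
```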
start_backend()
, peek()
, export()
,
evaluate()
, clear()
, configure_bar()
,
par_lapply()
, par_apply()
, stop_backend()
,
set_option()
, get_option()
, Options
,
UserApiConsumer
, and BackendService
.
# Define a simple task.
task <- function(x) {
    # Perform computations.
    Sys.sleep(0.01)

    # Return the result.
    return(x + 1)
}

# Start an asynchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

# Run a task in parallel.
results <- par_sapply(backend, x = 1:300, fun = task)

# Disable progress tracking.
set_option("progress_track", FALSE)

# Run a task in parallel.
results <- par_sapply(backend, x = 1:300, fun = task)

# Enable progress tracking.
set_option("progress_track", TRUE)

# Change the progress bar options.
configure_bar(type = "modern", format = "[:bar] :percent")

# Run a task in parallel.
results <- par_sapply(backend, x = 1:300, fun = task)

# Stop the backend.
stop_backend(backend)

# Start a synchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "sync")

# Run a task in parallel.
results <- par_sapply(backend, x = 1:300, fun = task)

# Disable progress tracking to remove the warning that progress is not supported.
set_option("progress_track", FALSE)

# Run a task in parallel.
results <- par_sapply(backend, x = 1:300, fun = task)

# Stop the backend.
stop_backend(backend)

# Run the task using the `base::sapply` (i.e., non-parallel).
results <- par_sapply(NULL, x = 1:300, fun = task)
# Define a simple task. task <- function(x) { # Perform computations. Sys.sleep(0.01) # Return the result. return(x + 1) } # Start an asynchronous backend. backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async") # Run a task in parallel. results <- par_sapply(backend, x = 1:300, fun = task) # Disable progress tracking. set_option("progress_track", FALSE) # Run a task in parallel. results <- par_sapply(backend, x = 1:300, fun = task) # Enable progress tracking. set_option("progress_track", TRUE) # Change the progress bar options. configure_bar(type = "modern", format = "[:bar] :percent") # Run a task in parallel. results <- par_sapply(backend, x = 1:300, fun = task) # Stop the backend. stop_backend(backend) # Start a synchronous backend. backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "sync") # Run a task in parallel. results <- par_sapply(backend, x = 1:300, fun = task) # Disable progress tracking to remove the warning that progress is not supported. set_option("progress_track", FALSE) # Run a task in parallel. results <- par_sapply(backend, x = 1:300, fun = task) # Stop the backend. stop_backend(backend) # Run the task using the `base::sapply` (i.e., non-parallel). results <- par_sapply(NULL, x = 1:300, fun = task)
This function can be used to check the names of the variables present on a backend created by start_backend().
peek(backend)
backend | An object of class Backend, as returned by start_backend(). |
This function is a convenience wrapper around the lower-level API of parabar aimed at developers. More specifically, this function calls the peek method on the provided backend instance.
The function returns a list of character vectors, where each list element corresponds to a node, and each element of the character vector is the name of a variable present on that node. It throws an error if the value provided for the backend argument is not an instance of class Backend.
start_backend(), export(), evaluate(), clear(), configure_bar(), par_sapply(), par_lapply(), par_apply(), stop_backend(), and BackendService.
# Create an asynchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

# Check that the backend is active.
backend$active

# Check if there is anything on the backend.
peek(backend)

# Create a dummy variable.
name <- "parabar"

# Export the `name` variable in the current environment to the backend.
export(backend, "name", environment())

# Remove the dummy variable from the current environment.
rm(name)

# Check the backend to see that the variable has been exported.
peek(backend)

# Run an expression on the backend.
# Note that the symbols in the expression are resolved on the backend.
evaluate(backend, {
    # Print the name.
    print(paste0("Hello, ", name, "!"))
})

# Clear the backend.
clear(backend)

# Check that there is nothing on the backend.
peek(backend)

# Use a basic progress bar (i.e., see `parabar::Bar`).
configure_bar(type = "basic", style = 3)

# Run a task in parallel (i.e., approx. 1.25 seconds).
output <- par_sapply(backend, x = 1:10, fun = function(x) {
    # Sleep a bit.
    Sys.sleep(0.25)

    # Compute and return.
    return(x + 1)
})

# Print the output.
print(output)

# Stop the backend.
stop_backend(backend)

# Check that the backend is not active.
backend$active
This class represents a progress tracking context for interacting with Backend implementations via the BackendService interface.
This class extends the base Context class and overrides the sapply parent method to decorate the backend instance with additional functionality. Specifically, this class creates a temporary file to log the progress of backend tasks, and then creates a progress bar to display the progress of the backend tasks.
The progress bar is updated after each backend task execution. The timeout between subsequent checks of the temporary log file is controlled by the Options class and defaults to 0.001. This value can be adjusted via the Options instance present in the session base::.Options list (i.e., see set_option()). For example, to set the timeout to 0.1 we can run set_option("progress_timeout", 0.1).
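The log-and-poll mechanism described above can be sketched outside of parabar. The example below is an illustration only and not the package's own code: it assumes the callr package is installed, uses a background process started with callr::r_bg() in place of a backend, and the names log_file, total, and timeout are chosen for this sketch.

```r
# Assumption: the `callr` package is installed. This is not parabar's own code.
log_file <- tempfile()
invisible(file.create(log_file))

total <- 10     # Number of tasks to run.
timeout <- 0.1  # Seconds to wait between checks of the log file.

# Run the tasks in a background R process that logs one line per finished task.
job <- callr::r_bg(
    func = function(total, log_file) {
        for (i in seq_len(total)) {
            Sys.sleep(0.05)
            cat("task done\n", file = log_file, append = TRUE)
        }
        total
    },
    args = list(total = total, log_file = log_file)
)

# Poll the log file and move the bar to the number of lines found so far.
bar <- utils::txtProgressBar(min = 0, max = total, style = 3)
repeat {
    done <- length(readLines(log_file, warn = FALSE))
    utils::setTxtProgressBar(bar, done)
    if (done >= total || !job$is_alive()) break
    Sys.sleep(timeout)
}

# Wait for the process to exit and read the log one last time.
job$wait()
done <- length(readLines(log_file, warn = FALSE))
utils::setTxtProgressBar(bar, done)
close(bar)
unlink(log_file)
```

A smaller timeout makes the bar more responsive at the cost of reading the file more often, which is the trade-off the progress_timeout option exposes.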
This class is a good example of how to extend the base Context
class to decorate the backend instance with additional functionality.
parabar::BackendService -> parabar::Context -> ProgressTrackingContext
bar
The Bar instance registered with the context.
set_backend()
Set the backend instance to be used by the context.
ProgressTrackingContext$set_backend(backend)
backend
An object of class Backend that supports progress tracking and implements the BackendService interface.
This method overrides the parent method to validate the backend provided and guarantee it is an instance of the AsyncBackend class.
set_bar()
Set the Bar instance to be used by the context.
ProgressTrackingContext$set_bar(bar)
bar
An object of class Bar.
configure_bar()
Configure the Bar instance registered with the context.
ProgressTrackingContext$configure_bar(...)
sapply()
Run a task on the backend akin to parallel::parSapply(), but with a progress bar.
ProgressTrackingContext$sapply(x, fun, ...)
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
lapply()
Run a task on the backend akin to parallel::parLapply(), but with a progress bar.
ProgressTrackingContext$lapply(x, fun, ...)
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
apply()
Run a task on the backend akin to parallel::parApply().
ProgressTrackingContext$apply(x, margin, fun, ...)
x
An array to pass to the fun function.
margin
A numeric vector indicating the dimensions of x the fun function should be applied over. For example, for a matrix, margin = 1 indicates applying fun row-wise, margin = 2 indicates applying fun column-wise, and margin = c(1, 2) indicates applying fun element-wise. Named dimensions are also possible depending on x. See parallel::parApply() and base::apply() for more details.
fun
A function to apply to x according to the margin.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
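The meaning of the margin argument can be checked sequentially with base::apply(), which the description above points to for details. The sketch below uses only base R; the matrix m and the result names are chosen for this illustration.

```r
# A 2 x 3 matrix filled column by column: rows are (1, 3, 5) and (2, 4, 6).
m <- matrix(1:6, nrow = 2)

# margin = 1: apply the function to each row.
row_sums <- apply(m, 1, sum)

# margin = 2: apply the function to each column.
col_sums <- apply(m, 2, sum)

# margin = c(1, 2): apply the function to each element.
scaled <- apply(m, c(1, 2), function(value) value * 10)

print(row_sums)
print(col_sums)
print(scaled)
```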
clone()
The objects of this class are cloneable with this method.
ProgressTrackingContext$clone(deep = FALSE)
deep
Whether to make a deep clone.
Context, BackendService, Backend, and AsyncBackend.
# Define a task to run in parallel.
task <- function(x, y) {
    # Sleep a bit.
    Sys.sleep(0.15)

    # Return the result of a computation.
    return(x + y)
}

# Create a specification object.
specification <- Specification$new()

# Set the number of cores.
specification$set_cores(cores = 2)

# Set the cluster type.
specification$set_type(type = "psock")

# Create a backend factory.
backend_factory <- BackendFactory$new()

# Get a backend instance that does not support progress tracking.
backend <- backend_factory$get("sync")

# Create a progress tracking context object.
context <- ProgressTrackingContext$new()

# Attempt to set the incompatible backend instance.
try(context$set_backend(backend))

# Get a backend instance that does support progress tracking.
backend <- backend_factory$get("async")

# Register the backend with the context.
context$set_backend(backend)

# From now on, all backend operations are intercepted by the context.

# Start the backend.
context$start(specification)

# Create a bar factory.
bar_factory <- BarFactory$new()

# Get a modern bar instance.
bar <- bar_factory$get("modern")

# Register the bar with the context.
context$set_bar(bar)

# Configure the bar.
context$configure_bar(
    show_after = 0,
    format = " > completed :current out of :total tasks [:percent] [:elapsed]"
)

# Run a task in parallel (i.e., approx. 1.9 seconds).
context$sapply(x = 1:25, fun = task, y = 10)

# Get the task output.
backend$get_output(wait = TRUE)

# Change the bar type.
bar <- bar_factory$get("basic")

# Register the bar with the context.
context$set_bar(bar)

# Remove the previous bar configuration.
context$configure_bar()

# Run a task in parallel (i.e., approx. 1.9 seconds).
context$sapply(x = 1:25, fun = task, y = 10)

# Get the task output.
backend$get_output(wait = TRUE)

# Close the backend.
context$stop()
This class holds the state of a background session used by an asynchronous backend (i.e., AsyncBackend). See the Details section for more information.
The session state is useful to check if an asynchronous backend is ready for certain operations. A session can only be in one of the following four states at a time:
session_is_starting: When TRUE, it indicates that the session is starting.
session_is_idle: When TRUE, it indicates that the session is idle and ready to execute operations.
session_is_busy: When TRUE, it indicates that the session is busy (i.e., see the TaskState class for more information about a task's state).
session_is_finished: When TRUE, it indicates that the session is closed and no longer available for operations.
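The rule that exactly one state holds at a time can be pictured with a small base R sketch. It assumes the session reports a single label among "starting", "idle", "busy", and "finished" (the labels used by the get_state method of callr::r_session); the helper session_flags() is hypothetical and not part of parabar.

```r
# Hypothetical helper: translate one state label into the four logical flags.
session_flags <- function(state) {
    states <- c("starting", "idle", "busy", "finished")

    # Reject labels outside the four known states.
    stopifnot(state %in% states)

    # Exactly one comparison is TRUE, so exactly one flag is TRUE.
    flags <- as.list(states == state)
    names(flags) <- paste0("session_is_", states)

    flags
}

# Flags for a session that is currently running a task.
flags <- session_flags("busy")
str(flags)
```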
session_is_starting
A logical value indicating whether the session is starting.
session_is_idle
A logical value indicating whether the session is idle and ready to execute operations.
session_is_busy
A logical value indicating whether the session is busy.
session_is_finished
A logical value indicating whether the session is closed and no longer available for operations.
new()
Create a new SessionState object and determine the state of a given background session.
SessionState$new(session)
session
A callr::r_session object.
An object of class SessionState.
clone()
The objects of this class are cloneable with this method.
SessionState$clone(deep = FALSE)
deep
Whether to make a deep clone.
TaskState, AsyncBackend, and ProgressTrackingContext.
# Handy function to print the session states all at once.
check_state <- function(session) {
    # Create a session object and determine its state.
    session_state <- SessionState$new(session)

    # Print the state.
    cat(
        "Session is starting: ", session_state$session_is_starting, "\n",
        "Session is idle: ", session_state$session_is_idle, "\n",
        "Session is busy: ", session_state$session_is_busy, "\n",
        "Session is finished: ", session_state$session_is_finished, "\n",
        sep = ""
    )
}

# Create a specification object.
specification <- Specification$new()

# Set the number of cores.
specification$set_cores(cores = 2)

# Set the cluster type.
specification$set_type(type = "psock")

# Create an asynchronous backend object.
backend <- AsyncBackend$new()

# Start the cluster on the backend.
backend$start(specification)

# Check that the session is idle.
check_state(backend$cluster)

{
    # Run a task in parallel (i.e., approx. 0.25 seconds).
    backend$sapply(
        x = 1:10,
        fun = function(x) {
            # Sleep a bit.
            Sys.sleep(0.05)

            # Compute something.
            output <- x + 1

            # Return the result.
            return(output)
        }
    )

    # And immediately check that the session is busy.
    check_state(backend$cluster)
}

# Get the output and wait for the task to complete.
output <- backend$get_output(wait = TRUE)

# Check that the session is idle again.
check_state(backend$cluster)

# Manually close the session.
backend$cluster$close()

# Check that the session is finished.
check_state(backend$cluster)

# Stop the backend.
backend$stop()
This class contains the information required to start a backend. An instance of this class is used by the start method of the BackendService interface.
cores
The number of nodes to use in the cluster creation.
type
The type of cluster to create.
types
The supported cluster types.
set_cores()
Set the number of nodes to use in the cluster.
Specification$set_cores(cores)
cores
The number of nodes to use in the cluster.
This method also performs a validation of the requested number of cores, ensuring that the value lies between 2 and parallel::detectCores() - 1.
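One way to picture this validation is a helper that pulls an out-of-range request back into the allowed interval. The sketch below is an assumption for illustration, not the package's implementation: the helper clamp_cores() and its available argument are hypothetical, and whether parabar warns, errors, or adjusts silently is not stated in this section.

```r
# Hypothetical helper: keep the requested cores between 2 and `available - 1`.
clamp_cores <- function(cores, available = parallel::detectCores()) {
    # Upper bound: leave one core free for the main session.
    upper <- available - 1

    if (cores < 2) {
        warning("Requested cores below 2; using 2 instead.")
        return(2)
    }

    if (cores > upper) {
        warning("Requested cores above the upper bound; using ", upper, " instead.")
        return(upper)
    }

    cores
}

# On a machine assumed to have 8 cores, a request for 100 is reduced.
adjusted <- suppressWarnings(clamp_cores(100, available = 8))
print(adjusted)
```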
set_type()
Set the type of cluster to create.
Specification$set_type(type)
type
The type of cluster to create. Possible values are "fork" and "psock". Defaults to "psock".
If no type is explicitly requested (i.e., type = NULL), the type is determined based on the operating system. On Unix-like systems, the type is set to "fork", while on Windows systems, the type is set to "psock". If an unknown type is requested, a warning is issued and the type is set to "psock".
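The selection rules above can be written down as a short base R function. This is a sketch under the stated rules rather than the package's code: the helper resolve_cluster_type() is hypothetical, and using .Platform$OS.type to detect the operating system is an assumption.

```r
# Hypothetical helper: resolve the cluster type following the rules above.
resolve_cluster_type <- function(type = NULL) {
    supported <- c("fork", "psock")

    # No explicit request: pick a type based on the operating system.
    if (is.null(type)) {
        return(if (.Platform$OS.type == "unix") "fork" else "psock")
    }

    # Unknown request: warn and fall back to "psock".
    if (!type %in% supported) {
        warning("Unknown cluster type '", type, "'; falling back to 'psock'.")
        return("psock")
    }

    type
}

# An explicit, supported request is returned unchanged.
print(resolve_cluster_type("fork"))

# An unsupported request falls back to "psock" (the warning is silenced here).
print(suppressWarnings(resolve_cluster_type("invalid")))
```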
clone()
The objects of this class are cloneable with this method.
Specification$clone(deep = FALSE)
deep
Whether to make a deep clone.
BackendService, Backend, SyncBackend, and AsyncBackend.
# Create a specification object.
specification <- Specification$new()

# Set the number of cores.
specification$set_cores(cores = 4)

# Set the cluster type.
specification$set_type(type = "psock")

# Get the number of cores.
specification$cores

# Get the cluster type.
specification$type

# Attempt to set too many cores.
specification$set_cores(cores = 100)

# Check that the cores were reasonably set.
specification$cores

# Allow the object to determine the adequate cluster type.
specification$set_type(type = NULL)

# Check the type determined.
specification$type

# Attempt to set an invalid cluster type.
specification$set_type(type = "invalid")

# Check that the type was set to `psock`.
specification$type
This function can be used to start a backend. Check out the Details section for more information.
start_backend(cores, cluster_type = "psock", backend_type = "async")
cores | A positive integer representing the number of cores to use (i.e., the number of processes to start). This value must be between 2 and parallel::detectCores() - 1. |
cluster_type | A character string representing the type of cluster to create. Possible values are "fork" and "psock". Defaults to "psock". |
backend_type | A character string representing the type of backend to create. Possible values are "async" and "sync". Defaults to "async". |
This function is a convenience wrapper around the lower-level API of parabar aimed at developers. More specifically, this function uses the Specification class to create a specification object, and the BackendFactory class to create a Backend instance based on the specification object.
A Backend instance that can be used to parallelize computations. The methods available on the Backend instance are defined by the BackendService interface.
The cluster type determines the type of cluster to create. The requested value is validated and passed to the type argument of the parallel::makeCluster() function. The following table lists the possible values and their corresponding description.
Cluster | Description |
"fork" | For Unix-based systems. |
"psock" | For Windows-based systems. |
The backend type determines the type of backend to create. The requested value is passed to the BackendFactory class, which returns a Backend instance of the desired type. The following table lists the possible backend types and their corresponding description.
Backend | Description | Implementation | Progress |
"sync" | A synchronous backend. | SyncBackend | no |
"async" | An asynchronous backend. | AsyncBackend | yes |
In a nutshell, the difference between the two backend types is that for the synchronous backend the cluster is created in the main process, while for the asynchronous backend the cluster is created in a background R process using callr::r_session. Therefore, the synchronous backend blocks the main process during task execution, while the asynchronous backend is non-blocking. Check out the implementations listed in the table above for more information. All concrete implementations extend the Backend abstract class and implement the BackendService interface.
peek(), export(), evaluate(), clear(), configure_bar(), par_sapply(), par_lapply(), par_apply(), stop_backend(), and BackendService.
# Create an asynchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

# Check that the backend is active.
backend$active

# Check if there is anything on the backend.
peek(backend)

# Create a dummy variable.
name <- "parabar"

# Export the `name` variable in the current environment to the backend.
export(backend, "name", environment())

# Remove the dummy variable from the current environment.
rm(name)

# Check the backend to see that the variable has been exported.
peek(backend)

# Run an expression on the backend.
# Note that the symbols in the expression are resolved on the backend.
evaluate(backend, {
    # Print the name.
    print(paste0("Hello, ", name, "!"))
})

# Clear the backend.
clear(backend)

# Check that there is nothing on the backend.
peek(backend)

# Use a basic progress bar (i.e., see `parabar::Bar`).
configure_bar(type = "basic", style = 3)

# Run a task in parallel (i.e., approx. 1.25 seconds).
output <- par_sapply(backend, x = 1:10, fun = function(x) {
    # Sleep a bit.
    Sys.sleep(0.25)

    # Compute and return.
    return(x + 1)
})

# Print the output.
print(output)

# Stop the backend.
stop_backend(backend)

# Check that the backend is not active.
backend$active
This function can be used to stop a backend created by start_backend().
stop_backend(backend)
backend | An object of class Backend, as returned by start_backend(). |
This function is a convenience wrapper around the lower-level API of parabar aimed at developers. More specifically, this function calls the stop method on the provided backend instance.
The function returns void. It throws an error if:
the value provided for the backend argument is not an instance of class Backend.
the backend object provided is already stopped (i.e., is not active).
start_backend(), peek(), export(), evaluate(), clear(), configure_bar(), par_sapply(), par_apply(), par_lapply(), and BackendService.
# Create an asynchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

# Check that the backend is active.
backend$active

# Check if there is anything on the backend.
peek(backend)

# Create a dummy variable.
name <- "parabar"

# Export the `name` variable in the current environment to the backend.
export(backend, "name", environment())

# Remove the dummy variable from the current environment.
rm(name)

# Check the backend to see that the variable has been exported.
peek(backend)

# Run an expression on the backend.
# Note that the symbols in the expression are resolved on the backend.
evaluate(backend, {
    # Print the name.
    print(paste0("Hello, ", name, "!"))
})

# Clear the backend.
clear(backend)

# Check that there is nothing on the backend.
peek(backend)

# Use a basic progress bar (i.e., see `parabar::Bar`).
configure_bar(type = "basic", style = 3)

# Run a task in parallel (i.e., approx. 1.25 seconds).
output <- par_sapply(backend, x = 1:10, fun = function(x) {
    # Sleep a bit.
    Sys.sleep(0.25)

    # Compute and return.
    return(x + 1)
})

# Print the output.
print(output)

# Stop the backend.
stop_backend(backend)

# Check that the backend is not active.
backend$active
This is a concrete implementation of the abstract class Backend that implements the BackendService interface. This backend executes tasks in parallel on a parallel::makeCluster() cluster synchronously (i.e., blocking the main R session).
parabar::BackendService -> parabar::Backend -> SyncBackend
new()
Create a new SyncBackend object.
SyncBackend$new()
An object of class SyncBackend.
start()
Start the backend.
SyncBackend$start(specification)
specification
An object of class Specification that contains the backend configuration.
This method returns void. The resulting backend must be stored in the .cluster private field on the Backend abstract class, and accessible to any concrete backend implementations via the active binding cluster.
stop()
Stop the backend.
SyncBackend$stop()
This method returns void.
clear()
Remove all objects from the backend. This function is equivalent to calling rm(list = ls(all.names = TRUE)) on each node in the backend.
SyncBackend$clear()
This method returns void.
peek()
Inspect the backend for variables available in the .GlobalEnv.
SyncBackend$peek()
This method returns a list of character vectors, where each element corresponds to a node in the backend. The character vectors contain the names of the variables available in the .GlobalEnv on each node.
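The shape of this return value can be reproduced with the base parallel package alone. The sketch below is an illustration of the idea, not the SyncBackend implementation: it lists the variables in the .GlobalEnv of each node of a two-node PSOCK cluster after exporting one variable, and the names cluster and variables are chosen for this example.

```r
library(parallel)

# Start a small PSOCK cluster with two nodes.
cluster <- makeCluster(2, type = "PSOCK")

# Create a variable and export it to every node.
name <- "parabar"
clusterExport(cluster, "name", envir = environment())

# List the variables in the global environment of each node.
# The result is a list with one character vector per node.
variables <- clusterEvalQ(cluster, ls(envir = .GlobalEnv))

# Shut the cluster down.
stopCluster(cluster)

print(variables)
```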
export()
Export variables from a given environment to the backend.
SyncBackend$export(variables, environment)
variables
A character vector of variable names to export.
environment
An environment object from which to export the variables. Defaults to the parent frame.
This method returns void.
evaluate()
Evaluate an arbitrary expression on the backend.
SyncBackend$evaluate(expression)
expression
An unquoted expression to evaluate on the backend.
This method returns the result of the expression evaluation.
sapply()
Run a task on the backend akin to parallel::parSapply().
SyncBackend$sapply(x, fun, ...)
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
lapply()
Run a task on the backend akin to parallel::parLapply().
SyncBackend$lapply(x, fun, ...)
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
apply()
Run a task on the backend akin to parallel::parApply().
SyncBackend$apply(x, margin, fun, ...)
x
An array to pass to the fun function.
margin
A numeric vector indicating the dimensions of x the fun function should be applied over. For example, for a matrix, margin = 1 indicates applying fun row-wise, margin = 2 indicates applying fun column-wise, and margin = c(1, 2) indicates applying fun element-wise. Named dimensions are also possible depending on x. See parallel::parApply() and base::apply() for more details.
fun
A function to apply to x according to the margin.
...
Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be stored in the private field .output on the Backend abstract class, and is accessible via the get_output() method.
get_output()
Get the output of the task execution.
SyncBackend$get_output(...)
...
Additional arguments currently not in use.
This method fetches the output of the task execution after calling the sapply() method. It returns the output and immediately removes it from the backend. Therefore, subsequent calls to this method will return NULL. This method should be called after the execution of a task.
A vector, matrix, or list of the same length as x, containing the results of applying fun. The output format differs based on the specific operation employed. Check out the documentation for the apply operations of parallel::parallel for more information.
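The fetch-once behaviour described above can be tried with a short script. The sketch below assumes the parabar package is installed and skips itself otherwise; it relies only on the classes and methods named in this section, and the variable names first and second are chosen for this illustration.

```r
# Run only when parabar is available (assumption: it may not be installed).
if (requireNamespace("parabar", quietly = TRUE)) {
    # Describe the cluster to start.
    specification <- parabar::Specification$new()
    specification$set_cores(cores = 2)
    specification$set_type(type = "psock")

    # Start a synchronous backend.
    backend <- parabar::SyncBackend$new()
    backend$start(specification)

    # Run a task; the method itself returns nothing.
    backend$sapply(x = 1:5, fun = function(x) x + 1)

    # The first call fetches the output and removes it from the backend.
    first <- backend$get_output()

    # Per the description above, a second call should find nothing left.
    second <- backend$get_output()

    # Stop the backend.
    backend$stop()

    print(first)
    print(is.null(second))
}
```

Storing the result of the first call in a variable is therefore the safe pattern, since the backend does not keep a copy.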
clone()
The objects of this class are cloneable with this method.
SyncBackend$clone(deep = FALSE)
deep
Whether to make a deep clone.
BackendService, Backend, AsyncBackend, and Context.
# Create a specification object. specification <- Specification$new() # Set the number of cores. specification$set_cores(cores = 2) # Set the cluster type. specification$set_type(type = "psock") # Create a synchronous backend object. backend <- SyncBackend$new() # Start the cluster on the backend. backend$start(specification) # Check if there is anything on the backend. backend$peek() # Create a dummy variable. name <- "parabar" # Export the variable from the current environment to the backend. backend$export("name", environment()) # Remove variable from current environment. rm(name) # Run an expression on the backend, using the exported variable `name`. backend$evaluate({ # Print the name. print(paste0("Hello, ", name, "!")) }) # Run a task in parallel (i.e., approx. 1.25 seconds). backend$sapply( x = 1:10, fun = function(x) { # Sleep a bit. Sys.sleep(0.25) # Compute something. output <- x + 1 # Return the result. return(output) } ) # Get the task output. backend$get_output() # Clear the backend. backend$clear() # Check that there is nothing on the cluster. backend$peek() # Stop the backend. backend$stop() # Check that the backend is not active. backend$active
This class holds the state of a task deployed to an asynchronous backend (i.e., AsyncBackend). See the Details section for more information.
The task state is useful to check if an asynchronous backend is free to execute other operations. A task can only be in one of the following three states at a time:
task_not_started: When TRUE, it indicates that no task has been started and the backend is free to execute another operation.
task_is_running: When TRUE, it indicates that there is a task running on the backend.
task_is_completed: When TRUE, it indicates that the task has been completed, but the backend is still busy because the task output has not been retrieved.
The task state is determined based on the state of the background session (i.e., see the get_state method for callr::r_session) and the state of the task execution inferred from polling the process (i.e., see the poll_process method for callr::r_session) as follows:
Session State | Execution State | Not Started | Is Running | Is Completed |
---|---|---|---|---|
idle | timeout | TRUE | FALSE | FALSE |
busy | timeout | FALSE | TRUE | FALSE |
busy | ready | FALSE | FALSE | TRUE |
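The mapping in the table can be written out as a small decision function. The following is a minimal sketch in base R only: the helper name derive_task_state is hypothetical and not part of parabar, and it takes the two states as plain character strings rather than querying a callr::r_session object.

```r
# Hypothetical helper (not part of parabar): map the session state and the
# execution state to the three task-state flags listed in the table.
derive_task_state <- function(session_state, execution_state) {
    list(
        # Idle session and nothing ready to read: the backend is free.
        task_not_started = session_state == "idle" && execution_state == "timeout",

        # Busy session and nothing ready to read: a task is still running.
        task_is_running = session_state == "busy" && execution_state == "timeout",

        # Busy session with output ready: the task finished, output not fetched.
        task_is_completed = session_state == "busy" && execution_state == "ready"
    )
}

# Inspect the flags for a finished task whose output has not been retrieved.
str(derive_task_state("busy", "ready"))
```

For each of the three combinations listed in the table, exactly one flag is TRUE, which matches the statement that a task can only be in one state at a time.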
task_not_started
A logical value indicating whether no task has been started yet. It is used to determine if the backend is free to execute another operation.
task_is_running
A logical value indicating whether the task is running.
task_is_completed
A logical value indicating whether the task has been completed and the output needs to be retrieved.
new()
Create a new TaskState object and determine the state of a task on a given background session.
TaskState$new(session)
session
A callr::r_session object.
An object of class TaskState.
clone()
The objects of this class are cloneable with this method.
TaskState$clone(deep = FALSE)
deep
Whether to make a deep clone.
SessionState, AsyncBackend, and ProgressTrackingContext.
# Handy function to print the task states all at once.
check_state <- function(session) {
    # Create a task state object and determine the state.
    task_state <- TaskState$new(session)

    # Print the state.
    cat(
        "Task not started: ", task_state$task_not_started, "\n",
        "Task is running: ", task_state$task_is_running, "\n",
        "Task is completed: ", task_state$task_is_completed, "\n",
        sep = ""
    )
}

# Create a specification object.
specification <- Specification$new()

# Set the number of cores.
specification$set_cores(cores = 2)

# Set the cluster type.
specification$set_type(type = "psock")

# Create an asynchronous backend object.
backend <- AsyncBackend$new()

# Start the cluster on the backend.
backend$start(specification)

# Check that the task has not been started (i.e., the backend is free).
check_state(backend$cluster)

{
    # Run a task in parallel (i.e., approx. 0.25 seconds).
    backend$sapply(
        x = 1:10,
        fun = function(x) {
            # Sleep a bit.
            Sys.sleep(0.05)

            # Compute something.
            output <- x + 1

            # Return the result.
            return(output)
        }
    )

    # And immediately check the state to see that the task is running.
    check_state(backend$cluster)
}

# Sleep for a bit to wait for the task to complete.
Sys.sleep(1)

# Check that the task is completed (i.e., the output needs to be retrieved).
check_state(backend$cluster)

# Get the output.
output <- backend$get_output(wait = TRUE)

# Check that the task has not been started (i.e., the backend is free again).
check_state(backend$cluster)

# Stop the backend.
backend$stop()
This class is an opinionated interface around the developer API of the parabar package. See the Details section for more information on how this class works.
This class acts as a wrapper around the R6::R6 developer API of the parabar package. In a nutshell, it provides an opinionated interface by wrapping the developer API in simple functional calls. More specifically, for executing a task in parallel, this class performs the following steps:
Validates the backend provided.
Instantiates an appropriate parabar context based on the backend. If the backend supports progress tracking (i.e., the backend is an instance of AsyncBackend), a progress tracking context (i.e., ProgressTrackingContext) is instantiated and used. Otherwise, a regular context (i.e., Context) is instantiated. A regular context is also used if the progress tracking is disabled via the Options instance.
Registers the backend with the context.
Instantiates and configures the progress bar based on the Options instance in the session base::.Options list.
Executes the task in parallel, and displays a progress bar if appropriate.
Fetches the results from the backend and returns them.
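The steps above can be approximated by calling the developer API directly. The following is a minimal sketch under stated assumptions, not the source of UserApiConsumer: the helper name run_sapply is hypothetical; the calls ContextFactory$new()$get(), BarFactory$new()$get(), set_backend(), set_bar(), configure_bar(), get_option("progress_track"), and get_output(wait = TRUE) are assumed to behave as in the parabar developer API; and the bar type and format are hard-coded instead of being read from the Options instance.

```r
# Hypothetical helper sketching the steps above; not the package's own code.
run_sapply <- function(backend, x, fun, ...) {
    # A `NULL` backend runs the task sequentially.
    if (is.null(backend)) {
        return(base::sapply(x, fun, ...))
    }

    # Step 1: validate the backend.
    if (!inherits(backend, "Backend")) {
        stop("Expected an object of class `Backend` or `NULL`.")
    }

    # Step 2: choose the context type. Progress tracking is assumed to need
    # an `AsyncBackend` and the `progress_track` option to be enabled.
    track <- inherits(backend, "AsyncBackend") &&
        isTRUE(parabar::get_option("progress_track"))
    context_type <- if (track) "progress" else "regular"
    context <- parabar::ContextFactory$new()$get(context_type)

    # Step 3: register the backend with the context.
    context$set_backend(backend)

    # Step 4: create and configure the bar (hard-coded in this sketch).
    if (track) {
        context$set_bar(parabar::BarFactory$new()$get("modern"))
        context$configure_bar(show_after = 0, format = "[:bar] :percent")
    }

    # Step 5: execute the task in parallel.
    context$sapply(x = x, fun = fun, ...)

    # Step 6: fetch the results from the backend and return them.
    context$get_output(wait = TRUE)
}

# Usage, guarded so that it is skipped when `parabar` is not installed.
if (requireNamespace("parabar", quietly = TRUE)) {
    backend <- parabar::start_backend(
        cores = 2, cluster_type = "psock", backend_type = "async"
    )
    output <- run_sapply(backend, x = 1:10, fun = function(x) { x + 1 })
    parabar::stop_backend(backend)
    print(output)
}
```

In practice the class reads the bar type and its configuration from the Options instance, so this sketch only mirrors the control flow, not the configuration handling.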
sapply()
Execute a task in parallel akin to parallel::parSapply().
UserApiConsumer$sapply(backend, x, fun, ...)
backend
An object of class Backend as returned by the start_backend() function. It can also be NULL to run the task sequentially via base::sapply().
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
A vector of the same length as x containing the results of the fun. The output format resembles that of base::sapply().
lapply()
Execute a task in parallel akin to parallel::parLapply().
UserApiConsumer$lapply(backend, x, fun, ...)
backend
An object of class Backend as returned by the start_backend() function. It can also be NULL to run the task sequentially via base::lapply().
x
An atomic vector or list to pass to the fun function.
fun
A function to apply to each element of x.
...
Additional arguments to pass to the fun function.
A list of the same length as x containing the results of the fun. The output format resembles that of base::lapply().
apply()
Execute a task in parallel akin to parallel::parApply().
UserApiConsumer$apply(backend, x, margin, fun, ...)
backend
An object of class Backend as returned by the start_backend() function. It can also be NULL to run the task sequentially via base::apply().
x
An array to pass to the fun function.
margin
A numeric vector indicating the dimensions of x the fun function should be applied over. For example, for a matrix, margin = 1 indicates applying fun row-wise, margin = 2 indicates applying fun column-wise, and margin = c(1, 2) indicates applying fun element-wise. Named dimensions are also possible depending on x. See parallel::parApply() and base::apply() for more details.
fun
A function to apply to x according to the margin.
...
Additional arguments to pass to the fun function.
The dimensions of the output vary according to the margin argument. Consult the documentation of base::apply() for a detailed explanation on how the output is structured.
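To make the effect of margin concrete, the following minimal sketch uses base::apply() on a small matrix. Because the output of the parallel method is documented as being structured like that of base::apply(), the same shapes are expected from UserApiConsumer$apply(), although that is an expectation rather than something verified here. The variable names are illustrative only.

```r
# A 2 x 3 matrix used purely for illustration.
m <- matrix(1:6, nrow = 2)

# margin = 1: apply `sum` to each row, giving one value per row.
row_sums <- apply(m, 1, sum)

# margin = 2: apply `sum` to each column, giving one value per column.
col_sums <- apply(m, 2, sum)

# margin = c(1, 2): apply the function to each element, keeping the shape.
incremented <- apply(m, c(1, 2), function(value) value + 1)

# Compare the shapes of the three results.
print(length(row_sums))
print(length(col_sums))
print(dim(incremented))
```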
clone()
The objects of this class are cloneable with this method.
UserApiConsumer$clone(deep = FALSE)
deep
Whether to make a deep clone.
start_backend(), stop_backend(), configure_bar(), par_sapply(), and par_lapply().
# Define a simple task.
task <- function(x) {
    # Perform computations.
    Sys.sleep(0.01)

    # Return the result.
    return(x + 1)
}

# Start an asynchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

# Change the progress bar options.
configure_bar(type = "modern", format = "[:bar] :percent")

# Create a user API consumer.
consumer <- UserApiConsumer$new()

# Execute the task using the `sapply` parallel operation.
output_sapply <- consumer$sapply(backend = backend, x = 1:200, fun = task)

# Print the head of the `sapply` operation output.
head(output_sapply)

# Execute the task using the `lapply` parallel operation.
output_lapply <- consumer$lapply(backend = backend, x = 1:200, fun = task)

# Print the head of the `lapply` operation output.
head(output_lapply)

# Stop the backend.
stop_backend(backend)
This class contains static methods for throwing warnings with informative messages.
Warning$requested_cluster_cores_too_low()
Warning for not requesting enough cluster cores.
Warning$requested_cluster_cores_too_high()
Warning for requesting too many cluster cores.
Warning$requested_cluster_type_not_supported()
Warning for requesting an unsupported cluster type.
Warning$progress_not_supported_for_backend()
Warning for using a backend incompatible with progress tracking.
Warning$error_in_backend_finalizer()
Warning for errors in the backend finalizer during garbage collection.