| Title: | Progress Bar for Parallel Tasks |
|---|---|
| Description: | A simple interface in the form of R6 classes for executing tasks in parallel, tracking their progress, and displaying accurate progress bars. |
| Authors: | Mihai Constantin [aut, cre] (ORCID: <https://orcid.org/0000-0002-6460-0107>) |
| Maintainer: | Mihai Constantin <[email protected]> |
| License: | MIT + file LICENSE |
| Version: | 1.4.2 |
| Built: | 2026-05-14 06:21:07 UTC |
| Source: | https://github.com/mihaiconstantin/parabar |
This is a concrete implementation of the abstract class Backend
that implements the BackendService interface. This backend
executes tasks in parallel asynchronously (i.e., without blocking the main
R session) on a parallel::makeCluster() cluster created in a background
R session.
parabar::BackendService -> parabar::Backend -> AsyncBackend
task_state: A list of logical values indicating the state of
the task execution. See the TaskState class for more
information on how the statuses are determined. The following statuses
are available:
task_not_started: Indicates whether the backend is free. TRUE
signifies that no task has been started and the backend is free to
deploy.
task_is_running: Indicates whether a task is currently running on
the backend.
task_is_completed: Indicates whether a task has finished
executing. TRUE signifies that the output of the task has not been
fetched. Calling the method get_output() will move the output from
the background R session to the main R session. Once the output
has been fetched, the backend is free to deploy another task.
session_state: A list of logical values indicating the state of
the background session managing the cluster. See the
SessionState class for more information on the available
statuses. The following statuses are available:
session_is_starting: Indicates whether the session is starting.
session_is_idle: Indicates whether the session is idle.
session_is_busy: Indicates whether the session is busy. A session
is busy when a task is running or when the output of a task has not
been fetched into the main R session. See the task_state field.
session_is_finished: Indicates whether the session was closed.
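The task_state flags described above can be used to poll a running task without blocking the main session. The following is a minimal sketch, assuming the parabar package is installed and that task_state is read as a named list of logical values as documented here. The 10-second deadline and the polling interval are arbitrary choices made for illustration.

```r
library(parabar)

# Configure and start an asynchronous backend with two cores.
specification <- Specification$new()
specification$set_cores(cores = 2)
specification$set_type(type = "psock")

backend <- AsyncBackend$new()
backend$start(specification)

# Deploy a short task; the call returns immediately.
backend$sapply(x = 1:4, fun = function(x) {
    Sys.sleep(0.25)
    x^2
})

# Poll the task state until the output is ready or the deadline passes.
deadline <- Sys.time() + 10
while (!backend$task_state$task_is_completed && Sys.time() < deadline) {
    Sys.sleep(0.1)
}

# Move the output from the background session to the main session.
output <- backend$get_output()

# Stop the backend.
backend$stop()
```

Polling like this leaves the main session free to do other work between checks, whereas get_output(wait = TRUE) blocks until the task finishes.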
new()
Create a new AsyncBackend object.
AsyncBackend$new()
An object of class AsyncBackend.
start()
Start the backend.
AsyncBackend$start(specification)
specification: An object of class Specification
that contains the backend configuration.
This method returns void. The resulting backend must be stored in the
.cluster private field on the Backend abstract class,
and accessible to any concrete backend implementations via the active
binding cluster.
stop()
Stop the backend.
AsyncBackend$stop()
This method returns void.
clear()
Remove all objects from the backend. This function is equivalent to
calling rm(list = ls(all.names = TRUE)) on each node in the
backend.
AsyncBackend$clear()
This method returns void.
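To make the equivalence stated for clear() concrete, here is a minimal sketch that uses only the base parallel package rather than parabar itself. It assumes a local two-node PSOCK cluster and shows the effect of running rm(list = ls(all.names = TRUE)) on each node; it is not the package's internal implementation.

```r
library(parallel)

# Start a small local cluster (assumption: two PSOCK workers are available).
cluster <- makeCluster(2, type = "PSOCK")

# Export a variable to the global environment of each node.
name <- "parabar"
clusterExport(cluster, varlist = "name", envir = environment())

# List the variables on each node before clearing.
before <- clusterEvalQ(cluster, ls(all.names = TRUE))

# Remove every object from the global environment of each node.
invisible(clusterEvalQ(cluster, rm(list = ls(all.names = TRUE))))

# List the variables on each node after clearing.
after <- clusterEvalQ(cluster, ls(all.names = TRUE))

stopCluster(cluster)
```

The before and after lists have one element per node, which mirrors the shape of the value that peek() is documented to return.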
peek()
Inspect the backend for variables available in the .GlobalEnv.
AsyncBackend$peek()
This method returns a list of character vectors, where each element
corresponds to a node in the backend. The character vectors contain
the names of the variables available in the .GlobalEnv on each
node.
export()
Export variables from a given environment to the backend.
AsyncBackend$export(variables, environment)
variables: A character vector of variable names to export.
environment: An environment object from which to export the variables.
This method returns void.
evaluate()
Evaluate an arbitrary expression on the backend.
AsyncBackend$evaluate(expression)
expression: An unquoted expression to evaluate on the backend.
This method returns the result of the expression evaluation.
sapply()
Run a task on the backend akin to parallel::parSapply().
AsyncBackend$sapply(x, fun, ...)
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
lapply()
Run a task on the backend akin to parallel::parLapply().
AsyncBackend$lapply(x, fun, ...)
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
apply()
Run a task on the backend akin to parallel::parApply().
AsyncBackend$apply(x, margin, fun, ...)
x: An array to pass to the fun function.
margin: A numeric vector indicating the dimensions of x the
fun function should be applied over. For example, for a matrix,
margin = 1 indicates applying fun row-wise, margin = 2
indicates applying fun column-wise, and margin = c(1, 2)
indicates applying fun element-wise. Named dimensions are also
possible depending on x. See parallel::parApply() and
base::apply() for more details.
fun: A function to apply to x according to the margin.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
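The meaning of the margin argument can be checked with base::apply(), which the apply() method is documented to follow. The sketch below uses a small matrix chosen for illustration and does not involve a backend.

```r
# A 2 x 3 matrix filled column by column: rows are (1, 3, 5) and (2, 4, 6).
x <- matrix(1:6, nrow = 2, ncol = 3)

# margin = 1 applies the function to each row.
by_row <- apply(x, 1, sum)
# → 9 12

# margin = 2 applies the function to each column.
by_column <- apply(x, 2, sum)
# → 3 7 11

# margin = c(1, 2) applies the function to each element and keeps the shape.
by_element <- apply(x, c(1, 2), function(value) value * 10)
```

With margin = c(1, 2) the result has the same dimensions as x, so by_element is a 2 x 3 matrix.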
get_output()
Get the output of the task execution.
AsyncBackend$get_output(wait = FALSE)
wait: A logical value indicating whether to wait for the task
to finish executing before fetching the results. Defaults to FALSE.
See the Details section for more information.
This method fetches the output of the task execution after calling
the sapply() method. It returns the output and immediately removes
it from the backend. Subsequent calls to this method will throw an
error if no additional tasks have been executed in the meantime. This
method should be called after the execution of a task.
If wait = TRUE, the method will block the main process until the
backend finishes executing the task and the results are available. If
wait = FALSE, the method will immediately attempt to fetch the
results from the background R session, and throw an error if the
task is still running.
A vector, matrix, or list of the same length as x, containing the
results of the fun. The output format differs based on the specific
operation employed. Check out the documentation for the apply
operations of parallel::parallel for more information.
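The difference in output format between operations can be previewed with the sequential base R counterparts. This is only a sketch using base::sapply() and base::lapply(); the function square is a hypothetical example task.

```r
# Hypothetical task used for illustration.
square <- function(x) x^2

# sapply-style operations simplify scalar results to a vector.
as_vector <- sapply(1:3, square)

# lapply-style operations always return a list with one element per input.
as_list <- lapply(1:3, square)

# When each call returns a vector of length two, sapply simplifies to a matrix
# with one column per input element.
as_matrix <- sapply(1:3, function(x) c(value = x, square = x^2))
```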
clone()
The objects of this class are cloneable with this method.
AsyncBackend$clone(deep = FALSE)
deep: Whether to make a deep clone.
BackendService, Backend, SyncBackend,
ProgressTrackingContext, and TaskState.
# Create a specification object.
specification <- Specification$new()

# Set the number of cores.
specification$set_cores(cores = 2)

# Set the cluster type.
specification$set_type(type = "psock")

# Create an asynchronous backend object.
backend <- AsyncBackend$new()

# Start the cluster on the backend.
backend$start(specification)

# Check if there is anything on the backend.
backend$peek()

# Create a dummy variable.
name <- "parabar"

# Export the variable to the backend.
backend$export("name")

# Remove variable from current environment.
rm(name)

# Run an expression on the backend, using the exported variable `name`.
backend$evaluate({
    # Print the name.
    print(paste0("Hello, ", name, "!"))
})

# Run a task in parallel (i.e., approx. 2.5 seconds).
backend$sapply(
    x = 1:10,
    fun = function(x) {
        # Sleep a bit.
        Sys.sleep(0.5)

        # Compute something.
        output <- x + 1

        # Return the result.
        return(output)
    }
)

# Right now the main process is free and the task is executing on a `psock`
# cluster started in a background `R` session.

# Trying to get the output immediately will throw an error, indicating that the
# task is still running.
try(backend$get_output())

# However, we can block the main process and wait for the task to complete
# before fetching the results.
backend$get_output(wait = TRUE)

# Clear the backend.
backend$clear()

# Check that there is nothing on the cluster.
backend$peek()

# Stop the backend.
backend$stop()

# Check that the backend is not active.
backend$active
This is an abstract class that serves as a base class for all concrete backend implementations. It defines the common properties that all concrete backends require.
This class cannot be instantiated. It needs to be extended by concrete
subclasses that implement the pure virtual methods. Instances of concrete
backend implementations can be conveniently obtained using the
BackendFactory class.
parabar::BackendService -> Backend
cluster: The cluster object used by the backend. For
SyncBackend objects, this is a cluster object created by
parallel::makeCluster(). For AsyncBackend objects,
this is a permanent R session created by callr::r_session that
contains the parallel::makeCluster() cluster object.
supports_progress: A boolean value indicating whether the backend implementation supports progress tracking.
active: A boolean value indicating whether the backend implementation has an active cluster.
parabar::BackendService$apply()
parabar::BackendService$clear()
parabar::BackendService$evaluate()
parabar::BackendService$export()
parabar::BackendService$get_output()
parabar::BackendService$lapply()
parabar::BackendService$peek()
parabar::BackendService$sapply()
parabar::BackendService$start()
parabar::BackendService$stop()
new()
Create a new Backend object.
Backend$new()
Instantiating this class will throw an error.
clone()
The objects of this class are cloneable with this method.
Backend$clone(deep = FALSE)
deep: Whether to make a deep clone.
BackendService, SyncBackend,
AsyncBackend, BackendFactory, and
Context.
This class is a factory that provides concrete implementations of the
Backend abstract class.
get()
Obtain a concrete implementation of the abstract Backend
class of the specified type.
BackendFactory$get(type)
type: A character string specifying the type of the
Backend to instantiate. Possible values are "sync" and
"async". See the Details section for more information.
When type = "sync" a SyncBackend instance is created
and returned. When type = "async" an AsyncBackend
instance is provided instead.
A concrete implementation of the class Backend. It
throws an error if the requested backend type is not supported.
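The dispatch-by-type behaviour described above can be sketched in base R. The constructors and class names below (make_sync_sketch, make_async_sketch, SyncSketch, AsyncSketch) are hypothetical stand-ins invented for this illustration; they are not part of parabar, and the real factory returns R6 objects.

```r
# Hypothetical stand-ins for the concrete backend classes.
make_sync_sketch <- function() {
    structure(list(), class = c("SyncSketch", "BackendSketch"))
}

make_async_sketch <- function() {
    structure(list(), class = c("AsyncSketch", "BackendSketch"))
}

# Hypothetical factory function: map a type string to a concrete object and
# fail loudly for anything else.
get_backend_sketch <- function(type) {
    switch(type,
        sync = make_sync_sketch(),
        async = make_async_sketch(),
        stop("Unsupported backend type: ", type, call. = FALSE)
    )
}

# Supported types yield objects that share the common base class.
sync_backend <- get_backend_sketch("sync")
async_backend <- get_backend_sketch("async")

# Unsupported types raise an error, which callers can catch.
result <- tryCatch(get_backend_sketch("other"), error = function(e) e)
```

Callers depend only on the shared base class, so new backend types can be added to the factory without changing the calling code.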
clone()
The objects of this class are cloneable with this method.
BackendFactory$clone(deep = FALSE)
deep: Whether to make a deep clone.
BackendService, Backend, SyncBackend,
AsyncBackend, and ContextFactory.
# Create a backend factory.
backend_factory <- BackendFactory$new()

# Get a synchronous backend instance.
backend <- backend_factory$get("sync")

# Check the class of the backend instance.
class(backend)

# Get an asynchronous backend instance.
backend <- backend_factory$get("async")

# Check the class of the backend instance.
class(backend)
This is an interface that defines the operations available on a
Backend implementation. Backend implementations and the
Context class must implement this interface.
new()
Create a new BackendService object.
BackendService$new()
Instantiating this class will throw an error.
start()
Start the backend.
BackendService$start(specification)
specification: An object of class Specification
that contains the backend configuration.
This method returns void. The resulting backend must be stored in the
.cluster private field on the Backend abstract class,
and accessible to any concrete backend implementations via the active
binding cluster.
stop()
Stop the backend.
BackendService$stop()
This method returns void.
clear()
Remove all objects from the backend. This function is equivalent to
calling rm(list = ls(all.names = TRUE)) on each node in the
backend.
BackendService$clear()
This method is run by default when the backend is started.
This method returns void.
peek()
Inspect the backend for variables available in the .GlobalEnv.
BackendService$peek()
This method returns a list of character vectors, where each element
corresponds to a node in the backend. The character vectors contain
the names of the variables available in the .GlobalEnv on each
node.
export()
Export variables from a given environment to the backend.
BackendService$export(variables, environment)
variables: A character vector of variable names to export.
environment: An environment object from which to export the variables.
This method returns void.
evaluate()
Evaluate an arbitrary expression on the backend.
BackendService$evaluate(expression)
expression: An unquoted expression to evaluate on the backend.
This method returns the result of the expression evaluation.
sapply()
Run a task on the backend akin to parallel::parSapply().
BackendService$sapply(x, fun, ...)
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
lapply()
Run a task on the backend akin to parallel::parLapply().
BackendService$lapply(x, fun, ...)
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
apply()
Run a task on the backend akin to parallel::parApply().
BackendService$apply(x, margin, fun, ...)
x: An array to pass to the fun function.
margin: A numeric vector indicating the dimensions of x the
fun function should be applied over. For example, for a matrix,
margin = 1 indicates applying fun row-wise, margin = 2
indicates applying fun column-wise, and margin = c(1, 2)
indicates applying fun element-wise. Named dimensions are also
possible depending on x. See parallel::parApply() and
base::apply() for more details.
fun: A function to apply to x according to the margin.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
get_output()
Get the output of the task execution.
BackendService$get_output(...)
...: Additional optional arguments that may be used by concrete implementations.
This method fetches the output of the task execution after calling
the sapply() method. It returns the output and immediately removes
it from the backend. Therefore, subsequent calls to this method are
not advised. This method should be called after the execution of a
task.
A vector, matrix, or list of the same length as x, containing the
results of the fun. The output format differs based on the specific
operation employed. Check out the documentation for the apply
operations of parallel::parallel for more information.
clone()
The objects of this class are cloneable with this method.
BackendService$clone(deep = FALSE)
deep: Whether to make a deep clone.
Backend, SyncBackend, AsyncBackend,
and Context.
This is an abstract class that defines the pure virtual methods a concrete bar must implement.
This class cannot be instantiated. It needs to be extended by concrete
subclasses that implement the pure virtual methods. Instances of concrete
bar implementations can be conveniently obtained using the
BarFactory class.
engine: The bar engine.
new()
Create a new Bar object.
Bar$new()
Instantiating this class will throw an error.
create()
Create a progress bar.
Bar$create(total, initial, ...)
total: The total number of times the progress bar should tick.
initial: The starting point of the progress bar.
...: Additional arguments for the bar creation. See the Details section for more information.
The optional ... named arguments depend on the specific concrete
implementation (i.e., BasicBar or
ModernBar).
This method returns void. The resulting bar is stored in the private
field .bar, accessible via the active binding engine.
update()
Update the progress bar.
Bar$update(current)
current: The position the progress bar should be at (e.g., 30 out of 100), usually the index in a loop.
terminate()
Terminate the progress bar.
Bar$terminate()
clone()
The objects of this class are cloneable with this method.
Bar$clone(deep = FALSE)
deep: Whether to make a deep clone.
BasicBar, ModernBar, and BarFactory.
This class is a factory that provides concrete implementations of the
Bar abstract class.
get()
Obtain a concrete implementation of the abstract Bar
class of the specified type.
BarFactory$get(type)
type: A character string specifying the type of the
Bar to instantiate. Possible values are "modern" and
"basic". See the Details section for more information.
When type = "modern" a ModernBar instance is created
and returned. When type = "basic" a BasicBar instance
is provided instead.
A concrete implementation of the class Bar. It throws an
error if the requested bar type is not supported.
clone()
The objects of this class are cloneable with this method.
BarFactory$clone(deep = FALSE)
deep: Whether to make a deep clone.
# Create a bar factory.
bar_factory <- BarFactory$new()

# Get a modern bar instance.
bar <- bar_factory$get("modern")

# Check the class of the bar instance.
class(bar)

# Get a basic bar instance.
bar <- bar_factory$get("basic")

# Check the class of the bar instance.
class(bar)
This is a concrete implementation of the abstract class Bar
using the utils::txtProgressBar() as engine for the progress bar.
parabar::Bar -> BasicBar
new()
Create a new BasicBar object.
BasicBar$new()
An object of class BasicBar.
create()
Create a progress bar.
BasicBar$create(total, initial, ...)
total: The total number of times the progress bar should tick.
initial: The starting point of the progress bar.
...: Additional arguments for the bar creation passed to
utils::txtProgressBar().
This method returns void. The resulting bar is stored in the private
field .bar, accessible via the active binding engine. Both the
private field and the active binding are defined in the super class
Bar.
update()
Update the progress bar by calling utils::setTxtProgressBar().
BasicBar$update(current)
current: The position the progress bar should be at (e.g., 30 out of 100), usually the index in a loop.
terminate()
Terminate the progress bar by calling base::close() on the
private field .bar.
BasicBar$terminate()
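The create, update, and terminate steps map onto three calls of the underlying engine. The sketch below uses the utils functions directly, without the BasicBar wrapper, and assumes that total corresponds to the max argument and initial to the initial argument of utils::txtProgressBar(); that mapping is an assumption rather than something confirmed by the text above.

```r
# Number of ticks to perform.
total <- 20

# Create: build the text progress bar (style 3 prints a percentage).
bar <- utils::txtProgressBar(min = 0, max = total, initial = 0, style = 3)

# Update: move the bar to the current position on each iteration.
for (i in seq_len(total)) {
    Sys.sleep(0.01)
    utils::setTxtProgressBar(bar, i)
}

# Read the final position before closing the bar.
final <- utils::getTxtProgressBar(bar)

# Terminate: close the connection used by the bar.
close(bar)
```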
clone()
The objects of this class are cloneable with this method.
BasicBar$clone(deep = FALSE)
deep: Whether to make a deep clone.
Bar, ModernBar, and BarFactory.
# Create a basic bar instance.
bar <- BasicBar$new()

# Specify the number of ticks to be performed.
total <- 100

# Create the progress bar.
bar$create(total = total, initial = 0)

# Use the progress bar.
for (i in 1:total) {
    # Sleep a bit.
    Sys.sleep(0.02)

    # Update the progress bar.
    bar$update(i)
}

# Terminate the progress bar.
bar$terminate()
This function can be used to clear a backend created
by start_backend().
clear(backend)
backend |
An object of class |
This function is a convenience wrapper around the lower-level API of
parabar aimed at developers. More specifically, this function
calls the clear method on the provided
backend instance.
The function returns void. It throws an error if the value provided for the
backend argument is not an instance of class Backend.
start_backend(), peek(), export(),
evaluate(), configure_bar(), par_sapply(),
par_lapply(), par_apply(), stop_backend(),
and BackendService.
# Create an asynchronous backend.
backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

# Check that the backend is active.
backend$active

# Check if there is anything on the backend.
peek(backend)

# Create a dummy variable.
name <- "parabar"

# Export the `name` variable in the current environment to the backend.
export(backend, "name", environment())

# Remove the dummy variable from the current environment.
rm(name)

# Check the backend to see that the variable has been exported.
peek(backend)

# Run an expression on the backend.
# Note that the symbols in the expression are resolved on the backend.
evaluate(backend, {
    # Print the name.
    print(paste0("Hello, ", name, "!"))
})

# Clear the backend.
clear(backend)

# Check that there is nothing on the backend.
peek(backend)

# Use a basic progress bar (i.e., see `parabar::Bar`).
configure_bar(type = "basic", style = 3)

# Run a task in parallel (i.e., approx. 1.25 seconds).
output <- par_sapply(backend, x = 1:10, fun = function(x) {
    # Sleep a bit.
    Sys.sleep(0.25)

    # Compute and return.
    return(x + 1)
})

# Print the output.
print(output)

# Stop the backend.
stop_backend(backend)

# Check that the backend is not active.
backend$active
This function can be used to conveniently configure the progress bar by
adjusting the progress_bar_config field of the
Options instance in the base::.Options list.
configure_bar(type = "modern", ...)
type |
A character string specifying the type of progress bar to be used
with compatible |
... |
A list of named arguments used to configure the progress bar. See the Details section for more information. |
The optional ... named arguments depend on the type of progress bar being
configured. When type = "modern", the ... take the named arguments of the
progress::progress_bar class. When type = "basic", the ... take the
named arguments of the utils::txtProgressBar() built-in function. See the
Examples section for a demonstration.
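One way to picture how named ... arguments can be stored and later handed to the bar engine is the base R sketch below. The option name sketch.progress_bar_config and the helper create_basic_bar_sketch are hypothetical and exist only for this illustration; the package keeps its configuration in its own Options instance, whose internals are not shown here.

```r
# Store a configuration list under a hypothetical option name.
options(sketch.progress_bar_config = list(
    basic = list(style = 3, width = 30)
))

# Hypothetical helper: read the stored configuration and splice it into the
# call to utils::txtProgressBar() as named arguments.
create_basic_bar_sketch <- function(total, initial = 0) {
    config <- getOption("sketch.progress_bar_config")$basic
    arguments <- c(list(min = 0, max = total, initial = initial), config)
    do.call(utils::txtProgressBar, arguments)
}

# Create a bar from the stored configuration and advance it to the end.
bar <- create_basic_bar_sketch(total = 5)
for (i in 1:5) {
    utils::setTxtProgressBar(bar, i)
}
value <- utils::getTxtProgressBar(bar)
close(bar)
```

Because the configuration is a plain named list, any argument accepted by utils::txtProgressBar() (for example style or width) can be stored and forwarded this way.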
The function returns void. It throws an error if the requested bar type is
not supported.
progress::progress_bar, utils::txtProgressBar(),
set_default_options(), get_option(),
set_option()
# Set the default package options.
set_default_options()

# Get the progress bar type from options.
get_option("progress_bar_type")

# Get the progress bar configuration from options.
get_option("progress_bar_config")

# Adjust the format of the `modern` progress bar.
configure_bar(type = "modern", format = "[:bar] :percent")

# Check that the configuration has been updated in the options.
get_option("progress_bar_config")

# Change to and adjust the style of the `basic` progress bar.
configure_bar(type = "basic", style = 3)

# Check that the configuration has been updated in the options.
get_option("progress_bar_type")
get_option("progress_bar_config")
This class represents the base context for interacting with
Backend implementations via the BackendService
interface.
This class is a vanilla wrapper around a Backend implementation.
It registers a backend instance and forwards all BackendService
method calls to the backend instance. Subclasses can override any of the
BackendService methods to decorate the backend instance with
additional functionality (e.g., see the ProgressTrackingContext
class for an example).
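The forwarding and decorating behaviour described above can be sketched with plain closures in base R. All names below (make_backend_sketch, make_context_sketch, make_counting_context_sketch) are hypothetical and stand in for the R6 classes; the sketch runs tasks sequentially and only illustrates the pattern.

```r
# Hypothetical backend: stores the output of the last task until it is fetched.
make_backend_sketch <- function() {
    output <- NULL
    list(
        sapply = function(x, fun, ...) {
            output <<- base::sapply(x, fun, ...)
            invisible(NULL)
        },
        get_output = function() {
            result <- output
            output <<- NULL
            result
        }
    )
}

# Hypothetical base context: forwards every call to the registered backend.
make_context_sketch <- function(backend) {
    list(
        sapply = function(x, fun, ...) backend$sapply(x, fun, ...),
        get_output = function() backend$get_output()
    )
}

# Hypothetical decorating context: overrides sapply to count deployed tasks,
# then forwards to the base context.
make_counting_context_sketch <- function(backend) {
    context <- make_context_sketch(backend)
    calls <- 0
    forward <- context$sapply
    context$sapply <- function(x, fun, ...) {
        calls <<- calls + 1
        forward(x, fun, ...)
    }
    context$calls <- function() calls
    context
}

# Use the decorated context exactly like the base context.
context <- make_counting_context_sketch(make_backend_sketch())
context$sapply(1:3, function(x, y) x + y, y = 10)
output <- context$get_output()
# → 11 12 13
```

The calling code does not change when the decorated context replaces the base one, which is what makes it possible to layer progress tracking on top of an existing backend.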
parabar::BackendService -> Context
backend: The Backend object registered with the
context.
new()
Create a new Context object.
Context$new()
An object of class Context.
set_backend()
Set the backend instance to be used by the context.
Context$set_backend(backend)
backend: An object of class Backend that
implements the BackendService interface.
start()
Start the backend.
Context$start(specification)
specification: An object of class Specification
that contains the backend configuration.
This method returns void. The resulting backend must be stored in the
.cluster private field on the Backend abstract class,
and accessible to any concrete backend implementations via the active
binding cluster.
stop()
Stop the backend.
Context$stop()
This method returns void.
clear()
Remove all objects from the backend. This function is equivalent to
calling rm(list = ls(all.names = TRUE)) on each node in the
backend.
Context$clear()
This method returns void.
peek()
Inspect the backend for variables available in the .GlobalEnv.
Context$peek()
This method returns a list of character vectors, where each element
corresponds to a node in the backend. The character vectors contain
the names of the variables available in the .GlobalEnv on each
node.
export()
Export variables from a given environment to the backend.
Context$export(variables, environment)
variables: A character vector of variable names to export.
environment: An environment object from which to export the variables. Defaults to the parent frame.
This method returns void.
evaluate()
Evaluate an arbitrary expression on the backend.
Context$evaluate(expression)
expression: An unquoted expression to evaluate on the backend.
This method returns the result of the expression evaluation.
sapply()
Run a task on the backend akin to parallel::parSapply().
Context$sapply(x, fun, ...)
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
lapply()
Run a task on the backend akin to parallel::parLapply().
Context$lapply(x, fun, ...)
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
apply()
Run a task on the backend akin to parallel::parApply().
Context$apply(x, margin, fun, ...)
x: An array to pass to the fun function.
margin: A numeric vector indicating the dimensions of x the
fun function should be applied over. For example, for a matrix,
margin = 1 indicates applying fun row-wise, margin = 2
indicates applying fun column-wise, and margin = c(1, 2)
indicates applying fun element-wise. Named dimensions are also
possible depending on x. See parallel::parApply() and
base::apply() for more details.
fun: A function to apply to x according to the margin.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
get_output()
Get the output of the task execution.
Context$get_output(...)
...: Additional arguments to pass to the backend registered
with the context. This is useful for backends that require additional
arguments to fetch the output (e.g., AsyncBackend$get_output(wait = TRUE)).
This method fetches the output of the task execution after calling
the sapply() method. It returns the output and immediately removes
it from the backend. Therefore, subsequent calls to this method are
not advised. This method should be called after the execution of a
task.
A vector, matrix, or list of the same length as x, containing the
results of the fun. The output format differs based on the specific
operation employed. Check out the documentation for the apply
operations of parallel::parallel for more information.
clone()
The objects of this class are cloneable with this method.
Context$clone(deep = FALSE)
deep: Whether to make a deep clone.
ProgressTrackingContext, BackendService,
Backend, and SyncBackend.
# Define a task to run in parallel.
task <- function(x, y) {
    # Sleep a bit.
    Sys.sleep(0.25)

    # Return the result of a computation.
    return(x + y)
}

# Create a specification object.
specification <- Specification$new()

# Set the number of cores.
specification$set_cores(cores = 2)

# Set the cluster type.
specification$set_type(type = "psock")

# Create a backend factory.
backend_factory <- BackendFactory$new()

# Get a synchronous backend instance.
backend <- backend_factory$get("sync")

# Create a base context object.
context <- Context$new()

# Register the backend with the context.
context$set_backend(backend)

# From now on, all backend operations are intercepted by the context.

# Start the backend.
context$start(specification)

# Run a task in parallel (i.e., approx. 1.25 seconds).
context$sapply(x = 1:10, fun = task, y = 10)

# Get the task output.
context$get_output()

# Close the backend.
context$stop()

# Get an asynchronous backend instance.
backend <- backend_factory$get("async")

# Register the backend with the same context object.
context$set_backend(backend)

# Start the backend reusing the specification object.
context$start(specification)

# Run a task in parallel (i.e., approx. 1.25 seconds).
context$sapply(x = 1:10, fun = task, y = 10)

# Get the task output.
backend$get_output(wait = TRUE)

# Close the backend.
context$stop()
This class is a factory that provides instances of the Context
class.
get()
Obtain instances of the Context class.
ContextFactory$get(type)
type: A character string specifying the type of the
Context to instantiate. Possible values are "regular"
and "progress". See the Details section for more information.
When type = "regular" a Context instance is created
and returned. When type = "progress" a
ProgressTrackingContext instance is provided instead.
An object of type Context. It throws an error if the
requested context type is not supported.
clone()
The objects of this class are cloneable with this method.
ContextFactory$clone(deep = FALSE)
deep: Whether to make a deep clone.
Context, ProgressTrackingContext,
BackendService, and Backend
    # Create a context factory.
    context_factory <- ContextFactory$new()

    # Get a regular context instance.
    context <- context_factory$get("regular")

    # Check the class of the context instance.
    class(context)

    # Get a progress context instance.
    context <- context_factory$get("progress")
    class(context)
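The error behavior described for get() can be probed with a short sketch. It assumes parabar is installed and that the error is an ordinary R condition that tryCatch() can intercept. The type string "unsupported" is a made-up value used only for illustration.

```r
library(parabar)

# Create a context factory.
context_factory <- ContextFactory$new()

# Request a context type that is not documented as supported.
# "unsupported" is a hypothetical value chosen for this sketch.
outcome <- tryCatch(
    context_factory$get("unsupported"),
    error = function(condition) conditionMessage(condition)
)

# Inspect the message carried by the error.
print(outcome)
```

Wrapping the call in tryCatch() keeps a script running when the requested type is computed at run time and may be invalid.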
This function can be used to evaluate an arbitrary base::expression() on a
backend created by start_backend().
evaluate(backend, expression)
| Argument | Description |
|---|---|
| backend | An object of class |
| expression | An unquoted expression to evaluate on the backend. |
This function is a convenience wrapper around the lower-level API of
parabar aimed at developers. More specifically, this function
calls the evaluate method on the provided
backend instance.
This method returns the result of the expression evaluation. It throws an
error if the value provided for the backend argument is not an instance of
class Backend.
start_backend(), peek(), export(),
clear(), configure_bar(), par_sapply(),
par_lapply(), par_apply(), stop_backend(),
and BackendService.
    # Create an asynchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

    # Check that the backend is active.
    backend$active

    # Check if there is anything on the backend.
    peek(backend)

    # Create a dummy variable.
    name <- "parabar"

    # Export the `name` variable in the current environment to the backend.
    export(backend, "name", environment())

    # Remove the dummy variable from the current environment.
    rm(name)

    # Check the backend to see that the variable has been exported.
    peek(backend)

    # Run an expression on the backend.
    # Note that the symbols in the expression are resolved on the backend.
    evaluate(backend, {
        # Print the name.
        print(paste0("Hello, ", name, "!"))
    })

    # Clear the backend.
    clear(backend)

    # Check that there is nothing on the backend.
    peek(backend)

    # Use a basic progress bar (i.e., see `parabar::Bar`).
    configure_bar(type = "basic", style = 3)

    # Run a task in parallel (i.e., approx. 1.25 seconds).
    output <- par_sapply(backend, x = 1:10, fun = function(x) {
        # Sleep a bit.
        Sys.sleep(0.25)

        # Compute and return.
        return(x + 1)
    })

    # Print the output.
    print(output)

    # Stop the backend.
    stop_backend(backend)

    # Check that the backend is not active.
    backend$active
This class contains static methods for throwing exceptions with informative messages.
Exception$abstract_class_not_instantiable(object): Exception for instantiating abstract classes or interfaces.
Exception$method_not_implemented(): Exception for calling methods without an implementation.
Exception$feature_not_developed(): Exception for running into things not yet developed.
Exception$not_enough_cores(): Exception for requesting more cores than available on the machine.
Exception$cluster_active(): Exception for attempting to start a cluster while another one is active.
Exception$cluster_not_active(): Exception for attempting to stop a cluster while not active.
Exception$async_task_not_started(): Exception for reading results while an asynchronous task has not yet started.
Exception$async_task_running(): Exception for reading results while an asynchronous task is running.
Exception$async_task_completed(): Exception for reading results while a completed asynchronous task has unread results.
Exception$async_task_error(error): Exception for errors while running an asynchronous task.
Exception$stop_busy_backend_not_allowed(): Exception for stopping a busy backend without intent.
Exception$temporary_file_creation_failed(): Exception for when a temporary file cannot be created.
Exception$type_not_assignable(actual, expected): Exception for when providing incorrect object types.
Exception$unknown_package_option(option): Exception for when requesting unknown package options.
Exception$primitive_as_task_not_allowed(): Exception for when decorating primitive functions with progress tracking.
Exception$array_margins_not_compatible(actual, allowed): Exception for using improper margins in the BackendService$apply operation.
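As a minimal sketch of how these static methods might be used, the snippet below calls two of them and captures the resulting messages. It assumes parabar is installed and that the methods signal ordinary R errors. The helper catch_message() and the argument values passed to type_not_assignable() are hypothetical, introduced only for this illustration.

```r
library(parabar)

# Hypothetical helper: evaluate an expression and return the error message,
# or `NULL` if no error was signaled.
catch_message <- function(expr) {
    tryCatch(
        {
            # Force the lazily evaluated argument inside `tryCatch`.
            expr
            NULL
        },
        error = function(condition) conditionMessage(condition)
    )
}

# Call a static method that takes no arguments.
message_cores <- catch_message(Exception$not_enough_cores())

# Call a static method that takes arguments (illustrative values).
message_type <- catch_message(
    Exception$type_not_assignable(actual = "character", expected = "numeric")
)

# Inspect the captured messages.
print(message_cores)
print(message_type)
```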
This function can be used to export objects to a
backend created by start_backend().
export(backend, variables, environment)
| Argument | Description |
|---|---|
| backend | An object of class |
| variables | A character vector of variable names to export to the backend. |
| environment | An environment from which to export the variables. If no environment is provided, the |
This function is a convenience wrapper around the lower-level API of
parabar aimed at developers. More specifically, this function
calls the export method on the provided
backend instance.
The function returns void. It throws an error if the value provided for the
backend argument is not an instance of class Backend.
start_backend(), peek(), evaluate(),
clear(), configure_bar(), par_sapply(),
par_lapply(), par_apply(), stop_backend(),
and BackendService.
    # Create an asynchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

    # Check that the backend is active.
    backend$active

    # Check if there is anything on the backend.
    peek(backend)

    # Create a dummy variable.
    name <- "parabar"

    # Export the `name` variable in the current environment to the backend.
    export(backend, "name", environment())

    # Remove the dummy variable from the current environment.
    rm(name)

    # Check the backend to see that the variable has been exported.
    peek(backend)

    # Run an expression on the backend.
    # Note that the symbols in the expression are resolved on the backend.
    evaluate(backend, {
        # Print the name.
        print(paste0("Hello, ", name, "!"))
    })

    # Clear the backend.
    clear(backend)

    # Check that there is nothing on the backend.
    peek(backend)

    # Use a basic progress bar (i.e., see `parabar::Bar`).
    configure_bar(type = "basic", style = 3)

    # Run a task in parallel (i.e., approx. 1.25 seconds).
    output <- par_sapply(backend, x = 1:10, fun = function(x) {
        # Sleep a bit.
        Sys.sleep(0.25)

        # Compute and return.
        return(x + 1)
    })

    # Print the output.
    print(output)

    # Stop the backend.
    stop_backend(backend)

    # Check that the backend is not active.
    backend$active
The get_option() function is a helper for retrieving the value of
parabar options. If the
option requested is not available in the session
base::.Options list, the corresponding default value set by the
Options R6::R6 class is returned instead.
The set_option() function is a helper for setting
parabar options. The function adjusts the
fields of the Options instance stored in the base::.Options
list. If no Options instance is present in the
base::.Options list, a new one is created.
The set_default_options() function is used to set the default
option values for the parabar package. The
function is automatically called at package load and the entry created can be
retrieved via getOption("parabar"). Specific package
options can be retrieved using the helper function
get_option().
get_option(option)
set_option(option, value)
set_default_options()
| Argument | Description |
|---|---|
| option | A character string representing the name of the option to retrieve or adjust. See the public fields of |
| value | The value to set the |
The get_option() function returns the value of the requested
option present in the base::.Options list, or its
corresponding default value (i.e., see Options). If the
requested option is not known, an error is thrown.
The set_option() function returns void. It throws an error if the
requested option to be adjusted is not known.
The set_default_options() function returns void. The
options set can be consulted via the base::.Options
list. See the Options R6::R6 class for more information on
the default values set by this function.
Options, set_default_options(), base::options(),
and base::getOption().
    # Get the status of progress tracking.
    get_option("progress_track")

    # Set the status of progress tracking to `FALSE`.
    set_option("progress_track", FALSE)

    # Get the status of progress tracking again.
    get_option("progress_track")

    # Restore default options.
    set_default_options()

    # Get the status of progress tracking yet again.
    get_option("progress_track")
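The documented error for unknown options can be observed with a small sketch. It assumes parabar is installed and that the error is an ordinary R condition. The option name "not_a_real_option" is made up for this illustration.

```r
library(parabar)

# Request an option that is not among the documented package options.
# "not_a_real_option" is a hypothetical name used only for this sketch.
outcome <- tryCatch(
    get_option("not_a_real_option"),
    error = function(condition) conditionMessage(condition)
)

# Inspect the message carried by the error.
print(outcome)
```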
This class contains static helper methods.
Helper$get_class_name(object): Helper for getting the class of a given object.
Helper$is_of_class(object, class): Check if an object is of a certain class.
Helper$get_option(option): Get package option, or corresponding default value.
Helper$set_option(option, value): Set package option.
Helper$check_object_type(object, expected_type): Check the type of a given object.
Helper$check_array_margins(margins, dimensions): Helper to check array margins for the BackendService$apply operation.
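A brief sketch of calling two of these helpers is given below. It assumes parabar is installed and that Helper is accessible after attaching the package. Using a ModernBar instance as the test object is a choice made for this illustration, and the exact return values are not asserted here.

```r
library(parabar)

# Create an object whose class can be inspected.
bar <- ModernBar$new()

# Ask the helper for the class name of the object.
class_name <- Helper$get_class_name(bar)

# Ask the helper whether the object is of class `ModernBar`.
is_modern <- Helper$is_of_class(bar, "ModernBar")

# Inspect the results.
print(class_name)
print(is_modern)
```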
The logo is generated by make_logo() and displayed on package
attach for interactive R sessions.
LOGO
An object of class character containing the ASCII logo.
print(LOGO)
This function is meant for generating or updating the logo. After running
this procedure we end up with what is stored in the LOGO
constant.
make_logo(
  template = "./inst/assets/logo/parabar-logo.txt",
  version = c(1, 0, 0)
)
| Argument | Description |
|---|---|
| template | A character string representing the path to the logo template. |
| version | A numerical vector of three positive integers representing the version of the package to append to the logo. |
The ASCII logo.
    ## Not run:
    # Generate the logo.
    logo <- make_logo()

    # Print the logo.
    cat(logo)
    ## End(Not run)
This is a concrete implementation of the abstract class Bar
using the progress::progress_bar as engine for the progress bar.
parabar::Bar -> ModernBar
new()
Create a new ModernBar object.
ModernBar$new()
An object of class ModernBar.
create()
Create a progress bar.
ModernBar$create(total, initial, ...)
total: The total number of times the progress bar should tick.
initial: The starting point of the progress bar.
...: Additional arguments for the bar creation passed to
progress::progress_bar$new().
This method returns void. The resulting bar is stored in the private
field .bar, accessible via the active binding engine. Both the
private field and the active binding are defined in the super class
Bar.
update()
Update the progress bar by calling
progress::progress_bar$update().
ModernBar$update(current)
current: The position the progress bar should be at (e.g., 30 out of 100), usually the index in a loop.
terminate()
Terminate the progress bar by calling
progress::progress_bar$terminate().
ModernBar$terminate()
clone()
The objects of this class are cloneable with this method.
ModernBar$clone(deep = FALSE)
deep: Whether to make a deep clone.
Bar, BasicBar, and BarFactory.
    # Create a modern bar instance.
    bar <- ModernBar$new()

    # Specify the number of ticks to be performed.
    total <- 100

    # Create the progress bar.
    bar$create(total = total, initial = 0)

    # Use the progress bar.
    for (i in 1:total) {
        # Sleep a bit.
        Sys.sleep(0.02)

        # Update the progress bar.
        bar$update(i)
    }

    # Terminate the progress bar.
    bar$terminate()
This class holds public fields that represent the package
options used to configure the default behavior of the
functionality parabar provides.
An instance of this class is automatically created and stored in the session
base::.Options at load time. This instance can be accessed and changed
via getOption("parabar"). Specific package
options can be retrieved using the helper function
get_option().
progress_track: A logical value indicating whether progress
tracking should be enabled (i.e., TRUE) or disabled (i.e.,
FALSE) globally for compatible backends. The default value is
TRUE.
progress_timeout: A numeric value indicating the timeout (i.e.,
in seconds) between subsequent checks of the log file for new
progress records. The default value is 0.001.
progress_wait: A numeric value indicating the approximate
duration (i.e., in seconds) to wait between progress bar updates
before checking if the task has finished (i.e., possibly with an
error). The default value is 0.1.
progress_bar_type: A character string indicating the default
bar type to use with compatible backends. Possible values are
"modern" (the default) or "basic".
progress_bar_config: A list of lists containing the default bar configuration for each supported bar engine. Elements of these lists represent arguments for the corresponding bar engines. Currently, the supported bar engines are:
modern: The progress::progress_bar engine, with the following
default configuration:
show_after = 0
format = "> completed :current out of :total tasks [:percent] [:elapsed]"
basic: The utils::txtProgressBar engine, with no default
configuration.
stop_forceful: A logical value indicating whether to allow
stopping an asynchronous backend forcefully (i.e., TRUE), or not
(i.e., FALSE). When stopping forcefully, the backend is terminated
without waiting for running tasks to finish or for the results to
be read into the main R session. The default value is FALSE.
progress_log_path: A character string indicating the path to
the log file where to track the execution progress of a running task.
The default value is a temporary file generated by
base::tempfile(). Calling this active binding repeatedly will
yield different temporary file paths. Fixing the path to a specific
value is possible by setting this active binding to a character
string representing the desired path. Setting this active binding to
NULL will reset it to the default value (i.e., yielding different
temporary file paths).
get_option(), set_option(), and
set_default_options().
    # Set the default package options (i.e., automatically set at load time).
    set_default_options()

    # First, get the options instance from the session options.
    parabar <- getOption("parabar")

    # Then, disable progress tracking.
    parabar$progress_track <- FALSE

    # Check that the change was applied (i.e., `progress_track: FALSE`).
    getOption("parabar")

    # To restore defaults, set the default options again.
    set_default_options()

    # Check that the change was applied (i.e., `progress_track: TRUE`).
    getOption("parabar")

    # We can also use the built-in helpers to get and set options more conveniently.

    # Get the progress tracking option.
    get_option("progress_track")

    # Set the progress tracking option to `FALSE`.
    set_option("progress_track", FALSE)

    # Check that the change was applied (i.e., `progress_track: FALSE`).
    get_option("progress_track")

    # Get a temporary file for logging the progress.
    get_option("progress_log_path")

    # Fix the logging file path.
    set_option("progress_log_path", "./progress.log")

    # Check that the logging path change was applied.
    get_option("progress_log_path")

    # Restore the logging path to the default behavior.
    set_option("progress_log_path", NULL)

    # Check that the logging path change was applied.
    get_option("progress_log_path")

    # Restore the defaults.
    set_default_options()
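The note that progress_log_path yields a different temporary path on each access can be checked with a short sketch. It assumes parabar is installed and that the option has not been fixed to a specific path in the current session. The variable names are chosen for this illustration.

```r
library(parabar)

# Read the log path twice without fixing it to a specific value.
first_path <- get_option("progress_log_path")
second_path <- get_option("progress_log_path")

# Compare the two paths. The documentation above suggests they differ
# while the option is at its default behavior.
print(first_path)
print(second_path)
print(identical(first_path, second_path))
```

If a stable path is needed across reads, for instance to inspect the log file manually, fixing it via set_option() as in the example above is the documented route.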
This function can be used to run a task in parallel. The task is executed in
parallel on the specified backend, similar to parallel::parApply(). If
backend = NULL, the task is executed sequentially using base::apply().
See the Details section for more information on how this function works.
par_apply(backend = NULL, x, margin, fun, ...)
| Argument | Description |
|---|---|
| backend | An object of class |
| x | An array to pass to the |
| margin | A numeric vector indicating the dimensions of |
| fun | A function to apply to |
| ... | Additional arguments to pass to the |
This function uses the UserApiConsumer class that acts like an
interface for the developer API of the parabar package.
The dimensions of the output vary according to the margin argument. Consult
the documentation of base::apply() for a detailed explanation on how the
output is structured.
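Since the output structure follows base::apply(), the way the dimensions change with margin can be illustrated with base R alone. The sketch below uses only base::apply() on a small matrix. The variable names are chosen for this illustration and no parabar backend is involved.

```r
# A small 2 x 3 matrix.
x <- matrix(1:6, nrow = 2, ncol = 3)

# One scalar per row: a vector of length `nrow(x)`.
row_sums <- apply(x, 1, sum)

# One scalar per column: a vector of length `ncol(x)`.
column_sums <- apply(x, 2, sum)

# One vector per row: the results are bound as columns, giving a 3 x 2 matrix.
row_doubles <- apply(x, 1, function(row) row * 2)

# Both margins: the function sees one cell at a time and the 2 x 3 shape is kept.
cell_doubles <- apply(x, c(1, 2), function(cell) cell * 2)

# Inspect the resulting shapes.
print(length(row_sums))
print(length(column_sums))
print(dim(row_doubles))
print(dim(cell_doubles))
```

Note that a vector-valued function applied over rows returns its results as columns, so the output may need to be transposed with t() to match the orientation of the input.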
start_backend(), peek(), export(),
evaluate(), clear(), configure_bar(),
par_sapply(), par_lapply(), stop_backend(),
set_option(), get_option(), Options,
UserApiConsumer, and BackendService.
    # Define a simple task.
    task <- function(x) {
        # Perform computations.
        Sys.sleep(0.01)

        # Return the result.
        mean(x)
    }

    # Define a matrix for the task.
    x <- matrix(rnorm(100^2, mean = 10, sd = 0.5), nrow = 100, ncol = 100)

    # Start an asynchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

    # Run a task in parallel over the rows of `x`.
    results <- par_apply(backend, x = x, margin = 1, fun = task)

    # Run a task in parallel over the columns of `x`.
    results <- par_apply(backend, x = x, margin = 2, fun = task)

    # The task can also be run over all elements of `x` using `margin = c(1, 2)`.
    # Improper dimensions will throw an error.
    try(par_apply(backend, x = x, margin = c(1, 2, 3), fun = task))

    # Disable progress tracking.
    set_option("progress_track", FALSE)

    # Run a task in parallel.
    results <- par_apply(backend, x = x, margin = 1, fun = task)

    # Enable progress tracking.
    set_option("progress_track", TRUE)

    # Change the progress bar options.
    configure_bar(type = "modern", format = "[:bar] :percent")

    # Run a task in parallel.
    results <- par_apply(backend, x = x, margin = 1, fun = task)

    # Stop the backend.
    stop_backend(backend)

    # Start a synchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "sync")

    # Run a task in parallel.
    results <- par_apply(backend, x = x, margin = 1, fun = task)

    # Disable progress tracking to remove the warning that progress is not supported.
    set_option("progress_track", FALSE)

    # Run a task in parallel.
    results <- par_apply(backend, x = x, margin = 1, fun = task)

    # Stop the backend.
    stop_backend(backend)

    # Run the task using `base::apply` (i.e., non-parallel).
    results <- par_apply(NULL, x = x, margin = 1, fun = task)
This function can be used to run a task in parallel. The task is executed in
parallel on the specified backend, similar to parallel::parLapply(). If
backend = NULL, the task is executed sequentially using base::lapply().
See the Details section for more information on how this function works.
par_lapply(backend = NULL, x, fun, ...)
| Argument | Description |
|---|---|
| backend | An object of class |
| x | An atomic vector or list to pass to the |
| fun | A function to apply to each element of |
| ... | Additional arguments to pass to the |
This function uses the UserApiConsumer class that acts like an
interface for the developer API of the parabar package.
A list of the same length as x containing the results of applying fun to
each element. The output format resembles that of base::lapply().
start_backend(), peek(), export(),
evaluate(), clear(), configure_bar(),
par_sapply(), par_apply(), stop_backend(),
set_option(), get_option(), Options,
UserApiConsumer, and BackendService.
    # Define a simple task.
    task <- function(x) {
        # Perform computations.
        Sys.sleep(0.01)

        # Return the result.
        return(x + 1)
    }

    # Start an asynchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

    # Run a task in parallel.
    results <- par_lapply(backend, x = 1:300, fun = task)

    # Disable progress tracking.
    set_option("progress_track", FALSE)

    # Run a task in parallel.
    results <- par_lapply(backend, x = 1:300, fun = task)

    # Enable progress tracking.
    set_option("progress_track", TRUE)

    # Change the progress bar options.
    configure_bar(type = "modern", format = "[:bar] :percent")

    # Run a task in parallel.
    results <- par_lapply(backend, x = 1:300, fun = task)

    # Stop the backend.
    stop_backend(backend)

    # Start a synchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "sync")

    # Run a task in parallel.
    results <- par_lapply(backend, x = 1:300, fun = task)

    # Disable progress tracking to remove the warning that progress is not supported.
    set_option("progress_track", FALSE)

    # Run a task in parallel.
    results <- par_lapply(backend, x = 1:300, fun = task)

    # Stop the backend.
    stop_backend(backend)

    # Run the task using `base::lapply` (i.e., non-parallel).
    results <- par_lapply(NULL, x = 1:300, fun = task)
This function can be used to run a task in parallel. The task is executed in
parallel on the specified backend, similar to parallel::parSapply(). If
backend = NULL, the task is executed sequentially using base::sapply().
See the Details section for more information on how this function works.
par_sapply(backend = NULL, x, fun, ...)
| Argument | Description |
|---|---|
| backend | An object of class |
| x | An atomic vector or list to pass to the |
| fun | A function to apply to each element of |
| ... | Additional arguments to pass to the |
This function uses the UserApiConsumer class that acts like an
interface for the developer API of the parabar package.
A vector of the same length as x containing the results of applying fun to
each element. The output format resembles that of base::sapply().
start_backend(), peek(), export(),
evaluate(), clear(), configure_bar(),
par_lapply(), par_apply(), stop_backend(),
set_option(), get_option(), Options,
UserApiConsumer, and BackendService.
    # Define a simple task.
    task <- function(x) {
        # Perform computations.
        Sys.sleep(0.01)

        # Return the result.
        return(x + 1)
    }

    # Start an asynchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

    # Run a task in parallel.
    results <- par_sapply(backend, x = 1:300, fun = task)

    # Disable progress tracking.
    set_option("progress_track", FALSE)

    # Run a task in parallel.
    results <- par_sapply(backend, x = 1:300, fun = task)

    # Enable progress tracking.
    set_option("progress_track", TRUE)

    # Change the progress bar options.
    configure_bar(type = "modern", format = "[:bar] :percent")

    # Run a task in parallel.
    results <- par_sapply(backend, x = 1:300, fun = task)

    # Stop the backend.
    stop_backend(backend)

    # Start a synchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "sync")

    # Run a task in parallel.
    results <- par_sapply(backend, x = 1:300, fun = task)

    # Disable progress tracking to remove the warning that progress is not supported.
    set_option("progress_track", FALSE)

    # Run a task in parallel.
    results <- par_sapply(backend, x = 1:300, fun = task)

    # Stop the backend.
    stop_backend(backend)

    # Run the task using `base::sapply` (i.e., non-parallel).
    results <- par_sapply(NULL, x = 1:300, fun = task)
This function can be used to check the names of the variables present on a
backend created by start_backend().
peek(backend)
backend |
An object of class |
This function is a convenience wrapper around the lower-level API of
parabar aimed at developers. More specifically, this function
calls the peek method on the provided
backend instance.
The function returns a list of character vectors, where each list element
corresponds to a node, and each element of the character vector is the name
of a variable present on that node. It throws an error if the value provided
for the backend argument is not an instance of class Backend.
start_backend(), export(), evaluate(),
clear(), configure_bar(), par_sapply(),
par_lapply(), par_apply(), stop_backend(),
and BackendService.
    # Create an asynchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

    # Check that the backend is active.
    backend$active

    # Check if there is anything on the backend.
    peek(backend)

    # Create a dummy variable.
    name <- "parabar"

    # Export the `name` variable in the current environment to the backend.
    export(backend, "name", environment())

    # Remove the dummy variable from the current environment.
    rm(name)

    # Check the backend to see that the variable has been exported.
    peek(backend)

    # Run an expression on the backend.
    # Note that the symbols in the expression are resolved on the backend.
    evaluate(backend, {
        # Print the name.
        print(paste0("Hello, ", name, "!"))
    })

    # Clear the backend.
    clear(backend)

    # Check that there is nothing on the backend.
    peek(backend)

    # Use a basic progress bar (i.e., see `parabar::Bar`).
    configure_bar(type = "basic", style = 3)

    # Run a task in parallel (i.e., approx. 1.25 seconds).
    output <- par_sapply(backend, x = 1:10, fun = function(x) {
        # Sleep a bit.
        Sys.sleep(0.25)

        # Compute and return.
        return(x + 1)
    })

    # Print the output.
    print(output)

    # Stop the backend.
    stop_backend(backend)

    # Check that the backend is not active.
    backend$active
This class represents a progress tracking context for interacting with
Backend implementations via the BackendService
interface.
This class extends the base Context class and overrides the
sapply parent method to decorate the backend instance
with additional functionality. Specifically, this class creates a temporary
file to log the progress of backend tasks, and then creates a progress bar to
display the progress of the backend tasks.
The progress bar is updated after each backend task execution. The timeout
between subsequent checks of the temporary log file is controlled by the
Options class and defaults to 0.001. This value can be
adjusted via the Options instance present in the session
base::.Options list (i.e., see set_option()). For example, to
set the timeout to 0.1 we can run set_option("progress_timeout", 0.1).
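As a brief illustration of the paragraph above, the sketch below reads the current timeout, changes it, and then restores it. It assumes that parabar is installed and relies only on set_option() and get_option(), both listed in this manual; the variable names previous_timeout and current_timeout are illustrative.

```r
library(parabar)

# Read the current timeout between checks of the progress log file.
previous_timeout <- get_option("progress_timeout")

# Check the log file less often (i.e., 0.1 instead of the default 0.001).
set_option("progress_timeout", 0.1)

# Confirm that the new value is registered in the session options.
current_timeout <- get_option("progress_timeout")
print(current_timeout)

# Restore the previous value.
set_option("progress_timeout", previous_timeout)
```

A larger timeout means fewer reads of the log file, at the cost of a progress bar that refreshes less often.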
This class is a good example of how to extend the base Context
class to decorate the backend instance with additional functionality.
parabar::BackendService -> parabar::Context -> ProgressTrackingContext
bar: The Bar instance registered with the context.
set_backend()
Set the backend instance to be used by the context.
ProgressTrackingContext$set_backend(backend)
backend: An object of class Backend that supports
progress tracking and implements the BackendService
interface.
This method overrides the parent method to validate the backend
provided and guarantee it is an instance of the
AsyncBackend class.
set_bar()
Set the Bar instance to be used by the context.
ProgressTrackingContext$set_bar(bar)
bar: An object of class Bar.
configure_bar()
Configure the Bar instance registered with the context.
ProgressTrackingContext$configure_bar(...)
sapply()
Run a task on the backend akin to parallel::parSapply(), but with
a progress bar.
ProgressTrackingContext$sapply(x, fun, ...)
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
lapply()
Run a task on the backend akin to parallel::parLapply(), but with
a progress bar.
ProgressTrackingContext$lapply(x, fun, ...)
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
apply()
Run a task on the backend akin to parallel::parApply().
ProgressTrackingContext$apply(x, margin, fun, ...)
x: An array to pass to the fun function.
margin: A numeric vector indicating the dimensions of x the
fun function should be applied over. For example, for a matrix,
margin = 1 indicates applying fun row-wise, margin = 2
indicates applying fun column-wise, and margin = c(1, 2)
indicates applying fun element-wise. Named dimensions are also
possible depending on x. See parallel::parApply() and
base::apply() for more details.
fun: A function to apply to x according to the margin.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
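The margin argument follows the semantics of base::apply(). The sketch below uses base::apply() on a small matrix instead of a parabar backend, purely to show what each margin value selects; the matrix m and the result names are illustrative assumptions.

```r
# A small 2 x 3 matrix to illustrate the `margin` argument.
m <- matrix(1:6, nrow = 2, ncol = 3)

# `margin = 1` applies the function to each row.
row_sums <- apply(m, 1, sum)

# `margin = 2` applies the function to each column.
column_sums <- apply(m, 2, sum)

# `margin = c(1, 2)` applies the function to each element.
doubled <- apply(m, c(1, 2), function(value) value * 2)

# Inspect the results.
print(row_sums)
print(column_sums)
print(doubled)
```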
clone()
The objects of this class are cloneable with this method.
ProgressTrackingContext$clone(deep = FALSE)
deep: Whether to make a deep clone.
Context, BackendService, Backend, and
AsyncBackend.
    # Define a task to run in parallel.
    task <- function(x, y) {
        # Sleep a bit.
        Sys.sleep(0.15)

        # Return the result of a computation.
        return(x + y)
    }

    # Create a specification object.
    specification <- Specification$new()

    # Set the number of cores.
    specification$set_cores(cores = 2)

    # Set the cluster type.
    specification$set_type(type = "psock")

    # Create a backend factory.
    backend_factory <- BackendFactory$new()

    # Get a backend instance that does not support progress tracking.
    backend <- backend_factory$get("sync")

    # Create a progress tracking context object.
    context <- ProgressTrackingContext$new()

    # Attempt to set the incompatible backend instance.
    try(context$set_backend(backend))

    # Get a backend instance that does support progress tracking.
    backend <- backend_factory$get("async")

    # Register the backend with the context.
    context$set_backend(backend)

    # From now on, all backend operations are intercepted by the context.

    # Start the backend.
    context$start(specification)

    # Create a bar factory.
    bar_factory <- BarFactory$new()

    # Get a modern bar instance.
    bar <- bar_factory$get("modern")

    # Register the bar with the context.
    context$set_bar(bar)

    # Configure the bar.
    context$configure_bar(
        show_after = 0,
        format = " > completed :current out of :total tasks [:percent] [:elapsed]"
    )

    # Run a task in parallel (i.e., approx. 1.9 seconds).
    context$sapply(x = 1:25, fun = task, y = 10)

    # Get the task output.
    backend$get_output(wait = TRUE)

    # Change the bar type.
    bar <- bar_factory$get("basic")

    # Register the bar with the context.
    context$set_bar(bar)

    # Remove the previous bar configuration.
    context$configure_bar()

    # Run a task in parallel (i.e., approx. 1.9 seconds).
    context$sapply(x = 1:25, fun = task, y = 10)

    # Get the task output.
    backend$get_output(wait = TRUE)

    # Close the backend.
    context$stop()
This class holds the state of a background session
used by an asynchronous backend (i.e., AsyncBackend). See the
Details section for more information.
The session state is useful to check if an asynchronous backend is ready for certain operations. A session can only be in one of the following four states at a time:
session_is_starting: When TRUE, it indicates that the session is
starting.
session_is_idle: When TRUE, it indicates that the session is idle and
ready to execute operations.
session_is_busy: When TRUE, it indicates that the session is busy
(i.e., see the TaskState class for more information about a
task's state).
session_is_finished: When TRUE, it indicates that the session is closed
and no longer available for operations.
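The four flags can be read as a one-hot encoding of the state reported by the background session. The sketch below is a hypothetical helper and not the implementation used by parabar; it assumes that the get_state method of callr::r_session reports one of "starting", "idle", "busy", or "finished". The name session_flags() is illustrative.

```r
# Hypothetical helper that maps a session state string to the four flags.
session_flags <- function(state) {
    # The states assumed to be reported by `callr::r_session`.
    known_states <- c("starting", "idle", "busy", "finished")

    # Fail early on anything unexpected.
    stopifnot(length(state) == 1, state %in% known_states)

    # Exactly one flag is `TRUE` at any time.
    list(
        session_is_starting = state == "starting",
        session_is_idle = state == "idle",
        session_is_busy = state == "busy",
        session_is_finished = state == "finished"
    )
}

# Inspect the flags for a busy session.
flags <- session_flags("busy")
str(flags)
```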
session_is_starting: A logical value indicating whether the session is starting.
session_is_idle: A logical value indicating whether the session is idle and ready to execute operations.
session_is_busy: A logical value indicating whether the session is busy.
session_is_finished: A logical value indicating whether the session is closed and no longer available for operations.
new()
Create a new SessionState object and determine the state
of a given background session.
SessionState$new(session)
session: A callr::r_session object.
An object of class SessionState.
clone()
The objects of this class are cloneable with this method.
SessionState$clone(deep = FALSE)
deep: Whether to make a deep clone.
TaskState, AsyncBackend and
ProgressTrackingContext.
    # Handy function to print the session states all at once.
    check_state <- function(session) {
        # Create a session object and determine its state.
        session_state <- SessionState$new(session)

        # Print the state.
        cat(
            "Session is starting: ", session_state$session_is_starting, "\n",
            "Session is idle: ", session_state$session_is_idle, "\n",
            "Session is busy: ", session_state$session_is_busy, "\n",
            "Session is finished: ", session_state$session_is_finished, "\n",
            sep = ""
        )
    }

    # Create a specification object.
    specification <- Specification$new()

    # Set the number of cores.
    specification$set_cores(cores = 2)

    # Set the cluster type.
    specification$set_type(type = "psock")

    # Create an asynchronous backend object.
    backend <- AsyncBackend$new()

    # Start the cluster on the backend.
    backend$start(specification)

    # Check that the session is idle.
    check_state(backend$cluster)

    {
        # Run a task in parallel (i.e., approx. 0.25 seconds).
        backend$sapply(
            x = 1:10,
            fun = function(x) {
                # Sleep a bit.
                Sys.sleep(0.05)

                # Compute something.
                output <- x + 1

                # Return the result.
                return(output)
            }
        )

        # And immediately check that the session is busy.
        check_state(backend$cluster)
    }

    # Get the output and wait for the task to complete.
    output <- backend$get_output(wait = TRUE)

    # Check that the session is idle again.
    check_state(backend$cluster)

    # Manually close the session.
    backend$cluster$close()

    # Check that the session is finished.
    check_state(backend$cluster)

    # Stop the backend.
    backend$stop()
This class contains the information required to start a backend. An instance
of this class is used by the start method of the
BackendService interface.
cores: The number of nodes to use in the cluster creation.
type: The type of cluster to create.
types: The supported cluster types.
set_cores()
Set the number of nodes to use in the cluster.
Specification$set_cores(cores)
cores: The number of nodes to use in the cluster.
This method also performs a validation of the requested number of
cores, ensuring that the value lies between 1 and
parallel::detectCores() - 1.
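The validation described above amounts to clamping the requested value to an allowed range. The following is a minimal sketch, assuming a hypothetical helper clamp_cores() that receives the number of available cores as an argument; the class itself may additionally issue warnings or treat edge cases differently.

```r
# Hypothetical helper mirroring the range check described above.
clamp_cores <- function(cores, available = parallel::detectCores()) {
    # `parallel::detectCores()` may return `NA` on some platforms.
    if (is.na(available)) {
        stop("The number of available cores could not be determined.")
    }

    # The largest number of cores allowed (i.e., leave one core free).
    upper <- max(1, available - 1)

    # Keep the requested value within the allowed range.
    min(max(1, cores), upper)
}

# Requests on a machine assumed to have eight cores.
print(clamp_cores(4, available = 8))
print(clamp_cores(100, available = 8))
print(clamp_cores(0, available = 8))
```

Passing available explicitly keeps the sketch deterministic, regardless of the machine it runs on.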
set_type()
Set the type of cluster to create.
Specification$set_type(type)
type: The type of cluster to create. Possible values are
"fork" and "psock". Defaults to "psock".
If no type is explicitly requested (i.e., type = NULL), the type is
determined based on the operating system. On Unix-like systems, the
type is set to "fork", while on Windows systems, the type is set to
"psock". If an unknown type is requested, a warning is issued and
the type is set to "psock".
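The resolution rules above can be sketched as a small function. This is an illustration under stated assumptions and not the code used by the class: the helper name resolve_type(), the os_type argument, and the use of .Platform$OS.type to detect the operating system are all hypothetical choices.

```r
# Hypothetical helper mirroring the type resolution described above.
resolve_type <- function(type = NULL, os_type = .Platform$OS.type) {
    # The supported cluster types.
    supported <- c("fork", "psock")

    # Without an explicit request, decide based on the operating system.
    if (is.null(type)) {
        return(if (os_type == "unix") "fork" else "psock")
    }

    # Fall back to "psock" with a warning for unknown types.
    if (!type %in% supported) {
        warning("Unknown cluster type '", type, "'. Using 'psock' instead.")
        return("psock")
    }

    # Otherwise, keep the requested type.
    type
}

# Resolve the type for a few requests.
print(resolve_type(NULL, os_type = "unix"))
print(resolve_type(NULL, os_type = "windows"))
print(suppressWarnings(resolve_type("invalid")))
```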
clone()
The objects of this class are cloneable with this method.
Specification$clone(deep = FALSE)
deep: Whether to make a deep clone.
BackendService, Backend, SyncBackend,
and AsyncBackend.
    # Create a specification object.
    specification <- Specification$new()

    # Set the number of cores.
    specification$set_cores(cores = 4)

    # Set the cluster type.
    specification$set_type(type = "psock")

    # Get the number of cores.
    specification$cores

    # Get the cluster type.
    specification$type

    # Attempt to set too many cores.
    specification$set_cores(cores = 100)

    # Check that the cores were reasonably set.
    specification$cores

    # Allow the object to determine the adequate cluster type.
    specification$set_type(type = NULL)

    # Check the type determined.
    specification$type

    # Attempt to set an invalid cluster type.
    specification$set_type(type = "invalid")

    # Check that the type was set to `psock`.
    specification$type
This function can be used to start a backend. Check out the Details section for more information.
start_backend(cores, cluster_type = "psock", backend_type = "async")
cores |
A positive integer representing the number of cores to use
(i.e., the number of processes to start). This value must be between |
cluster_type |
A character string representing the type of cluster to
create. Possible values are |
backend_type |
A character string representing the type of backend to
create. Possible values are |
This function is a convenience wrapper around the lower-level API of
parabar aimed at developers. More specifically, this function
uses the Specification class to create a specification object,
and the BackendFactory class to create a Backend
instance based on the specification object.
A Backend instance that can be used to parallelize computations.
The methods available on the Backend instance are defined by the
BackendService interface.
The cluster type determines the type of cluster to create. The requested
value is validated and passed to the type argument of the
parallel::makeCluster() function. The following table lists the possible
values and their corresponding description.
| Cluster | Description |
|---|---|
| "fork" | For Unix-based systems. |
| "psock" | For Windows-based systems. |
The backend type determines the type of backend to create. The requested
value is passed to the BackendFactory class, which returns a
Backend instance of the desired type. The following table lists
the possible backend types and their corresponding description.
| Backend | Description | Implementation | Progress |
|---|---|---|---|
| "sync" | A synchronous backend. | SyncBackend | no |
| "async" | An asynchronous backend. | AsyncBackend | yes |
In a nutshell, the difference between the two backend types is that for the
synchronous backend the cluster is created in the main process, while for the
asynchronous backend the cluster is created in a background R process using
callr::r_session. Therefore, the synchronous backend blocks the main
process during task execution, while the asynchronous backend is
non-blocking. Check out the implementations listed in the table above for
more information. All concrete implementations extend the
Backend abstract class and implement the
BackendService interface.
peek(), export(), evaluate(),
clear(), configure_bar(), par_sapply(),
par_lapply(), par_apply(), stop_backend(),
and BackendService.
    # Create an asynchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

    # Check that the backend is active.
    backend$active

    # Check if there is anything on the backend.
    peek(backend)

    # Create a dummy variable.
    name <- "parabar"

    # Export the `name` variable in the current environment to the backend.
    export(backend, "name", environment())

    # Remove the dummy variable from the current environment.
    rm(name)

    # Check the backend to see that the variable has been exported.
    peek(backend)

    # Run an expression on the backend.
    # Note that the symbols in the expression are resolved on the backend.
    evaluate(backend, {
        # Print the name.
        print(paste0("Hello, ", name, "!"))
    })

    # Clear the backend.
    clear(backend)

    # Check that there is nothing on the backend.
    peek(backend)

    # Use a basic progress bar (i.e., see `parabar::Bar`).
    configure_bar(type = "basic", style = 3)

    # Run a task in parallel (i.e., approx. 1.25 seconds).
    output <- par_sapply(backend, x = 1:10, fun = function(x) {
        # Sleep a bit.
        Sys.sleep(0.25)

        # Compute and return.
        return(x + 1)
    })

    # Print the output.
    print(output)

    # Stop the backend.
    stop_backend(backend)

    # Check that the backend is not active.
    backend$active
This function can be used to stop a backend created
by start_backend().
stop_backend(backend)
backend |
An object of class |
This function is a convenience wrapper around the lower-level API of
parabar aimed at developers. More specifically, this function
calls the stop method on the provided
backend instance.
The function returns void. It throws an error if:
the value provided for the backend argument is not an instance of class
Backend.
the backend object provided is already stopped
(i.e., is not active).
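The first error condition can be observed without starting a cluster. The sketch below assumes that parabar is installed and relies on the behavior described above; it captures the error instead of asserting on its message, since the exact wording is not documented here. The second condition (i.e., stopping an inactive backend) can be captured with tryCatch() in the same way.

```r
library(parabar)

# Passing something that is not a `Backend` instance should raise an error.
result <- tryCatch(
    stop_backend("not a backend"),
    error = function(e) e
)

# Check that an error condition was captured.
print(inherits(result, "error"))
```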
start_backend(), peek(), export(),
evaluate(), clear(), configure_bar(),
par_sapply(), par_apply(), par_lapply(), and
BackendService.
    # Create an asynchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

    # Check that the backend is active.
    backend$active

    # Check if there is anything on the backend.
    peek(backend)

    # Create a dummy variable.
    name <- "parabar"

    # Export the `name` variable in the current environment to the backend.
    export(backend, "name", environment())

    # Remove the dummy variable from the current environment.
    rm(name)

    # Check the backend to see that the variable has been exported.
    peek(backend)

    # Run an expression on the backend.
    # Note that the symbols in the expression are resolved on the backend.
    evaluate(backend, {
        # Print the name.
        print(paste0("Hello, ", name, "!"))
    })

    # Clear the backend.
    clear(backend)

    # Check that there is nothing on the backend.
    peek(backend)

    # Use a basic progress bar (i.e., see `parabar::Bar`).
    configure_bar(type = "basic", style = 3)

    # Run a task in parallel (i.e., approx. 1.25 seconds).
    output <- par_sapply(backend, x = 1:10, fun = function(x) {
        # Sleep a bit.
        Sys.sleep(0.25)

        # Compute and return.
        return(x + 1)
    })

    # Print the output.
    print(output)

    # Stop the backend.
    stop_backend(backend)

    # Check that the backend is not active.
    backend$active
This is a concrete implementation of the abstract class Backend
that implements the BackendService interface. This backend
executes tasks in parallel on a parallel::makeCluster() cluster
synchronously (i.e., blocking the main R session).
parabar::BackendService -> parabar::Backend -> SyncBackend
new()
Create a new SyncBackend object.
SyncBackend$new()
An object of class SyncBackend.
start()
Start the backend.
SyncBackend$start(specification)
specification: An object of class Specification
that contains the backend configuration.
This method returns void. The resulting backend must be stored in the
.cluster private field on the Backend abstract class,
and accessible to any concrete backend implementations via the active
binding cluster.
stop()
Stop the backend.
SyncBackend$stop()
This method returns void.
clear()
Remove all objects from the backend. This function is equivalent to
calling rm(list = ls(all.names = TRUE)) on each node in the
backend.
SyncBackend$clear()
This method returns void.
peek()
Inspect the backend for variables available in the .GlobalEnv.
SyncBackend$peek()
This method returns a list of character vectors, where each element
corresponds to a node in the backend. The character vectors contain
the names of the variables available in the .GlobalEnv on each
node.
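For intuition, the clear() and peek() operations can be approximated with the parallel package directly. The sketch below is not the implementation used by parabar; it assumes a local two-node PSOCK cluster and uses parallel::clusterExport() and parallel::clusterEvalQ() to export, list, and remove variables in the .GlobalEnv of each node.

```r
# Create a local cluster with two nodes.
cluster <- parallel::makeCluster(2, type = "PSOCK")

# Create a variable and export it to the global environment of each node.
name <- "parabar"
parallel::clusterExport(cluster, "name", envir = environment())

# Akin to `peek()`: list the variables in the `.GlobalEnv` of each node.
after_export <- parallel::clusterEvalQ(cluster, ls(all.names = TRUE))

# Akin to `clear()`: remove all variables from each node.
invisible(parallel::clusterEvalQ(cluster, rm(list = ls(all.names = TRUE))))

# List the variables again to confirm that the nodes are empty.
after_clear <- parallel::clusterEvalQ(cluster, ls(all.names = TRUE))

# Stop the cluster.
parallel::stopCluster(cluster)

# Inspect the results, one list element per node.
print(after_export)
print(after_clear)
```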
export()
Export variables from a given environment to the backend.
SyncBackend$export(variables, environment)
variables: A character vector of variable names to export.
environment: An environment object from which to export the variables. Defaults to the parent frame.
This method returns void.
evaluate()
Evaluate an arbitrary expression on the backend.
SyncBackend$evaluate(expression)
expression: An unquoted expression to evaluate on the backend.
This method returns the result of the expression evaluation.
sapply()
Run a task on the backend akin to parallel::parSapply().
SyncBackend$sapply(x, fun, ...)
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
lapply()
Run a task on the backend akin to parallel::parLapply().
SyncBackend$lapply(x, fun, ...)
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
apply()
Run a task on the backend akin to parallel::parApply().
SyncBackend$apply(x, margin, fun, ...)
x: An array to pass to the fun function.
margin: A numeric vector indicating the dimensions of x the
fun function should be applied over. For example, for a matrix,
margin = 1 indicates applying fun row-wise, margin = 2
indicates applying fun column-wise, and margin = c(1, 2)
indicates applying fun element-wise. Named dimensions are also
possible depending on x. See parallel::parApply() and
base::apply() for more details.
fun: A function to apply to x according to the margin.
...: Additional arguments to pass to the fun function.
This method returns void. The output of the task execution must be
stored in the private field .output on the Backend
abstract class, and is accessible via the get_output() method.
get_output()
Get the output of the task execution.
SyncBackend$get_output(...)
...: Additional arguments currently not in use.
This method fetches the output of the task execution after calling
the sapply() method. It returns the output and immediately removes
it from the backend. Therefore, subsequent calls to this method will
return NULL. This method should be called after the execution of a
task.
A vector, matrix, or list of the same length as x, containing the
results of fun. The output format differs based on the specific
operation employed. Check out the documentation for the apply
operations of parallel::parallel for more information.
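The fetch-once behavior of get_output() can be seen by calling the method twice in a row. The sketch below assumes that parabar is installed and follows the description above; the variable names first and second are illustrative.

```r
library(parabar)

# Create a specification for a small local cluster.
specification <- Specification$new()
specification$set_cores(cores = 2)
specification$set_type(type = "psock")

# Start a synchronous backend.
backend <- SyncBackend$new()
backend$start(specification)

# Run a task (i.e., this call blocks until the task is done).
backend$sapply(x = 1:4, fun = function(x) x + 1)

# The first call returns the output and removes it from the backend.
first <- backend$get_output()

# The second call finds nothing left to return.
second <- backend$get_output()

# Stop the backend.
backend$stop()

# Inspect the results.
print(first)
print(is.null(second))
```

Storing the output in a variable on the first call is therefore necessary if the results are needed more than once.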
clone()
The objects of this class are cloneable with this method.
SyncBackend$clone(deep = FALSE)
deep: Whether to make a deep clone.
BackendService, Backend, AsyncBackend,
and Context.
    # Create a specification object.
    specification <- Specification$new()

    # Set the number of cores.
    specification$set_cores(cores = 2)

    # Set the cluster type.
    specification$set_type(type = "psock")

    # Create a synchronous backend object.
    backend <- SyncBackend$new()

    # Start the cluster on the backend.
    backend$start(specification)

    # Check if there is anything on the backend.
    backend$peek()

    # Create a dummy variable.
    name <- "parabar"

    # Export the variable from the current environment to the backend.
    backend$export("name", environment())

    # Remove variable from current environment.
    rm(name)

    # Run an expression on the backend, using the exported variable `name`.
    backend$evaluate({
        # Print the name.
        print(paste0("Hello, ", name, "!"))
    })

    # Run a task in parallel (i.e., approx. 1.25 seconds).
    backend$sapply(
        x = 1:10,
        fun = function(x) {
            # Sleep a bit.
            Sys.sleep(0.25)

            # Compute something.
            output <- x + 1

            # Return the result.
            return(output)
        }
    )

    # Get the task output.
    backend$get_output()

    # Clear the backend.
    backend$clear()

    # Check that there is nothing on the cluster.
    backend$peek()

    # Stop the backend.
    backend$stop()

    # Check that the backend is not active.
    backend$active
This class holds the state of a task deployed to an asynchronous backend
(i.e., AsyncBackend). See the Details section for more
information.
The task state is useful to check if an asynchronous backend is free to execute other operations. A task can only be in one of the following three states at a time:
task_not_started: When TRUE, it indicates that no task has been started and
the backend is free to execute another operation.
task_is_running: When TRUE, it indicates that there is a task running
on the backend.
task_is_completed: When TRUE, it indicates that the task has been
completed, but the backend is still busy because the task output has not
been retrieved.
The task state is determined from the state of the background
session (see the get_state method of callr::r_session) and the
state of the task execution, inferred by polling the process
(see the poll_process method of callr::r_session), as follows:
| Session State | Execution State | Not Started | Is Running | Is Completed |
|---|---|---|---|---|
| idle | timeout | TRUE | FALSE | FALSE |
| busy | timeout | FALSE | TRUE | FALSE |
| busy | ready | FALSE | FALSE | TRUE |
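The mapping in the table above can be sketched as a small base R function. This is only an illustration under stated assumptions: `infer_task_state()` is a hypothetical helper that is not part of parabar, and it takes the two states as plain character strings instead of querying a callr::r_session object.

```r
# Hypothetical helper (not part of parabar): map the session state and the
# polled execution state to the three task state flags from the table.
infer_task_state <- function(session_state, execution_state) {
    list(
        # Idle session and nothing to read: the backend is free.
        task_not_started = session_state == "idle" && execution_state == "timeout",

        # Busy session and nothing to read yet: a task is still running.
        task_is_running = session_state == "busy" && execution_state == "timeout",

        # Busy session with output ready: the task finished, output not fetched.
        task_is_completed = session_state == "busy" && execution_state == "ready"
    )
}

# Inspect the flags for a session that is busy with output ready to be read.
state <- infer_task_state("busy", "ready")
str(state)
```

For the three combinations listed in the table, exactly one flag is TRUE, which matches the statement that a task can only be in one state at a time.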
task_not_started: A logical value that is TRUE when no task has been started. It is used to determine if the backend is free to execute another operation.
task_is_running: A logical value indicating whether the task is running.
task_is_completed: A logical value indicating whether the task has been completed and the output needs to be retrieved.
new()
Create a new TaskState object and determine the state of
a task on a given background session.
TaskState$new(session)
session: A callr::r_session object.
An object of class TaskState.
clone()
The objects of this class are cloneable with this method.
TaskState$clone(deep = FALSE)
deep: Whether to make a deep clone.
SessionState, AsyncBackend and
ProgressTrackingContext.

    # Handy function to print the task states all at once.
    check_state <- function(session) {
        # Create a task state object and determine the state.
        task_state <- TaskState$new(session)

        # Print the state.
        cat(
            "Task not started: ", task_state$task_not_started, "\n",
            "Task is running: ", task_state$task_is_running, "\n",
            "Task is completed: ", task_state$task_is_completed, "\n",
            sep = ""
        )
    }

    # Create a specification object.
    specification <- Specification$new()

    # Set the number of cores.
    specification$set_cores(cores = 2)

    # Set the cluster type.
    specification$set_type(type = "psock")

    # Create an asynchronous backend object.
    backend <- AsyncBackend$new()

    # Start the cluster on the backend.
    backend$start(specification)

    # Check that the task has not been started (i.e., the backend is free).
    check_state(backend$cluster)

    {
        # Run a task in parallel (i.e., approx. 0.25 seconds).
        backend$sapply(
            x = 1:10,
            fun = function(x) {
                # Sleep a bit.
                Sys.sleep(0.05)

                # Compute something.
                output <- x + 1

                # Return the result.
                return(output)
            }
        )

        # And immediately check the state to see that the task is running.
        check_state(backend$cluster)
    }

    # Sleep for a bit to wait for the task to complete.
    Sys.sleep(1)

    # Check that the task is completed (i.e., the output needs to be retrieved).
    check_state(backend$cluster)

    # Get the output.
    output <- backend$get_output(wait = TRUE)

    # Check that the task has not been started (i.e., the backend is free again).
    check_state(backend$cluster)

    # Stop the backend.
    backend$stop()

This class is an opinionated interface around the developer API of the
parabar package. See the Details section for more
information on how this class works.
This class acts as a wrapper around the R6::R6 developer API of the
parabar package. In a nutshell, it provides an opinionated
interface by wrapping the developer API in simple functional calls. More
specifically, for executing a task in parallel, this class performs the
following steps:
1. Validates the backend provided.
2. Instantiates an appropriate parabar context based on the
   backend. If the backend supports progress tracking (i.e., the backend is an
   instance of AsyncBackend), a progress tracking context (i.e.,
   ProgressTrackingContext) is instantiated and used. Otherwise,
   a regular context (i.e., Context) is instantiated. A regular
   context is also used if progress tracking is disabled via the
   Options instance.
3. Registers the backend with the context.
4. Instantiates and configures the progress bar based on the
   Options instance in the session base::.Options list.
5. Executes the task in parallel, and displays a progress bar if appropriate.
6. Fetches the results from the backend and returns them.
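The context selection described in the steps above can be sketched in base R as follows. This is a minimal illustration, not the package's implementation: `select_context_type()` is a hypothetical helper, the labels `"progress"` and `"regular"` are illustrative, and the backends are stand-in objects that carry only the class names mentioned in the text.

```r
# Hypothetical helper (not part of parabar): decide which kind of context
# to use for a given backend.
select_context_type <- function(backend, progress_enabled = TRUE) {
    # Progress tracking needs an asynchronous backend and must be enabled.
    if (inherits(backend, "AsyncBackend") && progress_enabled) {
        return("progress")
    }

    # In all other cases, fall back to a regular context.
    return("regular")
}

# Stand-in objects (assumptions) carrying only the relevant class names.
async_backend <- structure(list(), class = c("AsyncBackend", "Backend"))
sync_backend <- structure(list(), class = c("SyncBackend", "Backend"))

# Asynchronous backend with progress tracking enabled.
select_context_type(async_backend)

# Asynchronous backend with progress tracking disabled.
select_context_type(async_backend, progress_enabled = FALSE)

# Synchronous backend, which does not support progress tracking.
select_context_type(sync_backend)
```

Only the first call selects the progress tracking context; the other two fall back to the regular one, mirroring the two fallback conditions in the text.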
sapply()
Execute a task in parallel akin to parallel::parSapply().
UserApiConsumer$sapply(backend, x, fun, ...)
backend: An object of class Backend as returned by
the start_backend() function. It can also be NULL to run
the task sequentially via base::sapply().
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
A vector of the same length as x containing the results of
fun. The output format resembles that of base::sapply().
lapply()
Execute a task in parallel akin to parallel::parLapply().
UserApiConsumer$lapply(backend, x, fun, ...)
backend: An object of class Backend as returned by
the start_backend() function. It can also be NULL to run
the task sequentially via base::lapply().
x: An atomic vector or list to pass to the fun function.
fun: A function to apply to each element of x.
...: Additional arguments to pass to the fun function.
A list of the same length as x containing the results of fun.
The output format resembles that of base::lapply().
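Since the output formats are described as resembling those of base::sapply() and base::lapply(), the difference between the two can be previewed with base R alone. The sketch below makes no parabar calls; the `task` function is only an illustrative stand-in.

```r
# A simple illustrative task.
task <- function(x) x + 1

# `sapply` simplifies the results to a vector when possible.
output_sapply <- base::sapply(1:3, task)

# `lapply` always returns a list with one element per input.
output_lapply <- base::lapply(1:3, task)

# Compare the structure of the two outputs.
str(output_sapply)
str(output_lapply)
```

Both outputs have the same length as the input; only the container differs.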
apply()
Execute a task in parallel akin to parallel::parApply().
UserApiConsumer$apply(backend, x, margin, fun, ...)
backend: An object of class Backend as returned by
the start_backend() function. It can also be NULL to run
the task sequentially via base::apply().
x: An array to pass to the fun function.
margin: A numeric vector indicating the dimensions of x the
fun function should be applied over. For example, for a matrix,
margin = 1 indicates applying fun row-wise, margin = 2
indicates applying fun column-wise, and margin = c(1, 2)
indicates applying fun element-wise. Named dimensions are also
possible depending on x. See parallel::parApply() and
base::apply() for more details.
fun: A function to apply to x according to the margin.
...: Additional arguments to pass to the fun function.
The dimensions of the output vary according to the margin argument.
Consult the documentation of base::apply() for a detailed
explanation of how the output is structured.
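The effect of the margin argument can be illustrated with base::apply(), which the text names as the reference for the output structure. This is a sketch on a small matrix and makes no parabar calls.

```r
# A 2 x 3 matrix with rows (1, 3, 5) and (2, 4, 6).
x <- matrix(1:6, nrow = 2)

# margin = 1: apply the function to each row.
row_sums <- apply(x, 1, sum) # 9 12

# margin = 2: apply the function to each column.
col_sums <- apply(x, 2, sum) # 3 7 11

# margin = c(1, 2): apply the function to each element.
elementwise <- apply(x, c(1, 2), function(value) value + 1)

# The element-wise result keeps the dimensions of the input.
dim(elementwise) # 2 3
```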
clone()
The objects of this class are cloneable with this method.
UserApiConsumer$clone(deep = FALSE)
deep: Whether to make a deep clone.
start_backend(), stop_backend(),
configure_bar(), par_sapply(), and
par_lapply().

    # Define a simple task.
    task <- function(x) {
        # Perform computations.
        Sys.sleep(0.01)

        # Return the result.
        return(x + 1)
    }

    # Start an asynchronous backend.
    backend <- start_backend(cores = 2, cluster_type = "psock", backend_type = "async")

    # Change the progress bar options.
    configure_bar(type = "modern", format = "[:bar] :percent")

    # Create a user API consumer.
    consumer <- UserApiConsumer$new()

    # Execute the task using the `sapply` parallel operation.
    output_sapply <- consumer$sapply(backend = backend, x = 1:200, fun = task)

    # Print the head of the `sapply` operation output.
    head(output_sapply)

    # Execute the task using the `lapply` parallel operation.
    output_lapply <- consumer$lapply(backend = backend, x = 1:200, fun = task)

    # Print the head of the `lapply` operation output.
    head(output_lapply)

    # Stop the backend.
    stop_backend(backend)

This class contains static methods for throwing warnings with informative messages.
Warning$requested_cluster_cores_too_low(): Warning for not requesting enough cluster cores.
Warning$requested_cluster_cores_too_high(): Warning for requesting too many cluster cores.
Warning$requested_cluster_type_not_supported(): Warning for requesting an unsupported cluster type.
Warning$progress_not_supported_for_backend(): Warning for using a backend incompatible with progress tracking.
Warning$error_in_backend_finalizer(): Warning for errors in the backend finalizer during garbage collection.