Crate budget_executor
source · [−]Expand description
An approach to “throttling” async tasks in Rust using manual instrumentation.
This crate implements a manual task throttling approach using a simple usize
to
track available budget. Both leverage Rust’s approach to async tasks to enable
pausing the task when it requests to spend more budget than is remaining.
One use-case/inspiration for this crate is a scripting language built in Rust.
By defining parts of the interpeter using async
functions, untrusted scripts
can be executed with this crate with limited budget to protect against infinite
loops or too-intensive scripts slowing down a server. The interpreter can assign
different costs to various operations.
This crate has two implementations of this approach: one that blocks the current
thread and one that works with any async runtime. Additionally, there are
threadsafe
(Send + Sync
) and singlethreaded
variations:
Async? | Send + Sync ? | Module |
---|---|---|
no | no | blocking::singlethreaded |
no | yes | blocking::threadsafe |
yes | no | asynchronous::singlethreaded |
yes | yes | asynchronous::threadsafe |
The APIs are identically designed between the modules. The notable changes between the modules are:
- For async, functions that execute the future are implemented as futures. This
means, for example,
run_with_budget()
must be.await
ed for anything to happen. - For
Send + Sync
, the backing type is changed fromRc<RefCell<T>>
toArc<Mutex<T>>
. This is transparent to the public API, but the expected performance differences will apply.
Using from non-async code (Blocking)
This example of the blocking, single-threaded implementation is
from examples/simple.rs
in the repository:
use std::time::Duration;
use budget_executor::blocking::singlethreaded::{Progress, Runtime};
fn main() {
// Run a task with no initial budget. The first time the task asks to spend
// any budget, it will be paused.
let mut progress = Runtime::run_with_budget(some_task_to_limit, 0);
// At this point, the task has run until the first call to
// budget_executor::spend. Because we gave an initial_budget of 0, the future
// is now paused. Let's loop until it's finished, feeding it 5 budget at a
// time.
loop {
progress = match progress {
Progress::NoBudget(incomplete_task) => {
// Resume the task, allowing for 5 more budget to be spent.
println!("+5 budget");
incomplete_task.continue_with_additional_budget(5)
}
Progress::Complete(result) => {
// The task has completed. `result.output` contains the output
// of the task itself. We can also inspect the balance of the
// budget:
println!(
"Task completed with balance: {:?}, output: {:?}",
result.balance, result.output
);
break;
}
};
}
}
async fn some_task_to_limit(runtime: Runtime<usize>) -> bool {
do_some_operation(1, &runtime).await;
do_some_operation(5, &runtime).await;
do_some_operation(1, &runtime).await;
do_some_operation(25, &runtime).await;
true
}
async fn do_some_operation(times: u8, runtime: &Runtime<usize>) {
println!("> Asking to spend {times} from the budget");
runtime.spend(usize::from(times)).await;
// Despite being async code, because we know we're running in a
// single-threaded environment, we can still call blocking operations.
std::thread::sleep(Duration::from_millis(u64::from(times) * 100));
}
#[test]
fn runs() {
main()
}
When run, it produces this output:
~/p/budget-executor (main)> cargo run --example simple
Compiling budget-executor v0.1.0 (/home/ecton/projects/budget-executor)
Finished dev [unoptimized + debuginfo] target(s) in 0.31s
Running `target/debug/examples/simple`
> Asking to spend 1 from the budget
+5 budget
> Asking to spend 5 from the budget
+5 budget
> Asking to spend 1 from the budget
> Asking to spend 25 from the budget
+5 budget
+5 budget
+5 budget
+5 budget
+5 budget
Task completed with balance: Remaining(3), output: true
How does this work?
At the start of the example, run_with_budget() is called with
an initial balance of 0. This will cause the future (some_task_to_limit()
) to
execute until it executes spend(amount).await
. When the future
attempts to spend any budget, because the initial balance was 0, the future will
be paused until budget made available. run_with_budget()
returns
Progress::NoBudget
which contains the incomplete task.
The example now loops until progress
contains Progress::Complete
. When
Progress::NoBudget
is returned instead, the task is resumed using
continue_with_additional_budget()
. This resumes
executing the future, which will re-awaken inside of spend().await
. The budget
will be checked again. If there is enough budget, spend().await
will deduct
the spent amount and return. If there isn’t enough budget, the future will pause again and continue_with_additional_budget
returns Progress::NoBudget
.
Upon completion, the remaining balance is returned along with the task’s output
in Progress::Complete
. In this example, the task spends a total of 32. Because
the budget is always allocated in increments of 5, 35 budget was allocated which
left a remaining budget of 3 when the task completed.
Blocking Implementation Warnings
If you invoke .await
on anything other than spend()
, the
executing thread will be parked until that future is completed. This means care
must be taken if you attempt to use the blocking implementation within another
async context: you must ensure that any future awaited by the task will be
completed on a separate thread. If you can’t guarantee this, you should use the
asynchronous implementation.
Using from async code
This example of the asynchronous, single-threaded implementation
is examples/simple-async.rs
in the repository:
use std::time::Duration;
use budget_executor::asynchronous::singlethreaded::{Context, Progress};
#[tokio::main(flavor = "current_thread")]
async fn main() {
// Run a task with no initial budget. The first time the task asks to spend
// any budget, it will be paused.
let mut progress = Context::run_with_budget(some_task_to_limit, 0).await;
// At this point, the task has run until the first call to
// budget_executor::spend. Because we gave an initial_budget of 0, the future
// is now paused. Let's loop until it's finished, feeding it 5 budget at a
// time.
loop {
progress = match progress {
Progress::NoBudget(incomplete_task) => {
// Resume the task, allowing for 5 more budget to be spent.
println!("+5 budget");
incomplete_task.continue_with_additional_budget(5).await
}
Progress::Complete(result) => {
// The task has completed. `result.output` contains the output
// of the task itself. We can also inspect the balance of the
// budget:
println!(
"Task completed with balance: {:?}, output: {:?}",
result.balance, result.output
);
break;
}
};
}
}
async fn some_task_to_limit(context: Context<usize>) -> bool {
do_some_operation(1, &context).await;
do_some_operation(5, &context).await;
do_some_operation(1, &context).await;
do_some_operation(25, &context).await;
true
}
async fn do_some_operation(times: u8, context: &Context<usize>) {
println!("> Asking to spend {times} from the budget");
context.spend(usize::from(times)).await;
tokio::time::sleep(Duration::from_millis(u64::from(times) * 100)).await;
}
#[test]
fn runs() {
main()
}
When run, it produces the same output as displayed in the blocking section.
How does this work?
This example is identical to the blocking example, but instead uses the
asynchronous
module’s APIs:
run_with_budget().await
and
continue_with_additional_budget().await
.
This implementation is runtime agnostic and is actively tested against tokio.
Modules
A budget implementation compatible with any async executor.
A standalone implementation does not require another async executor and blocks the current thread while executing.
Shared implementation of budget spending.
Structs
The result of a completed future.
An atomic budget storage that can be replenished by other threads or tasks than the one driving the budgeted task.
Traits
A type that can be used as a budget.