1
use std::{
2
    collections::{HashMap, HashSet, VecDeque},
3
    fmt::Debug,
4
    future::Future,
5
    marker::PhantomData,
6
    ops::Deref,
7
    pin::Pin,
8
    task::{Context, Poll, Waker},
9
};
10

            
11
use crate::{BudgetContext, BudgetContextData, BudgetResult, Budgetable, Container};
12

            
13
struct Runtime<Budget, Tasks, SharedTasks, Backing>
14
where
15
    SharedTasks: Container<PendingTasks>,
16
    Tasks: Container<RunningTasks>,
17
    Backing: Container<BudgetContextData<Budget>>,
18
{
19
    context: BudgetContext<Backing, Budget>,
20
    shared_tasks: SharedTasks,
21
    tasks: Tasks,
22
}
23

            
24
impl<Budget, Tasks, SharedTasks, Backing> Clone for Runtime<Budget, Tasks, SharedTasks, Backing>
25
where
26
    Budget: Clone,
27
    Tasks: Container<RunningTasks>,
28
    SharedTasks: Container<PendingTasks>,
29
    Backing: Container<BudgetContextData<Budget>>,
30
{
31
217
    fn clone(&self) -> Self {
32
217
        Self {
33
217
            context: self.context.clone(),
34
217
            tasks: self.tasks.cloned(),
35
217
            shared_tasks: self.shared_tasks.cloned(),
36
217
        }
37
217
    }
38
}
39

            
40
impl<Budget, Tasks, SharedTasks, Backing> Deref for Runtime<Budget, Tasks, SharedTasks, Backing>
41
where
42
    Tasks: Container<RunningTasks>,
43
    SharedTasks: Container<PendingTasks>,
44
    Backing: Container<BudgetContextData<Budget>>,
45
{
46
    type Target = BudgetContext<Backing, Budget>;
47

            
48
11222
    fn deref(&self) -> &Self::Target {
49
11222
        &self.context
50
11222
    }
51
}
52

            
53
impl<Budget, Tasks, SharedTasks, Backing> Debug for Runtime<Budget, Tasks, SharedTasks, Backing>
54
where
55
    Budget: Budgetable,
56
    Tasks: Container<RunningTasks>,
57
    SharedTasks: Container<PendingTasks>,
58
    Backing: Container<BudgetContextData<Budget>> + Debug,
59
{
60
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61
        f.debug_struct("Runtime")
62
            .field("context", &self.context)
63
            .field("tasks", &())
64
            .finish()
65
    }
66
}
67

            
68
impl<Budget, Tasks, SharedTasks, Backing> Runtime<Budget, Tasks, SharedTasks, Backing>
69
where
70
    Budget: Budgetable,
71
    Tasks: Container<RunningTasks>,
72
    SharedTasks: Container<PendingTasks>,
73
    Backing: Container<BudgetContextData<Budget>>,
74
{
75
202
    pub fn spawn<Status, F: Future + 'static>(&self, future: F) -> TaskHandle<Status, F::Output>
76
202
    where
77
202
        Status: Container<SpawnedTaskStatus<F::Output>>,
78
202
    {
79
202
        self.shared_tasks.map_locked(|tasks| {
80
202
            let status = Status::new(SpawnedTaskStatus::default());
81
202
            let task_id = tasks.next_task_id;
82
202
            tasks.next_task_id = tasks.next_task_id.checked_add(1).expect("u64 wrapped");
83
202

            
84
202
            // TODO two allocations isn't ideal. Can we do pin projection through
85
202
            // dynamic dispatch? I think it's "safe" because it seems to fit the
86
202
            // definition of structural pinning?
87
202
            tasks.new.push_back(Box::new(SpawnedTask {
88
202
                id: task_id,
89
202
                future: Some(Box::pin(future)),
90
202
                status: status.cloned(),
91
202
                waker: budget_waker::for_current_thread(self.shared_tasks.cloned(), Some(task_id)),
92
202
            }));
93
202

            
94
202
            TaskHandle {
95
202
                status,
96
202
                _output: PhantomData,
97
202
            }
98
202
        })
99
202
    }
100
}
101

            
102
16
#[derive(Default)]
103
pub(crate) struct PendingTasks {
104
    next_task_id: u64,
105
    new: VecDeque<Box<dyn AnySpawnedTask>>,
106
    tasks_to_wake: HashSet<u64>,
107
}
108

            
109
16
#[derive(Default)]
110
pub(crate) struct RunningTasks {
111
    woke: VecDeque<Box<dyn AnySpawnedTask>>,
112
    pending: HashMap<u64, Box<dyn AnySpawnedTask>>,
113
}
114

            
115
/// Executes `future` with the provided budget. The future will run until it
116
/// completes or until it has invoked [`spend()`](crate::spend) enough to
117
/// exhaust the budget provided. If the future never called
118
/// [`spend()`](crate::spend), this function will block the current thread until
119
/// the future completes.
120
///
121
/// This method should not be used within async methods. For running a future
122
/// inside of an async context, use
123
/// [`asynchronous::run_with_budget()`](crate::asynchronous::run_with_budget).
124
///
125
/// # Panics
126
///
127
/// Panics when called within from within `future` or any code invoked by
128
/// `future`.
129
16
fn run_with_budget<Budget, Tasks, SharedTasks, Backing, F>(
130
16
    future: impl FnOnce(Runtime<Budget, Tasks, SharedTasks, Backing>) -> F,
131
16
    initial_budget: Budget,
132
16
) -> Progress<Budget, Tasks, SharedTasks, Backing, F>
133
16
where
134
16
    Budget: Budgetable,
135
16
    Tasks: Container<RunningTasks>,
136
16
    SharedTasks: Container<PendingTasks>,
137
16
    F: Future,
138
16
    Backing: Container<BudgetContextData<Budget>>,
139
16
{
140
16
    let runtime = Runtime {
141
16
        context: BudgetContext {
142
16
            data: Backing::new(crate::BudgetContextData {
143
16
                budget: initial_budget,
144
16
                paused_future: None,
145
16
            }),
146
16
            _budget: PhantomData,
147
16
        },
148
16
        tasks: Tasks::new(RunningTasks::default()),
149
16
        shared_tasks: SharedTasks::new(PendingTasks::default()),
150
16
    };
151
16

            
152
16
    let waker = budget_waker::for_current_thread(runtime.shared_tasks.cloned(), None);
153
16
    execute_future(Box::pin(future(runtime.clone())), waker, runtime, false)
154
16
}
155

            
156
4591
fn execute_future<Budget, Tasks, SharedTasks, Backing, F>(
157
4591
    mut future: Pin<Box<F>>,
158
4591
    waker: Waker,
159
4591
    runtime: Runtime<Budget, Tasks, SharedTasks, Backing>,
160
4591
    mut wait_for_budget: bool,
161
4591
) -> Progress<Budget, Tasks, SharedTasks, Backing, F>
162
4591
where
163
4591
    Budget: Budgetable,
164
4591
    Tasks: Container<RunningTasks>,
165
4591
    SharedTasks: Container<PendingTasks>,
166
4591
    F: Future,
167
4591
    Backing: Container<BudgetContextData<Budget>>,
168
4591
{
169
4591
    let mut pinned_future = Pin::new(&mut future);
170
4591
    let mut cx = Context::from_waker(&waker);
171
    loop {
172
17821
        let poll_result = pinned_future.poll(&mut cx);
173
17821
        let (ran_out_of_budget, budget_remaining_after_completion) =
174
17821
            runtime.context.data.map_locked(|data| {
175
17815
                data.budget.remove_waker(cx.waker());
176
17815
                let ran_out_of_budget = data.paused_future.take().is_some();
177
17820
                let budget_remaining_after_completion = if poll_result.is_ready() {
178
16
                    Some(data.budget.clone())
179
                } else {
180
17799
                    if !ran_out_of_budget || wait_for_budget {
181
13229
                        data.budget.add_waker(cx.waker());
182
13236
                    }
183
17804
                    None
184
                };
185
17820
                (ran_out_of_budget, budget_remaining_after_completion)
186
17821
            });
187

            
188
17821
        if let Poll::Ready(output) = poll_result {
189
16
            return Progress::Complete(BudgetResult {
190
16
                output,
191
16
                balance: budget_remaining_after_completion
192
16
                    .expect("should always be present when Ready returns"),
193
16
            });
194
17805
        }
195
17805

            
196
17805
        if ran_out_of_budget && !wait_for_budget {
197
4575
            return Progress::NoBudget(IncompleteFuture {
198
4575
                future,
199
4575
                waker,
200
4575
                runtime,
201
4575
            });
202
13230
        }
203
13230

            
204
13230
        pinned_future = Pin::new(&mut future);
205
13230

            
206
13230
        // If we have our own tasks to run, execute them. Otherwise, park
207
13230
        // the thread.
208
13230
        runtime.tasks.map_locked(|tasks| {
209
13230
            runtime.shared_tasks.map_locked(|shared| {
210
13230
                tasks.woke.extend(shared.new.drain(..));
211
354874
                for task_id in shared.tasks_to_wake.drain() {
212
354874
                    if let Some(task) = tasks.pending.remove(&task_id) {
213
354787
                        tasks.woke.push_back(task);
214
354787
                    }
215
                }
216
13230
            });
217
13230

            
218
13230
            if tasks.woke.is_empty() {
219
4564
                wait_for_budget = false;
220
4564
                runtime
221
4564
                    .context
222
4564
                    .data
223
4564
                    .map_locked(|data| data.budget.park_for_budget());
224
4564
            } else {
225
                // Call poll on all of the futures that are awake.
226
354987
                for mut task in tasks.woke.drain(..) {
227
354987
                    let result = task.poll();
228
354987

            
229
354987
                    match result {
230
202
                        Poll::Ready(_) => {
231
202
                            // The future is complete.
232
202
                        }
233
354785
                        Poll::Pending => {
234
354785
                            // Move the future to the pending queue.
235
354785
                            tasks.pending.insert(task.id(), task);
236
354785
                        }
237
                    }
238
                }
239
            }
240
13230
        });
241
    }
242
4591
}
243

            
244
/// A future that was budgeted with [`run_with_budget()`] that has not yet
245
/// completed.
246
struct IncompleteFuture<Budget, Tasks, SharedTasks, Backing, F>
247
where
248
    F: Future,
249
    Tasks: Container<RunningTasks>,
250
    SharedTasks: Container<PendingTasks>,
251
    Backing: Container<BudgetContextData<Budget>>,
252
{
253
    future: Pin<Box<F>>,
254
    waker: Waker,
255
    runtime: Runtime<Budget, Tasks, SharedTasks, Backing>,
256
}
257

            
258
impl<Budget, Tasks, SharedTasks, Backing, F>
259
    IncompleteFuture<Budget, Tasks, SharedTasks, Backing, F>
260
where
261
    F: Future,
262
    Budget: Budgetable,
263
    Tasks: Container<RunningTasks>,
264
    SharedTasks: Container<PendingTasks>,
265
    Backing: Container<BudgetContextData<Budget>>,
266
{
267
    /// Adds `additional_budget` to the remaining balance and continues
268
    /// executing the future.
269
25
    pub fn continue_with_additional_budget(
270
25
        self,
271
25
        additional_budget: usize,
272
25
    ) -> Progress<Budget, Tasks, SharedTasks, Backing, F> {
273
25
        let Self {
274
25
            future,
275
25
            waker,
276
25
            runtime,
277
25
            ..
278
25
        } = self;
279
25
        runtime
280
25
            .context
281
25
            .data
282
25
            .map_locked(|data| data.budget.replenish(additional_budget));
283
25

            
284
25
        execute_future(future, waker, runtime, false)
285
25
    }
286
    /// Waits for additional budget to be allocated through
287
    /// [`ReplenishableBudget::replenish()`].
288
4550
    pub fn wait_for_budget(self) -> Progress<Budget, Tasks, SharedTasks, Backing, F> {
289
4550
        let Self {
290
4550
            future,
291
4550
            waker,
292
4550
            runtime,
293
4550
            ..
294
4550
        } = self;
295
4550

            
296
4550
        execute_future(future, waker, runtime, true)
297
4550
    }
298
}
299

            
300
/// The progress of a future's execution.
301
#[must_use]
302
enum Progress<Budget, Tasks, SharedTasks, Backing, F: Future>
303
where
304
    Tasks: Container<RunningTasks>,
305
    SharedTasks: Container<PendingTasks>,
306
    Backing: Container<BudgetContextData<Budget>>,
307
{
308
    /// The future was interrupted because it requested to spend more budget
309
    /// than was available.
310
    NoBudget(IncompleteFuture<Budget, Tasks, SharedTasks, Backing, F>),
311
    /// The future has completed.
312
    Complete(BudgetResult<F::Output, Budget>),
313
}
314

            
315
mod budget_waker {
316
    use std::{
317
        any::TypeId,
318
        sync::Arc,
319
        task::{RawWaker, RawWakerVTable, Waker},
320
        thread::Thread,
321
    };
322

            
323
    use crate::{blocking::PendingTasks, Container, NotSyncContainer, SyncContainer};
324

            
325
    struct WakerData<Tasks> {
326
        thread: Thread,
327
        task_id: Option<u64>,
328
        tasks: Tasks,
329
    }
330

            
331
    impl<Tasks> WakerData<Tasks>
332
    where
333
        Tasks: Container<PendingTasks>,
334
    {
335
        pub fn wake(&self) {
336
975334
            if let Some(task_id) = self.task_id {
337
937156
                self.tasks
338
937156
                    .map_locked(|tasks| tasks.tasks_to_wake.insert(task_id));
339
937156

            
340
937156
                self.thread.unpark();
341
937156
            } else {
342
38178
                // The main task is unblocked, we always unpark.
343
38178
                self.thread.unpark();
344
38178
            }
345
975334
        }
346
    }
347

            
348
218
    pub(crate) fn for_current_thread<Tasks>(tasks: Tasks, task_id: Option<u64>) -> Waker
349
218
    where
350
218
        Tasks: Container<PendingTasks>,
351
218
    {
352
218
        let arc_thread = Arc::new(WakerData {
353
218
            tasks,
354
218
            thread: std::thread::current(),
355
218
            task_id,
356
218
        });
357
218
        let arc_thread = Arc::into_raw(arc_thread);
358
218

            
359
218
        unsafe { Waker::from_raw(RawWaker::new(arc_thread.cast::<()>(), vtable::<Tasks>())) }
360
218
    }
361

            
362
883120
    unsafe fn clone<Tasks>(arc_thread: *const ()) -> RawWaker
363
883120
    where
364
883120
        Tasks: Container<PendingTasks>,
365
883120
    {
366
883120
        let arc_thread: Arc<WakerData<Tasks>> =
367
883120
            Arc::from_raw(arc_thread.cast::<WakerData<Tasks>>());
368
883120
        let cloned = arc_thread.clone();
369
883120
        let cloned = Arc::into_raw(cloned);
370
883120

            
371
883120
        let _ = Arc::into_raw(arc_thread);
372
883120

            
373
883120
        RawWaker::new(cloned.cast::<()>(), vtable::<Tasks>())
374
883120
    }
375

            
376
883740
    fn vtable<Tasks>() -> &'static RawWakerVTable
377
883740
    where
378
883740
        Tasks: Container<PendingTasks>,
379
883740
    {
380
883740
        let task_type = TypeId::of::<Tasks>();
381
883740
        let sync_type = TypeId::of::<SyncContainer<PendingTasks>>();
382
883740
        let not_sync_type = TypeId::of::<NotSyncContainer<PendingTasks>>();
383
883740

            
384
883740
        if task_type == sync_type {
385
883771
            &SYNC_VTABLE
386
        } else if task_type == not_sync_type {
387
59
            &NOT_SYNC_VTABLE
388
        } else {
389
            unreachable!("unknown type Tasks")
390
        }
391
883830
    }
392

            
393
86
    unsafe fn wake_consuming<Tasks>(arc_thread: *const ())
394
86
    where
395
86
        Tasks: Container<PendingTasks>,
396
86
    {
397
86
        let arc_thread: Arc<WakerData<Tasks>> = Arc::from_raw(arc_thread as *mut WakerData<Tasks>);
398
86
        arc_thread.wake();
399
86
    }
400

            
401
975248
    unsafe fn wake_by_ref<Tasks>(arc_thread: *const ())
402
975248
    where
403
975248
        Tasks: Container<PendingTasks>,
404
975248
    {
405
975248
        let arc_thread: Arc<WakerData<Tasks>> = Arc::from_raw(arc_thread as *mut WakerData<Tasks>);
406
975248
        arc_thread.wake();
407
975248

            
408
975248
        let _ = Arc::into_raw(arc_thread);
409
975248
    }
410

            
411
883660
    unsafe fn drop_waker<Tasks>(arc_thread: *const ()) {
412
883660
        let arc_thread: Arc<WakerData<Tasks>> = Arc::from_raw(arc_thread as *mut WakerData<Tasks>);
413
883660
        drop(arc_thread);
414
883660
    }
415

            
416
    const SYNC_VTABLE: RawWakerVTable = RawWakerVTable::new(
417
        clone::<SyncContainer<PendingTasks>>,
418
        wake_consuming::<SyncContainer<PendingTasks>>,
419
        wake_by_ref::<SyncContainer<PendingTasks>>,
420
        drop_waker::<SyncContainer<PendingTasks>>,
421
    );
422

            
423
    const NOT_SYNC_VTABLE: RawWakerVTable = RawWakerVTable::new(
424
        clone::<NotSyncContainer<PendingTasks>>,
425
        wake_consuming::<NotSyncContainer<PendingTasks>>,
426
        wake_by_ref::<NotSyncContainer<PendingTasks>>,
427
        drop_waker::<NotSyncContainer<PendingTasks>>,
428
    );
429
}
430

            
431
struct SpawnedTask<Status, F>
432
where
433
    Status: Container<SpawnedTaskStatus<F::Output>>,
434
    F: Future,
435
{
436
    id: u64,
437
    future: Option<Pin<Box<F>>>,
438
    status: Status,
439
    waker: Waker,
440
}
441

            
442
struct SpawnedTaskStatus<Output> {
443
    output: TaskOutput<Output>,
444
    waker: Option<Waker>,
445
}
446

            
447
impl<Output> Default for SpawnedTaskStatus<Output> {
448
202
    fn default() -> Self {
449
202
        Self {
450
202
            output: TaskOutput::NotCompleted,
451
202
            waker: None,
452
202
        }
453
202
    }
454
}
455

            
456
trait AnySpawnedTask {
457
    fn id(&self) -> u64;
458
    fn poll(&mut self) -> Poll<()>;
459
}
460

            
461
impl<Status, F> AnySpawnedTask for SpawnedTask<Status, F>
462
where
463
    Status: Container<SpawnedTaskStatus<F::Output>>,
464
    F: Future,
465
{
466
354782
    fn id(&self) -> u64 {
467
354782
        self.id
468
354782
    }
469

            
470
354966
    fn poll(&mut self) -> Poll<()> {
471
354966
        match self.future.as_mut() {
472
354976
            Some(future) => {
473
354976
                let pinned_future = Pin::new(future);
474
354976
                match pinned_future.poll(&mut Context::from_waker(&self.waker)) {
475
202
                    Poll::Ready(output) => {
476
202
                        self.status.map_locked(|status| {
477
202
                            status.output = TaskOutput::Completed(Some(output));
478
202
                            if let Some(waker) = status.waker.take() {
479
30
                                waker.wake();
480
172
                            }
481
202
                        });
482
202
                        Poll::Ready(())
483
                    }
484
354787
                    Poll::Pending => Poll::Pending,
485
                }
486
            }
487
            None => Poll::Ready(()),
488
        }
489
354979
    }
490
}
491

            
492
struct TaskHandle<Status, Output> {
493
    status: Status,
494
    _output: PhantomData<Output>,
495
}
496

            
497
impl<Status, Output> Future for TaskHandle<Status, Output>
498
where
499
    Status: Container<SpawnedTaskStatus<Output>>,
500
{
501
    type Output = Output;
502

            
503
12928
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
504
12928
        self.status.map_locked(|status| {
505
12927
            match &mut status.output {
506
102
                TaskOutput::Completed(output) => {
507
102
                    Poll::Ready(output.take().expect("already polled completion"))
508
                }
509
                TaskOutput::NotCompleted => {
510
                    // todo check if needed to overwrite
511
12825
                    status.waker = Some(cx.waker().clone());
512
12825
                    Poll::Pending
513
                }
514
            }
515
12928
        })
516
12928
    }
517
}
518

            
519
enum TaskOutput<Output> {
520
    NotCompleted,
521
    Completed(Option<Output>),
522
}
523

            
524
macro_rules! define_public_interface {
525
    ($modname:ident, $backing:ident, $moduledocs:literal) => {
526
        #[doc = $moduledocs]
527
        pub mod $modname {
528
            use std::{
529
                future::Future,
530
                pin::Pin,
531
                task::{Context, Poll},
532
            };
533

            
534
            use crate::{
535
                blocking::{PendingTasks, RunningTasks},
536
                spend::$modname::SpendBudget,
537
                BudgetContextData, BudgetResult, Budgetable,
538
            };
539

            
540
            /// A lightweight asynchronous runtime that runs a future while
541
            /// keeping track of a budget.
542
            ///
543
            /// Regardless of whether the threadsafe or single-threaded versions
544
            /// of the Runtime are used, this Runtime will always be a
545
            /// single-threaded runtime. The benefit of using the threadsafe
546
            /// runtime is to be able to move paused runtimes between different
547
            /// threads or allow the running futures to be woken by external
548
            /// threads.
549
201
            #[derive(Debug, Clone)]
550
            pub struct Runtime<Budget>(
551
                super::Runtime<
552
                    Budget,
553
                    crate::$backing<RunningTasks>,
554
                    crate::$backing<PendingTasks>,
555
                    crate::$backing<BudgetContextData<Budget>>,
556
                >,
557
            )
558
            where
559
                Budget: Budgetable;
560

            
561
            impl<Budget> Runtime<Budget>
562
            where
563
                Budget: Budgetable,
564
            {
565
                /// Executes `future` with the provided budget. The future will run until it
566
                /// completes or until it has invoked [`spend()`](crate::spend) enough to
567
                /// exhaust the budget provided. If the future never called
568
                /// [`spend()`](crate::spend), this function will not return until the future
569
                /// has completed.
570
                ///
571
                /// This function returns a [`Future`] which must be awaited to execute the
572
                /// function.
573
                ///
574
                /// This implementation is runtime agnostic.
575
                ///
576
                /// # Panics
577
                ///
578
                /// Panics when called within from within `future` or any code invoked by
579
                /// `future`.
580
16
                pub fn run_with_budget<F>(
581
16
                    future: impl FnOnce(Self) -> F,
582
16
                    initial_budget: Budget,
583
16
                ) -> Progress<Budget, F>
584
16
                where
585
16
                    Budget: Budgetable,
586
16
                    F: Future,
587
16
                {
588
16
                    Progress::from(super::run_with_budget(
589
16
                        |rt| future(Self(rt)),
590
16
                        initial_budget,
591
16
                    ))
592
16
                }
593

            
594
                /// Schedules `future` to run.
595
                ///
596
                /// This runtime will only execute one future at any given
597
                /// moment in time. Context switches between futures will only
598
                /// happen when the running future yields control at an `await`.
599
                ///
600
                /// Returns a handle that can be `.await`ed to wait for the task
601
                /// to complete.
602
202
                pub fn spawn<F: Future + 'static>(&self, future: F) -> TaskHandle<F::Output> {
603
202
                    TaskHandle(self.0.spawn(future))
604
202
                }
605

            
606
                /// Retrieves the current budget.
607
                ///
608
                /// This function should only be called by code that is guaranteed to be running
609
                /// by this executor. When called outside of code run by this executor, this function will.
610
                #[must_use]
611
                pub fn budget(&self) -> usize {
612
                    self.0.budget()
613
                }
614

            
615
                /// Spends `amount` from the curent budget.
616
                ///
617
                /// This function returns a future which must be awaited.
618
11222
                pub fn spend(&self, amount: usize) -> SpendBudget<'_, Budget> {
619
11222
                    // How do we re-export SpendBudget since it's sahrd with async too. crate-level module?
620
11222
                    SpendBudget::from(self.0.spend(amount))
621
11222
                }
622
            }
623

            
624
            /// The progress of a future's execution.
625
            #[must_use]
626
            pub enum Progress<Budget, F>
627
            where
628
                Budget: Budgetable,
629
                F: Future,
630
            {
631
                /// The future was interrupted because it requested to spend more budget
632
                /// than was available.
633
                NoBudget(IncompleteFuture<Budget, F>),
634
                /// The future has completed.
635
                Complete(BudgetResult<F::Output, Budget>),
636
            }
637

            
638
            impl<Budget, F> Progress<Budget, F>
639
            where
640
                Budget: Budgetable,
641
                F: Future,
642
            {
643
                /// Continues executing the contained future until it is
644
                /// completed.
645
                ///
646
                /// This function will never return if the future enters an
647
                /// infinite loop or deadlocks, regardless of whether the budget
648
                /// is exhausted or not.
649
12
                pub fn wait_until_complete(self) -> BudgetResult<F::Output, Budget> {
650
12
                    let mut progress = self;
651
4462
                    loop {
652
4462
                        match progress {
653
4450
                            Progress::NoBudget(incomplete) => {
654
4450
                                progress = incomplete.wait_for_budget();
655
4450
                            }
656
12
                            Progress::Complete(result) => break result,
657
12
                        }
658
12
                    }
659
12
                }
660
            }
661

            
662
            impl<Budget, F>
663
                From<
664
                    super::Progress<
665
                        Budget,
666
                        crate::$backing<RunningTasks>,
667
                        crate::$backing<PendingTasks>,
668
                        crate::$backing<BudgetContextData<Budget>>,
669
                        F,
670
                    >,
671
                > for Progress<Budget, F>
672
            where
673
                Budget: Budgetable,
674
                F: Future,
675
            {
676
4591
                fn from(
677
4591
                    progress: super::Progress<
678
4591
                        Budget,
679
4591
                        crate::$backing<RunningTasks>,
680
4591
                        crate::$backing<PendingTasks>,
681
4591
                        crate::$backing<BudgetContextData<Budget>>,
682
4591
                        F,
683
4591
                    >,
684
4591
                ) -> Self {
685
4591
                    match progress {
686
4575
                        super::Progress::NoBudget(incomplete) => {
687
4575
                            Self::NoBudget(IncompleteFuture(incomplete))
688
                        }
689
16
                        super::Progress::Complete(result) => Self::Complete(result),
690
                    }
691
4591
                }
692
            }
693

            
694
            /// A future that was budgeted with [`Runtime::run_with_budget()`] that has not yet
695
            /// completed.
696
            pub struct IncompleteFuture<Budget, F>(
697
                pub(super)  super::IncompleteFuture<
698
                    Budget,
699
                    crate::$backing<RunningTasks>,
700
                    crate::$backing<PendingTasks>,
701
                    crate::$backing<BudgetContextData<Budget>>,
702
                    F,
703
                >,
704
            )
705
            where
706
                Budget: Budgetable,
707
                F: Future;
708

            
709
            impl<Budget, F> IncompleteFuture<Budget, F>
710
            where
711
                Budget: Budgetable,
712
                F: Future,
713
            {
714
                /// Adds `additional_budget` to the remaining balance and continues
715
                /// executing the future.
716
25
                pub fn continue_with_additional_budget(
717
25
                    self,
718
25
                    additional_budget: usize,
719
25
                ) -> Progress<Budget, F> {
720
25
                    Progress::from(self.0.continue_with_additional_budget(additional_budget))
721
25
                }
722

            
723
                /// Waits for additional budget to be allocated through
724
                /// [`ReplenishableBudget::replenish()`](crate::ReplenishableBudget::replenish).
725
4550
                pub fn wait_for_budget(self) -> Progress<Budget, F> {
726
4550
                    Progress::from(self.0.wait_for_budget())
727
4550
                }
728
            }
729

            
730
            /// A handle to a task scheduled with a [`Runtime`]. Invoking
731
            /// `.await` on this type will return the task's output when the
732
            /// original task completes.
733
            ///
734
            /// The task will continue to execute even if the handle is dropped.
735
            pub struct TaskHandle<Output>(
736
                super::TaskHandle<crate::$backing<super::SpawnedTaskStatus<Output>>, Output>,
737
            );
738

            
739
            impl<Output> Future for TaskHandle<Output>
740
            where
741
                Output: Unpin + 'static,
742
            {
743
                type Output = Output;
744

            
745
12929
                fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
746
12929
                    let inner = Pin::new(&mut self.0);
747
12929
                    inner.poll(cx)
748
12929
                }
749
            }
750
        }
751
    };
752
}
753

            
754
13
define_public_interface!(
755
13
    threadsafe,
756
13
    SyncContainer,
757
13
    "A threadsafe (`Send + Sync`), blocking budgeting implementation that is runtime agnostic.\n\nThe only difference between this module and the [`singlethreaded`] module is that this one uses [`std::sync::Arc`] and [`std::sync::Mutex`] instead of [`std::rc::Rc`] and [`std::cell::RefCell`]."
758
13
);
759

            
760
3
define_public_interface!(
761
3
    singlethreaded,
762
3
    NotSyncContainer,
763
3
    "A single-threaded (`!Send + !Sync`), blocking budgeting implementation that is runtime agnostic.\n\nThe only difference between this module and the [`threadsafe`] module is that this one uses [`std::rc::Rc`] and [`std::cell::RefCell`] instead of [`std::sync::Arc`] and [`std::sync::Mutex`]."
764
3
);