use crate::task::AtomicWaker;
use alloc::sync::Arc;
use core::fmt;
use core::pin::Pin;
use core::sync::atomic::{AtomicBool, Ordering};
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_core::Stream;
use pin_project_lite::pin_project;
pin_project! {
/// A future/stream which can be remotely short-circuited using an `AbortHandle`.
#[derive(Debug, Clone)]
#[must_use = "futures/streams do nothing unless you poll them"]
pub struct Abortable<T> {
#[pin]
task: T,
inner: Arc<AbortInner>,
}
}
impl<T> Abortable<T> {
/// Creates a new `Abortable` future/stream using an existing `AbortRegistration`.
/// `AbortRegistration`s can be acquired through [`AbortHandle::new_pair`].
///
/// When `abort` is called on the handle tied to `reg` or if `abort` has
/// already been called, the future/stream will complete immediately without making
/// any further progress.
///
/// # Examples
///
/// Usage with futures:
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future::{Abortable, AbortHandle, Aborted};
///
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
/// let future = Abortable::new(async { 2 }, abort_registration);
/// abort_handle.abort();
/// assert_eq!(future.await, Err(Aborted));
/// # });
/// ```
///
/// Usage with streams:
///
/// ```
/// # futures::executor::block_on(async {
/// # use futures::future::{Abortable, AbortHandle};
/// # use futures::stream::{self, StreamExt};
///
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
/// let mut stream = Abortable::new(stream::iter(vec![1, 2, 3]), abort_registration);
/// abort_handle.abort();
/// assert_eq!(stream.next().await, None);
/// # });
/// ```
pub fn new(task: T, reg: AbortRegistration) -> Self {
Self { task, inner: reg.inner }
}
/// Checks whether the task has been aborted. Note that all this
/// method indicates is whether [`AbortHandle::abort`] was *called*.
/// This means that it will return `true` even if:
/// * `abort` was called after the task had completed.
/// * `abort` was called while the task was being polled - the task may still be running and
/// will not be stopped until `poll` returns.
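///
/// # Examples
///
/// A minimal sketch, assuming the `futures` facade crate re-exports these
/// types as in the examples on [`Abortable::new`]. The flag can be read
/// without ever polling the task:
///
/// ```rust
/// use futures::future::{AbortHandle, Abortable};
///
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
/// let task = Abortable::new(async { 2 }, abort_registration);
///
/// // Nothing has called `abort` yet.
/// assert!(!task.is_aborted());
///
/// // The flag flips as soon as `abort` is called, even though the task
/// // has never been polled.
/// abort_handle.abort();
/// assert!(task.is_aborted());
/// ```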
pub fn is_aborted(&self) -> bool {
self.inner.aborted.load(Ordering::Relaxed)
}
}
/// A registration handle for an `Abortable` task.
/// Values of this type can be acquired from [`AbortHandle::new_pair`] and are
/// used in calls to [`Abortable::new`].
#[derive(Debug)]
pub struct AbortRegistration {
pub(crate) inner: Arc<AbortInner>,
}
/// A handle to an `Abortable` task.
#[derive(Debug, Clone)]
pub struct AbortHandle {
inner: Arc<AbortInner>,
}
impl AbortHandle {
/// Creates an (`AbortHandle`, `AbortRegistration`) pair which can be used
/// to abort a running future or stream.
///
/// This function is usually paired with a call to [`Abortable::new`].
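///
/// # Examples
///
/// A minimal sketch, assuming the `futures` facade crate and its
/// `futures::executor::block_on` executor are available. `AbortHandle` is
/// `Clone`, so any clone of the handle can abort the task:
///
/// ```rust
/// use futures::executor::block_on;
/// use futures::future::{AbortHandle, Abortable, Aborted};
///
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
/// let cloned_handle = abort_handle.clone();
/// let future = Abortable::new(async { 2 }, abort_registration);
///
/// // Aborting through the clone affects the same underlying task.
/// cloned_handle.abort();
/// let result = block_on(future);
/// assert_eq!(result, Err(Aborted));
/// ```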
pub fn new_pair() -> (Self, AbortRegistration) {
let inner =
Arc::new(AbortInner { waker: AtomicWaker::new(), aborted: AtomicBool::new(false) });
(Self { inner: inner.clone() }, AbortRegistration { inner })
}
}
// Inner type storing the waker to awaken and a bool indicating that it
// should be aborted.
#[derive(Debug)]
pub(crate) struct AbortInner {
pub(crate) waker: AtomicWaker,
pub(crate) aborted: AtomicBool,
}
/// Indicator that the `Abortable` task was aborted.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub struct Aborted;
impl fmt::Display for Aborted {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "`Abortable` future has been aborted")
}
}
#[cfg(feature = "std")]
impl std::error::Error for Aborted {}
impl<T> Abortable<T> {
fn try_poll<I>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
poll: impl Fn(Pin<&mut T>, &mut Context<'_>) -> Poll<I>,
) -> Poll<Result<I, Aborted>> {
// Check if the task has been aborted
if self.is_aborted() {
return Poll::Ready(Err(Aborted));
}
// Attempt to complete the task
if let Poll::Ready(x) = poll(self.as_mut().project().task, cx) {
return Poll::Ready(Ok(x));
}
// Register to receive a wakeup if the task is aborted in the future
self.inner.waker.register(cx.waker());
// Check to see if the task was aborted between the first check and
// registration.
// Checking with `is_aborted` which uses `Relaxed` is sufficient because
// `register` introduces an `AcqRel` barrier.
if self.is_aborted() {
return Poll::Ready(Err(Aborted));
}
Poll::Pending
}
}
impl<Fut> Future for Abortable<Fut>
where
Fut: Future,
{
type Output = Result<Fut::Output, Aborted>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.try_poll(cx, |fut, cx| fut.poll(cx))
}
}
impl<St> Stream for Abortable<St>
where
St: Stream,
{
type Item = St::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.try_poll(cx, |stream, cx| stream.poll_next(cx)).map(Result::ok).map(Option::flatten)
}
}
impl AbortHandle {
/// Abort the `Abortable` stream/future associated with this handle.
///
/// Notifies the `Abortable` task associated with this handle that it
/// should abort. Note that if the task is currently being polled on
/// another thread, it will not immediately stop running. Instead, it will
/// continue to run until its poll method returns.
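///
/// # Examples
///
/// A minimal sketch, assuming the `futures` facade crate and its
/// `futures::executor::block_on` executor are available. Items yielded
/// before the call to `abort` are unaffected; the stream ends on the next
/// poll after it:
///
/// ```rust
/// use futures::executor::block_on;
/// use futures::future::{AbortHandle, Abortable};
/// use futures::stream::{self, StreamExt};
///
/// let (abort_handle, abort_registration) = AbortHandle::new_pair();
/// let mut stream = Abortable::new(stream::iter(vec![1, 2, 3]), abort_registration);
///
/// let (first, after_abort) = block_on(async {
///     // The first item is produced normally.
///     let first = stream.next().await;
///     // Abort between polls; the remaining items are never yielded.
///     abort_handle.abort();
///     (first, stream.next().await)
/// });
///
/// assert_eq!(first, Some(1));
/// assert_eq!(after_abort, None);
/// ```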
pub fn abort(&self) {
self.inner.aborted.store(true, Ordering::Relaxed);
self.inner.waker.wake();
}
}