genawaiter/
core.rs

1use crate::{ops::GeneratorState, waker};
2use std::{
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
/// The message held in an airlock: the single slot through which a generator
/// body and the code driving it exchange values.
///
/// `Debug` is derived so the state of the slot can be printed while diagnosing
/// protocol violations. The derived impl is only available when both `Y` and
/// `R` implement `Debug`.
#[derive(Debug)]
pub enum Next<Y, R> {
    /// Nothing is stored. The advance functions leave the slot in this state
    /// after taking a value out of it.
    Empty,
    /// A value stored by `Co::yield_` that has not been collected yet.
    Yield(Y),
    /// A resume argument. The future returned by `Co::yield_` resolves to it.
    Resume(R),
    /// The generator's future has returned. The advance functions only store
    /// this in builds with `debug_assertions`, where it lets `Co::yield_`
    /// detect that it is being used after completion.
    Completed,
}
14
15#[allow(clippy::use_self)]
16impl<Y, R> Next<Y, R> {
17    pub fn without_values(&self) -> Next<(), ()> {
18        match self {
19            Self::Empty => Next::Empty,
20            Self::Yield(_) => Next::Yield(()),
21            Self::Resume(_) => Next::Resume(()),
22            Self::Completed => Next::Completed,
23        }
24    }
25}
26
27pub fn advance<Y, R, F: Future>(
28    future: Pin<&mut F>,
29    airlock: &impl Airlock<Yield = Y, Resume = R>,
30) -> GeneratorState<Y, F::Output> {
31    let waker = waker::create();
32    let mut cx = Context::from_waker(&waker);
33
34    match future.poll(&mut cx) {
35        Poll::Pending => {
36            let value = airlock.replace(Next::Empty);
37            match value {
38                Next::Empty | Next::Completed => unreachable!(),
39                Next::Yield(y) => GeneratorState::Yielded(y),
40                Next::Resume(_) => {
41                    #[cfg(debug_assertions)]
42                    panic!(
43                        "An async generator was resumed via a non-async method. For \
44                         async generators, use `Stream` or `async_resume` instead of \
45                         `Iterator` or `resume`.",
46                    );
47
48                    #[cfg(not(debug_assertions))]
49                    panic!("misused async generator");
50                }
51            }
52        }
53        Poll::Ready(value) => {
54            #[cfg(debug_assertions)]
55            airlock.replace(Next::Completed);
56
57            GeneratorState::Complete(value)
58        }
59    }
60}
61
62pub fn async_advance<'a, Y, R, F: Future>(
63    future: Pin<&'a mut F>,
64    airlock: impl Airlock<Yield = Y, Resume = R> + 'a,
65) -> impl Future<Output = GeneratorState<Y, F::Output>> + 'a {
66    Advance { future, airlock }
67}
68
69struct Advance<'a, F: Future, A: Airlock> {
70    future: Pin<&'a mut F>,
71    airlock: A,
72}
73
74impl<'a, F: Future, A: Airlock> Advance<'a, F, A> {
75    fn future_mut(self: Pin<&mut Self>) -> Pin<&mut F> {
76        // Safety: This is just projecting a pinned reference. Neither `self` nor
77        // `self.future` are moved.
78        unsafe { self.map_unchecked_mut(|s| s.future.as_mut().get_unchecked_mut()) }
79    }
80}
81
82impl<'a, F: Future, A: Airlock> Future for Advance<'a, F, A> {
83    type Output = GeneratorState<A::Yield, F::Output>;
84
85    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86        match self.as_mut().future_mut().poll(cx) {
87            Poll::Pending => {
88                let value = self.airlock.replace(Next::Empty);
89                match value {
90                    Next::Empty | Next::Resume(_) => Poll::Pending,
91                    Next::Yield(y) => Poll::Ready(GeneratorState::Yielded(y)),
92                    Next::Completed => unreachable!(),
93                }
94            }
95            Poll::Ready(value) => {
96                #[cfg(debug_assertions)]
97                self.airlock.replace(Next::Completed);
98
99                Poll::Ready(GeneratorState::Complete(value))
100            }
101        }
102    }
103}
104
105pub trait Airlock {
106    type Yield;
107    type Resume;
108
109    fn peek(&self) -> Next<(), ()>;
110
111    fn replace(
112        &self,
113        next: Next<Self::Yield, Self::Resume>,
114    ) -> Next<Self::Yield, Self::Resume>;
115}
116
117pub struct Co<A: Airlock> {
118    airlock: A,
119}
120
121impl<A: Airlock> Co<A> {
122    pub(crate) fn new(airlock: A) -> Self {
123        Self { airlock }
124    }
125
126    /// Yields a value from the generator.
127    ///
128    /// The caller should immediately `await` the result of this function.
129    ///
130    /// [_See the module-level docs for examples._](.)
131    pub fn yield_(&self, value: A::Yield) -> impl Future<Output = A::Resume> + '_ {
132        #[cfg(debug_assertions)]
133        match self.airlock.peek() {
134            Next::Yield(()) => {
135                panic!(
136                    "Multiple values were yielded without an intervening await. Make \
137                     sure to immediately await the result of `Co::yield_`."
138                );
139            }
140            Next::Completed => {
141                panic!(
142                    "`yield_` should not be used after the generator completes. The \
143                     `Co` object should have been dropped by now."
144                )
145            }
146            Next::Empty | Next::Resume(()) => {}
147        }
148
149        self.airlock.replace(Next::Yield(value));
150        Barrier {
151            airlock: &self.airlock,
152        }
153    }
154}
155
156struct Barrier<'a, A: Airlock> {
157    airlock: &'a A,
158}
159
160impl<'a, A: Airlock> Future for Barrier<'a, A> {
161    type Output = A::Resume;
162
163    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
164        match self.airlock.peek() {
165            Next::Yield(_) => Poll::Pending,
166            Next::Resume(_) => {
167                let next = self.airlock.replace(Next::Empty);
168                match next {
169                    Next::Resume(arg) => Poll::Ready(arg),
170                    Next::Empty | Next::Yield(_) | Next::Completed => unreachable!(),
171                }
172            }
173            Next::Empty | Next::Completed => unreachable!(),
174        }
175    }
176}