1use crate::{ops::GeneratorState, waker};
2use std::{
3 future::Future,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
/// The contents of the airlock: the slot through which a generator and the
/// code driving it hand values to each other.
pub enum Next<Y, R> {
    /// Nothing is waiting in the slot.
    Empty,
    /// A value placed by `Co::yield_`, waiting to be collected by the driver.
    Yield(Y),
    /// A resume argument, waiting to be collected by the generator.
    Resume(R),
    /// Marker stored (in debug builds) once the generator's future has
    /// finished.
    Completed,
}

// The constructors on the right-hand side build a `Next<(), ()>`, which is not
// `Self`; presumably this is why the `use_self` lint is silenced here.
#[allow(clippy::use_self)]
impl<Y, R> Next<Y, R> {
    /// Returns the same variant as `self`, with any payload replaced by `()`.
    ///
    /// The payload itself is left untouched; only the variant is inspected.
    pub fn without_values(&self) -> Next<(), ()> {
        match *self {
            Self::Completed => Next::Completed,
            Self::Resume(_) => Next::Resume(()),
            Self::Yield(_) => Next::Yield(()),
            Self::Empty => Next::Empty,
        }
    }
}
26
27pub fn advance<Y, R, F: Future>(
28 future: Pin<&mut F>,
29 airlock: &impl Airlock<Yield = Y, Resume = R>,
30) -> GeneratorState<Y, F::Output> {
31 let waker = waker::create();
32 let mut cx = Context::from_waker(&waker);
33
34 match future.poll(&mut cx) {
35 Poll::Pending => {
36 let value = airlock.replace(Next::Empty);
37 match value {
38 Next::Empty | Next::Completed => unreachable!(),
39 Next::Yield(y) => GeneratorState::Yielded(y),
40 Next::Resume(_) => {
41 #[cfg(debug_assertions)]
42 panic!(
43 "An async generator was resumed via a non-async method. For \
44 async generators, use `Stream` or `async_resume` instead of \
45 `Iterator` or `resume`.",
46 );
47
48 #[cfg(not(debug_assertions))]
49 panic!("misused async generator");
50 }
51 }
52 }
53 Poll::Ready(value) => {
54 #[cfg(debug_assertions)]
55 airlock.replace(Next::Completed);
56
57 GeneratorState::Complete(value)
58 }
59 }
60}
61
62pub fn async_advance<'a, Y, R, F: Future>(
63 future: Pin<&'a mut F>,
64 airlock: impl Airlock<Yield = Y, Resume = R> + 'a,
65) -> impl Future<Output = GeneratorState<Y, F::Output>> + 'a {
66 Advance { future, airlock }
67}
68
69struct Advance<'a, F: Future, A: Airlock> {
70 future: Pin<&'a mut F>,
71 airlock: A,
72}
73
74impl<'a, F: Future, A: Airlock> Advance<'a, F, A> {
75 fn future_mut(self: Pin<&mut Self>) -> Pin<&mut F> {
76 unsafe { self.map_unchecked_mut(|s| s.future.as_mut().get_unchecked_mut()) }
79 }
80}
81
82impl<'a, F: Future, A: Airlock> Future for Advance<'a, F, A> {
83 type Output = GeneratorState<A::Yield, F::Output>;
84
85 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
86 match self.as_mut().future_mut().poll(cx) {
87 Poll::Pending => {
88 let value = self.airlock.replace(Next::Empty);
89 match value {
90 Next::Empty | Next::Resume(_) => Poll::Pending,
91 Next::Yield(y) => Poll::Ready(GeneratorState::Yielded(y)),
92 Next::Completed => unreachable!(),
93 }
94 }
95 Poll::Ready(value) => {
96 #[cfg(debug_assertions)]
97 self.airlock.replace(Next::Completed);
98
99 Poll::Ready(GeneratorState::Complete(value))
100 }
101 }
102 }
103}
104
105pub trait Airlock {
106 type Yield;
107 type Resume;
108
109 fn peek(&self) -> Next<(), ()>;
110
111 fn replace(
112 &self,
113 next: Next<Self::Yield, Self::Resume>,
114 ) -> Next<Self::Yield, Self::Resume>;
115}
116
117pub struct Co<A: Airlock> {
118 airlock: A,
119}
120
121impl<A: Airlock> Co<A> {
122 pub(crate) fn new(airlock: A) -> Self {
123 Self { airlock }
124 }
125
126 pub fn yield_(&self, value: A::Yield) -> impl Future<Output = A::Resume> + '_ {
132 #[cfg(debug_assertions)]
133 match self.airlock.peek() {
134 Next::Yield(()) => {
135 panic!(
136 "Multiple values were yielded without an intervening await. Make \
137 sure to immediately await the result of `Co::yield_`."
138 );
139 }
140 Next::Completed => {
141 panic!(
142 "`yield_` should not be used after the generator completes. The \
143 `Co` object should have been dropped by now."
144 )
145 }
146 Next::Empty | Next::Resume(()) => {}
147 }
148
149 self.airlock.replace(Next::Yield(value));
150 Barrier {
151 airlock: &self.airlock,
152 }
153 }
154}
155
156struct Barrier<'a, A: Airlock> {
157 airlock: &'a A,
158}
159
160impl<'a, A: Airlock> Future for Barrier<'a, A> {
161 type Output = A::Resume;
162
163 fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
164 match self.airlock.peek() {
165 Next::Yield(_) => Poll::Pending,
166 Next::Resume(_) => {
167 let next = self.airlock.replace(Next::Empty);
168 match next {
169 Next::Resume(arg) => Poll::Ready(arg),
170 Next::Empty | Next::Yield(_) | Next::Completed => unreachable!(),
171 }
172 }
173 Next::Empty | Next::Completed => unreachable!(),
174 }
175 }
176}