tower_abci/buffer4/
future.rs

1use super::{error::Closed, message};
2use futures::ready;
3use pin_project::pin_project;
4use std::{
5    future::Future,
6    pin::Pin,
7    task::{Context, Poll},
8};
9
10/// Future that completes when the buffered service eventually services the submitted request.
11#[pin_project]
12#[derive(Debug)]
13pub struct ResponseFuture<T> {
14    #[pin]
15    state: ResponseState<T>,
16}
17
18#[pin_project(project = ResponseStateProj)]
19#[derive(Debug)]
20enum ResponseState<T> {
21    Failed(Option<crate::BoxError>),
22    Rx(#[pin] message::Rx<T>),
23    Poll(#[pin] T),
24}
25
26impl<T> ResponseFuture<T> {
27    pub(crate) fn new(rx: message::Rx<T>) -> Self {
28        ResponseFuture {
29            state: ResponseState::Rx(rx),
30        }
31    }
32
33    pub(crate) fn failed(err: crate::BoxError) -> Self {
34        ResponseFuture {
35            state: ResponseState::Failed(Some(err)),
36        }
37    }
38}
39
40impl<F, T, E> Future for ResponseFuture<F>
41where
42    F: Future<Output = Result<T, E>>,
43    E: Into<crate::BoxError>,
44{
45    type Output = Result<T, crate::BoxError>;
46
47    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48        let mut this = self.project();
49
50        loop {
51            match this.state.as_mut().project() {
52                ResponseStateProj::Failed(e) => {
53                    return Poll::Ready(Err(e.take().expect("polled after error")));
54                }
55                ResponseStateProj::Rx(rx) => match ready!(rx.poll(cx)) {
56                    Ok(Ok(f)) => this.state.set(ResponseState::Poll(f)),
57                    Ok(Err(e)) => return Poll::Ready(Err(e.into())),
58                    Err(_) => return Poll::Ready(Err(Closed::new().into())),
59                },
60                ResponseStateProj::Poll(fut) => return fut.poll(cx).map_err(Into::into),
61            }
62        }
63    }
64}