tower_abci/buffer4/
future.rs1use super::{error::Closed, message};
2use futures::ready;
3use pin_project::pin_project;
4use std::{
5 future::Future,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10#[pin_project]
12#[derive(Debug)]
13pub struct ResponseFuture<T> {
14 #[pin]
15 state: ResponseState<T>,
16}
17
18#[pin_project(project = ResponseStateProj)]
19#[derive(Debug)]
20enum ResponseState<T> {
21 Failed(Option<crate::BoxError>),
22 Rx(#[pin] message::Rx<T>),
23 Poll(#[pin] T),
24}
25
26impl<T> ResponseFuture<T> {
27 pub(crate) fn new(rx: message::Rx<T>) -> Self {
28 ResponseFuture {
29 state: ResponseState::Rx(rx),
30 }
31 }
32
33 pub(crate) fn failed(err: crate::BoxError) -> Self {
34 ResponseFuture {
35 state: ResponseState::Failed(Some(err)),
36 }
37 }
38}
39
40impl<F, T, E> Future for ResponseFuture<F>
41where
42 F: Future<Output = Result<T, E>>,
43 E: Into<crate::BoxError>,
44{
45 type Output = Result<T, crate::BoxError>;
46
47 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
48 let mut this = self.project();
49
50 loop {
51 match this.state.as_mut().project() {
52 ResponseStateProj::Failed(e) => {
53 return Poll::Ready(Err(e.take().expect("polled after error")));
54 }
55 ResponseStateProj::Rx(rx) => match ready!(rx.poll(cx)) {
56 Ok(Ok(f)) => this.state.set(ResponseState::Poll(f)),
57 Ok(Err(e)) => return Poll::Ready(Err(e.into())),
58 Err(_) => return Poll::Ready(Err(Closed::new().into())),
59 },
60 ResponseStateProj::Poll(fut) => return fut.poll(cx).map_err(Into::into),
61 }
62 }
63 }
64}