tower_abci/v038/
server.rs

1use std::convert::{TryFrom, TryInto};
2
3use futures::future::{FutureExt, TryFutureExt};
4use futures::sink::SinkExt;
5use futures::stream::{FuturesOrdered, StreamExt};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::{
8    net::{TcpListener, ToSocketAddrs},
9    select,
10};
11use tokio_util::codec::{FramedRead, FramedWrite};
12use tower::{Service, ServiceExt};
13
14use crate::BoxError;
15use tendermint::abci::MethodKind;
16
17use tendermint::v0_38::abci::{
18    ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
19    MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
20};
21
/// ABCI server that accepts incoming connections and dispatches every request
/// to one of four component ABCI [`Service`]s, chosen by request category.
///
/// Obtain an instance through [`Server::builder`]; each accepted connection is
/// served with its own clone of the four services.
pub struct Server<C, M, I, S> {
    /// Service answering [`ConsensusRequest`]s.
    consensus: C,
    /// Service answering [`MempoolRequest`]s.
    mempool: M,
    /// Service answering [`InfoRequest`]s.
    info: I,
    /// Service answering [`SnapshotRequest`]s.
    snapshot: S,
}
30
/// Step-by-step builder for a [`Server`].
///
/// Each slot is filled by the setter method of the same name.
/// [`ServerBuilder::finish`] produces a [`Server`] only when all four slots
/// hold a service.
pub struct ServerBuilder<C, M, I, S> {
    /// Consensus service, once supplied.
    consensus: Option<C>,
    /// Mempool service, once supplied.
    mempool: Option<M>,
    /// Info service, once supplied.
    info: Option<I>,
    /// Snapshot service, once supplied.
    snapshot: Option<S>,
}
37
38impl<C, M, I, S> Default for ServerBuilder<C, M, I, S> {
39    fn default() -> Self {
40        Self {
41            consensus: None,
42            mempool: None,
43            info: None,
44            snapshot: None,
45        }
46    }
47}
48
49impl<C, M, I, S> ServerBuilder<C, M, I, S>
50where
51    C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
52        + Send
53        + Clone
54        + 'static,
55    C::Future: Send + 'static,
56    M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
57        + Send
58        + Clone
59        + 'static,
60    M::Future: Send + 'static,
61    I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
62    I::Future: Send + 'static,
63    S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
64        + Send
65        + Clone
66        + 'static,
67    S::Future: Send + 'static,
68{
69    pub fn consensus(mut self, consensus: C) -> Self {
70        self.consensus = Some(consensus);
71        self
72    }
73
74    pub fn mempool(mut self, mempool: M) -> Self {
75        self.mempool = Some(mempool);
76        self
77    }
78
79    pub fn info(mut self, info: I) -> Self {
80        self.info = Some(info);
81        self
82    }
83
84    pub fn snapshot(mut self, snapshot: S) -> Self {
85        self.snapshot = Some(snapshot);
86        self
87    }
88
89    pub fn finish(self) -> Option<Server<C, M, I, S>> {
90        let consensus = self.consensus?;
91        let mempool = self.mempool?;
92        let info = self.info?;
93        let snapshot = self.snapshot?;
94
95        Some(Server {
96            consensus,
97            mempool,
98            info,
99            snapshot,
100        })
101    }
102}
103
104impl<C, M, I, S> Server<C, M, I, S>
105where
106    C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
107        + Send
108        + Clone
109        + 'static,
110    C::Future: Send + 'static,
111    M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
112        + Send
113        + Clone
114        + 'static,
115    M::Future: Send + 'static,
116    I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
117    I::Future: Send + 'static,
118    S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
119        + Send
120        + Clone
121        + 'static,
122    S::Future: Send + 'static,
123{
124    pub fn builder() -> ServerBuilder<C, M, I, S> {
125        ServerBuilder::default()
126    }
127
128    #[cfg(target_family = "unix")]
129    pub async fn listen_unix(self, path: impl AsRef<std::path::Path>) -> Result<(), BoxError> {
130        let listener = tokio::net::UnixListener::bind(path)?;
131        let addr = listener.local_addr()?;
132        tracing::info!(?addr, "ABCI server starting on uds");
133
134        loop {
135            match listener.accept().await {
136                Ok((socket, _addr)) => {
137                    tracing::debug!(?_addr, "accepted new connection");
138                    let conn = Connection {
139                        consensus: self.consensus.clone(),
140                        mempool: self.mempool.clone(),
141                        info: self.info.clone(),
142                        snapshot: self.snapshot.clone(),
143                    };
144                    let (read, write) = socket.into_split();
145                    tokio::spawn(async move { conn.run(read, write).await.unwrap() });
146                }
147                Err(e) => {
148                    tracing::error!({ %e }, "error accepting new connection");
149                }
150            }
151        }
152    }
153
154    pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
155        self,
156        addr: A,
157    ) -> Result<(), BoxError> {
158        let listener = TcpListener::bind(addr).await?;
159        let addr = listener.local_addr()?;
160        tracing::info!(?addr, "ABCI server starting on tcp socket");
161
162        loop {
163            match listener.accept().await {
164                Ok((socket, _addr)) => {
165                    tracing::debug!(?_addr, "accepted new connection");
166                    let conn = Connection {
167                        consensus: self.consensus.clone(),
168                        mempool: self.mempool.clone(),
169                        info: self.info.clone(),
170                        snapshot: self.snapshot.clone(),
171                    };
172                    let (read, write) = socket.into_split();
173                    tokio::spawn(async move { conn.run(read, write).await.unwrap() });
174                }
175                Err(e) => {
176                    tracing::error!({ %e }, "error accepting new connection");
177                }
178            }
179        }
180    }
181}
182
/// The set of four services used to serve a single accepted connection.
///
/// The server builds one of these per connection from clones of its own
/// services and drives it with `run`.
struct Connection<C, M, I, S> {
    /// Handles consensus requests arriving on this connection.
    consensus: C,
    /// Handles mempool requests arriving on this connection.
    mempool: M,
    /// Handles info requests arriving on this connection.
    info: I,
    /// Handles snapshot requests arriving on this connection.
    snapshot: S,
}
189
190impl<C, M, I, S> Connection<C, M, I, S>
191where
192    C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError> + Send + 'static,
193    C::Future: Send + 'static,
194    M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError> + Send + 'static,
195    M::Future: Send + 'static,
196    I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + 'static,
197    I::Future: Send + 'static,
198    S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
199    S::Future: Send + 'static,
200{
201    // XXX handle errors gracefully
202    // figure out how / if to return errors to tendermint
203    async fn run(
204        mut self,
205        read: impl AsyncReadExt + std::marker::Unpin,
206        write: impl AsyncWriteExt + std::marker::Unpin,
207    ) -> Result<(), BoxError> {
208        tracing::info!("listening for requests");
209
210        use tendermint_proto::v0_38::abci as pb;
211
212        let (mut request_stream, mut response_sink) = {
213            use crate::v038::codec::{Decode, Encode};
214            (
215                FramedRead::new(read, Decode::<pb::Request>::default()),
216                FramedWrite::new(write, Encode::<pb::Response>::default()),
217            )
218        };
219
220        let mut responses = FuturesOrdered::new();
221
222        loop {
223            select! {
224                req = request_stream.next() => {
225                    let proto = match req.transpose()? {
226                        Some(proto) => proto,
227                        None => return Ok(()),
228                    };
229                    let request = Request::try_from(proto)?;
230                    tracing::debug!(?request, "new request");
231                    match request.kind() {
232                        MethodKind::Consensus => {
233                            let request = request.try_into().expect("checked kind");
234                            let response = self.consensus.ready().await?.call(request);
235                            // Need to box here for type erasure
236                            responses.push_back(response.map_ok(Response::from).boxed());
237                        }
238                        MethodKind::Mempool => {
239                            let request = request.try_into().expect("checked kind");
240                            let response = self.mempool.ready().await?.call(request);
241                            responses.push_back(response.map_ok(Response::from).boxed());
242                        }
243                        MethodKind::Snapshot => {
244                            let request = request.try_into().expect("checked kind");
245                            let response = self.snapshot.ready().await?.call(request);
246                            responses.push_back(response.map_ok(Response::from).boxed());
247                        }
248                        MethodKind::Info => {
249                            let request = request.try_into().expect("checked kind");
250                            let response = self.info.ready().await?.call(request);
251                            responses.push_back(response.map_ok(Response::from).boxed());
252                        }
253                        MethodKind::Flush => {
254                            // Instead of propagating Flush requests to the application,
255                            // handle them here by awaiting all pending responses.
256                            tracing::debug!(responses.len = responses.len(), "flushing responses");
257                            while let Some(response) = responses.next().await {
258                                // XXX: sometimes we might want to send errors to tendermint
259                                // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
260                                tracing::debug!(?response, "flushing response");
261                                response_sink.send(response?.into()).await?;
262                            }
263                            // Now we need to tell Tendermint we've flushed responses
264                            response_sink.send(Response::Flush.into()).await?;
265                        }
266                    }
267                }
268                rsp = responses.next(), if !responses.is_empty() => {
269                    let response = rsp.expect("didn't poll when responses was empty");
270                    // XXX: sometimes we might want to send errors to tendermint
271                    // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
272                    tracing::debug!(?response, "sending response");
273                    response_sink.send(response?.into()).await?;
274                }
275            }
276        }
277    }
278}