tower_abci/v034/
server.rs

1use std::convert::{TryFrom, TryInto};
2
3use futures::future::{FutureExt, TryFutureExt};
4use futures::sink::SinkExt;
5use futures::stream::{FuturesOrdered, StreamExt};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::{
8    net::{TcpListener, ToSocketAddrs},
9    select,
10};
11
12use crate::BoxError;
13use tendermint::abci::MethodKind;
14use tendermint::v0_34::abci::{
15    ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
16    MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
17};
18use tokio_util::codec::{FramedRead, FramedWrite};
19use tower::{Service, ServiceExt};
20
21#[cfg(target_family = "unix")]
22use std::path::Path;
23#[cfg(target_family = "unix")]
24use tokio::net::UnixListener;
25
26/// An ABCI server which listens for connections and forwards requests to four
27/// component ABCI [`Service`]s.
28pub struct Server<C, M, I, S> {
29    consensus: C,
30    mempool: M,
31    info: I,
32    snapshot: S,
33}
34
35pub struct ServerBuilder<C, M, I, S> {
36    consensus: Option<C>,
37    mempool: Option<M>,
38    info: Option<I>,
39    snapshot: Option<S>,
40}
41
42impl<C, M, I, S> Default for ServerBuilder<C, M, I, S> {
43    fn default() -> Self {
44        Self {
45            consensus: None,
46            mempool: None,
47            info: None,
48            snapshot: None,
49        }
50    }
51}
52
53impl<C, M, I, S> ServerBuilder<C, M, I, S>
54where
55    C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
56        + Send
57        + Clone
58        + 'static,
59    C::Future: Send + 'static,
60    M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
61        + Send
62        + Clone
63        + 'static,
64    M::Future: Send + 'static,
65    I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
66    I::Future: Send + 'static,
67    S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
68        + Send
69        + Clone
70        + 'static,
71    S::Future: Send + 'static,
72{
73    pub fn consensus(mut self, consensus: C) -> Self {
74        self.consensus = Some(consensus);
75        self
76    }
77
78    pub fn mempool(mut self, mempool: M) -> Self {
79        self.mempool = Some(mempool);
80        self
81    }
82
83    pub fn info(mut self, info: I) -> Self {
84        self.info = Some(info);
85        self
86    }
87
88    pub fn snapshot(mut self, snapshot: S) -> Self {
89        self.snapshot = Some(snapshot);
90        self
91    }
92
93    pub fn finish(self) -> Option<Server<C, M, I, S>> {
94        let consensus = self.consensus?;
95        let mempool = self.mempool?;
96        let info = self.info?;
97        let snapshot = self.snapshot?;
98
99        Some(Server {
100            consensus,
101            mempool,
102            info,
103            snapshot,
104        })
105    }
106}
107
108impl<C, M, I, S> Server<C, M, I, S>
109where
110    C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
111        + Send
112        + Clone
113        + 'static,
114    C::Future: Send + 'static,
115    M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
116        + Send
117        + Clone
118        + 'static,
119    M::Future: Send + 'static,
120    I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
121    I::Future: Send + 'static,
122    S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
123        + Send
124        + Clone
125        + 'static,
126    S::Future: Send + 'static,
127{
128    pub fn builder() -> ServerBuilder<C, M, I, S> {
129        ServerBuilder::default()
130    }
131
132    #[cfg(target_family = "unix")]
133    pub async fn listen_unix(self, path: impl AsRef<Path>) -> Result<(), BoxError> {
134        let listener = UnixListener::bind(path)?;
135        let addr = listener.local_addr()?;
136        tracing::info!(?addr, "ABCI server starting on uds");
137
138        loop {
139            match listener.accept().await {
140                Ok((socket, _addr)) => {
141                    tracing::debug!(?_addr, "accepted new connection");
142                    let conn = Connection {
143                        consensus: self.consensus.clone(),
144                        mempool: self.mempool.clone(),
145                        info: self.info.clone(),
146                        snapshot: self.snapshot.clone(),
147                    };
148                    let (read, write) = socket.into_split();
149                    tokio::spawn(async move { conn.run(read, write).await.unwrap() });
150                }
151                Err(e) => {
152                    tracing::error!({ %e }, "error accepting new connection");
153                }
154            }
155        }
156    }
157
158    pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
159        self,
160        addr: A,
161    ) -> Result<(), BoxError> {
162        let listener = TcpListener::bind(addr).await?;
163        let addr = listener.local_addr()?;
164        tracing::info!(?addr, "ABCI server starting on tcp socket");
165
166        loop {
167            match listener.accept().await {
168                Ok((socket, _addr)) => {
169                    tracing::debug!(?_addr, "accepted new connection");
170                    let conn = Connection {
171                        consensus: self.consensus.clone(),
172                        mempool: self.mempool.clone(),
173                        info: self.info.clone(),
174                        snapshot: self.snapshot.clone(),
175                    };
176                    let (read, write) = socket.into_split();
177                    tokio::spawn(async move { conn.run(read, write).await.unwrap() });
178                }
179                Err(e) => {
180                    tracing::error!({ %e }, "error accepting new connection");
181                }
182            }
183        }
184    }
185}
186
187struct Connection<C, M, I, S> {
188    consensus: C,
189    mempool: M,
190    info: I,
191    snapshot: S,
192}
193
194impl<C, M, I, S> Connection<C, M, I, S>
195where
196    C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError> + Send + 'static,
197    C::Future: Send + 'static,
198    M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError> + Send + 'static,
199    M::Future: Send + 'static,
200    I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + 'static,
201    I::Future: Send + 'static,
202    S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
203    S::Future: Send + 'static,
204{
205    // XXX handle errors gracefully
206    // figure out how / if to return errors to tendermint
207    async fn run(
208        mut self,
209        read: impl AsyncReadExt + std::marker::Unpin,
210        write: impl AsyncWriteExt + std::marker::Unpin,
211    ) -> Result<(), BoxError> {
212        tracing::info!("listening for requests");
213
214        use tendermint_proto::v0_34::abci as pb;
215
216        let (mut request_stream, mut response_sink) = {
217            use crate::v034::codec::{Decode, Encode};
218            (
219                FramedRead::new(read, Decode::<pb::Request>::default()),
220                FramedWrite::new(write, Encode::<pb::Response>::default()),
221            )
222        };
223
224        let mut responses = FuturesOrdered::new();
225
226        loop {
227            select! {
228                req = request_stream.next() => {
229                    let proto = match req.transpose()? {
230                        Some(proto) => proto,
231                        None => return Ok(()),
232                    };
233                    let request = Request::try_from(proto)?;
234                    tracing::debug!(?request, "new request");
235                    match request.kind() {
236                        MethodKind::Consensus => {
237                            let request = request.try_into().expect("checked kind");
238                            let response = self.consensus.ready().await?.call(request);
239                            // Need to box here for type erasure
240                            responses.push_back(response.map_ok(Response::from).boxed());
241                        }
242                        MethodKind::Mempool => {
243                            let request = request.try_into().expect("checked kind");
244                            let response = self.mempool.ready().await?.call(request);
245                            responses.push_back(response.map_ok(Response::from).boxed());
246                        }
247                        MethodKind::Snapshot => {
248                            let request = request.try_into().expect("checked kind");
249                            let response = self.snapshot.ready().await?.call(request);
250                            responses.push_back(response.map_ok(Response::from).boxed());
251                        }
252                        MethodKind::Info => {
253                            let request = request.try_into().expect("checked kind");
254                            let response = self.info.ready().await?.call(request);
255                            responses.push_back(response.map_ok(Response::from).boxed());
256                        }
257                        MethodKind::Flush => {
258                            // Instead of propagating Flush requests to the application,
259                            // handle them here by awaiting all pending responses.
260                            tracing::debug!(responses.len = responses.len(), "flushing responses");
261                            while let Some(response) = responses.next().await {
262                                // XXX: sometimes we might want to send errors to tendermint
263                                // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
264                                tracing::debug!(?response, "flushing response");
265                                response_sink.send(response?.into()).await?;
266                            }
267                            // Now we need to tell Tendermint we've flushed responses
268                            response_sink.send(Response::Flush.into()).await?;
269                        }
270                    }
271                }
272                rsp = responses.next(), if !responses.is_empty() => {
273                    let response = rsp.expect("didn't poll when responses was empty");
274                    // XXX: sometimes we might want to send errors to tendermint
275                    // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
276                    tracing::debug!(?response, "sending response");
277                    response_sink.send(response?.into()).await?;
278                }
279            }
280        }
281    }
282}