tower_abci/v037/
server.rs

1use std::convert::{TryFrom, TryInto};
2
3use futures::future::{FutureExt, TryFutureExt};
4use futures::sink::SinkExt;
5use futures::stream::{FuturesOrdered, StreamExt};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::{
8    net::{TcpListener, ToSocketAddrs},
9    select,
10};
11
12use tokio_util::codec::{FramedRead, FramedWrite};
13use tower::{Service, ServiceExt};
14
15use crate::BoxError;
16use tendermint::abci::MethodKind;
17
18#[cfg(target_family = "unix")]
19use std::path::Path;
20#[cfg(target_family = "unix")]
21use tokio::net::UnixListener;
22
23use tendermint::v0_37::abci::{
24    ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
25    MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
26};
27
/// ABCI server that accepts client connections and routes every incoming
/// request to one of four component ABCI [`Service`]s.
///
/// Instances are assembled through [`ServerBuilder`]; each accepted connection
/// works on its own clones of the four services.
pub struct Server<C, M, I, S> {
    /// Service receiving [`ConsensusRequest`]s.
    consensus: C,
    /// Service receiving [`MempoolRequest`]s.
    mempool: M,
    /// Service receiving [`InfoRequest`]s.
    info: I,
    /// Service receiving [`SnapshotRequest`]s.
    snapshot: S,
}
36
/// Incrementally collects the four component services needed to build a
/// [`Server`].
///
/// Every slot starts out as `None` and is filled in by the setter of the same
/// name.
pub struct ServerBuilder<C, M, I, S> {
    /// Consensus service, once supplied.
    consensus: Option<C>,
    /// Mempool service, once supplied.
    mempool: Option<M>,
    /// Info service, once supplied.
    info: Option<I>,
    /// Snapshot service, once supplied.
    snapshot: Option<S>,
}
43
44impl<C, M, I, S> Default for ServerBuilder<C, M, I, S> {
45    fn default() -> Self {
46        Self {
47            consensus: None,
48            mempool: None,
49            info: None,
50            snapshot: None,
51        }
52    }
53}
54
55impl<C, M, I, S> ServerBuilder<C, M, I, S>
56where
57    C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
58        + Send
59        + Clone
60        + 'static,
61    C::Future: Send + 'static,
62    M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
63        + Send
64        + Clone
65        + 'static,
66    M::Future: Send + 'static,
67    I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
68    I::Future: Send + 'static,
69    S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
70        + Send
71        + Clone
72        + 'static,
73    S::Future: Send + 'static,
74{
75    pub fn consensus(mut self, consensus: C) -> Self {
76        self.consensus = Some(consensus);
77        self
78    }
79
80    pub fn mempool(mut self, mempool: M) -> Self {
81        self.mempool = Some(mempool);
82        self
83    }
84
85    pub fn info(mut self, info: I) -> Self {
86        self.info = Some(info);
87        self
88    }
89
90    pub fn snapshot(mut self, snapshot: S) -> Self {
91        self.snapshot = Some(snapshot);
92        self
93    }
94
95    pub fn finish(self) -> Option<Server<C, M, I, S>> {
96        let consensus = self.consensus?;
97        let mempool = self.mempool?;
98        let info = self.info?;
99        let snapshot = self.snapshot?;
100
101        Some(Server {
102            consensus,
103            mempool,
104            info,
105            snapshot,
106        })
107    }
108}
109
110impl<C, M, I, S> Server<C, M, I, S>
111where
112    C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
113        + Send
114        + Clone
115        + 'static,
116    C::Future: Send + 'static,
117    M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
118        + Send
119        + Clone
120        + 'static,
121    M::Future: Send + 'static,
122    I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
123    I::Future: Send + 'static,
124    S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
125        + Send
126        + Clone
127        + 'static,
128    S::Future: Send + 'static,
129{
130    pub fn builder() -> ServerBuilder<C, M, I, S> {
131        ServerBuilder::default()
132    }
133
134    #[cfg(target_family = "unix")]
135    pub async fn listen_unix(self, path: impl AsRef<Path>) -> Result<(), BoxError> {
136        let listener = UnixListener::bind(path)?;
137        let addr = listener.local_addr()?;
138        tracing::info!(?addr, "ABCI server starting on uds");
139
140        loop {
141            match listener.accept().await {
142                Ok((socket, _addr)) => {
143                    tracing::debug!(?_addr, "accepted new connection");
144                    let conn = Connection {
145                        consensus: self.consensus.clone(),
146                        mempool: self.mempool.clone(),
147                        info: self.info.clone(),
148                        snapshot: self.snapshot.clone(),
149                    };
150                    let (read, write) = socket.into_split();
151                    tokio::spawn(async move { conn.run(read, write).await.unwrap() });
152                }
153                Err(e) => {
154                    tracing::error!({ %e }, "error accepting new connection");
155                }
156            }
157        }
158    }
159
160    pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
161        self,
162        addr: A,
163    ) -> Result<(), BoxError> {
164        let listener = TcpListener::bind(addr).await?;
165        let addr = listener.local_addr()?;
166        tracing::info!(?addr, "ABCI server starting on tcp socket");
167
168        loop {
169            match listener.accept().await {
170                Ok((socket, _addr)) => {
171                    tracing::debug!(?_addr, "accepted new connection");
172                    let conn = Connection {
173                        consensus: self.consensus.clone(),
174                        mempool: self.mempool.clone(),
175                        info: self.info.clone(),
176                        snapshot: self.snapshot.clone(),
177                    };
178                    let (read, write) = socket.into_split();
179                    tokio::spawn(async move { conn.run(read, write).await.unwrap() });
180                }
181                Err(e) => {
182                    tracing::error!({ %e }, "error accepting new connection");
183                }
184            }
185        }
186    }
187}
188
/// The set of component services used to serve one client connection.
struct Connection<C, M, I, S> {
    /// Service receiving [`ConsensusRequest`]s.
    consensus: C,
    /// Service receiving [`MempoolRequest`]s.
    mempool: M,
    /// Service receiving [`InfoRequest`]s.
    info: I,
    /// Service receiving [`SnapshotRequest`]s.
    snapshot: S,
}
195
196impl<C, M, I, S> Connection<C, M, I, S>
197where
198    C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError> + Send + 'static,
199    C::Future: Send + 'static,
200    M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError> + Send + 'static,
201    M::Future: Send + 'static,
202    I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + 'static,
203    I::Future: Send + 'static,
204    S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
205    S::Future: Send + 'static,
206{
207    // XXX handle errors gracefully
208    // figure out how / if to return errors to tendermint
209    async fn run(
210        mut self,
211        read: impl AsyncReadExt + std::marker::Unpin,
212        write: impl AsyncWriteExt + std::marker::Unpin,
213    ) -> Result<(), BoxError> {
214        tracing::info!("listening for requests");
215
216        use tendermint_proto::v0_37::abci as pb;
217
218        let (mut request_stream, mut response_sink) = {
219            use crate::v037::codec::{Decode, Encode};
220            (
221                FramedRead::new(read, Decode::<pb::Request>::default()),
222                FramedWrite::new(write, Encode::<pb::Response>::default()),
223            )
224        };
225
226        let mut responses = FuturesOrdered::new();
227
228        loop {
229            select! {
230                req = request_stream.next() => {
231                    let proto = match req.transpose()? {
232                        Some(proto) => proto,
233                        None => return Ok(()),
234                    };
235                    let request = Request::try_from(proto)?;
236                    tracing::debug!(?request, "new request");
237                    match request.kind() {
238                        MethodKind::Consensus => {
239                            let request = request.try_into().expect("checked kind");
240                            let response = self.consensus.ready().await?.call(request);
241                            // Need to box here for type erasure
242                            responses.push_back(response.map_ok(Response::from).boxed());
243                        }
244                        MethodKind::Mempool => {
245                            let request = request.try_into().expect("checked kind");
246                            let response = self.mempool.ready().await?.call(request);
247                            responses.push_back(response.map_ok(Response::from).boxed());
248                        }
249                        MethodKind::Snapshot => {
250                            let request = request.try_into().expect("checked kind");
251                            let response = self.snapshot.ready().await?.call(request);
252                            responses.push_back(response.map_ok(Response::from).boxed());
253                        }
254                        MethodKind::Info => {
255                            let request = request.try_into().expect("checked kind");
256                            let response = self.info.ready().await?.call(request);
257                            responses.push_back(response.map_ok(Response::from).boxed());
258                        }
259                        MethodKind::Flush => {
260                            // Instead of propagating Flush requests to the application,
261                            // handle them here by awaiting all pending responses.
262                            tracing::debug!(responses.len = responses.len(), "flushing responses");
263                            while let Some(response) = responses.next().await {
264                                // XXX: sometimes we might want to send errors to tendermint
265                                // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
266                                tracing::debug!(?response, "flushing response");
267                                response_sink.send(response?.into()).await?;
268                            }
269                            // Now we need to tell Tendermint we've flushed responses
270                            response_sink.send(Response::Flush.into()).await?;
271                        }
272                    }
273                }
274                rsp = responses.next(), if !responses.is_empty() => {
275                    let response = rsp.expect("didn't poll when responses was empty");
276                    // XXX: sometimes we might want to send errors to tendermint
277                    // https://docs.tendermint.com/v0.32/spec/abci/abci.html#errors
278                    tracing::debug!(?response, "sending response");
279                    response_sink.send(response?.into()).await?;
280                }
281            }
282        }
283    }
284}