1use std::convert::{TryFrom, TryInto};
2
3use futures::future::{FutureExt, TryFutureExt};
4use futures::sink::SinkExt;
5use futures::stream::{FuturesOrdered, StreamExt};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::{
8 net::{TcpListener, ToSocketAddrs},
9 select,
10};
11use tokio_util::codec::{FramedRead, FramedWrite};
12use tower::{Service, ServiceExt};
13
14use crate::BoxError;
15use tendermint::abci::MethodKind;
16
17use tendermint::v0_38::abci::{
18 ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
19 MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
20};
21
22pub struct Server<C, M, I, S> {
25 consensus: C,
26 mempool: M,
27 info: I,
28 snapshot: S,
29}
30
31pub struct ServerBuilder<C, M, I, S> {
32 consensus: Option<C>,
33 mempool: Option<M>,
34 info: Option<I>,
35 snapshot: Option<S>,
36}
37
38impl<C, M, I, S> Default for ServerBuilder<C, M, I, S> {
39 fn default() -> Self {
40 Self {
41 consensus: None,
42 mempool: None,
43 info: None,
44 snapshot: None,
45 }
46 }
47}
48
49impl<C, M, I, S> ServerBuilder<C, M, I, S>
50where
51 C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
52 + Send
53 + Clone
54 + 'static,
55 C::Future: Send + 'static,
56 M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
57 + Send
58 + Clone
59 + 'static,
60 M::Future: Send + 'static,
61 I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
62 I::Future: Send + 'static,
63 S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
64 + Send
65 + Clone
66 + 'static,
67 S::Future: Send + 'static,
68{
69 pub fn consensus(mut self, consensus: C) -> Self {
70 self.consensus = Some(consensus);
71 self
72 }
73
74 pub fn mempool(mut self, mempool: M) -> Self {
75 self.mempool = Some(mempool);
76 self
77 }
78
79 pub fn info(mut self, info: I) -> Self {
80 self.info = Some(info);
81 self
82 }
83
84 pub fn snapshot(mut self, snapshot: S) -> Self {
85 self.snapshot = Some(snapshot);
86 self
87 }
88
89 pub fn finish(self) -> Option<Server<C, M, I, S>> {
90 let consensus = self.consensus?;
91 let mempool = self.mempool?;
92 let info = self.info?;
93 let snapshot = self.snapshot?;
94
95 Some(Server {
96 consensus,
97 mempool,
98 info,
99 snapshot,
100 })
101 }
102}
103
104impl<C, M, I, S> Server<C, M, I, S>
105where
106 C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
107 + Send
108 + Clone
109 + 'static,
110 C::Future: Send + 'static,
111 M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
112 + Send
113 + Clone
114 + 'static,
115 M::Future: Send + 'static,
116 I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
117 I::Future: Send + 'static,
118 S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
119 + Send
120 + Clone
121 + 'static,
122 S::Future: Send + 'static,
123{
124 pub fn builder() -> ServerBuilder<C, M, I, S> {
125 ServerBuilder::default()
126 }
127
128 #[cfg(target_family = "unix")]
129 pub async fn listen_unix(self, path: impl AsRef<std::path::Path>) -> Result<(), BoxError> {
130 let listener = tokio::net::UnixListener::bind(path)?;
131 let addr = listener.local_addr()?;
132 tracing::info!(?addr, "ABCI server starting on uds");
133
134 loop {
135 match listener.accept().await {
136 Ok((socket, _addr)) => {
137 tracing::debug!(?_addr, "accepted new connection");
138 let conn = Connection {
139 consensus: self.consensus.clone(),
140 mempool: self.mempool.clone(),
141 info: self.info.clone(),
142 snapshot: self.snapshot.clone(),
143 };
144 let (read, write) = socket.into_split();
145 tokio::spawn(async move { conn.run(read, write).await.unwrap() });
146 }
147 Err(e) => {
148 tracing::error!({ %e }, "error accepting new connection");
149 }
150 }
151 }
152 }
153
154 pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
155 self,
156 addr: A,
157 ) -> Result<(), BoxError> {
158 let listener = TcpListener::bind(addr).await?;
159 let addr = listener.local_addr()?;
160 tracing::info!(?addr, "ABCI server starting on tcp socket");
161
162 loop {
163 match listener.accept().await {
164 Ok((socket, _addr)) => {
165 tracing::debug!(?_addr, "accepted new connection");
166 let conn = Connection {
167 consensus: self.consensus.clone(),
168 mempool: self.mempool.clone(),
169 info: self.info.clone(),
170 snapshot: self.snapshot.clone(),
171 };
172 let (read, write) = socket.into_split();
173 tokio::spawn(async move { conn.run(read, write).await.unwrap() });
174 }
175 Err(e) => {
176 tracing::error!({ %e }, "error accepting new connection");
177 }
178 }
179 }
180 }
181}
182
183struct Connection<C, M, I, S> {
184 consensus: C,
185 mempool: M,
186 info: I,
187 snapshot: S,
188}
189
190impl<C, M, I, S> Connection<C, M, I, S>
191where
192 C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError> + Send + 'static,
193 C::Future: Send + 'static,
194 M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError> + Send + 'static,
195 M::Future: Send + 'static,
196 I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + 'static,
197 I::Future: Send + 'static,
198 S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
199 S::Future: Send + 'static,
200{
201 async fn run(
204 mut self,
205 read: impl AsyncReadExt + std::marker::Unpin,
206 write: impl AsyncWriteExt + std::marker::Unpin,
207 ) -> Result<(), BoxError> {
208 tracing::info!("listening for requests");
209
210 use tendermint_proto::v0_38::abci as pb;
211
212 let (mut request_stream, mut response_sink) = {
213 use crate::v038::codec::{Decode, Encode};
214 (
215 FramedRead::new(read, Decode::<pb::Request>::default()),
216 FramedWrite::new(write, Encode::<pb::Response>::default()),
217 )
218 };
219
220 let mut responses = FuturesOrdered::new();
221
222 loop {
223 select! {
224 req = request_stream.next() => {
225 let proto = match req.transpose()? {
226 Some(proto) => proto,
227 None => return Ok(()),
228 };
229 let request = Request::try_from(proto)?;
230 tracing::debug!(?request, "new request");
231 match request.kind() {
232 MethodKind::Consensus => {
233 let request = request.try_into().expect("checked kind");
234 let response = self.consensus.ready().await?.call(request);
235 responses.push_back(response.map_ok(Response::from).boxed());
237 }
238 MethodKind::Mempool => {
239 let request = request.try_into().expect("checked kind");
240 let response = self.mempool.ready().await?.call(request);
241 responses.push_back(response.map_ok(Response::from).boxed());
242 }
243 MethodKind::Snapshot => {
244 let request = request.try_into().expect("checked kind");
245 let response = self.snapshot.ready().await?.call(request);
246 responses.push_back(response.map_ok(Response::from).boxed());
247 }
248 MethodKind::Info => {
249 let request = request.try_into().expect("checked kind");
250 let response = self.info.ready().await?.call(request);
251 responses.push_back(response.map_ok(Response::from).boxed());
252 }
253 MethodKind::Flush => {
254 tracing::debug!(responses.len = responses.len(), "flushing responses");
257 while let Some(response) = responses.next().await {
258 tracing::debug!(?response, "flushing response");
261 response_sink.send(response?.into()).await?;
262 }
263 response_sink.send(Response::Flush.into()).await?;
265 }
266 }
267 }
268 rsp = responses.next(), if !responses.is_empty() => {
269 let response = rsp.expect("didn't poll when responses was empty");
270 tracing::debug!(?response, "sending response");
273 response_sink.send(response?.into()).await?;
274 }
275 }
276 }
277 }
278}