1use std::convert::{TryFrom, TryInto};
2
3use futures::future::{FutureExt, TryFutureExt};
4use futures::sink::SinkExt;
5use futures::stream::{FuturesOrdered, StreamExt};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::{
8 net::{TcpListener, ToSocketAddrs},
9 select,
10};
11
12use tokio_util::codec::{FramedRead, FramedWrite};
13use tower::{Service, ServiceExt};
14
15use crate::BoxError;
16use tendermint::abci::MethodKind;
17
18#[cfg(target_family = "unix")]
19use std::path::Path;
20#[cfg(target_family = "unix")]
21use tokio::net::UnixListener;
22
23use tendermint::v0_37::abci::{
24 ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
25 MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
26};
27
/// An ABCI server made of four component services, one per ABCI request
/// category: consensus, mempool, info and snapshot.
///
/// Instances are assembled with [`Server::builder`]. Every accepted
/// connection is served with its own clone of each of the four services.
pub struct Server<C, M, I, S> {
    /// Service answering `ConsensusRequest`s.
    consensus: C,
    /// Service answering `MempoolRequest`s.
    mempool: M,
    /// Service answering `InfoRequest`s.
    info: I,
    /// Service answering `SnapshotRequest`s.
    snapshot: S,
}
36
/// Incrementally collects the four services needed to construct a
/// [`Server`].
///
/// Each field starts out as `None` and is filled in by the setter of the
/// same name; `finish` only yields a server once all four are present.
pub struct ServerBuilder<C, M, I, S> {
    /// Consensus service, if one has been supplied.
    consensus: Option<C>,
    /// Mempool service, if one has been supplied.
    mempool: Option<M>,
    /// Info service, if one has been supplied.
    info: Option<I>,
    /// Snapshot service, if one has been supplied.
    snapshot: Option<S>,
}
43
44impl<C, M, I, S> Default for ServerBuilder<C, M, I, S> {
45 fn default() -> Self {
46 Self {
47 consensus: None,
48 mempool: None,
49 info: None,
50 snapshot: None,
51 }
52 }
53}
54
55impl<C, M, I, S> ServerBuilder<C, M, I, S>
56where
57 C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
58 + Send
59 + Clone
60 + 'static,
61 C::Future: Send + 'static,
62 M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
63 + Send
64 + Clone
65 + 'static,
66 M::Future: Send + 'static,
67 I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
68 I::Future: Send + 'static,
69 S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
70 + Send
71 + Clone
72 + 'static,
73 S::Future: Send + 'static,
74{
75 pub fn consensus(mut self, consensus: C) -> Self {
76 self.consensus = Some(consensus);
77 self
78 }
79
80 pub fn mempool(mut self, mempool: M) -> Self {
81 self.mempool = Some(mempool);
82 self
83 }
84
85 pub fn info(mut self, info: I) -> Self {
86 self.info = Some(info);
87 self
88 }
89
90 pub fn snapshot(mut self, snapshot: S) -> Self {
91 self.snapshot = Some(snapshot);
92 self
93 }
94
95 pub fn finish(self) -> Option<Server<C, M, I, S>> {
96 let consensus = self.consensus?;
97 let mempool = self.mempool?;
98 let info = self.info?;
99 let snapshot = self.snapshot?;
100
101 Some(Server {
102 consensus,
103 mempool,
104 info,
105 snapshot,
106 })
107 }
108}
109
110impl<C, M, I, S> Server<C, M, I, S>
111where
112 C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
113 + Send
114 + Clone
115 + 'static,
116 C::Future: Send + 'static,
117 M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
118 + Send
119 + Clone
120 + 'static,
121 M::Future: Send + 'static,
122 I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
123 I::Future: Send + 'static,
124 S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
125 + Send
126 + Clone
127 + 'static,
128 S::Future: Send + 'static,
129{
130 pub fn builder() -> ServerBuilder<C, M, I, S> {
131 ServerBuilder::default()
132 }
133
134 #[cfg(target_family = "unix")]
135 pub async fn listen_unix(self, path: impl AsRef<Path>) -> Result<(), BoxError> {
136 let listener = UnixListener::bind(path)?;
137 let addr = listener.local_addr()?;
138 tracing::info!(?addr, "ABCI server starting on uds");
139
140 loop {
141 match listener.accept().await {
142 Ok((socket, _addr)) => {
143 tracing::debug!(?_addr, "accepted new connection");
144 let conn = Connection {
145 consensus: self.consensus.clone(),
146 mempool: self.mempool.clone(),
147 info: self.info.clone(),
148 snapshot: self.snapshot.clone(),
149 };
150 let (read, write) = socket.into_split();
151 tokio::spawn(async move { conn.run(read, write).await.unwrap() });
152 }
153 Err(e) => {
154 tracing::error!({ %e }, "error accepting new connection");
155 }
156 }
157 }
158 }
159
160 pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
161 self,
162 addr: A,
163 ) -> Result<(), BoxError> {
164 let listener = TcpListener::bind(addr).await?;
165 let addr = listener.local_addr()?;
166 tracing::info!(?addr, "ABCI server starting on tcp socket");
167
168 loop {
169 match listener.accept().await {
170 Ok((socket, _addr)) => {
171 tracing::debug!(?_addr, "accepted new connection");
172 let conn = Connection {
173 consensus: self.consensus.clone(),
174 mempool: self.mempool.clone(),
175 info: self.info.clone(),
176 snapshot: self.snapshot.clone(),
177 };
178 let (read, write) = socket.into_split();
179 tokio::spawn(async move { conn.run(read, write).await.unwrap() });
180 }
181 Err(e) => {
182 tracing::error!({ %e }, "error accepting new connection");
183 }
184 }
185 }
186 }
187}
188
/// Per-connection state: the four services used to answer the requests
/// arriving on a single client connection.
struct Connection<C, M, I, S> {
    /// Service answering `ConsensusRequest`s.
    consensus: C,
    /// Service answering `MempoolRequest`s.
    mempool: M,
    /// Service answering `InfoRequest`s.
    info: I,
    /// Service answering `SnapshotRequest`s.
    snapshot: S,
}
195
196impl<C, M, I, S> Connection<C, M, I, S>
197where
198 C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError> + Send + 'static,
199 C::Future: Send + 'static,
200 M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError> + Send + 'static,
201 M::Future: Send + 'static,
202 I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + 'static,
203 I::Future: Send + 'static,
204 S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
205 S::Future: Send + 'static,
206{
207 async fn run(
210 mut self,
211 read: impl AsyncReadExt + std::marker::Unpin,
212 write: impl AsyncWriteExt + std::marker::Unpin,
213 ) -> Result<(), BoxError> {
214 tracing::info!("listening for requests");
215
216 use tendermint_proto::v0_37::abci as pb;
217
218 let (mut request_stream, mut response_sink) = {
219 use crate::v037::codec::{Decode, Encode};
220 (
221 FramedRead::new(read, Decode::<pb::Request>::default()),
222 FramedWrite::new(write, Encode::<pb::Response>::default()),
223 )
224 };
225
226 let mut responses = FuturesOrdered::new();
227
228 loop {
229 select! {
230 req = request_stream.next() => {
231 let proto = match req.transpose()? {
232 Some(proto) => proto,
233 None => return Ok(()),
234 };
235 let request = Request::try_from(proto)?;
236 tracing::debug!(?request, "new request");
237 match request.kind() {
238 MethodKind::Consensus => {
239 let request = request.try_into().expect("checked kind");
240 let response = self.consensus.ready().await?.call(request);
241 responses.push_back(response.map_ok(Response::from).boxed());
243 }
244 MethodKind::Mempool => {
245 let request = request.try_into().expect("checked kind");
246 let response = self.mempool.ready().await?.call(request);
247 responses.push_back(response.map_ok(Response::from).boxed());
248 }
249 MethodKind::Snapshot => {
250 let request = request.try_into().expect("checked kind");
251 let response = self.snapshot.ready().await?.call(request);
252 responses.push_back(response.map_ok(Response::from).boxed());
253 }
254 MethodKind::Info => {
255 let request = request.try_into().expect("checked kind");
256 let response = self.info.ready().await?.call(request);
257 responses.push_back(response.map_ok(Response::from).boxed());
258 }
259 MethodKind::Flush => {
260 tracing::debug!(responses.len = responses.len(), "flushing responses");
263 while let Some(response) = responses.next().await {
264 tracing::debug!(?response, "flushing response");
267 response_sink.send(response?.into()).await?;
268 }
269 response_sink.send(Response::Flush.into()).await?;
271 }
272 }
273 }
274 rsp = responses.next(), if !responses.is_empty() => {
275 let response = rsp.expect("didn't poll when responses was empty");
276 tracing::debug!(?response, "sending response");
279 response_sink.send(response?.into()).await?;
280 }
281 }
282 }
283 }
284}