1use std::convert::{TryFrom, TryInto};
2
3use futures::future::{FutureExt, TryFutureExt};
4use futures::sink::SinkExt;
5use futures::stream::{FuturesOrdered, StreamExt};
6use tokio::io::{AsyncReadExt, AsyncWriteExt};
7use tokio::{
8 net::{TcpListener, ToSocketAddrs},
9 select,
10};
11
12use crate::BoxError;
13use tendermint::abci::MethodKind;
14use tendermint::v0_34::abci::{
15 ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
16 MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
17};
18use tokio_util::codec::{FramedRead, FramedWrite};
19use tower::{Service, ServiceExt};
20
21#[cfg(target_family = "unix")]
22use std::path::Path;
23#[cfg(target_family = "unix")]
24use tokio::net::UnixListener;
25
26pub struct Server<C, M, I, S> {
29 consensus: C,
30 mempool: M,
31 info: I,
32 snapshot: S,
33}
34
35pub struct ServerBuilder<C, M, I, S> {
36 consensus: Option<C>,
37 mempool: Option<M>,
38 info: Option<I>,
39 snapshot: Option<S>,
40}
41
42impl<C, M, I, S> Default for ServerBuilder<C, M, I, S> {
43 fn default() -> Self {
44 Self {
45 consensus: None,
46 mempool: None,
47 info: None,
48 snapshot: None,
49 }
50 }
51}
52
53impl<C, M, I, S> ServerBuilder<C, M, I, S>
54where
55 C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
56 + Send
57 + Clone
58 + 'static,
59 C::Future: Send + 'static,
60 M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
61 + Send
62 + Clone
63 + 'static,
64 M::Future: Send + 'static,
65 I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
66 I::Future: Send + 'static,
67 S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
68 + Send
69 + Clone
70 + 'static,
71 S::Future: Send + 'static,
72{
73 pub fn consensus(mut self, consensus: C) -> Self {
74 self.consensus = Some(consensus);
75 self
76 }
77
78 pub fn mempool(mut self, mempool: M) -> Self {
79 self.mempool = Some(mempool);
80 self
81 }
82
83 pub fn info(mut self, info: I) -> Self {
84 self.info = Some(info);
85 self
86 }
87
88 pub fn snapshot(mut self, snapshot: S) -> Self {
89 self.snapshot = Some(snapshot);
90 self
91 }
92
93 pub fn finish(self) -> Option<Server<C, M, I, S>> {
94 let consensus = self.consensus?;
95 let mempool = self.mempool?;
96 let info = self.info?;
97 let snapshot = self.snapshot?;
98
99 Some(Server {
100 consensus,
101 mempool,
102 info,
103 snapshot,
104 })
105 }
106}
107
108impl<C, M, I, S> Server<C, M, I, S>
109where
110 C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError>
111 + Send
112 + Clone
113 + 'static,
114 C::Future: Send + 'static,
115 M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError>
116 + Send
117 + Clone
118 + 'static,
119 M::Future: Send + 'static,
120 I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + Clone + 'static,
121 I::Future: Send + 'static,
122 S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError>
123 + Send
124 + Clone
125 + 'static,
126 S::Future: Send + 'static,
127{
128 pub fn builder() -> ServerBuilder<C, M, I, S> {
129 ServerBuilder::default()
130 }
131
132 #[cfg(target_family = "unix")]
133 pub async fn listen_unix(self, path: impl AsRef<Path>) -> Result<(), BoxError> {
134 let listener = UnixListener::bind(path)?;
135 let addr = listener.local_addr()?;
136 tracing::info!(?addr, "ABCI server starting on uds");
137
138 loop {
139 match listener.accept().await {
140 Ok((socket, _addr)) => {
141 tracing::debug!(?_addr, "accepted new connection");
142 let conn = Connection {
143 consensus: self.consensus.clone(),
144 mempool: self.mempool.clone(),
145 info: self.info.clone(),
146 snapshot: self.snapshot.clone(),
147 };
148 let (read, write) = socket.into_split();
149 tokio::spawn(async move { conn.run(read, write).await.unwrap() });
150 }
151 Err(e) => {
152 tracing::error!({ %e }, "error accepting new connection");
153 }
154 }
155 }
156 }
157
158 pub async fn listen_tcp<A: ToSocketAddrs + std::fmt::Debug>(
159 self,
160 addr: A,
161 ) -> Result<(), BoxError> {
162 let listener = TcpListener::bind(addr).await?;
163 let addr = listener.local_addr()?;
164 tracing::info!(?addr, "ABCI server starting on tcp socket");
165
166 loop {
167 match listener.accept().await {
168 Ok((socket, _addr)) => {
169 tracing::debug!(?_addr, "accepted new connection");
170 let conn = Connection {
171 consensus: self.consensus.clone(),
172 mempool: self.mempool.clone(),
173 info: self.info.clone(),
174 snapshot: self.snapshot.clone(),
175 };
176 let (read, write) = socket.into_split();
177 tokio::spawn(async move { conn.run(read, write).await.unwrap() });
178 }
179 Err(e) => {
180 tracing::error!({ %e }, "error accepting new connection");
181 }
182 }
183 }
184 }
185}
186
187struct Connection<C, M, I, S> {
188 consensus: C,
189 mempool: M,
190 info: I,
191 snapshot: S,
192}
193
194impl<C, M, I, S> Connection<C, M, I, S>
195where
196 C: Service<ConsensusRequest, Response = ConsensusResponse, Error = BoxError> + Send + 'static,
197 C::Future: Send + 'static,
198 M: Service<MempoolRequest, Response = MempoolResponse, Error = BoxError> + Send + 'static,
199 M::Future: Send + 'static,
200 I: Service<InfoRequest, Response = InfoResponse, Error = BoxError> + Send + 'static,
201 I::Future: Send + 'static,
202 S: Service<SnapshotRequest, Response = SnapshotResponse, Error = BoxError> + Send + 'static,
203 S::Future: Send + 'static,
204{
205 async fn run(
208 mut self,
209 read: impl AsyncReadExt + std::marker::Unpin,
210 write: impl AsyncWriteExt + std::marker::Unpin,
211 ) -> Result<(), BoxError> {
212 tracing::info!("listening for requests");
213
214 use tendermint_proto::v0_34::abci as pb;
215
216 let (mut request_stream, mut response_sink) = {
217 use crate::v034::codec::{Decode, Encode};
218 (
219 FramedRead::new(read, Decode::<pb::Request>::default()),
220 FramedWrite::new(write, Encode::<pb::Response>::default()),
221 )
222 };
223
224 let mut responses = FuturesOrdered::new();
225
226 loop {
227 select! {
228 req = request_stream.next() => {
229 let proto = match req.transpose()? {
230 Some(proto) => proto,
231 None => return Ok(()),
232 };
233 let request = Request::try_from(proto)?;
234 tracing::debug!(?request, "new request");
235 match request.kind() {
236 MethodKind::Consensus => {
237 let request = request.try_into().expect("checked kind");
238 let response = self.consensus.ready().await?.call(request);
239 responses.push_back(response.map_ok(Response::from).boxed());
241 }
242 MethodKind::Mempool => {
243 let request = request.try_into().expect("checked kind");
244 let response = self.mempool.ready().await?.call(request);
245 responses.push_back(response.map_ok(Response::from).boxed());
246 }
247 MethodKind::Snapshot => {
248 let request = request.try_into().expect("checked kind");
249 let response = self.snapshot.ready().await?.call(request);
250 responses.push_back(response.map_ok(Response::from).boxed());
251 }
252 MethodKind::Info => {
253 let request = request.try_into().expect("checked kind");
254 let response = self.info.ready().await?.call(request);
255 responses.push_back(response.map_ok(Response::from).boxed());
256 }
257 MethodKind::Flush => {
258 tracing::debug!(responses.len = responses.len(), "flushing responses");
261 while let Some(response) = responses.next().await {
262 tracing::debug!(?response, "flushing response");
265 response_sink.send(response?.into()).await?;
266 }
267 response_sink.send(Response::Flush.into()).await?;
269 }
270 }
271 }
272 rsp = responses.next(), if !responses.is_empty() => {
273 let response = rsp.expect("didn't poll when responses was empty");
274 tracing::debug!(?response, "sending response");
277 response_sink.send(response?.into()).await?;
278 }
279 }
280 }
281 }
282}