tower_abci/v038/
split.rs

1//! Splits a single [`Service`] implementing all of ABCI into four cloneable
2//! component services, each implementing one category of ABCI requests.
3//!
4//! The component services share access to the main service via message-passing
5//! over buffered channels. This means that the component services can be cloned
6//! to provide shared access to the ABCI application, which processes
7//! requests sequentially with the following prioritization:
8//!
9//! 1. [`ConsensusRequest`]s sent to the [`Consensus`] service;
10//! 2. [`MempoolRequest`]s sent to the [`Mempool`] service;
11//! 3. [`SnapshotRequest`]s sent to the [`Snapshot`] service;
12//! 4. [`InfoRequest`]s sent to the [`Info`] service.
13//!
14//! The ABCI service can execute these requests synchronously, in
15//! [`Service::call`](tower::Service::call), or asynchronously, by immediately
16//! returning a future that will be executed on the caller's task. Or, it can
17//! split the difference and perform some amount of synchronous work and defer
18//! the rest to be performed asynchronously.
19//!
20//! Because each category of requests is handled by a different service, request
21//! behavior can be customized on a per-category basis using Tower
22//! [`Layer`](tower::Layer)s. For instance, load-shedding can be added to
23//! [`InfoRequest`]s but not [`ConsensusRequest`]s, or different categories can
24//! have different timeout policies, or different types of instrumentation.
25
26use std::task::{Context, Poll};
27
28use tower::Service;
29
30use crate::{buffer4::Buffer, BoxError};
31use tendermint::v0_38::abci::{
32    ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
33    MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
34};
35
36/// Splits a single `service` implementing all of ABCI into four cloneable
37/// component services, each implementing one category of ABCI requests. See the
38/// module documentation for details.
39///
40/// The `bound` parameter bounds the size of each component's request queue. For
41/// the same reason as in Tower's [`Buffer`](tower::buffer::Buffer) middleware,
42/// it's advisable to set the `bound` to be at least the number of concurrent
43/// requests to the component services. However, large buffers hide backpressure
44/// from propagating to the caller.
45pub fn service<S>(service: S, bound: usize) -> (Consensus<S>, Mempool<S>, Snapshot<S>, Info<S>)
46where
47    S: Service<Request, Response = Response, Error = BoxError> + Send + 'static,
48    S::Future: Send + 'static,
49{
50    let bound = std::cmp::max(1, bound);
51    let (buffer1, buffer2, buffer3, buffer4) = Buffer::new(service, bound);
52
53    (
54        Consensus { inner: buffer1 },
55        Mempool { inner: buffer2 },
56        Snapshot { inner: buffer3 },
57        Info { inner: buffer4 },
58    )
59}
60
61/// Forwards consensus requests to a shared backing service.
62pub struct Consensus<S>
63where
64    S: Service<Request, Response = Response, Error = BoxError>,
65{
66    inner: Buffer<S, Request>,
67}
68
69// Implementing Clone manually avoids an (incorrect) derived S: Clone bound
70impl<S> Clone for Consensus<S>
71where
72    S: Service<Request, Response = Response, Error = BoxError>,
73{
74    fn clone(&self) -> Self {
75        Self {
76            inner: self.inner.clone(),
77        }
78    }
79}
80
81impl<S> Service<ConsensusRequest> for Consensus<S>
82where
83    S: Service<Request, Response = Response, Error = BoxError>,
84{
85    type Response = ConsensusResponse;
86    type Error = BoxError;
87    type Future = futures::ConsensusFuture<S>;
88
89    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90        self.inner.poll_ready(cx)
91    }
92
93    fn call(&mut self, req: ConsensusRequest) -> Self::Future {
94        futures::ConsensusFuture {
95            inner: self.inner.call(req.into()),
96        }
97    }
98}
99
100/// Forwards mempool requests to a shared backing service.
101pub struct Mempool<S>
102where
103    S: Service<Request, Response = Response, Error = BoxError>,
104{
105    inner: Buffer<S, Request>,
106}
107
108// Implementing Clone manually avoids an (incorrect) derived S: Clone bound
109impl<S> Clone for Mempool<S>
110where
111    S: Service<Request, Response = Response, Error = BoxError>,
112{
113    fn clone(&self) -> Self {
114        Self {
115            inner: self.inner.clone(),
116        }
117    }
118}
119
120impl<S> Service<MempoolRequest> for Mempool<S>
121where
122    S: Service<Request, Response = Response, Error = BoxError>,
123{
124    type Response = MempoolResponse;
125    type Error = BoxError;
126    type Future = futures::MempoolFuture<S>;
127
128    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129        self.inner.poll_ready(cx)
130    }
131
132    fn call(&mut self, req: MempoolRequest) -> Self::Future {
133        futures::MempoolFuture {
134            inner: self.inner.call(req.into()),
135        }
136    }
137}
138
139/// Forwards info requests to a shared backing service.
140pub struct Info<S>
141where
142    S: Service<Request, Response = Response, Error = BoxError>,
143{
144    inner: Buffer<S, Request>,
145}
146
147// Implementing Clone manually avoids an (incorrect) derived S: Clone bound
148impl<S> Clone for Info<S>
149where
150    S: Service<Request, Response = Response, Error = BoxError>,
151{
152    fn clone(&self) -> Self {
153        Self {
154            inner: self.inner.clone(),
155        }
156    }
157}
158
159impl<S> Service<InfoRequest> for Info<S>
160where
161    S: Service<Request, Response = Response, Error = BoxError>,
162{
163    type Response = InfoResponse;
164    type Error = BoxError;
165    type Future = futures::InfoFuture<S>;
166
167    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
168        self.inner.poll_ready(cx)
169    }
170
171    fn call(&mut self, req: InfoRequest) -> Self::Future {
172        futures::InfoFuture {
173            inner: self.inner.call(req.into()),
174        }
175    }
176}
177
178/// Forwards snapshot requests to a shared backing service.
179pub struct Snapshot<S>
180where
181    S: Service<Request, Response = Response, Error = BoxError>,
182{
183    inner: Buffer<S, Request>,
184}
185
186// Implementing Clone manually avoids an (incorrect) derived S: Clone bound
187impl<S> Clone for Snapshot<S>
188where
189    S: Service<Request, Response = Response, Error = BoxError>,
190{
191    fn clone(&self) -> Self {
192        Self {
193            inner: self.inner.clone(),
194        }
195    }
196}
197
198impl<S> Service<SnapshotRequest> for Snapshot<S>
199where
200    S: Service<Request, Response = Response, Error = BoxError>,
201{
202    type Response = SnapshotResponse;
203    type Error = BoxError;
204    type Future = futures::SnapshotFuture<S>;
205
206    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
207        self.inner.poll_ready(cx)
208    }
209
210    fn call(&mut self, req: SnapshotRequest) -> Self::Future {
211        futures::SnapshotFuture {
212            inner: self.inner.call(req.into()),
213        }
214    }
215}
216
217// this is all "necessary" only because rust does not support full GATs or allow
218// specifying a concrete (but unnameable) associated type using impl Trait.
219// this means that Tower services either have to have handwritten futures
220// or box the futures they return.  Boxing a few futures is not really a big deal
221// but it's nice to avoid deeply nested boxes arising from service combinators like
222// these ones.
223
224// https://github.com/rust-lang/rust/issues/63063 fixes this
225/// Futures types.
226pub mod futures {
227    use pin_project::pin_project;
228    use std::{convert::TryInto, future::Future, pin::Pin};
229
230    use super::*;
231
232    #[pin_project]
233    pub struct ConsensusFuture<S>
234    where
235        S: Service<Request, Response = Response, Error = BoxError>,
236    {
237        #[pin]
238        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
239    }
240
241    impl<S> Future for ConsensusFuture<S>
242    where
243        S: Service<Request, Response = Response, Error = BoxError>,
244    {
245        type Output = Result<ConsensusResponse, BoxError>;
246
247        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248            let this = self.project();
249            match this.inner.poll(cx) {
250                Poll::Ready(rsp) => Poll::Ready(
251                    rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
252                ),
253                Poll::Pending => Poll::Pending,
254            }
255        }
256    }
257
258    #[pin_project]
259    pub struct MempoolFuture<S>
260    where
261        S: Service<Request, Response = Response, Error = BoxError>,
262    {
263        #[pin]
264        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
265    }
266
267    impl<S> Future for MempoolFuture<S>
268    where
269        S: Service<Request, Response = Response, Error = BoxError>,
270    {
271        type Output = Result<MempoolResponse, BoxError>;
272
273        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274            let this = self.project();
275            match this.inner.poll(cx) {
276                Poll::Ready(rsp) => Poll::Ready(
277                    rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
278                ),
279                Poll::Pending => Poll::Pending,
280            }
281        }
282    }
283
284    #[pin_project]
285    pub struct InfoFuture<S>
286    where
287        S: Service<Request, Response = Response, Error = BoxError>,
288    {
289        #[pin]
290        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
291    }
292
293    impl<S> Future for InfoFuture<S>
294    where
295        S: Service<Request, Response = Response, Error = BoxError>,
296    {
297        type Output = Result<InfoResponse, BoxError>;
298
299        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
300            let this = self.project();
301            match this.inner.poll(cx) {
302                Poll::Ready(rsp) => Poll::Ready(
303                    rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
304                ),
305                Poll::Pending => Poll::Pending,
306            }
307        }
308    }
309
310    #[pin_project]
311    pub struct SnapshotFuture<S>
312    where
313        S: Service<Request, Response = Response, Error = BoxError>,
314    {
315        #[pin]
316        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
317    }
318
319    impl<S> Future for SnapshotFuture<S>
320    where
321        S: Service<Request, Response = Response, Error = BoxError>,
322    {
323        type Output = Result<SnapshotResponse, BoxError>;
324
325        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
326            let this = self.project();
327            match this.inner.poll(cx) {
328                Poll::Ready(rsp) => Poll::Ready(
329                    rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
330                ),
331                Poll::Pending => Poll::Pending,
332            }
333        }
334    }
335}