tower_abci/v037/
split.rs

1//! Splits a single [`Service`] implementing all of ABCI into four cloneable
2//! component services, each implementing one category of ABCI requests.
3//!
4//! The component services share access to the main service via message-passing
5//! over buffered channels. This means that the component services can be cloned
6//! to provide shared access to the ABCI application, which processes
7//! requests sequentially with the following prioritization:
8//!
9//! 1. [`ConsensusRequest`]s sent to the [`Consensus`] service;
10//! 2. [`MempoolRequest`]s sent to the [`Mempool`] service;
11//! 3. [`SnapshotRequest`]s sent to the [`Snapshot`] service;
12//! 4. [`InfoRequest`]s sent to the [`Info`] service.
13//!
14//! The ABCI service can execute these requests synchronously, in
15//! [`Service::call`](tower::Service::call), or asynchronously, by immediately
16//! returning a future that will be executed on the caller's task. Or, it can
17//! split the difference and perform some amount of synchronous work and defer
18//! the rest to be performed asynchronously.
19//!
20//! Because each category of requests is handled by a different service, request
21//! behavior can be customized on a per-category basis using Tower
22//! [`Layer`](tower::Layer)s. For instance, load-shedding can be added to
23//! [`InfoRequest`]s but not [`ConsensusRequest`]s, or different categories can
24//! have different timeout policies, or different types of instrumentation.
25
26use std::task::{Context, Poll};
27
28use tower::Service;
29
30use crate::{buffer4::Buffer, BoxError};
31use tendermint::v0_37::abci::{
32    ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
33    MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
34};
35
36/// Splits a single `service` implementing all of ABCI into four cloneable
37/// component services, each implementing one category of ABCI requests. See the
38/// module documentation for details.
39///
40/// The `bound` parameter bounds the size of each component's request queue. For
41/// the same reason as in Tower's [`Buffer`](tower::buffer::Buffer) middleware,
42/// it's advisable to set the `bound` to be at least the number of concurrent
43/// requests to the component services. However, large buffers hide backpressure
44/// from propagating to the caller.
45pub fn service<S>(service: S, bound: usize) -> (Consensus<S>, Mempool<S>, Snapshot<S>, Info<S>)
46where
47    S: Service<Request, Response = Response, Error = BoxError> + Send + 'static,
48    S::Future: Send + 'static,
49{
50    let bound = std::cmp::max(1, bound);
51    let (buffer1, buffer2, buffer3, buffer4) = Buffer::new(service, bound);
52
53    (
54        Consensus { inner: buffer1 },
55        Mempool { inner: buffer2 },
56        Snapshot { inner: buffer3 },
57        Info { inner: buffer4 },
58    )
59}
60
61/// Forwards consensus requests to a shared backing service.
62pub struct Consensus<S>
63where
64    S: Service<Request, Response = Response, Error = BoxError>,
65{
66    inner: Buffer<S, Request>,
67}
68
69// Implementing Clone manually avoids an (incorrect) derived S: Clone bound
70impl<S> Clone for Consensus<S>
71where
72    S: Service<Request, Response = Response, Error = BoxError>,
73{
74    fn clone(&self) -> Self {
75        Self {
76            inner: self.inner.clone(),
77        }
78    }
79}
80
81impl<S> Service<ConsensusRequest> for Consensus<S>
82where
83    S: Service<Request, Response = Response, Error = BoxError>,
84{
85    type Response = ConsensusResponse;
86    type Error = BoxError;
87    type Future = futures::ConsensusFuture<S>;
88
89    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90        self.inner.poll_ready(cx)
91    }
92
93    fn call(&mut self, req: ConsensusRequest) -> Self::Future {
94        futures::ConsensusFuture {
95            inner: self.inner.call(req.into()),
96        }
97    }
98}
99
100/// Forwards mempool requests to a shared backing service.
101pub struct Mempool<S>
102where
103    S: Service<Request, Response = Response, Error = BoxError>,
104{
105    inner: Buffer<S, Request>,
106}
107
108// Implementing Clone manually avoids an (incorrect) derived S: Clone bound
109impl<S> Clone for Mempool<S>
110where
111    S: Service<Request, Response = Response, Error = BoxError>,
112{
113    fn clone(&self) -> Self {
114        Self {
115            inner: self.inner.clone(),
116        }
117    }
118}
119
120impl<S> Service<MempoolRequest> for Mempool<S>
121where
122    S: Service<Request, Response = Response, Error = BoxError>,
123{
124    type Response = MempoolResponse;
125    type Error = BoxError;
126    type Future = futures::MempoolFuture<S>;
127
128    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129        self.inner.poll_ready(cx)
130    }
131
132    fn call(&mut self, req: MempoolRequest) -> Self::Future {
133        futures::MempoolFuture {
134            inner: self.inner.call(req.into()),
135        }
136    }
137}
138
139/// Forwards info requests to a shared backing service.
140pub struct Info<S>
141where
142    S: Service<Request, Response = Response, Error = BoxError>,
143{
144    inner: Buffer<S, Request>,
145}
146
147// Implementing Clone manually avoids an (incorrect) derived S: Clone bound
148impl<S> Clone for Info<S>
149where
150    S: Service<Request, Response = Response, Error = BoxError>,
151{
152    fn clone(&self) -> Self {
153        Self {
154            inner: self.inner.clone(),
155        }
156    }
157}
158
159impl<S> Service<InfoRequest> for Info<S>
160where
161    S: Service<Request, Response = Response, Error = BoxError>,
162{
163    type Response = InfoResponse;
164    type Error = BoxError;
165    type Future = futures::InfoFuture<S>;
166
167    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
168        self.inner.poll_ready(cx)
169    }
170
171    fn call(&mut self, req: InfoRequest) -> Self::Future {
172        futures::InfoFuture {
173            inner: self.inner.call(req.into()),
174        }
175    }
176}
177
178/// Forwards snapshot requests to a shared backing service.
179pub struct Snapshot<S>
180where
181    S: Service<Request, Response = Response, Error = BoxError>,
182{
183    inner: Buffer<S, Request>,
184}
185
186// Implementing Clone manually avoids an (incorrect) derived S: Clone bound
187impl<S> Clone for Snapshot<S>
188where
189    S: Service<Request, Response = Response, Error = BoxError>,
190{
191    fn clone(&self) -> Self {
192        Self {
193            inner: self.inner.clone(),
194        }
195    }
196}
197
198impl<S> Service<SnapshotRequest> for Snapshot<S>
199where
200    S: Service<Request, Response = Response, Error = BoxError>,
201{
202    type Response = SnapshotResponse;
203    type Error = BoxError;
204    type Future = futures::SnapshotFuture<S>;
205
206    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
207        self.inner.poll_ready(cx)
208    }
209
210    fn call(&mut self, req: SnapshotRequest) -> Self::Future {
211        futures::SnapshotFuture {
212            inner: self.inner.call(req.into()),
213        }
214    }
215}
216
// All of this is "necessary" only because Rust does not support full GATs or
// allow specifying a concrete (but unnameable) associated type using `impl Trait`.
// This means that Tower services either have to have handwritten futures
// or box the futures they return. Boxing a few futures is not really a big deal,
// but it's nice to avoid deeply nested boxes arising from service combinators like
// these ones.
//
// Resolving https://github.com/rust-lang/rust/issues/63063 would remove the
// need for these handwritten futures.
225
226/// Futures types.
227pub mod futures {
228    use pin_project::pin_project;
229    use std::{convert::TryInto, future::Future, pin::Pin};
230
231    use super::*;
232
233    #[pin_project]
234    pub struct ConsensusFuture<S>
235    where
236        S: Service<Request, Response = Response, Error = BoxError>,
237    {
238        #[pin]
239        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
240    }
241
242    impl<S> Future for ConsensusFuture<S>
243    where
244        S: Service<Request, Response = Response, Error = BoxError>,
245    {
246        type Output = Result<ConsensusResponse, BoxError>;
247
248        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
249            let this = self.project();
250            match this.inner.poll(cx) {
251                Poll::Ready(rsp) => Poll::Ready(
252                    rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
253                ),
254                Poll::Pending => Poll::Pending,
255            }
256        }
257    }
258
259    #[pin_project]
260    pub struct MempoolFuture<S>
261    where
262        S: Service<Request, Response = Response, Error = BoxError>,
263    {
264        #[pin]
265        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
266    }
267
268    impl<S> Future for MempoolFuture<S>
269    where
270        S: Service<Request, Response = Response, Error = BoxError>,
271    {
272        type Output = Result<MempoolResponse, BoxError>;
273
274        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
275            let this = self.project();
276            match this.inner.poll(cx) {
277                Poll::Ready(rsp) => Poll::Ready(
278                    rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
279                ),
280                Poll::Pending => Poll::Pending,
281            }
282        }
283    }
284
285    #[pin_project]
286    pub struct InfoFuture<S>
287    where
288        S: Service<Request, Response = Response, Error = BoxError>,
289    {
290        #[pin]
291        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
292    }
293
294    impl<S> Future for InfoFuture<S>
295    where
296        S: Service<Request, Response = Response, Error = BoxError>,
297    {
298        type Output = Result<InfoResponse, BoxError>;
299
300        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
301            let this = self.project();
302            match this.inner.poll(cx) {
303                Poll::Ready(rsp) => Poll::Ready(
304                    rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
305                ),
306                Poll::Pending => Poll::Pending,
307            }
308        }
309    }
310
311    #[pin_project]
312    pub struct SnapshotFuture<S>
313    where
314        S: Service<Request, Response = Response, Error = BoxError>,
315    {
316        #[pin]
317        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
318    }
319
320    impl<S> Future for SnapshotFuture<S>
321    where
322        S: Service<Request, Response = Response, Error = BoxError>,
323    {
324        type Output = Result<SnapshotResponse, BoxError>;
325
326        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
327            let this = self.project();
328            match this.inner.poll(cx) {
329                Poll::Ready(rsp) => Poll::Ready(
330                    rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
331                ),
332                Poll::Pending => Poll::Pending,
333            }
334        }
335    }
336}