1use std::task::{Context, Poll};
27
28use tower::Service;
29
30use crate::{buffer4::Buffer, BoxError};
31use tendermint::v0_37::abci::{
32 ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
33 MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
34};
35
36pub fn service<S>(service: S, bound: usize) -> (Consensus<S>, Mempool<S>, Snapshot<S>, Info<S>)
46where
47 S: Service<Request, Response = Response, Error = BoxError> + Send + 'static,
48 S::Future: Send + 'static,
49{
50 let bound = std::cmp::max(1, bound);
51 let (buffer1, buffer2, buffer3, buffer4) = Buffer::new(service, bound);
52
53 (
54 Consensus { inner: buffer1 },
55 Mempool { inner: buffer2 },
56 Snapshot { inner: buffer3 },
57 Info { inner: buffer4 },
58 )
59}
60
61pub struct Consensus<S>
63where
64 S: Service<Request, Response = Response, Error = BoxError>,
65{
66 inner: Buffer<S, Request>,
67}
68
69impl<S> Clone for Consensus<S>
71where
72 S: Service<Request, Response = Response, Error = BoxError>,
73{
74 fn clone(&self) -> Self {
75 Self {
76 inner: self.inner.clone(),
77 }
78 }
79}
80
81impl<S> Service<ConsensusRequest> for Consensus<S>
82where
83 S: Service<Request, Response = Response, Error = BoxError>,
84{
85 type Response = ConsensusResponse;
86 type Error = BoxError;
87 type Future = futures::ConsensusFuture<S>;
88
89 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90 self.inner.poll_ready(cx)
91 }
92
93 fn call(&mut self, req: ConsensusRequest) -> Self::Future {
94 futures::ConsensusFuture {
95 inner: self.inner.call(req.into()),
96 }
97 }
98}
99
100pub struct Mempool<S>
102where
103 S: Service<Request, Response = Response, Error = BoxError>,
104{
105 inner: Buffer<S, Request>,
106}
107
108impl<S> Clone for Mempool<S>
110where
111 S: Service<Request, Response = Response, Error = BoxError>,
112{
113 fn clone(&self) -> Self {
114 Self {
115 inner: self.inner.clone(),
116 }
117 }
118}
119
120impl<S> Service<MempoolRequest> for Mempool<S>
121where
122 S: Service<Request, Response = Response, Error = BoxError>,
123{
124 type Response = MempoolResponse;
125 type Error = BoxError;
126 type Future = futures::MempoolFuture<S>;
127
128 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129 self.inner.poll_ready(cx)
130 }
131
132 fn call(&mut self, req: MempoolRequest) -> Self::Future {
133 futures::MempoolFuture {
134 inner: self.inner.call(req.into()),
135 }
136 }
137}
138
139pub struct Info<S>
141where
142 S: Service<Request, Response = Response, Error = BoxError>,
143{
144 inner: Buffer<S, Request>,
145}
146
147impl<S> Clone for Info<S>
149where
150 S: Service<Request, Response = Response, Error = BoxError>,
151{
152 fn clone(&self) -> Self {
153 Self {
154 inner: self.inner.clone(),
155 }
156 }
157}
158
159impl<S> Service<InfoRequest> for Info<S>
160where
161 S: Service<Request, Response = Response, Error = BoxError>,
162{
163 type Response = InfoResponse;
164 type Error = BoxError;
165 type Future = futures::InfoFuture<S>;
166
167 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
168 self.inner.poll_ready(cx)
169 }
170
171 fn call(&mut self, req: InfoRequest) -> Self::Future {
172 futures::InfoFuture {
173 inner: self.inner.call(req.into()),
174 }
175 }
176}
177
178pub struct Snapshot<S>
180where
181 S: Service<Request, Response = Response, Error = BoxError>,
182{
183 inner: Buffer<S, Request>,
184}
185
186impl<S> Clone for Snapshot<S>
188where
189 S: Service<Request, Response = Response, Error = BoxError>,
190{
191 fn clone(&self) -> Self {
192 Self {
193 inner: self.inner.clone(),
194 }
195 }
196}
197
198impl<S> Service<SnapshotRequest> for Snapshot<S>
199where
200 S: Service<Request, Response = Response, Error = BoxError>,
201{
202 type Response = SnapshotResponse;
203 type Error = BoxError;
204 type Future = futures::SnapshotFuture<S>;
205
206 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
207 self.inner.poll_ready(cx)
208 }
209
210 fn call(&mut self, req: SnapshotRequest) -> Self::Future {
211 futures::SnapshotFuture {
212 inner: self.inner.call(req.into()),
213 }
214 }
215}
216
217pub mod futures {
228 use pin_project::pin_project;
229 use std::{convert::TryInto, future::Future, pin::Pin};
230
231 use super::*;
232
233 #[pin_project]
234 pub struct ConsensusFuture<S>
235 where
236 S: Service<Request, Response = Response, Error = BoxError>,
237 {
238 #[pin]
239 pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
240 }
241
242 impl<S> Future for ConsensusFuture<S>
243 where
244 S: Service<Request, Response = Response, Error = BoxError>,
245 {
246 type Output = Result<ConsensusResponse, BoxError>;
247
248 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
249 let this = self.project();
250 match this.inner.poll(cx) {
251 Poll::Ready(rsp) => Poll::Ready(
252 rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
253 ),
254 Poll::Pending => Poll::Pending,
255 }
256 }
257 }
258
259 #[pin_project]
260 pub struct MempoolFuture<S>
261 where
262 S: Service<Request, Response = Response, Error = BoxError>,
263 {
264 #[pin]
265 pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
266 }
267
268 impl<S> Future for MempoolFuture<S>
269 where
270 S: Service<Request, Response = Response, Error = BoxError>,
271 {
272 type Output = Result<MempoolResponse, BoxError>;
273
274 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
275 let this = self.project();
276 match this.inner.poll(cx) {
277 Poll::Ready(rsp) => Poll::Ready(
278 rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
279 ),
280 Poll::Pending => Poll::Pending,
281 }
282 }
283 }
284
285 #[pin_project]
286 pub struct InfoFuture<S>
287 where
288 S: Service<Request, Response = Response, Error = BoxError>,
289 {
290 #[pin]
291 pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
292 }
293
294 impl<S> Future for InfoFuture<S>
295 where
296 S: Service<Request, Response = Response, Error = BoxError>,
297 {
298 type Output = Result<InfoResponse, BoxError>;
299
300 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
301 let this = self.project();
302 match this.inner.poll(cx) {
303 Poll::Ready(rsp) => Poll::Ready(
304 rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
305 ),
306 Poll::Pending => Poll::Pending,
307 }
308 }
309 }
310
311 #[pin_project]
312 pub struct SnapshotFuture<S>
313 where
314 S: Service<Request, Response = Response, Error = BoxError>,
315 {
316 #[pin]
317 pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
318 }
319
320 impl<S> Future for SnapshotFuture<S>
321 where
322 S: Service<Request, Response = Response, Error = BoxError>,
323 {
324 type Output = Result<SnapshotResponse, BoxError>;
325
326 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
327 let this = self.project();
328 match this.inner.poll(cx) {
329 Poll::Ready(rsp) => Poll::Ready(
330 rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
331 ),
332 Poll::Pending => Poll::Pending,
333 }
334 }
335 }
336}