1use std::task::{Context, Poll};
27
28use tower::Service;
29
30use crate::{buffer4::Buffer, BoxError};
31use tendermint::v0_38::abci::{
32 ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
33 MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
34};
35
36pub fn service<S>(service: S, bound: usize) -> (Consensus<S>, Mempool<S>, Snapshot<S>, Info<S>)
46where
47 S: Service<Request, Response = Response, Error = BoxError> + Send + 'static,
48 S::Future: Send + 'static,
49{
50 let bound = std::cmp::max(1, bound);
51 let (buffer1, buffer2, buffer3, buffer4) = Buffer::new(service, bound);
52
53 (
54 Consensus { inner: buffer1 },
55 Mempool { inner: buffer2 },
56 Snapshot { inner: buffer3 },
57 Info { inner: buffer4 },
58 )
59}
60
61pub struct Consensus<S>
63where
64 S: Service<Request, Response = Response, Error = BoxError>,
65{
66 inner: Buffer<S, Request>,
67}
68
69impl<S> Clone for Consensus<S>
71where
72 S: Service<Request, Response = Response, Error = BoxError>,
73{
74 fn clone(&self) -> Self {
75 Self {
76 inner: self.inner.clone(),
77 }
78 }
79}
80
81impl<S> Service<ConsensusRequest> for Consensus<S>
82where
83 S: Service<Request, Response = Response, Error = BoxError>,
84{
85 type Response = ConsensusResponse;
86 type Error = BoxError;
87 type Future = futures::ConsensusFuture<S>;
88
89 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90 self.inner.poll_ready(cx)
91 }
92
93 fn call(&mut self, req: ConsensusRequest) -> Self::Future {
94 futures::ConsensusFuture {
95 inner: self.inner.call(req.into()),
96 }
97 }
98}
99
100pub struct Mempool<S>
102where
103 S: Service<Request, Response = Response, Error = BoxError>,
104{
105 inner: Buffer<S, Request>,
106}
107
108impl<S> Clone for Mempool<S>
110where
111 S: Service<Request, Response = Response, Error = BoxError>,
112{
113 fn clone(&self) -> Self {
114 Self {
115 inner: self.inner.clone(),
116 }
117 }
118}
119
120impl<S> Service<MempoolRequest> for Mempool<S>
121where
122 S: Service<Request, Response = Response, Error = BoxError>,
123{
124 type Response = MempoolResponse;
125 type Error = BoxError;
126 type Future = futures::MempoolFuture<S>;
127
128 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
129 self.inner.poll_ready(cx)
130 }
131
132 fn call(&mut self, req: MempoolRequest) -> Self::Future {
133 futures::MempoolFuture {
134 inner: self.inner.call(req.into()),
135 }
136 }
137}
138
139pub struct Info<S>
141where
142 S: Service<Request, Response = Response, Error = BoxError>,
143{
144 inner: Buffer<S, Request>,
145}
146
147impl<S> Clone for Info<S>
149where
150 S: Service<Request, Response = Response, Error = BoxError>,
151{
152 fn clone(&self) -> Self {
153 Self {
154 inner: self.inner.clone(),
155 }
156 }
157}
158
159impl<S> Service<InfoRequest> for Info<S>
160where
161 S: Service<Request, Response = Response, Error = BoxError>,
162{
163 type Response = InfoResponse;
164 type Error = BoxError;
165 type Future = futures::InfoFuture<S>;
166
167 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
168 self.inner.poll_ready(cx)
169 }
170
171 fn call(&mut self, req: InfoRequest) -> Self::Future {
172 futures::InfoFuture {
173 inner: self.inner.call(req.into()),
174 }
175 }
176}
177
178pub struct Snapshot<S>
180where
181 S: Service<Request, Response = Response, Error = BoxError>,
182{
183 inner: Buffer<S, Request>,
184}
185
186impl<S> Clone for Snapshot<S>
188where
189 S: Service<Request, Response = Response, Error = BoxError>,
190{
191 fn clone(&self) -> Self {
192 Self {
193 inner: self.inner.clone(),
194 }
195 }
196}
197
198impl<S> Service<SnapshotRequest> for Snapshot<S>
199where
200 S: Service<Request, Response = Response, Error = BoxError>,
201{
202 type Response = SnapshotResponse;
203 type Error = BoxError;
204 type Future = futures::SnapshotFuture<S>;
205
206 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
207 self.inner.poll_ready(cx)
208 }
209
210 fn call(&mut self, req: SnapshotRequest) -> Self::Future {
211 futures::SnapshotFuture {
212 inner: self.inner.call(req.into()),
213 }
214 }
215}
216
217pub mod futures {
227 use pin_project::pin_project;
228 use std::{convert::TryInto, future::Future, pin::Pin};
229
230 use super::*;
231
232 #[pin_project]
233 pub struct ConsensusFuture<S>
234 where
235 S: Service<Request, Response = Response, Error = BoxError>,
236 {
237 #[pin]
238 pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
239 }
240
241 impl<S> Future for ConsensusFuture<S>
242 where
243 S: Service<Request, Response = Response, Error = BoxError>,
244 {
245 type Output = Result<ConsensusResponse, BoxError>;
246
247 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
248 let this = self.project();
249 match this.inner.poll(cx) {
250 Poll::Ready(rsp) => Poll::Ready(
251 rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
252 ),
253 Poll::Pending => Poll::Pending,
254 }
255 }
256 }
257
258 #[pin_project]
259 pub struct MempoolFuture<S>
260 where
261 S: Service<Request, Response = Response, Error = BoxError>,
262 {
263 #[pin]
264 pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
265 }
266
267 impl<S> Future for MempoolFuture<S>
268 where
269 S: Service<Request, Response = Response, Error = BoxError>,
270 {
271 type Output = Result<MempoolResponse, BoxError>;
272
273 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274 let this = self.project();
275 match this.inner.poll(cx) {
276 Poll::Ready(rsp) => Poll::Ready(
277 rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
278 ),
279 Poll::Pending => Poll::Pending,
280 }
281 }
282 }
283
284 #[pin_project]
285 pub struct InfoFuture<S>
286 where
287 S: Service<Request, Response = Response, Error = BoxError>,
288 {
289 #[pin]
290 pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
291 }
292
293 impl<S> Future for InfoFuture<S>
294 where
295 S: Service<Request, Response = Response, Error = BoxError>,
296 {
297 type Output = Result<InfoResponse, BoxError>;
298
299 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
300 let this = self.project();
301 match this.inner.poll(cx) {
302 Poll::Ready(rsp) => Poll::Ready(
303 rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
304 ),
305 Poll::Pending => Poll::Pending,
306 }
307 }
308 }
309
310 #[pin_project]
311 pub struct SnapshotFuture<S>
312 where
313 S: Service<Request, Response = Response, Error = BoxError>,
314 {
315 #[pin]
316 pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
317 }
318
319 impl<S> Future for SnapshotFuture<S>
320 where
321 S: Service<Request, Response = Response, Error = BoxError>,
322 {
323 type Output = Result<SnapshotResponse, BoxError>;
324
325 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
326 let this = self.project();
327 match this.inner.poll(cx) {
328 Poll::Ready(rsp) => Poll::Ready(
329 rsp.map(|rsp| rsp.try_into().expect("service gave wrong response type")),
330 ),
331 Poll::Pending => Poll::Pending,
332 }
333 }
334 }
335}