use std::task::{Context, Poll};
use tower::Service;
use crate::{buffer4::Buffer, BoxError};
use tendermint::v0_34::abci::{
ConsensusRequest, ConsensusResponse, InfoRequest, InfoResponse, MempoolRequest,
MempoolResponse, Request, Response, SnapshotRequest, SnapshotResponse,
};
/// Wraps `service` in a [`Buffer`] and returns one typed handle per request category.
///
/// `bound` is raised to at least 1 before it is handed to `Buffer::new`
/// (presumably it sizes the buffer's queue — confirm against `buffer4`).
///
/// The returned tuple is ordered `(Consensus, Mempool, Snapshot, Info)`, matching the
/// order of the four buffers produced by `Buffer::new`.
pub fn service<S>(service: S, bound: usize) -> (Consensus<S>, Mempool<S>, Snapshot<S>, Info<S>)
where
    S: Service<Request, Response = Response, Error = BoxError> + Send + 'static,
    S::Future: Send + 'static,
{
    // A zero bound is never forwarded to the buffer.
    let clamped = bound.max(1);
    let (consensus_buf, mempool_buf, snapshot_buf, info_buf) = Buffer::new(service, clamped);

    let consensus = Consensus { inner: consensus_buf };
    let mempool = Mempool { inner: mempool_buf };
    let snapshot = Snapshot { inner: snapshot_buf };
    let info = Info { inner: info_buf };

    (consensus, mempool, snapshot, info)
}
pub struct Consensus<S>
where
S: Service<Request, Response = Response, Error = BoxError>,
{
inner: Buffer<S, Request>,
}
impl<S> Clone for Consensus<S>
where
S: Service<Request, Response = Response, Error = BoxError>,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
/// Accepts `ConsensusRequest`s and dispatches them through the inner buffer.
impl<S> Service<ConsensusRequest> for Consensus<S>
where
    S: Service<Request, Response = Response, Error = BoxError>,
{
    type Response = ConsensusResponse;
    type Error = BoxError;
    type Future = futures::ConsensusFuture<S>;

    /// Readiness is delegated unchanged to the inner buffer.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let buffer = &mut self.inner;
        buffer.poll_ready(cx)
    }

    /// Converts `req` into the generic `Request`, sends it to the inner buffer, and
    /// wraps the buffer's future in a `ConsensusFuture`.
    fn call(&mut self, req: ConsensusRequest) -> Self::Future {
        let request: Request = req.into();
        let inner = self.inner.call(request);
        futures::ConsensusFuture { inner }
    }
}
pub struct Mempool<S>
where
S: Service<Request, Response = Response, Error = BoxError>,
{
inner: Buffer<S, Request>,
}
impl<S> Clone for Mempool<S>
where
S: Service<Request, Response = Response, Error = BoxError>,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
/// Accepts `MempoolRequest`s and dispatches them through the inner buffer.
impl<S> Service<MempoolRequest> for Mempool<S>
where
    S: Service<Request, Response = Response, Error = BoxError>,
{
    type Response = MempoolResponse;
    type Error = BoxError;
    type Future = futures::MempoolFuture<S>;

    /// Readiness is delegated unchanged to the inner buffer.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let buffer = &mut self.inner;
        buffer.poll_ready(cx)
    }

    /// Converts `req` into the generic `Request`, sends it to the inner buffer, and
    /// wraps the buffer's future in a `MempoolFuture`.
    fn call(&mut self, req: MempoolRequest) -> Self::Future {
        let request: Request = req.into();
        let inner = self.inner.call(request);
        futures::MempoolFuture { inner }
    }
}
pub struct Info<S>
where
S: Service<Request, Response = Response, Error = BoxError>,
{
inner: Buffer<S, Request>,
}
impl<S> Clone for Info<S>
where
S: Service<Request, Response = Response, Error = BoxError>,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
/// Accepts `InfoRequest`s and dispatches them through the inner buffer.
impl<S> Service<InfoRequest> for Info<S>
where
    S: Service<Request, Response = Response, Error = BoxError>,
{
    type Response = InfoResponse;
    type Error = BoxError;
    type Future = futures::InfoFuture<S>;

    /// Readiness is delegated unchanged to the inner buffer.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let buffer = &mut self.inner;
        buffer.poll_ready(cx)
    }

    /// Converts `req` into the generic `Request`, sends it to the inner buffer, and
    /// wraps the buffer's future in an `InfoFuture`.
    fn call(&mut self, req: InfoRequest) -> Self::Future {
        let request: Request = req.into();
        let inner = self.inner.call(request);
        futures::InfoFuture { inner }
    }
}
pub struct Snapshot<S>
where
S: Service<Request, Response = Response, Error = BoxError>,
{
inner: Buffer<S, Request>,
}
impl<S> Clone for Snapshot<S>
where
S: Service<Request, Response = Response, Error = BoxError>,
{
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
/// Accepts `SnapshotRequest`s and dispatches them through the inner buffer.
impl<S> Service<SnapshotRequest> for Snapshot<S>
where
    S: Service<Request, Response = Response, Error = BoxError>,
{
    type Response = SnapshotResponse;
    type Error = BoxError;
    type Future = futures::SnapshotFuture<S>;

    /// Readiness is delegated unchanged to the inner buffer.
    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let buffer = &mut self.inner;
        buffer.poll_ready(cx)
    }

    /// Converts `req` into the generic `Request`, sends it to the inner buffer, and
    /// wraps the buffer's future in a `SnapshotFuture`.
    fn call(&mut self, req: SnapshotRequest) -> Self::Future {
        let request: Request = req.into();
        let inner = self.inner.call(request);
        futures::SnapshotFuture { inner }
    }
}
/// Future types returned by the per-category handles.
///
/// Each future wraps the future produced by the inner `Buffer` and converts the generic
/// `Response` it yields into the category-specific response type.
pub mod futures {
    use pin_project::pin_project;
    use std::{convert::TryInto, future::Future, pin::Pin};

    use super::*;

    /// Future yielding a `ConsensusResponse`.
    ///
    /// # Panics
    ///
    /// Polling panics with "service gave wrong response type" when the successful
    /// response from the inner future cannot be converted into a `ConsensusResponse`.
    #[pin_project]
    pub struct ConsensusFuture<S>
    where
        S: Service<Request, Response = Response, Error = BoxError>,
    {
        /// Future obtained from the inner buffer's `call`.
        #[pin]
        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
    }

    impl<S> Future for ConsensusFuture<S>
    where
        S: Service<Request, Response = Response, Error = BoxError>,
    {
        type Output = Result<ConsensusResponse, BoxError>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Errors pass through untouched; only the `Ok` payload is narrowed.
            self.project().inner.poll(cx).map(|outcome| {
                outcome.map(|generic| {
                    generic
                        .try_into()
                        .expect("service gave wrong response type")
                })
            })
        }
    }

    /// Future yielding a `MempoolResponse`.
    ///
    /// # Panics
    ///
    /// Polling panics with "service gave wrong response type" when the successful
    /// response from the inner future cannot be converted into a `MempoolResponse`.
    #[pin_project]
    pub struct MempoolFuture<S>
    where
        S: Service<Request, Response = Response, Error = BoxError>,
    {
        /// Future obtained from the inner buffer's `call`.
        #[pin]
        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
    }

    impl<S> Future for MempoolFuture<S>
    where
        S: Service<Request, Response = Response, Error = BoxError>,
    {
        type Output = Result<MempoolResponse, BoxError>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Errors pass through untouched; only the `Ok` payload is narrowed.
            self.project().inner.poll(cx).map(|outcome| {
                outcome.map(|generic| {
                    generic
                        .try_into()
                        .expect("service gave wrong response type")
                })
            })
        }
    }

    /// Future yielding an `InfoResponse`.
    ///
    /// # Panics
    ///
    /// Polling panics with "service gave wrong response type" when the successful
    /// response from the inner future cannot be converted into an `InfoResponse`.
    #[pin_project]
    pub struct InfoFuture<S>
    where
        S: Service<Request, Response = Response, Error = BoxError>,
    {
        /// Future obtained from the inner buffer's `call`.
        #[pin]
        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
    }

    impl<S> Future for InfoFuture<S>
    where
        S: Service<Request, Response = Response, Error = BoxError>,
    {
        type Output = Result<InfoResponse, BoxError>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Errors pass through untouched; only the `Ok` payload is narrowed.
            self.project().inner.poll(cx).map(|outcome| {
                outcome.map(|generic| {
                    generic
                        .try_into()
                        .expect("service gave wrong response type")
                })
            })
        }
    }

    /// Future yielding a `SnapshotResponse`.
    ///
    /// # Panics
    ///
    /// Polling panics with "service gave wrong response type" when the successful
    /// response from the inner future cannot be converted into a `SnapshotResponse`.
    #[pin_project]
    pub struct SnapshotFuture<S>
    where
        S: Service<Request, Response = Response, Error = BoxError>,
    {
        /// Future obtained from the inner buffer's `call`.
        #[pin]
        pub(super) inner: <Buffer<S, Request> as Service<Request>>::Future,
    }

    impl<S> Future for SnapshotFuture<S>
    where
        S: Service<Request, Response = Response, Error = BoxError>,
    {
        type Output = Result<SnapshotResponse, BoxError>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Errors pass through untouched; only the `Ok` payload is narrowed.
            self.project().inner.poll(cx).map(|outcome| {
                outcome.map(|generic| {
                    generic
                        .try_into()
                        .expect("service gave wrong response type")
                })
            })
        }
    }
}