1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
use bytes::Bytes;
use http_body::{combinators::UnsyncBoxBody, Body};
use tonic::{
    body::BoxBody as ReqBody,
    codegen::http as grpc,
    transport::{self, Endpoint},
};
use tower::{util::BoxCloneService, Service, ServiceBuilder};

/// A type-erased gRPC service.
///
/// The concrete service type is hidden behind a [`BoxCloneService`] that
/// accepts HTTP requests carrying a `ReqBody` and produces HTTP responses
/// carrying an [`RspBody`]. Failures are reported as a [`BoxError`].
///
/// Both [`connect`] and [`local`] return this type.
pub type BoxGrpcService =
    BoxCloneService<grpc::Request<ReqBody>, grpc::Response<RspBody>, BoxError>;

/// A boxed, type-erased error that is `Send`, `Sync`, and `'static`.
///
/// Used as the error type of both [`BoxGrpcService`] and [`RspBody`].
pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// A type-erased gRPC response [`Body`].
///
/// Data chunks are [`Bytes`], and body errors are reported as [`BoxError`].
pub type RspBody = UnsyncBoxBody<Bytes, BoxError>;

/// Dials the given tonic [`Endpoint`] and wraps the resulting connection in a
/// [`BoxGrpcService`].
///
/// Each response has its body converted into an [`RspBody`], and errors from
/// the underlying service are converted into [`BoxError`].
///
/// # Errors
///
/// Returns an error if the connection attempt on `ep` fails.
pub async fn connect(ep: Endpoint) -> anyhow::Result<BoxGrpcService> {
    let channel = ep.connect().await?;

    // Hide the transport's concrete response body type behind `RspBody`.
    let erase_body = |rsp: grpc::Response<transport::Body>| rsp.map(box_rsp_body);
    let stack = ServiceBuilder::new()
        .map_response(erase_body)
        .map_err(BoxError::from);

    Ok(BoxCloneService::new(stack.service(channel)))
}

/// Builds a [`BoxGrpcService`] from a local (in-process) service of type `S`,
/// erasing both the service type and its response body type `B`.
///
/// Each response has its body converted into an [`RspBody`], and errors from
/// `svc` are converted into [`BoxError`].
pub fn local<S, B>(svc: S) -> BoxGrpcService
where
    S: Service<grpc::Request<ReqBody>, Response = grpc::Response<B>>,
    S: Clone + Send + Sync + 'static,
    S::Error: 'static,
    S::Future: Send,
    BoxError: From<S::Error> + From<B::Error>,
    B: Body<Data = Bytes> + Send + 'static,
{
    // Hide the service's concrete response body type behind `RspBody`.
    let erase_body = |rsp: grpc::Response<B>| rsp.map(box_rsp_body);
    let stack = ServiceBuilder::new()
        .map_response(erase_body)
        .map_err(BoxError::from);

    BoxCloneService::new(stack.service(svc))
}

/// Erases a response body's `Error` type, returning a `RspBody`.
fn box_rsp_body<B>(body: B) -> RspBody
where
    B: Body<Data = Bytes> + Send + 'static,
    BoxError: From<B::Error>,
    B::Error: 'static,
{
    body.map_err(BoxError::from).boxed_unsync()
}