penumbra_sdk_proto/
box_grpc_svc.rs

1use bytes::Bytes;
2use http_body::Body;
3use http_body_util::{combinators::UnsyncBoxBody, BodyExt as _};
4use tonic::{body::BoxBody as ReqBody, codegen::http as grpc, transport::Endpoint};
5use tower::{util::BoxCloneService, Service, ServiceBuilder};
6
7/// A type-erased gRPC service.
8pub type BoxGrpcService =
9    BoxCloneService<grpc::Request<ReqBody>, grpc::Response<RspBody>, BoxError>;
10
11pub type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
12
13/// A type-erased gRPC response [`Body`].
14pub type RspBody = UnsyncBoxBody<Bytes, BoxError>;
15
16/// Connects to the provided tonic [`Endpoint`], returning a [`BoxGrpcService`].
17pub async fn connect(ep: Endpoint) -> anyhow::Result<BoxGrpcService> {
18    let conn = ep.connect().await?;
19    let svc = ServiceBuilder::new()
20        .map_response(|rsp: grpc::Response<tonic::body::BoxBody>| rsp.map(box_rsp_body))
21        .map_err(BoxError::from)
22        .service(conn);
23    Ok(BoxCloneService::new(svc))
24}
25
26/// Constructs a [`BoxGrpcService`] by erasing the type of an `S`-typed local
27/// (in-process) service instance.
28pub fn local<S, B>(svc: S) -> BoxGrpcService
29where
30    S: Service<grpc::Request<ReqBody>, Response = grpc::Response<B>>,
31    S: Clone + Send + Sync + 'static,
32    S::Error: 'static,
33    S::Future: Send,
34    BoxError: From<S::Error> + From<B::Error>,
35    B: Body<Data = Bytes> + Send + 'static,
36{
37    let svc = ServiceBuilder::new()
38        .map_response(|rsp: grpc::Response<B>| rsp.map(box_rsp_body))
39        .map_err(BoxError::from)
40        .service(svc);
41    BoxCloneService::new(svc)
42}
43
44/// Erases a response body's `Error` type, returning a `RspBody`.
45fn box_rsp_body<B>(body: B) -> RspBody
46where
47    B: Body<Data = Bytes> + Send + 'static,
48    BoxError: From<B::Error>,
49    B::Error: 'static,
50{
51    body.map_err(BoxError::from).boxed_unsync()
52}