penumbra_sdk_community_pool/component/
rpc.rs

1use penumbra_sdk_asset::{asset, Value};
2use penumbra_sdk_proto::core::{
3    asset::v1 as pb,
4    component::community_pool::v1::{
5        query_service_server::QueryService, CommunityPoolAssetBalancesRequest,
6        CommunityPoolAssetBalancesResponse,
7    },
8};
9
10use async_stream::try_stream;
11use futures::{StreamExt, TryStreamExt};
12use std::pin::Pin;
13use tonic::Status;
14use tracing::instrument;
15
16use cnidarium::Storage;
17
18use super::StateReadExt;
19
20pub struct Server {
21    storage: Storage,
22}
23
24impl Server {
25    pub fn new(storage: Storage) -> Self {
26        Self { storage }
27    }
28}
29
30#[tonic::async_trait]
31impl QueryService for Server {
32    /// Stream of asset balance info within the CommunityPool, to satisfy the "repeated"
33    /// field in the protobuf spec.
34    type CommunityPoolAssetBalancesStream = Pin<
35        Box<
36            dyn futures::Stream<Item = Result<CommunityPoolAssetBalancesResponse, tonic::Status>>
37                + Send,
38        >,
39    >;
40
41    #[instrument(skip(self, request))]
42    async fn community_pool_asset_balances(
43        &self,
44        request: tonic::Request<CommunityPoolAssetBalancesRequest>,
45    ) -> Result<tonic::Response<Self::CommunityPoolAssetBalancesStream>, Status> {
46        let state = self.storage.latest_snapshot();
47        let request = request.into_inner();
48
49        // Asset IDs are optional in the req; if none set, return all balances.
50        let asset_ids = request
51            .asset_ids
52            .into_iter()
53            .map(asset::Id::try_from)
54            .collect::<anyhow::Result<Vec<_>>>()
55            .map_err(|_| tonic::Status::invalid_argument("failed to parse asset filter"))?;
56
57        // Get all balances; we can filter later.
58        let asset_balances = state.community_pool_balance().await.or_else(|_| {
59            Err(tonic::Status::internal(
60                "failed to find community pool balances",
61            ))
62        })?;
63
64        let s = try_stream! {
65            for (asset_id, amount) in asset_balances {
66                let v = Value { asset_id, amount };
67                // Check whether a filter was requested
68                if asset_ids.is_empty() || asset_ids.contains(&asset_id){
69                    yield pb::Value::from(v);
70                }
71            }
72        };
73        Ok(tonic::Response::new(
74            s.map_ok(|value| CommunityPoolAssetBalancesResponse {
75                balance: Some(value),
76            })
77            .map_err(|e: anyhow::Error| {
78                tonic::Status::unavailable(format!(
79                    "error getting balances for community pool: {e}"
80                ))
81            })
82            .boxed(),
83        ))
84    }
85}