penumbra_sdk_community_pool/component/
rpc.rs1use penumbra_sdk_asset::{asset, Value};
2use penumbra_sdk_proto::core::{
3 asset::v1 as pb,
4 component::community_pool::v1::{
5 query_service_server::QueryService, CommunityPoolAssetBalancesRequest,
6 CommunityPoolAssetBalancesResponse,
7 },
8};
9
10use async_stream::try_stream;
11use futures::{StreamExt, TryStreamExt};
12use std::pin::Pin;
13use tonic::Status;
14use tracing::instrument;
15
16use cnidarium::Storage;
17
18use super::StateReadExt;
19
20pub struct Server {
21 storage: Storage,
22}
23
24impl Server {
25 pub fn new(storage: Storage) -> Self {
26 Self { storage }
27 }
28}
29
30#[tonic::async_trait]
31impl QueryService for Server {
32 type CommunityPoolAssetBalancesStream = Pin<
35 Box<
36 dyn futures::Stream<Item = Result<CommunityPoolAssetBalancesResponse, tonic::Status>>
37 + Send,
38 >,
39 >;
40
41 #[instrument(skip(self, request))]
42 async fn community_pool_asset_balances(
43 &self,
44 request: tonic::Request<CommunityPoolAssetBalancesRequest>,
45 ) -> Result<tonic::Response<Self::CommunityPoolAssetBalancesStream>, Status> {
46 let state = self.storage.latest_snapshot();
47 let request = request.into_inner();
48
49 let asset_ids = request
51 .asset_ids
52 .into_iter()
53 .map(asset::Id::try_from)
54 .collect::<anyhow::Result<Vec<_>>>()
55 .map_err(|_| tonic::Status::invalid_argument("failed to parse asset filter"))?;
56
57 let asset_balances = state.community_pool_balance().await.or_else(|_| {
59 Err(tonic::Status::internal(
60 "failed to find community pool balances",
61 ))
62 })?;
63
64 let s = try_stream! {
65 for (asset_id, amount) in asset_balances {
66 let v = Value { asset_id, amount };
67 if asset_ids.is_empty() || asset_ids.contains(&asset_id){
69 yield pb::Value::from(v);
70 }
71 }
72 };
73 Ok(tonic::Response::new(
74 s.map_ok(|value| CommunityPoolAssetBalancesResponse {
75 balance: Some(value),
76 })
77 .map_err(|e: anyhow::Error| {
78 tonic::Status::unavailable(format!(
79 "error getting balances for community pool: {e}"
80 ))
81 })
82 .boxed(),
83 ))
84 }
85}