penumbra_sdk_governance/component/
rpc.rs

1use std::pin::Pin;
2use std::str::FromStr;
3
4use anyhow::Context;
5use async_stream::try_stream;
6use cnidarium::Storage;
7use futures::{StreamExt, TryStreamExt};
8use penumbra_sdk_num::Amount;
9use penumbra_sdk_proto::core::component::governance::v1::AllTalliedDelegatorVotesForProposalRequest;
10use penumbra_sdk_proto::core::component::governance::v1::AllTalliedDelegatorVotesForProposalResponse;
11use penumbra_sdk_proto::core::component::governance::v1::NextProposalIdRequest;
12use penumbra_sdk_proto::core::component::governance::v1::NextProposalIdResponse;
13use penumbra_sdk_proto::core::component::governance::v1::VotingPowerAtProposalStartRequest;
14use penumbra_sdk_proto::core::component::governance::v1::VotingPowerAtProposalStartResponse;
15use penumbra_sdk_proto::{
16    core::component::governance::v1::{
17        query_service_server::QueryService, ProposalDataRequest, ProposalDataResponse,
18        ProposalInfoRequest, ProposalInfoResponse, ProposalListRequest, ProposalListResponse,
19        ProposalRateDataRequest, ProposalRateDataResponse, ValidatorVotesRequest,
20        ValidatorVotesResponse,
21    },
22    StateReadProto,
23};
24use penumbra_sdk_stake::rate::RateData;
25use penumbra_sdk_stake::IdentityKey;
26use tonic::Status;
27use tracing::instrument;
28
29use crate::state_key;
30use crate::Tally;
31use crate::Vote;
32
33use super::StateReadExt;
34
35// TODO: Hide this and only expose a Router?
36pub struct Server {
37    storage: Storage,
38}
39
40impl Server {
41    pub fn new(storage: Storage) -> Self {
42        Self { storage }
43    }
44}
45
46#[tonic::async_trait]
47impl QueryService for Server {
48    #[instrument(skip(self, request))]
49    async fn proposal_info(
50        &self,
51        request: tonic::Request<ProposalInfoRequest>,
52    ) -> Result<tonic::Response<ProposalInfoResponse>, Status> {
53        let state = self.storage.latest_snapshot();
54        let proposal_id = request.into_inner().proposal_id;
55
56        let start_block_height = state
57            .proposal_voting_start(proposal_id)
58            .await
59            .map_err(|e| tonic::Status::internal(e.to_string()))?
60            .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
61
62        let start_position = state
63            .proposal_voting_start_position(proposal_id)
64            .await
65            .map_err(|e| tonic::Status::internal(e.to_string()))?
66            .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
67
68        Ok(tonic::Response::new(ProposalInfoResponse {
69            start_block_height,
70            start_position: start_position.into(),
71        }))
72    }
73
74    #[instrument(skip(self, _request))]
75    async fn next_proposal_id(
76        &self,
77        _request: tonic::Request<NextProposalIdRequest>,
78    ) -> Result<tonic::Response<NextProposalIdResponse>, Status> {
79        let state = self.storage.latest_snapshot();
80
81        let next_proposal_id: u64 = state
82            .get_proto(state_key::next_proposal_id())
83            .await
84            .map_err(|e| tonic::Status::internal(format!("unable to fetch next proposal id: {e}")))?
85            .ok_or_else(|| tonic::Status::not_found("there are no proposals yet".to_string()))?;
86
87        Ok(tonic::Response::new(NextProposalIdResponse {
88            next_proposal_id,
89        }))
90    }
91
92    #[instrument(skip(self, request))]
93    async fn proposal_data(
94        &self,
95        request: tonic::Request<ProposalDataRequest>,
96    ) -> Result<tonic::Response<ProposalDataResponse>, Status> {
97        let state = self.storage.latest_snapshot();
98        let proposal_id = request.into_inner().proposal_id;
99
100        let start_block_height = state
101            .proposal_voting_start(proposal_id)
102            .await
103            .map_err(|e| tonic::Status::internal(e.to_string()))?
104            .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
105
106        let end_block_height = state
107            .proposal_voting_end(proposal_id)
108            .await
109            .map_err(|e| tonic::Status::internal(e.to_string()))?
110            .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
111
112        let start_position = state
113            .proposal_voting_start_position(proposal_id)
114            .await
115            .map_err(|e| tonic::Status::internal(e.to_string()))?
116            .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
117
118        let proposal = state
119            .proposal_definition(proposal_id)
120            .await
121            .map_err(|e| tonic::Status::internal(format!("unable to fetch proposal: {e}")))?
122            .ok_or_else(|| {
123                tonic::Status::not_found(format!("proposal {} not found", proposal_id))
124            })?;
125
126        let proposal_state = state
127            .proposal_state(proposal_id)
128            .await
129            .map_err(|e| tonic::Status::internal(format!("unable to fetch proposal state: {e}")))?
130            .ok_or_else(|| {
131                tonic::Status::not_found(format!("proposal {} state not found", proposal_id))
132            })?;
133
134        let proposal_deposit_amount: Amount = state
135            .get(&state_key::proposal_deposit_amount(proposal_id))
136            .await
137            .map_err(|e| {
138                tonic::Status::internal(format!("unable to fetch proposal deposit amount: {e}"))
139            })?
140            .ok_or_else(|| {
141                tonic::Status::not_found(format!(
142                    "deposit amount for proposal {} was not found",
143                    proposal_id
144                ))
145            })?;
146
147        Ok(tonic::Response::new(ProposalDataResponse {
148            start_block_height,
149            end_block_height,
150            start_position: start_position.into(),
151            state: Some(proposal_state.into()),
152            proposal: Some(proposal.into()),
153            proposal_deposit_amount: Some(proposal_deposit_amount.into()),
154        }))
155    }
156
157    type ProposalRateDataStream = Pin<
158        Box<dyn futures::Stream<Item = Result<ProposalRateDataResponse, tonic::Status>> + Send>,
159    >;
160
161    #[instrument(skip(self, request))]
162    async fn proposal_rate_data(
163        &self,
164        request: tonic::Request<ProposalRateDataRequest>,
165    ) -> Result<tonic::Response<Self::ProposalRateDataStream>, Status> {
166        let state = self.storage.latest_snapshot();
167        let proposal_id = request.into_inner().proposal_id;
168
169        let s = state.prefix(&state_key::all_rate_data_at_proposal_start(proposal_id));
170        Ok(tonic::Response::new(
171            s.map_ok(|i: (String, RateData)| {
172                let (_key, rate_data) = i;
173                ProposalRateDataResponse {
174                    rate_data: Some(rate_data.into()),
175                }
176            })
177            .map_err(|e: anyhow::Error| {
178                tonic::Status::unavailable(format!("error getting prefix value from storage: {e}"))
179            })
180            // TODO: how do we instrument a Stream
181            //.instrument(Span::current())
182            .boxed(),
183        ))
184    }
185
186    type ProposalListStream =
187        Pin<Box<dyn futures::Stream<Item = Result<ProposalListResponse, tonic::Status>> + Send>>;
188
189    #[instrument(skip(self, request))]
190    async fn proposal_list(
191        &self,
192        request: tonic::Request<ProposalListRequest>,
193    ) -> Result<tonic::Response<Self::ProposalListStream>, Status> {
194        let state = self.storage.latest_snapshot();
195
196        let proposal_id_list: Vec<u64> = if request.into_inner().inactive {
197            let next = state.next_proposal_id().await.map_err(|e| {
198                tonic::Status::internal(format!("unable to get next proposal id: {e}"))
199            })?;
200
201            (0..next).collect()
202        } else {
203            state
204                .unfinished_proposals()
205                .await
206                .map_err(|e| {
207                    tonic::Status::internal(format!("unable to fetch unfinished proposals: {e}"))
208                })?
209                .into_iter()
210                .collect::<Vec<_>>()
211        };
212
213        let s = try_stream! {
214            for proposal_id in proposal_id_list {
215            let proposal = state
216                .proposal_definition(proposal_id)
217                .await
218                .map_err(|e| tonic::Status::internal(format!("unable to fetch proposal: {e}")))?
219                .ok_or_else(|| {
220                    tonic::Status::not_found(format!("proposal {} not found", proposal_id))
221                })?;
222
223            let proposal_state = state
224                .proposal_state(proposal_id)
225                .await
226                .map_err(|e| tonic::Status::internal(format!("unable to fetch proposal state: {e}")))?
227                .ok_or_else(|| {
228                    tonic::Status::not_found(format!("proposal {} state not found", proposal_id))
229                })?;
230
231            let proposal_voting_start_position = state
232                .proposal_voting_start_position(proposal_id)
233                .await
234                .map_err(|e| {
235                    tonic::Status::internal(format!(
236                        "unable to fetch proposal voting start position: {e}"
237                    ))
238                })?
239                .ok_or_else(|| {
240                    tonic::Status::not_found(format!(
241                        "voting start position for proposal {} not found",
242                        proposal_id
243                    ))
244                })?;
245
246            let start_block_height = state
247                .proposal_voting_start(proposal_id)
248                .await
249                .map_err(|e| {
250                    tonic::Status::internal(format!(
251                        "unable to fetch proposal voting start block: {e}"
252                    ))
253                })?
254                .ok_or_else(|| {
255                    tonic::Status::not_found(format!(
256                        "voting start block for proposal {} not found",
257                        proposal_id
258                    ))
259                })?;
260
261            let end_block_height = state
262                .proposal_voting_end(proposal_id)
263                .await
264                .map_err(|e| tonic::Status::internal(e.to_string()))?
265                .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
266
267            yield ProposalListResponse {
268                proposal: Some(proposal.into()),
269                start_block_height,
270                end_block_height,
271                start_position: proposal_voting_start_position.into(),
272                state: Some(proposal_state.into()),
273            }
274        }};
275
276        Ok(tonic::Response::new(
277            s.map_err(|e: anyhow::Error| {
278                tonic::Status::unavailable(format!(
279                    "error getting position value from storage: {e}"
280                ))
281            })
282            // TODO: how do we instrument a Stream
283            //.instrument(Span::current())
284            .boxed(),
285        ))
286    }
287
288    type ValidatorVotesStream =
289        Pin<Box<dyn futures::Stream<Item = Result<ValidatorVotesResponse, tonic::Status>> + Send>>;
290
291    #[instrument(skip(self, request))]
292    async fn validator_votes(
293        &self,
294        request: tonic::Request<ValidatorVotesRequest>,
295    ) -> Result<tonic::Response<Self::ValidatorVotesStream>, Status> {
296        let state = self.storage.latest_snapshot();
297
298        let proposal_id = request.into_inner().proposal_id;
299
300        let s = state
301            .prefix::<Vote>(&state_key::all_validator_votes_for_proposal(proposal_id))
302            .and_then(|r| async move {
303                Ok((
304                    IdentityKey::from_str(r.0.rsplit('/').next().context("invalid key")?)?,
305                    r.1,
306                ))
307            })
308            .map_ok(|i: (IdentityKey, Vote)| ValidatorVotesResponse {
309                vote: Some(i.1.into()),
310                identity_key: Some(i.0.into()),
311            });
312
313        Ok(tonic::Response::new(
314            s.map_err(|e: anyhow::Error| {
315                tonic::Status::unavailable(format!(
316                    "error getting validator votes from storage: {e}"
317                ))
318            })
319            // TODO: how do we instrument a Stream
320            //.instrument(Span::current())
321            .boxed(),
322        ))
323    }
324
325    #[instrument(skip(self, request))]
326    async fn voting_power_at_proposal_start(
327        &self,
328        request: tonic::Request<VotingPowerAtProposalStartRequest>,
329    ) -> Result<tonic::Response<VotingPowerAtProposalStartResponse>, Status> {
330        let state = self.storage.latest_snapshot();
331        let request = request.into_inner();
332        let proposal_id = request.proposal_id;
333        if let Some(identity_key) = request.identity_key {
334            // If the query is for a specific validator, return their voting power at the start of
335            // the proposal
336            let identity_key = identity_key.try_into().map_err(|_| {
337                tonic::Status::invalid_argument(
338                    "identity key in request was bad protobuf".to_string(),
339                )
340            })?;
341
342            let voting_power = state
343                .get_proto::<u64>(&state_key::voting_power_at_proposal_start(
344                    proposal_id,
345                    identity_key,
346                ))
347                .await
348                .map_err(|e| tonic::Status::internal(format!("error accessing storage: {}", e)))?;
349
350            if voting_power.is_none() {
351                return Err(tonic::Status::not_found(format!(
352                    "validator did not exist at proposal creation: {}",
353                    identity_key
354                )));
355            }
356
357            Ok(tonic::Response::new(VotingPowerAtProposalStartResponse {
358                voting_power: voting_power.expect("voting power should be set"),
359            }))
360        } else {
361            // If the query is for the total voting power at the start of the proposal, return that
362            let total_voting_power = state
363                .total_voting_power_at_proposal_start(proposal_id)
364                .await
365                .map_err(|e| tonic::Status::internal(format!("error accessing storage: {}", e)))?;
366
367            Ok(tonic::Response::new(VotingPowerAtProposalStartResponse {
368                voting_power: total_voting_power,
369            }))
370        }
371    }
372
373    type AllTalliedDelegatorVotesForProposalStream = Pin<
374        Box<
375            dyn futures::Stream<
376                    Item = Result<AllTalliedDelegatorVotesForProposalResponse, tonic::Status>,
377                > + Send,
378        >,
379    >;
380
381    #[instrument(skip(self, request))]
382    async fn all_tallied_delegator_votes_for_proposal(
383        &self,
384        request: tonic::Request<AllTalliedDelegatorVotesForProposalRequest>,
385    ) -> Result<tonic::Response<Self::AllTalliedDelegatorVotesForProposalStream>, Status> {
386        let state = self.storage.latest_snapshot();
387        let proposal_id = request.into_inner().proposal_id;
388
389        let s = state.prefix::<Tally>(&state_key::all_tallied_delegator_votes_for_proposal(
390            proposal_id,
391        ));
392        Ok(tonic::Response::new(
393            s.and_then(|r| async move {
394                Ok((
395                    IdentityKey::from_str(r.0.rsplit('/').next().context("invalid key")?)?,
396                    r.1,
397                ))
398            })
399            .map_err(|e| {
400                tonic::Status::internal(format!("unable to retrieve tallied delegator votes: {e}"))
401            })
402            .map_ok(
403                |i: (IdentityKey, Tally)| AllTalliedDelegatorVotesForProposalResponse {
404                    tally: Some(i.1.into()),
405                    identity_key: Some(i.0.into()),
406                },
407            )
408            // TODO: how do we instrument a Stream
409            //.instrument(Span::current())
410            .boxed(),
411        ))
412    }
413}