penumbra_sdk_governance/component/
rpc.rs1use std::pin::Pin;
2use std::str::FromStr;
3
4use anyhow::Context;
5use async_stream::try_stream;
6use cnidarium::Storage;
7use futures::{StreamExt, TryStreamExt};
8use penumbra_sdk_num::Amount;
9use penumbra_sdk_proto::core::component::governance::v1::AllTalliedDelegatorVotesForProposalRequest;
10use penumbra_sdk_proto::core::component::governance::v1::AllTalliedDelegatorVotesForProposalResponse;
11use penumbra_sdk_proto::core::component::governance::v1::NextProposalIdRequest;
12use penumbra_sdk_proto::core::component::governance::v1::NextProposalIdResponse;
13use penumbra_sdk_proto::core::component::governance::v1::VotingPowerAtProposalStartRequest;
14use penumbra_sdk_proto::core::component::governance::v1::VotingPowerAtProposalStartResponse;
15use penumbra_sdk_proto::{
16 core::component::governance::v1::{
17 query_service_server::QueryService, ProposalDataRequest, ProposalDataResponse,
18 ProposalInfoRequest, ProposalInfoResponse, ProposalListRequest, ProposalListResponse,
19 ProposalRateDataRequest, ProposalRateDataResponse, ValidatorVotesRequest,
20 ValidatorVotesResponse,
21 },
22 StateReadProto,
23};
24use penumbra_sdk_stake::rate::RateData;
25use penumbra_sdk_stake::IdentityKey;
26use tonic::Status;
27use tracing::instrument;
28
29use crate::state_key;
30use crate::Tally;
31use crate::Vote;
32
33use super::StateReadExt;
34
35pub struct Server {
37 storage: Storage,
38}
39
40impl Server {
41 pub fn new(storage: Storage) -> Self {
42 Self { storage }
43 }
44}
45
46#[tonic::async_trait]
47impl QueryService for Server {
48 #[instrument(skip(self, request))]
49 async fn proposal_info(
50 &self,
51 request: tonic::Request<ProposalInfoRequest>,
52 ) -> Result<tonic::Response<ProposalInfoResponse>, Status> {
53 let state = self.storage.latest_snapshot();
54 let proposal_id = request.into_inner().proposal_id;
55
56 let start_block_height = state
57 .proposal_voting_start(proposal_id)
58 .await
59 .map_err(|e| tonic::Status::internal(e.to_string()))?
60 .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
61
62 let start_position = state
63 .proposal_voting_start_position(proposal_id)
64 .await
65 .map_err(|e| tonic::Status::internal(e.to_string()))?
66 .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
67
68 Ok(tonic::Response::new(ProposalInfoResponse {
69 start_block_height,
70 start_position: start_position.into(),
71 }))
72 }
73
74 #[instrument(skip(self, _request))]
75 async fn next_proposal_id(
76 &self,
77 _request: tonic::Request<NextProposalIdRequest>,
78 ) -> Result<tonic::Response<NextProposalIdResponse>, Status> {
79 let state = self.storage.latest_snapshot();
80
81 let next_proposal_id: u64 = state
82 .get_proto(state_key::next_proposal_id())
83 .await
84 .map_err(|e| tonic::Status::internal(format!("unable to fetch next proposal id: {e}")))?
85 .ok_or_else(|| tonic::Status::not_found("there are no proposals yet".to_string()))?;
86
87 Ok(tonic::Response::new(NextProposalIdResponse {
88 next_proposal_id,
89 }))
90 }
91
92 #[instrument(skip(self, request))]
93 async fn proposal_data(
94 &self,
95 request: tonic::Request<ProposalDataRequest>,
96 ) -> Result<tonic::Response<ProposalDataResponse>, Status> {
97 let state = self.storage.latest_snapshot();
98 let proposal_id = request.into_inner().proposal_id;
99
100 let start_block_height = state
101 .proposal_voting_start(proposal_id)
102 .await
103 .map_err(|e| tonic::Status::internal(e.to_string()))?
104 .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
105
106 let end_block_height = state
107 .proposal_voting_end(proposal_id)
108 .await
109 .map_err(|e| tonic::Status::internal(e.to_string()))?
110 .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
111
112 let start_position = state
113 .proposal_voting_start_position(proposal_id)
114 .await
115 .map_err(|e| tonic::Status::internal(e.to_string()))?
116 .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
117
118 let proposal = state
119 .proposal_definition(proposal_id)
120 .await
121 .map_err(|e| tonic::Status::internal(format!("unable to fetch proposal: {e}")))?
122 .ok_or_else(|| {
123 tonic::Status::not_found(format!("proposal {} not found", proposal_id))
124 })?;
125
126 let proposal_state = state
127 .proposal_state(proposal_id)
128 .await
129 .map_err(|e| tonic::Status::internal(format!("unable to fetch proposal state: {e}")))?
130 .ok_or_else(|| {
131 tonic::Status::not_found(format!("proposal {} state not found", proposal_id))
132 })?;
133
134 let proposal_deposit_amount: Amount = state
135 .get(&state_key::proposal_deposit_amount(proposal_id))
136 .await
137 .map_err(|e| {
138 tonic::Status::internal(format!("unable to fetch proposal deposit amount: {e}"))
139 })?
140 .ok_or_else(|| {
141 tonic::Status::not_found(format!(
142 "deposit amount for proposal {} was not found",
143 proposal_id
144 ))
145 })?;
146
147 Ok(tonic::Response::new(ProposalDataResponse {
148 start_block_height,
149 end_block_height,
150 start_position: start_position.into(),
151 state: Some(proposal_state.into()),
152 proposal: Some(proposal.into()),
153 proposal_deposit_amount: Some(proposal_deposit_amount.into()),
154 }))
155 }
156
157 type ProposalRateDataStream = Pin<
158 Box<dyn futures::Stream<Item = Result<ProposalRateDataResponse, tonic::Status>> + Send>,
159 >;
160
161 #[instrument(skip(self, request))]
162 async fn proposal_rate_data(
163 &self,
164 request: tonic::Request<ProposalRateDataRequest>,
165 ) -> Result<tonic::Response<Self::ProposalRateDataStream>, Status> {
166 let state = self.storage.latest_snapshot();
167 let proposal_id = request.into_inner().proposal_id;
168
169 let s = state.prefix(&state_key::all_rate_data_at_proposal_start(proposal_id));
170 Ok(tonic::Response::new(
171 s.map_ok(|i: (String, RateData)| {
172 let (_key, rate_data) = i;
173 ProposalRateDataResponse {
174 rate_data: Some(rate_data.into()),
175 }
176 })
177 .map_err(|e: anyhow::Error| {
178 tonic::Status::unavailable(format!("error getting prefix value from storage: {e}"))
179 })
180 .boxed(),
183 ))
184 }
185
186 type ProposalListStream =
187 Pin<Box<dyn futures::Stream<Item = Result<ProposalListResponse, tonic::Status>> + Send>>;
188
189 #[instrument(skip(self, request))]
190 async fn proposal_list(
191 &self,
192 request: tonic::Request<ProposalListRequest>,
193 ) -> Result<tonic::Response<Self::ProposalListStream>, Status> {
194 let state = self.storage.latest_snapshot();
195
196 let proposal_id_list: Vec<u64> = if request.into_inner().inactive {
197 let next = state.next_proposal_id().await.map_err(|e| {
198 tonic::Status::internal(format!("unable to get next proposal id: {e}"))
199 })?;
200
201 (0..next).collect()
202 } else {
203 state
204 .unfinished_proposals()
205 .await
206 .map_err(|e| {
207 tonic::Status::internal(format!("unable to fetch unfinished proposals: {e}"))
208 })?
209 .into_iter()
210 .collect::<Vec<_>>()
211 };
212
213 let s = try_stream! {
214 for proposal_id in proposal_id_list {
215 let proposal = state
216 .proposal_definition(proposal_id)
217 .await
218 .map_err(|e| tonic::Status::internal(format!("unable to fetch proposal: {e}")))?
219 .ok_or_else(|| {
220 tonic::Status::not_found(format!("proposal {} not found", proposal_id))
221 })?;
222
223 let proposal_state = state
224 .proposal_state(proposal_id)
225 .await
226 .map_err(|e| tonic::Status::internal(format!("unable to fetch proposal state: {e}")))?
227 .ok_or_else(|| {
228 tonic::Status::not_found(format!("proposal {} state not found", proposal_id))
229 })?;
230
231 let proposal_voting_start_position = state
232 .proposal_voting_start_position(proposal_id)
233 .await
234 .map_err(|e| {
235 tonic::Status::internal(format!(
236 "unable to fetch proposal voting start position: {e}"
237 ))
238 })?
239 .ok_or_else(|| {
240 tonic::Status::not_found(format!(
241 "voting start position for proposal {} not found",
242 proposal_id
243 ))
244 })?;
245
246 let start_block_height = state
247 .proposal_voting_start(proposal_id)
248 .await
249 .map_err(|e| {
250 tonic::Status::internal(format!(
251 "unable to fetch proposal voting start block: {e}"
252 ))
253 })?
254 .ok_or_else(|| {
255 tonic::Status::not_found(format!(
256 "voting start block for proposal {} not found",
257 proposal_id
258 ))
259 })?;
260
261 let end_block_height = state
262 .proposal_voting_end(proposal_id)
263 .await
264 .map_err(|e| tonic::Status::internal(e.to_string()))?
265 .ok_or_else(|| tonic::Status::not_found(format!("proposal {proposal_id} not found")))?;
266
267 yield ProposalListResponse {
268 proposal: Some(proposal.into()),
269 start_block_height,
270 end_block_height,
271 start_position: proposal_voting_start_position.into(),
272 state: Some(proposal_state.into()),
273 }
274 }};
275
276 Ok(tonic::Response::new(
277 s.map_err(|e: anyhow::Error| {
278 tonic::Status::unavailable(format!(
279 "error getting position value from storage: {e}"
280 ))
281 })
282 .boxed(),
285 ))
286 }
287
288 type ValidatorVotesStream =
289 Pin<Box<dyn futures::Stream<Item = Result<ValidatorVotesResponse, tonic::Status>> + Send>>;
290
291 #[instrument(skip(self, request))]
292 async fn validator_votes(
293 &self,
294 request: tonic::Request<ValidatorVotesRequest>,
295 ) -> Result<tonic::Response<Self::ValidatorVotesStream>, Status> {
296 let state = self.storage.latest_snapshot();
297
298 let proposal_id = request.into_inner().proposal_id;
299
300 let s = state
301 .prefix::<Vote>(&state_key::all_validator_votes_for_proposal(proposal_id))
302 .and_then(|r| async move {
303 Ok((
304 IdentityKey::from_str(r.0.rsplit('/').next().context("invalid key")?)?,
305 r.1,
306 ))
307 })
308 .map_ok(|i: (IdentityKey, Vote)| ValidatorVotesResponse {
309 vote: Some(i.1.into()),
310 identity_key: Some(i.0.into()),
311 });
312
313 Ok(tonic::Response::new(
314 s.map_err(|e: anyhow::Error| {
315 tonic::Status::unavailable(format!(
316 "error getting validator votes from storage: {e}"
317 ))
318 })
319 .boxed(),
322 ))
323 }
324
325 #[instrument(skip(self, request))]
326 async fn voting_power_at_proposal_start(
327 &self,
328 request: tonic::Request<VotingPowerAtProposalStartRequest>,
329 ) -> Result<tonic::Response<VotingPowerAtProposalStartResponse>, Status> {
330 let state = self.storage.latest_snapshot();
331 let request = request.into_inner();
332 let proposal_id = request.proposal_id;
333 if let Some(identity_key) = request.identity_key {
334 let identity_key = identity_key.try_into().map_err(|_| {
337 tonic::Status::invalid_argument(
338 "identity key in request was bad protobuf".to_string(),
339 )
340 })?;
341
342 let voting_power = state
343 .get_proto::<u64>(&state_key::voting_power_at_proposal_start(
344 proposal_id,
345 identity_key,
346 ))
347 .await
348 .map_err(|e| tonic::Status::internal(format!("error accessing storage: {}", e)))?;
349
350 if voting_power.is_none() {
351 return Err(tonic::Status::not_found(format!(
352 "validator did not exist at proposal creation: {}",
353 identity_key
354 )));
355 }
356
357 Ok(tonic::Response::new(VotingPowerAtProposalStartResponse {
358 voting_power: voting_power.expect("voting power should be set"),
359 }))
360 } else {
361 let total_voting_power = state
363 .total_voting_power_at_proposal_start(proposal_id)
364 .await
365 .map_err(|e| tonic::Status::internal(format!("error accessing storage: {}", e)))?;
366
367 Ok(tonic::Response::new(VotingPowerAtProposalStartResponse {
368 voting_power: total_voting_power,
369 }))
370 }
371 }
372
373 type AllTalliedDelegatorVotesForProposalStream = Pin<
374 Box<
375 dyn futures::Stream<
376 Item = Result<AllTalliedDelegatorVotesForProposalResponse, tonic::Status>,
377 > + Send,
378 >,
379 >;
380
381 #[instrument(skip(self, request))]
382 async fn all_tallied_delegator_votes_for_proposal(
383 &self,
384 request: tonic::Request<AllTalliedDelegatorVotesForProposalRequest>,
385 ) -> Result<tonic::Response<Self::AllTalliedDelegatorVotesForProposalStream>, Status> {
386 let state = self.storage.latest_snapshot();
387 let proposal_id = request.into_inner().proposal_id;
388
389 let s = state.prefix::<Tally>(&state_key::all_tallied_delegator_votes_for_proposal(
390 proposal_id,
391 ));
392 Ok(tonic::Response::new(
393 s.and_then(|r| async move {
394 Ok((
395 IdentityKey::from_str(r.0.rsplit('/').next().context("invalid key")?)?,
396 r.1,
397 ))
398 })
399 .map_err(|e| {
400 tonic::Status::internal(format!("unable to retrieve tallied delegator votes: {e}"))
401 })
402 .map_ok(
403 |i: (IdentityKey, Tally)| AllTalliedDelegatorVotesForProposalResponse {
404 tally: Some(i.1.into()),
405 identity_key: Some(i.0.into()),
406 },
407 )
408 .boxed(),
411 ))
412 }
413}