penumbra_sdk_stake/component/
rpc.rs1use std::pin::Pin;
2
3use cnidarium::Storage;
4use futures::StreamExt;
5use penumbra_sdk_proto::{
6 core::component::stake::v1::{
7 query_service_server::QueryService, CurrentValidatorRateRequest,
8 CurrentValidatorRateResponse, GetValidatorInfoRequest, GetValidatorInfoResponse,
9 ValidatorInfoRequest, ValidatorInfoResponse, ValidatorPenaltyRequest,
10 ValidatorPenaltyResponse, ValidatorStatusRequest, ValidatorStatusResponse,
11 ValidatorUptimeRequest, ValidatorUptimeResponse,
12 },
13 DomainType,
14};
15use tap::{TapFallible, TapOptional};
16use tonic::Status;
17use tracing::{error_span, instrument, Instrument, Span};
18
19use super::{validator_handler::ValidatorDataRead, ConsensusIndexRead, SlashingData};
20use crate::validator::{Info, State};
21
22pub struct Server {
24 storage: Storage,
25}
26
27impl Server {
28 pub fn new(storage: Storage) -> Self {
29 Self { storage }
30 }
31}
32
33#[tonic::async_trait]
34impl QueryService for Server {
35 #[instrument(skip(self, request))]
36 async fn get_validator_info(
37 &self,
38 request: tonic::Request<GetValidatorInfoRequest>,
39 ) -> Result<tonic::Response<GetValidatorInfoResponse>, tonic::Status> {
40 let state = self.storage.latest_snapshot();
41 let GetValidatorInfoRequest { identity_key } = request.into_inner();
42
43 let identity_key = identity_key
45 .ok_or_else(|| Status::invalid_argument("an identity key must be provided"))?
46 .try_into()
47 .tap_err(|error| tracing::debug!(?error, "request contained an invalid identity key"))
48 .map_err(|_| Status::invalid_argument("invalid identity key"))?;
49
50 let info = state
52 .get_validator_info(&identity_key)
53 .await
54 .tap_err(|error| tracing::error!(?error, %identity_key, "failed to get validator info"))
55 .map_err(|_| Status::invalid_argument("failed to get validator info"))?
56 .tap_none(|| tracing::debug!(%identity_key, "validator info was not found"))
57 .ok_or_else(|| Status::not_found("validator info was not found"))?;
58
59 let resp = GetValidatorInfoResponse {
61 validator_info: Some(info.to_proto()),
62 };
63
64 Ok(tonic::Response::new(resp))
65 }
66
67 type ValidatorInfoStream =
68 Pin<Box<dyn futures::Stream<Item = Result<ValidatorInfoResponse, tonic::Status>> + Send>>;
69
70 #[instrument(skip(self, request), fields(show_inactive = request.get_ref().show_inactive))]
71 async fn validator_info(
72 &self,
73 request: tonic::Request<ValidatorInfoRequest>,
74 ) -> Result<tonic::Response<Self::ValidatorInfoStream>, Status> {
75 use futures::TryStreamExt;
76
77 let snapshot = self.storage.latest_snapshot();
80 let ValidatorInfoRequest { show_inactive } = request.into_inner();
81
82 let filter_inactive = move |info: &Info| {
84 let should = match info.status.state {
85 State::Active => true,
86 _ if show_inactive => true, _ => false, };
89 futures::future::ready(should)
90 };
91
92 let to_resp = |info: Info| {
94 let validator_info = Some(info.to_proto());
95 ValidatorInfoResponse { validator_info }
96 };
97
98 let make_span = |identity_key| -> Span {
100 let span = error_span!("fetching validator information", %identity_key);
101 let current = Span::current();
102 span.follows_from(current);
103 span
104 };
105
106 let consensus_set = snapshot
108 .consensus_set_stream()
109 .map_err(|e| format!("error getting consensus set: {e}"))
110 .map_err(Status::unavailable)?;
111
112 let validators = async_stream::try_stream! {
115 for await identity_key in consensus_set {
116 let identity_key = identity_key?;
117 let span = make_span(identity_key);
118 yield snapshot
119 .get_validator_info(&identity_key)
120 .instrument(span)
121 .await?
122 .expect("known validator must be present");
123 }
124 };
125
126 let stream = validators
128 .try_filter(filter_inactive)
129 .map_ok(to_resp)
130 .map_err(|e: anyhow::Error| format!("error getting validator info: {e}"))
131 .map_err(Status::unavailable)
132 .into_stream()
133 .boxed();
134
135 Ok(tonic::Response::new(stream))
136 }
137
138 #[instrument(skip(self, request))]
139 async fn validator_status(
140 &self,
141 request: tonic::Request<ValidatorStatusRequest>,
142 ) -> Result<tonic::Response<ValidatorStatusResponse>, Status> {
143 let state = self.storage.latest_snapshot();
144
145 let id = request
146 .into_inner()
147 .identity_key
148 .ok_or_else(|| Status::invalid_argument("missing identity key"))?
149 .try_into()
150 .map_err(|_| Status::invalid_argument("invalid identity key"))?;
151
152 let status = state
153 .get_validator_status(&id)
154 .await
155 .map_err(|e| Status::unavailable(format!("error getting validator status: {e}")))?
156 .ok_or_else(|| Status::not_found("validator not found"))?;
157
158 Ok(tonic::Response::new(ValidatorStatusResponse {
159 status: Some(status.into()),
160 }))
161 }
162
163 #[instrument(skip(self, request))]
164 async fn validator_penalty(
165 &self,
166 request: tonic::Request<ValidatorPenaltyRequest>,
167 ) -> Result<tonic::Response<ValidatorPenaltyResponse>, Status> {
168 let state = self.storage.latest_snapshot();
169 let request = request.into_inner();
170 let id = request
171 .identity_key
172 .ok_or_else(|| Status::invalid_argument("missing identity key"))?
173 .try_into()
174 .map_err(|_| Status::invalid_argument("invalid identity key"))?;
175
176 let penalty = state
177 .compounded_penalty_over_range(&id, request.start_epoch_index, request.end_epoch_index)
178 .await
179 .map_err(|e| Status::unavailable(format!("error getting validator penalty: {e}")))?;
180
181 Ok(tonic::Response::new(ValidatorPenaltyResponse {
182 penalty: Some(penalty.into()),
183 }))
184 }
185
186 #[instrument(skip(self, request))]
187 async fn current_validator_rate(
188 &self,
189 request: tonic::Request<CurrentValidatorRateRequest>,
190 ) -> Result<tonic::Response<CurrentValidatorRateResponse>, Status> {
191 let state = self.storage.latest_snapshot();
192 let identity_key = request
193 .into_inner()
194 .identity_key
195 .ok_or_else(|| tonic::Status::invalid_argument("empty message"))?
196 .try_into()
197 .map_err(|_| tonic::Status::invalid_argument("invalid identity key"))?;
198
199 let rate_data = state
200 .get_validator_rate(&identity_key)
201 .await
202 .map_err(|e| tonic::Status::internal(e.to_string()))?;
203
204 match rate_data {
205 Some(r) => Ok(tonic::Response::new(CurrentValidatorRateResponse {
206 data: Some(r.into()),
207 })),
208 None => Err(Status::not_found("current validator rate not found")),
209 }
210 }
211
212 #[instrument(skip(self, request))]
213 async fn validator_uptime(
214 &self,
215 request: tonic::Request<ValidatorUptimeRequest>,
216 ) -> Result<tonic::Response<ValidatorUptimeResponse>, Status> {
217 let state = self.storage.latest_snapshot();
218 let identity_key = request
219 .into_inner()
220 .identity_key
221 .ok_or_else(|| tonic::Status::invalid_argument("empty message"))?
222 .try_into()
223 .map_err(|_| tonic::Status::invalid_argument("invalid identity key"))?;
224
225 let uptime_data = state
226 .get_validator_uptime(&identity_key)
227 .await
228 .map_err(|e| tonic::Status::internal(e.to_string()))?;
229
230 match uptime_data {
231 Some(u) => Ok(tonic::Response::new(ValidatorUptimeResponse {
232 uptime: Some(u.into()),
233 })),
234 None => Err(Status::not_found("validator uptime not found")),
235 }
236 }
237}