penumbra_sdk_stake/component/
rpc.rs

1use std::pin::Pin;
2
3use cnidarium::Storage;
4use futures::StreamExt;
5use penumbra_sdk_proto::{
6    core::component::stake::v1::{
7        query_service_server::QueryService, CurrentValidatorRateRequest,
8        CurrentValidatorRateResponse, GetValidatorInfoRequest, GetValidatorInfoResponse,
9        ValidatorInfoRequest, ValidatorInfoResponse, ValidatorPenaltyRequest,
10        ValidatorPenaltyResponse, ValidatorStatusRequest, ValidatorStatusResponse,
11        ValidatorUptimeRequest, ValidatorUptimeResponse,
12    },
13    DomainType,
14};
15use tap::{TapFallible, TapOptional};
16use tonic::Status;
17use tracing::{error_span, instrument, Instrument, Span};
18
19use super::{validator_handler::ValidatorDataRead, ConsensusIndexRead, SlashingData};
20use crate::validator::{Info, State};
21
22// TODO: Hide this and only expose a Router?
23pub struct Server {
24    storage: Storage,
25}
26
27impl Server {
28    pub fn new(storage: Storage) -> Self {
29        Self { storage }
30    }
31}
32
33#[tonic::async_trait]
34impl QueryService for Server {
35    #[instrument(skip(self, request))]
36    async fn get_validator_info(
37        &self,
38        request: tonic::Request<GetValidatorInfoRequest>,
39    ) -> Result<tonic::Response<GetValidatorInfoResponse>, tonic::Status> {
40        let state = self.storage.latest_snapshot();
41        let GetValidatorInfoRequest { identity_key } = request.into_inner();
42
43        // Take the identity key from the inbound request.
44        let identity_key = identity_key
45            .ok_or_else(|| Status::invalid_argument("an identity key must be provided"))?
46            .try_into()
47            .tap_err(|error| tracing::debug!(?error, "request contained an invalid identity key"))
48            .map_err(|_| Status::invalid_argument("invalid identity key"))?;
49
50        // Look up the information for the validator with the given identity key.
51        let info = state
52            .get_validator_info(&identity_key)
53            .await
54            .tap_err(|error| tracing::error!(?error, %identity_key, "failed to get validator info"))
55            .map_err(|_| Status::invalid_argument("failed to get validator info"))?
56            .tap_none(|| tracing::debug!(%identity_key, "validator info was not found"))
57            .ok_or_else(|| Status::not_found("validator info was not found"))?;
58
59        // Construct the outbound response.
60        let resp = GetValidatorInfoResponse {
61            validator_info: Some(info.to_proto()),
62        };
63
64        Ok(tonic::Response::new(resp))
65    }
66
67    type ValidatorInfoStream =
68        Pin<Box<dyn futures::Stream<Item = Result<ValidatorInfoResponse, tonic::Status>> + Send>>;
69
70    #[instrument(skip(self, request), fields(show_inactive = request.get_ref().show_inactive))]
71    async fn validator_info(
72        &self,
73        request: tonic::Request<ValidatorInfoRequest>,
74    ) -> Result<tonic::Response<Self::ValidatorInfoStream>, Status> {
75        use futures::TryStreamExt;
76
77        // Get the latest snapshot from the backing storage, and determine whether or not the
78        // response should include inactive validator definitions.
79        let snapshot = self.storage.latest_snapshot();
80        let ValidatorInfoRequest { show_inactive } = request.into_inner();
81
82        // Returns `true` if we should include a validator in the outbound response.
83        let filter_inactive = move |info: &Info| {
84            let should = match info.status.state {
85                State::Active => true,
86                _ if show_inactive => true, // Include other validators if the request asked us to.
87                _ => false,                 // Otherwise, skip this entry.
88            };
89            futures::future::ready(should)
90        };
91
92        // Converts information about a validator into a RPC response.
93        let to_resp = |info: Info| {
94            let validator_info = Some(info.to_proto());
95            ValidatorInfoResponse { validator_info }
96        };
97
98        // Creates a span that follows from the current tracing context.
99        let make_span = |identity_key| -> Span {
100            let span = error_span!("fetching validator information", %identity_key);
101            let current = Span::current();
102            span.follows_from(current);
103            span
104        };
105
106        // Get a stream of identity keys corresponding to validators in the consensus set.
107        let consensus_set = snapshot
108            .consensus_set_stream()
109            .map_err(|e| format!("error getting consensus set: {e}"))
110            .map_err(Status::unavailable)?;
111
112        // Adapt the stream of identity keys into a stream of validator information.
113        // Define a span indicating that the spawned future follows from the current context.
114        let validators = async_stream::try_stream! {
115            for await identity_key in consensus_set {
116                let identity_key = identity_key?;
117                let span = make_span(identity_key);
118                yield snapshot
119                    .get_validator_info(&identity_key)
120                    .instrument(span)
121                    .await?
122                    .expect("known validator must be present");
123            }
124        };
125
126        // Construct the outbound response.
127        let stream = validators
128            .try_filter(filter_inactive)
129            .map_ok(to_resp)
130            .map_err(|e: anyhow::Error| format!("error getting validator info: {e}"))
131            .map_err(Status::unavailable)
132            .into_stream()
133            .boxed();
134
135        Ok(tonic::Response::new(stream))
136    }
137
138    #[instrument(skip(self, request))]
139    async fn validator_status(
140        &self,
141        request: tonic::Request<ValidatorStatusRequest>,
142    ) -> Result<tonic::Response<ValidatorStatusResponse>, Status> {
143        let state = self.storage.latest_snapshot();
144
145        let id = request
146            .into_inner()
147            .identity_key
148            .ok_or_else(|| Status::invalid_argument("missing identity key"))?
149            .try_into()
150            .map_err(|_| Status::invalid_argument("invalid identity key"))?;
151
152        let status = state
153            .get_validator_status(&id)
154            .await
155            .map_err(|e| Status::unavailable(format!("error getting validator status: {e}")))?
156            .ok_or_else(|| Status::not_found("validator not found"))?;
157
158        Ok(tonic::Response::new(ValidatorStatusResponse {
159            status: Some(status.into()),
160        }))
161    }
162
163    #[instrument(skip(self, request))]
164    async fn validator_penalty(
165        &self,
166        request: tonic::Request<ValidatorPenaltyRequest>,
167    ) -> Result<tonic::Response<ValidatorPenaltyResponse>, Status> {
168        let state = self.storage.latest_snapshot();
169        let request = request.into_inner();
170        let id = request
171            .identity_key
172            .ok_or_else(|| Status::invalid_argument("missing identity key"))?
173            .try_into()
174            .map_err(|_| Status::invalid_argument("invalid identity key"))?;
175
176        let penalty = state
177            .compounded_penalty_over_range(&id, request.start_epoch_index, request.end_epoch_index)
178            .await
179            .map_err(|e| Status::unavailable(format!("error getting validator penalty: {e}")))?;
180
181        Ok(tonic::Response::new(ValidatorPenaltyResponse {
182            penalty: Some(penalty.into()),
183        }))
184    }
185
186    #[instrument(skip(self, request))]
187    async fn current_validator_rate(
188        &self,
189        request: tonic::Request<CurrentValidatorRateRequest>,
190    ) -> Result<tonic::Response<CurrentValidatorRateResponse>, Status> {
191        let state = self.storage.latest_snapshot();
192        let identity_key = request
193            .into_inner()
194            .identity_key
195            .ok_or_else(|| tonic::Status::invalid_argument("empty message"))?
196            .try_into()
197            .map_err(|_| tonic::Status::invalid_argument("invalid identity key"))?;
198
199        let rate_data = state
200            .get_validator_rate(&identity_key)
201            .await
202            .map_err(|e| tonic::Status::internal(e.to_string()))?;
203
204        match rate_data {
205            Some(r) => Ok(tonic::Response::new(CurrentValidatorRateResponse {
206                data: Some(r.into()),
207            })),
208            None => Err(Status::not_found("current validator rate not found")),
209        }
210    }
211
212    #[instrument(skip(self, request))]
213    async fn validator_uptime(
214        &self,
215        request: tonic::Request<ValidatorUptimeRequest>,
216    ) -> Result<tonic::Response<ValidatorUptimeResponse>, Status> {
217        let state = self.storage.latest_snapshot();
218        let identity_key = request
219            .into_inner()
220            .identity_key
221            .ok_or_else(|| tonic::Status::invalid_argument("empty message"))?
222            .try_into()
223            .map_err(|_| tonic::Status::invalid_argument("invalid identity key"))?;
224
225        let uptime_data = state
226            .get_validator_uptime(&identity_key)
227            .await
228            .map_err(|e| tonic::Status::internal(e.to_string()))?;
229
230        match uptime_data {
231            Some(u) => Ok(tonic::Response::new(ValidatorUptimeResponse {
232                uptime: Some(u.into()),
233            })),
234            None => Err(Status::not_found("validator uptime not found")),
235        }
236    }
237}