penumbra_sdk_stake/component/
stake.rs

1pub mod address;
2
3use crate::event::EventSlashingPenaltyApplied;
4use crate::params::StakeParameters;
5use crate::rate::BaseRateData;
6use crate::validator::{self, Validator};
7use crate::{
8    state_key, CurrentConsensusKeys, Delegate, DelegationChanges, FundingStreams, IdentityKey,
9    Penalty, Undelegate,
10};
11use anyhow::Context;
12use anyhow::{anyhow, Result};
13use async_trait::async_trait;
14use cnidarium::{StateRead, StateWrite};
15use cnidarium_component::Component;
16use futures::{StreamExt, TryStreamExt};
17use penumbra_sdk_num::Amount;
18use penumbra_sdk_proto::{DomainType, StateReadProto, StateWriteProto};
19use penumbra_sdk_sct::component::clock::EpochRead;
20use std::pin::Pin;
21use std::str::FromStr;
22use std::{collections::BTreeMap, sync::Arc};
23use tap::{Tap, TapFallible, TapOptional};
24use tendermint::v0_37::abci;
25use tendermint::validator::Update;
26use tendermint::{block, PublicKey};
27use tracing::{error, instrument, trace};
28
29use crate::component::epoch_handler::EpochHandler;
30use crate::component::validator_handler::{
31    ValidatorDataRead, ValidatorManager, ValidatorUptimeTracker,
32};
33
34#[cfg(test)]
35mod tests;
36
/// The staking component.
///
/// The type carries no data; its behavior is supplied by its [`Component`]
/// implementation in this file.
pub struct Staking {}

// Intentionally empty: no inherent methods are defined on `Staking` here.
impl Staking {}
40
41#[async_trait]
42impl Component for Staking {
43    type AppState = (
44        crate::genesis::Content,
45        penumbra_sdk_shielded_pool::genesis::Content,
46    );
47
48    #[instrument(name = "staking", skip(state, app_state))]
49    async fn init_chain<S: StateWrite>(mut state: S, app_state: Option<&Self::AppState>) {
50        match app_state {
51            None => { /* perform upgrade specific check */ }
52            Some((staking_genesis, sp_genesis)) => {
53                state.put_stake_params(staking_genesis.stake_params.clone());
54
55                let starting_height = state
56                    .get_block_height()
57                    .await
58                    .expect("should be able to get initial block height")
59                    .tap(|height| trace!(%height,"found initial block height"));
60                let starting_epoch = state
61                    .get_epoch_by_height(starting_height)
62                    .await
63                    .expect("should be able to get initial epoch")
64                    .tap(|epoch| trace!(?epoch, "found initial epoch"));
65                let epoch_index = starting_epoch.index;
66
67                let genesis_base_rate = BaseRateData {
68                    epoch_index,
69                    base_reward_rate: 0u128.into(),
70                    base_exchange_rate: 1_0000_0000u128.into(),
71                };
72                state.set_base_rate(genesis_base_rate.clone());
73                trace!(?genesis_base_rate, "set base rate");
74
75                let mut genesis_allocations = BTreeMap::<_, Amount>::new();
76                for allocation in &sp_genesis.allocations {
77                    let value = allocation.value();
78                    *genesis_allocations.entry(value.asset_id).or_default() += value.amount;
79                }
80
81                trace!("parsing genesis validators");
82                for (i, validator) in staking_genesis.validators.iter().enumerate() {
83                    // Parse the proto into a domain type.
84                    let validator = Validator::try_from(validator.clone())
85                        .expect("should be able to parse genesis validator")
86                        .tap(|Validator { name, enabled, .. }|
87                             trace!(%i, %name, %enabled, "parsed genesis validator")
88                        );
89
90                    state
91                        .add_genesis_validator(&genesis_allocations, &genesis_base_rate, validator)
92                        .await
93                        .expect("should be able to add genesis validator to state");
94                }
95
96                // First, "prime" the state with an empty set, so the build_ function can read it.
97                state.put(
98                    state_key::consensus_update::consensus_keys().to_owned(),
99                    CurrentConsensusKeys::default(),
100                );
101
102                // Finally, record that there were no delegations in this block, so the data
103                // isn't missing when we process the first epoch transition.
104                state
105                    .set_delegation_changes(
106                        starting_height
107                            .try_into()
108                            .expect("should be able to convert u64 into block height"),
109                        Default::default(),
110                    )
111                    .await;
112            }
113        }
114        // Build the initial validator set update.
115        state
116            .build_cometbft_validator_updates()
117            .await
118            .expect("should be able to build initial tendermint validator updates");
119    }
120
121    #[instrument(name = "staking", skip(state, begin_block))]
122    async fn begin_block<S: StateWrite + 'static>(
123        state: &mut Arc<S>,
124        begin_block: &abci::request::BeginBlock,
125    ) {
126        let state = Arc::get_mut(state).expect("state should be unique");
127        // For each validator identified as byzantine by tendermint, update its
128        // state to be slashed. If the validator is not tracked in the JMT, this
129        // will be a no-op. See #2919 for more details.
130        for evidence in begin_block.byzantine_validators.iter() {
131            let _ = state.process_evidence(evidence).await.map_err(|e| {
132                tracing::warn!(?e, "failed to process byzantine misbehavior evidence")
133            });
134        }
135
136        state
137            .track_uptime(&begin_block.last_commit_info)
138            .await
139            .expect("should be able to track uptime");
140    }
141
142    /// Writes the delegation changes for this block.
143    #[instrument(name = "staking", skip(state, end_block))]
144    async fn end_block<S: StateWrite + 'static>(
145        state: &mut Arc<S>,
146        end_block: &abci::request::EndBlock,
147    ) {
148        let state = Arc::get_mut(state).expect("state should be unique");
149        let height = end_block
150            .height
151            .try_into()
152            .expect("should be able to convert i64 into block height");
153        let changes = state.get_delegation_changes_tally();
154
155        state.set_delegation_changes(height, changes).await;
156    }
157
158    /// Writes validator updates for this block.
159    #[instrument(name = "staking", skip(state))]
160    async fn end_epoch<S: StateWrite + 'static>(state: &mut Arc<S>) -> anyhow::Result<()> {
161        let state = Arc::get_mut(state).context("state should be unique")?;
162        let epoch_ending = state
163            .get_current_epoch()
164            .await
165            .context("should be able to get current epoch during end_epoch")?;
166        state
167            .end_epoch(epoch_ending)
168            .await
169            .context("should be able to write end_epoch")?;
170        // Since we only update the validator set at epoch boundaries,
171        // we only need to build the validator set updates here in end_epoch.
172        state
173            .build_cometbft_validator_updates()
174            .await
175            .context("should be able to build tendermint validator updates")?;
176        Ok(())
177    }
178}
179
180pub trait ConsensusUpdateRead: StateRead {
181    /// Returns a list of validator updates to send to Tendermint.
182    ///
183    /// Set during `end_block`.
184    fn cometbft_validator_updates(&self) -> Option<Vec<Update>> {
185        self.object_get(state_key::internal::cometbft_validator_updates())
186            .unwrap_or(None)
187    }
188}
189
190impl<T: StateRead + ?Sized> ConsensusUpdateRead for T {}
191
192pub(crate) trait ConsensusUpdateWrite: StateWrite {
193    fn put_cometbft_validator_updates(&mut self, updates: Vec<Update>) {
194        tracing::debug!(?updates);
195        self.object_put(
196            state_key::internal::cometbft_validator_updates(),
197            Some(updates),
198        )
199    }
200}
201
202impl<T: StateWrite + ?Sized> ConsensusUpdateWrite for T {}
203
204/// Extension trait providing read access to staking data.
205#[async_trait]
206pub trait StateReadExt: StateRead {
207    /// Gets the stake parameters from the JMT.
208    #[instrument(skip(self), level = "trace")]
209    async fn get_stake_params(&self) -> Result<StakeParameters> {
210        self.get(state_key::parameters::key())
211            .await
212            .tap_err(|err| error!(?err, "could not deserialize stake parameters"))
213            .expect("no deserialization error should happen")
214            .tap_none(|| error!("could not find stake parameters"))
215            .ok_or_else(|| anyhow!("Missing StakeParameters"))
216    }
217
218    #[instrument(skip(self), level = "trace")]
219    async fn signed_blocks_window_len(&self) -> Result<u64> {
220        self.get_stake_params()
221            .await
222            .map(|p| p.signed_blocks_window_len)
223    }
224
225    #[instrument(skip(self), level = "trace")]
226    async fn missed_blocks_maximum(&self) -> Result<u64> {
227        self.get_stake_params()
228            .await
229            .map(|p| p.missed_blocks_maximum)
230    }
231
232    /// Delegation changes accumulated over the course of this block, to be
233    /// persisted at the end of the block for processing at the end of the next
234    /// epoch.
235    #[instrument(skip(self), level = "trace")]
236    fn get_delegation_changes_tally(&self) -> DelegationChanges {
237        self.object_get(state_key::chain::delegation_changes::key())
238            .unwrap_or_default()
239    }
240
241    #[instrument(skip(self), level = "trace")]
242    async fn get_current_base_rate(&self) -> Result<BaseRateData> {
243        self.get(state_key::chain::base_rate::current())
244            .await
245            .map(|rate_data| rate_data.expect("rate data must be set after init_chain"))
246    }
247
248    #[instrument(skip(self), level = "trace")]
249    fn get_previous_base_rate(&self) -> Option<BaseRateData> {
250        self.object_get(state_key::chain::base_rate::previous())
251    }
252
253    /// Returns the funding queue from object storage (end-epoch).
254    #[instrument(skip(self), level = "trace")]
255    fn get_funding_queue(&self) -> Option<Vec<(IdentityKey, FundingStreams, Amount)>> {
256        self.object_get(state_key::validators::rewards::staking())
257    }
258
259    /// Returns the [`DelegationChanges`] at the given [`Height`][block::Height].
260    #[instrument(skip(self), level = "trace")]
261    async fn get_delegation_changes(&self, height: block::Height) -> Result<DelegationChanges> {
262        self.get(&state_key::chain::delegation_changes::by_height(
263            height.value(),
264        ))
265        .await
266        .tap_err(|err| error!(?err, "delegation changes for block exist but are invalid"))?
267        .tap_none(|| error!("could not find delegation changes for block"))
268        .ok_or_else(|| anyhow!("missing delegation changes for block {}", height))
269    }
270}
271
272impl<T: StateRead + ?Sized> StateReadExt for T {}
273
274/// Extension trait providing write access to staking data.
275#[async_trait]
276pub trait StateWriteExt: StateWrite {
277    /// Writes the provided stake parameters to the JMT.
278    fn put_stake_params(&mut self, params: StakeParameters) {
279        // Change the stake parameters:
280        self.put(state_key::parameters::key().into(), params)
281    }
282
283    /// Delegation changes accumulated over the course of this block, to be
284    /// persisted at the end of the block for processing at the end of the next
285    /// epoch.
286    fn put_delegation_changes(&mut self, delegation_changes: DelegationChanges) {
287        self.object_put(
288            state_key::chain::delegation_changes::key(),
289            delegation_changes,
290        )
291    }
292
293    /// Push an entry in the delegation queue for the current block (object-storage).
294    fn push_delegation(&mut self, delegation: Delegate) {
295        let mut changes = self.get_delegation_changes_tally();
296        changes.delegations.push(delegation);
297        self.put_delegation_changes(changes);
298    }
299
300    /// Push an entry in the undelegation queue for the current block (object-storage).
301    fn push_undelegation(&mut self, undelegation: Undelegate) {
302        let mut changes = self.get_delegation_changes_tally();
303        changes.undelegations.push(undelegation);
304        self.put_delegation_changes(changes);
305    }
306
307    #[instrument(skip(self))]
308    fn queue_staking_rewards(
309        &mut self,
310        staking_reward_queue: Vec<(IdentityKey, FundingStreams, Amount)>,
311    ) {
312        self.object_put(
313            state_key::validators::rewards::staking(),
314            staking_reward_queue,
315        )
316    }
317
318    /// Register a [consensus key][`PublicKey`] in the state, via two verifiable indices:
319    /// 1. CometBFT address -> [`PublicKey`]
320    /// 2. [`PublicKey`] -> [`IdentityKey`]
321    ///
322    /// # Important note
323    /// We do not delete obsolete entries on purpose. This is so that
324    /// the staking component can do evidence attribution even if a byzantine validator
325    /// has changed the consensus key that was used at the time of the misbehavior.
326    #[instrument(skip_all)]
327    fn register_consensus_key(&mut self, identity_key: &IdentityKey, consensus_key: &PublicKey) {
328        let address = self::address::validator_address(consensus_key);
329        tracing::debug!(?identity_key, ?consensus_key, hash = ?hex::encode(address), "registering consensus key");
330        self.put(
331            state_key::validators::lookup_by::cometbft_address(&address),
332            consensus_key.clone(),
333        );
334        self.put(
335            state_key::validators::lookup_by::consensus_key(consensus_key),
336            identity_key.clone(),
337        );
338    }
339}
340
341impl<T: StateWrite + ?Sized> StateWriteExt for T {}
342
343#[async_trait]
344pub trait SlashingData: StateRead {
345    async fn get_penalty_in_epoch(&self, id: &IdentityKey, epoch_index: u64) -> Option<Penalty> {
346        self.get(&state_key::penalty::for_id_in_epoch(id, epoch_index))
347            .await
348            .expect("serialization error cannot happen")
349    }
350
351    async fn get_penalty_for_range(&self, id: &IdentityKey, start: u64, end: u64) -> Vec<Penalty> {
352        let prefix = state_key::penalty::prefix(id);
353        let all_penalties: BTreeMap<String, Penalty> = self
354            .prefix::<Penalty>(&prefix)
355            .try_collect()
356            .await
357            .unwrap_or_default();
358        let start_key = state_key::penalty::for_id_in_epoch(id, start);
359        let end_key = state_key::penalty::for_id_in_epoch(id, end);
360        all_penalties
361            .range(start_key..end_key)
362            .map(|(_k, v)| v.to_owned())
363            .collect()
364    }
365
366    fn compute_compounded_penalty(penalties: Vec<Penalty>) -> Penalty {
367        let compounded = Penalty::from_percent(0);
368        penalties
369            .into_iter()
370            .fold(compounded, |acc, penalty| acc.compound(penalty))
371    }
372
373    /// Returns the compounded penalty for the given validator over the half-open range of epochs [start, end).
374    async fn compounded_penalty_over_range(
375        &self,
376        id: &IdentityKey,
377        epoch_index_start: u64,
378        epoch_index_end: u64,
379    ) -> Result<Penalty> {
380        if epoch_index_start > epoch_index_end {
381            anyhow::bail!("invalid penalty window")
382        }
383        let range = self
384            .get_penalty_for_range(id, epoch_index_start, epoch_index_end)
385            .await;
386        let compounded_penalty = Self::compute_compounded_penalty(range);
387        Ok(compounded_penalty)
388    }
389}
390
391impl<T: StateRead + ?Sized> SlashingData for T {}
392
393#[async_trait]
394pub(crate) trait InternalStakingData: StateRead {
395    /// Calculate the amount of stake that is delegated to the currently active validator set,
396    /// denominated in the staking token.
397    #[instrument(skip(self))]
398    async fn total_active_stake(&self) -> Result<Amount> {
399        let mut total_active_stake = Amount::zero();
400
401        let mut validator_stream = self.consensus_set_stream()?;
402        while let Some(validator_identity) = validator_stream.next().await {
403            let validator_identity = validator_identity?;
404            let validator_state = self
405                .get_validator_state(&validator_identity)
406                .await?
407                .ok_or_else(|| {
408                    anyhow::anyhow!("validator (identity_key={}) is in the consensus set index but its state was not found", validator_identity)
409                })?;
410
411            if validator_state != validator::State::Active {
412                continue;
413            }
414
415            let delegation_token_supply = self
416                .get_validator_pool_size(&validator_identity)
417                .await
418                .ok_or_else(|| {
419                anyhow::anyhow!(
420                    "validator delegation pool not found for {}",
421                    validator_identity
422                )
423            })?;
424
425            let validator_rate = self
426                .get_validator_rate(&validator_identity)
427                .await?
428                .ok_or_else(|| {
429                    anyhow::anyhow!("validator (identity_key={}) is in the consensus set index but its rate data was not found", validator_identity)
430                })?;
431
432            // Add the validator's unbonded amount to the total active stake
433            total_active_stake = total_active_stake
434                .checked_add(&validator_rate.unbonded_amount(delegation_token_supply))
435                .ok_or_else(|| {
436                    anyhow::anyhow!("total active stake overflowed `Amount` (128 bits)")
437                })?;
438        }
439
440        Ok(total_active_stake)
441    }
442}
443
444impl<T: StateRead + ?Sized> InternalStakingData for T {}
445
446#[async_trait]
447pub(crate) trait RateDataWrite: StateWrite {
448    #[instrument(skip(self))]
449    fn set_base_rate(&mut self, rate_data: BaseRateData) {
450        tracing::debug!("setting base rate");
451        self.put(state_key::chain::base_rate::current().to_owned(), rate_data);
452    }
453
454    #[instrument(skip(self))]
455    fn set_prev_base_rate(&mut self, rate_data: BaseRateData) {
456        self.object_put(state_key::chain::base_rate::previous(), rate_data);
457    }
458
459    async fn record_slashing_penalty(
460        &mut self,
461        identity_key: &IdentityKey,
462        slashing_penalty: Penalty,
463    ) {
464        let current_epoch_index = self
465            .get_current_epoch()
466            .await
467            .expect("epoch has been set")
468            .index;
469
470        let current_penalty = self
471            .get_penalty_in_epoch(identity_key, current_epoch_index)
472            .await
473            .unwrap_or(Penalty::from_percent(0));
474
475        let new_penalty = current_penalty.compound(slashing_penalty);
476
477        // Emit an event indicating the validator had a slashing penalty applied.
478        self.record_proto(
479            EventSlashingPenaltyApplied {
480                identity_key: *identity_key,
481                epoch_index: current_epoch_index,
482                new_penalty,
483            }
484            .to_proto(),
485        );
486        self.put(
487            state_key::penalty::for_id_in_epoch(identity_key, current_epoch_index),
488            new_penalty,
489        );
490    }
491
492    #[tracing::instrument(
493        level = "trace",
494        skip_all,
495        fields(
496            %height,
497            delegations = ?changes.delegations,
498            undelegations = ?changes.undelegations,
499        )
500    )]
501    async fn set_delegation_changes(&mut self, height: block::Height, changes: DelegationChanges) {
502        let key = state_key::chain::delegation_changes::by_height(height.value());
503        tracing::trace!(%key, "setting delegation changes");
504        self.put(key, changes);
505    }
506}
507
508impl<T: StateWrite + ?Sized> RateDataWrite for T {}
509
510#[async_trait]
511pub trait ConsensusIndexRead: StateRead {
512    /// Returns a stream of [`IdentityKey`]s of validators that are currently in the consensus set.
513    /// This only excludes validators that do not meet the minimum validator stake requirement
514    /// (see [`StakeParameters::min_validator_stake`]).
515    fn consensus_set_stream(
516        &self,
517    ) -> Result<Pin<Box<dyn futures::Stream<Item = Result<IdentityKey>> + Send + 'static>>> {
518        Ok(self
519            .nonverifiable_prefix_raw(
520                state_key::validators::consensus_set_index::prefix().as_bytes(),
521            )
522            .map(|res| {
523                res.map(|(_, raw_identity_key)| {
524                    // TODO(erwan): is this an opportunity to extend the proto overlay?
525                    let str_identity_key = std::str::from_utf8(raw_identity_key.as_slice())
526                        .expect("state keys should only have valid identity keys");
527                    IdentityKey::from_str(str_identity_key)
528                        .expect("state keys should only have valid identity keys")
529                })
530            })
531            .boxed())
532    }
533
534    /// Returns the [`IdentityKey`]s of validators that are currently in the consensus set.
535    async fn get_consensus_set(&self) -> anyhow::Result<Vec<IdentityKey>> {
536        use futures::TryStreamExt;
537        self.consensus_set_stream()?.try_collect().await
538    }
539
540    /// Returns whether a validator should be indexed in the consensus set.
541    /// Here, "consensus set" refers to the set of active validators as well as
542    /// the "inactive" validators which could be promoted during a view change.
543    #[instrument(level = "error", skip(self))]
544    async fn belongs_in_index(&self, validator_id: &IdentityKey) -> bool {
545        let Some(state) = self
546            .get_validator_state(validator_id)
547            .await
548            .expect("no deserialization error")
549        else {
550            tracing::error!("validator state was not found");
551            return false;
552        };
553
554        match state {
555            validator::State::Active | validator::State::Inactive => {
556                tracing::debug!(?state, "validator belongs in the consensus set");
557                true
558            }
559            _ => {
560                tracing::debug!(?state, "validator does not belong in the consensus set");
561                false
562            }
563        }
564    }
565}
566
567impl<T: StateRead + ?Sized> ConsensusIndexRead for T {}
568
569#[async_trait]
570pub trait ConsensusIndexWrite: StateWrite {
571    /// Add a validator identity to the consensus set index.
572    /// The consensus set index includes any validator that has a delegation pool that
573    /// is greater than [`StakeParameters::min_validator_stake`].
574    fn add_consensus_set_index(&mut self, identity_key: &IdentityKey) {
575        tracing::debug!(validator = %identity_key, "adding validator identity to consensus set index");
576        self.nonverifiable_put_raw(
577            state_key::validators::consensus_set_index::by_id(identity_key)
578                .as_bytes()
579                .to_vec(),
580            identity_key.to_string().as_bytes().to_vec(),
581        );
582    }
583
584    /// Remove a validator identity from the consensus set index.
585    /// The consensus set index includes any validator that has a delegation pool that
586    /// is greater than [`StakeParameters::min_validator_stake`].
587    fn remove_consensus_set_index(&mut self, identity_key: &IdentityKey) {
588        tracing::debug!(validator = %identity_key, "removing validator identity from consensus set index");
589        self.nonverifiable_delete(
590            state_key::validators::consensus_set_index::by_id(identity_key)
591                .as_bytes()
592                .to_vec(),
593        );
594    }
595}
596
597impl<T: StateWrite + ?Sized> ConsensusIndexWrite for T {}