// penumbra_sdk_stake/component/epoch_handler.rs

1use crate::{
2    component::{
3        stake::{
4            ConsensusIndexRead, ConsensusIndexWrite, ConsensusUpdateWrite, InternalStakingData,
5            RateDataWrite,
6        },
7        validator_handler::{
8            ValidatorDataRead, ValidatorDataWrite, ValidatorManager, ValidatorPoolTracker,
9        },
10        SlashingData,
11    },
12    rate::BaseRateData,
13    state_key, validator, CurrentConsensusKeys, FundingStreams, IdentityKey, Penalty, StateReadExt,
14    StateWriteExt, BPS_SQUARED_SCALING_FACTOR,
15};
16use anyhow::{Context, Result};
17use async_trait::async_trait;
18use cnidarium::StateWrite;
19use futures::{StreamExt, TryStreamExt};
20use penumbra_sdk_distributions::component::StateReadExt as _;
21use penumbra_sdk_num::{fixpoint::U128x128, Amount};
22use penumbra_sdk_proto::{StateReadProto, StateWriteProto};
23use penumbra_sdk_sct::{component::clock::EpochRead, epoch::Epoch};
24use std::collections::{BTreeMap, BTreeSet};
25use tendermint::{validator::Update, PublicKey};
26use tokio::task::JoinSet;
27use tracing::instrument;
28
#[async_trait]
pub trait EpochHandler: StateWriteExt + ConsensusIndexRead {
    #[instrument(skip(self, epoch_to_end), fields(index = epoch_to_end.index))]
    /// Process the end of an epoch for the staking component.
    ///
    /// In order, this:
    /// 1. Collects all delegation/undelegation changes recorded over the epoch's heights.
    /// 2. Computes and sets the chain base rate for the upcoming epoch.
    /// 3. Runs per-validator end-epoch processing for every validator that either had
    ///    delegation changes or is present in the consensus set.
    /// 4. Queues the collected funding streams for the funding component.
    /// 5. Recomputes the active/inactive partition of the consensus set.
    async fn end_epoch(&mut self, epoch_to_end: Epoch) -> Result<()> {
        // Collect all the delegation changes that occurred in the epoch we are ending.
        let mut delegations_by_validator = BTreeMap::<IdentityKey, Amount>::new();
        let mut undelegations_by_validator = BTreeMap::<IdentityKey, Amount>::new();

        let end_height = self.get_block_height().await?;
        // Counters used only for the summary log line below.
        let mut num_delegations = 0usize;
        let mut num_undelegations = 0usize;

        // Performance: see #3874.
        for height in epoch_to_end.start_height..=end_height {
            let changes = self
                .get_delegation_changes(
                    height
                        .try_into()
                        .context("should be able to convert u64 into block height")?,
                )
                .await?;

            num_delegations = num_delegations.saturating_add(changes.delegations.len());
            num_undelegations = num_undelegations.saturating_add(changes.undelegations.len());

            // Tally per-validator delegation amounts, saturating on overflow rather
            // than wrapping or erroring.
            for d in changes.delegations {
                let validator_identity = d.validator_identity.clone();
                let delegation_tally = delegations_by_validator
                    .entry(validator_identity)
                    .or_default()
                    .saturating_add(&d.delegation_amount);
                delegations_by_validator.insert(validator_identity, delegation_tally);
            }
            // Same tallying for undelegations; note both tallies are keyed by the
            // validator identity and expressed in delegation-token amounts.
            for u in changes.undelegations {
                let validator_identity = u.validator_identity.clone();
                let undelegation_tally = undelegations_by_validator
                    .entry(validator_identity)
                    .or_default()
                    .saturating_add(&u.delegation_amount);

                undelegations_by_validator.insert(validator_identity, undelegation_tally);
            }
        }

        tracing::debug!(
            num_delegations,
            num_undelegations,
            epoch_start = epoch_to_end.start_height,
            epoch_end = end_height,
            epoch_index = epoch_to_end.index,
            "collected delegation changes for the epoch"
        );

        // Compute and set the chain base rate for the upcoming epoch.
        let next_base_rate = self.process_chain_base_rate().await?;

        // TODO(erwan): replace this with a tagged stream once we have tests. See #3874.
        let delegation_set = delegations_by_validator
            .keys()
            .cloned()
            .collect::<BTreeSet<_>>();
        let undelegation_set = undelegations_by_validator
            .keys()
            .cloned()
            .collect::<BTreeSet<_>>();
        let validators_with_delegation_changes = delegation_set
            .union(&undelegation_set)
            .cloned()
            .collect::<BTreeSet<_>>();

        // We're only tracking the consensus set, and each validator identity is about 64 bytes,
        // it seems reasonable to collect the entire consensus set into memory since we expect
        // less than 100k entries. We do this to keep the code simple.
        let consensus_set = self
            .consensus_set_stream()?
            .try_collect::<BTreeSet<IdentityKey>>()
            .await?;

        // Process the union: a validator outside the consensus set can still have
        // delegation changes, and a consensus-set validator with no changes still
        // needs its rates rolled forward.
        let validators_to_process = validators_with_delegation_changes
            .union(&consensus_set)
            .collect::<BTreeSet<_>>();

        let mut funding_queue: Vec<(IdentityKey, FundingStreams, Amount)> = Vec::new();

        for validator_identity in validators_to_process {
            // `remove` (not `get`) so that the emptiness asserts below can verify
            // every tallied validator was processed exactly once.
            let total_delegations = delegations_by_validator
                .remove(validator_identity)
                .unwrap_or_else(Amount::zero);

            let total_undelegations = undelegations_by_validator
                .remove(validator_identity)
                .unwrap_or_else(Amount::zero);

            if let Some(rewards) = self
                .process_validator(
                    validator_identity,
                    epoch_to_end,
                    next_base_rate.clone(),
                    total_delegations,
                    total_undelegations,
                )
                .await
                .map_err(|e| {
                    tracing::error!(
                        ?e,
                        ?validator_identity,
                        "failed to process validator's end-epoch"
                    );
                    e
                })?
            {
                funding_queue.push(rewards)
            }
        }

        // This is a sanity check to ensure that we have processed all the delegation changes.
        // It should be impossible for this to fail, but we check it anyway. We can remove
        // these guards when we start rolling out our testing framework and increase coverage.
        // This should coincide with a profiling/performance effort on the epoch-handler.
        assert!(delegations_by_validator.is_empty());
        assert!(undelegations_by_validator.is_empty());

        // We have collected the funding streams for all validators, so we can now
        // record them for the funding component to process.
        self.queue_staking_rewards(funding_queue);

        // Now that the consensus set voting power has been calculated, we can select the
        // top N validators to be active for the next epoch.
        self.set_active_and_inactive_validators().await?;
        Ok(())
    }

    /// Run end-epoch processing for a single validator.
    ///
    /// Applies any penalty recorded in the ending epoch to the validator's rate,
    /// computes the next-epoch rate and voting power, adjusts the validator's
    /// delegation pool by the net delegation delta, and prunes the validator from
    /// the consensus-set index if it no longer belongs there.
    ///
    /// Returns `Ok(Some((identity, funding_streams, pool_size)))` for validators
    /// that were Active in the ending epoch (so the funding component can pay
    /// rewards), and `Ok(None)` otherwise.
    async fn process_validator(
        &mut self,
        validator_identity: &IdentityKey,
        epoch_to_end: Epoch,
        next_base_rate: BaseRateData,
        total_delegations: Amount,
        total_undelegations: Amount,
    ) -> Result<Option<(IdentityKey, FundingStreams, Amount)>> {
        let validator = self.get_validator_definition(&validator_identity).await?.ok_or_else(|| {
            anyhow::anyhow!("validator (identity={}) is in consensus index but its definition was not found in the JMT", &validator_identity)
        })?;

        // Grab the current validator state.
        let validator_state = self
            .get_validator_state(&validator.identity_key)
            .await?
            .ok_or_else(|| {
                anyhow::anyhow!("validator (identity={}) is in consensus index but its state was not found in the JMT", &validator.identity_key)
            })?;

        // We are transitioning to the next epoch, so the "current" validator
        // rate in the state is now the previous validator rate.
        let prev_validator_rate = self
            .get_validator_rate(&validator.identity_key)
            .await?
            .ok_or_else(|| {
                anyhow::anyhow!("validator (identity={}) is in consensus index but its rate data was not found in the JMT", &validator.identity_key)
            })?;

        // First, apply any penalty recorded in the epoch we are ending.
        // A missing penalty entry defaults to a 0% penalty (no slashing).
        // NOTE(review): `unwrap_or` also maps a failed lookup to 0% — presumably
        // intentional (absence and failure are treated alike); confirm.
        let penalty = self
            .get_penalty_in_epoch(&validator.identity_key, epoch_to_end.index)
            .await
            .unwrap_or(Penalty::from_percent(0));
        let prev_validator_rate_with_penalty = prev_validator_rate.slash(penalty);

        // Persist the penalty-adjusted rate as the "previous" rate so that other
        // readers see the slashed value for the epoch that just ended.
        self.set_prev_validator_rate(
            &validator.identity_key,
            prev_validator_rate_with_penalty.clone(),
        );

        // Then compute the next validator rate, accounting for funding streams and validator state.
        let next_validator_rate = prev_validator_rate_with_penalty.next_epoch(
            &next_base_rate,
            validator.funding_streams.as_ref(),
            &validator_state,
        );

        // In theory, the maximum amount of delegation tokens is the total supply of staking tokens.
        // In practice, this is unlikely to happen, but even if it did, we anticipate that the total
        // supply of staking token is << 10^32 (2^107) tokens with a unit denomination of 10^6 (2^20),
        // so there should be ample room to cast this to an i128.
        let delegation_delta =
            (total_delegations.value() as i128) - (total_undelegations.value() as i128);

        tracing::debug!(
            validator = ?validator.identity_key,
            ?total_delegations,
            ?total_undelegations,
            delegation_delta,
            "net delegation change for validator's pool for the epoch"
        );

        let abs_delegation_change = Amount::from(delegation_delta.unsigned_abs());

        // We need to either contract or expand the validator pool size,
        // and panic if we encounter an under/overflow, because it can only
        // happen if something has gone seriously wrong with the validator rate data.
        if delegation_delta > 0 {
            self.increase_validator_pool_size(validator_identity, abs_delegation_change)
                .await
                .expect("overflow should be impossible");
        } else if delegation_delta < 0 {
            self.decrease_validator_pool_size(validator_identity, abs_delegation_change)
                .await
                .expect("underflow should be impossible");
        } else {
            tracing::debug!(
                validator = ?validator.identity_key,
                "no change in delegation, no change in token supply")
        }

        // Get the updated delegation token supply for use calculating voting power.
        let delegation_token_supply = self
            .get_validator_pool_size(validator_identity)
            .await
            .unwrap_or(Amount::zero());

        // Calculate the voting power in the newly beginning epoch
        let voting_power = next_validator_rate.voting_power(delegation_token_supply);

        tracing::debug!(
            validator = ?validator.identity_key,
            validator_delegation_pool = ?delegation_token_supply,
            validator_power = ?voting_power,
            "calculated validator's voting power for the upcoming epoch"
        );

        // Update the state of the validator within the validator set
        // with the newly starting epoch's calculated voting rate and power.
        self.set_validator_rate_data(&validator.identity_key, next_validator_rate.clone());
        self.set_validator_power(&validator.identity_key, voting_power)?;

        // The epoch is ending, so we check if this validator was active and if so
        // we queue its [`FundingStreams`] for processing by the funding component.
        let reward_queue_entry = if validator_state == validator::State::Active {
            // Here we collect funding data to create a record that the funding component
            // can "pull". We do this because by the time the funding component is executed
            // the validator set has possibly changed (e.g. a new validator enter the active
            // set).
            Some((
                validator.identity_key.clone(),
                validator.funding_streams.clone(),
                delegation_token_supply,
            ))
        } else {
            None
        };

        // Attempt any state transition implied by the new rate/pool-size data
        // (e.g. crossing a stake threshold); the resulting state is logged below.
        let final_state = self
            .try_precursor_transition(
                validator_identity,
                validator_state,
                &next_validator_rate,
                delegation_token_supply,
            )
            .await;

        tracing::debug!(validator_identity = %validator.identity_key,
            previous_epoch_validator_rate= ?prev_validator_rate,
            next_epoch_validator_rate = ?next_validator_rate,
            ?delegation_token_supply,
            voting_power = ?voting_power,
            final_state = ?final_state,
            "validator's end-epoch has been processed");

        self.process_validator_pool_state(&validator.identity_key, epoch_to_end.start_height)
            .await.map_err(|e| {
                tracing::error!(?e, validator_identity = %validator.identity_key, "failed to process validator pool state");
                e
            })?;

        // Finally, we decide whether to keep this validator in the consensus set.
        // Doing this here means that we no longer have to worry about validators
        // escaping end-epoch processing.
        //
        // Performance: NV-storage layer churn because the validator might not actually
        // be in the CS index. We should replace the union set approach with a merged
        // stream that tags items with their source. See #3874.
        if !self.belongs_in_index(&validator.identity_key).await {
            self.remove_consensus_set_index(&validator.identity_key);
        }

        Ok(reward_queue_entry)
    }

    /// Compute and return the chain base rate ("L1BOR").
    ///
    /// The base reward rate for the upcoming epoch is the ratio of the epoch's
    /// issuance budget (set by the distributions component) to the total active
    /// stake, scaled into bps^2 fixed-point. Sets the new "current" base rate and
    /// caches the previous one for other components' end-epoch processing.
    async fn process_chain_base_rate(&mut self) -> Result<BaseRateData> {
        // We are transitioning to the next epoch, so the "current" base rate in
        // the state is now the previous base rate.
        let prev_base_rate = self.get_current_base_rate().await?;

        tracing::debug!(
            "fetching the issuance budget for this epoch from the distributions component"
        );
        // Fetch the issuance budget for the epoch we are ending.
        let issuance_budget_for_epoch = self
            .get_staking_token_issuance_for_epoch()
            .expect("issuance budget is always set by the distributions component");

        // Compute the base reward rate for the upcoming epoch based on the total amount
        // of active stake and the issuance budget given to us by the distribution component.
        let total_active_stake_previous_epoch = self.total_active_stake().await?;
        tracing::debug!(
            ?total_active_stake_previous_epoch,
            ?issuance_budget_for_epoch,
            "computing base rate for the upcoming epoch"
        );

        // NOTE(review): `U128x128::ratio` presumably fails on a zero denominator;
        // the expect encodes the invariant that active stake is nonzero at epoch end.
        let base_reward_rate =
            U128x128::ratio(issuance_budget_for_epoch, total_active_stake_previous_epoch)
                .expect("total active stake is nonzero");
        let base_reward_rate: Amount = (base_reward_rate * *BPS_SQUARED_SCALING_FACTOR)
            .expect("base reward rate is around one")
            .round_down()
            .try_into()
            .expect("rounded to an integral value");
        tracing::debug!(%base_reward_rate, "base reward rate for the upcoming epoch");

        let next_base_rate = prev_base_rate.next_epoch(base_reward_rate);
        tracing::debug!(
            ?prev_base_rate,
            ?next_base_rate,
            ?base_reward_rate,
            ?total_active_stake_previous_epoch,
            ?issuance_budget_for_epoch,
            "calculated base rate for the upcoming epoch"
        );

        // Set the next base rate as the new "current" base rate.
        self.set_base_rate(next_base_rate.clone());
        // We cache the previous base rate in the state, so that other components
        // can use it in their end-epoch procesisng (e.g. funding for staking rewards).
        self.set_prev_base_rate(prev_base_rate);
        Ok(next_base_rate)
    }

    /// Called during `end_epoch`. Will perform state transitions to validators based
    /// on changes to voting power that occurred in this epoch.
    ///
    /// Validators in states other than Active/Inactive (e.g. jailed or disabled)
    /// are left untouched; zero-power validators are always demoted to Inactive.
    async fn set_active_and_inactive_validators(&mut self) -> Result<()> {
        // A list of all active and inactive validators, with nonzero voting power.
        let mut validators_by_power = Vec::new();
        // A list of validators with zero power, who must be inactive.
        let mut zero_power = Vec::new();

        let mut validator_identity_stream = self.consensus_set_stream()?;
        while let Some(identity_key) = validator_identity_stream.next().await {
            let identity_key = identity_key?;
            let state = self
                .get_validator_state(&identity_key)
                .await?
                .context("should be able to fetch validator state")?;
            let power = self
                .get_validator_power(&identity_key)
                .await?
                .unwrap_or_default();
            if matches!(state, validator::State::Active | validator::State::Inactive) {
                if power == Amount::zero() {
                    zero_power.push((identity_key, power));
                } else {
                    validators_by_power.push((identity_key, power));
                }
            }
        }

        // Sort by voting power descending.
        validators_by_power.sort_by(|a, b| b.1.cmp(&a.1));

        // The top `limit` validators with nonzero power become active.
        // All other validators become inactive.
        let limit = self.get_stake_params().await?.active_validator_limit as usize;
        let active = validators_by_power.iter().take(limit);
        let inactive = validators_by_power
            .iter()
            .skip(limit)
            .chain(zero_power.iter());

        for (v, _) in active {
            self.set_validator_state(v, validator::State::Active)
                .await?;
        }
        for (v, _) in inactive {
            self.set_validator_state(v, validator::State::Inactive)
                .await?;
        }

        Ok(())
    }

    /// Materializes the entire current validator set as a CometBFT update.
    ///
    /// This re-defines all validators every time, to simplify the code compared to
    /// trying to track delta updates.
    #[instrument(skip(self))]
    async fn build_cometbft_validator_updates(&mut self) -> Result<()> {
        let current_consensus_keys: CurrentConsensusKeys = self
            .get(state_key::consensus_update::consensus_keys())
            .await?
            .expect("current consensus keys must be present");
        let current_consensus_keys = current_consensus_keys
            .consensus_keys
            .into_iter()
            .collect::<BTreeSet<_>>();

        let mut voting_power_by_consensus_key = BTreeMap::<PublicKey, Amount>::new();

        // First, build a mapping of consensus key to voting power for all known validators.

        // Using a JoinSet, run each validator's state queries concurrently.
        // The per-validator futures are created while streaming the index (so the
        // state reads are issued up front) and awaited inside the spawned tasks.
        let mut js: JoinSet<std::prelude::v1::Result<(PublicKey, Amount), anyhow::Error>> =
            JoinSet::new();
        let mut validator_identity_stream = self.consensus_set_stream()?;
        while let Some(identity_key) = validator_identity_stream.next().await {
            let identity_key = identity_key?;
            let state = self.get_validator_state(&identity_key);
            let power = self.get_validator_power(&identity_key);
            let consensus_key = self.fetch_validator_consensus_key(&identity_key);
            js.spawn(async move {
                let state = state
                    .await?
                    .expect("every known validator must have a recorded state");
                // Compute the effective power of this validator; this is the
                // validator power, clamped to zero for all non-Active validators.
                let effective_power = if matches!(state, validator::State::Active) {
                    power
                        .await?
                        .expect("every active validator must have a recorded power")
                } else {
                    Amount::zero()
                };

                let consensus_key = consensus_key
                    .await?
                    .expect("every known validator must have a recorded consensus key");

                anyhow::Ok((consensus_key, effective_power))
            });
        }
        // Now collect the computed results into the lookup table.
        // The outer `?` surfaces task join errors (panics/cancellation); the inner
        // `?` surfaces the task's own Result.
        while let Some(pair) = js.join_next().await.transpose()? {
            let (consensus_key, effective_power) = pair?;
            voting_power_by_consensus_key.insert(consensus_key, effective_power);
        }

        // Next, filter that mapping to exclude any zero-power validators, UNLESS they
        // were already known to CometBFT.
        voting_power_by_consensus_key.retain(|consensus_key, voting_power| {
            *voting_power > Amount::zero() || current_consensus_keys.contains(consensus_key)
        });

        // Finally, tell tendermint to delete any known consensus keys not otherwise updated
        // (a zero-power update is how CometBFT is told to drop a validator).
        for ck in current_consensus_keys.iter() {
            voting_power_by_consensus_key
                .entry(*ck)
                .or_insert(Amount::zero());
        }

        // Save the validator updates to send to Tendermint.
        let tendermint_validator_updates = voting_power_by_consensus_key
            .iter()
            .map(|(consensus_key, power)| {
                Ok(Update {
                    pub_key: *consensus_key,
                    // Validator voting power is measured in units of staking tokens,
                    // at time, CometBFT has an upper limit of around 2^60 - 1.
                    // This means that there is an upper bound on the maximum possible issuance
                    // at around 10^12 units of staking tokens.
                    // NOTE(review): the `as u64` cast silently truncates a u128 value
                    // >= 2^64 before the checked `try_into`; this relies on the bound
                    // stated above holding — confirm.
                    power: ((*power).value() as u64).try_into()?,
                })
            })
            .collect::<Result<Vec<_>>>()?;
        self.put_cometbft_validator_updates(tendermint_validator_updates);

        // Record the new consensus keys we will have told tendermint about.
        // Zero-power entries were only deletion notices, so they are excluded here.
        let updated_consensus_keys = CurrentConsensusKeys {
            consensus_keys: voting_power_by_consensus_key
                .iter()
                .filter_map(|(consensus_key, power)| {
                    if *power != Amount::zero() {
                        Some(*consensus_key)
                    } else {
                        None
                    }
                })
                .collect(),
        };
        tracing::debug!(?updated_consensus_keys);
        self.put(
            state_key::consensus_update::consensus_keys().to_owned(),
            updated_consensus_keys,
        );

        Ok(())
    }
}
527
// Blanket impl: any state writer with read access to the consensus-set index
// automatically implements the epoch handler's default methods.
impl<T: StateWrite + ConsensusIndexRead + ?Sized> EpochHandler for T {}