1use crate::{
2 component::{
3 stake::{
4 ConsensusIndexRead, ConsensusIndexWrite, ConsensusUpdateWrite, InternalStakingData,
5 RateDataWrite,
6 },
7 validator_handler::{
8 ValidatorDataRead, ValidatorDataWrite, ValidatorManager, ValidatorPoolTracker,
9 },
10 SlashingData,
11 },
12 rate::BaseRateData,
13 state_key, validator, CurrentConsensusKeys, FundingStreams, IdentityKey, Penalty, StateReadExt,
14 StateWriteExt, BPS_SQUARED_SCALING_FACTOR,
15};
16use anyhow::{Context, Result};
17use async_trait::async_trait;
18use cnidarium::StateWrite;
19use futures::{StreamExt, TryStreamExt};
20use penumbra_sdk_distributions::component::StateReadExt as _;
21use penumbra_sdk_num::{fixpoint::U128x128, Amount};
22use penumbra_sdk_proto::{StateReadProto, StateWriteProto};
23use penumbra_sdk_sct::{component::clock::EpochRead, epoch::Epoch};
24use std::collections::{BTreeMap, BTreeSet};
25use tendermint::{validator::Update, PublicKey};
26use tokio::task::JoinSet;
27use tracing::instrument;
2829#[async_trait]
30pub trait EpochHandler: StateWriteExt + ConsensusIndexRead {
31#[instrument(skip(self, epoch_to_end), fields(index = epoch_to_end.index))]
32/// Process the end of an epoch for the staking component.
33async fn end_epoch(&mut self, epoch_to_end: Epoch) -> Result<()> {
34// Collect all the delegation changes that occurred in the epoch we are ending.
35let mut delegations_by_validator = BTreeMap::<IdentityKey, Amount>::new();
36let mut undelegations_by_validator = BTreeMap::<IdentityKey, Amount>::new();
3738let end_height = self.get_block_height().await?;
39let mut num_delegations = 0usize;
40let mut num_undelegations = 0usize;
4142// Performance: see #3874.
43for height in epoch_to_end.start_height..=end_height {
44let changes = self
45.get_delegation_changes(
46 height
47 .try_into()
48 .context("should be able to convert u64 into block height")?,
49 )
50 .await?;
5152 num_delegations = num_delegations.saturating_add(changes.delegations.len());
53 num_undelegations = num_undelegations.saturating_add(changes.undelegations.len());
5455for d in changes.delegations {
56let validator_identity = d.validator_identity.clone();
57let delegation_tally = delegations_by_validator
58 .entry(validator_identity)
59 .or_default()
60 .saturating_add(&d.delegation_amount);
61 delegations_by_validator.insert(validator_identity, delegation_tally);
62 }
63for u in changes.undelegations {
64let validator_identity = u.validator_identity.clone();
65let undelegation_tally = undelegations_by_validator
66 .entry(validator_identity)
67 .or_default()
68 .saturating_add(&u.delegation_amount);
6970 undelegations_by_validator.insert(validator_identity, undelegation_tally);
71 }
72 }
7374tracing::debug!(
75 num_delegations,
76 num_undelegations,
77 epoch_start = epoch_to_end.start_height,
78 epoch_end = end_height,
79 epoch_index = epoch_to_end.index,
80"collected delegation changes for the epoch"
81);
8283// Compute and set the chain base rate for the upcoming epoch.
84let next_base_rate = self.process_chain_base_rate().await?;
8586// TODO(erwan): replace this with a tagged stream once we have tests. See #3874.
87let delegation_set = delegations_by_validator
88 .keys()
89 .cloned()
90 .collect::<BTreeSet<_>>();
91let undelegation_set = undelegations_by_validator
92 .keys()
93 .cloned()
94 .collect::<BTreeSet<_>>();
95let validators_with_delegation_changes = delegation_set
96 .union(&undelegation_set)
97 .cloned()
98 .collect::<BTreeSet<_>>();
99100// We're only tracking the consensus set, and each validator identity is about 64 bytes,
101 // it seems reasonable to collect the entire consensus set into memory since we expect
102 // less than 100k entries. We do this to keep the code simple.
103let consensus_set = self
104.consensus_set_stream()?
105.try_collect::<BTreeSet<IdentityKey>>()
106 .await?;
107108let validators_to_process = validators_with_delegation_changes
109 .union(&consensus_set)
110 .collect::<BTreeSet<_>>();
111112let mut funding_queue: Vec<(IdentityKey, FundingStreams, Amount)> = Vec::new();
113114for validator_identity in validators_to_process {
115let total_delegations = delegations_by_validator
116 .remove(validator_identity)
117 .unwrap_or_else(Amount::zero);
118119let total_undelegations = undelegations_by_validator
120 .remove(validator_identity)
121 .unwrap_or_else(Amount::zero);
122123if let Some(rewards) = self
124.process_validator(
125 validator_identity,
126 epoch_to_end,
127 next_base_rate.clone(),
128 total_delegations,
129 total_undelegations,
130 )
131 .await
132.map_err(|e| {
133tracing::error!(
134?e,
135?validator_identity,
136"failed to process validator's end-epoch"
137);
138 e
139 })?
140{
141 funding_queue.push(rewards)
142 }
143 }
144145// This is a sanity check to ensure that we have processed all the delegation changes.
146 // It should be impossible for this to fail, but we check it anyway. We can remove
147 // these guards when we start rolling out our testing framework and increase coverage.
148 // This should coincide with a profiling/performance effort on the epoch-handler.
149assert!(delegations_by_validator.is_empty());
150assert!(undelegations_by_validator.is_empty());
151152// We have collected the funding streams for all validators, so we can now
153 // record them for the funding component to process.
154self.queue_staking_rewards(funding_queue);
155156// Now that the consensus set voting power has been calculated, we can select the
157 // top N validators to be active for the next epoch.
158self.set_active_and_inactive_validators().await?;
159Ok(())
160 }
161162async fn process_validator(
163&mut self,
164 validator_identity: &IdentityKey,
165 epoch_to_end: Epoch,
166 next_base_rate: BaseRateData,
167 total_delegations: Amount,
168 total_undelegations: Amount,
169 ) -> Result<Option<(IdentityKey, FundingStreams, Amount)>> {
170let validator = self.get_validator_definition(&validator_identity).await?.ok_or_else(|| {
171anyhow::anyhow!("validator (identity={}) is in consensus index but its definition was not found in the JMT", &validator_identity)
172 })?;
173174// Grab the current validator state.
175let validator_state = self
176.get_validator_state(&validator.identity_key)
177 .await?
178.ok_or_else(|| {
179anyhow::anyhow!("validator (identity={}) is in consensus index but its state was not found in the JMT", &validator.identity_key)
180 })?;
181182// We are transitioning to the next epoch, so the "current" validator
183 // rate in the state is now the previous validator rate.
184let prev_validator_rate = self
185.get_validator_rate(&validator.identity_key)
186 .await?
187.ok_or_else(|| {
188anyhow::anyhow!("validator (identity={}) is in consensus index but its rate data was not found in the JMT", &validator.identity_key)
189 })?;
190191// First, apply any penalty recorded in the epoch we are ending.
192let penalty = self
193.get_penalty_in_epoch(&validator.identity_key, epoch_to_end.index)
194 .await
195.unwrap_or(Penalty::from_percent(0));
196let prev_validator_rate_with_penalty = prev_validator_rate.slash(penalty);
197198self.set_prev_validator_rate(
199&validator.identity_key,
200 prev_validator_rate_with_penalty.clone(),
201 );
202203// Then compute the next validator rate, accounting for funding streams and validator state.
204let next_validator_rate = prev_validator_rate_with_penalty.next_epoch(
205&next_base_rate,
206 validator.funding_streams.as_ref(),
207&validator_state,
208 );
209210// In theory, the maximum amount of delegation tokens is the total supply of staking tokens.
211 // In practice, this is unlikely to happen, but even if it did, we anticipate that the total
212 // supply of staking token is << 10^32 (2^107) tokens with a unit denomination of 10^6 (2^20),
213 // so there should be ample room to cast this to an i128.
214let delegation_delta =
215 (total_delegations.value() as i128) - (total_undelegations.value() as i128);
216217tracing::debug!(
218 validator = ?validator.identity_key,
219?total_delegations,
220?total_undelegations,
221 delegation_delta,
222"net delegation change for validator's pool for the epoch"
223);
224225let abs_delegation_change = Amount::from(delegation_delta.unsigned_abs());
226227// We need to either contract or expand the validator pool size,
228 // and panic if we encounter an under/overflow, because it can only
229 // happen if something has gone seriously wrong with the validator rate data.
230if delegation_delta > 0 {
231self.increase_validator_pool_size(validator_identity, abs_delegation_change)
232 .await
233.expect("overflow should be impossible");
234 } else if delegation_delta < 0 {
235self.decrease_validator_pool_size(validator_identity, abs_delegation_change)
236 .await
237.expect("underflow should be impossible");
238 } else {
239tracing::debug!(
240 validator = ?validator.identity_key,
241"no change in delegation, no change in token supply")
242 }
243244// Get the updated delegation token supply for use calculating voting power.
245let delegation_token_supply = self
246.get_validator_pool_size(validator_identity)
247 .await
248.unwrap_or(Amount::zero());
249250// Calculate the voting power in the newly beginning epoch
251let voting_power = next_validator_rate.voting_power(delegation_token_supply);
252253tracing::debug!(
254 validator = ?validator.identity_key,
255 validator_delegation_pool = ?delegation_token_supply,
256 validator_power = ?voting_power,
257"calculated validator's voting power for the upcoming epoch"
258);
259260// Update the state of the validator within the validator set
261 // with the newly starting epoch's calculated voting rate and power.
262self.set_validator_rate_data(&validator.identity_key, next_validator_rate.clone());
263self.set_validator_power(&validator.identity_key, voting_power)?;
264265// The epoch is ending, so we check if this validator was active and if so
266 // we queue its [`FundingStreams`] for processing by the funding component.
267let reward_queue_entry = if validator_state == validator::State::Active {
268// Here we collect funding data to create a record that the funding component
269 // can "pull". We do this because by the time the funding component is executed
270 // the validator set has possibly changed (e.g. a new validator enter the active
271 // set).
272Some((
273 validator.identity_key.clone(),
274 validator.funding_streams.clone(),
275 delegation_token_supply,
276 ))
277 } else {
278None
279};
280281let final_state = self
282.try_precursor_transition(
283 validator_identity,
284 validator_state,
285&next_validator_rate,
286 delegation_token_supply,
287 )
288 .await;
289290tracing::debug!(validator_identity = %validator.identity_key,
291 previous_epoch_validator_rate= ?prev_validator_rate,
292 next_epoch_validator_rate = ?next_validator_rate,
293?delegation_token_supply,
294 voting_power = ?voting_power,
295 final_state = ?final_state,
296"validator's end-epoch has been processed");
297298self.process_validator_pool_state(&validator.identity_key, epoch_to_end.start_height)
299 .await.map_err(|e| {
300tracing::error!(?e, validator_identity = %validator.identity_key, "failed to process validator pool state");
301 e
302 })?;
303304// Finally, we decide whether to keep this validator in the consensus set.
305 // Doing this here means that we no longer have to worry about validators
306 // escaping end-epoch processing.
307 //
308 // Performance: NV-storage layer churn because the validator might not actually
309 // be in the CS index. We should replace the union set approach with a merged
310 // stream that tags items with their source. See #3874.
311if !self.belongs_in_index(&validator.identity_key).await {
312self.remove_consensus_set_index(&validator.identity_key);
313 }
314315Ok(reward_queue_entry)
316 }
317318/// Compute and return the chain base rate ("L1BOR").
319async fn process_chain_base_rate(&mut self) -> Result<BaseRateData> {
320// We are transitioning to the next epoch, so the "current" base rate in
321 // the state is now the previous base rate.
322let prev_base_rate = self.get_current_base_rate().await?;
323324tracing::debug!(
325"fetching the issuance budget for this epoch from the distributions component"
326);
327// Fetch the issuance budget for the epoch we are ending.
328let issuance_budget_for_epoch = self
329.get_staking_token_issuance_for_epoch()
330 .expect("issuance budget is always set by the distributions component");
331332// Compute the base reward rate for the upcoming epoch based on the total amount
333 // of active stake and the issuance budget given to us by the distribution component.
334let total_active_stake_previous_epoch = self.total_active_stake().await?;
335tracing::debug!(
336?total_active_stake_previous_epoch,
337?issuance_budget_for_epoch,
338"computing base rate for the upcoming epoch"
339);
340341let base_reward_rate =
342 U128x128::ratio(issuance_budget_for_epoch, total_active_stake_previous_epoch)
343 .expect("total active stake is nonzero");
344let base_reward_rate: Amount = (base_reward_rate * *BPS_SQUARED_SCALING_FACTOR)
345 .expect("base reward rate is around one")
346 .round_down()
347 .try_into()
348 .expect("rounded to an integral value");
349tracing::debug!(%base_reward_rate, "base reward rate for the upcoming epoch");
350351let next_base_rate = prev_base_rate.next_epoch(base_reward_rate);
352tracing::debug!(
353?prev_base_rate,
354?next_base_rate,
355?base_reward_rate,
356?total_active_stake_previous_epoch,
357?issuance_budget_for_epoch,
358"calculated base rate for the upcoming epoch"
359);
360361// Set the next base rate as the new "current" base rate.
362self.set_base_rate(next_base_rate.clone());
363// We cache the previous base rate in the state, so that other components
364 // can use it in their end-epoch procesisng (e.g. funding for staking rewards).
365self.set_prev_base_rate(prev_base_rate);
366Ok(next_base_rate)
367 }
368369/// Called during `end_epoch`. Will perform state transitions to validators based
370 /// on changes to voting power that occurred in this epoch.
371async fn set_active_and_inactive_validators(&mut self) -> Result<()> {
372// A list of all active and inactive validators, with nonzero voting power.
373let mut validators_by_power = Vec::new();
374// A list of validators with zero power, who must be inactive.
375let mut zero_power = Vec::new();
376377let mut validator_identity_stream = self.consensus_set_stream()?;
378while let Some(identity_key) = validator_identity_stream.next().await {
379let identity_key = identity_key?;
380let state = self
381.get_validator_state(&identity_key)
382 .await?
383.context("should be able to fetch validator state")?;
384let power = self
385.get_validator_power(&identity_key)
386 .await?
387.unwrap_or_default();
388if matches!(state, validator::State::Active | validator::State::Inactive) {
389if power == Amount::zero() {
390 zero_power.push((identity_key, power));
391 } else {
392 validators_by_power.push((identity_key, power));
393 }
394 }
395 }
396397// Sort by voting power descending.
398validators_by_power.sort_by(|a, b| b.1.cmp(&a.1));
399400// The top `limit` validators with nonzero power become active.
401 // All other validators become inactive.
402let limit = self.get_stake_params().await?.active_validator_limit as usize;
403let active = validators_by_power.iter().take(limit);
404let inactive = validators_by_power
405 .iter()
406 .skip(limit)
407 .chain(zero_power.iter());
408409for (v, _) in active {
410self.set_validator_state(v, validator::State::Active)
411 .await?;
412 }
413for (v, _) in inactive {
414self.set_validator_state(v, validator::State::Inactive)
415 .await?;
416 }
417418Ok(())
419 }
420421/// Materializes the entire current validator set as a CometBFT update.
422 ///
423 /// This re-defines all validators every time, to simplify the code compared to
424 /// trying to track delta updates.
425#[instrument(skip(self))]
426async fn build_cometbft_validator_updates(&mut self) -> Result<()> {
427let current_consensus_keys: CurrentConsensusKeys = self
428.get(state_key::consensus_update::consensus_keys())
429 .await?
430.expect("current consensus keys must be present");
431let current_consensus_keys = current_consensus_keys
432 .consensus_keys
433 .into_iter()
434 .collect::<BTreeSet<_>>();
435436let mut voting_power_by_consensus_key = BTreeMap::<PublicKey, Amount>::new();
437438// First, build a mapping of consensus key to voting power for all known validators.
439440 // Using a JoinSet, run each validator's state queries concurrently.
441let mut js: JoinSet<std::prelude::v1::Result<(PublicKey, Amount), anyhow::Error>> =
442 JoinSet::new();
443let mut validator_identity_stream = self.consensus_set_stream()?;
444while let Some(identity_key) = validator_identity_stream.next().await {
445let identity_key = identity_key?;
446let state = self.get_validator_state(&identity_key);
447let power = self.get_validator_power(&identity_key);
448let consensus_key = self.fetch_validator_consensus_key(&identity_key);
449 js.spawn(async move {
450let state = state
451 .await?
452.expect("every known validator must have a recorded state");
453// Compute the effective power of this validator; this is the
454 // validator power, clamped to zero for all non-Active validators.
455let effective_power = if matches!(state, validator::State::Active) {
456 power
457 .await?
458.expect("every active validator must have a recorded power")
459 } else {
460 Amount::zero()
461 };
462463let consensus_key = consensus_key
464 .await?
465.expect("every known validator must have a recorded consensus key");
466467 anyhow::Ok((consensus_key, effective_power))
468 });
469 }
470// Now collect the computed results into the lookup table.
471while let Some(pair) = js.join_next().await.transpose()? {
472let (consensus_key, effective_power) = pair?;
473 voting_power_by_consensus_key.insert(consensus_key, effective_power);
474 }
475476// Next, filter that mapping to exclude any zero-power validators, UNLESS they
477 // were already known to CometBFT.
478voting_power_by_consensus_key.retain(|consensus_key, voting_power| {
479*voting_power > Amount::zero() || current_consensus_keys.contains(consensus_key)
480 });
481482// Finally, tell tendermint to delete any known consensus keys not otherwise updated
483for ck in current_consensus_keys.iter() {
484 voting_power_by_consensus_key
485 .entry(*ck)
486 .or_insert(Amount::zero());
487 }
488489// Save the validator updates to send to Tendermint.
490let tendermint_validator_updates = voting_power_by_consensus_key
491 .iter()
492 .map(|(consensus_key, power)| {
493Ok(Update {
494 pub_key: *consensus_key,
495// Validator voting power is measured in units of staking tokens,
496 // at time, CometBFT has an upper limit of around 2^60 - 1.
497 // This means that there is an upper bound on the maximum possible issuance
498 // at around 10^12 units of staking tokens.
499power: ((*power).value() as u64).try_into()?,
500 })
501 })
502 .collect::<Result<Vec<_>>>()?;
503self.put_cometbft_validator_updates(tendermint_validator_updates);
504505// Record the new consensus keys we will have told tendermint about.
506let updated_consensus_keys = CurrentConsensusKeys {
507 consensus_keys: voting_power_by_consensus_key
508 .iter()
509 .filter_map(|(consensus_key, power)| {
510if *power != Amount::zero() {
511Some(*consensus_key)
512 } else {
513None
514}
515 })
516 .collect(),
517 };
518tracing::debug!(?updated_consensus_keys);
519self.put(
520 state_key::consensus_update::consensus_keys().to_owned(),
521 updated_consensus_keys,
522 );
523524Ok(())
525 }
526}
527528impl<T: StateWrite + ConsensusIndexRead + ?Sized> EpochHandler for T {}