penumbra_sdk_stake/component/
epoch_handler.rs1use crate::{
2 component::{
3 stake::{
4 ConsensusIndexRead, ConsensusIndexWrite, ConsensusUpdateWrite, InternalStakingData,
5 RateDataWrite,
6 },
7 validator_handler::{
8 ValidatorDataRead, ValidatorDataWrite, ValidatorManager, ValidatorPoolTracker,
9 },
10 SlashingData,
11 },
12 rate::BaseRateData,
13 state_key, validator, CurrentConsensusKeys, FundingStreams, IdentityKey, Penalty, StateReadExt,
14 StateWriteExt, BPS_SQUARED_SCALING_FACTOR,
15};
16use anyhow::{Context, Result};
17use async_trait::async_trait;
18use cnidarium::StateWrite;
19use futures::{StreamExt, TryStreamExt};
20use penumbra_sdk_distributions::component::StateReadExt as _;
21use penumbra_sdk_num::{fixpoint::U128x128, Amount};
22use penumbra_sdk_proto::{StateReadProto, StateWriteProto};
23use penumbra_sdk_sct::{component::clock::EpochRead, epoch::Epoch};
24use std::collections::{BTreeMap, BTreeSet};
25use tendermint::{validator::Update, PublicKey};
26use tokio::task::JoinSet;
27use tracing::instrument;
28
29#[async_trait]
30pub trait EpochHandler: StateWriteExt + ConsensusIndexRead {
31 #[instrument(skip(self, epoch_to_end), fields(index = epoch_to_end.index))]
32 async fn end_epoch(&mut self, epoch_to_end: Epoch) -> Result<()> {
34 let mut delegations_by_validator = BTreeMap::<IdentityKey, Amount>::new();
36 let mut undelegations_by_validator = BTreeMap::<IdentityKey, Amount>::new();
37
38 let end_height = self.get_block_height().await?;
39 let mut num_delegations = 0usize;
40 let mut num_undelegations = 0usize;
41
42 for height in epoch_to_end.start_height..=end_height {
44 let changes = self
45 .get_delegation_changes(
46 height
47 .try_into()
48 .context("should be able to convert u64 into block height")?,
49 )
50 .await?;
51
52 num_delegations = num_delegations.saturating_add(changes.delegations.len());
53 num_undelegations = num_undelegations.saturating_add(changes.undelegations.len());
54
55 for d in changes.delegations {
56 let validator_identity = d.validator_identity.clone();
57 let delegation_tally = delegations_by_validator
58 .entry(validator_identity)
59 .or_default()
60 .saturating_add(&d.delegation_amount);
61 delegations_by_validator.insert(validator_identity, delegation_tally);
62 }
63 for u in changes.undelegations {
64 let validator_identity = u.validator_identity.clone();
65 let undelegation_tally = undelegations_by_validator
66 .entry(validator_identity)
67 .or_default()
68 .saturating_add(&u.delegation_amount);
69
70 undelegations_by_validator.insert(validator_identity, undelegation_tally);
71 }
72 }
73
74 tracing::debug!(
75 num_delegations,
76 num_undelegations,
77 epoch_start = epoch_to_end.start_height,
78 epoch_end = end_height,
79 epoch_index = epoch_to_end.index,
80 "collected delegation changes for the epoch"
81 );
82
83 let next_base_rate = self.process_chain_base_rate().await?;
85
86 let delegation_set = delegations_by_validator
88 .keys()
89 .cloned()
90 .collect::<BTreeSet<_>>();
91 let undelegation_set = undelegations_by_validator
92 .keys()
93 .cloned()
94 .collect::<BTreeSet<_>>();
95 let validators_with_delegation_changes = delegation_set
96 .union(&undelegation_set)
97 .cloned()
98 .collect::<BTreeSet<_>>();
99
100 let consensus_set = self
104 .consensus_set_stream()?
105 .try_collect::<BTreeSet<IdentityKey>>()
106 .await?;
107
108 let validators_to_process = validators_with_delegation_changes
109 .union(&consensus_set)
110 .collect::<BTreeSet<_>>();
111
112 let mut funding_queue: Vec<(IdentityKey, FundingStreams, Amount)> = Vec::new();
113
114 for validator_identity in validators_to_process {
115 let total_delegations = delegations_by_validator
116 .remove(validator_identity)
117 .unwrap_or_else(Amount::zero);
118
119 let total_undelegations = undelegations_by_validator
120 .remove(validator_identity)
121 .unwrap_or_else(Amount::zero);
122
123 if let Some(rewards) = self
124 .process_validator(
125 validator_identity,
126 epoch_to_end,
127 next_base_rate.clone(),
128 total_delegations,
129 total_undelegations,
130 )
131 .await
132 .map_err(|e| {
133 tracing::error!(
134 ?e,
135 ?validator_identity,
136 "failed to process validator's end-epoch"
137 );
138 e
139 })?
140 {
141 funding_queue.push(rewards)
142 }
143 }
144
145 assert!(delegations_by_validator.is_empty());
150 assert!(undelegations_by_validator.is_empty());
151
152 self.queue_staking_rewards(funding_queue);
155
156 self.set_active_and_inactive_validators().await?;
159 Ok(())
160 }
161
162 async fn process_validator(
163 &mut self,
164 validator_identity: &IdentityKey,
165 epoch_to_end: Epoch,
166 next_base_rate: BaseRateData,
167 total_delegations: Amount,
168 total_undelegations: Amount,
169 ) -> Result<Option<(IdentityKey, FundingStreams, Amount)>> {
170 let validator = self.get_validator_definition(&validator_identity).await?.ok_or_else(|| {
171 anyhow::anyhow!("validator (identity={}) is in consensus index but its definition was not found in the JMT", &validator_identity)
172 })?;
173
174 let validator_state = self
176 .get_validator_state(&validator.identity_key)
177 .await?
178 .ok_or_else(|| {
179 anyhow::anyhow!("validator (identity={}) is in consensus index but its state was not found in the JMT", &validator.identity_key)
180 })?;
181
182 let prev_validator_rate = self
185 .get_validator_rate(&validator.identity_key)
186 .await?
187 .ok_or_else(|| {
188 anyhow::anyhow!("validator (identity={}) is in consensus index but its rate data was not found in the JMT", &validator.identity_key)
189 })?;
190
191 let penalty = self
193 .get_penalty_in_epoch(&validator.identity_key, epoch_to_end.index)
194 .await
195 .unwrap_or(Penalty::from_percent(0));
196 let prev_validator_rate_with_penalty = prev_validator_rate.slash(penalty);
197
198 self.set_prev_validator_rate(
199 &validator.identity_key,
200 prev_validator_rate_with_penalty.clone(),
201 );
202
203 let next_validator_rate = prev_validator_rate_with_penalty.next_epoch(
205 &next_base_rate,
206 validator.funding_streams.as_ref(),
207 &validator_state,
208 );
209
210 let delegation_delta =
215 (total_delegations.value() as i128) - (total_undelegations.value() as i128);
216
217 tracing::debug!(
218 validator = ?validator.identity_key,
219 ?total_delegations,
220 ?total_undelegations,
221 delegation_delta,
222 "net delegation change for validator's pool for the epoch"
223 );
224
225 let abs_delegation_change = Amount::from(delegation_delta.unsigned_abs());
226
227 if delegation_delta > 0 {
231 self.increase_validator_pool_size(validator_identity, abs_delegation_change)
232 .await
233 .expect("overflow should be impossible");
234 } else if delegation_delta < 0 {
235 self.decrease_validator_pool_size(validator_identity, abs_delegation_change)
236 .await
237 .expect("underflow should be impossible");
238 } else {
239 tracing::debug!(
240 validator = ?validator.identity_key,
241 "no change in delegation, no change in token supply")
242 }
243
244 let delegation_token_supply = self
246 .get_validator_pool_size(validator_identity)
247 .await
248 .unwrap_or(Amount::zero());
249
250 let voting_power = next_validator_rate.voting_power(delegation_token_supply);
252
253 tracing::debug!(
254 validator = ?validator.identity_key,
255 validator_delegation_pool = ?delegation_token_supply,
256 validator_power = ?voting_power,
257 "calculated validator's voting power for the upcoming epoch"
258 );
259
260 self.set_validator_rate_data(&validator.identity_key, next_validator_rate.clone());
263 self.set_validator_power(&validator.identity_key, voting_power)?;
264
265 let reward_queue_entry = if validator_state == validator::State::Active {
268 Some((
273 validator.identity_key.clone(),
274 validator.funding_streams.clone(),
275 delegation_token_supply,
276 ))
277 } else {
278 None
279 };
280
281 let final_state = self
282 .try_precursor_transition(
283 validator_identity,
284 validator_state,
285 &next_validator_rate,
286 delegation_token_supply,
287 )
288 .await;
289
290 tracing::debug!(validator_identity = %validator.identity_key,
291 previous_epoch_validator_rate= ?prev_validator_rate,
292 next_epoch_validator_rate = ?next_validator_rate,
293 ?delegation_token_supply,
294 voting_power = ?voting_power,
295 final_state = ?final_state,
296 "validator's end-epoch has been processed");
297
298 self.process_validator_pool_state(&validator.identity_key, epoch_to_end.start_height)
299 .await.map_err(|e| {
300 tracing::error!(?e, validator_identity = %validator.identity_key, "failed to process validator pool state");
301 e
302 })?;
303
304 if !self.belongs_in_index(&validator.identity_key).await {
312 self.remove_consensus_set_index(&validator.identity_key);
313 }
314
315 Ok(reward_queue_entry)
316 }
317
318 async fn process_chain_base_rate(&mut self) -> Result<BaseRateData> {
320 let prev_base_rate = self.get_current_base_rate().await?;
323
324 tracing::debug!(
325 "fetching the issuance budget for this epoch from the distributions component"
326 );
327 let issuance_budget_for_epoch = self
329 .get_staking_token_issuance_for_epoch()
330 .expect("issuance budget is always set by the distributions component");
331
332 let total_active_stake_previous_epoch = self.total_active_stake().await?;
335 tracing::debug!(
336 ?total_active_stake_previous_epoch,
337 ?issuance_budget_for_epoch,
338 "computing base rate for the upcoming epoch"
339 );
340
341 let base_reward_rate =
342 U128x128::ratio(issuance_budget_for_epoch, total_active_stake_previous_epoch)
343 .expect("total active stake is nonzero");
344 let base_reward_rate: Amount = (base_reward_rate * *BPS_SQUARED_SCALING_FACTOR)
345 .expect("base reward rate is around one")
346 .round_down()
347 .try_into()
348 .expect("rounded to an integral value");
349 tracing::debug!(%base_reward_rate, "base reward rate for the upcoming epoch");
350
351 let next_base_rate = prev_base_rate.next_epoch(base_reward_rate);
352 tracing::debug!(
353 ?prev_base_rate,
354 ?next_base_rate,
355 ?base_reward_rate,
356 ?total_active_stake_previous_epoch,
357 ?issuance_budget_for_epoch,
358 "calculated base rate for the upcoming epoch"
359 );
360
361 self.set_base_rate(next_base_rate.clone());
363 self.set_prev_base_rate(prev_base_rate);
366 Ok(next_base_rate)
367 }
368
369 async fn set_active_and_inactive_validators(&mut self) -> Result<()> {
372 let mut validators_by_power = Vec::new();
374 let mut zero_power = Vec::new();
376
377 let mut validator_identity_stream = self.consensus_set_stream()?;
378 while let Some(identity_key) = validator_identity_stream.next().await {
379 let identity_key = identity_key?;
380 let state = self
381 .get_validator_state(&identity_key)
382 .await?
383 .context("should be able to fetch validator state")?;
384 let power = self
385 .get_validator_power(&identity_key)
386 .await?
387 .unwrap_or_default();
388 if matches!(state, validator::State::Active | validator::State::Inactive) {
389 if power == Amount::zero() {
390 zero_power.push((identity_key, power));
391 } else {
392 validators_by_power.push((identity_key, power));
393 }
394 }
395 }
396
397 validators_by_power.sort_by(|a, b| b.1.cmp(&a.1));
399
400 let limit = self.get_stake_params().await?.active_validator_limit as usize;
403 let active = validators_by_power.iter().take(limit);
404 let inactive = validators_by_power
405 .iter()
406 .skip(limit)
407 .chain(zero_power.iter());
408
409 for (v, _) in active {
410 self.set_validator_state(v, validator::State::Active)
411 .await?;
412 }
413 for (v, _) in inactive {
414 self.set_validator_state(v, validator::State::Inactive)
415 .await?;
416 }
417
418 Ok(())
419 }
420
421 #[instrument(skip(self))]
426 async fn build_cometbft_validator_updates(&mut self) -> Result<()> {
427 let current_consensus_keys: CurrentConsensusKeys = self
428 .get(state_key::consensus_update::consensus_keys())
429 .await?
430 .expect("current consensus keys must be present");
431 let current_consensus_keys = current_consensus_keys
432 .consensus_keys
433 .into_iter()
434 .collect::<BTreeSet<_>>();
435
436 let mut voting_power_by_consensus_key = BTreeMap::<PublicKey, Amount>::new();
437
438 let mut js: JoinSet<std::prelude::v1::Result<(PublicKey, Amount), anyhow::Error>> =
442 JoinSet::new();
443 let mut validator_identity_stream = self.consensus_set_stream()?;
444 while let Some(identity_key) = validator_identity_stream.next().await {
445 let identity_key = identity_key?;
446 let state = self.get_validator_state(&identity_key);
447 let power = self.get_validator_power(&identity_key);
448 let consensus_key = self.fetch_validator_consensus_key(&identity_key);
449 js.spawn(async move {
450 let state = state
451 .await?
452 .expect("every known validator must have a recorded state");
453 let effective_power = if matches!(state, validator::State::Active) {
456 power
457 .await?
458 .expect("every active validator must have a recorded power")
459 } else {
460 Amount::zero()
461 };
462
463 let consensus_key = consensus_key
464 .await?
465 .expect("every known validator must have a recorded consensus key");
466
467 anyhow::Ok((consensus_key, effective_power))
468 });
469 }
470 while let Some(pair) = js.join_next().await.transpose()? {
472 let (consensus_key, effective_power) = pair?;
473 voting_power_by_consensus_key.insert(consensus_key, effective_power);
474 }
475
476 voting_power_by_consensus_key.retain(|consensus_key, voting_power| {
479 *voting_power > Amount::zero() || current_consensus_keys.contains(consensus_key)
480 });
481
482 for ck in current_consensus_keys.iter() {
484 voting_power_by_consensus_key
485 .entry(*ck)
486 .or_insert(Amount::zero());
487 }
488
489 let tendermint_validator_updates = voting_power_by_consensus_key
491 .iter()
492 .map(|(consensus_key, power)| {
493 Ok(Update {
494 pub_key: *consensus_key,
495 power: ((*power).value() as u64).try_into()?,
500 })
501 })
502 .collect::<Result<Vec<_>>>()?;
503 self.put_cometbft_validator_updates(tendermint_validator_updates);
504
505 let updated_consensus_keys = CurrentConsensusKeys {
507 consensus_keys: voting_power_by_consensus_key
508 .iter()
509 .filter_map(|(consensus_key, power)| {
510 if *power != Amount::zero() {
511 Some(*consensus_key)
512 } else {
513 None
514 }
515 })
516 .collect(),
517 };
518 tracing::debug!(?updated_consensus_keys);
519 self.put(
520 state_key::consensus_update::consensus_keys().to_owned(),
521 updated_consensus_keys,
522 );
523
524 Ok(())
525 }
526}
527
528impl<T: StateWrite + ConsensusIndexRead + ?Sized> EpochHandler for T {}