penumbra_stake/component/
epoch_handler.rsuse crate::{
component::{
stake::{
ConsensusIndexRead, ConsensusIndexWrite, ConsensusUpdateWrite, InternalStakingData,
RateDataWrite,
},
validator_handler::{
ValidatorDataRead, ValidatorDataWrite, ValidatorManager, ValidatorPoolTracker,
},
SlashingData,
},
rate::BaseRateData,
state_key, validator, CurrentConsensusKeys, FundingStreams, IdentityKey, Penalty, StateReadExt,
StateWriteExt, BPS_SQUARED_SCALING_FACTOR,
};
use anyhow::{Context, Result};
use async_trait::async_trait;
use cnidarium::StateWrite;
use futures::{StreamExt, TryStreamExt};
use penumbra_distributions::component::StateReadExt as _;
use penumbra_num::{fixpoint::U128x128, Amount};
use penumbra_proto::{StateReadProto, StateWriteProto};
use penumbra_sct::{component::clock::EpochRead, epoch::Epoch};
use std::collections::{BTreeMap, BTreeSet};
use tendermint::{validator::Update, PublicKey};
use tokio::task::JoinSet;
use tracing::instrument;
#[async_trait]
pub trait EpochHandler: StateWriteExt + ConsensusIndexRead {
#[instrument(skip(self, epoch_to_end), fields(index = epoch_to_end.index))]
async fn end_epoch(&mut self, epoch_to_end: Epoch) -> Result<()> {
let mut delegations_by_validator = BTreeMap::<IdentityKey, Amount>::new();
let mut undelegations_by_validator = BTreeMap::<IdentityKey, Amount>::new();
let end_height = self.get_block_height().await?;
let mut num_delegations = 0usize;
let mut num_undelegations = 0usize;
for height in epoch_to_end.start_height..=end_height {
let changes = self
.get_delegation_changes(
height
.try_into()
.context("should be able to convert u64 into block height")?,
)
.await?;
num_delegations = num_delegations.saturating_add(changes.delegations.len());
num_undelegations = num_undelegations.saturating_add(changes.undelegations.len());
for d in changes.delegations {
let validator_identity = d.validator_identity.clone();
let delegation_tally = delegations_by_validator
.entry(validator_identity)
.or_default()
.saturating_add(&d.delegation_amount);
delegations_by_validator.insert(validator_identity, delegation_tally);
}
for u in changes.undelegations {
let validator_identity = u.validator_identity.clone();
let undelegation_tally = undelegations_by_validator
.entry(validator_identity)
.or_default()
.saturating_add(&u.delegation_amount);
undelegations_by_validator.insert(validator_identity, undelegation_tally);
}
}
tracing::debug!(
num_delegations,
num_undelegations,
epoch_start = epoch_to_end.start_height,
epoch_end = end_height,
epoch_index = epoch_to_end.index,
"collected delegation changes for the epoch"
);
let next_base_rate = self.process_chain_base_rate().await?;
let delegation_set = delegations_by_validator
.keys()
.cloned()
.collect::<BTreeSet<_>>();
let undelegation_set = undelegations_by_validator
.keys()
.cloned()
.collect::<BTreeSet<_>>();
let validators_with_delegation_changes = delegation_set
.union(&undelegation_set)
.cloned()
.collect::<BTreeSet<_>>();
let consensus_set = self
.consensus_set_stream()?
.try_collect::<BTreeSet<IdentityKey>>()
.await?;
let validators_to_process = validators_with_delegation_changes
.union(&consensus_set)
.collect::<BTreeSet<_>>();
let mut funding_queue: Vec<(IdentityKey, FundingStreams, Amount)> = Vec::new();
for validator_identity in validators_to_process {
let total_delegations = delegations_by_validator
.remove(validator_identity)
.unwrap_or_else(Amount::zero);
let total_undelegations = undelegations_by_validator
.remove(validator_identity)
.unwrap_or_else(Amount::zero);
if let Some(rewards) = self
.process_validator(
validator_identity,
epoch_to_end,
next_base_rate.clone(),
total_delegations,
total_undelegations,
)
.await
.map_err(|e| {
tracing::error!(
?e,
?validator_identity,
"failed to process validator's end-epoch"
);
e
})?
{
funding_queue.push(rewards)
}
}
assert!(delegations_by_validator.is_empty());
assert!(undelegations_by_validator.is_empty());
self.queue_staking_rewards(funding_queue);
self.set_active_and_inactive_validators().await?;
Ok(())
}
async fn process_validator(
&mut self,
validator_identity: &IdentityKey,
epoch_to_end: Epoch,
next_base_rate: BaseRateData,
total_delegations: Amount,
total_undelegations: Amount,
) -> Result<Option<(IdentityKey, FundingStreams, Amount)>> {
let validator = self.get_validator_definition(&validator_identity).await?.ok_or_else(|| {
anyhow::anyhow!("validator (identity={}) is in consensus index but its definition was not found in the JMT", &validator_identity)
})?;
let validator_state = self
.get_validator_state(&validator.identity_key)
.await?
.ok_or_else(|| {
anyhow::anyhow!("validator (identity={}) is in consensus index but its state was not found in the JMT", &validator.identity_key)
})?;
let prev_validator_rate = self
.get_validator_rate(&validator.identity_key)
.await?
.ok_or_else(|| {
anyhow::anyhow!("validator (identity={}) is in consensus index but its rate data was not found in the JMT", &validator.identity_key)
})?;
let penalty = self
.get_penalty_in_epoch(&validator.identity_key, epoch_to_end.index)
.await
.unwrap_or(Penalty::from_percent(0));
let prev_validator_rate_with_penalty = prev_validator_rate.slash(penalty);
self.set_prev_validator_rate(
&validator.identity_key,
prev_validator_rate_with_penalty.clone(),
);
let next_validator_rate = prev_validator_rate_with_penalty.next_epoch(
&next_base_rate,
validator.funding_streams.as_ref(),
&validator_state,
);
let delegation_delta =
(total_delegations.value() as i128) - (total_undelegations.value() as i128);
tracing::debug!(
validator = ?validator.identity_key,
?total_delegations,
?total_undelegations,
delegation_delta,
"net delegation change for validator's pool for the epoch"
);
let abs_delegation_change = Amount::from(delegation_delta.unsigned_abs());
if delegation_delta > 0 {
self.increase_validator_pool_size(validator_identity, abs_delegation_change)
.await
.expect("overflow should be impossible");
} else if delegation_delta < 0 {
self.decrease_validator_pool_size(validator_identity, abs_delegation_change)
.await
.expect("underflow should be impossible");
} else {
tracing::debug!(
validator = ?validator.identity_key,
"no change in delegation, no change in token supply")
}
let delegation_token_supply = self
.get_validator_pool_size(validator_identity)
.await
.unwrap_or(Amount::zero());
let voting_power = next_validator_rate.voting_power(delegation_token_supply);
tracing::debug!(
validator = ?validator.identity_key,
validator_delegation_pool = ?delegation_token_supply,
validator_power = ?voting_power,
"calculated validator's voting power for the upcoming epoch"
);
self.set_validator_rate_data(&validator.identity_key, next_validator_rate.clone());
self.set_validator_power(&validator.identity_key, voting_power)?;
let reward_queue_entry = if validator_state == validator::State::Active {
Some((
validator.identity_key.clone(),
validator.funding_streams.clone(),
delegation_token_supply,
))
} else {
None
};
let final_state = self
.try_precursor_transition(
validator_identity,
validator_state,
&next_validator_rate,
delegation_token_supply,
)
.await;
tracing::debug!(validator_identity = %validator.identity_key,
previous_epoch_validator_rate= ?prev_validator_rate,
next_epoch_validator_rate = ?next_validator_rate,
?delegation_token_supply,
voting_power = ?voting_power,
final_state = ?final_state,
"validator's end-epoch has been processed");
self.process_validator_pool_state(&validator.identity_key, epoch_to_end.start_height)
.await.map_err(|e| {
tracing::error!(?e, validator_identity = %validator.identity_key, "failed to process validator pool state");
e
})?;
if !self.belongs_in_index(&validator.identity_key).await {
self.remove_consensus_set_index(&validator.identity_key);
}
Ok(reward_queue_entry)
}
async fn process_chain_base_rate(&mut self) -> Result<BaseRateData> {
let prev_base_rate = self.get_current_base_rate().await?;
tracing::debug!(
"fetching the issuance budget for this epoch from the distributions component"
);
let issuance_budget_for_epoch = self
.get_staking_token_issuance_for_epoch()
.expect("issuance budget is always set by the distributions component");
let total_active_stake_previous_epoch = self.total_active_stake().await?;
tracing::debug!(
?total_active_stake_previous_epoch,
?issuance_budget_for_epoch,
"computing base rate for the upcoming epoch"
);
let base_reward_rate =
U128x128::ratio(issuance_budget_for_epoch, total_active_stake_previous_epoch)
.expect("total active stake is nonzero");
let base_reward_rate: Amount = (base_reward_rate * *BPS_SQUARED_SCALING_FACTOR)
.expect("base reward rate is around one")
.round_down()
.try_into()
.expect("rounded to an integral value");
tracing::debug!(%base_reward_rate, "base reward rate for the upcoming epoch");
let next_base_rate = prev_base_rate.next_epoch(base_reward_rate);
tracing::debug!(
?prev_base_rate,
?next_base_rate,
?base_reward_rate,
?total_active_stake_previous_epoch,
?issuance_budget_for_epoch,
"calculated base rate for the upcoming epoch"
);
self.set_base_rate(next_base_rate.clone());
self.set_prev_base_rate(prev_base_rate);
Ok(next_base_rate)
}
async fn set_active_and_inactive_validators(&mut self) -> Result<()> {
let mut validators_by_power = Vec::new();
let mut zero_power = Vec::new();
let mut validator_identity_stream = self.consensus_set_stream()?;
while let Some(identity_key) = validator_identity_stream.next().await {
let identity_key = identity_key?;
let state = self
.get_validator_state(&identity_key)
.await?
.context("should be able to fetch validator state")?;
let power = self
.get_validator_power(&identity_key)
.await?
.unwrap_or_default();
if matches!(state, validator::State::Active | validator::State::Inactive) {
if power == Amount::zero() {
zero_power.push((identity_key, power));
} else {
validators_by_power.push((identity_key, power));
}
}
}
validators_by_power.sort_by(|a, b| b.1.cmp(&a.1));
let limit = self.get_stake_params().await?.active_validator_limit as usize;
let active = validators_by_power.iter().take(limit);
let inactive = validators_by_power
.iter()
.skip(limit)
.chain(zero_power.iter());
for (v, _) in active {
self.set_validator_state(v, validator::State::Active)
.await?;
}
for (v, _) in inactive {
self.set_validator_state(v, validator::State::Inactive)
.await?;
}
Ok(())
}
#[instrument(skip(self))]
async fn build_cometbft_validator_updates(&mut self) -> Result<()> {
let current_consensus_keys: CurrentConsensusKeys = self
.get(state_key::consensus_update::consensus_keys())
.await?
.expect("current consensus keys must be present");
let current_consensus_keys = current_consensus_keys
.consensus_keys
.into_iter()
.collect::<BTreeSet<_>>();
let mut voting_power_by_consensus_key = BTreeMap::<PublicKey, Amount>::new();
let mut js: JoinSet<std::prelude::v1::Result<(PublicKey, Amount), anyhow::Error>> =
JoinSet::new();
let mut validator_identity_stream = self.consensus_set_stream()?;
while let Some(identity_key) = validator_identity_stream.next().await {
let identity_key = identity_key?;
let state = self.get_validator_state(&identity_key);
let power = self.get_validator_power(&identity_key);
let consensus_key = self.fetch_validator_consensus_key(&identity_key);
js.spawn(async move {
let state = state
.await?
.expect("every known validator must have a recorded state");
let effective_power = if matches!(state, validator::State::Active) {
power
.await?
.expect("every active validator must have a recorded power")
} else {
Amount::zero()
};
let consensus_key = consensus_key
.await?
.expect("every known validator must have a recorded consensus key");
anyhow::Ok((consensus_key, effective_power))
});
}
while let Some(pair) = js.join_next().await.transpose()? {
let (consensus_key, effective_power) = pair?;
voting_power_by_consensus_key.insert(consensus_key, effective_power);
}
voting_power_by_consensus_key.retain(|consensus_key, voting_power| {
*voting_power > Amount::zero() || current_consensus_keys.contains(consensus_key)
});
for ck in current_consensus_keys.iter() {
voting_power_by_consensus_key
.entry(*ck)
.or_insert(Amount::zero());
}
let tendermint_validator_updates = voting_power_by_consensus_key
.iter()
.map(|(consensus_key, power)| {
Ok(Update {
pub_key: *consensus_key,
power: ((*power).value() as u64).try_into()?,
})
})
.collect::<Result<Vec<_>>>()?;
self.put_cometbft_validator_updates(tendermint_validator_updates);
let updated_consensus_keys = CurrentConsensusKeys {
consensus_keys: voting_power_by_consensus_key
.iter()
.filter_map(|(consensus_key, power)| {
if *power != Amount::zero() {
Some(*consensus_key)
} else {
None
}
})
.collect(),
};
tracing::debug!(?updated_consensus_keys);
self.put(
state_key::consensus_update::consensus_keys().to_owned(),
updated_consensus_keys,
);
Ok(())
}
}
impl<T: StateWrite + ConsensusIndexRead + ?Sized> EpochHandler for T {}