// penumbra_sdk_stake/component/stake.rs
pub mod address;
2
3use crate::event::EventSlashingPenaltyApplied;
4use crate::params::StakeParameters;
5use crate::rate::BaseRateData;
6use crate::validator::{self, Validator};
7use crate::{
8 state_key, CurrentConsensusKeys, Delegate, DelegationChanges, FundingStreams, IdentityKey,
9 Penalty, Undelegate,
10};
11use anyhow::Context;
12use anyhow::{anyhow, Result};
13use async_trait::async_trait;
14use cnidarium::{StateRead, StateWrite};
15use cnidarium_component::Component;
16use futures::{StreamExt, TryStreamExt};
17use penumbra_sdk_num::Amount;
18use penumbra_sdk_proto::{DomainType, StateReadProto, StateWriteProto};
19use penumbra_sdk_sct::component::clock::EpochRead;
20use std::pin::Pin;
21use std::str::FromStr;
22use std::{collections::BTreeMap, sync::Arc};
23use tap::{Tap, TapFallible, TapOptional};
24use tendermint::v0_37::abci;
25use tendermint::validator::Update;
26use tendermint::{block, PublicKey};
27use tracing::{error, instrument, trace};
28
29use crate::component::epoch_handler::EpochHandler;
30use crate::component::validator_handler::{
31 ValidatorDataRead, ValidatorManager, ValidatorUptimeTracker,
32};
33
34#[cfg(test)]
35mod tests;
36
/// The staking component.
///
/// The type carries no data of its own; it exists so that the [`Component`]
/// lifecycle hooks can be implemented for it.
pub struct Staking {}

// Intentionally empty: `Staking` defines no inherent methods.
impl Staking {}
40
41#[async_trait]
42impl Component for Staking {
43 type AppState = (
44 crate::genesis::Content,
45 penumbra_sdk_shielded_pool::genesis::Content,
46 );
47
48 #[instrument(name = "staking", skip(state, app_state))]
49 async fn init_chain<S: StateWrite>(mut state: S, app_state: Option<&Self::AppState>) {
50 match app_state {
51 None => { }
52 Some((staking_genesis, sp_genesis)) => {
53 state.put_stake_params(staking_genesis.stake_params.clone());
54
55 let starting_height = state
56 .get_block_height()
57 .await
58 .expect("should be able to get initial block height")
59 .tap(|height| trace!(%height,"found initial block height"));
60 let starting_epoch = state
61 .get_epoch_by_height(starting_height)
62 .await
63 .expect("should be able to get initial epoch")
64 .tap(|epoch| trace!(?epoch, "found initial epoch"));
65 let epoch_index = starting_epoch.index;
66
67 let genesis_base_rate = BaseRateData {
68 epoch_index,
69 base_reward_rate: 0u128.into(),
70 base_exchange_rate: 1_0000_0000u128.into(),
71 };
72 state.set_base_rate(genesis_base_rate.clone());
73 trace!(?genesis_base_rate, "set base rate");
74
75 let mut genesis_allocations = BTreeMap::<_, Amount>::new();
76 for allocation in &sp_genesis.allocations {
77 let value = allocation.value();
78 *genesis_allocations.entry(value.asset_id).or_default() += value.amount;
79 }
80
81 trace!("parsing genesis validators");
82 for (i, validator) in staking_genesis.validators.iter().enumerate() {
83 let validator = Validator::try_from(validator.clone())
85 .expect("should be able to parse genesis validator")
86 .tap(|Validator { name, enabled, .. }|
87 trace!(%i, %name, %enabled, "parsed genesis validator")
88 );
89
90 state
91 .add_genesis_validator(&genesis_allocations, &genesis_base_rate, validator)
92 .await
93 .expect("should be able to add genesis validator to state");
94 }
95
96 state.put(
98 state_key::consensus_update::consensus_keys().to_owned(),
99 CurrentConsensusKeys::default(),
100 );
101
102 state
105 .set_delegation_changes(
106 starting_height
107 .try_into()
108 .expect("should be able to convert u64 into block height"),
109 Default::default(),
110 )
111 .await;
112 }
113 }
114 state
116 .build_cometbft_validator_updates()
117 .await
118 .expect("should be able to build initial tendermint validator updates");
119 }
120
121 #[instrument(name = "staking", skip(state, begin_block))]
122 async fn begin_block<S: StateWrite + 'static>(
123 state: &mut Arc<S>,
124 begin_block: &abci::request::BeginBlock,
125 ) {
126 let state = Arc::get_mut(state).expect("state should be unique");
127 for evidence in begin_block.byzantine_validators.iter() {
131 let _ = state.process_evidence(evidence).await.map_err(|e| {
132 tracing::warn!(?e, "failed to process byzantine misbehavior evidence")
133 });
134 }
135
136 state
137 .track_uptime(&begin_block.last_commit_info)
138 .await
139 .expect("should be able to track uptime");
140 }
141
142 #[instrument(name = "staking", skip(state, end_block))]
144 async fn end_block<S: StateWrite + 'static>(
145 state: &mut Arc<S>,
146 end_block: &abci::request::EndBlock,
147 ) {
148 let state = Arc::get_mut(state).expect("state should be unique");
149 let height = end_block
150 .height
151 .try_into()
152 .expect("should be able to convert i64 into block height");
153 let changes = state.get_delegation_changes_tally();
154
155 state.set_delegation_changes(height, changes).await;
156 }
157
158 #[instrument(name = "staking", skip(state))]
160 async fn end_epoch<S: StateWrite + 'static>(state: &mut Arc<S>) -> anyhow::Result<()> {
161 let state = Arc::get_mut(state).context("state should be unique")?;
162 let epoch_ending = state
163 .get_current_epoch()
164 .await
165 .context("should be able to get current epoch during end_epoch")?;
166 state
167 .end_epoch(epoch_ending)
168 .await
169 .context("should be able to write end_epoch")?;
170 state
173 .build_cometbft_validator_updates()
174 .await
175 .context("should be able to build tendermint validator updates")?;
176 Ok(())
177 }
178}
179
180pub trait ConsensusUpdateRead: StateRead {
181 fn cometbft_validator_updates(&self) -> Option<Vec<Update>> {
185 self.object_get(state_key::internal::cometbft_validator_updates())
186 .unwrap_or(None)
187 }
188}
189
190impl<T: StateRead + ?Sized> ConsensusUpdateRead for T {}
191
192pub(crate) trait ConsensusUpdateWrite: StateWrite {
193 fn put_cometbft_validator_updates(&mut self, updates: Vec<Update>) {
194 tracing::debug!(?updates);
195 self.object_put(
196 state_key::internal::cometbft_validator_updates(),
197 Some(updates),
198 )
199 }
200}
201
202impl<T: StateWrite + ?Sized> ConsensusUpdateWrite for T {}
203
204#[async_trait]
206pub trait StateReadExt: StateRead {
207 #[instrument(skip(self), level = "trace")]
209 async fn get_stake_params(&self) -> Result<StakeParameters> {
210 self.get(state_key::parameters::key())
211 .await
212 .tap_err(|err| error!(?err, "could not deserialize stake parameters"))
213 .expect("no deserialization error should happen")
214 .tap_none(|| error!("could not find stake parameters"))
215 .ok_or_else(|| anyhow!("Missing StakeParameters"))
216 }
217
218 #[instrument(skip(self), level = "trace")]
219 async fn signed_blocks_window_len(&self) -> Result<u64> {
220 self.get_stake_params()
221 .await
222 .map(|p| p.signed_blocks_window_len)
223 }
224
225 #[instrument(skip(self), level = "trace")]
226 async fn missed_blocks_maximum(&self) -> Result<u64> {
227 self.get_stake_params()
228 .await
229 .map(|p| p.missed_blocks_maximum)
230 }
231
232 #[instrument(skip(self), level = "trace")]
236 fn get_delegation_changes_tally(&self) -> DelegationChanges {
237 self.object_get(state_key::chain::delegation_changes::key())
238 .unwrap_or_default()
239 }
240
241 #[instrument(skip(self), level = "trace")]
242 async fn get_current_base_rate(&self) -> Result<BaseRateData> {
243 self.get(state_key::chain::base_rate::current())
244 .await
245 .map(|rate_data| rate_data.expect("rate data must be set after init_chain"))
246 }
247
248 #[instrument(skip(self), level = "trace")]
249 fn get_previous_base_rate(&self) -> Option<BaseRateData> {
250 self.object_get(state_key::chain::base_rate::previous())
251 }
252
253 #[instrument(skip(self), level = "trace")]
255 fn get_funding_queue(&self) -> Option<Vec<(IdentityKey, FundingStreams, Amount)>> {
256 self.object_get(state_key::validators::rewards::staking())
257 }
258
259 #[instrument(skip(self), level = "trace")]
261 async fn get_delegation_changes(&self, height: block::Height) -> Result<DelegationChanges> {
262 self.get(&state_key::chain::delegation_changes::by_height(
263 height.value(),
264 ))
265 .await
266 .tap_err(|err| error!(?err, "delegation changes for block exist but are invalid"))?
267 .tap_none(|| error!("could not find delegation changes for block"))
268 .ok_or_else(|| anyhow!("missing delegation changes for block {}", height))
269 }
270}
271
272impl<T: StateRead + ?Sized> StateReadExt for T {}
273
274#[async_trait]
276pub trait StateWriteExt: StateWrite {
277 fn put_stake_params(&mut self, params: StakeParameters) {
279 self.put(state_key::parameters::key().into(), params)
281 }
282
283 fn put_delegation_changes(&mut self, delegation_changes: DelegationChanges) {
287 self.object_put(
288 state_key::chain::delegation_changes::key(),
289 delegation_changes,
290 )
291 }
292
293 fn push_delegation(&mut self, delegation: Delegate) {
295 let mut changes = self.get_delegation_changes_tally();
296 changes.delegations.push(delegation);
297 self.put_delegation_changes(changes);
298 }
299
300 fn push_undelegation(&mut self, undelegation: Undelegate) {
302 let mut changes = self.get_delegation_changes_tally();
303 changes.undelegations.push(undelegation);
304 self.put_delegation_changes(changes);
305 }
306
307 #[instrument(skip(self))]
308 fn queue_staking_rewards(
309 &mut self,
310 staking_reward_queue: Vec<(IdentityKey, FundingStreams, Amount)>,
311 ) {
312 self.object_put(
313 state_key::validators::rewards::staking(),
314 staking_reward_queue,
315 )
316 }
317
318 #[instrument(skip_all)]
327 fn register_consensus_key(&mut self, identity_key: &IdentityKey, consensus_key: &PublicKey) {
328 let address = self::address::validator_address(consensus_key);
329 tracing::debug!(?identity_key, ?consensus_key, hash = ?hex::encode(address), "registering consensus key");
330 self.put(
331 state_key::validators::lookup_by::cometbft_address(&address),
332 consensus_key.clone(),
333 );
334 self.put(
335 state_key::validators::lookup_by::consensus_key(consensus_key),
336 identity_key.clone(),
337 );
338 }
339}
340
341impl<T: StateWrite + ?Sized> StateWriteExt for T {}
342
343#[async_trait]
344pub trait SlashingData: StateRead {
345 async fn get_penalty_in_epoch(&self, id: &IdentityKey, epoch_index: u64) -> Option<Penalty> {
346 self.get(&state_key::penalty::for_id_in_epoch(id, epoch_index))
347 .await
348 .expect("serialization error cannot happen")
349 }
350
351 async fn get_penalty_for_range(&self, id: &IdentityKey, start: u64, end: u64) -> Vec<Penalty> {
352 let prefix = state_key::penalty::prefix(id);
353 let all_penalties: BTreeMap<String, Penalty> = self
354 .prefix::<Penalty>(&prefix)
355 .try_collect()
356 .await
357 .unwrap_or_default();
358 let start_key = state_key::penalty::for_id_in_epoch(id, start);
359 let end_key = state_key::penalty::for_id_in_epoch(id, end);
360 all_penalties
361 .range(start_key..end_key)
362 .map(|(_k, v)| v.to_owned())
363 .collect()
364 }
365
366 fn compute_compounded_penalty(penalties: Vec<Penalty>) -> Penalty {
367 let compounded = Penalty::from_percent(0);
368 penalties
369 .into_iter()
370 .fold(compounded, |acc, penalty| acc.compound(penalty))
371 }
372
373 async fn compounded_penalty_over_range(
375 &self,
376 id: &IdentityKey,
377 epoch_index_start: u64,
378 epoch_index_end: u64,
379 ) -> Result<Penalty> {
380 if epoch_index_start > epoch_index_end {
381 anyhow::bail!("invalid penalty window")
382 }
383 let range = self
384 .get_penalty_for_range(id, epoch_index_start, epoch_index_end)
385 .await;
386 let compounded_penalty = Self::compute_compounded_penalty(range);
387 Ok(compounded_penalty)
388 }
389}
390
391impl<T: StateRead + ?Sized> SlashingData for T {}
392
393#[async_trait]
394pub(crate) trait InternalStakingData: StateRead {
395 #[instrument(skip(self))]
398 async fn total_active_stake(&self) -> Result<Amount> {
399 let mut total_active_stake = Amount::zero();
400
401 let mut validator_stream = self.consensus_set_stream()?;
402 while let Some(validator_identity) = validator_stream.next().await {
403 let validator_identity = validator_identity?;
404 let validator_state = self
405 .get_validator_state(&validator_identity)
406 .await?
407 .ok_or_else(|| {
408 anyhow::anyhow!("validator (identity_key={}) is in the consensus set index but its state was not found", validator_identity)
409 })?;
410
411 if validator_state != validator::State::Active {
412 continue;
413 }
414
415 let delegation_token_supply = self
416 .get_validator_pool_size(&validator_identity)
417 .await
418 .ok_or_else(|| {
419 anyhow::anyhow!(
420 "validator delegation pool not found for {}",
421 validator_identity
422 )
423 })?;
424
425 let validator_rate = self
426 .get_validator_rate(&validator_identity)
427 .await?
428 .ok_or_else(|| {
429 anyhow::anyhow!("validator (identity_key={}) is in the consensus set index but its rate data was not found", validator_identity)
430 })?;
431
432 total_active_stake = total_active_stake
434 .checked_add(&validator_rate.unbonded_amount(delegation_token_supply))
435 .ok_or_else(|| {
436 anyhow::anyhow!("total active stake overflowed `Amount` (128 bits)")
437 })?;
438 }
439
440 Ok(total_active_stake)
441 }
442}
443
444impl<T: StateRead + ?Sized> InternalStakingData for T {}
445
446#[async_trait]
447pub(crate) trait RateDataWrite: StateWrite {
448 #[instrument(skip(self))]
449 fn set_base_rate(&mut self, rate_data: BaseRateData) {
450 tracing::debug!("setting base rate");
451 self.put(state_key::chain::base_rate::current().to_owned(), rate_data);
452 }
453
454 #[instrument(skip(self))]
455 fn set_prev_base_rate(&mut self, rate_data: BaseRateData) {
456 self.object_put(state_key::chain::base_rate::previous(), rate_data);
457 }
458
459 async fn record_slashing_penalty(
460 &mut self,
461 identity_key: &IdentityKey,
462 slashing_penalty: Penalty,
463 ) {
464 let current_epoch_index = self
465 .get_current_epoch()
466 .await
467 .expect("epoch has been set")
468 .index;
469
470 let current_penalty = self
471 .get_penalty_in_epoch(identity_key, current_epoch_index)
472 .await
473 .unwrap_or(Penalty::from_percent(0));
474
475 let new_penalty = current_penalty.compound(slashing_penalty);
476
477 self.record_proto(
479 EventSlashingPenaltyApplied {
480 identity_key: *identity_key,
481 epoch_index: current_epoch_index,
482 new_penalty,
483 }
484 .to_proto(),
485 );
486 self.put(
487 state_key::penalty::for_id_in_epoch(identity_key, current_epoch_index),
488 new_penalty,
489 );
490 }
491
492 #[tracing::instrument(
493 level = "trace",
494 skip_all,
495 fields(
496 %height,
497 delegations = ?changes.delegations,
498 undelegations = ?changes.undelegations,
499 )
500 )]
501 async fn set_delegation_changes(&mut self, height: block::Height, changes: DelegationChanges) {
502 let key = state_key::chain::delegation_changes::by_height(height.value());
503 tracing::trace!(%key, "setting delegation changes");
504 self.put(key, changes);
505 }
506}
507
508impl<T: StateWrite + ?Sized> RateDataWrite for T {}
509
510#[async_trait]
511pub trait ConsensusIndexRead: StateRead {
512 fn consensus_set_stream(
516 &self,
517 ) -> Result<Pin<Box<dyn futures::Stream<Item = Result<IdentityKey>> + Send + 'static>>> {
518 Ok(self
519 .nonverifiable_prefix_raw(
520 state_key::validators::consensus_set_index::prefix().as_bytes(),
521 )
522 .map(|res| {
523 res.map(|(_, raw_identity_key)| {
524 let str_identity_key = std::str::from_utf8(raw_identity_key.as_slice())
526 .expect("state keys should only have valid identity keys");
527 IdentityKey::from_str(str_identity_key)
528 .expect("state keys should only have valid identity keys")
529 })
530 })
531 .boxed())
532 }
533
534 async fn get_consensus_set(&self) -> anyhow::Result<Vec<IdentityKey>> {
536 use futures::TryStreamExt;
537 self.consensus_set_stream()?.try_collect().await
538 }
539
540 #[instrument(level = "error", skip(self))]
544 async fn belongs_in_index(&self, validator_id: &IdentityKey) -> bool {
545 let Some(state) = self
546 .get_validator_state(validator_id)
547 .await
548 .expect("no deserialization error")
549 else {
550 tracing::error!("validator state was not found");
551 return false;
552 };
553
554 match state {
555 validator::State::Active | validator::State::Inactive => {
556 tracing::debug!(?state, "validator belongs in the consensus set");
557 true
558 }
559 _ => {
560 tracing::debug!(?state, "validator does not belong in the consensus set");
561 false
562 }
563 }
564 }
565}
566
567impl<T: StateRead + ?Sized> ConsensusIndexRead for T {}
568
569#[async_trait]
570pub trait ConsensusIndexWrite: StateWrite {
571 fn add_consensus_set_index(&mut self, identity_key: &IdentityKey) {
575 tracing::debug!(validator = %identity_key, "adding validator identity to consensus set index");
576 self.nonverifiable_put_raw(
577 state_key::validators::consensus_set_index::by_id(identity_key)
578 .as_bytes()
579 .to_vec(),
580 identity_key.to_string().as_bytes().to_vec(),
581 );
582 }
583
584 fn remove_consensus_set_index(&mut self, identity_key: &IdentityKey) {
588 tracing::debug!(validator = %identity_key, "removing validator identity from consensus set index");
589 self.nonverifiable_delete(
590 state_key::validators::consensus_set_index::by_id(identity_key)
591 .as_bytes()
592 .to_vec(),
593 );
594 }
595}
596
597impl<T: StateWrite + ?Sized> ConsensusIndexWrite for T {}