penumbra_sdk_stake/component/validator_handler/
uptime_tracker.rs1use {
2 super::{ValidatorDataRead, ValidatorDataWrite, ValidatorManager},
3 crate::{
4 component::{
5 metrics,
6 stake::{
7 address::{validator_address, Address},
8 ConsensusIndexRead,
9 },
10 StateReadExt as _,
11 },
12 event,
13 params::StakeParameters,
14 validator, IdentityKey, Uptime,
15 },
16 anyhow::Result,
17 async_trait::async_trait,
18 cnidarium::StateWrite,
19 futures::StreamExt as _,
20 penumbra_sdk_proto::{DomainType, StateWriteProto},
21 penumbra_sdk_sct::component::clock::EpochRead,
22 std::collections::BTreeMap,
23 tap::Tap,
24 tendermint::abci::types::CommitInfo,
25 tokio::task::{AbortHandle, JoinSet},
26 tracing::{debug, error_span, instrument, trace, Instrument},
27};
28
29type ValidatorInformation = (IdentityKey, tendermint::PublicKey, Uptime);
31
32type LookupResult = anyhow::Result<Option<ValidatorInformation>>;
34
35#[async_trait]
42pub trait ValidatorUptimeTracker: StateWrite {
43 #[instrument(skip(self, last_commit_info))]
44 async fn track_uptime(&mut self, last_commit_info: &CommitInfo) -> Result<()> {
45 let height = self.get_block_height().await?;
49 let params = self.get_stake_params().await?;
50
51 let did_address_vote = last_commit_info
53 .votes
54 .as_slice()
55 .tap(|votes| {
56 if votes.is_empty() {
57 debug!("no validators voted")
58 } else {
59 debug!(len = %votes.len(), "collecting validator votes")
60 }
61 })
62 .into_iter()
63 .map(|vote| (vote.validator.address, vote.sig_info.is_signed()))
64 .inspect(|(address, voted)| {
65 trace!(
66 address = %hex::encode(address),
67 %voted,
68 "validator vote information"
69 )
70 })
71 .collect::<BTreeMap<Address, bool>>();
72
73 let mut lookups = JoinSet::new();
77 let mut validator_identity_stream = self.consensus_set_stream()?;
78 while let Some(identity_key) = validator_identity_stream.next().await.transpose()? {
79 self.spawn_validator_lookup_fut(identity_key, &mut lookups);
80 }
81
82 while let Some(data) = lookups.join_next().await.transpose()? {
86 if let Some(validator_info) = data? {
87 self.process_validator_uptime(validator_info, &did_address_vote, ¶ms, height)
88 .await?;
89 }
90 }
91
92 Ok(())
93 }
94
95 fn spawn_validator_lookup_fut(
106 &self,
107 identity_key: crate::IdentityKey,
108 lookups: &mut JoinSet<LookupResult>,
109 ) -> AbortHandle {
110 let state = self.get_validator_state(&identity_key);
113 let uptime = self.get_validator_uptime(&identity_key);
114 let consensus_key = self.fetch_validator_consensus_key(&identity_key);
115
116 let span = {
118 let span = error_span!("fetching validator information", %identity_key);
119 let current = tracing::Span::current();
120 span.follows_from(current);
121 span
122 };
123
124 lookups.spawn(
125 async move {
126 let state = state
127 .await?
128 .expect("every known validator must have a recorded state");
129
130 match state {
131 validator::State::Active => {
132 Ok(Some((
134 identity_key,
135 consensus_key
136 .await?
137 .expect("every known validator must have a recorded consensus key"),
138 uptime
139 .await?
140 .expect("every known validator must have a recorded uptime"),
141 )))
142 }
143 _ => {
144 Ok(None)
146 }
147 }
148 }
149 .instrument(span),
150 )
151 }
152
153 async fn process_validator_uptime(
154 &mut self,
155 (identity_key, consensus_key, mut uptime): ValidatorInformation,
156 did_address_vote: &BTreeMap<Address, bool>,
157 params: &StakeParameters,
158 height: u64,
159 ) -> anyhow::Result<()> {
160 let addr = validator_address(&consensus_key);
161 let voted = did_address_vote
162 .get(&addr)
163 .cloned()
164 .unwrap_or(height == 1);
168
169 tracing::debug!(
170 ?voted,
171 num_missed_blocks = ?uptime.num_missed_blocks(),
172 ?identity_key,
173 ?params.missed_blocks_maximum,
174 "recorded vote info"
175 );
176 metrics::gauge!(metrics::MISSED_BLOCKS, "identity_key" => identity_key.to_string())
177 .set(uptime.num_missed_blocks() as f64);
178
179 if !voted {
180 self.record_proto(event::EventValidatorMissedBlock { identity_key }.to_proto());
182 }
183
184 uptime.mark_height_as_signed(height, voted)?;
185 if uptime.num_missed_blocks() as u64 >= params.missed_blocks_maximum {
186 self.set_validator_state(&identity_key, validator::State::Jailed)
187 .await?;
188 } else {
189 self.set_validator_uptime(&identity_key, uptime);
190 }
191
192 Ok(())
193 }
194}
195
196impl<T: StateWrite + ?Sized> ValidatorUptimeTracker for T {}