penumbra_sdk_stake/component/validator_handler/uptime_tracker.rs

use {
    super::{ValidatorDataRead, ValidatorDataWrite, ValidatorManager},
    crate::{
        component::{
            metrics,
            stake::{
                address::{validator_address, Address},
                ConsensusIndexRead,
            },
            StateReadExt as _,
        },
        event,
        params::StakeParameters,
        validator, IdentityKey, Uptime,
    },
    anyhow::Result,
    async_trait::async_trait,
    cnidarium::StateWrite,
    futures::StreamExt as _,
    penumbra_sdk_proto::{DomainType, StateWriteProto},
    penumbra_sdk_sct::component::clock::EpochRead,
    std::collections::BTreeMap,
    tap::Tap,
    tendermint::abci::types::CommitInfo,
    tokio::task::{AbortHandle, JoinSet},
    tracing::{debug, error_span, instrument, trace, Instrument},
};

/// A bundle of information about a validator used to track its uptime.
type ValidatorInformation = (IdentityKey, tendermint::PublicKey, Uptime);

/// The output of a [`ValidatorUptimeTracker::spawn_validator_lookup_fut()`] task.
type LookupResult = anyhow::Result<Option<ValidatorInformation>>;

/// Tracks validator uptimes.
///
/// Use [`track_uptime()`] to process a block's [`CommitInfo`] and update validator uptime
/// bookkeeping.
///
/// [`track_uptime()`]: Self::track_uptime
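///
/// A minimal usage sketch (the caller shown here is hypothetical; the real call
/// site lives in the staking component's block processing):
///
/// ```ignore
/// use tendermint::abci::request::BeginBlock;
///
/// async fn begin_block<S: StateWrite>(state: &mut S, begin_block: &BeginBlock) -> Result<()> {
///     state.track_uptime(&begin_block.last_commit_info).await?;
///     Ok(())
/// }
/// ```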
#[async_trait]
pub trait ValidatorUptimeTracker: StateWrite {
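    /// Processes the last block's [`CommitInfo`], recording which validators signed,
    /// and updating each active validator's uptime bookkeeping.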
    #[instrument(skip(self, last_commit_info))]
    async fn track_uptime(&mut self, last_commit_info: &CommitInfo) -> Result<()> {
        // Note: this probably isn't the correct height for the LastCommitInfo,
        // which is about the *last* commit, but at least it'll be consistent,
        // which is all we need to count signatures.
        let height = self.get_block_height().await?;
        let params = self.get_stake_params().await?;

        // Build a mapping from addresses (20-byte truncated SHA256(pubkey)) to vote statuses.
        let did_address_vote = last_commit_info
            .votes
            .as_slice()
            .tap(|votes| {
                if votes.is_empty() {
                    debug!("no validators voted")
                } else {
                    debug!(len = %votes.len(), "collecting validator votes")
                }
            })
            .into_iter()
            .map(|vote| (vote.validator.address, vote.sig_info.is_signed()))
            .inspect(|(address, voted)| {
                trace!(
                    address = %hex::encode(address),
                    %voted,
                    "validator vote information"
                )
            })
            .collect::<BTreeMap<Address, bool>>();

        // Since we don't have a lookup from "addresses" to identity keys,
        // iterate over our app's validators, and match them up with the vote data.
        // We can fetch all the data required for processing each validator concurrently:
        let mut lookups = JoinSet::new();
        let mut validator_identity_stream = self.consensus_set_stream()?;
        while let Some(identity_key) = validator_identity_stream.next().await.transpose()? {
            self.spawn_validator_lookup_fut(identity_key, &mut lookups);
        }

        // Now process the data we fetched concurrently.
        // Note that this will process validator uptime changes in a random order, but because they are all
        // independent, this doesn't introduce any nondeterminism into the complete state change.
        while let Some(data) = lookups.join_next().await.transpose()? {
            if let Some(validator_info) = data? {
                self.process_validator_uptime(validator_info, &did_address_vote, &params, height)
                    .await?;
            }
        }

        Ok(())
    }

    /// Spawns a future that will retrieve validator information.
    ///
    /// NB: This function is synchronous, but the lookup will run asynchronously as part of the
    /// provided [`JoinSet`]. This permits us to fetch information about all of the validators
    /// in the consensus set in parallel.
    ///
    /// # Panics
    ///
    /// This will panic if there is no recorded state for a validator with the given
    /// [`IdentityKey`].
    fn spawn_validator_lookup_fut(
        &self,
        identity_key: crate::IdentityKey,
        lookups: &mut JoinSet<LookupResult>,
    ) -> AbortHandle {
        // Define, but do not yet `.await` upon, a collection of futures fetching information
        // about a validator.
        let state = self.get_validator_state(&identity_key);
        let uptime = self.get_validator_uptime(&identity_key);
        let consensus_key = self.fetch_validator_consensus_key(&identity_key);

        // Define a span indicating that the spawned future follows from the current context.
        let span = {
            let span = error_span!("fetching validator information", %identity_key);
            let current = tracing::Span::current();
            span.follows_from(current);
            span
        };

        lookups.spawn(
            async move {
                let state = state
                    .await?
                    .expect("every known validator must have a recorded state");

                match state {
                    validator::State::Active => {
                        // If the validator is active, we need its consensus key and current uptime data:
                        Ok(Some((
                            identity_key,
                            consensus_key
                                .await?
                                .expect("every known validator must have a recorded consensus key"),
                            uptime
                                .await?
                                .expect("every known validator must have a recorded uptime"),
                        )))
                    }
                    _ => {
                        // Otherwise, we don't need to track its uptime, and there's no data to fetch.
                        Ok(None)
                    }
                }
            }
            .instrument(span),
        )
    }

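    /// Updates uptime bookkeeping for a single validator.
    ///
    /// Looks up whether the validator signed the last block, emits a missed-block
    /// event if it did not, and jails the validator once its missed-block count
    /// reaches the chain's `missed_blocks_maximum` parameter.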
    async fn process_validator_uptime(
        &mut self,
        (identity_key, consensus_key, mut uptime): ValidatorInformation,
        did_address_vote: &BTreeMap<Address, bool>,
        params: &StakeParameters,
        height: u64,
    ) -> anyhow::Result<()> {
        let addr = validator_address(&consensus_key);
        let voted = did_address_vote
            .get(&addr)
            .cloned()
            // If the height is `1`, then the `LastCommitInfo` refers to the genesis block,
            // which has no signers -- so we'll mark all validators as having signed.
            // https://github.com/penumbra-zone/penumbra/issues/1050
            .unwrap_or(height == 1);

        tracing::debug!(
            ?voted,
            num_missed_blocks = ?uptime.num_missed_blocks(),
            ?identity_key,
            ?params.missed_blocks_maximum,
            "recorded vote info"
        );
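        // Export the validator's current missed-blocks count for monitoring.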
        metrics::gauge!(metrics::MISSED_BLOCKS, "identity_key" => identity_key.to_string())
            .set(uptime.num_missed_blocks() as f64);

        if !voted {
            // If the validator didn't sign, we need to emit a missed block event.
            self.record_proto(event::EventValidatorMissedBlock { identity_key }.to_proto());
        }

        uptime.mark_height_as_signed(height, voted)?;
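        // Jail the validator if it has missed too many blocks; otherwise, persist
        // its updated uptime bookkeeping.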
        if uptime.num_missed_blocks() as u64 >= params.missed_blocks_maximum {
            self.set_validator_state(&identity_key, validator::State::Jailed)
                .await?;
        } else {
            self.set_validator_uptime(&identity_key, uptime);
        }

        Ok(())
    }
}

impl<T: StateWrite + ?Sized> ValidatorUptimeTracker for T {}