penumbra_stake/component/validator_handler/uptime_tracker.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
use {
super::{ValidatorDataRead, ValidatorDataWrite, ValidatorManager},
crate::{
component::{
metrics,
stake::{
address::{validator_address, Address},
ConsensusIndexRead,
},
StateReadExt as _,
},
event,
params::StakeParameters,
validator, IdentityKey, Uptime,
},
anyhow::Result,
async_trait::async_trait,
cnidarium::StateWrite,
futures::StreamExt as _,
penumbra_proto::{DomainType, StateWriteProto},
penumbra_sct::component::clock::EpochRead,
std::collections::BTreeMap,
tap::Tap,
tendermint::abci::types::CommitInfo,
tokio::task::{AbortHandle, JoinSet},
tracing::{debug, error_span, instrument, trace, Instrument},
};
/// A bundle of information about a validator used to track its uptime.
type ValidatorInformation = (IdentityKey, tendermint::PublicKey, Uptime);
/// The output of a [`ValidatorUptimeTracker::spawn_validator_lookup_fut()`] task.
type LookupResult = anyhow::Result<Option<ValidatorInformation>>;
/// Tracks validator uptimes.
///
/// Use [`track_uptime()`] to process a block's [`CommitInfo`] and update validator uptime
/// bookkeeping.
///
/// [`track_uptime()`]: Self::track_uptime
#[async_trait]
pub trait ValidatorUptimeTracker: StateWrite {
#[instrument(skip(self, last_commit_info))]
async fn track_uptime(&mut self, last_commit_info: &CommitInfo) -> Result<()> {
// Note: this probably isn't the correct height for the LastCommitInfo,
// which is about the *last* commit, but at least it'll be consistent,
// which is all we need to count signatures.
let height = self.get_block_height().await?;
let params = self.get_stake_params().await?;
// Build a mapping from addresses (20-byte truncated SHA256(pubkey)) to vote statuses.
let did_address_vote = last_commit_info
.votes
.as_slice()
.tap(|votes| {
if votes.is_empty() {
debug!("no validators voted")
} else {
debug!(len = %votes.len(), "collecting validator votes")
}
})
.into_iter()
.map(|vote| (vote.validator.address, vote.sig_info.is_signed()))
.inspect(|(address, voted)| {
trace!(
address = %hex::encode(address),
%voted,
"validator vote information"
)
})
.collect::<BTreeMap<Address, bool>>();
// Since we don't have a lookup from "addresses" to identity keys,
// iterate over our app's validators, and match them up with the vote data.
// We can fetch all the data required for processing each validator concurrently:
let mut lookups = JoinSet::new();
let mut validator_identity_stream = self.consensus_set_stream()?;
while let Some(identity_key) = validator_identity_stream.next().await.transpose()? {
self.spawn_validator_lookup_fut(identity_key, &mut lookups);
}
// Now process the data we fetched concurrently.
// Note that this will process validator uptime changes in a random order, but because they are all
// independent, this doesn't introduce any nondeterminism into the complete state change.
while let Some(data) = lookups.join_next().await.transpose()? {
if let Some(validator_info) = data? {
self.process_validator_uptime(validator_info, &did_address_vote, ¶ms, height)
.await?;
}
}
Ok(())
}
/// Spawns a future that will retrieve validator information.
///
/// NB: This function is synchronous, but the lookup will run asynchronously as part of the
/// provided [`JoinSet`]. This permits us to fetch information about all of the validators
/// in the consensus set in parallel.
///
/// # Panics
///
/// This will panic if there is no recorded state for a validator with the given
/// [`IdentityKey`].
fn spawn_validator_lookup_fut(
&self,
identity_key: crate::IdentityKey,
lookups: &mut JoinSet<LookupResult>,
) -> AbortHandle {
// Define, but do not yet `.await` upon, a collection of futures fetching information
// about a validator.
let state = self.get_validator_state(&identity_key);
let uptime = self.get_validator_uptime(&identity_key);
let consensus_key = self.fetch_validator_consensus_key(&identity_key);
// Define a span indicating that the spawned future follows from the current context.
let span = {
let span = error_span!("fetching validator information", %identity_key);
let current = tracing::Span::current();
span.follows_from(current);
span
};
lookups.spawn(
async move {
let state = state
.await?
.expect("every known validator must have a recorded state");
match state {
validator::State::Active => {
// If the validator is active, we need its consensus key and current uptime data:
Ok(Some((
identity_key,
consensus_key
.await?
.expect("every known validator must have a recorded consensus key"),
uptime
.await?
.expect("every known validator must have a recorded uptime"),
)))
}
_ => {
// Otherwise, we don't need to track its uptime, and there's no data to fetch.
Ok(None)
}
}
}
.instrument(span),
)
}
async fn process_validator_uptime(
&mut self,
(identity_key, consensus_key, mut uptime): ValidatorInformation,
did_address_vote: &BTreeMap<Address, bool>,
params: &StakeParameters,
height: u64,
) -> anyhow::Result<()> {
let addr = validator_address(&consensus_key);
let voted = did_address_vote
.get(&addr)
.cloned()
// If the height is `1`, then the `LastCommitInfo` refers to the genesis block,
// which has no signers -- so we'll mark all validators as having signed.
// https://github.com/penumbra-zone/penumbra/issues/1050
.unwrap_or(height == 1);
tracing::debug!(
?voted,
num_missed_blocks = ?uptime.num_missed_blocks(),
?identity_key,
?params.missed_blocks_maximum,
"recorded vote info"
);
metrics::gauge!(metrics::MISSED_BLOCKS, "identity_key" => identity_key.to_string())
.set(uptime.num_missed_blocks() as f64);
if !voted {
// If the validator didn't sign, we need to emit a missed block event.
self.record_proto(event::EventValidatorMissedBlock { identity_key }.to_proto());
}
uptime.mark_height_as_signed(height, voted)?;
if uptime.num_missed_blocks() as u64 >= params.missed_blocks_maximum {
self.set_validator_state(&identity_key, validator::State::Jailed)
.await?;
} else {
self.set_validator_uptime(&identity_key, uptime);
}
Ok(())
}
}
impl<T: StateWrite + ?Sized> ValidatorUptimeTracker for T {}