penumbra_view/sync.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
use std::collections::BTreeMap;
use penumbra_compact_block::{CompactBlock, StatePayload};
use penumbra_dex::swap::{SwapPayload, SwapPlaintext};
use penumbra_fee::GasPrices;
use penumbra_keys::FullViewingKey;
use penumbra_sct::Nullifier;
use penumbra_shielded_pool::{fmd, Note, NotePayload};
use penumbra_tct::{self as tct, StateCommitment};
use tracing::Instrument;
use crate::{SpendableNoteRecord, Storage, SwapRecord};
/// Contains the results of scanning a single block.
#[derive(Debug, Clone)]
pub struct FilteredBlock {
pub new_notes: BTreeMap<StateCommitment, SpendableNoteRecord>,
pub new_swaps: BTreeMap<StateCommitment, SwapRecord>,
pub spent_nullifiers: Vec<Nullifier>,
pub height: u64,
pub fmd_parameters: Option<fmd::Parameters>,
pub app_parameters_updated: bool,
pub gas_prices: Option<GasPrices>,
}
#[tracing::instrument(skip_all, fields(height = %height))]
pub async fn scan_block(
fvk: &FullViewingKey,
state_commitment_tree: &mut tct::Tree,
CompactBlock {
height,
state_payloads,
nullifiers,
block_root,
epoch_root,
fmd_parameters,
swap_outputs,
app_parameters_updated,
gas_prices,
// TODO: do we need this, or is there a bug in scan_block?
// proposal_started,
..
}: CompactBlock,
storage: &Storage,
) -> anyhow::Result<FilteredBlock> {
// Trial-decrypt a note with our own specific viewing key
let trial_decrypt_note = |note_payload: NotePayload| -> tokio::task::JoinHandle<Option<Note>> {
// TODO: change fvk to Arc<FVK> in Worker and pass to scan_block as Arc
// need this so the task is 'static and not dependent on key lifetime
let fvk2 = fvk.clone();
tokio::spawn(
async move { note_payload.trial_decrypt(&fvk2) }.instrument(tracing::Span::current()),
)
};
// Trial-decrypt a swap with our own specific viewing key
let trial_decrypt_swap =
|swap_payload: SwapPayload| -> tokio::task::JoinHandle<Option<SwapPlaintext>> {
// TODO: change fvk to Arc<FVK> in Worker and pass to scan_block as Arc
// need this so the task is 'static and not dependent on key lifetime
let fvk2 = fvk.clone();
tokio::spawn(
async move { swap_payload.trial_decrypt(&fvk2) }
.instrument(tracing::Span::current()),
)
};
// Nullifiers we've found in this block
let spent_nullifiers: Vec<Nullifier> = nullifiers;
// Trial-decrypt the notes in this block, keeping track of the ones that were meant for us
let mut note_decryptions = Vec::new();
let mut swap_decryptions = Vec::new();
let mut unknown_commitments = Vec::new();
for payload in state_payloads.iter() {
match payload {
StatePayload::Note { note, .. } => {
note_decryptions.push(trial_decrypt_note((**note).clone()));
}
StatePayload::Swap { swap, .. } => {
swap_decryptions.push(trial_decrypt_swap((**swap).clone()));
}
StatePayload::RolledUp { commitment, .. } => unknown_commitments.push(*commitment),
}
}
// Having started trial decryption in the background, ask the Storage for scanning advice:
let mut note_advice = storage.scan_advice(unknown_commitments).await?;
for decryption in note_decryptions {
if let Some(note) = decryption
.await
.expect("able to join tokio note decryption handle")
{
note_advice.insert(note.commit(), note);
}
}
let mut swap_advice = BTreeMap::new();
for decryption in swap_decryptions {
if let Some(swap) = decryption
.await
.expect("able to join tokio swap decryption handle")
{
swap_advice.insert(swap.swap_commitment(), swap);
}
}
// Newly detected spendable notes.
let mut new_notes = BTreeMap::new();
// Newly detected claimable swaps.
let mut new_swaps = BTreeMap::new();
if note_advice.is_empty() && swap_advice.is_empty() {
// If there are no notes we care about in this block, just insert the block root into the
// tree instead of processing each commitment individually
state_commitment_tree
.insert_block(block_root)
.expect("inserting a block root must succeed");
} else {
// If we found at least one note for us in this block, we have to explicitly construct the
// whole block in the SCT by inserting each commitment one at a time
tracing::debug!("found at least one relevant SCT entry, reconstructing block subtree");
for payload in state_payloads.into_iter() {
// We need to insert each commitment, so use a match statement to ensure we
// exhaustively cover all possible cases.
match (
note_advice.get(payload.commitment()),
swap_advice.get(payload.commitment()),
) {
(Some(note), None) => {
// Keep track of this commitment for later witnessing
let position = state_commitment_tree
.insert(tct::Witness::Keep, *payload.commitment())
.expect("inserting a commitment must succeed");
let source = payload.source().clone();
let nullifier =
Nullifier::derive(fvk.nullifier_key(), position, payload.commitment());
let address_index = fvk.incoming().index_for_diversifier(note.diversifier());
new_notes.insert(
*payload.commitment(),
SpendableNoteRecord {
note_commitment: *payload.commitment(),
height_spent: None,
height_created: height,
note: note.clone(),
address_index,
nullifier,
position,
source,
return_address: None,
},
);
}
(None, Some(swap)) => {
// Keep track of this commitment for later witnessing
let position = state_commitment_tree
.insert(tct::Witness::Keep, *payload.commitment())
.expect("inserting a commitment must succeed");
let Some(output_data) = swap_outputs.get(&swap.trading_pair).cloned() else {
// We've been given an invalid compact block, but we
// should keep going, because the fullnode we're talking
// to could be lying to us and handing us crafted blocks
// with garbage data only we can see, in order to
// pinpoint whether or not we control a specific address,
// so we can't let on that we've noticed any problem.
tracing::warn!("invalid compact block, batch swap output data missing for trading pair {:?}", swap.trading_pair);
continue;
};
// Record the output notes for the future swap claim, so we can detect
// them when the swap is claimed.
let (output_1, output_2) = swap.output_notes(&output_data);
storage.give_advice(output_1).await?;
storage.give_advice(output_2).await?;
let source = payload.source().clone();
let nullifier =
Nullifier::derive(fvk.nullifier_key(), position, payload.commitment());
new_swaps.insert(
*payload.commitment(),
SwapRecord {
swap_commitment: *payload.commitment(),
swap: swap.clone(),
position,
nullifier,
source,
output_data,
height_claimed: None,
},
);
}
(None, None) => {
// Don't remember this commitment; it wasn't ours
state_commitment_tree
.insert(tct::Witness::Forget, *payload.commitment())
.expect("inserting a commitment must succeed");
}
(Some(_), Some(_)) => unreachable!("swap and note commitments are distinct"),
}
}
// End the block in the commitment tree
state_commitment_tree
.end_block()
.expect("ending the block must succed");
}
// If we've also reached the end of the epoch, end the epoch in the commitment tree
let is_epoch_end = epoch_root.is_some();
if is_epoch_end {
tracing::debug!(?height, "end of epoch");
state_commitment_tree
.end_epoch()
.expect("ending the epoch must succeed");
}
// Print the TCT root for debugging
#[cfg(feature = "sct-divergence-check")]
tracing::debug!(tct_root = %state_commitment_tree.root(), "tct root");
// Filter nullifiers to remove any without matching note note_commitments
// This is a very important optimization to avoid unnecessary query load on the storage backend
// -- it results in 100x+ slower sync times if we don't do this!
let filtered_nullifiers = storage.filter_nullifiers(spent_nullifiers).await?;
// Construct filtered block
let result = FilteredBlock {
new_notes,
new_swaps,
spent_nullifiers: filtered_nullifiers,
height,
fmd_parameters,
app_parameters_updated,
gas_prices,
};
Ok(result)
}