penumbra_sdk_view/
worker.rs

1use std::{
2    collections::BTreeSet,
3    sync::{Arc, Mutex},
4    time::Duration,
5};
6
7use anyhow::Context;
8use penumbra_sdk_auction::auction::AuctionNft;
9use penumbra_sdk_compact_block::CompactBlock;
10use penumbra_sdk_dex::lp::{position, LpNft};
11use penumbra_sdk_keys::FullViewingKey;
12use penumbra_sdk_proto::core::{
13    app::v1::{
14        query_service_client::QueryServiceClient as AppQueryServiceClient,
15        TransactionsByHeightRequest,
16    },
17    component::{
18        compact_block::v1::{
19            query_service_client::QueryServiceClient as CompactBlockQueryServiceClient,
20            CompactBlockRangeRequest,
21        },
22        shielded_pool::v1::{
23            query_service_client::QueryServiceClient as ShieldedPoolQueryServiceClient,
24            AssetMetadataByIdRequest,
25        },
26    },
27};
28use penumbra_sdk_sct::{CommitmentSource, Nullifier};
29use penumbra_sdk_transaction::Transaction;
30use tap::Tap;
31use tokio::sync::{watch, RwLock};
32use tonic::transport::Channel;
33use tracing::instrument;
34
35use crate::{
36    sync::{scan_block, FilteredBlock},
37    Storage,
38};
39
40// The maximum size of a compact block, in bytes (12MB).
41const MAX_CB_SIZE_BYTES: usize = 12 * 1024 * 1024;
42
43pub struct Worker {
44    storage: Storage,
45    sct: Arc<RwLock<penumbra_sdk_tct::Tree>>,
46    fvk: FullViewingKey, // TODO: notifications (see TODOs on ViewService)
47    error_slot: Arc<Mutex<Option<anyhow::Error>>>,
48    sync_height_tx: watch::Sender<u64>,
49    /// Tonic channel used to create GRPC clients.
50    channel: Channel,
51}
52
53impl Worker {
54    /// Creates a new worker, returning:
55    ///
56    /// - the worker itself;
57    /// - a shared, in-memory SCT instance;
58    /// - a shared error slot;
59    /// - a channel for notifying the client of sync progress.
60    #[instrument(skip_all)]
61    pub async fn new(
62        storage: Storage,
63        channel: Channel,
64    ) -> Result<
65        (
66            Self,
67            Arc<RwLock<penumbra_sdk_tct::Tree>>,
68            Arc<Mutex<Option<anyhow::Error>>>,
69            watch::Receiver<u64>,
70        ),
71        anyhow::Error,
72    > {
73        tracing::trace!("constructing view server worker");
74        let fvk = storage
75            .full_viewing_key()
76            .await
77            .context("failed to retrieve full viewing key from storage")?
78            .tap(|_| tracing::debug!("retrieved full viewing key"));
79
80        // Create a shared, in-memory SCT.
81        let sct = Arc::new(RwLock::new(storage.state_commitment_tree().await?));
82        // Create a shared error slot
83        let error_slot = Arc::new(Mutex::new(None));
84        // Create a channel for the worker to notify of sync height changes.
85        let (sync_height_tx, mut sync_height_rx) =
86            watch::channel(storage.last_sync_height().await?.unwrap_or(0));
87        // Mark the current height as seen, since it's not new.
88        sync_height_rx.borrow_and_update();
89
90        Ok((
91            Self {
92                storage,
93                sct: sct.clone(),
94                fvk,
95                error_slot: error_slot.clone(),
96                sync_height_tx,
97                channel,
98            },
99            sct,
100            error_slot,
101            sync_height_rx,
102        ))
103    }
104
105    pub async fn fetch_transactions(
106        &self,
107        filtered_block: &mut FilteredBlock,
108    ) -> anyhow::Result<Vec<Transaction>> {
109        let spent_nullifiers = filtered_block
110            .spent_nullifiers
111            .iter()
112            .cloned()
113            .collect::<BTreeSet<Nullifier>>();
114
115        let has_tx_sources = filtered_block
116            .new_notes
117            .values()
118            .map(|record| &record.source)
119            .chain(
120                filtered_block
121                    .new_swaps
122                    .values()
123                    .map(|record| &record.source),
124            )
125            .any(|source| matches!(source, CommitmentSource::Transaction { .. }));
126
127        // Only make a block request if we detected transactions in the FilteredBlock.
128        // TODO: in the future, we could perform chaff downloads.
129        if spent_nullifiers.is_empty() && !has_tx_sources {
130            return Ok(Vec::new());
131        }
132
133        tracing::debug!(
134            height = filtered_block.height,
135            "fetching full transaction data"
136        );
137
138        let all_transactions =
139            fetch_transactions(self.channel.clone(), filtered_block.height).await?;
140
141        let mut transactions = Vec::new();
142
143        for tx in all_transactions {
144            let tx_id = tx.id().0;
145
146            let mut relevant = false;
147
148            if tx
149                .spent_nullifiers()
150                .any(|nf| spent_nullifiers.contains(&nf))
151            {
152                // The transaction is relevant, it spends one of our nullifiers.
153                relevant = true;
154            }
155
156            // Rehydrate commitment sources.
157            for commitment in tx.state_commitments() {
158                filtered_block
159                    .new_notes
160                    .entry(commitment)
161                    .and_modify(|record| {
162                        relevant = true;
163                        record.source = CommitmentSource::Transaction { id: Some(tx_id) };
164                    });
165                filtered_block
166                    .new_swaps
167                    .entry(commitment)
168                    .and_modify(|record| {
169                        relevant = true;
170                        record.source = CommitmentSource::Transaction { id: Some(tx_id) };
171                    });
172            }
173
174            if relevant {
175                transactions.push(tx);
176            }
177        }
178
179        tracing::debug!(
180            matched = transactions.len(),
181            "filtered relevant transactions"
182        );
183
184        Ok(transactions)
185    }
186
187    pub async fn sync(&mut self) -> anyhow::Result<()> {
188        // Do a single sync run, up to whatever the latest block height is
189        tracing::info!("starting client sync");
190
191        let start_height = self
192            .storage
193            .last_sync_height()
194            .await?
195            .map(|h| h + 1)
196            .unwrap_or(0);
197
198        let mut client = CompactBlockQueryServiceClient::new(self.channel.clone())
199            .max_decoding_message_size(MAX_CB_SIZE_BYTES);
200        let mut stream = client
201            .compact_block_range(tonic::Request::new(CompactBlockRangeRequest {
202                start_height,
203                end_height: 0,
204                // Instruct the server to keep feeding us blocks as they're created.
205                keep_alive: true,
206            }))
207            .await?
208            .into_inner();
209
210        // Spawn a task to consume items from the stream (somewhat)
211        // independently of the execution of the block scanning.  This has two
212        // purposes: first, it allows buffering to smooth performance; second,
213        // it makes it slightly more difficult for a remote server to observe
214        // the exact timings of the scanning of each CompactBlock.
215        let (tx, mut buffered_stream) = tokio::sync::mpsc::channel(1000);
216        tokio::spawn(async move {
217            while let Some(block) = stream.message().await.transpose() {
218                if tx.send(block).await.is_err() {
219                    break;
220                }
221            }
222        });
223
224        let mut expected_height = start_height;
225
226        while let Some(block) = buffered_stream.recv().await {
227            let block: CompactBlock = block?.try_into()?;
228
229            let height = block.height;
230            if height != expected_height {
231                tracing::warn!("out of order block detected");
232                continue;
233            }
234            expected_height += 1;
235
236            // Lock the SCT only while processing this block.
237            let mut sct_guard = self.sct.write().await;
238
239            if let Some(root) = block.epoch_root {
240                // We now know the root for this epoch.
241                self.storage
242                    .update_epoch(block.epoch_index, Some(root), None)
243                    .await?;
244                // And also where the next epoch starts, since this block is the last.
245                self.storage
246                    .update_epoch(block.epoch_index + 1, None, Some(block.height + 1))
247                    .await?;
248            }
249
250            if !block.requires_scanning() {
251                // Optimization: if the block is empty, seal the in-memory SCT,
252                // and skip touching the database:
253                sct_guard.end_block()?;
254                // We also need to end the epoch, since if there are no funding streams, then an
255                // epoch boundary won't necessarily require scanning:
256                if block.epoch_root.is_some() {
257                    sct_guard
258                        .end_epoch()
259                        .expect("ending the epoch must succeed");
260                }
261                self.storage.record_empty_block(height).await?;
262                // Notify all watchers of the new height we just recorded.
263                self.sync_height_tx.send(height)?;
264            } else {
265                // Otherwise, scan the block and commit its changes:
266                let mut filtered_block =
267                    scan_block(&self.fvk, &mut sct_guard, block, &self.storage).await?;
268
269                // Download any transactions we detected.
270                let transactions = self.fetch_transactions(&mut filtered_block).await?;
271
272                // LPNFT asset IDs won't be known to the chain, so we need to pre-populate them in the local
273                // registry based on transaction contents.
274                for transaction in &transactions {
275                    for action in transaction.actions() {
276                        match action {
277                            penumbra_sdk_transaction::Action::PositionOpen(position_open) => {
278                                let position_id = position_open.position.id();
279
280                                // Record every possible permutation.
281                                let lp_nft = LpNft::new(position_id, position::State::Opened);
282                                let _id = lp_nft.asset_id();
283                                let denom = lp_nft.denom();
284                                self.storage.record_asset(denom).await?;
285
286                                let lp_nft = LpNft::new(position_id, position::State::Closed);
287                                let _id = lp_nft.asset_id();
288                                let denom = lp_nft.denom();
289                                self.storage.record_asset(denom).await?;
290
291                                let lp_nft = LpNft::new(
292                                    position_id,
293                                    position::State::Withdrawn { sequence: 0 },
294                                );
295                                let _id = lp_nft.asset_id();
296                                let denom = lp_nft.denom();
297                                self.storage.record_asset(denom).await?;
298
299                                // Record the position itself
300                                self.storage
301                                    .record_position(position_open.position.clone())
302                                    .await?;
303                            }
304                            penumbra_sdk_transaction::Action::PositionClose(position_close) => {
305                                let position_id = position_close.position_id;
306
307                                // Update the position record
308                                self.storage
309                                    .update_position(position_id, position::State::Closed)
310                                    .await?;
311                            }
312                            penumbra_sdk_transaction::Action::PositionWithdraw(
313                                position_withdraw,
314                            ) => {
315                                let position_id = position_withdraw.position_id;
316
317                                // Record the LPNFT for the current sequence number.
318                                let state = position::State::Withdrawn {
319                                    sequence: position_withdraw.sequence,
320                                };
321                                let lp_nft = LpNft::new(position_id, state);
322                                let denom = lp_nft.denom();
323                                self.storage.record_asset(denom).await?;
324
325                                // Update the position record
326                                self.storage.update_position(position_id, state).await?;
327                            }
328                            penumbra_sdk_transaction::Action::ActionDutchAuctionSchedule(
329                                schedule_da,
330                            ) => {
331                                let auction_id = schedule_da.description.id();
332                                let auction_nft_opened = AuctionNft::new(auction_id, 0);
333                                let nft_metadata_opened = auction_nft_opened.metadata.clone();
334
335                                self.storage.record_asset(nft_metadata_opened).await?;
336
337                                self.storage
338                                    .record_auction_with_state(
339                                        schedule_da.description.id(),
340                                        0u64, // Opened
341                                    )
342                                    .await?;
343                            }
344                            penumbra_sdk_transaction::Action::ActionDutchAuctionEnd(end_da) => {
345                                let auction_id = end_da.auction_id;
346                                let auction_nft_closed = AuctionNft::new(auction_id, 1);
347                                let nft_metadata_closed = auction_nft_closed.metadata.clone();
348
349                                self.storage.record_asset(nft_metadata_closed).await?;
350
351                                self.storage
352                                    .record_auction_with_state(end_da.auction_id, 1)
353                                    .await?;
354                            }
355                            penumbra_sdk_transaction::Action::ActionDutchAuctionWithdraw(
356                                withdraw_da,
357                            ) => {
358                                let auction_id = withdraw_da.auction_id;
359                                let auction_nft_withdrawn =
360                                    AuctionNft::new(auction_id, withdraw_da.seq);
361                                let nft_metadata_withdrawn = auction_nft_withdrawn.metadata.clone();
362
363                                self.storage.record_asset(nft_metadata_withdrawn).await?;
364                                self.storage
365                                    .record_auction_with_state(auction_id, withdraw_da.seq)
366                                    .await?;
367                            }
368                            _ => (),
369                        };
370                    }
371                }
372
373                // Record any new assets we detected.
374                for note_record in filtered_block.new_notes.values() {
375                    // If the asset is already known, skip it, unless there's useful information
376                    // to cross-reference.
377                    if let Some(note_denom) = self
378                        .storage
379                        .asset_by_id(&note_record.note.asset_id())
380                        .await?
381                    {
382                        // If the asset metata is for an auction, we record the associated note commitment
383                        // in the auction state table to cross reference with SNRs.
384                        if note_denom.is_auction_nft() {
385                            let note_commitment = note_record.note_commitment;
386                            let auction_nft: AuctionNft = note_denom.try_into()?;
387                            self.storage
388                                .update_auction_with_note_commitment(
389                                    auction_nft.id,
390                                    note_commitment,
391                                )
392                                .await?;
393                        }
394                        continue;
395                    } else {
396                        // If the asset is unknown, we may be able to query for its denom metadata and store that.
397
398                        let mut client = ShieldedPoolQueryServiceClient::new(self.channel.clone());
399                        if let Some(denom_metadata) = client
400                            .asset_metadata_by_id(AssetMetadataByIdRequest {
401                                asset_id: Some(note_record.note.asset_id().into()),
402                            })
403                            .await?
404                            .into_inner()
405                            .denom_metadata
406                        {
407                            // If we get metadata: great, record it.
408                            self.storage
409                                .record_asset(denom_metadata.try_into()?)
410                                .await?;
411                        } else {
412                            tracing::warn!(asset_id = ?note_record.note.asset_id(), "received unknown asset ID with no available metadata");
413                        }
414                    }
415                }
416
417                // Commit the block to the database.
418                self.storage
419                    .record_block(
420                        filtered_block.clone(),
421                        transactions,
422                        &mut sct_guard,
423                        self.channel.clone(),
424                    )
425                    .await?;
426                // Notify all watchers of the new height we just recorded.
427                self.sync_height_tx.send(filtered_block.height)?;
428            }
429            #[cfg(feature = "sct-divergence-check")]
430            sct_divergence_check(self.channel.clone(), height, sct_guard.root()).await?;
431
432            // Release the SCT RwLock
433            drop(sct_guard);
434
435            // Check if we should stop waiting for blocks to arrive, because the view
436            // services are dropped and we're supposed to shut down.
437            if self.sync_height_tx.is_closed() {
438                return Ok(());
439            }
440        }
441
442        Ok(())
443    }
444
445    pub async fn run(mut self) -> anyhow::Result<()> {
446        loop {
447            // Do a single sync run, recording any errors.
448            if let Err(e) = self.sync().await {
449                tracing::error!(?e, "view worker error");
450                self.error_slot
451                    .lock()
452                    .expect("mutex is not poisoned")
453                    .replace(e);
454            }
455            // Sleep 10s (maybe later use exponential backoff?)
456            tokio::time::sleep(Duration::from_secs(10)).await;
457            // Clear the error slot before retrying.
458            *self.error_slot.lock().expect("mutex is not poisoned") = None;
459        }
460    }
461}
462
463// Fetches all transactions in the block.
464async fn fetch_transactions(
465    channel: Channel,
466    block_height: u64,
467) -> anyhow::Result<Vec<Transaction>> {
468    let mut client = AppQueryServiceClient::new(channel);
469    let request = TransactionsByHeightRequest {
470        block_height,
471        ..Default::default()
472    };
473    // HACK: this is not a robust long-term solution but may help
474    // avoid "split-brain" block fetch issues, where a client learns
475    // of a new block, then immediately tries to fetch it, but that
476    // fetch is load-balanced over a different node that hasn't yet
477    // learned about that block.
478    let response = match client.transactions_by_height(request.clone()).await {
479        Ok(rsp) => rsp,
480        Err(e) => {
481            tracing::warn!(?e, "failed to fetch block, waiting and retrying once");
482            tokio::time::sleep(Duration::from_secs(1)).await;
483            client.transactions_by_height(request).await?
484        }
485    };
486    let transactions = response
487        .into_inner()
488        .transactions
489        .into_iter()
490        .map(TryInto::try_into)
491        .collect::<anyhow::Result<Vec<_>>>()?;
492    Ok(transactions)
493}
494
495#[cfg(feature = "sct-divergence-check")]
496async fn sct_divergence_check(
497    channel: Channel,
498    height: u64,
499    actual_root: penumbra_sdk_tct::Root,
500) -> anyhow::Result<()> {
501    use cnidarium::proto::v1::query_service_client::QueryServiceClient;
502    use penumbra_sdk_proto::DomainType;
503    use penumbra_sdk_sct::state_key as sct_state_key;
504
505    let mut client = QueryServiceClient::new(channel);
506    tracing::info!(?height, "fetching anchor @ height");
507
508    let value = client
509        .key_value(cnidarium::proto::v1::KeyValueRequest {
510            key: sct_state_key::tree::anchor_by_height(height),
511            proof: false,
512            ..Default::default()
513        })
514        .await?
515        .into_inner()
516        .value
517        .context("sct state not found")?;
518
519    let expected_root = penumbra_sdk_tct::Root::decode(value.value.as_slice())?;
520
521    if actual_root == expected_root {
522        tracing::info!(?height, ?actual_root, ?expected_root, "sct roots match");
523        Ok(())
524    } else {
525        let e = anyhow::anyhow!(
526            "SCT divergence detected at height {}: expected {}, got {}",
527            height,
528            expected_root,
529            actual_root
530        );
531        // Print the error immediately, so that it's visible in the logs.
532        tracing::error!(?e);
533        Err(e)
534    }
535}