penumbra_sdk_view/worker.rs

use std::{
    collections::BTreeSet,
    sync::{Arc, Mutex},
    time::Duration,
};

use anyhow::Context;
use penumbra_sdk_auction::auction::AuctionNft;
use penumbra_sdk_compact_block::CompactBlock;
use penumbra_sdk_dex::lp::{position, LpNft};
use penumbra_sdk_keys::FullViewingKey;
use penumbra_sdk_proto::core::{
    app::v1::{
        query_service_client::QueryServiceClient as AppQueryServiceClient,
        TransactionsByHeightRequest,
    },
    component::{
        compact_block::v1::{
            query_service_client::QueryServiceClient as CompactBlockQueryServiceClient,
            CompactBlockRangeRequest,
        },
        shielded_pool::v1::{
            query_service_client::QueryServiceClient as ShieldedPoolQueryServiceClient,
            AssetMetadataByIdRequest,
        },
    },
};
use penumbra_sdk_sct::{CommitmentSource, Nullifier};
use penumbra_sdk_transaction::Transaction;
use tap::Tap;
use tokio::sync::{watch, RwLock};
use tonic::transport::Channel;
use tracing::instrument;

use crate::{
    sync::{scan_block, FilteredBlock},
    Storage,
};

// The maximum size of a compact block, in bytes (12MB).
const MAX_CB_SIZE_BYTES: usize = 12 * 1024 * 1024;

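/// A worker that drives synchronization for the view service.
///
/// The worker streams compact blocks from a fullnode, scans them against the
/// wallet's full viewing key, and records the results in [`Storage`], sharing an
/// in-memory SCT, an error slot, and a sync-height watch channel with its creator.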
pub struct Worker {
    storage: Storage,
    sct: Arc<RwLock<penumbra_sdk_tct::Tree>>,
    fvk: FullViewingKey, // TODO: notifications (see TODOs on ViewService)
    error_slot: Arc<Mutex<Option<anyhow::Error>>>,
    sync_height_tx: watch::Sender<u64>,
    /// Tonic channel used to create GRPC clients.
    channel: Channel,
}

impl Worker {
    /// Creates a new worker, returning:
    ///
    /// - the worker itself;
    /// - a shared, in-memory SCT instance;
    /// - a shared error slot;
    /// - a channel for notifying the client of sync progress.
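    ///
    /// # Example
    ///
    /// A minimal sketch of wiring up the worker, assuming `storage` and `channel`
    /// have already been constructed; the variable names here are illustrative only:
    ///
    /// ```ignore
    /// let (worker, _sct, _error_slot, mut sync_rx) = Worker::new(storage, channel).await?;
    /// // Drive syncing on a background task.
    /// tokio::spawn(worker.run());
    /// // Observe sync progress as new heights are committed.
    /// while sync_rx.changed().await.is_ok() {
    ///     tracing::debug!(height = *sync_rx.borrow(), "synced");
    /// }
    /// ```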
    #[instrument(skip_all)]
    pub async fn new(
        storage: Storage,
        channel: Channel,
    ) -> Result<
        (
            Self,
            Arc<RwLock<penumbra_sdk_tct::Tree>>,
            Arc<Mutex<Option<anyhow::Error>>>,
            watch::Receiver<u64>,
        ),
        anyhow::Error,
    > {
        tracing::trace!("constructing view server worker");
        let fvk = storage
            .full_viewing_key()
            .await
            .context("failed to retrieve full viewing key from storage")?
            .tap(|_| tracing::debug!("retrieved full viewing key"));

        // Create a shared, in-memory SCT.
        let sct = Arc::new(RwLock::new(storage.state_commitment_tree().await?));
        // Create a shared error slot
        let error_slot = Arc::new(Mutex::new(None));
        // Create a channel for the worker to notify of sync height changes.
        let (sync_height_tx, mut sync_height_rx) =
            watch::channel(storage.last_sync_height().await?.unwrap_or(0));
        // Mark the current height as seen, since it's not new.
        sync_height_rx.borrow_and_update();

        Ok((
            Self {
                storage,
                sct: sct.clone(),
                fvk,
                error_slot: error_slot.clone(),
                sync_height_tx,
                channel,
            },
            sct,
            error_slot,
            sync_height_rx,
        ))
    }

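    /// Fetches the full transactions referenced by a [`FilteredBlock`], if it
    /// detected anything relevant to us.
    ///
    /// A transaction is kept if it spends one of our nullifiers or creates one of
    /// the detected note or swap commitments; as a side effect, the commitment
    /// sources in the filtered block are rehydrated with the transaction ID.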
    pub async fn fetch_transactions(
        &self,
        filtered_block: &mut FilteredBlock,
    ) -> anyhow::Result<Vec<Transaction>> {
        let spent_nullifiers = filtered_block
            .spent_nullifiers
            .iter()
            .cloned()
            .collect::<BTreeSet<Nullifier>>();

        let has_tx_sources = filtered_block
            .new_notes
            .values()
            .map(|record| &record.source)
            .chain(
                filtered_block
                    .new_swaps
                    .values()
                    .map(|record| &record.source),
            )
            .any(|source| matches!(source, CommitmentSource::Transaction { .. }));

        // Only make a block request if we detected transactions in the FilteredBlock.
        // TODO: in the future, we could perform chaff downloads.
        if spent_nullifiers.is_empty() && !has_tx_sources {
            return Ok(Vec::new());
        }

        tracing::debug!(
            height = filtered_block.height,
            "fetching full transaction data"
        );

        let all_transactions =
            fetch_transactions(self.channel.clone(), filtered_block.height).await?;

        let mut transactions = Vec::new();

        for tx in all_transactions {
            let tx_id = tx.id().0;

            let mut relevant = false;

            if tx
                .spent_nullifiers()
                .any(|nf| spent_nullifiers.contains(&nf))
            {
                // The transaction is relevant: it spends one of our nullifiers.
                relevant = true;
            }

            // Rehydrate commitment sources.
            for commitment in tx.state_commitments() {
                filtered_block
                    .new_notes
                    .entry(commitment)
                    .and_modify(|record| {
                        relevant = true;
                        record.source = CommitmentSource::Transaction { id: Some(tx_id) };
                    });
                filtered_block
                    .new_swaps
                    .entry(commitment)
                    .and_modify(|record| {
                        relevant = true;
                        record.source = CommitmentSource::Transaction { id: Some(tx_id) };
                    });
            }

            if relevant {
                transactions.push(tx);
            }
        }

        tracing::debug!(
            matched = transactions.len(),
            "filtered relevant transactions"
        );

        Ok(transactions)
    }

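    /// Performs a single sync run, streaming compact blocks from the fullnode
    /// beginning at the first unsynced height, scanning each block against the
    /// full viewing key, and committing the results to storage.
    ///
    /// Returns when the block stream ends or when the sync-height channel is
    /// closed because the view services have been dropped.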
    pub async fn sync(&mut self) -> anyhow::Result<()> {
        // Do a single sync run, up to whatever the latest block height is
        tracing::info!("starting client sync");

        let start_height = self
            .storage
            .last_sync_height()
            .await?
            .map(|h| h + 1)
            .unwrap_or(0);

        let mut client = CompactBlockQueryServiceClient::new(self.channel.clone())
            .max_decoding_message_size(MAX_CB_SIZE_BYTES);
        let mut stream = client
            .compact_block_range(tonic::Request::new(CompactBlockRangeRequest {
                start_height,
                end_height: 0,
                // Instruct the server to keep feeding us blocks as they're created.
                keep_alive: true,
            }))
            .await?
            .into_inner();

        // Spawn a task to consume items from the stream (somewhat)
        // independently of the execution of the block scanning.  This has two
        // purposes: first, it allows buffering to smooth performance; second,
        // it makes it slightly more difficult for a remote server to observe
        // the exact timings of the scanning of each CompactBlock.
        let (tx, mut buffered_stream) = tokio::sync::mpsc::channel(1000);
        tokio::spawn(async move {
            while let Some(block) = stream.message().await.transpose() {
                if tx.send(block).await.is_err() {
                    break;
                }
            }
        });

        let mut expected_height = start_height;

        while let Some(block) = buffered_stream.recv().await {
            let block: CompactBlock = block?.try_into()?;

            let height = block.height;
            if height != expected_height {
                tracing::warn!("out of order block detected");
                continue;
            }
            expected_height += 1;

            // Lock the SCT only while processing this block.
            let mut sct_guard = self.sct.write().await;

            if !block.requires_scanning() {
                // Optimization: if the block is empty, seal the in-memory SCT,
                // and skip touching the database:
                sct_guard.end_block()?;
                // We also need to end the epoch, since if there are no funding streams, then an
                // epoch boundary won't necessarily require scanning:
                if block.epoch_root.is_some() {
                    sct_guard
                        .end_epoch()
                        .expect("ending the epoch must succeed");
                }
                self.storage.record_empty_block(height).await?;
                // Notify all watchers of the new height we just recorded.
                self.sync_height_tx.send(height)?;
            } else {
                // Otherwise, scan the block and commit its changes:
                let mut filtered_block =
                    scan_block(&self.fvk, &mut sct_guard, block, &self.storage).await?;

                // Download any transactions we detected.
                let transactions = self.fetch_transactions(&mut filtered_block).await?;

                // LPNFT asset IDs won't be known to the chain, so we need to pre-populate them in the local
                // registry based on transaction contents.
                for transaction in &transactions {
                    for action in transaction.actions() {
                        match action {
                            penumbra_sdk_transaction::Action::PositionOpen(position_open) => {
                                let position_id = position_open.position.id();

                                // Record every possible permutation.
                                let lp_nft = LpNft::new(position_id, position::State::Opened);
                                let _id = lp_nft.asset_id();
                                let denom = lp_nft.denom();
                                self.storage.record_asset(denom).await?;

                                let lp_nft = LpNft::new(position_id, position::State::Closed);
                                let _id = lp_nft.asset_id();
                                let denom = lp_nft.denom();
                                self.storage.record_asset(denom).await?;

                                let lp_nft = LpNft::new(
                                    position_id,
                                    position::State::Withdrawn { sequence: 0 },
                                );
                                let _id = lp_nft.asset_id();
                                let denom = lp_nft.denom();
                                self.storage.record_asset(denom).await?;

                                // Record the position itself
                                self.storage
                                    .record_position(position_open.position.clone())
                                    .await?;
                            }
                            penumbra_sdk_transaction::Action::PositionClose(position_close) => {
                                let position_id = position_close.position_id;

                                // Update the position record
                                self.storage
                                    .update_position(position_id, position::State::Closed)
                                    .await?;
                            }
                            penumbra_sdk_transaction::Action::PositionWithdraw(
                                position_withdraw,
                            ) => {
                                let position_id = position_withdraw.position_id;

                                // Record the LPNFT for the current sequence number.
                                let state = position::State::Withdrawn {
                                    sequence: position_withdraw.sequence,
                                };
                                let lp_nft = LpNft::new(position_id, state);
                                let denom = lp_nft.denom();
                                self.storage.record_asset(denom).await?;

                                // Update the position record
                                self.storage.update_position(position_id, state).await?;
                            }
                            penumbra_sdk_transaction::Action::ActionDutchAuctionSchedule(
                                schedule_da,
                            ) => {
                                let auction_id = schedule_da.description.id();
                                let auction_nft_opened = AuctionNft::new(auction_id, 0);
                                let nft_metadata_opened = auction_nft_opened.metadata.clone();

                                self.storage.record_asset(nft_metadata_opened).await?;

                                self.storage
                                    .record_auction_with_state(
                                        schedule_da.description.id(),
                                        0u64, // Opened
                                    )
                                    .await?;
                            }
                            penumbra_sdk_transaction::Action::ActionDutchAuctionEnd(end_da) => {
                                let auction_id = end_da.auction_id;
                                let auction_nft_closed = AuctionNft::new(auction_id, 1);
                                let nft_metadata_closed = auction_nft_closed.metadata.clone();

                                self.storage.record_asset(nft_metadata_closed).await?;

                                self.storage
                                    .record_auction_with_state(end_da.auction_id, 1)
                                    .await?;
                            }
                            penumbra_sdk_transaction::Action::ActionDutchAuctionWithdraw(
                                withdraw_da,
                            ) => {
                                let auction_id = withdraw_da.auction_id;
                                let auction_nft_withdrawn =
                                    AuctionNft::new(auction_id, withdraw_da.seq);
                                let nft_metadata_withdrawn = auction_nft_withdrawn.metadata.clone();

                                self.storage.record_asset(nft_metadata_withdrawn).await?;
                                self.storage
                                    .record_auction_with_state(auction_id, withdraw_da.seq)
                                    .await?;
                            }
                            _ => (),
                        };
                    }
                }

                // Record any new assets we detected.
                for note_record in filtered_block.new_notes.values() {
                    // If the asset is already known, skip it, unless there's useful information
                    // to cross-reference.
                    if let Some(note_denom) = self
                        .storage
                        .asset_by_id(&note_record.note.asset_id())
                        .await?
                    {
                        // If the asset metadata is for an auction, we record the associated note commitment
                        // in the auction state table to cross-reference with SNRs.
                        if note_denom.is_auction_nft() {
                            let note_commitment = note_record.note_commitment;
                            let auction_nft: AuctionNft = note_denom.try_into()?;
                            self.storage
                                .update_auction_with_note_commitment(
                                    auction_nft.id,
                                    note_commitment,
                                )
                                .await?;
                        }
                        continue;
                    } else {
                        // If the asset is unknown, we may be able to query for its denom metadata and store that.

                        let mut client = ShieldedPoolQueryServiceClient::new(self.channel.clone());
                        if let Some(denom_metadata) = client
                            .asset_metadata_by_id(AssetMetadataByIdRequest {
                                asset_id: Some(note_record.note.asset_id().into()),
                            })
                            .await?
                            .into_inner()
                            .denom_metadata
                        {
                            // If we get metadata: great, record it.
                            self.storage
                                .record_asset(denom_metadata.try_into()?)
                                .await?;
                        } else {
                            tracing::warn!(asset_id = ?note_record.note.asset_id(), "received unknown asset ID with no available metadata");
                        }
                    }
                }

                // Commit the block to the database.
                self.storage
                    .record_block(
                        filtered_block.clone(),
                        transactions,
                        &mut sct_guard,
                        self.channel.clone(),
                    )
                    .await?;
                // Notify all watchers of the new height we just recorded.
                self.sync_height_tx.send(filtered_block.height)?;
            }
            #[cfg(feature = "sct-divergence-check")]
            sct_divergence_check(self.channel.clone(), height, sct_guard.root()).await?;

            // Release the SCT RwLock
            drop(sct_guard);

            // Check if we should stop waiting for blocks to arrive, because the view
            // services are dropped and we're supposed to shut down.
            if self.sync_height_tx.is_closed() {
                return Ok(());
            }
        }

        Ok(())
    }

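    /// Runs the worker, repeatedly calling [`Worker::sync`].
    ///
    /// Any error from a sync run is recorded in the shared error slot so callers
    /// can observe it; after a short delay the slot is cleared and syncing is
    /// retried.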
    pub async fn run(mut self) -> anyhow::Result<()> {
        loop {
            // Do a single sync run, recording any errors.
            if let Err(e) = self.sync().await {
                tracing::error!(?e, "view worker error");
                self.error_slot
                    .lock()
                    .expect("mutex is not poisoned")
                    .replace(e);
            }
            // Sleep 10s (maybe later use exponential backoff?)
            tokio::time::sleep(Duration::from_secs(10)).await;
            // Clear the error slot before retrying.
            *self.error_slot.lock().expect("mutex is not poisoned") = None;
        }
    }
}

// Fetches all transactions in the block.
async fn fetch_transactions(
    channel: Channel,
    block_height: u64,
) -> anyhow::Result<Vec<Transaction>> {
    let mut client = AppQueryServiceClient::new(channel);
    let request = TransactionsByHeightRequest {
        block_height,
        ..Default::default()
    };
    // HACK: this is not a robust long-term solution but may help
    // avoid "split-brain" block fetch issues, where a client learns
    // of a new block, then immediately tries to fetch it, but that
    // fetch is load-balanced over a different node that hasn't yet
    // learned about that block.
    let response = match client.transactions_by_height(request.clone()).await {
        Ok(rsp) => rsp,
        Err(e) => {
            tracing::warn!(?e, "failed to fetch block, waiting and retrying once");
            tokio::time::sleep(Duration::from_secs(1)).await;
            client.transactions_by_height(request).await?
        }
    };
    let transactions = response
        .into_inner()
        .transactions
        .into_iter()
        .map(TryInto::try_into)
        .collect::<anyhow::Result<Vec<_>>>()?;
    Ok(transactions)
}

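/// Compares the locally computed SCT root against the root recorded on-chain at
/// the given height, returning an error if they diverge.
///
/// Only compiled when the `sct-divergence-check` feature is enabled.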
#[cfg(feature = "sct-divergence-check")]
async fn sct_divergence_check(
    channel: Channel,
    height: u64,
    actual_root: penumbra_sdk_tct::Root,
) -> anyhow::Result<()> {
    use cnidarium::proto::v1::query_service_client::QueryServiceClient;
    use penumbra_sdk_proto::DomainType;
    use penumbra_sdk_sct::state_key as sct_state_key;

    let mut client = QueryServiceClient::new(channel);
    tracing::info!(?height, "fetching anchor @ height");

    let value = client
        .key_value(cnidarium::proto::v1::KeyValueRequest {
            key: sct_state_key::tree::anchor_by_height(height),
            proof: false,
            ..Default::default()
        })
        .await?
        .into_inner()
        .value
        .context("sct state not found")?;

    let expected_root = penumbra_sdk_tct::Root::decode(value.value.as_slice())?;

    if actual_root == expected_root {
        tracing::info!(?height, ?actual_root, ?expected_root, "sct roots match");
        Ok(())
    } else {
        let e = anyhow::anyhow!(
            "SCT divergence detected at height {}: expected {}, got {}",
            height,
            expected_root,
            actual_root
        );
        // Print the error immediately, so that it's visible in the logs.
        tracing::error!(?e);
        Err(e)
    }
}