1use std::{
2 collections::BTreeSet,
3 sync::{Arc, Mutex},
4 time::Duration,
5};
6
7use anyhow::Context;
8use penumbra_sdk_auction::auction::AuctionNft;
9use penumbra_sdk_compact_block::CompactBlock;
10use penumbra_sdk_dex::lp::{position, LpNft};
11use penumbra_sdk_keys::FullViewingKey;
12use penumbra_sdk_proto::core::{
13 app::v1::{
14 query_service_client::QueryServiceClient as AppQueryServiceClient,
15 TransactionsByHeightRequest,
16 },
17 component::{
18 compact_block::v1::{
19 query_service_client::QueryServiceClient as CompactBlockQueryServiceClient,
20 CompactBlockRangeRequest,
21 },
22 shielded_pool::v1::{
23 query_service_client::QueryServiceClient as ShieldedPoolQueryServiceClient,
24 AssetMetadataByIdRequest,
25 },
26 },
27};
28use penumbra_sdk_sct::{CommitmentSource, Nullifier};
29use penumbra_sdk_transaction::Transaction;
30use tap::Tap;
31use tokio::sync::{watch, RwLock};
32use tonic::transport::Channel;
33use tracing::instrument;
34
35use crate::{
36 sync::{scan_block, FilteredBlock},
37 Storage,
38};
39
40const MAX_CB_SIZE_BYTES: usize = 12 * 1024 * 1024;
42
43pub struct Worker {
44 storage: Storage,
45 sct: Arc<RwLock<penumbra_sdk_tct::Tree>>,
46 fvk: FullViewingKey, error_slot: Arc<Mutex<Option<anyhow::Error>>>,
48 sync_height_tx: watch::Sender<u64>,
49 channel: Channel,
51}
52
53impl Worker {
54 #[instrument(skip_all)]
61 pub async fn new(
62 storage: Storage,
63 channel: Channel,
64 ) -> Result<
65 (
66 Self,
67 Arc<RwLock<penumbra_sdk_tct::Tree>>,
68 Arc<Mutex<Option<anyhow::Error>>>,
69 watch::Receiver<u64>,
70 ),
71 anyhow::Error,
72 > {
73 tracing::trace!("constructing view server worker");
74 let fvk = storage
75 .full_viewing_key()
76 .await
77 .context("failed to retrieve full viewing key from storage")?
78 .tap(|_| tracing::debug!("retrieved full viewing key"));
79
80 let sct = Arc::new(RwLock::new(storage.state_commitment_tree().await?));
82 let error_slot = Arc::new(Mutex::new(None));
84 let (sync_height_tx, mut sync_height_rx) =
86 watch::channel(storage.last_sync_height().await?.unwrap_or(0));
87 sync_height_rx.borrow_and_update();
89
90 Ok((
91 Self {
92 storage,
93 sct: sct.clone(),
94 fvk,
95 error_slot: error_slot.clone(),
96 sync_height_tx,
97 channel,
98 },
99 sct,
100 error_slot,
101 sync_height_rx,
102 ))
103 }
104
105 pub async fn fetch_transactions(
106 &self,
107 filtered_block: &mut FilteredBlock,
108 ) -> anyhow::Result<Vec<Transaction>> {
109 let spent_nullifiers = filtered_block
110 .spent_nullifiers
111 .iter()
112 .cloned()
113 .collect::<BTreeSet<Nullifier>>();
114
115 let has_tx_sources = filtered_block
116 .new_notes
117 .values()
118 .map(|record| &record.source)
119 .chain(
120 filtered_block
121 .new_swaps
122 .values()
123 .map(|record| &record.source),
124 )
125 .any(|source| matches!(source, CommitmentSource::Transaction { .. }));
126
127 if spent_nullifiers.is_empty() && !has_tx_sources {
130 return Ok(Vec::new());
131 }
132
133 tracing::debug!(
134 height = filtered_block.height,
135 "fetching full transaction data"
136 );
137
138 let all_transactions =
139 fetch_transactions(self.channel.clone(), filtered_block.height).await?;
140
141 let mut transactions = Vec::new();
142
143 for tx in all_transactions {
144 let tx_id = tx.id().0;
145
146 let mut relevant = false;
147
148 if tx
149 .spent_nullifiers()
150 .any(|nf| spent_nullifiers.contains(&nf))
151 {
152 relevant = true;
154 }
155
156 for commitment in tx.state_commitments() {
158 filtered_block
159 .new_notes
160 .entry(commitment)
161 .and_modify(|record| {
162 relevant = true;
163 record.source = CommitmentSource::Transaction { id: Some(tx_id) };
164 });
165 filtered_block
166 .new_swaps
167 .entry(commitment)
168 .and_modify(|record| {
169 relevant = true;
170 record.source = CommitmentSource::Transaction { id: Some(tx_id) };
171 });
172 }
173
174 if relevant {
175 transactions.push(tx);
176 }
177 }
178
179 tracing::debug!(
180 matched = transactions.len(),
181 "filtered relevant transactions"
182 );
183
184 Ok(transactions)
185 }
186
187 pub async fn sync(&mut self) -> anyhow::Result<()> {
188 tracing::info!("starting client sync");
190
191 let start_height = self
192 .storage
193 .last_sync_height()
194 .await?
195 .map(|h| h + 1)
196 .unwrap_or(0);
197
198 let mut client = CompactBlockQueryServiceClient::new(self.channel.clone())
199 .max_decoding_message_size(MAX_CB_SIZE_BYTES);
200 let mut stream = client
201 .compact_block_range(tonic::Request::new(CompactBlockRangeRequest {
202 start_height,
203 end_height: 0,
204 keep_alive: true,
206 }))
207 .await?
208 .into_inner();
209
210 let (tx, mut buffered_stream) = tokio::sync::mpsc::channel(1000);
216 tokio::spawn(async move {
217 while let Some(block) = stream.message().await.transpose() {
218 if tx.send(block).await.is_err() {
219 break;
220 }
221 }
222 });
223
224 let mut expected_height = start_height;
225
226 while let Some(block) = buffered_stream.recv().await {
227 let block: CompactBlock = block?.try_into()?;
228
229 let height = block.height;
230 if height != expected_height {
231 tracing::warn!("out of order block detected");
232 continue;
233 }
234 expected_height += 1;
235
236 let mut sct_guard = self.sct.write().await;
238
239 if let Some(root) = block.epoch_root {
240 self.storage
242 .update_epoch(block.epoch_index, Some(root), None)
243 .await?;
244 self.storage
246 .update_epoch(block.epoch_index + 1, None, Some(block.height + 1))
247 .await?;
248 }
249
250 if !block.requires_scanning() {
251 sct_guard.end_block()?;
254 if block.epoch_root.is_some() {
257 sct_guard
258 .end_epoch()
259 .expect("ending the epoch must succeed");
260 }
261 self.storage.record_empty_block(height).await?;
262 self.sync_height_tx.send(height)?;
264 } else {
265 let mut filtered_block =
267 scan_block(&self.fvk, &mut sct_guard, block, &self.storage).await?;
268
269 let transactions = self.fetch_transactions(&mut filtered_block).await?;
271
272 for transaction in &transactions {
275 for action in transaction.actions() {
276 match action {
277 penumbra_sdk_transaction::Action::PositionOpen(position_open) => {
278 let position_id = position_open.position.id();
279
280 let lp_nft = LpNft::new(position_id, position::State::Opened);
282 let _id = lp_nft.asset_id();
283 let denom = lp_nft.denom();
284 self.storage.record_asset(denom).await?;
285
286 let lp_nft = LpNft::new(position_id, position::State::Closed);
287 let _id = lp_nft.asset_id();
288 let denom = lp_nft.denom();
289 self.storage.record_asset(denom).await?;
290
291 let lp_nft = LpNft::new(
292 position_id,
293 position::State::Withdrawn { sequence: 0 },
294 );
295 let _id = lp_nft.asset_id();
296 let denom = lp_nft.denom();
297 self.storage.record_asset(denom).await?;
298
299 self.storage
301 .record_position(position_open.position.clone())
302 .await?;
303 }
304 penumbra_sdk_transaction::Action::PositionClose(position_close) => {
305 let position_id = position_close.position_id;
306
307 self.storage
309 .update_position(position_id, position::State::Closed)
310 .await?;
311 }
312 penumbra_sdk_transaction::Action::PositionWithdraw(
313 position_withdraw,
314 ) => {
315 let position_id = position_withdraw.position_id;
316
317 let state = position::State::Withdrawn {
319 sequence: position_withdraw.sequence,
320 };
321 let lp_nft = LpNft::new(position_id, state);
322 let denom = lp_nft.denom();
323 self.storage.record_asset(denom).await?;
324
325 self.storage.update_position(position_id, state).await?;
327 }
328 penumbra_sdk_transaction::Action::ActionDutchAuctionSchedule(
329 schedule_da,
330 ) => {
331 let auction_id = schedule_da.description.id();
332 let auction_nft_opened = AuctionNft::new(auction_id, 0);
333 let nft_metadata_opened = auction_nft_opened.metadata.clone();
334
335 self.storage.record_asset(nft_metadata_opened).await?;
336
337 self.storage
338 .record_auction_with_state(
339 schedule_da.description.id(),
340 0u64, )
342 .await?;
343 }
344 penumbra_sdk_transaction::Action::ActionDutchAuctionEnd(end_da) => {
345 let auction_id = end_da.auction_id;
346 let auction_nft_closed = AuctionNft::new(auction_id, 1);
347 let nft_metadata_closed = auction_nft_closed.metadata.clone();
348
349 self.storage.record_asset(nft_metadata_closed).await?;
350
351 self.storage
352 .record_auction_with_state(end_da.auction_id, 1)
353 .await?;
354 }
355 penumbra_sdk_transaction::Action::ActionDutchAuctionWithdraw(
356 withdraw_da,
357 ) => {
358 let auction_id = withdraw_da.auction_id;
359 let auction_nft_withdrawn =
360 AuctionNft::new(auction_id, withdraw_da.seq);
361 let nft_metadata_withdrawn = auction_nft_withdrawn.metadata.clone();
362
363 self.storage.record_asset(nft_metadata_withdrawn).await?;
364 self.storage
365 .record_auction_with_state(auction_id, withdraw_da.seq)
366 .await?;
367 }
368 _ => (),
369 };
370 }
371 }
372
373 for note_record in filtered_block.new_notes.values() {
375 if let Some(note_denom) = self
378 .storage
379 .asset_by_id(¬e_record.note.asset_id())
380 .await?
381 {
382 if note_denom.is_auction_nft() {
385 let note_commitment = note_record.note_commitment;
386 let auction_nft: AuctionNft = note_denom.try_into()?;
387 self.storage
388 .update_auction_with_note_commitment(
389 auction_nft.id,
390 note_commitment,
391 )
392 .await?;
393 }
394 continue;
395 } else {
396 let mut client = ShieldedPoolQueryServiceClient::new(self.channel.clone());
399 if let Some(denom_metadata) = client
400 .asset_metadata_by_id(AssetMetadataByIdRequest {
401 asset_id: Some(note_record.note.asset_id().into()),
402 })
403 .await?
404 .into_inner()
405 .denom_metadata
406 {
407 self.storage
409 .record_asset(denom_metadata.try_into()?)
410 .await?;
411 } else {
412 tracing::warn!(asset_id = ?note_record.note.asset_id(), "received unknown asset ID with no available metadata");
413 }
414 }
415 }
416
417 self.storage
419 .record_block(
420 filtered_block.clone(),
421 transactions,
422 &mut sct_guard,
423 self.channel.clone(),
424 )
425 .await?;
426 self.sync_height_tx.send(filtered_block.height)?;
428 }
429 #[cfg(feature = "sct-divergence-check")]
430 sct_divergence_check(self.channel.clone(), height, sct_guard.root()).await?;
431
432 drop(sct_guard);
434
435 if self.sync_height_tx.is_closed() {
438 return Ok(());
439 }
440 }
441
442 Ok(())
443 }
444
445 pub async fn run(mut self) -> anyhow::Result<()> {
446 loop {
447 if let Err(e) = self.sync().await {
449 tracing::error!(?e, "view worker error");
450 self.error_slot
451 .lock()
452 .expect("mutex is not poisoned")
453 .replace(e);
454 }
455 tokio::time::sleep(Duration::from_secs(10)).await;
457 *self.error_slot.lock().expect("mutex is not poisoned") = None;
459 }
460 }
461}
462
463async fn fetch_transactions(
465 channel: Channel,
466 block_height: u64,
467) -> anyhow::Result<Vec<Transaction>> {
468 let mut client = AppQueryServiceClient::new(channel);
469 let request = TransactionsByHeightRequest {
470 block_height,
471 ..Default::default()
472 };
473 let response = match client.transactions_by_height(request.clone()).await {
479 Ok(rsp) => rsp,
480 Err(e) => {
481 tracing::warn!(?e, "failed to fetch block, waiting and retrying once");
482 tokio::time::sleep(Duration::from_secs(1)).await;
483 client.transactions_by_height(request).await?
484 }
485 };
486 let transactions = response
487 .into_inner()
488 .transactions
489 .into_iter()
490 .map(TryInto::try_into)
491 .collect::<anyhow::Result<Vec<_>>>()?;
492 Ok(transactions)
493}
494
495#[cfg(feature = "sct-divergence-check")]
496async fn sct_divergence_check(
497 channel: Channel,
498 height: u64,
499 actual_root: penumbra_sdk_tct::Root,
500) -> anyhow::Result<()> {
501 use cnidarium::proto::v1::query_service_client::QueryServiceClient;
502 use penumbra_sdk_proto::DomainType;
503 use penumbra_sdk_sct::state_key as sct_state_key;
504
505 let mut client = QueryServiceClient::new(channel);
506 tracing::info!(?height, "fetching anchor @ height");
507
508 let value = client
509 .key_value(cnidarium::proto::v1::KeyValueRequest {
510 key: sct_state_key::tree::anchor_by_height(height),
511 proof: false,
512 ..Default::default()
513 })
514 .await?
515 .into_inner()
516 .value
517 .context("sct state not found")?;
518
519 let expected_root = penumbra_sdk_tct::Root::decode(value.value.as_slice())?;
520
521 if actual_root == expected_root {
522 tracing::info!(?height, ?actual_root, ?expected_root, "sct roots match");
523 Ok(())
524 } else {
525 let e = anyhow::anyhow!(
526 "SCT divergence detected at height {}: expected {}, got {}",
527 height,
528 expected_root,
529 actual_root
530 );
531 tracing::error!(?e);
533 Err(e)
534 }
535}