1use std::{
2 collections::BTreeSet,
3 sync::{Arc, Mutex},
4 time::Duration,
5};
6
7use anyhow::Context;
8use penumbra_sdk_auction::auction::AuctionNft;
9use penumbra_sdk_compact_block::CompactBlock;
10use penumbra_sdk_dex::lp::{position, LpNft};
11use penumbra_sdk_keys::FullViewingKey;
12use penumbra_sdk_proto::core::{
13 app::v1::{
14 query_service_client::QueryServiceClient as AppQueryServiceClient,
15 TransactionsByHeightRequest,
16 },
17 component::{
18 compact_block::v1::{
19 query_service_client::QueryServiceClient as CompactBlockQueryServiceClient,
20 CompactBlockRangeRequest,
21 },
22 shielded_pool::v1::{
23 query_service_client::QueryServiceClient as ShieldedPoolQueryServiceClient,
24 AssetMetadataByIdRequest,
25 },
26 },
27};
28use penumbra_sdk_sct::{CommitmentSource, Nullifier};
29use penumbra_sdk_transaction::Transaction;
30use tap::Tap;
31use tokio::sync::{watch, RwLock};
32use tonic::transport::Channel;
33use tracing::instrument;
34
35use crate::{
36 sync::{scan_block, FilteredBlock},
37 Storage,
38};
39
/// Upper bound, in bytes, on a single message decoded by the compact-block
/// query client: 12 MiB.
const MAX_CB_SIZE_BYTES: usize = 12 * (1 << 20);
42
43pub struct Worker {
44 storage: Storage,
45 sct: Arc<RwLock<penumbra_sdk_tct::Tree>>,
46 fvk: FullViewingKey, error_slot: Arc<Mutex<Option<anyhow::Error>>>,
48 sync_height_tx: watch::Sender<u64>,
49 channel: Channel,
51}
52
53impl Worker {
54 #[instrument(skip_all)]
61 pub async fn new(
62 storage: Storage,
63 channel: Channel,
64 ) -> Result<
65 (
66 Self,
67 Arc<RwLock<penumbra_sdk_tct::Tree>>,
68 Arc<Mutex<Option<anyhow::Error>>>,
69 watch::Receiver<u64>,
70 ),
71 anyhow::Error,
72 > {
73 tracing::trace!("constructing view server worker");
74 let fvk = storage
75 .full_viewing_key()
76 .await
77 .context("failed to retrieve full viewing key from storage")?
78 .tap(|_| tracing::debug!("retrieved full viewing key"));
79
80 let sct = Arc::new(RwLock::new(storage.state_commitment_tree().await?));
82 let error_slot = Arc::new(Mutex::new(None));
84 let (sync_height_tx, mut sync_height_rx) =
86 watch::channel(storage.last_sync_height().await?.unwrap_or(0));
87 sync_height_rx.borrow_and_update();
89
90 Ok((
91 Self {
92 storage,
93 sct: sct.clone(),
94 fvk,
95 error_slot: error_slot.clone(),
96 sync_height_tx,
97 channel,
98 },
99 sct,
100 error_slot,
101 sync_height_rx,
102 ))
103 }
104
105 pub async fn fetch_transactions(
106 &self,
107 filtered_block: &mut FilteredBlock,
108 ) -> anyhow::Result<Vec<Transaction>> {
109 let spent_nullifiers = filtered_block
110 .spent_nullifiers
111 .iter()
112 .cloned()
113 .collect::<BTreeSet<Nullifier>>();
114
115 let has_tx_sources = filtered_block
116 .new_notes
117 .values()
118 .map(|record| &record.source)
119 .chain(
120 filtered_block
121 .new_swaps
122 .values()
123 .map(|record| &record.source),
124 )
125 .any(|source| matches!(source, CommitmentSource::Transaction { .. }));
126
127 if spent_nullifiers.is_empty() && !has_tx_sources {
130 return Ok(Vec::new());
131 }
132
133 tracing::debug!(
134 height = filtered_block.height,
135 "fetching full transaction data"
136 );
137
138 let all_transactions =
139 fetch_transactions(self.channel.clone(), filtered_block.height).await?;
140
141 let mut transactions = Vec::new();
142
143 for tx in all_transactions {
144 let tx_id = tx.id().0;
145
146 let mut relevant = false;
147
148 if tx
149 .spent_nullifiers()
150 .any(|nf| spent_nullifiers.contains(&nf))
151 {
152 relevant = true;
154 }
155
156 for commitment in tx.state_commitments() {
158 filtered_block
159 .new_notes
160 .entry(commitment)
161 .and_modify(|record| {
162 relevant = true;
163 record.source = CommitmentSource::Transaction { id: Some(tx_id) };
164 });
165 filtered_block
166 .new_swaps
167 .entry(commitment)
168 .and_modify(|record| {
169 relevant = true;
170 record.source = CommitmentSource::Transaction { id: Some(tx_id) };
171 });
172 }
173
174 if relevant {
175 transactions.push(tx);
176 }
177 }
178
179 tracing::debug!(
180 matched = transactions.len(),
181 "filtered relevant transactions"
182 );
183
184 Ok(transactions)
185 }
186
187 pub async fn sync(&mut self) -> anyhow::Result<()> {
188 tracing::info!("starting client sync");
190
191 let start_height = self
192 .storage
193 .last_sync_height()
194 .await?
195 .map(|h| h + 1)
196 .unwrap_or(0);
197
198 let mut client = CompactBlockQueryServiceClient::new(self.channel.clone())
199 .max_decoding_message_size(MAX_CB_SIZE_BYTES);
200 let mut stream = client
201 .compact_block_range(tonic::Request::new(CompactBlockRangeRequest {
202 start_height,
203 end_height: 0,
204 keep_alive: true,
206 }))
207 .await?
208 .into_inner();
209
210 let (tx, mut buffered_stream) = tokio::sync::mpsc::channel(1000);
216 tokio::spawn(async move {
217 while let Some(block) = stream.message().await.transpose() {
218 if tx.send(block).await.is_err() {
219 break;
220 }
221 }
222 });
223
224 let mut expected_height = start_height;
225
226 while let Some(block) = buffered_stream.recv().await {
227 let block: CompactBlock = block?.try_into()?;
228
229 let height = block.height;
230 if height != expected_height {
231 tracing::warn!("out of order block detected");
232 continue;
233 }
234 expected_height += 1;
235
236 let mut sct_guard = self.sct.write().await;
238
239 if !block.requires_scanning() {
240 sct_guard.end_block()?;
243 if block.epoch_root.is_some() {
246 sct_guard
247 .end_epoch()
248 .expect("ending the epoch must succeed");
249 }
250 self.storage.record_empty_block(height).await?;
251 self.sync_height_tx.send(height)?;
253 } else {
254 let mut filtered_block =
256 scan_block(&self.fvk, &mut sct_guard, block, &self.storage).await?;
257
258 let transactions = self.fetch_transactions(&mut filtered_block).await?;
260
261 for transaction in &transactions {
264 for action in transaction.actions() {
265 match action {
266 penumbra_sdk_transaction::Action::PositionOpen(position_open) => {
267 let position_id = position_open.position.id();
268
269 let lp_nft = LpNft::new(position_id, position::State::Opened);
271 let _id = lp_nft.asset_id();
272 let denom = lp_nft.denom();
273 self.storage.record_asset(denom).await?;
274
275 let lp_nft = LpNft::new(position_id, position::State::Closed);
276 let _id = lp_nft.asset_id();
277 let denom = lp_nft.denom();
278 self.storage.record_asset(denom).await?;
279
280 let lp_nft = LpNft::new(
281 position_id,
282 position::State::Withdrawn { sequence: 0 },
283 );
284 let _id = lp_nft.asset_id();
285 let denom = lp_nft.denom();
286 self.storage.record_asset(denom).await?;
287
288 self.storage
290 .record_position(position_open.position.clone())
291 .await?;
292 }
293 penumbra_sdk_transaction::Action::PositionClose(position_close) => {
294 let position_id = position_close.position_id;
295
296 self.storage
298 .update_position(position_id, position::State::Closed)
299 .await?;
300 }
301 penumbra_sdk_transaction::Action::PositionWithdraw(
302 position_withdraw,
303 ) => {
304 let position_id = position_withdraw.position_id;
305
306 let state = position::State::Withdrawn {
308 sequence: position_withdraw.sequence,
309 };
310 let lp_nft = LpNft::new(position_id, state);
311 let denom = lp_nft.denom();
312 self.storage.record_asset(denom).await?;
313
314 self.storage.update_position(position_id, state).await?;
316 }
317 penumbra_sdk_transaction::Action::ActionDutchAuctionSchedule(
318 schedule_da,
319 ) => {
320 let auction_id = schedule_da.description.id();
321 let auction_nft_opened = AuctionNft::new(auction_id, 0);
322 let nft_metadata_opened = auction_nft_opened.metadata.clone();
323
324 self.storage.record_asset(nft_metadata_opened).await?;
325
326 self.storage
327 .record_auction_with_state(
328 schedule_da.description.id(),
329 0u64, )
331 .await?;
332 }
333 penumbra_sdk_transaction::Action::ActionDutchAuctionEnd(end_da) => {
334 let auction_id = end_da.auction_id;
335 let auction_nft_closed = AuctionNft::new(auction_id, 1);
336 let nft_metadata_closed = auction_nft_closed.metadata.clone();
337
338 self.storage.record_asset(nft_metadata_closed).await?;
339
340 self.storage
341 .record_auction_with_state(end_da.auction_id, 1)
342 .await?;
343 }
344 penumbra_sdk_transaction::Action::ActionDutchAuctionWithdraw(
345 withdraw_da,
346 ) => {
347 let auction_id = withdraw_da.auction_id;
348 let auction_nft_withdrawn =
349 AuctionNft::new(auction_id, withdraw_da.seq);
350 let nft_metadata_withdrawn = auction_nft_withdrawn.metadata.clone();
351
352 self.storage.record_asset(nft_metadata_withdrawn).await?;
353 self.storage
354 .record_auction_with_state(auction_id, withdraw_da.seq)
355 .await?;
356 }
357 _ => (),
358 };
359 }
360 }
361
362 for note_record in filtered_block.new_notes.values() {
364 if let Some(note_denom) = self
367 .storage
368 .asset_by_id(¬e_record.note.asset_id())
369 .await?
370 {
371 if note_denom.is_auction_nft() {
374 let note_commitment = note_record.note_commitment;
375 let auction_nft: AuctionNft = note_denom.try_into()?;
376 self.storage
377 .update_auction_with_note_commitment(
378 auction_nft.id,
379 note_commitment,
380 )
381 .await?;
382 }
383 continue;
384 } else {
385 let mut client = ShieldedPoolQueryServiceClient::new(self.channel.clone());
388 if let Some(denom_metadata) = client
389 .asset_metadata_by_id(AssetMetadataByIdRequest {
390 asset_id: Some(note_record.note.asset_id().into()),
391 })
392 .await?
393 .into_inner()
394 .denom_metadata
395 {
396 self.storage
398 .record_asset(denom_metadata.try_into()?)
399 .await?;
400 } else {
401 tracing::warn!(asset_id = ?note_record.note.asset_id(), "received unknown asset ID with no available metadata");
402 }
403 }
404 }
405
406 self.storage
408 .record_block(
409 filtered_block.clone(),
410 transactions,
411 &mut sct_guard,
412 self.channel.clone(),
413 )
414 .await?;
415 self.sync_height_tx.send(filtered_block.height)?;
417 }
418 #[cfg(feature = "sct-divergence-check")]
419 sct_divergence_check(self.channel.clone(), height, sct_guard.root()).await?;
420
421 drop(sct_guard);
423
424 if self.sync_height_tx.is_closed() {
427 return Ok(());
428 }
429 }
430
431 Ok(())
432 }
433
434 pub async fn run(mut self) -> anyhow::Result<()> {
435 loop {
436 if let Err(e) = self.sync().await {
438 tracing::error!(?e, "view worker error");
439 self.error_slot
440 .lock()
441 .expect("mutex is not poisoned")
442 .replace(e);
443 }
444 tokio::time::sleep(Duration::from_secs(10)).await;
446 *self.error_slot.lock().expect("mutex is not poisoned") = None;
448 }
449 }
450}
451
452async fn fetch_transactions(
454 channel: Channel,
455 block_height: u64,
456) -> anyhow::Result<Vec<Transaction>> {
457 let mut client = AppQueryServiceClient::new(channel);
458 let request = TransactionsByHeightRequest {
459 block_height,
460 ..Default::default()
461 };
462 let response = match client.transactions_by_height(request.clone()).await {
468 Ok(rsp) => rsp,
469 Err(e) => {
470 tracing::warn!(?e, "failed to fetch block, waiting and retrying once");
471 tokio::time::sleep(Duration::from_secs(1)).await;
472 client.transactions_by_height(request).await?
473 }
474 };
475 let transactions = response
476 .into_inner()
477 .transactions
478 .into_iter()
479 .map(TryInto::try_into)
480 .collect::<anyhow::Result<Vec<_>>>()?;
481 Ok(transactions)
482}
483
/// Compares the locally computed SCT root against the anchor the node has
/// stored for `height`.
///
/// Returns `Ok(())` when the roots are equal. Returns an error when they
/// differ, when the node has no value under the anchor key, or when the
/// request or the decoding of the stored root fails.
///
/// Only compiled with the `sct-divergence-check` feature.
#[cfg(feature = "sct-divergence-check")]
async fn sct_divergence_check(
    channel: Channel,
    height: u64,
    actual_root: penumbra_sdk_tct::Root,
) -> anyhow::Result<()> {
    use cnidarium::proto::v1::{query_service_client::QueryServiceClient, KeyValueRequest};
    use penumbra_sdk_proto::DomainType;
    use penumbra_sdk_sct::state_key as sct_state_key;

    let mut state_client = QueryServiceClient::new(channel);
    tracing::info!(?height, "fetching anchor @ height");

    // Plain key lookup; no proof is requested.
    let request = KeyValueRequest {
        key: sct_state_key::tree::anchor_by_height(height),
        proof: false,
        ..Default::default()
    };
    let anchor = state_client
        .key_value(request)
        .await?
        .into_inner()
        .value
        .context("sct state not found")?;

    let expected_root = penumbra_sdk_tct::Root::decode(anchor.value.as_slice())?;

    // Guard clause: report and bail out on a mismatch.
    if actual_root != expected_root {
        let e = anyhow::anyhow!(
            "SCT divergence detected at height {}: expected {}, got {}",
            height,
            expected_root,
            actual_root
        );
        tracing::error!(?e);
        return Err(e);
    }

    tracing::info!(?height, ?actual_root, ?expected_root, "sct roots match");
    Ok(())
}