1use std::{collections::BTreeMap, num::NonZeroU64, str::FromStr, sync::Arc, time::Duration};
2
3use anyhow::{anyhow, Context};
4use camino::Utf8Path;
5use decaf377::Fq;
6use once_cell::sync::Lazy;
7use parking_lot::Mutex;
8use penumbra_sdk_auction::auction::AuctionId;
9use r2d2_sqlite::{
10 rusqlite::{OpenFlags, OptionalExtension},
11 SqliteConnectionManager,
12};
13use sha2::{Digest, Sha256};
14use tap::{Tap, TapFallible};
15use tokio::{
16 sync::broadcast::{self, error::RecvError},
17 task::spawn_blocking,
18};
19use tracing::{error_span, Instrument};
20use url::Url;
21
22use penumbra_sdk_app::params::AppParameters;
23use penumbra_sdk_asset::{asset, asset::Id, asset::Metadata, Value};
24use penumbra_sdk_dex::{
25 lp::position::{self, Position, State},
26 TradingPair,
27};
28use penumbra_sdk_fee::GasPrices;
29use penumbra_sdk_keys::{keys::AddressIndex, Address, FullViewingKey};
30use penumbra_sdk_num::Amount;
31use penumbra_sdk_proto::{
32 core::app::v1::{
33 query_service_client::QueryServiceClient as AppQueryServiceClient, AppParametersRequest,
34 },
35 DomainType,
36};
37use penumbra_sdk_sct::{CommitmentSource, Nullifier};
38use penumbra_sdk_shielded_pool::{fmd, note, Note, Rseed};
39use penumbra_sdk_stake::{DelegationToken, IdentityKey};
40use penumbra_sdk_tct::{self as tct, builder::epoch::Root};
41use penumbra_sdk_transaction::Transaction;
42use sct::TreeStore;
43use tct::StateCommitment;
44
45use crate::{sync::FilteredBlock, SpendableNoteRecord, SwapRecord};
46
47mod sct;
48
49#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
50pub struct BalanceEntry {
51 pub id: Id,
52 pub amount: u128,
53 pub address_index: AddressIndex,
54}
55
56static SCHEMA_HASH: Lazy<String> =
58 Lazy::new(|| hex::encode(Sha256::digest(include_str!("storage/schema.sql"))));
59
60#[derive(Clone)]
61pub struct Storage {
62 pool: r2d2::Pool<SqliteConnectionManager>,
63
64 uncommitted_height: Arc<Mutex<Option<NonZeroU64>>>,
72
73 scanned_notes_tx: tokio::sync::broadcast::Sender<SpendableNoteRecord>,
74 scanned_nullifiers_tx: tokio::sync::broadcast::Sender<Nullifier>,
75 scanned_swaps_tx: tokio::sync::broadcast::Sender<SwapRecord>,
76}
77
78impl Storage {
79 #[tracing::instrument(
81 skip_all,
82 fields(
83 path = ?storage_path.as_ref().map(|p| p.as_ref().as_str()),
84 url = %node,
85 )
86 )]
87 pub async fn load_or_initialize(
88 storage_path: Option<impl AsRef<Utf8Path>>,
89 fvk: &FullViewingKey,
90 node: Url,
91 ) -> anyhow::Result<Self> {
92 if let Some(path) = storage_path.as_ref().map(AsRef::as_ref) {
93 if path.exists() {
94 tracing::debug!(?path, "database exists");
95 return Self::load(path).await;
96 } else {
97 tracing::debug!(?path, "database does not exist");
98 }
99 };
100
101 let mut client = AppQueryServiceClient::connect(node.to_string())
102 .instrument(error_span!("connecting_to_endpoint"))
103 .await
104 .tap_err(|error| {
105 tracing::error!(?error, "failed to connect to app query service endpoint")
106 })?
107 .tap(|_| tracing::debug!("connected to app query service endpoint"));
108 let params = client
109 .app_parameters(tonic::Request::new(AppParametersRequest {}))
110 .instrument(error_span!("getting_app_parameters"))
111 .await?
112 .into_inner()
113 .try_into()?;
114
115 Self::initialize(storage_path, fvk.clone(), params).await
116 }
117
118 fn connect(
119 path: Option<impl AsRef<Utf8Path>>,
120 ) -> anyhow::Result<r2d2::Pool<SqliteConnectionManager>> {
121 if let Some(path) = path {
122 let manager = SqliteConnectionManager::file(path.as_ref())
123 .with_flags(
124 OpenFlags::default() & !OpenFlags::SQLITE_OPEN_URI,
127 )
128 .with_init(|conn| {
129 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
133 conn.set_prepared_statement_cache_capacity(32);
136 Ok(())
137 });
138 Ok(r2d2::Pool::builder()
139 .max_size(1)
142 .build(manager)?)
143 } else {
144 let manager = SqliteConnectionManager::memory();
145 Ok(r2d2::Pool::builder()
151 .max_size(1)
152 .min_idle(Some(1))
153 .max_lifetime(Some(Duration::MAX))
154 .idle_timeout(Some(Duration::MAX))
155 .build(manager)?)
156 }
157 }
158
159 pub async fn load(path: impl AsRef<Utf8Path>) -> anyhow::Result<Self> {
160 let storage = Self {
161 pool: Self::connect(Some(path))?,
162 uncommitted_height: Arc::new(Mutex::new(None)),
163 scanned_notes_tx: broadcast::channel(128).0,
164 scanned_nullifiers_tx: broadcast::channel(512).0,
165 scanned_swaps_tx: broadcast::channel(128).0,
166 };
167
168 spawn_blocking(move || {
169 let actual_schema_hash: String = storage
172 .pool
173 .get()?
174 .query_row("SELECT schema_hash FROM schema_hash", (), |row| {
175 row.get("schema_hash")
176 })
177 .context("failed to query database schema version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
178
179 if actual_schema_hash != *SCHEMA_HASH {
180 let database_client_version: String = storage
181 .pool
182 .get()?
183 .query_row("SELECT client_version FROM client_version", (), |row| {
184 row.get("client_version")
185 })
186 .context("failed to query client version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
187
188 anyhow::bail!(
189 "can't load view database created by client version {} using client version {}: they have different schemata, so you need to reset your view database and resynchronize by running pcli view reset",
190 database_client_version,
191 env!("CARGO_PKG_VERSION"),
192 );
193 }
194
195 Ok(storage)
196 })
197 .await?
198 }
199
200 pub async fn initialize(
201 storage_path: Option<impl AsRef<Utf8Path>>,
202 fvk: FullViewingKey,
203 params: AppParameters,
204 ) -> anyhow::Result<Self> {
205 tracing::debug!(storage_path = ?storage_path.as_ref().map(AsRef::as_ref), ?fvk, ?params);
206
207 let pool = Self::connect(storage_path)?;
209
210 let out = spawn_blocking(move || {
211 let mut conn = pool.get()?;
213 let tx = conn.transaction()?;
214
215 tx.execute_batch(include_str!("storage/schema.sql"))?;
217
218 let params_bytes = params.encode_to_vec();
219 tx.execute(
220 "INSERT INTO kv (k, v) VALUES ('app_params', ?1)",
221 [¶ms_bytes[..]],
222 )?;
223
224 let fvk_bytes = fvk.encode_to_vec();
225 tx.execute("INSERT INTO kv (k, v) VALUES ('fvk', ?1)", [&fvk_bytes[..]])?;
226
227 tx.execute("INSERT INTO sync_height (height) VALUES (-1)", ())?;
231
232 tx.execute(
234 "INSERT INTO schema_hash (schema_hash) VALUES (?1)",
235 [&*SCHEMA_HASH],
236 )?;
237
238 tx.execute(
240 "INSERT INTO client_version (client_version) VALUES (?1)",
241 [env!("CARGO_PKG_VERSION")],
242 )?;
243
244 tx.commit()?;
245 drop(conn);
246
247 anyhow::Ok(Storage {
248 pool,
249 uncommitted_height: Arc::new(Mutex::new(None)),
250 scanned_notes_tx: broadcast::channel(128).0,
251 scanned_nullifiers_tx: broadcast::channel(512).0,
252 scanned_swaps_tx: broadcast::channel(128).0,
253 })
254 })
255 .await??;
256
257 out.update_epoch(0, None, Some(0)).await?;
258
259 Ok(out)
260 }
261
262 pub async fn load_asset_metadata(
264 &self,
265 registry_path: impl AsRef<Utf8Path>,
266 ) -> anyhow::Result<()> {
267 tracing::debug!(registry_path = ?registry_path.as_ref(), "loading asset metadata");
268 let registry_path = registry_path.as_ref();
269 let mut registry_json: serde_json::Value = serde_json::from_str(
271 std::fs::read_to_string(registry_path)
272 .context("failed to read file")?
273 .as_str(),
274 )
275 .context("failed to parse JSON")?;
276
277 let registry: BTreeMap<String, Metadata> = serde_json::value::from_value(
278 registry_json
279 .get_mut("assetById")
280 .ok_or_else(|| anyhow::anyhow!("missing assetById"))?
281 .take(),
282 )
283 .context("could not parse asset registry")?;
284
285 for metadata in registry.into_values() {
286 self.record_asset(metadata).await?;
287 }
288
289 Ok(())
290 }
291
292 pub async fn balances(
294 &self,
295 address_index: Option<AddressIndex>,
296 asset_id: Option<asset::Id>,
297 ) -> anyhow::Result<Vec<BalanceEntry>> {
298 let pool = self.pool.clone();
299
300 spawn_blocking(move || {
301 let query = "SELECT notes.asset_id, notes.amount, spendable_notes.address_index
302 FROM notes
303 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
304 WHERE spendable_notes.height_spent IS NULL";
305
306 tracing::debug!(?query);
307
308 let mut balances: BTreeMap<AddressIndex, BTreeMap<asset::Id, Amount>> = BTreeMap::new();
310
311 for result in pool.get()?.prepare_cached(query)?.query_map([], |row| {
312 let asset_id = row.get::<&str, Vec<u8>>("asset_id")?;
313 let amount = row.get::<&str, Vec<u8>>("amount")?;
314 let address_index = row.get::<&str, Vec<u8>>("address_index")?;
315
316 Ok((asset_id, amount, address_index))
317 })? {
318 let (id, amount, index) = result?;
319
320 let id = Id::try_from(id.as_slice())?;
321
322 let amount: Amount = Amount::from_be_bytes(
323 amount
324 .as_slice()
325 .try_into()
326 .expect("amount slice of incorrect length"),
327 );
328
329 let index = AddressIndex::try_from(index.as_slice())?;
330
331 if let Some(address_index) = address_index {
333 if address_index != index {
334 continue;
335 }
336 }
337 if let Some(asset_id) = asset_id {
338 if asset_id != id {
339 continue;
340 }
341 }
342
343 balances
344 .entry(index)
345 .or_insert_with(BTreeMap::new)
346 .entry(id)
347 .and_modify(|e| *e += amount)
348 .or_insert(amount);
349 }
350
351 let entries = balances
352 .into_iter()
353 .flat_map(|(index, assets)| {
354 assets.into_iter().map(move |(id, amount)| BalanceEntry {
355 id,
356 amount: amount.into(),
357 address_index: index,
358 })
359 })
360 .collect::<Vec<_>>();
361 Ok(entries)
362 })
363 .await?
364 }
365
366 pub async fn note_by_commitment(
368 &self,
369 note_commitment: tct::StateCommitment,
370 await_detection: bool,
371 ) -> anyhow::Result<SpendableNoteRecord> {
372 let mut rx = self.scanned_notes_tx.subscribe();
375
376 let pool = self.pool.clone();
377
378 if let Some(record) = spawn_blocking(move || {
379 pool.get()?
381 .prepare(&format!(
382 "SELECT
383 notes.note_commitment,
384 spendable_notes.height_created,
385 notes.address,
386 notes.amount,
387 notes.asset_id,
388 notes.rseed,
389 spendable_notes.address_index,
390 spendable_notes.source,
391 spendable_notes.height_spent,
392 spendable_notes.nullifier,
393 spendable_notes.position,
394 tx.return_address
395 FROM notes
396 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
397 LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
398 WHERE notes.note_commitment = x'{}'",
399 hex::encode(note_commitment.0.to_bytes())
400 ))?
401 .query_and_then((), |record| record.try_into())?
402 .next()
403 .transpose()
404 })
405 .await??
406 {
407 return Ok(record);
408 }
409
410 if !await_detection {
411 anyhow::bail!("Note commitment {} not found", note_commitment);
412 }
413
414 loop {
418 match rx.recv().await {
419 Ok(record) => {
420 if record.note_commitment == note_commitment {
421 return Ok(record);
422 }
423 }
424
425 Err(e) => match e {
426 RecvError::Closed => {
427 anyhow::bail!(
428 "Receiver error during note detection: closed (no more active senders)"
429 );
430 }
431 RecvError::Lagged(count) => {
432 anyhow::bail!(
433 "Receiver error during note detection: lagged (by {:?} messages)",
434 count
435 );
436 }
437 },
438 };
439 }
440 }
441
442 pub async fn swap_by_commitment(
444 &self,
445 swap_commitment: tct::StateCommitment,
446 await_detection: bool,
447 ) -> anyhow::Result<SwapRecord> {
448 let mut rx = self.scanned_swaps_tx.subscribe();
451
452 let pool = self.pool.clone();
453
454 if let Some(record) = spawn_blocking(move || {
455 pool.get()?
457 .prepare(&format!(
458 "SELECT * FROM swaps WHERE swaps.swap_commitment = x'{}'",
459 hex::encode(swap_commitment.0.to_bytes())
460 ))?
461 .query_and_then((), |record| record.try_into())?
462 .next()
463 .transpose()
464 })
465 .await??
466 {
467 return Ok(record);
468 }
469
470 if !await_detection {
471 anyhow::bail!("swap commitment {} not found", swap_commitment);
472 }
473
474 loop {
478 match rx.recv().await {
479 Ok(record) => {
480 if record.swap_commitment == swap_commitment {
481 return Ok(record);
482 }
483 }
484
485 Err(e) => match e {
486 RecvError::Closed => {
487 anyhow::bail!(
488 "Receiver error during swap detection: closed (no more active senders)"
489 );
490 }
491 RecvError::Lagged(count) => {
492 anyhow::bail!(
493 "Receiver error during swap detection: lagged (by {:?} messages)",
494 count
495 );
496 }
497 },
498 };
499 }
500 }
501
502 pub async fn unclaimed_swaps(&self) -> anyhow::Result<Vec<SwapRecord>> {
504 let pool = self.pool.clone();
505
506 let records = spawn_blocking(move || {
507 pool.get()?
509 .prepare("SELECT * FROM swaps WHERE swaps.height_claimed is NULL")?
510 .query_and_then((), |record| record.try_into())?
511 .collect::<anyhow::Result<Vec<_>>>()
512 })
513 .await??;
514
515 Ok(records)
516 }
517
518 pub async fn nullifier_status(
520 &self,
521 nullifier: Nullifier,
522 await_detection: bool,
523 ) -> anyhow::Result<bool> {
524 let mut rx = self.scanned_nullifiers_tx.subscribe();
527
528 let pool = self.pool.clone();
530
531 let nullifier_bytes = nullifier.0.to_bytes().to_vec();
532
533 if let Some(height_spent) = spawn_blocking(move || {
535 pool.get()?
536 .prepare_cached("SELECT height_spent FROM spendable_notes WHERE nullifier = ?1")?
537 .query_and_then([nullifier_bytes], |row| {
538 let height_spent: Option<u64> = row.get("height_spent")?;
539 anyhow::Ok(height_spent)
540 })?
541 .next()
542 .transpose()
543 })
544 .await??
545 {
546 let spent = height_spent.is_some();
547
548 if !await_detection || spent {
550 return Ok(spent);
551 }
552 }
553
554 if !await_detection {
557 return Ok(false);
558 }
559
560 loop {
563 let new_nullifier = rx.recv().await.context("change subscriber failed")?;
564
565 if new_nullifier == nullifier {
566 return Ok(true);
567 }
568 }
569 }
570
571 pub async fn last_sync_height(&self) -> anyhow::Result<Option<u64>> {
573 if let Some(height) = *self.uncommitted_height.lock() {
575 return Ok(Some(height.get()));
576 }
577
578 let pool = self.pool.clone();
579
580 spawn_blocking(move || {
581 let height: Option<i64> = pool
582 .get()?
583 .prepare_cached("SELECT height FROM sync_height ORDER BY height DESC LIMIT 1")?
584 .query_row([], |row| row.get::<_, Option<i64>>(0))?;
585
586 anyhow::Ok(u64::try_from(height.ok_or_else(|| anyhow!("missing sync height"))?).ok())
587 })
588 .await?
589 }
590
591 pub async fn app_params(&self) -> anyhow::Result<AppParameters> {
592 let pool = self.pool.clone();
593
594 spawn_blocking(move || {
595 let params_bytes = pool
596 .get()?
597 .prepare_cached("SELECT v FROM kv WHERE k IS 'app_params' LIMIT 1")?
598 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
599 .ok_or_else(|| anyhow!("missing app_params in kv table"))?;
600
601 AppParameters::decode(params_bytes.as_slice())
602 })
603 .await?
604 }
605
606 pub async fn gas_prices(&self) -> anyhow::Result<GasPrices> {
607 let pool = self.pool.clone();
608
609 spawn_blocking(move || {
610 let bytes = pool
611 .get()?
612 .prepare_cached("SELECT v FROM kv WHERE k IS 'gas_prices' LIMIT 1")?
613 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
614 .ok_or_else(|| anyhow!("missing gas_prices in kv table"))?;
615
616 GasPrices::decode(bytes.as_slice())
617 })
618 .await?
619 }
620
621 pub async fn fmd_parameters(&self) -> anyhow::Result<fmd::Parameters> {
622 let pool = self.pool.clone();
623
624 spawn_blocking(move || {
625 let bytes = pool
626 .get()?
627 .prepare_cached("SELECT v FROM kv WHERE k IS 'fmd_params' LIMIT 1")?
628 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
629 .ok_or_else(|| anyhow!("missing fmd_params in kv table"))?;
630
631 fmd::Parameters::decode(bytes.as_slice())
632 })
633 .await?
634 }
635
636 pub async fn full_viewing_key(&self) -> anyhow::Result<FullViewingKey> {
637 let pool = self.pool.clone();
638
639 spawn_blocking(move || {
640 let bytes = pool
641 .get()?
642 .prepare_cached("SELECT v FROM kv WHERE k is 'fvk' LIMIT 1")?
643 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
644 .ok_or_else(|| anyhow!("missing fvk in kv table"))?;
645
646 FullViewingKey::decode(bytes.as_slice())
647 })
648 .await?
649 }
650
651 pub async fn state_commitment_tree(&self) -> anyhow::Result<tct::Tree> {
652 let pool = self.pool.clone();
653 spawn_blocking(move || {
654 tct::Tree::from_reader(&mut TreeStore(&mut pool.get()?.transaction()?))
655 })
656 .await?
657 }
658
659 pub async fn transaction_hashes(
661 &self,
662 start_height: Option<u64>,
663 end_height: Option<u64>,
664 ) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
665 let starting_block = start_height.unwrap_or(0) as i64;
666 let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
667
668 let pool = self.pool.clone();
669
670 spawn_blocking(move || {
671 pool.get()?
672 .prepare_cached(
673 "SELECT block_height, tx_hash
674 FROM tx
675 WHERE block_height BETWEEN ?1 AND ?2",
676 )?
677 .query_and_then([starting_block, ending_block], |row| {
678 let block_height: u64 = row.get("block_height")?;
679 let tx_hash: Vec<u8> = row.get("tx_hash")?;
680 anyhow::Ok((block_height, tx_hash))
681 })?
682 .collect()
683 })
684 .await?
685 }
686
687 pub async fn transactions(
689 &self,
690 start_height: Option<u64>,
691 end_height: Option<u64>,
692 ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction)>> {
693 let starting_block = start_height.unwrap_or(0) as i64;
694 let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
695
696 let pool = self.pool.clone();
697
698 spawn_blocking(move || {
699 pool.get()?
700 .prepare_cached(
701 "SELECT block_height, tx_hash, tx_bytes
702 FROM tx
703 WHERE block_height BETWEEN ?1 AND ?2",
704 )?
705 .query_and_then([starting_block, ending_block], |row| {
706 let block_height: u64 = row.get("block_height")?;
707 let tx_hash: Vec<u8> = row.get("tx_hash")?;
708 let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
709 let tx = Transaction::decode(tx_bytes.as_slice())?;
710 anyhow::Ok((block_height, tx_hash, tx))
711 })?
712 .collect()
713 })
714 .await?
715 }
716
717 pub async fn transaction_by_hash(
718 &self,
719 tx_hash: &[u8],
720 ) -> anyhow::Result<Option<(u64, Transaction)>> {
721 let pool = self.pool.clone();
722 let tx_hash = tx_hash.to_vec();
723
724 spawn_blocking(move || {
725 if let Some((block_height, tx_bytes)) = pool
726 .get()?
727 .prepare_cached("SELECT block_height, tx_bytes FROM tx WHERE tx_hash = ?1")?
728 .query_row([tx_hash], |row| {
729 let block_height: u64 = row.get("block_height")?;
730 let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
731 Ok((block_height, tx_bytes))
732 })
733 .optional()?
734 {
735 let tx = Transaction::decode(tx_bytes.as_slice())?;
736 Ok(Some((block_height, tx)))
737 } else {
738 Ok(None)
739 }
740 })
741 .await?
742 }
743
744 pub async fn note_by_nullifier(
746 &self,
747 nullifier: Nullifier,
748 await_detection: bool,
749 ) -> anyhow::Result<SpendableNoteRecord> {
750 let mut rx = self.scanned_notes_tx.subscribe();
753
754 let pool = self.pool.clone();
756
757 let nullifier_bytes = nullifier.to_bytes().to_vec();
758
759 if let Some(record) = spawn_blocking(move || {
760 let record = pool
761 .get()?
762 .prepare(&format!(
763 "SELECT
764 notes.note_commitment,
765 spendable_notes.height_created,
766 notes.address,
767 notes.amount,
768 notes.asset_id,
769 notes.rseed,
770 spendable_notes.address_index,
771 spendable_notes.source,
772 spendable_notes.height_spent,
773 spendable_notes.nullifier,
774 spendable_notes.position,
775 tx.return_address
776 FROM notes
777 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
778 LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
779 WHERE hex(spendable_notes.nullifier) = \"{}\"",
780 hex::encode_upper(nullifier_bytes)
781 ))?
782 .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
783 .next()
784 .transpose()?;
785
786 anyhow::Ok(record)
787 })
788 .await??
789 {
790 return Ok(record);
791 }
792
793 if !await_detection {
794 anyhow::bail!("Note commitment for nullifier {:?} not found", nullifier);
795 }
796
797 loop {
801 match rx.recv().await {
802 Ok(record) => {
803 if record.nullifier == nullifier {
804 return Ok(record);
805 }
806 }
807
808 Err(e) => match e {
809 RecvError::Closed => {
810 anyhow::bail!(
811 "Receiver error during note detection: closed (no more active senders)"
812 );
813 }
814 RecvError::Lagged(count) => {
815 anyhow::bail!(
816 "Receiver error during note detection: lagged (by {:?} messages)",
817 count
818 );
819 }
820 },
821 };
822 }
823 }
824
825 pub async fn all_assets(&self) -> anyhow::Result<Vec<Metadata>> {
826 let pool = self.pool.clone();
827
828 spawn_blocking(move || {
829 pool.get()?
830 .prepare_cached("SELECT metadata FROM assets")?
831 .query_and_then([], |row| {
832 let metadata_json = row.get::<_, String>("metadata")?;
833 let denom_metadata = serde_json::from_str(&metadata_json)?;
834
835 anyhow::Ok(denom_metadata)
836 })?
837 .collect()
838 })
839 .await?
840 }
841
842 pub async fn asset_by_id(&self, id: &Id) -> anyhow::Result<Option<Metadata>> {
843 let id = id.to_bytes().to_vec();
844
845 let pool = self.pool.clone();
846
847 spawn_blocking(move || {
848 pool.get()?
849 .prepare_cached("SELECT metadata FROM assets WHERE asset_id = ?1")?
850 .query_and_then([id], |row| {
851 let metadata_json = row.get::<_, String>("metadata")?;
852 let denom_metadata = serde_json::from_str(&metadata_json)?;
853 anyhow::Ok(denom_metadata)
854 })?
855 .next()
856 .transpose()
857 })
858 .await?
859 }
860
861 pub async fn assets_matching(&self, pattern: String) -> anyhow::Result<Vec<Metadata>> {
864 let pattern = pattern.to_owned();
865
866 let pool = self.pool.clone();
867
868 spawn_blocking(move || {
869 pool.get()?
870 .prepare_cached("SELECT metadata FROM assets WHERE denom LIKE ?1 ESCAPE '\\'")?
871 .query_and_then([pattern], |row| {
872 let metadata_json = row.get::<_, String>("metadata")?;
873 let denom_metadata = serde_json::from_str(&metadata_json)?;
874 anyhow::Ok(denom_metadata)
875 })?
876 .collect()
877 })
878 .await?
879 }
880
881 pub async fn notes(
882 &self,
883 include_spent: bool,
884 asset_id: Option<asset::Id>,
885 address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
886 amount_to_spend: Option<Amount>,
887 ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
888 let spent_clause = match include_spent {
891 false => "NULL",
892 true => "height_spent",
893 };
894
895 let asset_clause = asset_id
898 .map(|id| format!("x'{}'", hex::encode(id.to_bytes())))
899 .unwrap_or_else(|| "asset_id".to_string());
900
901 let address_clause = "address_index".to_string();
912
913 let amount_cutoff = (amount_to_spend.is_some()) && !(include_spent || asset_id.is_none());
919 let mut amount_total = Amount::zero();
920
921 let pool = self.pool.clone();
922
923 spawn_blocking(move || {
924 let mut output: Vec<SpendableNoteRecord> = Vec::new();
925
926 for result in pool
927 .get()?
928 .prepare(&format!(
929 "SELECT notes.note_commitment,
930 spendable_notes.height_created,
931 notes.address,
932 notes.amount,
933 notes.asset_id,
934 notes.rseed,
935 spendable_notes.address_index,
936 spendable_notes.source,
937 spendable_notes.height_spent,
938 spendable_notes.nullifier,
939 spendable_notes.position,
940 tx.return_address
941 FROM notes
942 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
943 LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
944 WHERE spendable_notes.height_spent IS {spent_clause}
945 AND notes.asset_id IS {asset_clause}
946 AND spendable_notes.address_index IS {address_clause}"
947 ))?
948 .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
949 {
950 let record = result?;
951
952 if let Some(address_index) = address_index {
955 if record.address_index.account != address_index.account {
956 continue;
957 }
958 }
959 let amount = record.note.amount();
960
961 if amount.value() > 0 {
964 output.push(record);
965 }
966
967 if amount_cutoff {
970 amount_total += amount;
972 if amount_total >= amount_to_spend.unwrap_or_default() {
973 break;
974 }
975 }
976 }
977
978 if amount_total < amount_to_spend.unwrap_or_default() {
979 anyhow::bail!(
980 "requested amount of {} exceeds total of {}",
981 amount_to_spend.unwrap_or_default(),
982 amount_total
983 );
984 }
985
986 anyhow::Ok(output)
987 })
988 .await?
989 }
990
991 pub async fn notes_for_voting(
996 &self,
997 address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
998 votable_at_height: u64,
999 ) -> anyhow::Result<Vec<(SpendableNoteRecord, IdentityKey)>> {
1000 let pool = self.pool.clone();
1001
1002 spawn_blocking(move || {
1003 let mut lock = pool.get()?;
1004 let dbtx = lock.transaction()?;
1005
1006 let spendable_note_records: Vec<SpendableNoteRecord> = dbtx
1007 .prepare(&format!(
1008 "SELECT notes.note_commitment,
1009 spendable_notes.height_created,
1010 notes.address,
1011 notes.amount,
1012 notes.asset_id,
1013 notes.rseed,
1014 spendable_notes.address_index,
1015 spendable_notes.source,
1016 spendable_notes.height_spent,
1017 spendable_notes.nullifier,
1018 spendable_notes.position
1019 FROM
1020 notes JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1021 WHERE
1022 notes.asset_id IN (
1023 SELECT asset_id FROM assets WHERE denom LIKE '_delegation\\_%' ESCAPE '\\'
1024 )
1025 AND ((spendable_notes.height_spent IS NULL) OR (spendable_notes.height_spent > {votable_at_height}))
1026 AND (spendable_notes.height_created < {votable_at_height})
1027 ",
1028 ))?
1029 .query_and_then((), |row| row.try_into())?
1030 .collect::<anyhow::Result<Vec<_>>>()?;
1031
1032 let mut results = Vec::new();
1035 for record in spendable_note_records {
1036 if matches!(address_index, Some(a) if a.account != record.address_index.account) {
1038 continue;
1039 }
1040 let asset_id = record.note.asset_id().to_bytes().to_vec();
1041 let denom: String = dbtx.query_row_and_then(
1042 "SELECT denom FROM assets WHERE asset_id = ?1",
1043 [asset_id],
1044 |row| row.get("denom"),
1045 )?;
1046
1047 let identity_key = DelegationToken::from_str(&denom)
1048 .context("invalid delegation token denom")?
1049 .validator();
1050
1051 results.push((record, identity_key));
1052 }
1053
1054 Ok(results)
1055 }).await?
1056 }
1057
1058 #[tracing::instrument(skip(self))]
1059 pub async fn record_asset(&self, asset: Metadata) -> anyhow::Result<()> {
1060 tracing::debug!(?asset);
1061
1062 let asset_id = asset.id().to_bytes().to_vec();
1063 let denom = asset.base_denom().denom;
1064 let metadata_json = serde_json::to_string(&asset)?;
1065
1066 let pool = self.pool.clone();
1067
1068 spawn_blocking(move || {
1069 pool.get()?
1070 .execute(
1071 "INSERT OR REPLACE INTO assets (asset_id, denom, metadata) VALUES (?1, ?2, ?3)",
1072 (asset_id, denom, metadata_json),
1073 )
1074 .map_err(anyhow::Error::from)
1075 })
1076 .await??;
1077
1078 Ok(())
1079 }
1080
1081 pub async fn record_auction_with_state(
1082 &self,
1083 auction_id: AuctionId,
1084 auction_state: u64,
1085 ) -> anyhow::Result<()> {
1086 let auction_id = auction_id.0.to_vec();
1087
1088 let pool = self.pool.clone();
1089
1090 spawn_blocking(move || {
1091 let mut lock = pool.get()?;
1092 let tx = lock.transaction()?;
1093 tx.execute(
1094 "INSERT OR IGNORE INTO auctions (auction_id, auction_state, note_commitment) VALUES (?1, ?2, NULL)",
1095 (auction_id.clone(), auction_state),
1096 )?;
1097 tx.execute(
1098 "UPDATE auctions SET auction_state = ?2 WHERE auction_id = ?1",
1099 (auction_id, auction_state),
1100 )
1101 .map_err(anyhow::Error::from)?;
1102
1103 tx.commit()?;
1104 Ok::<(), anyhow::Error>(())
1105 })
1106 .await??;
1107
1108 Ok(())
1109 }
1110
1111 pub async fn update_auction_with_note_commitment(
1112 &self,
1113 auction_id: AuctionId,
1114 note_commitment: StateCommitment,
1115 ) -> anyhow::Result<()> {
1116 let auction_id = auction_id.0.to_vec();
1117 let blob_nc = note_commitment.0.to_bytes().to_vec();
1118
1119 let pool = self.pool.clone();
1120
1121 spawn_blocking(move || {
1122 pool.get()?
1123 .execute(
1124 "UPDATE auctions SET (note_commitment) = ?1 WHERE auction_id = ?2",
1125 (blob_nc, auction_id),
1126 )
1127 .map_err(anyhow::Error::from)
1128 })
1129 .await??;
1130
1131 Ok(())
1132 }
1133
1134 pub async fn fetch_auctions_by_account(
1135 &self,
1136 account_filter: Option<AddressIndex>,
1137 include_inactive: bool,
1138 ) -> anyhow::Result<Vec<(AuctionId, SpendableNoteRecord, u64 )>> {
1139 let account_clause = account_filter
1140 .map(|idx| {
1141 format!(
1142 "AND spendable_notes.address_index = x'{}'",
1143 hex::encode(idx.to_bytes())
1144 )
1145 })
1146 .unwrap_or_else(|| "".to_string());
1147
1148 let active_clause = if !include_inactive {
1149 "AND auctions.auction_state = 0"
1150 } else {
1151 ""
1152 };
1153
1154 let query = format!(
1155 "SELECT auctions.auction_id, spendable_notes.*, notes.*, auctions.auction_state
1156 FROM auctions
1157 JOIN spendable_notes ON auctions.note_commitment = spendable_notes.note_commitment
1158 JOIN notes ON auctions.note_commitment = notes.note_commitment
1159 WHERE 1 = 1
1160 {account_clause}
1161 {active_clause}",
1162 account_clause = account_clause,
1163 active_clause = active_clause,
1164 );
1165
1166 let pool = self.pool.clone();
1167
1168 spawn_blocking(move || {
1169 let mut conn = pool.get()?;
1170 let tx = conn.transaction()?;
1171
1172 let spendable_note_records: Vec<(AuctionId, SpendableNoteRecord, u64)> = tx
1173 .prepare(&query)?
1174 .query_and_then((), |row| {
1175 let raw_auction_id: Vec<u8> = row.get("auction_id")?;
1176 let array_auction_id: [u8; 32] = raw_auction_id
1177 .try_into()
1178 .map_err(|_| anyhow!("auction id must be 32 bytes"))?;
1179 let auction_id = AuctionId(array_auction_id);
1180 let spendable_note_record: SpendableNoteRecord = row.try_into()?;
1181 let local_seq: u64 = row.get("auction_state")?;
1182 Ok((auction_id, spendable_note_record, local_seq))
1183 })?
1184 .collect::<anyhow::Result<Vec<_>>>()?;
1185
1186 Ok(spendable_note_records)
1187 })
1188 .await?
1189 }
1190
1191 pub async fn record_position(&self, position: Position) -> anyhow::Result<()> {
1192 let position_id = position.id().0.to_vec();
1193
1194 let position_state = position.state.to_string();
1195 let trading_pair = position.phi.pair.to_string();
1196
1197 let pool = self.pool.clone();
1198
1199 spawn_blocking(move || {
1200 pool.get()?
1201 .execute(
1202 "INSERT OR REPLACE INTO positions (position_id, position_state, trading_pair) VALUES (?1, ?2, ?3)",
1203 (position_id, position_state, trading_pair),
1204 )
1205 .map_err(anyhow::Error::from)
1206 })
1207 .await??;
1208
1209 Ok(())
1210 }
1211
1212 pub async fn update_position(
1213 &self,
1214 position_id: position::Id,
1215 position_state: position::State,
1216 ) -> anyhow::Result<()> {
1217 let position_id = position_id.0.to_vec();
1218 let position_state = position_state.to_string();
1219
1220 let pool = self.pool.clone();
1221
1222 spawn_blocking(move || {
1223 pool.get()?
1224 .execute(
1225 "UPDATE positions SET (position_state) = ?1 WHERE position_id = ?2",
1226 (position_state, position_id),
1227 )
1228 .map_err(anyhow::Error::from)
1229 })
1230 .await??;
1231
1232 Ok(())
1233 }
1234
1235 pub async fn update_position_with_account(
1236 &self,
1237 position_id: position::Id,
1238 account: u32,
1239 ) -> anyhow::Result<()> {
1240 let pool = self.pool.clone();
1241
1242 spawn_blocking(move || {
1243 pool.get()?
1244 .execute(
1245 "UPDATE positions SET (account) = ?1 WHERE position_id = ?2",
1246 (i64::from(account), position_id.0),
1247 )
1248 .map_err(anyhow::Error::from)
1249 })
1250 .await??;
1251
1252 Ok(())
1253 }
1254
1255 pub async fn record_empty_block(&self, height: u64) -> anyhow::Result<()> {
1256 let last_sync_height = self.last_sync_height().await?.ok_or_else(|| {
1258 anyhow::anyhow!("invalid: tried to record empty block as genesis block")
1259 })?;
1260
1261 if height != last_sync_height + 1 {
1262 anyhow::bail!(
1263 "Wrong block height {} for latest sync height {}",
1264 height,
1265 last_sync_height
1266 );
1267 }
1268
1269 *self.uncommitted_height.lock() = Some(height.try_into()?);
1270 Ok(())
1271 }
1272
1273 fn record_note_inner(
1274 dbtx: &r2d2_sqlite::rusqlite::Transaction<'_>,
1275 note: &Note,
1276 ) -> anyhow::Result<()> {
1277 let note_commitment = note.commit().0.to_bytes().to_vec();
1278 let address = note.address().to_vec();
1279 let amount = u128::from(note.amount()).to_be_bytes().to_vec();
1280 let asset_id = note.asset_id().to_bytes().to_vec();
1281 let rseed = note.rseed().to_bytes().to_vec();
1282
1283 dbtx.execute(
1284 "INSERT INTO notes (note_commitment, address, amount, asset_id, rseed)
1285 VALUES (?1, ?2, ?3, ?4, ?5)
1286 ON CONFLICT (note_commitment)
1287 DO UPDATE SET
1288 address = excluded.address,
1289 amount = excluded.amount,
1290 asset_id = excluded.asset_id,
1291 rseed = excluded.rseed",
1292 (note_commitment, address, amount, asset_id, rseed),
1293 )?;
1294
1295 Ok(())
1296 }
1297
1298 pub async fn give_advice(&self, note: Note) -> anyhow::Result<()> {
1299 let pool = self.pool.clone();
1300 let mut lock = pool.get()?;
1301 let dbtx = lock.transaction()?;
1302
1303 Storage::record_note_inner(&dbtx, ¬e)?;
1304
1305 dbtx.commit()?;
1306
1307 Ok(())
1308 }
1309
1310 pub async fn scan_advice(
1316 &self,
1317 note_commitments: Vec<note::StateCommitment>,
1318 ) -> anyhow::Result<BTreeMap<note::StateCommitment, Note>> {
1319 if note_commitments.is_empty() {
1320 return Ok(BTreeMap::new());
1321 }
1322
1323 let pool = self.pool.clone();
1324
1325 spawn_blocking(move || {
1329 pool.get()?
1330 .prepare(&format!(
1331 "SELECT notes.note_commitment,
1332 notes.address,
1333 notes.amount,
1334 notes.asset_id,
1335 notes.rseed
1336 FROM notes
1337 LEFT OUTER JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1338 WHERE (spendable_notes.note_commitment IS NULL) AND (notes.note_commitment IN ({}))",
1339 note_commitments
1340 .iter()
1341 .map(|cm| format!("x'{}'", hex::encode(cm.0.to_bytes())))
1342 .collect::<Vec<_>>()
1343 .join(", ")
1344 ))?
1345 .query_and_then((), |row| {
1346 let address = Address::try_from(row.get::<_, Vec<u8>>("address")?)?;
1347 let amount = row.get::<_, [u8; 16]>("amount")?;
1348 let amount_u128: u128 = u128::from_be_bytes(amount);
1349 let asset_id = asset::Id(Fq::from_bytes_checked(&row.get::<_, [u8; 32]>("asset_id")?).expect("asset id malformed"));
1350 let rseed = Rseed(row.get::<_, [u8; 32]>("rseed")?);
1351 let note = Note::from_parts(
1352 address,
1353 Value {
1354 amount: amount_u128.into(),
1355 asset_id,
1356 },
1357 rseed,
1358 )?;
1359 anyhow::Ok((note.commit(), note))
1360 })?
1361 .collect::<anyhow::Result<BTreeMap<_, _>>>()
1362 }).await?
1363 }
1364
1365 pub async fn filter_nullifiers(
1367 &self,
1368 nullifiers: Vec<Nullifier>,
1369 ) -> anyhow::Result<Vec<Nullifier>> {
1370 if nullifiers.is_empty() {
1371 return Ok(Vec::new());
1372 }
1373
1374 let pool = self.pool.clone();
1375
1376 spawn_blocking(move || {
1377 pool.get()?
1378 .prepare(&format!(
1379 "SELECT nullifier FROM (SELECT nullifier FROM spendable_notes UNION SELECT nullifier FROM swaps UNION SELECT nullifier FROM tx_by_nullifier) WHERE nullifier IN ({})",
1380 nullifiers
1381 .iter()
1382 .map(|x| format!("x'{}'", hex::encode(x.0.to_bytes())))
1383 .collect::<Vec<String>>()
1384 .join(",")
1385 ))?
1386 .query_and_then((), |row| {
1387 let nullifier: Vec<u8> = row.get("nullifier")?;
1388 nullifier.as_slice().try_into()
1389 })?
1390 .collect()
1391 })
1392 .await?
1393 }
1394
1395 pub async fn record_block(
1396 &self,
1397 filtered_block: FilteredBlock,
1398 transactions: Vec<Transaction>,
1399 sct: &mut tct::Tree,
1400 channel: tonic::transport::Channel,
1401 ) -> anyhow::Result<()> {
1402 let last_sync_height = self.last_sync_height().await?;
1404
1405 let correct_height = match last_sync_height {
1406 Some(cur_height) => filtered_block.height == cur_height + 1,
1408 None => filtered_block.height == 0,
1410 };
1411
1412 if !correct_height {
1413 anyhow::bail!(
1414 "Wrong block height {} for latest sync height {:?}",
1415 filtered_block.height,
1416 last_sync_height
1417 );
1418 }
1419
1420 let pool = self.pool.clone();
1421 let uncommitted_height = self.uncommitted_height.clone();
1422 let scanned_notes_tx = self.scanned_notes_tx.clone();
1423 let scanned_nullifiers_tx = self.scanned_nullifiers_tx.clone();
1424 let scanned_swaps_tx = self.scanned_swaps_tx.clone();
1425
1426 let fvk = self.full_viewing_key().await?;
1427
1428 let new_app_parameters: Option<AppParameters> = if filtered_block.app_parameters_updated {
1430 let mut client = AppQueryServiceClient::new(channel);
1432 Some(
1433 client
1434 .app_parameters(tonic::Request::new(AppParametersRequest {}))
1435 .await?
1436 .into_inner()
1437 .try_into()?,
1438 )
1439 } else {
1440 None
1441 };
1442
1443 let mut new_sct = sct.clone();
1451
1452 *sct = spawn_blocking(move || {
1453 let mut lock = pool.get()?;
1454 let mut dbtx = lock.transaction()?;
1455
1456 if let Some(params) = new_app_parameters {
1457 let params_bytes = params.encode_to_vec();
1458 dbtx.execute(
1460 "INSERT INTO kv (k, v) VALUES ('app_params', ?1)
1461 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1462 [¶ms_bytes[..]],
1463 )?;
1464 }
1465
1466 for note_record in filtered_block.new_notes.values() {
1468 let note_commitment = note_record.note_commitment.0.to_bytes().to_vec();
1469 let height_created = filtered_block.height as i64;
1470 let address_index = note_record.address_index.to_bytes().to_vec();
1471 let nullifier = note_record.nullifier.to_bytes().to_vec();
1472 let position = (u64::from(note_record.position)) as i64;
1473 let source = note_record.source.encode_to_vec();
1474 let tx_hash = match note_record.source {
1476 CommitmentSource::Transaction { id } => id,
1477 _ => None,
1478 };
1479
1480 Storage::record_note_inner(&dbtx, ¬e_record.note)?;
1482
1483 dbtx.execute(
1484 "INSERT INTO spendable_notes
1485 (note_commitment, nullifier, position, height_created, address_index, source, height_spent, tx_hash)
1486 VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL, ?7)
1487 ON CONFLICT (note_commitment)
1488 DO UPDATE SET nullifier = excluded.nullifier,
1489 position = excluded.position,
1490 height_created = excluded.height_created,
1491 address_index = excluded.address_index,
1492 source = excluded.source,
1493 height_spent = excluded.height_spent,
1494 tx_hash = excluded.tx_hash",
1495 (
1496 ¬e_commitment,
1497 &nullifier,
1498 &position,
1499 &height_created,
1500 &address_index,
1501 &source,
1502 &tx_hash,
1504 ),
1505 )?;
1506 }
1507
1508 for swap in filtered_block.new_swaps.values() {
1510 let swap_commitment = swap.swap_commitment.0.to_bytes().to_vec();
1511 let swap_bytes = swap.swap.encode_to_vec();
1512 let position = (u64::from(swap.position)) as i64;
1513 let nullifier = swap.nullifier.to_bytes().to_vec();
1514 let source = swap.source.encode_to_vec();
1515 let output_data = swap.output_data.encode_to_vec();
1516
1517 dbtx.execute(
1518 "INSERT INTO swaps (swap_commitment, swap, position, nullifier, output_data, height_claimed, source)
1519 VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6)
1520 ON CONFLICT (swap_commitment)
1521 DO UPDATE SET swap = excluded.swap,
1522 position = excluded.position,
1523 nullifier = excluded.nullifier,
1524 output_data = excluded.output_data,
1525 height_claimed = excluded.height_claimed,
1526 source = excluded.source",
1527 (
1528 &swap_commitment,
1529 &swap_bytes,
1530 &position,
1531 &nullifier,
1532 &output_data,
1533 &source,
1535 ),
1536 )?;
1537 }
1538
1539 for nullifier in &filtered_block.spent_nullifiers {
1541 let height_spent = filtered_block.height as i64;
1542 let nullifier_bytes = nullifier.to_bytes().to_vec();
1543
1544 let spent_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1545 "UPDATE spendable_notes SET height_spent = ?1 WHERE nullifier = ?2 RETURNING note_commitment"
1546 )?
1547 .query_and_then(
1548 (height_spent, &nullifier_bytes),
1549 |row| {
1550 let bytes: Vec<u8> = row.get("note_commitment")?;
1551 StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1552 },
1553 )?
1554 .next()
1555 .transpose()?;
1556
1557 let swap_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1558 "UPDATE swaps SET height_claimed = ?1 WHERE nullifier = ?2 RETURNING swap_commitment"
1559 )?
1560 .query_and_then(
1561 (height_spent, &nullifier_bytes),
1562 |row| {
1563 let bytes: Vec<u8> = row.get("swap_commitment")?;
1564 StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1565 },
1566 )?
1567 .next()
1568 .transpose()?;
1569
1570 let spent_denom: String
1572 = dbtx.prepare_cached(
1573 "SELECT denom FROM assets
1574 WHERE asset_id ==
1575 (SELECT asset_id FROM notes
1576 WHERE note_commitment ==
1577 (SELECT note_commitment FROM spendable_notes WHERE nullifier = ?1))"
1578 )?
1579 .query_and_then(
1580 [&nullifier_bytes],
1581 |row| row.get("denom"),
1582 )?
1583 .next()
1584 .transpose()?
1585 .unwrap_or("unknown".to_string());
1586
1587 if let Some(spent_commitment) = spent_commitment {
1589 tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "detected spent note commitment");
1590 if DelegationToken::from_str(&spent_denom).is_err() {
1595 tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "forgetting spent note commitment");
1596 new_sct.forget(spent_commitment);
1597 }
1598 };
1599
1600 if let Some(spent_swap_commitment) = swap_commitment {
1602 tracing::debug!(?nullifier, ?spent_swap_commitment, "detected and forgetting spent swap commitment");
1603 new_sct.forget(spent_swap_commitment);
1604 };
1605 }
1606
1607 new_sct.to_writer(&mut TreeStore(&mut dbtx))?;
1609
1610 for transaction in transactions {
1612 let tx_bytes = transaction.encode_to_vec();
1613 let tx_hash_owned = sha2::Sha256::digest(&tx_bytes);
1615 let tx_hash = tx_hash_owned.as_slice();
1616 let tx_block_height = filtered_block.height as i64;
1617 let decrypted_memo = transaction.decrypt_memo(&fvk).ok();
1618 let memo_text = decrypted_memo.clone().map_or(None,|x| Some(x.text().to_string()));
1619 let return_address = decrypted_memo.map_or(None, |x| Some(x.return_address().to_vec()));
1620
1621 tracing::debug!(tx_hash = ?hex::encode(tx_hash), "recording extended transaction");
1622
1623 dbtx.execute(
1624 "INSERT OR IGNORE INTO tx (tx_hash, tx_bytes, block_height, return_address, memo_text) VALUES (?1, ?2, ?3, ?4, ?5)",
1625 (&tx_hash, &tx_bytes, tx_block_height, return_address, memo_text),
1626 )?;
1627
1628 for nf in transaction.spent_nullifiers() {
1630 let nf_bytes = nf.0.to_bytes().to_vec();
1631 dbtx.execute(
1632 "INSERT OR IGNORE INTO tx_by_nullifier (nullifier, tx_hash) VALUES (?1, ?2)",
1633 (&nf_bytes, &tx_hash),
1634 )?;
1635 }
1636 }
1637
1638 if filtered_block.fmd_parameters.is_some() {
1640 let fmd_parameters_bytes =
1641 &fmd::Parameters::encode_to_vec(&filtered_block.fmd_parameters.ok_or_else(|| anyhow::anyhow!("missing fmd parameters in filtered block"))?)[..];
1642
1643 dbtx.execute(
1644 "INSERT INTO kv (k, v) VALUES ('fmd_params', ?1)
1645 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1646 [&fmd_parameters_bytes],
1647 )?;
1648 }
1649
1650 if filtered_block.gas_prices.is_some() {
1652 let gas_prices_bytes =
1653 &GasPrices::encode_to_vec(&filtered_block.gas_prices.ok_or_else(|| anyhow::anyhow!("missing gas prices in filtered block"))?)[..];
1654
1655 dbtx.execute(
1656 "INSERT INTO kv (k, v) VALUES ('gas_prices', ?1)
1657 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1658 [&gas_prices_bytes],
1659 )?;
1660 }
1661
1662 let latest_sync_height = filtered_block.height as i64;
1664 dbtx.execute("UPDATE sync_height SET height = ?1", [latest_sync_height])?;
1665
1666 dbtx.commit()?;
1668
1669 uncommitted_height.lock().take();
1677
1678 for note_record in filtered_block.new_notes.values() {
1682 let _ = scanned_notes_tx.send(note_record.clone());
1686 }
1687
1688 for nullifier in filtered_block.spent_nullifiers.iter() {
1689 let _ = scanned_nullifiers_tx.send(*nullifier);
1693 }
1694
1695 for swap_record in filtered_block.new_swaps.values() {
1696 let _ = scanned_swaps_tx.send(swap_record.clone());
1700 }
1701
1702 anyhow::Ok(new_sct)
1703 })
1704 .await??;
1705
1706 Ok(())
1707 }
1708
1709 pub async fn owned_position_ids(
1710 &self,
1711 position_state: Option<State>,
1712 trading_pair: Option<TradingPair>,
1713 address_index: Option<AddressIndex>,
1714 ) -> anyhow::Result<Vec<position::Id>> {
1715 let pool = self.pool.clone();
1716
1717 let state_clause = match position_state {
1718 Some(state) => format!("position_state = \"{}\"", state),
1719 None => "true".to_string(),
1720 };
1721
1722 let pair_clause = match trading_pair {
1723 Some(pair) => format!("trading_pair = \"{}\"", pair),
1724 None => "true".to_string(),
1725 };
1726
1727 let account_clause = match address_index {
1728 Some(index) => format!("account = {}", index.account),
1729 None => "true".to_string(),
1730 };
1731
1732 spawn_blocking(move || {
1733 let q = format!(
1734 "SELECT position_id FROM positions WHERE {} AND {} AND {}",
1735 state_clause, pair_clause, account_clause
1736 );
1737
1738 pool.get()?
1739 .prepare_cached(&q)?
1740 .query_and_then([], |row| {
1741 let position_id: Vec<u8> = row.get("position_id")?;
1742 Ok(position::Id(position_id.as_slice().try_into()?))
1743 })?
1744 .collect()
1745 })
1746 .await?
1747 }
1748
1749 pub async fn notes_by_sender(
1750 &self,
1751 return_address: &Address,
1752 ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
1753 let pool = self.pool.clone();
1754
1755 let query = "SELECT notes.note_commitment,
1756 spendable_notes.height_created,
1757 notes.address,
1758 notes.amount,
1759 notes.asset_id,
1760 notes.rseed,
1761 spendable_notes.address_index,
1762 spendable_notes.source,
1763 spendable_notes.height_spent,
1764 spendable_notes.nullifier,
1765 spendable_notes.position
1766 FROM notes
1767 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1768 JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
1769 WHERE tx.return_address = ?1";
1770
1771 let return_address = return_address.to_vec();
1772
1773 let records = spawn_blocking(move || {
1774 pool.get()?
1775 .prepare(query)?
1776 .query_and_then([return_address], |record| record.try_into())?
1777 .collect::<anyhow::Result<Vec<_>>>()
1778 })
1779 .await??;
1780
1781 Ok(records)
1782 }
1783
1784 pub async fn transactions_matching_memo(
1788 &self,
1789 pattern: String,
1790 ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction, String)>> {
1791 let pattern = pattern.to_owned();
1792 tracing::trace!(?pattern, "searching for memos matching");
1793 let pool = self.pool.clone();
1794
1795 spawn_blocking(move || {
1796 pool.get()?
1797 .prepare_cached("SELECT block_height, tx_hash, tx_bytes, memo_text FROM tx WHERE memo_text LIKE ?1 ESCAPE '\\'")?
1798 .query_and_then([pattern], |row| {
1799 let block_height: u64 = row.get("block_height")?;
1800 let tx_hash: Vec<u8> = row.get("tx_hash")?;
1801 let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
1802 let tx = Transaction::decode(tx_bytes.as_slice())?;
1803 let memo_text: String = row.get("memo_text")?;
1804 anyhow::Ok((block_height, tx_hash, tx, memo_text))
1805 })?
1806 .collect()
1807 })
1808 .await?
1809 }
1810
1811 pub async fn update_epoch(
1813 &self,
1814 epoch: u64,
1815 root: Option<Root>,
1816 start_height: Option<u64>,
1817 ) -> anyhow::Result<()> {
1818 let pool = self.pool.clone();
1819
1820 spawn_blocking(move || {
1821 pool.get()?
1822 .execute(
1823 r#"
1824 INSERT INTO epochs(epoch_index, root, start_height)
1825 VALUES (?1, ?2, ?3)
1826 ON CONFLICT(epoch_index)
1827 DO UPDATE SET
1828 root = COALESCE(?2, root),
1829 start_height = COALESCE(?3, start_height)
1830 "#,
1831 (epoch, root.map(|x| x.encode_to_vec()), start_height),
1832 )
1833 .map_err(anyhow::Error::from)
1834 })
1835 .await??;
1836
1837 Ok(())
1838 }
1839
1840 pub async fn get_epoch(&self, epoch: u64) -> anyhow::Result<(Option<Root>, Option<u64>)> {
1845 let pool = self.pool.clone();
1846
1847 spawn_blocking(move || {
1848 pool.get()?
1849 .query_row_and_then(
1850 r#"
1851 SELECT root, start_height
1852 FROM epochs
1853 WHERE epoch_index = ?1
1854 "#,
1855 (epoch,),
1856 |row| {
1857 let root_raw: Option<Vec<u8>> = row.get("root")?;
1858 let start_height: Option<u64> = row.get("start_height")?;
1859 let root = root_raw.map(|x| Root::decode(x.as_slice())).transpose()?;
1860 anyhow::Ok((root, start_height))
1861 },
1862 )
1863 .map_err(anyhow::Error::from)
1864 })
1865 .await?
1866 }
1867}