1use std::{collections::BTreeMap, num::NonZeroU64, str::FromStr, sync::Arc, time::Duration};
2
3use anyhow::{anyhow, Context};
4use camino::Utf8Path;
5use decaf377::Fq;
6use once_cell::sync::Lazy;
7use parking_lot::Mutex;
8use penumbra_sdk_auction::auction::AuctionId;
9use r2d2_sqlite::{
10 rusqlite::{OpenFlags, OptionalExtension},
11 SqliteConnectionManager,
12};
13use sha2::{Digest, Sha256};
14use tap::{Tap, TapFallible};
15use tokio::{
16 sync::broadcast::{self, error::RecvError},
17 task::spawn_blocking,
18};
19use tracing::{error_span, Instrument};
20use url::Url;
21
22use penumbra_sdk_app::params::AppParameters;
23use penumbra_sdk_asset::{asset, asset::Id, asset::Metadata, Value};
24use penumbra_sdk_dex::{
25 lp::position::{self, Position, State},
26 TradingPair,
27};
28use penumbra_sdk_fee::GasPrices;
29use penumbra_sdk_keys::{keys::AddressIndex, Address, FullViewingKey};
30use penumbra_sdk_num::Amount;
31use penumbra_sdk_proto::{
32 core::app::v1::{
33 query_service_client::QueryServiceClient as AppQueryServiceClient, AppParametersRequest,
34 },
35 DomainType,
36};
37use penumbra_sdk_sct::{CommitmentSource, Nullifier};
38use penumbra_sdk_shielded_pool::{fmd, note, Note, Rseed};
39use penumbra_sdk_stake::{DelegationToken, IdentityKey};
40use penumbra_sdk_tct::{self as tct, builder::epoch::Root};
41use penumbra_sdk_transaction::Transaction;
42use sct::TreeStore;
43use tct::StateCommitment;
44
45use crate::{sync::FilteredBlock, SpendableNoteRecord, SwapRecord};
46
47mod sct;
48
/// One aggregated balance: the total amount of a single asset held at a single
/// address index.
///
/// Entries of this type are built by `Storage::balances`, which sums the amounts
/// of unspent notes per `(address_index, id)` pair.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct BalanceEntry {
    /// The asset this balance is denominated in.
    pub id: Id,
    /// The summed amount, as a raw `u128`.
    pub amount: u128,
    /// The address index holding the balance.
    pub address_index: AddressIndex,
}
55
/// Hex-encoded SHA-256 digest of `storage/schema.sql`, computed lazily on first use.
///
/// The value is written to the `schema_hash` table when a database is initialized
/// and compared against the stored value when a database is loaded, so that a
/// database created with a different schema is rejected.
static SCHEMA_HASH: Lazy<String> =
    Lazy::new(|| hex::encode(Sha256::digest(include_str!("storage/schema.sql"))));
59
/// Handle to the SQLite-backed view database.
///
/// Cloning is cheap in the sense that every field is itself a shared handle
/// (a connection pool, an `Arc`, and broadcast senders).
#[derive(Clone)]
pub struct Storage {
    /// Pool of SQLite connections; `connect` builds it with a maximum size of one.
    pool: r2d2::Pool<SqliteConnectionManager>,

    /// Height of the latest empty block that was recorded in memory only.
    ///
    /// `record_empty_block` stores the height here without touching the database,
    /// and `last_sync_height` returns this value in preference to the stored one
    /// whenever it is `Some`.
    uncommitted_height: Arc<Mutex<Option<NonZeroU64>>>,

    /// Broadcast channel that `note_by_commitment` and `note_by_nullifier`
    /// subscribe to while waiting for a note to be detected.
    /// NOTE(review): the sending side is not visible in this part of the file.
    scanned_notes_tx: tokio::sync::broadcast::Sender<SpendableNoteRecord>,
    /// Broadcast channel that `nullifier_status` subscribes to while waiting for a
    /// nullifier to be detected.
    scanned_nullifiers_tx: tokio::sync::broadcast::Sender<Nullifier>,
    /// Broadcast channel that `swap_by_commitment` subscribes to while waiting for
    /// a swap to be detected.
    scanned_swaps_tx: tokio::sync::broadcast::Sender<SwapRecord>,
}
77
78impl Storage {
79 #[tracing::instrument(
81 skip_all,
82 fields(
83 path = ?storage_path.as_ref().map(|p| p.as_ref().as_str()),
84 url = %node,
85 )
86 )]
87 pub async fn load_or_initialize(
88 storage_path: Option<impl AsRef<Utf8Path>>,
89 fvk: &FullViewingKey,
90 node: Url,
91 ) -> anyhow::Result<Self> {
92 if let Some(path) = storage_path.as_ref().map(AsRef::as_ref) {
93 if path.exists() {
94 tracing::debug!(?path, "database exists");
95 return Self::load(path).await;
96 } else {
97 tracing::debug!(?path, "database does not exist");
98 }
99 };
100
101 let mut client = AppQueryServiceClient::connect(node.to_string())
102 .instrument(error_span!("connecting_to_endpoint"))
103 .await
104 .tap_err(|error| {
105 tracing::error!(?error, "failed to connect to app query service endpoint")
106 })?
107 .tap(|_| tracing::debug!("connected to app query service endpoint"));
108 let params = client
109 .app_parameters(tonic::Request::new(AppParametersRequest {}))
110 .instrument(error_span!("getting_app_parameters"))
111 .await?
112 .into_inner()
113 .try_into()?;
114
115 Self::initialize(storage_path, fvk.clone(), params).await
116 }
117
118 fn connect(
119 path: Option<impl AsRef<Utf8Path>>,
120 ) -> anyhow::Result<r2d2::Pool<SqliteConnectionManager>> {
121 if let Some(path) = path {
122 let manager = SqliteConnectionManager::file(path.as_ref())
123 .with_flags(
124 OpenFlags::default() & !OpenFlags::SQLITE_OPEN_URI,
127 )
128 .with_init(|conn| {
129 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
133 conn.set_prepared_statement_cache_capacity(32);
136 Ok(())
137 });
138 Ok(r2d2::Pool::builder()
139 .max_size(1)
142 .build(manager)?)
143 } else {
144 let manager = SqliteConnectionManager::memory();
145 Ok(r2d2::Pool::builder()
151 .max_size(1)
152 .min_idle(Some(1))
153 .max_lifetime(Some(Duration::MAX))
154 .idle_timeout(Some(Duration::MAX))
155 .build(manager)?)
156 }
157 }
158
159 pub async fn load(path: impl AsRef<Utf8Path>) -> anyhow::Result<Self> {
160 let storage = Self {
161 pool: Self::connect(Some(path))?,
162 uncommitted_height: Arc::new(Mutex::new(None)),
163 scanned_notes_tx: broadcast::channel(128).0,
164 scanned_nullifiers_tx: broadcast::channel(512).0,
165 scanned_swaps_tx: broadcast::channel(128).0,
166 };
167
168 spawn_blocking(move || {
169 let actual_schema_hash: String = storage
172 .pool
173 .get()?
174 .query_row("SELECT schema_hash FROM schema_hash", (), |row| {
175 row.get("schema_hash")
176 })
177 .context("failed to query database schema version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
178
179 if actual_schema_hash != *SCHEMA_HASH {
180 let database_client_version: String = storage
181 .pool
182 .get()?
183 .query_row("SELECT client_version FROM client_version", (), |row| {
184 row.get("client_version")
185 })
186 .context("failed to query client version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
187
188 anyhow::bail!(
189 "can't load view database created by client version {} using client version {}: they have different schemata, so you need to reset your view database and resynchronize by running pcli view reset",
190 database_client_version,
191 env!("CARGO_PKG_VERSION"),
192 );
193 }
194
195 Ok(storage)
196 })
197 .await?
198 }
199
200 pub async fn initialize(
201 storage_path: Option<impl AsRef<Utf8Path>>,
202 fvk: FullViewingKey,
203 params: AppParameters,
204 ) -> anyhow::Result<Self> {
205 tracing::debug!(storage_path = ?storage_path.as_ref().map(AsRef::as_ref), ?fvk, ?params);
206
207 let pool = Self::connect(storage_path)?;
209
210 let out = spawn_blocking(move || {
211 let mut conn = pool.get()?;
213 let tx = conn.transaction()?;
214
215 tx.execute_batch(include_str!("storage/schema.sql"))?;
217
218 let params_bytes = params.encode_to_vec();
219 tx.execute(
220 "INSERT INTO kv (k, v) VALUES ('app_params', ?1)",
221 [¶ms_bytes[..]],
222 )?;
223
224 let fvk_bytes = fvk.encode_to_vec();
225 tx.execute("INSERT INTO kv (k, v) VALUES ('fvk', ?1)", [&fvk_bytes[..]])?;
226
227 tx.execute("INSERT INTO sync_height (height) VALUES (-1)", ())?;
231
232 tx.execute(
234 "INSERT INTO schema_hash (schema_hash) VALUES (?1)",
235 [&*SCHEMA_HASH],
236 )?;
237
238 tx.execute(
240 "INSERT INTO client_version (client_version) VALUES (?1)",
241 [env!("CARGO_PKG_VERSION")],
242 )?;
243
244 tx.commit()?;
245 drop(conn);
246
247 anyhow::Ok(Storage {
248 pool,
249 uncommitted_height: Arc::new(Mutex::new(None)),
250 scanned_notes_tx: broadcast::channel(128).0,
251 scanned_nullifiers_tx: broadcast::channel(512).0,
252 scanned_swaps_tx: broadcast::channel(128).0,
253 })
254 })
255 .await??;
256
257 out.update_epoch(0, None, Some(0)).await?;
258
259 Ok(out)
260 }
261
262 pub async fn load_asset_metadata(
264 &self,
265 registry_path: impl AsRef<Utf8Path>,
266 ) -> anyhow::Result<()> {
267 tracing::debug!(registry_path = ?registry_path.as_ref(), "loading asset metadata");
268 let registry_path = registry_path.as_ref();
269 let mut registry_json: serde_json::Value = serde_json::from_str(
271 std::fs::read_to_string(registry_path)
272 .context("failed to read file")?
273 .as_str(),
274 )
275 .context("failed to parse JSON")?;
276
277 let registry: BTreeMap<String, Metadata> = serde_json::value::from_value(
278 registry_json
279 .get_mut("assetById")
280 .ok_or_else(|| anyhow::anyhow!("missing assetById"))?
281 .take(),
282 )
283 .context("could not parse asset registry")?;
284
285 for metadata in registry.into_values() {
286 self.record_asset(metadata).await?;
287 }
288
289 Ok(())
290 }
291
292 pub async fn balances(
294 &self,
295 address_index: Option<AddressIndex>,
296 asset_id: Option<asset::Id>,
297 ) -> anyhow::Result<Vec<BalanceEntry>> {
298 let pool = self.pool.clone();
299
300 spawn_blocking(move || {
301 let query = "SELECT notes.asset_id, notes.amount, spendable_notes.address_index
302 FROM notes
303 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
304 WHERE spendable_notes.height_spent IS NULL";
305
306 tracing::debug!(?query);
307
308 let mut balances: BTreeMap<AddressIndex, BTreeMap<asset::Id, Amount>> = BTreeMap::new();
310
311 for result in pool.get()?.prepare_cached(query)?.query_map([], |row| {
312 let asset_id = row.get::<&str, Vec<u8>>("asset_id")?;
313 let amount = row.get::<&str, Vec<u8>>("amount")?;
314 let address_index = row.get::<&str, Vec<u8>>("address_index")?;
315
316 Ok((asset_id, amount, address_index))
317 })? {
318 let (id, amount, index) = result?;
319
320 let id = Id::try_from(id.as_slice())?;
321
322 let amount: Amount = Amount::from_be_bytes(
323 amount
324 .as_slice()
325 .try_into()
326 .expect("amount slice of incorrect length"),
327 );
328
329 let index = AddressIndex::try_from(index.as_slice())?;
330
331 if let Some(address_index) = address_index {
333 if address_index != index {
334 continue;
335 }
336 }
337 if let Some(asset_id) = asset_id {
338 if asset_id != id {
339 continue;
340 }
341 }
342
343 balances
344 .entry(index)
345 .or_insert_with(BTreeMap::new)
346 .entry(id)
347 .and_modify(|e| *e += amount)
348 .or_insert(amount);
349 }
350
351 let entries = balances
352 .into_iter()
353 .flat_map(|(index, assets)| {
354 assets.into_iter().map(move |(id, amount)| BalanceEntry {
355 id,
356 amount: amount.into(),
357 address_index: index,
358 })
359 })
360 .collect::<Vec<_>>();
361 Ok(entries)
362 })
363 .await?
364 }
365
366 pub async fn note_by_commitment(
368 &self,
369 note_commitment: tct::StateCommitment,
370 await_detection: bool,
371 ) -> anyhow::Result<SpendableNoteRecord> {
372 let mut rx = self.scanned_notes_tx.subscribe();
375
376 let pool = self.pool.clone();
377
378 if let Some(record) = spawn_blocking(move || {
379 pool.get()?
381 .prepare(&format!(
382 "SELECT
383 notes.note_commitment,
384 spendable_notes.height_created,
385 notes.address,
386 notes.amount,
387 notes.asset_id,
388 notes.rseed,
389 spendable_notes.address_index,
390 spendable_notes.source,
391 spendable_notes.height_spent,
392 spendable_notes.nullifier,
393 spendable_notes.position,
394 tx.return_address
395 FROM notes
396 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
397 LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
398 WHERE notes.note_commitment = x'{}'",
399 hex::encode(note_commitment.0.to_bytes())
400 ))?
401 .query_and_then((), |record| record.try_into())?
402 .next()
403 .transpose()
404 })
405 .await??
406 {
407 return Ok(record);
408 }
409
410 if !await_detection {
411 anyhow::bail!("Note commitment {} not found", note_commitment);
412 }
413
414 loop {
418 match rx.recv().await {
419 Ok(record) => {
420 if record.note_commitment == note_commitment {
421 return Ok(record);
422 }
423 }
424
425 Err(e) => match e {
426 RecvError::Closed => {
427 anyhow::bail!(
428 "Receiver error during note detection: closed (no more active senders)"
429 );
430 }
431 RecvError::Lagged(count) => {
432 anyhow::bail!(
433 "Receiver error during note detection: lagged (by {:?} messages)",
434 count
435 );
436 }
437 },
438 };
439 }
440 }
441
442 pub async fn swap_by_commitment(
444 &self,
445 swap_commitment: tct::StateCommitment,
446 await_detection: bool,
447 ) -> anyhow::Result<SwapRecord> {
448 let mut rx = self.scanned_swaps_tx.subscribe();
451
452 let pool = self.pool.clone();
453
454 if let Some(record) = spawn_blocking(move || {
455 pool.get()?
457 .prepare(&format!(
458 "SELECT * FROM swaps WHERE swaps.swap_commitment = x'{}'",
459 hex::encode(swap_commitment.0.to_bytes())
460 ))?
461 .query_and_then((), |record| record.try_into())?
462 .next()
463 .transpose()
464 })
465 .await??
466 {
467 return Ok(record);
468 }
469
470 if !await_detection {
471 anyhow::bail!("swap commitment {} not found", swap_commitment);
472 }
473
474 loop {
478 match rx.recv().await {
479 Ok(record) => {
480 if record.swap_commitment == swap_commitment {
481 return Ok(record);
482 }
483 }
484
485 Err(e) => match e {
486 RecvError::Closed => {
487 anyhow::bail!(
488 "Receiver error during swap detection: closed (no more active senders)"
489 );
490 }
491 RecvError::Lagged(count) => {
492 anyhow::bail!(
493 "Receiver error during swap detection: lagged (by {:?} messages)",
494 count
495 );
496 }
497 },
498 };
499 }
500 }
501
502 pub async fn unclaimed_swaps(&self) -> anyhow::Result<Vec<SwapRecord>> {
504 let pool = self.pool.clone();
505
506 let records = spawn_blocking(move || {
507 pool.get()?
509 .prepare("SELECT * FROM swaps WHERE swaps.height_claimed is NULL")?
510 .query_and_then((), |record| record.try_into())?
511 .collect::<anyhow::Result<Vec<_>>>()
512 })
513 .await??;
514
515 Ok(records)
516 }
517
518 pub async fn nullifier_status(
520 &self,
521 nullifier: Nullifier,
522 await_detection: bool,
523 ) -> anyhow::Result<bool> {
524 let mut rx = self.scanned_nullifiers_tx.subscribe();
527
528 let pool = self.pool.clone();
530
531 let nullifier_bytes = nullifier.0.to_bytes().to_vec();
532
533 if let Some(height_spent) = spawn_blocking(move || {
535 pool.get()?
536 .prepare_cached("SELECT height_spent FROM spendable_notes WHERE nullifier = ?1")?
537 .query_and_then([nullifier_bytes], |row| {
538 let height_spent: Option<u64> = row.get("height_spent")?;
539 anyhow::Ok(height_spent)
540 })?
541 .next()
542 .transpose()
543 })
544 .await??
545 {
546 let spent = height_spent.is_some();
547
548 if !await_detection || spent {
550 return Ok(spent);
551 }
552 }
553
554 if !await_detection {
557 return Ok(false);
558 }
559
560 loop {
563 let new_nullifier = rx.recv().await.context("change subscriber failed")?;
564
565 if new_nullifier == nullifier {
566 return Ok(true);
567 }
568 }
569 }
570
571 pub async fn last_sync_height(&self) -> anyhow::Result<Option<u64>> {
573 if let Some(height) = *self.uncommitted_height.lock() {
575 return Ok(Some(height.get()));
576 }
577
578 let pool = self.pool.clone();
579
580 spawn_blocking(move || {
581 let height: Option<i64> = pool
582 .get()?
583 .prepare_cached("SELECT height FROM sync_height ORDER BY height DESC LIMIT 1")?
584 .query_row([], |row| row.get::<_, Option<i64>>(0))?;
585
586 anyhow::Ok(u64::try_from(height.ok_or_else(|| anyhow!("missing sync height"))?).ok())
587 })
588 .await?
589 }
590
591 pub async fn app_params(&self) -> anyhow::Result<AppParameters> {
592 let pool = self.pool.clone();
593
594 spawn_blocking(move || {
595 let params_bytes = pool
596 .get()?
597 .prepare_cached("SELECT v FROM kv WHERE k IS 'app_params' LIMIT 1")?
598 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
599 .ok_or_else(|| anyhow!("missing app_params in kv table"))?;
600
601 AppParameters::decode(params_bytes.as_slice())
602 })
603 .await?
604 }
605
606 pub async fn gas_prices(&self) -> anyhow::Result<GasPrices> {
607 let pool = self.pool.clone();
608
609 spawn_blocking(move || {
610 let bytes = pool
611 .get()?
612 .prepare_cached("SELECT v FROM kv WHERE k IS 'gas_prices' LIMIT 1")?
613 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
614 .ok_or_else(|| anyhow!("missing gas_prices in kv table"))?;
615
616 GasPrices::decode(bytes.as_slice())
617 })
618 .await?
619 }
620
621 pub async fn fmd_parameters(&self) -> anyhow::Result<fmd::Parameters> {
622 let pool = self.pool.clone();
623
624 spawn_blocking(move || {
625 let bytes = pool
626 .get()?
627 .prepare_cached("SELECT v FROM kv WHERE k IS 'fmd_params' LIMIT 1")?
628 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
629 .ok_or_else(|| anyhow!("missing fmd_params in kv table"))?;
630
631 fmd::Parameters::decode(bytes.as_slice())
632 })
633 .await?
634 }
635
636 pub async fn full_viewing_key(&self) -> anyhow::Result<FullViewingKey> {
637 let pool = self.pool.clone();
638
639 spawn_blocking(move || {
640 let bytes = pool
641 .get()?
642 .prepare_cached("SELECT v FROM kv WHERE k is 'fvk' LIMIT 1")?
643 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
644 .ok_or_else(|| anyhow!("missing fvk in kv table"))?;
645
646 FullViewingKey::decode(bytes.as_slice())
647 })
648 .await?
649 }
650
651 pub async fn state_commitment_tree(&self) -> anyhow::Result<tct::Tree> {
652 let pool = self.pool.clone();
653 spawn_blocking(move || {
654 tct::Tree::from_reader(&mut TreeStore(&mut pool.get()?.transaction()?))
655 })
656 .await?
657 }
658
659 pub async fn transaction_hashes(
661 &self,
662 start_height: Option<u64>,
663 end_height: Option<u64>,
664 ) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
665 let starting_block = start_height.unwrap_or(0) as i64;
666 let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
667
668 let pool = self.pool.clone();
669
670 spawn_blocking(move || {
671 pool.get()?
672 .prepare_cached(
673 "SELECT block_height, tx_hash
674 FROM tx
675 WHERE block_height BETWEEN ?1 AND ?2",
676 )?
677 .query_and_then([starting_block, ending_block], |row| {
678 let block_height: u64 = row.get("block_height")?;
679 let tx_hash: Vec<u8> = row.get("tx_hash")?;
680 anyhow::Ok((block_height, tx_hash))
681 })?
682 .collect()
683 })
684 .await?
685 }
686
687 pub async fn transactions(
689 &self,
690 start_height: Option<u64>,
691 end_height: Option<u64>,
692 ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction)>> {
693 let starting_block = start_height.unwrap_or(0) as i64;
694 let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
695
696 let pool = self.pool.clone();
697
698 spawn_blocking(move || {
699 pool.get()?
700 .prepare_cached(
701 "SELECT block_height, tx_hash, tx_bytes
702 FROM tx
703 WHERE block_height BETWEEN ?1 AND ?2",
704 )?
705 .query_and_then([starting_block, ending_block], |row| {
706 let block_height: u64 = row.get("block_height")?;
707 let tx_hash: Vec<u8> = row.get("tx_hash")?;
708 let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
709 let tx = Transaction::decode(tx_bytes.as_slice())?;
710 anyhow::Ok((block_height, tx_hash, tx))
711 })?
712 .collect()
713 })
714 .await?
715 }
716
717 pub async fn transaction_by_hash(
718 &self,
719 tx_hash: &[u8],
720 ) -> anyhow::Result<Option<(u64, Transaction)>> {
721 let pool = self.pool.clone();
722 let tx_hash = tx_hash.to_vec();
723
724 spawn_blocking(move || {
725 if let Some((block_height, tx_bytes)) = pool
726 .get()?
727 .prepare_cached("SELECT block_height, tx_bytes FROM tx WHERE tx_hash = ?1")?
728 .query_row([tx_hash], |row| {
729 let block_height: u64 = row.get("block_height")?;
730 let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
731 Ok((block_height, tx_bytes))
732 })
733 .optional()?
734 {
735 let tx = Transaction::decode(tx_bytes.as_slice())?;
736 Ok(Some((block_height, tx)))
737 } else {
738 Ok(None)
739 }
740 })
741 .await?
742 }
743
744 pub async fn note_by_nullifier(
746 &self,
747 nullifier: Nullifier,
748 await_detection: bool,
749 ) -> anyhow::Result<SpendableNoteRecord> {
750 let mut rx = self.scanned_notes_tx.subscribe();
753
754 let pool = self.pool.clone();
756
757 let nullifier_bytes = nullifier.to_bytes().to_vec();
758
759 if let Some(record) = spawn_blocking(move || {
760 let record = pool
761 .get()?
762 .prepare(&format!(
763 "SELECT
764 notes.note_commitment,
765 spendable_notes.height_created,
766 notes.address,
767 notes.amount,
768 notes.asset_id,
769 notes.rseed,
770 spendable_notes.address_index,
771 spendable_notes.source,
772 spendable_notes.height_spent,
773 spendable_notes.nullifier,
774 spendable_notes.position,
775 tx.return_address
776 FROM notes
777 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
778 LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
779 WHERE hex(spendable_notes.nullifier) = \"{}\"",
780 hex::encode_upper(nullifier_bytes)
781 ))?
782 .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
783 .next()
784 .transpose()?;
785
786 anyhow::Ok(record)
787 })
788 .await??
789 {
790 return Ok(record);
791 }
792
793 if !await_detection {
794 anyhow::bail!("Note commitment for nullifier {:?} not found", nullifier);
795 }
796
797 loop {
801 match rx.recv().await {
802 Ok(record) => {
803 if record.nullifier == nullifier {
804 return Ok(record);
805 }
806 }
807
808 Err(e) => match e {
809 RecvError::Closed => {
810 anyhow::bail!(
811 "Receiver error during note detection: closed (no more active senders)"
812 );
813 }
814 RecvError::Lagged(count) => {
815 anyhow::bail!(
816 "Receiver error during note detection: lagged (by {:?} messages)",
817 count
818 );
819 }
820 },
821 };
822 }
823 }
824
825 pub async fn all_assets(&self) -> anyhow::Result<Vec<Metadata>> {
826 let pool = self.pool.clone();
827
828 spawn_blocking(move || {
829 pool.get()?
830 .prepare_cached("SELECT metadata FROM assets")?
831 .query_and_then([], |row| {
832 let metadata_json = row.get::<_, String>("metadata")?;
833 let denom_metadata = serde_json::from_str(&metadata_json)?;
834
835 anyhow::Ok(denom_metadata)
836 })?
837 .collect()
838 })
839 .await?
840 }
841
842 pub async fn asset_by_id(&self, id: &Id) -> anyhow::Result<Option<Metadata>> {
843 let id = id.to_bytes().to_vec();
844
845 let pool = self.pool.clone();
846
847 spawn_blocking(move || {
848 pool.get()?
849 .prepare_cached("SELECT metadata FROM assets WHERE asset_id = ?1")?
850 .query_and_then([id], |row| {
851 let metadata_json = row.get::<_, String>("metadata")?;
852 let denom_metadata = serde_json::from_str(&metadata_json)?;
853 anyhow::Ok(denom_metadata)
854 })?
855 .next()
856 .transpose()
857 })
858 .await?
859 }
860
861 pub async fn assets_matching(&self, pattern: String) -> anyhow::Result<Vec<Metadata>> {
864 let pattern = pattern.to_owned();
865
866 let pool = self.pool.clone();
867
868 spawn_blocking(move || {
869 pool.get()?
870 .prepare_cached("SELECT metadata FROM assets WHERE denom LIKE ?1 ESCAPE '\\'")?
871 .query_and_then([pattern], |row| {
872 let metadata_json = row.get::<_, String>("metadata")?;
873 let denom_metadata = serde_json::from_str(&metadata_json)?;
874 anyhow::Ok(denom_metadata)
875 })?
876 .collect()
877 })
878 .await?
879 }
880
881 pub async fn notes(
882 &self,
883 include_spent: bool,
884 asset_id: Option<asset::Id>,
885 address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
886 amount_to_spend: Option<Amount>,
887 ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
888 let spent_clause = match include_spent {
891 false => "NULL",
892 true => "height_spent",
893 };
894
895 let asset_clause = asset_id
898 .map(|id| format!("x'{}'", hex::encode(id.to_bytes())))
899 .unwrap_or_else(|| "asset_id".to_string());
900
901 let address_clause = "address_index".to_string();
912
913 let amount_cutoff = (amount_to_spend.is_some()) && !(include_spent || asset_id.is_none());
919 let mut amount_total = Amount::zero();
920
921 let pool = self.pool.clone();
922
923 spawn_blocking(move || {
924 let mut output: Vec<SpendableNoteRecord> = Vec::new();
925
926 for result in pool
927 .get()?
928 .prepare(&format!(
929 "SELECT notes.note_commitment,
930 spendable_notes.height_created,
931 notes.address,
932 notes.amount,
933 notes.asset_id,
934 notes.rseed,
935 spendable_notes.address_index,
936 spendable_notes.source,
937 spendable_notes.height_spent,
938 spendable_notes.nullifier,
939 spendable_notes.position,
940 tx.return_address
941 FROM notes
942 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
943 LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
944 WHERE spendable_notes.height_spent IS {spent_clause}
945 AND notes.asset_id IS {asset_clause}
946 AND spendable_notes.address_index IS {address_clause}"
947 ))?
948 .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
949 {
950 let record = result?;
951
952 if let Some(address_index) = address_index {
955 if record.address_index.account != address_index.account {
956 continue;
957 }
958 }
959 let amount = record.note.amount();
960
961 if amount.value() > 0 {
964 output.push(record);
965 }
966
967 if amount_cutoff {
970 amount_total += amount;
972 if amount_total >= amount_to_spend.unwrap_or_default() {
973 break;
974 }
975 }
976 }
977
978 if amount_total < amount_to_spend.unwrap_or_default() {
979 anyhow::bail!(
980 "requested amount of {} exceeds total of {}",
981 amount_to_spend.unwrap_or_default(),
982 amount_total
983 );
984 }
985
986 anyhow::Ok(output)
987 })
988 .await?
989 }
990
991 pub async fn notes_for_voting(
992 &self,
993 address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
994 votable_at_height: u64,
995 ) -> anyhow::Result<Vec<(SpendableNoteRecord, IdentityKey)>> {
996 let address_clause = address_index
999 .map(|d| format!("x'{}'", hex::encode(d.to_bytes())))
1000 .unwrap_or_else(|| "address_index".to_string());
1001
1002 let pool = self.pool.clone();
1003
1004 spawn_blocking(move || {
1005 let mut lock = pool.get()?;
1006 let dbtx = lock.transaction()?;
1007
1008 let spendable_note_records: Vec<SpendableNoteRecord> = dbtx
1009 .prepare(&format!(
1010 "SELECT notes.note_commitment,
1011 spendable_notes.height_created,
1012 notes.address,
1013 notes.amount,
1014 notes.asset_id,
1015 notes.rseed,
1016 spendable_notes.address_index,
1017 spendable_notes.source,
1018 spendable_notes.height_spent,
1019 spendable_notes.nullifier,
1020 spendable_notes.position
1021 FROM
1022 notes JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1023 WHERE
1024 spendable_notes.address_index IS {address_clause}
1025 AND notes.asset_id IN (
1026 SELECT asset_id FROM assets WHERE denom LIKE '_delegation\\_%' ESCAPE '\\'
1027 )
1028 AND ((spendable_notes.height_spent IS NULL) OR (spendable_notes.height_spent > {votable_at_height}))
1029 AND (spendable_notes.height_created < {votable_at_height})
1030 ",
1031 ))?
1032 .query_and_then((), |row| row.try_into())?
1033 .collect::<anyhow::Result<Vec<_>>>()?;
1034
1035 let mut results = Vec::new();
1038 for record in spendable_note_records {
1039 let asset_id = record.note.asset_id().to_bytes().to_vec();
1040 let denom: String = dbtx.query_row_and_then(
1041 "SELECT denom FROM assets WHERE asset_id = ?1",
1042 [asset_id],
1043 |row| row.get("denom"),
1044 )?;
1045
1046 let identity_key = DelegationToken::from_str(&denom)
1047 .context("invalid delegation token denom")?
1048 .validator();
1049
1050 results.push((record, identity_key));
1051 }
1052
1053 Ok(results)
1054 }).await?
1055 }
1056
1057 #[tracing::instrument(skip(self))]
1058 pub async fn record_asset(&self, asset: Metadata) -> anyhow::Result<()> {
1059 tracing::debug!(?asset);
1060
1061 let asset_id = asset.id().to_bytes().to_vec();
1062 let denom = asset.base_denom().denom;
1063 let metadata_json = serde_json::to_string(&asset)?;
1064
1065 let pool = self.pool.clone();
1066
1067 spawn_blocking(move || {
1068 pool.get()?
1069 .execute(
1070 "INSERT OR REPLACE INTO assets (asset_id, denom, metadata) VALUES (?1, ?2, ?3)",
1071 (asset_id, denom, metadata_json),
1072 )
1073 .map_err(anyhow::Error::from)
1074 })
1075 .await??;
1076
1077 Ok(())
1078 }
1079
1080 pub async fn record_auction_with_state(
1081 &self,
1082 auction_id: AuctionId,
1083 auction_state: u64,
1084 ) -> anyhow::Result<()> {
1085 let auction_id = auction_id.0.to_vec();
1086
1087 let pool = self.pool.clone();
1088
1089 spawn_blocking(move || {
1090 let mut lock = pool.get()?;
1091 let tx = lock.transaction()?;
1092 tx.execute(
1093 "INSERT OR IGNORE INTO auctions (auction_id, auction_state, note_commitment) VALUES (?1, ?2, NULL)",
1094 (auction_id.clone(), auction_state),
1095 )?;
1096 tx.execute(
1097 "UPDATE auctions SET auction_state = ?2 WHERE auction_id = ?1",
1098 (auction_id, auction_state),
1099 )
1100 .map_err(anyhow::Error::from)?;
1101
1102 tx.commit()?;
1103 Ok::<(), anyhow::Error>(())
1104 })
1105 .await??;
1106
1107 Ok(())
1108 }
1109
1110 pub async fn update_auction_with_note_commitment(
1111 &self,
1112 auction_id: AuctionId,
1113 note_commitment: StateCommitment,
1114 ) -> anyhow::Result<()> {
1115 let auction_id = auction_id.0.to_vec();
1116 let blob_nc = note_commitment.0.to_bytes().to_vec();
1117
1118 let pool = self.pool.clone();
1119
1120 spawn_blocking(move || {
1121 pool.get()?
1122 .execute(
1123 "UPDATE auctions SET (note_commitment) = ?1 WHERE auction_id = ?2",
1124 (blob_nc, auction_id),
1125 )
1126 .map_err(anyhow::Error::from)
1127 })
1128 .await??;
1129
1130 Ok(())
1131 }
1132
1133 pub async fn fetch_auctions_by_account(
1134 &self,
1135 account_filter: Option<AddressIndex>,
1136 include_inactive: bool,
1137 ) -> anyhow::Result<Vec<(AuctionId, SpendableNoteRecord, u64 )>> {
1138 let account_clause = account_filter
1139 .map(|idx| {
1140 format!(
1141 "AND spendable_notes.address_index = x'{}'",
1142 hex::encode(idx.to_bytes())
1143 )
1144 })
1145 .unwrap_or_else(|| "".to_string());
1146
1147 let active_clause = if !include_inactive {
1148 "AND auctions.auction_state = 0"
1149 } else {
1150 ""
1151 };
1152
1153 let query = format!(
1154 "SELECT auctions.auction_id, spendable_notes.*, notes.*, auctions.auction_state
1155 FROM auctions
1156 JOIN spendable_notes ON auctions.note_commitment = spendable_notes.note_commitment
1157 JOIN notes ON auctions.note_commitment = notes.note_commitment
1158 WHERE 1 = 1
1159 {account_clause}
1160 {active_clause}",
1161 account_clause = account_clause,
1162 active_clause = active_clause,
1163 );
1164
1165 let pool = self.pool.clone();
1166
1167 spawn_blocking(move || {
1168 let mut conn = pool.get()?;
1169 let tx = conn.transaction()?;
1170
1171 let spendable_note_records: Vec<(AuctionId, SpendableNoteRecord, u64)> = tx
1172 .prepare(&query)?
1173 .query_and_then((), |row| {
1174 let raw_auction_id: Vec<u8> = row.get("auction_id")?;
1175 let array_auction_id: [u8; 32] = raw_auction_id
1176 .try_into()
1177 .map_err(|_| anyhow!("auction id must be 32 bytes"))?;
1178 let auction_id = AuctionId(array_auction_id);
1179 let spendable_note_record: SpendableNoteRecord = row.try_into()?;
1180 let local_seq: u64 = row.get("auction_state")?;
1181 Ok((auction_id, spendable_note_record, local_seq))
1182 })?
1183 .collect::<anyhow::Result<Vec<_>>>()?;
1184
1185 Ok(spendable_note_records)
1186 })
1187 .await?
1188 }
1189
1190 pub async fn record_position(&self, position: Position) -> anyhow::Result<()> {
1191 let position_id = position.id().0.to_vec();
1192
1193 let position_state = position.state.to_string();
1194 let trading_pair = position.phi.pair.to_string();
1195
1196 let pool = self.pool.clone();
1197
1198 spawn_blocking(move || {
1199 pool.get()?
1200 .execute(
1201 "INSERT OR REPLACE INTO positions (position_id, position_state, trading_pair) VALUES (?1, ?2, ?3)",
1202 (position_id, position_state, trading_pair),
1203 )
1204 .map_err(anyhow::Error::from)
1205 })
1206 .await??;
1207
1208 Ok(())
1209 }
1210
1211 pub async fn update_position(
1212 &self,
1213 position_id: position::Id,
1214 position_state: position::State,
1215 ) -> anyhow::Result<()> {
1216 let position_id = position_id.0.to_vec();
1217 let position_state = position_state.to_string();
1218
1219 let pool = self.pool.clone();
1220
1221 spawn_blocking(move || {
1222 pool.get()?
1223 .execute(
1224 "UPDATE positions SET (position_state) = ?1 WHERE position_id = ?2",
1225 (position_state, position_id),
1226 )
1227 .map_err(anyhow::Error::from)
1228 })
1229 .await??;
1230
1231 Ok(())
1232 }
1233
1234 pub async fn record_empty_block(&self, height: u64) -> anyhow::Result<()> {
1235 let last_sync_height = self.last_sync_height().await?.ok_or_else(|| {
1237 anyhow::anyhow!("invalid: tried to record empty block as genesis block")
1238 })?;
1239
1240 if height != last_sync_height + 1 {
1241 anyhow::bail!(
1242 "Wrong block height {} for latest sync height {}",
1243 height,
1244 last_sync_height
1245 );
1246 }
1247
1248 *self.uncommitted_height.lock() = Some(height.try_into()?);
1249 Ok(())
1250 }
1251
1252 fn record_note_inner(
1253 dbtx: &r2d2_sqlite::rusqlite::Transaction<'_>,
1254 note: &Note,
1255 ) -> anyhow::Result<()> {
1256 let note_commitment = note.commit().0.to_bytes().to_vec();
1257 let address = note.address().to_vec();
1258 let amount = u128::from(note.amount()).to_be_bytes().to_vec();
1259 let asset_id = note.asset_id().to_bytes().to_vec();
1260 let rseed = note.rseed().to_bytes().to_vec();
1261
1262 dbtx.execute(
1263 "INSERT INTO notes (note_commitment, address, amount, asset_id, rseed)
1264 VALUES (?1, ?2, ?3, ?4, ?5)
1265 ON CONFLICT (note_commitment)
1266 DO UPDATE SET
1267 address = excluded.address,
1268 amount = excluded.amount,
1269 asset_id = excluded.asset_id,
1270 rseed = excluded.rseed",
1271 (note_commitment, address, amount, asset_id, rseed),
1272 )?;
1273
1274 Ok(())
1275 }
1276
1277 pub async fn give_advice(&self, note: Note) -> anyhow::Result<()> {
1278 let pool = self.pool.clone();
1279 let mut lock = pool.get()?;
1280 let dbtx = lock.transaction()?;
1281
1282 Storage::record_note_inner(&dbtx, ¬e)?;
1283
1284 dbtx.commit()?;
1285
1286 Ok(())
1287 }
1288
1289 pub async fn scan_advice(
1295 &self,
1296 note_commitments: Vec<note::StateCommitment>,
1297 ) -> anyhow::Result<BTreeMap<note::StateCommitment, Note>> {
1298 if note_commitments.is_empty() {
1299 return Ok(BTreeMap::new());
1300 }
1301
1302 let pool = self.pool.clone();
1303
1304 spawn_blocking(move || {
1308 pool.get()?
1309 .prepare(&format!(
1310 "SELECT notes.note_commitment,
1311 notes.address,
1312 notes.amount,
1313 notes.asset_id,
1314 notes.rseed
1315 FROM notes
1316 LEFT OUTER JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1317 WHERE (spendable_notes.note_commitment IS NULL) AND (notes.note_commitment IN ({}))",
1318 note_commitments
1319 .iter()
1320 .map(|cm| format!("x'{}'", hex::encode(cm.0.to_bytes())))
1321 .collect::<Vec<_>>()
1322 .join(", ")
1323 ))?
1324 .query_and_then((), |row| {
1325 let address = Address::try_from(row.get::<_, Vec<u8>>("address")?)?;
1326 let amount = row.get::<_, [u8; 16]>("amount")?;
1327 let amount_u128: u128 = u128::from_be_bytes(amount);
1328 let asset_id = asset::Id(Fq::from_bytes_checked(&row.get::<_, [u8; 32]>("asset_id")?).expect("asset id malformed"));
1329 let rseed = Rseed(row.get::<_, [u8; 32]>("rseed")?);
1330 let note = Note::from_parts(
1331 address,
1332 Value {
1333 amount: amount_u128.into(),
1334 asset_id,
1335 },
1336 rseed,
1337 )?;
1338 anyhow::Ok((note.commit(), note))
1339 })?
1340 .collect::<anyhow::Result<BTreeMap<_, _>>>()
1341 }).await?
1342 }
1343
1344 pub async fn filter_nullifiers(
1346 &self,
1347 nullifiers: Vec<Nullifier>,
1348 ) -> anyhow::Result<Vec<Nullifier>> {
1349 if nullifiers.is_empty() {
1350 return Ok(Vec::new());
1351 }
1352
1353 let pool = self.pool.clone();
1354
1355 spawn_blocking(move || {
1356 pool.get()?
1357 .prepare(&format!(
1358 "SELECT nullifier FROM (SELECT nullifier FROM spendable_notes UNION SELECT nullifier FROM swaps UNION SELECT nullifier FROM tx_by_nullifier) WHERE nullifier IN ({})",
1359 nullifiers
1360 .iter()
1361 .map(|x| format!("x'{}'", hex::encode(x.0.to_bytes())))
1362 .collect::<Vec<String>>()
1363 .join(",")
1364 ))?
1365 .query_and_then((), |row| {
1366 let nullifier: Vec<u8> = row.get("nullifier")?;
1367 nullifier.as_slice().try_into()
1368 })?
1369 .collect()
1370 })
1371 .await?
1372 }
1373
1374 pub async fn record_block(
1375 &self,
1376 filtered_block: FilteredBlock,
1377 transactions: Vec<Transaction>,
1378 sct: &mut tct::Tree,
1379 channel: tonic::transport::Channel,
1380 ) -> anyhow::Result<()> {
1381 let last_sync_height = self.last_sync_height().await?;
1383
1384 let correct_height = match last_sync_height {
1385 Some(cur_height) => filtered_block.height == cur_height + 1,
1387 None => filtered_block.height == 0,
1389 };
1390
1391 if !correct_height {
1392 anyhow::bail!(
1393 "Wrong block height {} for latest sync height {:?}",
1394 filtered_block.height,
1395 last_sync_height
1396 );
1397 }
1398
1399 let pool = self.pool.clone();
1400 let uncommitted_height = self.uncommitted_height.clone();
1401 let scanned_notes_tx = self.scanned_notes_tx.clone();
1402 let scanned_nullifiers_tx = self.scanned_nullifiers_tx.clone();
1403 let scanned_swaps_tx = self.scanned_swaps_tx.clone();
1404
1405 let fvk = self.full_viewing_key().await?;
1406
1407 let new_app_parameters: Option<AppParameters> = if filtered_block.app_parameters_updated {
1409 let mut client = AppQueryServiceClient::new(channel);
1411 Some(
1412 client
1413 .app_parameters(tonic::Request::new(AppParametersRequest {}))
1414 .await?
1415 .into_inner()
1416 .try_into()?,
1417 )
1418 } else {
1419 None
1420 };
1421
1422 let mut new_sct = sct.clone();
1430
1431 *sct = spawn_blocking(move || {
1432 let mut lock = pool.get()?;
1433 let mut dbtx = lock.transaction()?;
1434
1435 if let Some(params) = new_app_parameters {
1436 let params_bytes = params.encode_to_vec();
1437 dbtx.execute(
1439 "INSERT INTO kv (k, v) VALUES ('app_params', ?1)
1440 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1441 [¶ms_bytes[..]],
1442 )?;
1443 }
1444
1445 for note_record in filtered_block.new_notes.values() {
1447 let note_commitment = note_record.note_commitment.0.to_bytes().to_vec();
1448 let height_created = filtered_block.height as i64;
1449 let address_index = note_record.address_index.to_bytes().to_vec();
1450 let nullifier = note_record.nullifier.to_bytes().to_vec();
1451 let position = (u64::from(note_record.position)) as i64;
1452 let source = note_record.source.encode_to_vec();
1453 let tx_hash = match note_record.source {
1455 CommitmentSource::Transaction { id } => id,
1456 _ => None,
1457 };
1458
1459 Storage::record_note_inner(&dbtx, ¬e_record.note)?;
1461
1462 dbtx.execute(
1463 "INSERT INTO spendable_notes
1464 (note_commitment, nullifier, position, height_created, address_index, source, height_spent, tx_hash)
1465 VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL, ?7)
1466 ON CONFLICT (note_commitment)
1467 DO UPDATE SET nullifier = excluded.nullifier,
1468 position = excluded.position,
1469 height_created = excluded.height_created,
1470 address_index = excluded.address_index,
1471 source = excluded.source,
1472 height_spent = excluded.height_spent,
1473 tx_hash = excluded.tx_hash",
1474 (
1475 ¬e_commitment,
1476 &nullifier,
1477 &position,
1478 &height_created,
1479 &address_index,
1480 &source,
1481 &tx_hash,
1483 ),
1484 )?;
1485 }
1486
1487 for swap in filtered_block.new_swaps.values() {
1489 let swap_commitment = swap.swap_commitment.0.to_bytes().to_vec();
1490 let swap_bytes = swap.swap.encode_to_vec();
1491 let position = (u64::from(swap.position)) as i64;
1492 let nullifier = swap.nullifier.to_bytes().to_vec();
1493 let source = swap.source.encode_to_vec();
1494 let output_data = swap.output_data.encode_to_vec();
1495
1496 dbtx.execute(
1497 "INSERT INTO swaps (swap_commitment, swap, position, nullifier, output_data, height_claimed, source)
1498 VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6)
1499 ON CONFLICT (swap_commitment)
1500 DO UPDATE SET swap = excluded.swap,
1501 position = excluded.position,
1502 nullifier = excluded.nullifier,
1503 output_data = excluded.output_data,
1504 height_claimed = excluded.height_claimed,
1505 source = excluded.source",
1506 (
1507 &swap_commitment,
1508 &swap_bytes,
1509 &position,
1510 &nullifier,
1511 &output_data,
1512 &source,
1514 ),
1515 )?;
1516 }
1517
1518 for nullifier in &filtered_block.spent_nullifiers {
1520 let height_spent = filtered_block.height as i64;
1521 let nullifier_bytes = nullifier.to_bytes().to_vec();
1522
1523 let spent_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1524 "UPDATE spendable_notes SET height_spent = ?1 WHERE nullifier = ?2 RETURNING note_commitment"
1525 )?
1526 .query_and_then(
1527 (height_spent, &nullifier_bytes),
1528 |row| {
1529 let bytes: Vec<u8> = row.get("note_commitment")?;
1530 StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1531 },
1532 )?
1533 .next()
1534 .transpose()?;
1535
1536 let swap_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1537 "UPDATE swaps SET height_claimed = ?1 WHERE nullifier = ?2 RETURNING swap_commitment"
1538 )?
1539 .query_and_then(
1540 (height_spent, &nullifier_bytes),
1541 |row| {
1542 let bytes: Vec<u8> = row.get("swap_commitment")?;
1543 StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1544 },
1545 )?
1546 .next()
1547 .transpose()?;
1548
1549 let spent_denom: String
1551 = dbtx.prepare_cached(
1552 "SELECT denom FROM assets
1553 WHERE asset_id ==
1554 (SELECT asset_id FROM notes
1555 WHERE note_commitment ==
1556 (SELECT note_commitment FROM spendable_notes WHERE nullifier = ?1))"
1557 )?
1558 .query_and_then(
1559 [&nullifier_bytes],
1560 |row| row.get("denom"),
1561 )?
1562 .next()
1563 .transpose()?
1564 .unwrap_or("unknown".to_string());
1565
1566 if let Some(spent_commitment) = spent_commitment {
1568 tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "detected spent note commitment");
1569 if DelegationToken::from_str(&spent_denom).is_err() {
1574 tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "forgetting spent note commitment");
1575 new_sct.forget(spent_commitment);
1576 }
1577 };
1578
1579 if let Some(spent_swap_commitment) = swap_commitment {
1581 tracing::debug!(?nullifier, ?spent_swap_commitment, "detected and forgetting spent swap commitment");
1582 new_sct.forget(spent_swap_commitment);
1583 };
1584 }
1585
1586 new_sct.to_writer(&mut TreeStore(&mut dbtx))?;
1588
1589 for transaction in transactions {
1591 let tx_bytes = transaction.encode_to_vec();
1592 let tx_hash_owned = sha2::Sha256::digest(&tx_bytes);
1594 let tx_hash = tx_hash_owned.as_slice();
1595 let tx_block_height = filtered_block.height as i64;
1596 let decrypted_memo = transaction.decrypt_memo(&fvk).ok();
1597 let memo_text = decrypted_memo.clone().map_or(None,|x| Some(x.text().to_string()));
1598 let return_address = decrypted_memo.map_or(None, |x| Some(x.return_address().to_vec()));
1599
1600 tracing::debug!(tx_hash = ?hex::encode(tx_hash), "recording extended transaction");
1601
1602 dbtx.execute(
1603 "INSERT OR IGNORE INTO tx (tx_hash, tx_bytes, block_height, return_address, memo_text) VALUES (?1, ?2, ?3, ?4, ?5)",
1604 (&tx_hash, &tx_bytes, tx_block_height, return_address, memo_text),
1605 )?;
1606
1607 for nf in transaction.spent_nullifiers() {
1609 let nf_bytes = nf.0.to_bytes().to_vec();
1610 dbtx.execute(
1611 "INSERT OR IGNORE INTO tx_by_nullifier (nullifier, tx_hash) VALUES (?1, ?2)",
1612 (&nf_bytes, &tx_hash),
1613 )?;
1614 }
1615 }
1616
1617 if filtered_block.fmd_parameters.is_some() {
1619 let fmd_parameters_bytes =
1620 &fmd::Parameters::encode_to_vec(&filtered_block.fmd_parameters.ok_or_else(|| anyhow::anyhow!("missing fmd parameters in filtered block"))?)[..];
1621
1622 dbtx.execute(
1623 "INSERT INTO kv (k, v) VALUES ('fmd_params', ?1)
1624 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1625 [&fmd_parameters_bytes],
1626 )?;
1627 }
1628
1629 if filtered_block.gas_prices.is_some() {
1631 let gas_prices_bytes =
1632 &GasPrices::encode_to_vec(&filtered_block.gas_prices.ok_or_else(|| anyhow::anyhow!("missing gas prices in filtered block"))?)[..];
1633
1634 dbtx.execute(
1635 "INSERT INTO kv (k, v) VALUES ('gas_prices', ?1)
1636 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1637 [&gas_prices_bytes],
1638 )?;
1639 }
1640
1641 let latest_sync_height = filtered_block.height as i64;
1643 dbtx.execute("UPDATE sync_height SET height = ?1", [latest_sync_height])?;
1644
1645 dbtx.commit()?;
1647
1648 uncommitted_height.lock().take();
1656
1657 for note_record in filtered_block.new_notes.values() {
1661 let _ = scanned_notes_tx.send(note_record.clone());
1665 }
1666
1667 for nullifier in filtered_block.spent_nullifiers.iter() {
1668 let _ = scanned_nullifiers_tx.send(*nullifier);
1672 }
1673
1674 for swap_record in filtered_block.new_swaps.values() {
1675 let _ = scanned_swaps_tx.send(swap_record.clone());
1679 }
1680
1681 anyhow::Ok(new_sct)
1682 })
1683 .await??;
1684
1685 Ok(())
1686 }
1687
1688 pub async fn owned_position_ids(
1689 &self,
1690 position_state: Option<State>,
1691 trading_pair: Option<TradingPair>,
1692 ) -> anyhow::Result<Vec<position::Id>> {
1693 let pool = self.pool.clone();
1694
1695 let state_clause = match position_state {
1696 Some(state) => format!("position_state = \"{}\"", state),
1697 None => "".to_string(),
1698 };
1699
1700 let pair_clause = match trading_pair {
1701 Some(pair) => format!("trading_pair = \"{}\"", pair),
1702 None => "".to_string(),
1703 };
1704
1705 spawn_blocking(move || {
1706 let mut q = "SELECT position_id FROM positions".to_string();
1707 match (position_state.is_some(), trading_pair.is_some()) {
1708 (true, true) => {
1709 q = q + " WHERE " + &state_clause + " AND " + &pair_clause;
1710 }
1711 (true, false) => {
1712 q = q + " WHERE " + &state_clause;
1713 }
1714 (false, true) => {
1715 q = q + " WHERE " + &pair_clause;
1716 }
1717 (false, false) => (),
1718 };
1719
1720 pool.get()?
1721 .prepare_cached(&q)?
1722 .query_and_then([], |row| {
1723 let position_id: Vec<u8> = row.get("position_id")?;
1724 Ok(position::Id(position_id.as_slice().try_into()?))
1725 })?
1726 .collect()
1727 })
1728 .await?
1729 }
1730
1731 pub async fn notes_by_sender(
1732 &self,
1733 return_address: &Address,
1734 ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
1735 let pool = self.pool.clone();
1736
1737 let query = "SELECT notes.note_commitment,
1738 spendable_notes.height_created,
1739 notes.address,
1740 notes.amount,
1741 notes.asset_id,
1742 notes.rseed,
1743 spendable_notes.address_index,
1744 spendable_notes.source,
1745 spendable_notes.height_spent,
1746 spendable_notes.nullifier,
1747 spendable_notes.position
1748 FROM notes
1749 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1750 JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
1751 WHERE tx.return_address = ?1";
1752
1753 let return_address = return_address.to_vec();
1754
1755 let records = spawn_blocking(move || {
1756 pool.get()?
1757 .prepare(query)?
1758 .query_and_then([return_address], |record| record.try_into())?
1759 .collect::<anyhow::Result<Vec<_>>>()
1760 })
1761 .await??;
1762
1763 Ok(records)
1764 }
1765
1766 pub async fn transactions_matching_memo(
1770 &self,
1771 pattern: String,
1772 ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction, String)>> {
1773 let pattern = pattern.to_owned();
1774 tracing::trace!(?pattern, "searching for memos matching");
1775 let pool = self.pool.clone();
1776
1777 spawn_blocking(move || {
1778 pool.get()?
1779 .prepare_cached("SELECT block_height, tx_hash, tx_bytes, memo_text FROM tx WHERE memo_text LIKE ?1 ESCAPE '\\'")?
1780 .query_and_then([pattern], |row| {
1781 let block_height: u64 = row.get("block_height")?;
1782 let tx_hash: Vec<u8> = row.get("tx_hash")?;
1783 let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
1784 let tx = Transaction::decode(tx_bytes.as_slice())?;
1785 let memo_text: String = row.get("memo_text")?;
1786 anyhow::Ok((block_height, tx_hash, tx, memo_text))
1787 })?
1788 .collect()
1789 })
1790 .await?
1791 }
1792
1793 pub async fn update_epoch(
1795 &self,
1796 epoch: u64,
1797 root: Option<Root>,
1798 start_height: Option<u64>,
1799 ) -> anyhow::Result<()> {
1800 let pool = self.pool.clone();
1801
1802 spawn_blocking(move || {
1803 pool.get()?
1804 .execute(
1805 r#"
1806 INSERT INTO epochs(epoch_index, root, start_height)
1807 VALUES (?1, ?2, ?3)
1808 ON CONFLICT(epoch_index)
1809 DO UPDATE SET
1810 root = COALESCE(?2, root),
1811 start_height = COALESCE(?3, start_height)
1812 "#,
1813 (epoch, root.map(|x| x.encode_to_vec()), start_height),
1814 )
1815 .map_err(anyhow::Error::from)
1816 })
1817 .await??;
1818
1819 Ok(())
1820 }
1821
1822 pub async fn get_epoch(&self, epoch: u64) -> anyhow::Result<(Option<Root>, Option<u64>)> {
1827 let pool = self.pool.clone();
1828
1829 spawn_blocking(move || {
1830 pool.get()?
1831 .query_row_and_then(
1832 r#"
1833 SELECT root, start_height
1834 FROM epochs
1835 WHERE epoch_index = ?1
1836 "#,
1837 (epoch,),
1838 |row| {
1839 let root_raw: Option<Vec<u8>> = row.get("root")?;
1840 let start_height: Option<u64> = row.get("start_height")?;
1841 let root = root_raw.map(|x| Root::decode(x.as_slice())).transpose()?;
1842 anyhow::Ok((root, start_height))
1843 },
1844 )
1845 .map_err(anyhow::Error::from)
1846 })
1847 .await?
1848 }
1849}