1use std::{collections::BTreeMap, num::NonZeroU64, str::FromStr, sync::Arc, time::Duration};
2
3use anyhow::{anyhow, Context};
4use camino::Utf8Path;
5use decaf377::Fq;
6use once_cell::sync::Lazy;
7use parking_lot::Mutex;
8use penumbra_sdk_auction::auction::AuctionId;
9use r2d2_sqlite::{
10 rusqlite::{OpenFlags, OptionalExtension},
11 SqliteConnectionManager,
12};
13use sha2::{Digest, Sha256};
14use tap::{Tap, TapFallible};
15use tokio::{
16 sync::broadcast::{self, error::RecvError},
17 task::spawn_blocking,
18};
19use tracing::{error_span, Instrument};
20use url::Url;
21
22use penumbra_sdk_app::params::AppParameters;
23use penumbra_sdk_asset::{asset, asset::Id, asset::Metadata, Value};
24use penumbra_sdk_dex::{
25 lp::position::{self, Position, State},
26 TradingPair,
27};
28use penumbra_sdk_fee::GasPrices;
29use penumbra_sdk_keys::{keys::AddressIndex, Address, FullViewingKey};
30use penumbra_sdk_num::Amount;
31use penumbra_sdk_proto::{
32 core::app::v1::{
33 query_service_client::QueryServiceClient as AppQueryServiceClient, AppParametersRequest,
34 },
35 DomainType,
36};
37use penumbra_sdk_sct::{CommitmentSource, Nullifier};
38use penumbra_sdk_shielded_pool::{fmd, note, Note, Rseed};
39use penumbra_sdk_stake::{DelegationToken, IdentityKey};
40use penumbra_sdk_tct as tct;
41use penumbra_sdk_transaction::Transaction;
42use sct::TreeStore;
43use tct::StateCommitment;
44
45use crate::{sync::FilteredBlock, SpendableNoteRecord, SwapRecord};
46
47mod sct;
48
/// One line of a balance report: the summed amount of a single asset held
/// under a single address index.
///
/// `Storage::balances` produces one entry per `(address_index, id)` pair.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
pub struct BalanceEntry {
    /// The asset the amount is denominated in.
    pub id: Id,
    /// The summed amount, as a raw `u128`.
    pub amount: u128,
    /// The address index the summed notes are recorded under.
    pub address_index: AddressIndex,
}
55
56static SCHEMA_HASH: Lazy<String> =
58 Lazy::new(|| hex::encode(Sha256::digest(include_str!("storage/schema.sql"))));
59
/// Handle to the view database plus the in-process notification channels.
///
/// Cloning is cheap relative to opening a database: the pool is cloned as a
/// handle and the remaining fields are an `Arc` and broadcast senders.
#[derive(Clone)]
pub struct Storage {
    /// Connection pool for the SQLite database (see `Storage::connect`).
    pool: r2d2::Pool<SqliteConnectionManager>,

    /// Height set by `record_empty_block` without a database write.
    /// `last_sync_height` returns it in preference to the stored height.
    uncommitted_height: Arc<Mutex<Option<NonZeroU64>>>,

    /// Channel that `note_by_commitment` / `note_by_nullifier` subscribe to
    /// while waiting for a note to be detected.
    scanned_notes_tx: tokio::sync::broadcast::Sender<SpendableNoteRecord>,
    /// Channel that `nullifier_status` subscribes to while waiting for a spend.
    scanned_nullifiers_tx: tokio::sync::broadcast::Sender<Nullifier>,
    /// Channel that `swap_by_commitment` subscribes to while waiting for a swap.
    scanned_swaps_tx: tokio::sync::broadcast::Sender<SwapRecord>,
}
77
78impl Storage {
79 #[tracing::instrument(
81 skip_all,
82 fields(
83 path = ?storage_path.as_ref().map(|p| p.as_ref().as_str()),
84 url = %node,
85 )
86 )]
87 pub async fn load_or_initialize(
88 storage_path: Option<impl AsRef<Utf8Path>>,
89 fvk: &FullViewingKey,
90 node: Url,
91 ) -> anyhow::Result<Self> {
92 if let Some(path) = storage_path.as_ref().map(AsRef::as_ref) {
93 if path.exists() {
94 tracing::debug!(?path, "database exists");
95 return Self::load(path).await;
96 } else {
97 tracing::debug!(?path, "database does not exist");
98 }
99 };
100
101 let mut client = AppQueryServiceClient::connect(node.to_string())
102 .instrument(error_span!("connecting_to_endpoint"))
103 .await
104 .tap_err(|error| {
105 tracing::error!(?error, "failed to connect to app query service endpoint")
106 })?
107 .tap(|_| tracing::debug!("connected to app query service endpoint"));
108 let params = client
109 .app_parameters(tonic::Request::new(AppParametersRequest {}))
110 .instrument(error_span!("getting_app_parameters"))
111 .await?
112 .into_inner()
113 .try_into()?;
114
115 Self::initialize(storage_path, fvk.clone(), params).await
116 }
117
118 fn connect(
119 path: Option<impl AsRef<Utf8Path>>,
120 ) -> anyhow::Result<r2d2::Pool<SqliteConnectionManager>> {
121 if let Some(path) = path {
122 let manager = SqliteConnectionManager::file(path.as_ref())
123 .with_flags(
124 OpenFlags::default() & !OpenFlags::SQLITE_OPEN_URI,
127 )
128 .with_init(|conn| {
129 conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
133 conn.set_prepared_statement_cache_capacity(32);
136 Ok(())
137 });
138 Ok(r2d2::Pool::builder()
139 .max_size(1)
142 .build(manager)?)
143 } else {
144 let manager = SqliteConnectionManager::memory();
145 Ok(r2d2::Pool::builder()
151 .max_size(1)
152 .min_idle(Some(1))
153 .max_lifetime(Some(Duration::MAX))
154 .idle_timeout(Some(Duration::MAX))
155 .build(manager)?)
156 }
157 }
158
159 pub async fn load(path: impl AsRef<Utf8Path>) -> anyhow::Result<Self> {
160 let storage = Self {
161 pool: Self::connect(Some(path))?,
162 uncommitted_height: Arc::new(Mutex::new(None)),
163 scanned_notes_tx: broadcast::channel(128).0,
164 scanned_nullifiers_tx: broadcast::channel(512).0,
165 scanned_swaps_tx: broadcast::channel(128).0,
166 };
167
168 spawn_blocking(move || {
169 let actual_schema_hash: String = storage
172 .pool
173 .get()?
174 .query_row("SELECT schema_hash FROM schema_hash", (), |row| {
175 row.get("schema_hash")
176 })
177 .context("failed to query database schema version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
178
179 if actual_schema_hash != *SCHEMA_HASH {
180 let database_client_version: String = storage
181 .pool
182 .get()?
183 .query_row("SELECT client_version FROM client_version", (), |row| {
184 row.get("client_version")
185 })
186 .context("failed to query client version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
187
188 anyhow::bail!(
189 "can't load view database created by client version {} using client version {}: they have different schemata, so you need to reset your view database and resynchronize by running pcli view reset",
190 database_client_version,
191 env!("CARGO_PKG_VERSION"),
192 );
193 }
194
195 Ok(storage)
196 })
197 .await?
198 }
199
200 pub async fn initialize(
201 storage_path: Option<impl AsRef<Utf8Path>>,
202 fvk: FullViewingKey,
203 params: AppParameters,
204 ) -> anyhow::Result<Self> {
205 tracing::debug!(storage_path = ?storage_path.as_ref().map(AsRef::as_ref), ?fvk, ?params);
206
207 let pool = Self::connect(storage_path)?;
209
210 spawn_blocking(move || {
211 let mut conn = pool.get()?;
213 let tx = conn.transaction()?;
214
215 tx.execute_batch(include_str!("storage/schema.sql"))?;
217
218 let params_bytes = params.encode_to_vec();
219 tx.execute(
220 "INSERT INTO kv (k, v) VALUES ('app_params', ?1)",
221 [¶ms_bytes[..]],
222 )?;
223
224 let fvk_bytes = fvk.encode_to_vec();
225 tx.execute("INSERT INTO kv (k, v) VALUES ('fvk', ?1)", [&fvk_bytes[..]])?;
226
227 tx.execute("INSERT INTO sync_height (height) VALUES (-1)", ())?;
231
232 tx.execute(
234 "INSERT INTO schema_hash (schema_hash) VALUES (?1)",
235 [&*SCHEMA_HASH],
236 )?;
237
238 tx.execute(
240 "INSERT INTO client_version (client_version) VALUES (?1)",
241 [env!("CARGO_PKG_VERSION")],
242 )?;
243
244 tx.commit()?;
245 drop(conn);
246
247 Ok(Storage {
248 pool,
249 uncommitted_height: Arc::new(Mutex::new(None)),
250 scanned_notes_tx: broadcast::channel(128).0,
251 scanned_nullifiers_tx: broadcast::channel(512).0,
252 scanned_swaps_tx: broadcast::channel(128).0,
253 })
254 })
255 .await?
256 }
257
258 pub async fn load_asset_metadata(
260 &self,
261 registry_path: impl AsRef<Utf8Path>,
262 ) -> anyhow::Result<()> {
263 tracing::debug!(registry_path = ?registry_path.as_ref(), "loading asset metadata");
264 let registry_path = registry_path.as_ref();
265 let mut registry_json: serde_json::Value = serde_json::from_str(
267 std::fs::read_to_string(registry_path)
268 .context("failed to read file")?
269 .as_str(),
270 )
271 .context("failed to parse JSON")?;
272
273 let registry: BTreeMap<String, Metadata> = serde_json::value::from_value(
274 registry_json
275 .get_mut("assetById")
276 .ok_or_else(|| anyhow::anyhow!("missing assetById"))?
277 .take(),
278 )
279 .context("could not parse asset registry")?;
280
281 for metadata in registry.into_values() {
282 self.record_asset(metadata).await?;
283 }
284
285 Ok(())
286 }
287
288 pub async fn balances(
290 &self,
291 address_index: Option<AddressIndex>,
292 asset_id: Option<asset::Id>,
293 ) -> anyhow::Result<Vec<BalanceEntry>> {
294 let pool = self.pool.clone();
295
296 spawn_blocking(move || {
297 let query = "SELECT notes.asset_id, notes.amount, spendable_notes.address_index
298 FROM notes
299 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
300 WHERE spendable_notes.height_spent IS NULL";
301
302 tracing::debug!(?query);
303
304 let mut balances: BTreeMap<AddressIndex, BTreeMap<asset::Id, Amount>> = BTreeMap::new();
306
307 for result in pool.get()?.prepare_cached(query)?.query_map([], |row| {
308 let asset_id = row.get::<&str, Vec<u8>>("asset_id")?;
309 let amount = row.get::<&str, Vec<u8>>("amount")?;
310 let address_index = row.get::<&str, Vec<u8>>("address_index")?;
311
312 Ok((asset_id, amount, address_index))
313 })? {
314 let (id, amount, index) = result?;
315
316 let id = Id::try_from(id.as_slice())?;
317
318 let amount: Amount = Amount::from_be_bytes(
319 amount
320 .as_slice()
321 .try_into()
322 .expect("amount slice of incorrect length"),
323 );
324
325 let index = AddressIndex::try_from(index.as_slice())?;
326
327 if let Some(address_index) = address_index {
329 if address_index != index {
330 continue;
331 }
332 }
333 if let Some(asset_id) = asset_id {
334 if asset_id != id {
335 continue;
336 }
337 }
338
339 balances
340 .entry(index)
341 .or_insert_with(BTreeMap::new)
342 .entry(id)
343 .and_modify(|e| *e += amount)
344 .or_insert(amount);
345 }
346
347 let entries = balances
348 .into_iter()
349 .flat_map(|(index, assets)| {
350 assets.into_iter().map(move |(id, amount)| BalanceEntry {
351 id,
352 amount: amount.into(),
353 address_index: index,
354 })
355 })
356 .collect::<Vec<_>>();
357 Ok(entries)
358 })
359 .await?
360 }
361
362 pub async fn note_by_commitment(
364 &self,
365 note_commitment: tct::StateCommitment,
366 await_detection: bool,
367 ) -> anyhow::Result<SpendableNoteRecord> {
368 let mut rx = self.scanned_notes_tx.subscribe();
371
372 let pool = self.pool.clone();
373
374 if let Some(record) = spawn_blocking(move || {
375 pool.get()?
377 .prepare(&format!(
378 "SELECT
379 notes.note_commitment,
380 spendable_notes.height_created,
381 notes.address,
382 notes.amount,
383 notes.asset_id,
384 notes.rseed,
385 spendable_notes.address_index,
386 spendable_notes.source,
387 spendable_notes.height_spent,
388 spendable_notes.nullifier,
389 spendable_notes.position,
390 tx.return_address
391 FROM notes
392 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
393 LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
394 WHERE notes.note_commitment = x'{}'",
395 hex::encode(note_commitment.0.to_bytes())
396 ))?
397 .query_and_then((), |record| record.try_into())?
398 .next()
399 .transpose()
400 })
401 .await??
402 {
403 return Ok(record);
404 }
405
406 if !await_detection {
407 anyhow::bail!("Note commitment {} not found", note_commitment);
408 }
409
410 loop {
414 match rx.recv().await {
415 Ok(record) => {
416 if record.note_commitment == note_commitment {
417 return Ok(record);
418 }
419 }
420
421 Err(e) => match e {
422 RecvError::Closed => {
423 anyhow::bail!(
424 "Receiver error during note detection: closed (no more active senders)"
425 );
426 }
427 RecvError::Lagged(count) => {
428 anyhow::bail!(
429 "Receiver error during note detection: lagged (by {:?} messages)",
430 count
431 );
432 }
433 },
434 };
435 }
436 }
437
438 pub async fn swap_by_commitment(
440 &self,
441 swap_commitment: tct::StateCommitment,
442 await_detection: bool,
443 ) -> anyhow::Result<SwapRecord> {
444 let mut rx = self.scanned_swaps_tx.subscribe();
447
448 let pool = self.pool.clone();
449
450 if let Some(record) = spawn_blocking(move || {
451 pool.get()?
453 .prepare(&format!(
454 "SELECT * FROM swaps WHERE swaps.swap_commitment = x'{}'",
455 hex::encode(swap_commitment.0.to_bytes())
456 ))?
457 .query_and_then((), |record| record.try_into())?
458 .next()
459 .transpose()
460 })
461 .await??
462 {
463 return Ok(record);
464 }
465
466 if !await_detection {
467 anyhow::bail!("swap commitment {} not found", swap_commitment);
468 }
469
470 loop {
474 match rx.recv().await {
475 Ok(record) => {
476 if record.swap_commitment == swap_commitment {
477 return Ok(record);
478 }
479 }
480
481 Err(e) => match e {
482 RecvError::Closed => {
483 anyhow::bail!(
484 "Receiver error during swap detection: closed (no more active senders)"
485 );
486 }
487 RecvError::Lagged(count) => {
488 anyhow::bail!(
489 "Receiver error during swap detection: lagged (by {:?} messages)",
490 count
491 );
492 }
493 },
494 };
495 }
496 }
497
498 pub async fn unclaimed_swaps(&self) -> anyhow::Result<Vec<SwapRecord>> {
500 let pool = self.pool.clone();
501
502 let records = spawn_blocking(move || {
503 pool.get()?
505 .prepare("SELECT * FROM swaps WHERE swaps.height_claimed is NULL")?
506 .query_and_then((), |record| record.try_into())?
507 .collect::<anyhow::Result<Vec<_>>>()
508 })
509 .await??;
510
511 Ok(records)
512 }
513
514 pub async fn nullifier_status(
516 &self,
517 nullifier: Nullifier,
518 await_detection: bool,
519 ) -> anyhow::Result<bool> {
520 let mut rx = self.scanned_nullifiers_tx.subscribe();
523
524 let pool = self.pool.clone();
526
527 let nullifier_bytes = nullifier.0.to_bytes().to_vec();
528
529 if let Some(height_spent) = spawn_blocking(move || {
531 pool.get()?
532 .prepare_cached("SELECT height_spent FROM spendable_notes WHERE nullifier = ?1")?
533 .query_and_then([nullifier_bytes], |row| {
534 let height_spent: Option<u64> = row.get("height_spent")?;
535 anyhow::Ok(height_spent)
536 })?
537 .next()
538 .transpose()
539 })
540 .await??
541 {
542 let spent = height_spent.is_some();
543
544 if !await_detection || spent {
546 return Ok(spent);
547 }
548 }
549
550 if !await_detection {
553 return Ok(false);
554 }
555
556 loop {
559 let new_nullifier = rx.recv().await.context("change subscriber failed")?;
560
561 if new_nullifier == nullifier {
562 return Ok(true);
563 }
564 }
565 }
566
567 pub async fn last_sync_height(&self) -> anyhow::Result<Option<u64>> {
569 if let Some(height) = *self.uncommitted_height.lock() {
571 return Ok(Some(height.get()));
572 }
573
574 let pool = self.pool.clone();
575
576 spawn_blocking(move || {
577 let height: Option<i64> = pool
578 .get()?
579 .prepare_cached("SELECT height FROM sync_height ORDER BY height DESC LIMIT 1")?
580 .query_row([], |row| row.get::<_, Option<i64>>(0))?;
581
582 anyhow::Ok(u64::try_from(height.ok_or_else(|| anyhow!("missing sync height"))?).ok())
583 })
584 .await?
585 }
586
587 pub async fn app_params(&self) -> anyhow::Result<AppParameters> {
588 let pool = self.pool.clone();
589
590 spawn_blocking(move || {
591 let params_bytes = pool
592 .get()?
593 .prepare_cached("SELECT v FROM kv WHERE k IS 'app_params' LIMIT 1")?
594 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
595 .ok_or_else(|| anyhow!("missing app_params in kv table"))?;
596
597 AppParameters::decode(params_bytes.as_slice())
598 })
599 .await?
600 }
601
602 pub async fn gas_prices(&self) -> anyhow::Result<GasPrices> {
603 let pool = self.pool.clone();
604
605 spawn_blocking(move || {
606 let bytes = pool
607 .get()?
608 .prepare_cached("SELECT v FROM kv WHERE k IS 'gas_prices' LIMIT 1")?
609 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
610 .ok_or_else(|| anyhow!("missing gas_prices in kv table"))?;
611
612 GasPrices::decode(bytes.as_slice())
613 })
614 .await?
615 }
616
617 pub async fn fmd_parameters(&self) -> anyhow::Result<fmd::Parameters> {
618 let pool = self.pool.clone();
619
620 spawn_blocking(move || {
621 let bytes = pool
622 .get()?
623 .prepare_cached("SELECT v FROM kv WHERE k IS 'fmd_params' LIMIT 1")?
624 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
625 .ok_or_else(|| anyhow!("missing fmd_params in kv table"))?;
626
627 fmd::Parameters::decode(bytes.as_slice())
628 })
629 .await?
630 }
631
632 pub async fn full_viewing_key(&self) -> anyhow::Result<FullViewingKey> {
633 let pool = self.pool.clone();
634
635 spawn_blocking(move || {
636 let bytes = pool
637 .get()?
638 .prepare_cached("SELECT v FROM kv WHERE k is 'fvk' LIMIT 1")?
639 .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
640 .ok_or_else(|| anyhow!("missing fvk in kv table"))?;
641
642 FullViewingKey::decode(bytes.as_slice())
643 })
644 .await?
645 }
646
647 pub async fn state_commitment_tree(&self) -> anyhow::Result<tct::Tree> {
648 let pool = self.pool.clone();
649 spawn_blocking(move || {
650 tct::Tree::from_reader(&mut TreeStore(&mut pool.get()?.transaction()?))
651 })
652 .await?
653 }
654
655 pub async fn transaction_hashes(
657 &self,
658 start_height: Option<u64>,
659 end_height: Option<u64>,
660 ) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
661 let starting_block = start_height.unwrap_or(0) as i64;
662 let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
663
664 let pool = self.pool.clone();
665
666 spawn_blocking(move || {
667 pool.get()?
668 .prepare_cached(
669 "SELECT block_height, tx_hash
670 FROM tx
671 WHERE block_height BETWEEN ?1 AND ?2",
672 )?
673 .query_and_then([starting_block, ending_block], |row| {
674 let block_height: u64 = row.get("block_height")?;
675 let tx_hash: Vec<u8> = row.get("tx_hash")?;
676 anyhow::Ok((block_height, tx_hash))
677 })?
678 .collect()
679 })
680 .await?
681 }
682
683 pub async fn transactions(
685 &self,
686 start_height: Option<u64>,
687 end_height: Option<u64>,
688 ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction)>> {
689 let starting_block = start_height.unwrap_or(0) as i64;
690 let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
691
692 let pool = self.pool.clone();
693
694 spawn_blocking(move || {
695 pool.get()?
696 .prepare_cached(
697 "SELECT block_height, tx_hash, tx_bytes
698 FROM tx
699 WHERE block_height BETWEEN ?1 AND ?2",
700 )?
701 .query_and_then([starting_block, ending_block], |row| {
702 let block_height: u64 = row.get("block_height")?;
703 let tx_hash: Vec<u8> = row.get("tx_hash")?;
704 let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
705 let tx = Transaction::decode(tx_bytes.as_slice())?;
706 anyhow::Ok((block_height, tx_hash, tx))
707 })?
708 .collect()
709 })
710 .await?
711 }
712
713 pub async fn transaction_by_hash(
714 &self,
715 tx_hash: &[u8],
716 ) -> anyhow::Result<Option<(u64, Transaction)>> {
717 let pool = self.pool.clone();
718 let tx_hash = tx_hash.to_vec();
719
720 spawn_blocking(move || {
721 if let Some((block_height, tx_bytes)) = pool
722 .get()?
723 .prepare_cached("SELECT block_height, tx_bytes FROM tx WHERE tx_hash = ?1")?
724 .query_row([tx_hash], |row| {
725 let block_height: u64 = row.get("block_height")?;
726 let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
727 Ok((block_height, tx_bytes))
728 })
729 .optional()?
730 {
731 let tx = Transaction::decode(tx_bytes.as_slice())?;
732 Ok(Some((block_height, tx)))
733 } else {
734 Ok(None)
735 }
736 })
737 .await?
738 }
739
740 pub async fn note_by_nullifier(
742 &self,
743 nullifier: Nullifier,
744 await_detection: bool,
745 ) -> anyhow::Result<SpendableNoteRecord> {
746 let mut rx = self.scanned_notes_tx.subscribe();
749
750 let pool = self.pool.clone();
752
753 let nullifier_bytes = nullifier.to_bytes().to_vec();
754
755 if let Some(record) = spawn_blocking(move || {
756 let record = pool
757 .get()?
758 .prepare(&format!(
759 "SELECT
760 notes.note_commitment,
761 spendable_notes.height_created,
762 notes.address,
763 notes.amount,
764 notes.asset_id,
765 notes.rseed,
766 spendable_notes.address_index,
767 spendable_notes.source,
768 spendable_notes.height_spent,
769 spendable_notes.nullifier,
770 spendable_notes.position,
771 tx.return_address
772 FROM notes
773 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
774 LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
775 WHERE hex(spendable_notes.nullifier) = \"{}\"",
776 hex::encode_upper(nullifier_bytes)
777 ))?
778 .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
779 .next()
780 .transpose()?;
781
782 anyhow::Ok(record)
783 })
784 .await??
785 {
786 return Ok(record);
787 }
788
789 if !await_detection {
790 anyhow::bail!("Note commitment for nullifier {:?} not found", nullifier);
791 }
792
793 loop {
797 match rx.recv().await {
798 Ok(record) => {
799 if record.nullifier == nullifier {
800 return Ok(record);
801 }
802 }
803
804 Err(e) => match e {
805 RecvError::Closed => {
806 anyhow::bail!(
807 "Receiver error during note detection: closed (no more active senders)"
808 );
809 }
810 RecvError::Lagged(count) => {
811 anyhow::bail!(
812 "Receiver error during note detection: lagged (by {:?} messages)",
813 count
814 );
815 }
816 },
817 };
818 }
819 }
820
821 pub async fn all_assets(&self) -> anyhow::Result<Vec<Metadata>> {
822 let pool = self.pool.clone();
823
824 spawn_blocking(move || {
825 pool.get()?
826 .prepare_cached("SELECT metadata FROM assets")?
827 .query_and_then([], |row| {
828 let metadata_json = row.get::<_, String>("metadata")?;
829 let denom_metadata = serde_json::from_str(&metadata_json)?;
830
831 anyhow::Ok(denom_metadata)
832 })?
833 .collect()
834 })
835 .await?
836 }
837
838 pub async fn asset_by_id(&self, id: &Id) -> anyhow::Result<Option<Metadata>> {
839 let id = id.to_bytes().to_vec();
840
841 let pool = self.pool.clone();
842
843 spawn_blocking(move || {
844 pool.get()?
845 .prepare_cached("SELECT metadata FROM assets WHERE asset_id = ?1")?
846 .query_and_then([id], |row| {
847 let metadata_json = row.get::<_, String>("metadata")?;
848 let denom_metadata = serde_json::from_str(&metadata_json)?;
849 anyhow::Ok(denom_metadata)
850 })?
851 .next()
852 .transpose()
853 })
854 .await?
855 }
856
857 pub async fn assets_matching(&self, pattern: String) -> anyhow::Result<Vec<Metadata>> {
860 let pattern = pattern.to_owned();
861
862 let pool = self.pool.clone();
863
864 spawn_blocking(move || {
865 pool.get()?
866 .prepare_cached("SELECT metadata FROM assets WHERE denom LIKE ?1 ESCAPE '\\'")?
867 .query_and_then([pattern], |row| {
868 let metadata_json = row.get::<_, String>("metadata")?;
869 let denom_metadata = serde_json::from_str(&metadata_json)?;
870 anyhow::Ok(denom_metadata)
871 })?
872 .collect()
873 })
874 .await?
875 }
876
877 pub async fn notes(
878 &self,
879 include_spent: bool,
880 asset_id: Option<asset::Id>,
881 address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
882 amount_to_spend: Option<Amount>,
883 ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
884 let spent_clause = match include_spent {
887 false => "NULL",
888 true => "height_spent",
889 };
890
891 let asset_clause = asset_id
894 .map(|id| format!("x'{}'", hex::encode(id.to_bytes())))
895 .unwrap_or_else(|| "asset_id".to_string());
896
897 let address_clause = "address_index".to_string();
908
909 let amount_cutoff = (amount_to_spend.is_some()) && !(include_spent || asset_id.is_none());
915 let mut amount_total = Amount::zero();
916
917 let pool = self.pool.clone();
918
919 spawn_blocking(move || {
920 let mut output: Vec<SpendableNoteRecord> = Vec::new();
921
922 for result in pool
923 .get()?
924 .prepare(&format!(
925 "SELECT notes.note_commitment,
926 spendable_notes.height_created,
927 notes.address,
928 notes.amount,
929 notes.asset_id,
930 notes.rseed,
931 spendable_notes.address_index,
932 spendable_notes.source,
933 spendable_notes.height_spent,
934 spendable_notes.nullifier,
935 spendable_notes.position,
936 tx.return_address
937 FROM notes
938 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
939 LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
940 WHERE spendable_notes.height_spent IS {spent_clause}
941 AND notes.asset_id IS {asset_clause}
942 AND spendable_notes.address_index IS {address_clause}"
943 ))?
944 .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
945 {
946 let record = result?;
947
948 if let Some(address_index) = address_index {
951 if record.address_index.account != address_index.account {
952 continue;
953 }
954 }
955 let amount = record.note.amount();
956
957 if amount.value() > 0 {
960 output.push(record);
961 }
962
963 if amount_cutoff {
966 amount_total += amount;
968 if amount_total >= amount_to_spend.unwrap_or_default() {
969 break;
970 }
971 }
972 }
973
974 if amount_total < amount_to_spend.unwrap_or_default() {
975 anyhow::bail!(
976 "requested amount of {} exceeds total of {}",
977 amount_to_spend.unwrap_or_default(),
978 amount_total
979 );
980 }
981
982 anyhow::Ok(output)
983 })
984 .await?
985 }
986
987 pub async fn notes_for_voting(
988 &self,
989 address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
990 votable_at_height: u64,
991 ) -> anyhow::Result<Vec<(SpendableNoteRecord, IdentityKey)>> {
992 let address_clause = address_index
995 .map(|d| format!("x'{}'", hex::encode(d.to_bytes())))
996 .unwrap_or_else(|| "address_index".to_string());
997
998 let pool = self.pool.clone();
999
1000 spawn_blocking(move || {
1001 let mut lock = pool.get()?;
1002 let dbtx = lock.transaction()?;
1003
1004 let spendable_note_records: Vec<SpendableNoteRecord> = dbtx
1005 .prepare(&format!(
1006 "SELECT notes.note_commitment,
1007 spendable_notes.height_created,
1008 notes.address,
1009 notes.amount,
1010 notes.asset_id,
1011 notes.rseed,
1012 spendable_notes.address_index,
1013 spendable_notes.source,
1014 spendable_notes.height_spent,
1015 spendable_notes.nullifier,
1016 spendable_notes.position
1017 FROM
1018 notes JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1019 WHERE
1020 spendable_notes.address_index IS {address_clause}
1021 AND notes.asset_id IN (
1022 SELECT asset_id FROM assets WHERE denom LIKE '_delegation\\_%' ESCAPE '\\'
1023 )
1024 AND ((spendable_notes.height_spent IS NULL) OR (spendable_notes.height_spent > {votable_at_height}))
1025 AND (spendable_notes.height_created < {votable_at_height})
1026 ",
1027 ))?
1028 .query_and_then((), |row| row.try_into())?
1029 .collect::<anyhow::Result<Vec<_>>>()?;
1030
1031 let mut results = Vec::new();
1034 for record in spendable_note_records {
1035 let asset_id = record.note.asset_id().to_bytes().to_vec();
1036 let denom: String = dbtx.query_row_and_then(
1037 "SELECT denom FROM assets WHERE asset_id = ?1",
1038 [asset_id],
1039 |row| row.get("denom"),
1040 )?;
1041
1042 let identity_key = DelegationToken::from_str(&denom)
1043 .context("invalid delegation token denom")?
1044 .validator();
1045
1046 results.push((record, identity_key));
1047 }
1048
1049 Ok(results)
1050 }).await?
1051 }
1052
1053 #[tracing::instrument(skip(self))]
1054 pub async fn record_asset(&self, asset: Metadata) -> anyhow::Result<()> {
1055 tracing::debug!(?asset);
1056
1057 let asset_id = asset.id().to_bytes().to_vec();
1058 let denom = asset.base_denom().denom;
1059 let metadata_json = serde_json::to_string(&asset)?;
1060
1061 let pool = self.pool.clone();
1062
1063 spawn_blocking(move || {
1064 pool.get()?
1065 .execute(
1066 "INSERT OR REPLACE INTO assets (asset_id, denom, metadata) VALUES (?1, ?2, ?3)",
1067 (asset_id, denom, metadata_json),
1068 )
1069 .map_err(anyhow::Error::from)
1070 })
1071 .await??;
1072
1073 Ok(())
1074 }
1075
1076 pub async fn record_auction_with_state(
1077 &self,
1078 auction_id: AuctionId,
1079 auction_state: u64,
1080 ) -> anyhow::Result<()> {
1081 let auction_id = auction_id.0.to_vec();
1082
1083 let pool = self.pool.clone();
1084
1085 spawn_blocking(move || {
1086 let mut lock = pool.get()?;
1087 let tx = lock.transaction()?;
1088 tx.execute(
1089 "INSERT OR IGNORE INTO auctions (auction_id, auction_state, note_commitment) VALUES (?1, ?2, NULL)",
1090 (auction_id.clone(), auction_state),
1091 )?;
1092 tx.execute(
1093 "UPDATE auctions SET auction_state = ?2 WHERE auction_id = ?1",
1094 (auction_id, auction_state),
1095 )
1096 .map_err(anyhow::Error::from)?;
1097
1098 tx.commit()?;
1099 Ok::<(), anyhow::Error>(())
1100 })
1101 .await??;
1102
1103 Ok(())
1104 }
1105
1106 pub async fn update_auction_with_note_commitment(
1107 &self,
1108 auction_id: AuctionId,
1109 note_commitment: StateCommitment,
1110 ) -> anyhow::Result<()> {
1111 let auction_id = auction_id.0.to_vec();
1112 let blob_nc = note_commitment.0.to_bytes().to_vec();
1113
1114 let pool = self.pool.clone();
1115
1116 spawn_blocking(move || {
1117 pool.get()?
1118 .execute(
1119 "UPDATE auctions SET (note_commitment) = ?1 WHERE auction_id = ?2",
1120 (blob_nc, auction_id),
1121 )
1122 .map_err(anyhow::Error::from)
1123 })
1124 .await??;
1125
1126 Ok(())
1127 }
1128
1129 pub async fn fetch_auctions_by_account(
1130 &self,
1131 account_filter: Option<AddressIndex>,
1132 include_inactive: bool,
1133 ) -> anyhow::Result<Vec<(AuctionId, SpendableNoteRecord, u64 )>> {
1134 let account_clause = account_filter
1135 .map(|idx| {
1136 format!(
1137 "AND spendable_notes.address_index = x'{}'",
1138 hex::encode(idx.to_bytes())
1139 )
1140 })
1141 .unwrap_or_else(|| "".to_string());
1142
1143 let active_clause = if !include_inactive {
1144 "AND auctions.auction_state = 0"
1145 } else {
1146 ""
1147 };
1148
1149 let query = format!(
1150 "SELECT auctions.auction_id, spendable_notes.*, notes.*, auctions.auction_state
1151 FROM auctions
1152 JOIN spendable_notes ON auctions.note_commitment = spendable_notes.note_commitment
1153 JOIN notes ON auctions.note_commitment = notes.note_commitment
1154 WHERE 1 = 1
1155 {account_clause}
1156 {active_clause}",
1157 account_clause = account_clause,
1158 active_clause = active_clause,
1159 );
1160
1161 let pool = self.pool.clone();
1162
1163 spawn_blocking(move || {
1164 let mut conn = pool.get()?;
1165 let tx = conn.transaction()?;
1166
1167 let spendable_note_records: Vec<(AuctionId, SpendableNoteRecord, u64)> = tx
1168 .prepare(&query)?
1169 .query_and_then((), |row| {
1170 let raw_auction_id: Vec<u8> = row.get("auction_id")?;
1171 let array_auction_id: [u8; 32] = raw_auction_id
1172 .try_into()
1173 .map_err(|_| anyhow!("auction id must be 32 bytes"))?;
1174 let auction_id = AuctionId(array_auction_id);
1175 let spendable_note_record: SpendableNoteRecord = row.try_into()?;
1176 let local_seq: u64 = row.get("auction_state")?;
1177 Ok((auction_id, spendable_note_record, local_seq))
1178 })?
1179 .collect::<anyhow::Result<Vec<_>>>()?;
1180
1181 Ok(spendable_note_records)
1182 })
1183 .await?
1184 }
1185
1186 pub async fn record_position(&self, position: Position) -> anyhow::Result<()> {
1187 let position_id = position.id().0.to_vec();
1188
1189 let position_state = position.state.to_string();
1190 let trading_pair = position.phi.pair.to_string();
1191
1192 let pool = self.pool.clone();
1193
1194 spawn_blocking(move || {
1195 pool.get()?
1196 .execute(
1197 "INSERT OR REPLACE INTO positions (position_id, position_state, trading_pair) VALUES (?1, ?2, ?3)",
1198 (position_id, position_state, trading_pair),
1199 )
1200 .map_err(anyhow::Error::from)
1201 })
1202 .await??;
1203
1204 Ok(())
1205 }
1206
1207 pub async fn update_position(
1208 &self,
1209 position_id: position::Id,
1210 position_state: position::State,
1211 ) -> anyhow::Result<()> {
1212 let position_id = position_id.0.to_vec();
1213 let position_state = position_state.to_string();
1214
1215 let pool = self.pool.clone();
1216
1217 spawn_blocking(move || {
1218 pool.get()?
1219 .execute(
1220 "UPDATE positions SET (position_state) = ?1 WHERE position_id = ?2",
1221 (position_state, position_id),
1222 )
1223 .map_err(anyhow::Error::from)
1224 })
1225 .await??;
1226
1227 Ok(())
1228 }
1229
1230 pub async fn record_empty_block(&self, height: u64) -> anyhow::Result<()> {
1231 let last_sync_height = self.last_sync_height().await?.ok_or_else(|| {
1233 anyhow::anyhow!("invalid: tried to record empty block as genesis block")
1234 })?;
1235
1236 if height != last_sync_height + 1 {
1237 anyhow::bail!(
1238 "Wrong block height {} for latest sync height {}",
1239 height,
1240 last_sync_height
1241 );
1242 }
1243
1244 *self.uncommitted_height.lock() = Some(height.try_into()?);
1245 Ok(())
1246 }
1247
1248 fn record_note_inner(
1249 dbtx: &r2d2_sqlite::rusqlite::Transaction<'_>,
1250 note: &Note,
1251 ) -> anyhow::Result<()> {
1252 let note_commitment = note.commit().0.to_bytes().to_vec();
1253 let address = note.address().to_vec();
1254 let amount = u128::from(note.amount()).to_be_bytes().to_vec();
1255 let asset_id = note.asset_id().to_bytes().to_vec();
1256 let rseed = note.rseed().to_bytes().to_vec();
1257
1258 dbtx.execute(
1259 "INSERT INTO notes (note_commitment, address, amount, asset_id, rseed)
1260 VALUES (?1, ?2, ?3, ?4, ?5)
1261 ON CONFLICT (note_commitment)
1262 DO UPDATE SET
1263 address = excluded.address,
1264 amount = excluded.amount,
1265 asset_id = excluded.asset_id,
1266 rseed = excluded.rseed",
1267 (note_commitment, address, amount, asset_id, rseed),
1268 )?;
1269
1270 Ok(())
1271 }
1272
1273 pub async fn give_advice(&self, note: Note) -> anyhow::Result<()> {
1274 let pool = self.pool.clone();
1275 let mut lock = pool.get()?;
1276 let dbtx = lock.transaction()?;
1277
1278 Storage::record_note_inner(&dbtx, ¬e)?;
1279
1280 dbtx.commit()?;
1281
1282 Ok(())
1283 }
1284
1285 pub async fn scan_advice(
1291 &self,
1292 note_commitments: Vec<note::StateCommitment>,
1293 ) -> anyhow::Result<BTreeMap<note::StateCommitment, Note>> {
1294 if note_commitments.is_empty() {
1295 return Ok(BTreeMap::new());
1296 }
1297
1298 let pool = self.pool.clone();
1299
1300 spawn_blocking(move || {
1304 pool.get()?
1305 .prepare(&format!(
1306 "SELECT notes.note_commitment,
1307 notes.address,
1308 notes.amount,
1309 notes.asset_id,
1310 notes.rseed
1311 FROM notes
1312 LEFT OUTER JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1313 WHERE (spendable_notes.note_commitment IS NULL) AND (notes.note_commitment IN ({}))",
1314 note_commitments
1315 .iter()
1316 .map(|cm| format!("x'{}'", hex::encode(cm.0.to_bytes())))
1317 .collect::<Vec<_>>()
1318 .join(", ")
1319 ))?
1320 .query_and_then((), |row| {
1321 let address = Address::try_from(row.get::<_, Vec<u8>>("address")?)?;
1322 let amount = row.get::<_, [u8; 16]>("amount")?;
1323 let amount_u128: u128 = u128::from_be_bytes(amount);
1324 let asset_id = asset::Id(Fq::from_bytes_checked(&row.get::<_, [u8; 32]>("asset_id")?).expect("asset id malformed"));
1325 let rseed = Rseed(row.get::<_, [u8; 32]>("rseed")?);
1326 let note = Note::from_parts(
1327 address,
1328 Value {
1329 amount: amount_u128.into(),
1330 asset_id,
1331 },
1332 rseed,
1333 )?;
1334 anyhow::Ok((note.commit(), note))
1335 })?
1336 .collect::<anyhow::Result<BTreeMap<_, _>>>()
1337 }).await?
1338 }
1339
1340 pub async fn filter_nullifiers(
1342 &self,
1343 nullifiers: Vec<Nullifier>,
1344 ) -> anyhow::Result<Vec<Nullifier>> {
1345 if nullifiers.is_empty() {
1346 return Ok(Vec::new());
1347 }
1348
1349 let pool = self.pool.clone();
1350
1351 spawn_blocking(move || {
1352 pool.get()?
1353 .prepare(&format!(
1354 "SELECT nullifier FROM (SELECT nullifier FROM spendable_notes UNION SELECT nullifier FROM swaps UNION SELECT nullifier FROM tx_by_nullifier) WHERE nullifier IN ({})",
1355 nullifiers
1356 .iter()
1357 .map(|x| format!("x'{}'", hex::encode(x.0.to_bytes())))
1358 .collect::<Vec<String>>()
1359 .join(",")
1360 ))?
1361 .query_and_then((), |row| {
1362 let nullifier: Vec<u8> = row.get("nullifier")?;
1363 nullifier.as_slice().try_into()
1364 })?
1365 .collect()
1366 })
1367 .await?
1368 }
1369
1370 pub async fn record_block(
1371 &self,
1372 filtered_block: FilteredBlock,
1373 transactions: Vec<Transaction>,
1374 sct: &mut tct::Tree,
1375 channel: tonic::transport::Channel,
1376 ) -> anyhow::Result<()> {
1377 let last_sync_height = self.last_sync_height().await?;
1379
1380 let correct_height = match last_sync_height {
1381 Some(cur_height) => filtered_block.height == cur_height + 1,
1383 None => filtered_block.height == 0,
1385 };
1386
1387 if !correct_height {
1388 anyhow::bail!(
1389 "Wrong block height {} for latest sync height {:?}",
1390 filtered_block.height,
1391 last_sync_height
1392 );
1393 }
1394
1395 let pool = self.pool.clone();
1396 let uncommitted_height = self.uncommitted_height.clone();
1397 let scanned_notes_tx = self.scanned_notes_tx.clone();
1398 let scanned_nullifiers_tx = self.scanned_nullifiers_tx.clone();
1399 let scanned_swaps_tx = self.scanned_swaps_tx.clone();
1400
1401 let fvk = self.full_viewing_key().await?;
1402
1403 let new_app_parameters: Option<AppParameters> = if filtered_block.app_parameters_updated {
1405 let mut client = AppQueryServiceClient::new(channel);
1407 Some(
1408 client
1409 .app_parameters(tonic::Request::new(AppParametersRequest {}))
1410 .await?
1411 .into_inner()
1412 .try_into()?,
1413 )
1414 } else {
1415 None
1416 };
1417
1418 let mut new_sct = sct.clone();
1426
1427 *sct = spawn_blocking(move || {
1428 let mut lock = pool.get()?;
1429 let mut dbtx = lock.transaction()?;
1430
1431 if let Some(params) = new_app_parameters {
1432 let params_bytes = params.encode_to_vec();
1433 dbtx.execute(
1435 "INSERT INTO kv (k, v) VALUES ('app_params', ?1)
1436 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1437 [¶ms_bytes[..]],
1438 )?;
1439 }
1440
1441 for note_record in filtered_block.new_notes.values() {
1443 let note_commitment = note_record.note_commitment.0.to_bytes().to_vec();
1444 let height_created = filtered_block.height as i64;
1445 let address_index = note_record.address_index.to_bytes().to_vec();
1446 let nullifier = note_record.nullifier.to_bytes().to_vec();
1447 let position = (u64::from(note_record.position)) as i64;
1448 let source = note_record.source.encode_to_vec();
1449 let tx_hash = match note_record.source {
1451 CommitmentSource::Transaction { id } => id,
1452 _ => None,
1453 };
1454
1455 Storage::record_note_inner(&dbtx, ¬e_record.note)?;
1457
1458 dbtx.execute(
1459 "INSERT INTO spendable_notes
1460 (note_commitment, nullifier, position, height_created, address_index, source, height_spent, tx_hash)
1461 VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL, ?7)
1462 ON CONFLICT (note_commitment)
1463 DO UPDATE SET nullifier = excluded.nullifier,
1464 position = excluded.position,
1465 height_created = excluded.height_created,
1466 address_index = excluded.address_index,
1467 source = excluded.source,
1468 height_spent = excluded.height_spent,
1469 tx_hash = excluded.tx_hash",
1470 (
1471 ¬e_commitment,
1472 &nullifier,
1473 &position,
1474 &height_created,
1475 &address_index,
1476 &source,
1477 &tx_hash,
1479 ),
1480 )?;
1481 }
1482
1483 for swap in filtered_block.new_swaps.values() {
1485 let swap_commitment = swap.swap_commitment.0.to_bytes().to_vec();
1486 let swap_bytes = swap.swap.encode_to_vec();
1487 let position = (u64::from(swap.position)) as i64;
1488 let nullifier = swap.nullifier.to_bytes().to_vec();
1489 let source = swap.source.encode_to_vec();
1490 let output_data = swap.output_data.encode_to_vec();
1491
1492 dbtx.execute(
1493 "INSERT INTO swaps (swap_commitment, swap, position, nullifier, output_data, height_claimed, source)
1494 VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6)
1495 ON CONFLICT (swap_commitment)
1496 DO UPDATE SET swap = excluded.swap,
1497 position = excluded.position,
1498 nullifier = excluded.nullifier,
1499 output_data = excluded.output_data,
1500 height_claimed = excluded.height_claimed,
1501 source = excluded.source",
1502 (
1503 &swap_commitment,
1504 &swap_bytes,
1505 &position,
1506 &nullifier,
1507 &output_data,
1508 &source,
1510 ),
1511 )?;
1512 }
1513
1514 for nullifier in &filtered_block.spent_nullifiers {
1516 let height_spent = filtered_block.height as i64;
1517 let nullifier_bytes = nullifier.to_bytes().to_vec();
1518
1519 let spent_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1520 "UPDATE spendable_notes SET height_spent = ?1 WHERE nullifier = ?2 RETURNING note_commitment"
1521 )?
1522 .query_and_then(
1523 (height_spent, &nullifier_bytes),
1524 |row| {
1525 let bytes: Vec<u8> = row.get("note_commitment")?;
1526 StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1527 },
1528 )?
1529 .next()
1530 .transpose()?;
1531
1532 let swap_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1533 "UPDATE swaps SET height_claimed = ?1 WHERE nullifier = ?2 RETURNING swap_commitment"
1534 )?
1535 .query_and_then(
1536 (height_spent, &nullifier_bytes),
1537 |row| {
1538 let bytes: Vec<u8> = row.get("swap_commitment")?;
1539 StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1540 },
1541 )?
1542 .next()
1543 .transpose()?;
1544
1545 let spent_denom: String
1547 = dbtx.prepare_cached(
1548 "SELECT denom FROM assets
1549 WHERE asset_id ==
1550 (SELECT asset_id FROM notes
1551 WHERE note_commitment ==
1552 (SELECT note_commitment FROM spendable_notes WHERE nullifier = ?1))"
1553 )?
1554 .query_and_then(
1555 [&nullifier_bytes],
1556 |row| row.get("denom"),
1557 )?
1558 .next()
1559 .transpose()?
1560 .unwrap_or("unknown".to_string());
1561
1562 if let Some(spent_commitment) = spent_commitment {
1564 tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "detected spent note commitment");
1565 if DelegationToken::from_str(&spent_denom).is_err() {
1570 tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "forgetting spent note commitment");
1571 new_sct.forget(spent_commitment);
1572 }
1573 };
1574
1575 if let Some(spent_swap_commitment) = swap_commitment {
1577 tracing::debug!(?nullifier, ?spent_swap_commitment, "detected and forgetting spent swap commitment");
1578 new_sct.forget(spent_swap_commitment);
1579 };
1580 }
1581
1582 new_sct.to_writer(&mut TreeStore(&mut dbtx))?;
1584
1585 for transaction in transactions {
1587 let tx_bytes = transaction.encode_to_vec();
1588 let tx_hash_owned = sha2::Sha256::digest(&tx_bytes);
1590 let tx_hash = tx_hash_owned.as_slice();
1591 let tx_block_height = filtered_block.height as i64;
1592 let decrypted_memo = transaction.decrypt_memo(&fvk).ok();
1593 let memo_text = decrypted_memo.clone().map_or(None,|x| Some(x.text().to_string()));
1594 let return_address = decrypted_memo.map_or(None, |x| Some(x.return_address().to_vec()));
1595
1596 tracing::debug!(tx_hash = ?hex::encode(tx_hash), "recording extended transaction");
1597
1598 dbtx.execute(
1599 "INSERT OR IGNORE INTO tx (tx_hash, tx_bytes, block_height, return_address, memo_text) VALUES (?1, ?2, ?3, ?4, ?5)",
1600 (&tx_hash, &tx_bytes, tx_block_height, return_address, memo_text),
1601 )?;
1602
1603 for nf in transaction.spent_nullifiers() {
1605 let nf_bytes = nf.0.to_bytes().to_vec();
1606 dbtx.execute(
1607 "INSERT OR IGNORE INTO tx_by_nullifier (nullifier, tx_hash) VALUES (?1, ?2)",
1608 (&nf_bytes, &tx_hash),
1609 )?;
1610 }
1611 }
1612
1613 if filtered_block.fmd_parameters.is_some() {
1615 let fmd_parameters_bytes =
1616 &fmd::Parameters::encode_to_vec(&filtered_block.fmd_parameters.ok_or_else(|| anyhow::anyhow!("missing fmd parameters in filtered block"))?)[..];
1617
1618 dbtx.execute(
1619 "INSERT INTO kv (k, v) VALUES ('fmd_params', ?1)
1620 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1621 [&fmd_parameters_bytes],
1622 )?;
1623 }
1624
1625 if filtered_block.gas_prices.is_some() {
1627 let gas_prices_bytes =
1628 &GasPrices::encode_to_vec(&filtered_block.gas_prices.ok_or_else(|| anyhow::anyhow!("missing gas prices in filtered block"))?)[..];
1629
1630 dbtx.execute(
1631 "INSERT INTO kv (k, v) VALUES ('gas_prices', ?1)
1632 ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1633 [&gas_prices_bytes],
1634 )?;
1635 }
1636
1637 let latest_sync_height = filtered_block.height as i64;
1639 dbtx.execute("UPDATE sync_height SET height = ?1", [latest_sync_height])?;
1640
1641 dbtx.commit()?;
1643
1644 uncommitted_height.lock().take();
1652
1653 for note_record in filtered_block.new_notes.values() {
1657 let _ = scanned_notes_tx.send(note_record.clone());
1661 }
1662
1663 for nullifier in filtered_block.spent_nullifiers.iter() {
1664 let _ = scanned_nullifiers_tx.send(*nullifier);
1668 }
1669
1670 for swap_record in filtered_block.new_swaps.values() {
1671 let _ = scanned_swaps_tx.send(swap_record.clone());
1675 }
1676
1677 anyhow::Ok(new_sct)
1678 })
1679 .await??;
1680
1681 Ok(())
1682 }
1683
1684 pub async fn owned_position_ids(
1685 &self,
1686 position_state: Option<State>,
1687 trading_pair: Option<TradingPair>,
1688 ) -> anyhow::Result<Vec<position::Id>> {
1689 let pool = self.pool.clone();
1690
1691 let state_clause = match position_state {
1692 Some(state) => format!("position_state = \"{}\"", state),
1693 None => "".to_string(),
1694 };
1695
1696 let pair_clause = match trading_pair {
1697 Some(pair) => format!("trading_pair = \"{}\"", pair),
1698 None => "".to_string(),
1699 };
1700
1701 spawn_blocking(move || {
1702 let mut q = "SELECT position_id FROM positions".to_string();
1703 match (position_state.is_some(), trading_pair.is_some()) {
1704 (true, true) => {
1705 q = q + " WHERE " + &state_clause + " AND " + &pair_clause;
1706 }
1707 (true, false) => {
1708 q = q + " WHERE " + &state_clause;
1709 }
1710 (false, true) => {
1711 q = q + " WHERE " + &pair_clause;
1712 }
1713 (false, false) => (),
1714 };
1715
1716 pool.get()?
1717 .prepare_cached(&q)?
1718 .query_and_then([], |row| {
1719 let position_id: Vec<u8> = row.get("position_id")?;
1720 Ok(position::Id(position_id.as_slice().try_into()?))
1721 })?
1722 .collect()
1723 })
1724 .await?
1725 }
1726
1727 pub async fn notes_by_sender(
1728 &self,
1729 return_address: &Address,
1730 ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
1731 let pool = self.pool.clone();
1732
1733 let query = "SELECT notes.note_commitment,
1734 spendable_notes.height_created,
1735 notes.address,
1736 notes.amount,
1737 notes.asset_id,
1738 notes.rseed,
1739 spendable_notes.address_index,
1740 spendable_notes.source,
1741 spendable_notes.height_spent,
1742 spendable_notes.nullifier,
1743 spendable_notes.position
1744 FROM notes
1745 JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1746 JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
1747 WHERE tx.return_address = ?1";
1748
1749 let return_address = return_address.to_vec();
1750
1751 let records = spawn_blocking(move || {
1752 pool.get()?
1753 .prepare(query)?
1754 .query_and_then([return_address], |record| record.try_into())?
1755 .collect::<anyhow::Result<Vec<_>>>()
1756 })
1757 .await??;
1758
1759 Ok(records)
1760 }
1761
1762 pub async fn transactions_matching_memo(
1766 &self,
1767 pattern: String,
1768 ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction, String)>> {
1769 let pattern = pattern.to_owned();
1770 tracing::trace!(?pattern, "searching for memos matching");
1771 let pool = self.pool.clone();
1772
1773 spawn_blocking(move || {
1774 pool.get()?
1775 .prepare_cached("SELECT block_height, tx_hash, tx_bytes, memo_text FROM tx WHERE memo_text LIKE ?1 ESCAPE '\\'")?
1776 .query_and_then([pattern], |row| {
1777 let block_height: u64 = row.get("block_height")?;
1778 let tx_hash: Vec<u8> = row.get("tx_hash")?;
1779 let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
1780 let tx = Transaction::decode(tx_bytes.as_slice())?;
1781 let memo_text: String = row.get("memo_text")?;
1782 anyhow::Ok((block_height, tx_hash, tx, memo_text))
1783 })?
1784 .collect()
1785 })
1786 .await?
1787 }
1788}