penumbra_sdk_view/
storage.rs

1use std::{collections::BTreeMap, num::NonZeroU64, str::FromStr, sync::Arc, time::Duration};
2
3use anyhow::{anyhow, Context};
4use camino::Utf8Path;
5use decaf377::Fq;
6use once_cell::sync::Lazy;
7use parking_lot::Mutex;
8use penumbra_sdk_auction::auction::AuctionId;
9use r2d2_sqlite::{
10    rusqlite::{OpenFlags, OptionalExtension},
11    SqliteConnectionManager,
12};
13use sha2::{Digest, Sha256};
14use tap::{Tap, TapFallible};
15use tokio::{
16    sync::broadcast::{self, error::RecvError},
17    task::spawn_blocking,
18};
19use tracing::{error_span, Instrument};
20use url::Url;
21
22use penumbra_sdk_app::params::AppParameters;
23use penumbra_sdk_asset::{asset, asset::Id, asset::Metadata, Value};
24use penumbra_sdk_dex::{
25    lp::position::{self, Position, State},
26    TradingPair,
27};
28use penumbra_sdk_fee::GasPrices;
29use penumbra_sdk_keys::{keys::AddressIndex, Address, FullViewingKey};
30use penumbra_sdk_num::Amount;
31use penumbra_sdk_proto::{
32    core::app::v1::{
33        query_service_client::QueryServiceClient as AppQueryServiceClient, AppParametersRequest,
34    },
35    DomainType,
36};
37use penumbra_sdk_sct::{CommitmentSource, Nullifier};
38use penumbra_sdk_shielded_pool::{fmd, note, Note, Rseed};
39use penumbra_sdk_stake::{DelegationToken, IdentityKey};
40use penumbra_sdk_tct as tct;
41use penumbra_sdk_transaction::Transaction;
42use sct::TreeStore;
43use tct::StateCommitment;
44
45use crate::{sync::FilteredBlock, SpendableNoteRecord, SwapRecord};
46
47mod sct;
48
49#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
50pub struct BalanceEntry {
51    pub id: Id,
52    pub amount: u128,
53    pub address_index: AddressIndex,
54}
55
56/// The hash of the schema for the database.
57static SCHEMA_HASH: Lazy<String> =
58    Lazy::new(|| hex::encode(Sha256::digest(include_str!("storage/schema.sql"))));
59
60#[derive(Clone)]
61pub struct Storage {
62    pool: r2d2::Pool<SqliteConnectionManager>,
63
64    /// This allows an optimization where we only commit to the database after
65    /// scanning a nonempty block.
66    ///
67    /// If this is `Some`, we have uncommitted empty blocks up to the inner height.
68    /// If this is `None`, we don't.
69    ///
70    /// Using a `NonZeroU64` ensures that `Option<NonZeroU64>` fits in 8 bytes.
71    uncommitted_height: Arc<Mutex<Option<NonZeroU64>>>,
72
73    scanned_notes_tx: tokio::sync::broadcast::Sender<SpendableNoteRecord>,
74    scanned_nullifiers_tx: tokio::sync::broadcast::Sender<Nullifier>,
75    scanned_swaps_tx: tokio::sync::broadcast::Sender<SwapRecord>,
76}
77
78impl Storage {
79    /// If the database at `storage_path` exists, [`Self::load`] it, otherwise, [`Self::initialize`] it.
80    #[tracing::instrument(
81        skip_all,
82        fields(
83            path = ?storage_path.as_ref().map(|p| p.as_ref().as_str()),
84            url = %node,
85        )
86    )]
87    pub async fn load_or_initialize(
88        storage_path: Option<impl AsRef<Utf8Path>>,
89        fvk: &FullViewingKey,
90        node: Url,
91    ) -> anyhow::Result<Self> {
92        if let Some(path) = storage_path.as_ref().map(AsRef::as_ref) {
93            if path.exists() {
94                tracing::debug!(?path, "database exists");
95                return Self::load(path).await;
96            } else {
97                tracing::debug!(?path, "database does not exist");
98            }
99        };
100
101        let mut client = AppQueryServiceClient::connect(node.to_string())
102            .instrument(error_span!("connecting_to_endpoint"))
103            .await
104            .tap_err(|error| {
105                tracing::error!(?error, "failed to connect to app query service endpoint")
106            })?
107            .tap(|_| tracing::debug!("connected to app query service endpoint"));
108        let params = client
109            .app_parameters(tonic::Request::new(AppParametersRequest {}))
110            .instrument(error_span!("getting_app_parameters"))
111            .await?
112            .into_inner()
113            .try_into()?;
114
115        Self::initialize(storage_path, fvk.clone(), params).await
116    }
117
118    fn connect(
119        path: Option<impl AsRef<Utf8Path>>,
120    ) -> anyhow::Result<r2d2::Pool<SqliteConnectionManager>> {
121        if let Some(path) = path {
122            let manager = SqliteConnectionManager::file(path.as_ref())
123                .with_flags(
124                    // Don't allow opening URIs, because they can change the behavior of the database; we
125                    // just want to open normal filepaths.
126                    OpenFlags::default() & !OpenFlags::SQLITE_OPEN_URI,
127                )
128                .with_init(|conn| {
129                    // "NORMAL" will be consistent, but maybe not durable -- this is fine,
130                    // since all our data is being synced from the chain, so if we lose a dbtx,
131                    // it's like we're resuming sync from a previous height.
132                    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
133                    // We use `prepare_cached` a fair amount: this is an overestimate of the number
134                    // of cached prepared statements likely to be used.
135                    conn.set_prepared_statement_cache_capacity(32);
136                    Ok(())
137                });
138            Ok(r2d2::Pool::builder()
139                // We set max_size=1 to avoid "database is locked" sqlite errors,
140                // when accessing across multiple threads.
141                .max_size(1)
142                .build(manager)?)
143        } else {
144            let manager = SqliteConnectionManager::memory();
145            // Max size needs to be set to 1, otherwise a new in-memory database is created for each
146            // connection to the pool, which results in very confusing errors.
147            //
148            // Lifetimes and timeouts are likewise configured to their maximum values, since
149            // the in-memory database will disappear on connection close.
150            Ok(r2d2::Pool::builder()
151                .max_size(1)
152                .min_idle(Some(1))
153                .max_lifetime(Some(Duration::MAX))
154                .idle_timeout(Some(Duration::MAX))
155                .build(manager)?)
156        }
157    }
158
159    pub async fn load(path: impl AsRef<Utf8Path>) -> anyhow::Result<Self> {
160        let storage = Self {
161            pool: Self::connect(Some(path))?,
162            uncommitted_height: Arc::new(Mutex::new(None)),
163            scanned_notes_tx: broadcast::channel(128).0,
164            scanned_nullifiers_tx: broadcast::channel(512).0,
165            scanned_swaps_tx: broadcast::channel(128).0,
166        };
167
168        spawn_blocking(move || {
169            // Check the version of the software used when first initializing this database.
170            // If it doesn't match the current version, we should report the error to the user.
171            let actual_schema_hash: String = storage
172                .pool
173                .get()?
174                .query_row("SELECT schema_hash FROM schema_hash", (), |row| {
175                    row.get("schema_hash")
176                })
177                .context("failed to query database schema version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
178
179            if actual_schema_hash != *SCHEMA_HASH {
180                let database_client_version: String = storage
181                    .pool
182                    .get()?
183                    .query_row("SELECT client_version FROM client_version", (), |row| {
184                        row.get("client_version")
185                    })
186                    .context("failed to query client version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
187
188                anyhow::bail!(
189                    "can't load view database created by client version {} using client version {}: they have different schemata, so you need to reset your view database and resynchronize by running pcli view reset",
190                    database_client_version,
191                    env!("CARGO_PKG_VERSION"),
192                );
193            }
194
195            Ok(storage)
196        })
197            .await?
198    }
199
200    pub async fn initialize(
201        storage_path: Option<impl AsRef<Utf8Path>>,
202        fvk: FullViewingKey,
203        params: AppParameters,
204    ) -> anyhow::Result<Self> {
205        tracing::debug!(storage_path = ?storage_path.as_ref().map(AsRef::as_ref), ?fvk, ?params);
206
207        // Connect to the database (or create it)
208        let pool = Self::connect(storage_path)?;
209
210        spawn_blocking(move || {
211            // In one database transaction, populate everything
212            let mut conn = pool.get()?;
213            let tx = conn.transaction()?;
214
215            // Create the tables
216            tx.execute_batch(include_str!("storage/schema.sql"))?;
217
218            let params_bytes = params.encode_to_vec();
219            tx.execute(
220                "INSERT INTO kv (k, v) VALUES ('app_params', ?1)",
221                [&params_bytes[..]],
222            )?;
223
224            let fvk_bytes = fvk.encode_to_vec();
225            tx.execute("INSERT INTO kv (k, v) VALUES ('fvk', ?1)", [&fvk_bytes[..]])?;
226
227            // Insert -1 as a signaling value for pre-genesis.
228            // We just have to be careful to treat negative values as None
229            // in last_sync_height.
230            tx.execute("INSERT INTO sync_height (height) VALUES (-1)", ())?;
231
232            // Insert the schema hash into the database
233            tx.execute(
234                "INSERT INTO schema_hash (schema_hash) VALUES (?1)",
235                [&*SCHEMA_HASH],
236            )?;
237
238            // Insert the client version into the database
239            tx.execute(
240                "INSERT INTO client_version (client_version) VALUES (?1)",
241                [env!("CARGO_PKG_VERSION")],
242            )?;
243
244            tx.commit()?;
245            drop(conn);
246
247            Ok(Storage {
248                pool,
249                uncommitted_height: Arc::new(Mutex::new(None)),
250                scanned_notes_tx: broadcast::channel(128).0,
251                scanned_nullifiers_tx: broadcast::channel(512).0,
252                scanned_swaps_tx: broadcast::channel(128).0,
253            })
254        })
255        .await?
256    }
257
258    /// Loads asset metadata from a JSON file and use to update the database.
259    pub async fn load_asset_metadata(
260        &self,
261        registry_path: impl AsRef<Utf8Path>,
262    ) -> anyhow::Result<()> {
263        tracing::debug!(registry_path = ?registry_path.as_ref(), "loading asset metadata");
264        let registry_path = registry_path.as_ref();
265        // Parse into a serde_json::Value first so we can get the bits we care about
266        let mut registry_json: serde_json::Value = serde_json::from_str(
267            std::fs::read_to_string(registry_path)
268                .context("failed to read file")?
269                .as_str(),
270        )
271        .context("failed to parse JSON")?;
272
273        let registry: BTreeMap<String, Metadata> = serde_json::value::from_value(
274            registry_json
275                .get_mut("assetById")
276                .ok_or_else(|| anyhow::anyhow!("missing assetById"))?
277                .take(),
278        )
279        .context("could not parse asset registry")?;
280
281        for metadata in registry.into_values() {
282            self.record_asset(metadata).await?;
283        }
284
285        Ok(())
286    }
287
288    /// Query for account balance by address
289    pub async fn balances(
290        &self,
291        address_index: Option<AddressIndex>,
292        asset_id: Option<asset::Id>,
293    ) -> anyhow::Result<Vec<BalanceEntry>> {
294        let pool = self.pool.clone();
295
296        spawn_blocking(move || {
297            let query = "SELECT notes.asset_id, notes.amount, spendable_notes.address_index
298                FROM    notes
299                JOIN    spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
300                WHERE   spendable_notes.height_spent IS NULL";
301
302            tracing::debug!(?query);
303
304            // Combine notes of the same asset/address index together
305            let mut balances: BTreeMap<AddressIndex, BTreeMap<asset::Id, Amount>> = BTreeMap::new();
306
307            for result in pool.get()?.prepare_cached(query)?.query_map([], |row| {
308                let asset_id = row.get::<&str, Vec<u8>>("asset_id")?;
309                let amount = row.get::<&str, Vec<u8>>("amount")?;
310                let address_index = row.get::<&str, Vec<u8>>("address_index")?;
311
312                Ok((asset_id, amount, address_index))
313            })? {
314                let (id, amount, index) = result?;
315
316                let id = Id::try_from(id.as_slice())?;
317
318                let amount: Amount = Amount::from_be_bytes(
319                    amount
320                        .as_slice()
321                        .try_into()
322                        .expect("amount slice of incorrect length"),
323                );
324
325                let index = AddressIndex::try_from(index.as_slice())?;
326
327                // Skip this entry if not captured by address index filter
328                if let Some(address_index) = address_index {
329                    if address_index != index {
330                        continue;
331                    }
332                }
333                if let Some(asset_id) = asset_id {
334                    if asset_id != id {
335                        continue;
336                    }
337                }
338
339                balances
340                    .entry(index)
341                    .or_insert_with(BTreeMap::new)
342                    .entry(id)
343                    .and_modify(|e| *e += amount)
344                    .or_insert(amount);
345            }
346
347            let entries = balances
348                .into_iter()
349                .flat_map(|(index, assets)| {
350                    assets.into_iter().map(move |(id, amount)| BalanceEntry {
351                        id,
352                        amount: amount.into(),
353                        address_index: index,
354                    })
355                })
356                .collect::<Vec<_>>();
357            Ok(entries)
358        })
359        .await?
360    }
361
362    /// Query for a note by its note commitment, optionally waiting until the note is detected.
363    pub async fn note_by_commitment(
364        &self,
365        note_commitment: tct::StateCommitment,
366        await_detection: bool,
367    ) -> anyhow::Result<SpendableNoteRecord> {
368        // Start subscribing now, before querying for whether we already
369        // have the record, so that we can't miss it if we race a write.
370        let mut rx = self.scanned_notes_tx.subscribe();
371
372        let pool = self.pool.clone();
373
374        if let Some(record) = spawn_blocking(move || {
375            // Check if we already have the record
376            pool.get()?
377                .prepare(&format!(
378                    "SELECT
379                        notes.note_commitment,
380                        spendable_notes.height_created,
381                        notes.address,
382                        notes.amount,
383                        notes.asset_id,
384                        notes.rseed,
385                        spendable_notes.address_index,
386                        spendable_notes.source,
387                        spendable_notes.height_spent,
388                        spendable_notes.nullifier,
389                        spendable_notes.position,
390                        tx.return_address
391                    FROM notes
392                    JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
393                    LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
394                    WHERE notes.note_commitment = x'{}'",
395                    hex::encode(note_commitment.0.to_bytes())
396                ))?
397                .query_and_then((), |record| record.try_into())?
398                .next()
399                .transpose()
400        })
401        .await??
402        {
403            return Ok(record);
404        }
405
406        if !await_detection {
407            anyhow::bail!("Note commitment {} not found", note_commitment);
408        }
409
410        // Otherwise, wait for newly detected notes and check whether they're
411        // the requested one.
412
413        loop {
414            match rx.recv().await {
415                Ok(record) => {
416                    if record.note_commitment == note_commitment {
417                        return Ok(record);
418                    }
419                }
420
421                Err(e) => match e {
422                    RecvError::Closed => {
423                        anyhow::bail!(
424                            "Receiver error during note detection: closed (no more active senders)"
425                        );
426                    }
427                    RecvError::Lagged(count) => {
428                        anyhow::bail!(
429                            "Receiver error during note detection: lagged (by {:?} messages)",
430                            count
431                        );
432                    }
433                },
434            };
435        }
436    }
437
438    /// Query for a swap by its swap commitment, optionally waiting until the note is detected.
439    pub async fn swap_by_commitment(
440        &self,
441        swap_commitment: tct::StateCommitment,
442        await_detection: bool,
443    ) -> anyhow::Result<SwapRecord> {
444        // Start subscribing now, before querying for whether we already
445        // have the record, so that we can't miss it if we race a write.
446        let mut rx = self.scanned_swaps_tx.subscribe();
447
448        let pool = self.pool.clone();
449
450        if let Some(record) = spawn_blocking(move || {
451            // Check if we already have the swap record
452            pool.get()?
453                .prepare(&format!(
454                    "SELECT * FROM swaps WHERE swaps.swap_commitment = x'{}'",
455                    hex::encode(swap_commitment.0.to_bytes())
456                ))?
457                .query_and_then((), |record| record.try_into())?
458                .next()
459                .transpose()
460        })
461        .await??
462        {
463            return Ok(record);
464        }
465
466        if !await_detection {
467            anyhow::bail!("swap commitment {} not found", swap_commitment);
468        }
469
470        // Otherwise, wait for newly detected swaps and check whether they're
471        // the requested one.
472
473        loop {
474            match rx.recv().await {
475                Ok(record) => {
476                    if record.swap_commitment == swap_commitment {
477                        return Ok(record);
478                    }
479                }
480
481                Err(e) => match e {
482                    RecvError::Closed => {
483                        anyhow::bail!(
484                            "Receiver error during swap detection: closed (no more active senders)"
485                        );
486                    }
487                    RecvError::Lagged(count) => {
488                        anyhow::bail!(
489                            "Receiver error during swap detection: lagged (by {:?} messages)",
490                            count
491                        );
492                    }
493                },
494            };
495        }
496    }
497
498    /// Query for all unclaimed swaps.
499    pub async fn unclaimed_swaps(&self) -> anyhow::Result<Vec<SwapRecord>> {
500        let pool = self.pool.clone();
501
502        let records = spawn_blocking(move || {
503            // Check if we already have the swap record
504            pool.get()?
505                .prepare("SELECT * FROM swaps WHERE swaps.height_claimed is NULL")?
506                .query_and_then((), |record| record.try_into())?
507                .collect::<anyhow::Result<Vec<_>>>()
508        })
509        .await??;
510
511        Ok(records)
512    }
513
514    /// Query for a nullifier's status, optionally waiting until the nullifier is detected.
515    pub async fn nullifier_status(
516        &self,
517        nullifier: Nullifier,
518        await_detection: bool,
519    ) -> anyhow::Result<bool> {
520        // Start subscribing now, before querying for whether we already have the nullifier, so we
521        // can't miss it if we race a write.
522        let mut rx = self.scanned_nullifiers_tx.subscribe();
523
524        // Clone the pool handle so that the returned future is 'static
525        let pool = self.pool.clone();
526
527        let nullifier_bytes = nullifier.0.to_bytes().to_vec();
528
529        // Check if we already have the nullifier in the set of spent notes
530        if let Some(height_spent) = spawn_blocking(move || {
531            pool.get()?
532                .prepare_cached("SELECT height_spent FROM spendable_notes WHERE nullifier = ?1")?
533                .query_and_then([nullifier_bytes], |row| {
534                    let height_spent: Option<u64> = row.get("height_spent")?;
535                    anyhow::Ok(height_spent)
536                })?
537                .next()
538                .transpose()
539        })
540        .await??
541        {
542            let spent = height_spent.is_some();
543
544            // If we're awaiting detection and the nullifier isn't yet spent, don't return just yet
545            if !await_detection || spent {
546                return Ok(spent);
547            }
548        }
549
550        // After checking the database, if we didn't find it, return `false` unless we are to
551        // await detection
552        if !await_detection {
553            return Ok(false);
554        }
555
556        // Otherwise, wait for newly detected nullifiers and check whether they're the requested
557        // one.
558        loop {
559            let new_nullifier = rx.recv().await.context("change subscriber failed")?;
560
561            if new_nullifier == nullifier {
562                return Ok(true);
563            }
564        }
565    }
566
567    /// The last block height we've scanned to, if any.
568    pub async fn last_sync_height(&self) -> anyhow::Result<Option<u64>> {
569        // Check if we have uncommitted blocks beyond the database height.
570        if let Some(height) = *self.uncommitted_height.lock() {
571            return Ok(Some(height.get()));
572        }
573
574        let pool = self.pool.clone();
575
576        spawn_blocking(move || {
577            let height: Option<i64> = pool
578                .get()?
579                .prepare_cached("SELECT height FROM sync_height ORDER BY height DESC LIMIT 1")?
580                .query_row([], |row| row.get::<_, Option<i64>>(0))?;
581
582            anyhow::Ok(u64::try_from(height.ok_or_else(|| anyhow!("missing sync height"))?).ok())
583        })
584        .await?
585    }
586
587    pub async fn app_params(&self) -> anyhow::Result<AppParameters> {
588        let pool = self.pool.clone();
589
590        spawn_blocking(move || {
591            let params_bytes = pool
592                .get()?
593                .prepare_cached("SELECT v FROM kv WHERE k IS 'app_params' LIMIT 1")?
594                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
595                .ok_or_else(|| anyhow!("missing app_params in kv table"))?;
596
597            AppParameters::decode(params_bytes.as_slice())
598        })
599        .await?
600    }
601
602    pub async fn gas_prices(&self) -> anyhow::Result<GasPrices> {
603        let pool = self.pool.clone();
604
605        spawn_blocking(move || {
606            let bytes = pool
607                .get()?
608                .prepare_cached("SELECT v FROM kv WHERE k IS 'gas_prices' LIMIT 1")?
609                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
610                .ok_or_else(|| anyhow!("missing gas_prices in kv table"))?;
611
612            GasPrices::decode(bytes.as_slice())
613        })
614        .await?
615    }
616
617    pub async fn fmd_parameters(&self) -> anyhow::Result<fmd::Parameters> {
618        let pool = self.pool.clone();
619
620        spawn_blocking(move || {
621            let bytes = pool
622                .get()?
623                .prepare_cached("SELECT v FROM kv WHERE k IS 'fmd_params' LIMIT 1")?
624                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
625                .ok_or_else(|| anyhow!("missing fmd_params in kv table"))?;
626
627            fmd::Parameters::decode(bytes.as_slice())
628        })
629        .await?
630    }
631
632    pub async fn full_viewing_key(&self) -> anyhow::Result<FullViewingKey> {
633        let pool = self.pool.clone();
634
635        spawn_blocking(move || {
636            let bytes = pool
637                .get()?
638                .prepare_cached("SELECT v FROM kv WHERE k is 'fvk' LIMIT 1")?
639                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
640                .ok_or_else(|| anyhow!("missing fvk in kv table"))?;
641
642            FullViewingKey::decode(bytes.as_slice())
643        })
644        .await?
645    }
646
647    pub async fn state_commitment_tree(&self) -> anyhow::Result<tct::Tree> {
648        let pool = self.pool.clone();
649        spawn_blocking(move || {
650            tct::Tree::from_reader(&mut TreeStore(&mut pool.get()?.transaction()?))
651        })
652        .await?
653    }
654
655    /// Returns a tuple of (block height, transaction hash) for all transactions in a given range of block heights.
656    pub async fn transaction_hashes(
657        &self,
658        start_height: Option<u64>,
659        end_height: Option<u64>,
660    ) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
661        let starting_block = start_height.unwrap_or(0) as i64;
662        let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
663
664        let pool = self.pool.clone();
665
666        spawn_blocking(move || {
667            pool.get()?
668                .prepare_cached(
669                    "SELECT block_height, tx_hash
670                    FROM tx
671                    WHERE block_height BETWEEN ?1 AND ?2",
672                )?
673                .query_and_then([starting_block, ending_block], |row| {
674                    let block_height: u64 = row.get("block_height")?;
675                    let tx_hash: Vec<u8> = row.get("tx_hash")?;
676                    anyhow::Ok((block_height, tx_hash))
677                })?
678                .collect()
679        })
680        .await?
681    }
682
683    /// Returns a tuple of (block height, transaction hash, transaction) for all transactions in a given range of block heights.
684    pub async fn transactions(
685        &self,
686        start_height: Option<u64>,
687        end_height: Option<u64>,
688    ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction)>> {
689        let starting_block = start_height.unwrap_or(0) as i64;
690        let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
691
692        let pool = self.pool.clone();
693
694        spawn_blocking(move || {
695            pool.get()?
696                .prepare_cached(
697                    "SELECT block_height, tx_hash, tx_bytes
698                    FROM tx
699                    WHERE block_height BETWEEN ?1 AND ?2",
700                )?
701                .query_and_then([starting_block, ending_block], |row| {
702                    let block_height: u64 = row.get("block_height")?;
703                    let tx_hash: Vec<u8> = row.get("tx_hash")?;
704                    let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
705                    let tx = Transaction::decode(tx_bytes.as_slice())?;
706                    anyhow::Ok((block_height, tx_hash, tx))
707                })?
708                .collect()
709        })
710        .await?
711    }
712
713    pub async fn transaction_by_hash(
714        &self,
715        tx_hash: &[u8],
716    ) -> anyhow::Result<Option<(u64, Transaction)>> {
717        let pool = self.pool.clone();
718        let tx_hash = tx_hash.to_vec();
719
720        spawn_blocking(move || {
721            if let Some((block_height, tx_bytes)) = pool
722                .get()?
723                .prepare_cached("SELECT block_height, tx_bytes FROM tx WHERE tx_hash = ?1")?
724                .query_row([tx_hash], |row| {
725                    let block_height: u64 = row.get("block_height")?;
726                    let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
727                    Ok((block_height, tx_bytes))
728                })
729                .optional()?
730            {
731                let tx = Transaction::decode(tx_bytes.as_slice())?;
732                Ok(Some((block_height, tx)))
733            } else {
734                Ok(None)
735            }
736        })
737        .await?
738    }
739
740    // Query for a note by its note commitment, optionally waiting until the note is detected.
741    pub async fn note_by_nullifier(
742        &self,
743        nullifier: Nullifier,
744        await_detection: bool,
745    ) -> anyhow::Result<SpendableNoteRecord> {
746        // Start subscribing now, before querying for whether we already
747        // have the record, so that we can't miss it if we race a write.
748        let mut rx = self.scanned_notes_tx.subscribe();
749
750        // Clone the pool handle so that the returned future is 'static
751        let pool = self.pool.clone();
752
753        let nullifier_bytes = nullifier.to_bytes().to_vec();
754
755        if let Some(record) = spawn_blocking(move || {
756            let record = pool
757                .get()?
758                .prepare(&format!(
759                    "SELECT
760                        notes.note_commitment,
761                        spendable_notes.height_created,
762                        notes.address,
763                        notes.amount,
764                        notes.asset_id,
765                        notes.rseed,
766                        spendable_notes.address_index,
767                        spendable_notes.source,
768                        spendable_notes.height_spent,
769                        spendable_notes.nullifier,
770                        spendable_notes.position,
771                        tx.return_address
772                    FROM notes
773                    JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
774                    LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
775                    WHERE hex(spendable_notes.nullifier) = \"{}\"",
776                    hex::encode_upper(nullifier_bytes)
777                ))?
778                .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
779                .next()
780                .transpose()?;
781
782            anyhow::Ok(record)
783        })
784        .await??
785        {
786            return Ok(record);
787        }
788
789        if !await_detection {
790            anyhow::bail!("Note commitment for nullifier {:?} not found", nullifier);
791        }
792
793        // Otherwise, wait for newly detected notes and check whether they're
794        // the requested one.
795
796        loop {
797            match rx.recv().await {
798                Ok(record) => {
799                    if record.nullifier == nullifier {
800                        return Ok(record);
801                    }
802                }
803
804                Err(e) => match e {
805                    RecvError::Closed => {
806                        anyhow::bail!(
807                            "Receiver error during note detection: closed (no more active senders)"
808                        );
809                    }
810                    RecvError::Lagged(count) => {
811                        anyhow::bail!(
812                            "Receiver error during note detection: lagged (by {:?} messages)",
813                            count
814                        );
815                    }
816                },
817            };
818        }
819    }
820
821    pub async fn all_assets(&self) -> anyhow::Result<Vec<Metadata>> {
822        let pool = self.pool.clone();
823
824        spawn_blocking(move || {
825            pool.get()?
826                .prepare_cached("SELECT metadata FROM assets")?
827                .query_and_then([], |row| {
828                    let metadata_json = row.get::<_, String>("metadata")?;
829                    let denom_metadata = serde_json::from_str(&metadata_json)?;
830
831                    anyhow::Ok(denom_metadata)
832                })?
833                .collect()
834        })
835        .await?
836    }
837
838    pub async fn asset_by_id(&self, id: &Id) -> anyhow::Result<Option<Metadata>> {
839        let id = id.to_bytes().to_vec();
840
841        let pool = self.pool.clone();
842
843        spawn_blocking(move || {
844            pool.get()?
845                .prepare_cached("SELECT metadata FROM assets WHERE asset_id = ?1")?
846                .query_and_then([id], |row| {
847                    let metadata_json = row.get::<_, String>("metadata")?;
848                    let denom_metadata = serde_json::from_str(&metadata_json)?;
849                    anyhow::Ok(denom_metadata)
850                })?
851                .next()
852                .transpose()
853        })
854        .await?
855    }
856
857    // Get assets whose denoms match the given SQL LIKE pattern, with the `_` and `%` wildcards,
858    // where `\` is the escape character.
859    pub async fn assets_matching(&self, pattern: String) -> anyhow::Result<Vec<Metadata>> {
860        let pattern = pattern.to_owned();
861
862        let pool = self.pool.clone();
863
864        spawn_blocking(move || {
865            pool.get()?
866                .prepare_cached("SELECT metadata FROM assets WHERE denom LIKE ?1 ESCAPE '\\'")?
867                .query_and_then([pattern], |row| {
868                    let metadata_json = row.get::<_, String>("metadata")?;
869                    let denom_metadata = serde_json::from_str(&metadata_json)?;
870                    anyhow::Ok(denom_metadata)
871                })?
872                .collect()
873        })
874        .await?
875    }
876
877    pub async fn notes(
878        &self,
879        include_spent: bool,
880        asset_id: Option<asset::Id>,
881        address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
882        amount_to_spend: Option<Amount>,
883    ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
884        // If set, return spent notes as well as unspent notes.
885        // bool include_spent = 2;
886        let spent_clause = match include_spent {
887            false => "NULL",
888            true => "height_spent",
889        };
890
891        // If set, only return notes with the specified asset id.
892        // core.crypto.v1.AssetId asset_id = 3;
893        let asset_clause = asset_id
894            .map(|id| format!("x'{}'", hex::encode(id.to_bytes())))
895            .unwrap_or_else(|| "asset_id".to_string());
896
897        // If set, only return notes with the specified address index.
898        // crypto.AddressIndex address_index = 4;
899        // This isn't what we want any more, we need to be indexing notes
900        // by *account*, not just by address index.
901        // For now, just do filtering in software.
902        /*
903        let address_clause = address_index
904            .map(|d| format!("x'{}'", hex::encode(d.to_bytes())))
905            .unwrap_or_else(|| "address_index".to_string());
906         */
907        let address_clause = "address_index".to_string();
908
909        // If set, stop returning notes once the total exceeds this amount.
910        //
911        // Ignored if `asset_id` is unset or if `include_spent` is set.
912        // uint64 amount_to_spend = 5;
913        //TODO: figure out a clever way to only return notes up to the sum using SQL
914        let amount_cutoff = (amount_to_spend.is_some()) && !(include_spent || asset_id.is_none());
915        let mut amount_total = Amount::zero();
916
917        let pool = self.pool.clone();
918
919        spawn_blocking(move || {
920            let mut output: Vec<SpendableNoteRecord> = Vec::new();
921
922            for result in pool
923                .get()?
924                .prepare(&format!(
925                    "SELECT notes.note_commitment,
926                        spendable_notes.height_created,
927                        notes.address,
928                        notes.amount,
929                        notes.asset_id,
930                        notes.rseed,
931                        spendable_notes.address_index,
932                        spendable_notes.source,
933                        spendable_notes.height_spent,
934                        spendable_notes.nullifier,
935                        spendable_notes.position,
936                        tx.return_address
937                FROM notes
938                JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
939                LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
940                WHERE spendable_notes.height_spent IS {spent_clause}
941                AND notes.asset_id IS {asset_clause}
942                AND spendable_notes.address_index IS {address_clause}"
943                ))?
944                .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
945            {
946                let record = result?;
947
948                // Skip notes that don't match the account, since we're
949                // not doing account filtering in SQL as a temporary hack (see above)
950                if let Some(address_index) = address_index {
951                    if record.address_index.account != address_index.account {
952                        continue;
953                    }
954                }
955                let amount = record.note.amount();
956
957                // Only display notes of value > 0
958
959                if amount.value() > 0 {
960                    output.push(record);
961                }
962
963                // If we're tracking amounts, accumulate the value of the note
964                // and check if we should break out of the loop.
965                if amount_cutoff {
966                    // We know all the notes are of the same type, so adding raw quantities makes sense.
967                    amount_total += amount;
968                    if amount_total >= amount_to_spend.unwrap_or_default() {
969                        break;
970                    }
971                }
972            }
973
974            if amount_total < amount_to_spend.unwrap_or_default() {
975                anyhow::bail!(
976                    "requested amount of {} exceeds total of {}",
977                    amount_to_spend.unwrap_or_default(),
978                    amount_total
979                );
980            }
981
982            anyhow::Ok(output)
983        })
984        .await?
985    }
986
987    pub async fn notes_for_voting(
988        &self,
989        address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
990        votable_at_height: u64,
991    ) -> anyhow::Result<Vec<(SpendableNoteRecord, IdentityKey)>> {
992        // If set, only return notes with the specified address index.
993        // crypto.AddressIndex address_index = 3;
994        let address_clause = address_index
995            .map(|d| format!("x'{}'", hex::encode(d.to_bytes())))
996            .unwrap_or_else(|| "address_index".to_string());
997
998        let pool = self.pool.clone();
999
1000        spawn_blocking(move || {
1001            let mut lock = pool.get()?;
1002            let dbtx = lock.transaction()?;
1003
1004            let spendable_note_records: Vec<SpendableNoteRecord> = dbtx
1005                .prepare(&format!(
1006                    "SELECT notes.note_commitment,
1007                        spendable_notes.height_created,
1008                        notes.address,
1009                        notes.amount,
1010                        notes.asset_id,
1011                        notes.rseed,
1012                        spendable_notes.address_index,
1013                        spendable_notes.source,
1014                        spendable_notes.height_spent,
1015                        spendable_notes.nullifier,
1016                        spendable_notes.position
1017                    FROM
1018                        notes JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1019                    WHERE
1020                        spendable_notes.address_index IS {address_clause}
1021                        AND notes.asset_id IN (
1022                            SELECT asset_id FROM assets WHERE denom LIKE '_delegation\\_%' ESCAPE '\\'
1023                        )
1024                        AND ((spendable_notes.height_spent IS NULL) OR (spendable_notes.height_spent > {votable_at_height}))
1025                        AND (spendable_notes.height_created < {votable_at_height})
1026                    ",
1027                ))?
1028                .query_and_then((), |row| row.try_into())?
1029                .collect::<anyhow::Result<Vec<_>>>()?;
1030
1031            // TODO: this could be internalized into the SQL query in principle, but it's easier to
1032            // do it this way; if it becomes slow, we can do it better
1033            let mut results = Vec::new();
1034            for record in spendable_note_records {
1035                let asset_id = record.note.asset_id().to_bytes().to_vec();
1036                let denom: String = dbtx.query_row_and_then(
1037                    "SELECT denom FROM assets WHERE asset_id = ?1",
1038                    [asset_id],
1039                    |row| row.get("denom"),
1040                )?;
1041
1042                let identity_key = DelegationToken::from_str(&denom)
1043                    .context("invalid delegation token denom")?
1044                    .validator();
1045
1046                results.push((record, identity_key));
1047            }
1048
1049            Ok(results)
1050        }).await?
1051    }
1052
1053    #[tracing::instrument(skip(self))]
1054    pub async fn record_asset(&self, asset: Metadata) -> anyhow::Result<()> {
1055        tracing::debug!(?asset);
1056
1057        let asset_id = asset.id().to_bytes().to_vec();
1058        let denom = asset.base_denom().denom;
1059        let metadata_json = serde_json::to_string(&asset)?;
1060
1061        let pool = self.pool.clone();
1062
1063        spawn_blocking(move || {
1064            pool.get()?
1065                .execute(
1066                    "INSERT OR REPLACE INTO assets (asset_id, denom, metadata) VALUES (?1, ?2, ?3)",
1067                    (asset_id, denom, metadata_json),
1068                )
1069                .map_err(anyhow::Error::from)
1070        })
1071        .await??;
1072
1073        Ok(())
1074    }
1075
1076    pub async fn record_auction_with_state(
1077        &self,
1078        auction_id: AuctionId,
1079        auction_state: u64,
1080    ) -> anyhow::Result<()> {
1081        let auction_id = auction_id.0.to_vec();
1082
1083        let pool = self.pool.clone();
1084
1085        spawn_blocking(move || {
1086            let mut lock = pool.get()?;
1087            let tx = lock.transaction()?;
1088            tx.execute(
1089                "INSERT OR IGNORE INTO auctions (auction_id, auction_state, note_commitment) VALUES (?1, ?2, NULL)",
1090                (auction_id.clone(), auction_state),
1091            )?;
1092            tx.execute(
1093                "UPDATE auctions SET auction_state = ?2 WHERE auction_id = ?1",
1094                (auction_id, auction_state),
1095            )
1096                .map_err(anyhow::Error::from)?;
1097
1098            tx.commit()?;
1099            Ok::<(), anyhow::Error>(())
1100            })
1101            .await??;
1102
1103        Ok(())
1104    }
1105
1106    pub async fn update_auction_with_note_commitment(
1107        &self,
1108        auction_id: AuctionId,
1109        note_commitment: StateCommitment,
1110    ) -> anyhow::Result<()> {
1111        let auction_id = auction_id.0.to_vec();
1112        let blob_nc = note_commitment.0.to_bytes().to_vec();
1113
1114        let pool = self.pool.clone();
1115
1116        spawn_blocking(move || {
1117            pool.get()?
1118                .execute(
1119                    "UPDATE auctions SET (note_commitment) = ?1 WHERE auction_id = ?2",
1120                    (blob_nc, auction_id),
1121                )
1122                .map_err(anyhow::Error::from)
1123        })
1124        .await??;
1125
1126        Ok(())
1127    }
1128
1129    pub async fn fetch_auctions_by_account(
1130        &self,
1131        account_filter: Option<AddressIndex>,
1132        include_inactive: bool,
1133    ) -> anyhow::Result<Vec<(AuctionId, SpendableNoteRecord, u64 /* local seqnum */)>> {
1134        let account_clause = account_filter
1135            .map(|idx| {
1136                format!(
1137                    "AND spendable_notes.address_index = x'{}'",
1138                    hex::encode(idx.to_bytes())
1139                )
1140            })
1141            .unwrap_or_else(|| "".to_string());
1142
1143        let active_clause = if !include_inactive {
1144            "AND auctions.auction_state = 0"
1145        } else {
1146            ""
1147        };
1148
1149        let query = format!(
1150            "SELECT auctions.auction_id, spendable_notes.*, notes.*, auctions.auction_state
1151                 FROM auctions
1152                 JOIN spendable_notes ON auctions.note_commitment = spendable_notes.note_commitment
1153                 JOIN notes ON auctions.note_commitment = notes.note_commitment
1154                 WHERE 1 = 1
1155                 {account_clause}
1156                 {active_clause}",
1157            account_clause = account_clause,
1158            active_clause = active_clause,
1159        );
1160
1161        let pool = self.pool.clone();
1162
1163        spawn_blocking(move || {
1164            let mut conn = pool.get()?;
1165            let tx = conn.transaction()?;
1166
1167            let spendable_note_records: Vec<(AuctionId, SpendableNoteRecord, u64)> = tx
1168                .prepare(&query)?
1169                .query_and_then((), |row| {
1170                    let raw_auction_id: Vec<u8> = row.get("auction_id")?;
1171                    let array_auction_id: [u8; 32] = raw_auction_id
1172                        .try_into()
1173                        .map_err(|_| anyhow!("auction id must be 32 bytes"))?;
1174                    let auction_id = AuctionId(array_auction_id);
1175                    let spendable_note_record: SpendableNoteRecord = row.try_into()?;
1176                    let local_seq: u64 = row.get("auction_state")?;
1177                    Ok((auction_id, spendable_note_record, local_seq))
1178                })?
1179                .collect::<anyhow::Result<Vec<_>>>()?;
1180
1181            Ok(spendable_note_records)
1182        })
1183        .await?
1184    }
1185
1186    pub async fn record_position(&self, position: Position) -> anyhow::Result<()> {
1187        let position_id = position.id().0.to_vec();
1188
1189        let position_state = position.state.to_string();
1190        let trading_pair = position.phi.pair.to_string();
1191
1192        let pool = self.pool.clone();
1193
1194        spawn_blocking(move || {
1195            pool.get()?
1196                .execute(
1197                    "INSERT OR REPLACE INTO positions (position_id, position_state, trading_pair) VALUES (?1, ?2, ?3)",
1198                    (position_id, position_state, trading_pair),
1199                )
1200                .map_err(anyhow::Error::from)
1201        })
1202            .await??;
1203
1204        Ok(())
1205    }
1206
1207    pub async fn update_position(
1208        &self,
1209        position_id: position::Id,
1210        position_state: position::State,
1211    ) -> anyhow::Result<()> {
1212        let position_id = position_id.0.to_vec();
1213        let position_state = position_state.to_string();
1214
1215        let pool = self.pool.clone();
1216
1217        spawn_blocking(move || {
1218            pool.get()?
1219                .execute(
1220                    "UPDATE positions SET (position_state) = ?1 WHERE position_id = ?2",
1221                    (position_state, position_id),
1222                )
1223                .map_err(anyhow::Error::from)
1224        })
1225        .await??;
1226
1227        Ok(())
1228    }
1229
1230    pub async fn record_empty_block(&self, height: u64) -> anyhow::Result<()> {
1231        // Check that the incoming block height follows the latest recorded height
1232        let last_sync_height = self.last_sync_height().await?.ok_or_else(|| {
1233            anyhow::anyhow!("invalid: tried to record empty block as genesis block")
1234        })?;
1235
1236        if height != last_sync_height + 1 {
1237            anyhow::bail!(
1238                "Wrong block height {} for latest sync height {}",
1239                height,
1240                last_sync_height
1241            );
1242        }
1243
1244        *self.uncommitted_height.lock() = Some(height.try_into()?);
1245        Ok(())
1246    }
1247
1248    fn record_note_inner(
1249        dbtx: &r2d2_sqlite::rusqlite::Transaction<'_>,
1250        note: &Note,
1251    ) -> anyhow::Result<()> {
1252        let note_commitment = note.commit().0.to_bytes().to_vec();
1253        let address = note.address().to_vec();
1254        let amount = u128::from(note.amount()).to_be_bytes().to_vec();
1255        let asset_id = note.asset_id().to_bytes().to_vec();
1256        let rseed = note.rseed().to_bytes().to_vec();
1257
1258        dbtx.execute(
1259            "INSERT INTO notes (note_commitment, address, amount, asset_id, rseed)
1260                VALUES (?1, ?2, ?3, ?4, ?5)
1261                ON CONFLICT (note_commitment)
1262                DO UPDATE SET
1263                address = excluded.address,
1264                amount = excluded.amount,
1265                asset_id = excluded.asset_id,
1266                rseed = excluded.rseed",
1267            (note_commitment, address, amount, asset_id, rseed),
1268        )?;
1269
1270        Ok(())
1271    }
1272
1273    pub async fn give_advice(&self, note: Note) -> anyhow::Result<()> {
1274        let pool = self.pool.clone();
1275        let mut lock = pool.get()?;
1276        let dbtx = lock.transaction()?;
1277
1278        Storage::record_note_inner(&dbtx, &note)?;
1279
1280        dbtx.commit()?;
1281
1282        Ok(())
1283    }
1284
1285    /// Return advice about note contents for use in scanning.
1286    ///
1287    /// Given a list of note commitments, this method checks whether any of them
1288    /// correspond to notes that have been recorded in the database but not yet
1289    /// observed during scanning.
1290    pub async fn scan_advice(
1291        &self,
1292        note_commitments: Vec<note::StateCommitment>,
1293    ) -> anyhow::Result<BTreeMap<note::StateCommitment, Note>> {
1294        if note_commitments.is_empty() {
1295            return Ok(BTreeMap::new());
1296        }
1297
1298        let pool = self.pool.clone();
1299
1300        // This query gives advice about notes which are known but which have not already been recorded as spendable,
1301        // in part to avoid revealing information about which notes have been spent.
1302
1303        spawn_blocking(move || {
1304            pool.get()?
1305                .prepare(&format!(
1306                    "SELECT notes.note_commitment,
1307                        notes.address,
1308                        notes.amount,
1309                        notes.asset_id,
1310                        notes.rseed
1311                    FROM notes
1312                    LEFT OUTER JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1313                    WHERE (spendable_notes.note_commitment IS NULL) AND (notes.note_commitment IN ({}))",
1314                    note_commitments
1315                        .iter()
1316                        .map(|cm| format!("x'{}'", hex::encode(cm.0.to_bytes())))
1317                        .collect::<Vec<_>>()
1318                        .join(", ")
1319                ))?
1320                .query_and_then((), |row| {
1321                    let address = Address::try_from(row.get::<_, Vec<u8>>("address")?)?;
1322                    let amount = row.get::<_, [u8; 16]>("amount")?;
1323                    let amount_u128: u128 = u128::from_be_bytes(amount);
1324                    let asset_id = asset::Id(Fq::from_bytes_checked(&row.get::<_, [u8; 32]>("asset_id")?).expect("asset id malformed"));
1325                    let rseed = Rseed(row.get::<_, [u8; 32]>("rseed")?);
1326                    let note = Note::from_parts(
1327                        address,
1328                        Value {
1329                            amount: amount_u128.into(),
1330                            asset_id,
1331                        },
1332                        rseed,
1333                    )?;
1334                    anyhow::Ok((note.commit(), note))
1335                })?
1336                .collect::<anyhow::Result<BTreeMap<_, _>>>()
1337        }).await?
1338    }
1339
1340    /// Filters for nullifiers whose notes we control
1341    pub async fn filter_nullifiers(
1342        &self,
1343        nullifiers: Vec<Nullifier>,
1344    ) -> anyhow::Result<Vec<Nullifier>> {
1345        if nullifiers.is_empty() {
1346            return Ok(Vec::new());
1347        }
1348
1349        let pool = self.pool.clone();
1350
1351        spawn_blocking(move || {
1352            pool.get()?
1353                .prepare(&format!(
1354                    "SELECT nullifier FROM (SELECT nullifier FROM spendable_notes UNION SELECT nullifier FROM swaps UNION SELECT nullifier FROM tx_by_nullifier) WHERE nullifier IN ({})",
1355                    nullifiers
1356                        .iter()
1357                        .map(|x| format!("x'{}'", hex::encode(x.0.to_bytes())))
1358                        .collect::<Vec<String>>()
1359                        .join(",")
1360                ))?
1361                .query_and_then((), |row| {
1362                    let nullifier: Vec<u8> = row.get("nullifier")?;
1363                    nullifier.as_slice().try_into()
1364                })?
1365                .collect()
1366        })
1367            .await?
1368    }
1369
1370    pub async fn record_block(
1371        &self,
1372        filtered_block: FilteredBlock,
1373        transactions: Vec<Transaction>,
1374        sct: &mut tct::Tree,
1375        channel: tonic::transport::Channel,
1376    ) -> anyhow::Result<()> {
1377        //Check that the incoming block height follows the latest recorded height
1378        let last_sync_height = self.last_sync_height().await?;
1379
1380        let correct_height = match last_sync_height {
1381            // Require that the new block follows the last one we scanned.
1382            Some(cur_height) => filtered_block.height == cur_height + 1,
1383            // Require that the new block represents the initial chain state.
1384            None => filtered_block.height == 0,
1385        };
1386
1387        if !correct_height {
1388            anyhow::bail!(
1389                "Wrong block height {} for latest sync height {:?}",
1390                filtered_block.height,
1391                last_sync_height
1392            );
1393        }
1394
1395        let pool = self.pool.clone();
1396        let uncommitted_height = self.uncommitted_height.clone();
1397        let scanned_notes_tx = self.scanned_notes_tx.clone();
1398        let scanned_nullifiers_tx = self.scanned_nullifiers_tx.clone();
1399        let scanned_swaps_tx = self.scanned_swaps_tx.clone();
1400
1401        let fvk = self.full_viewing_key().await?;
1402
1403        // If the app parameters have changed, update them.
1404        let new_app_parameters: Option<AppParameters> = if filtered_block.app_parameters_updated {
1405            // Fetch the latest parameters
1406            let mut client = AppQueryServiceClient::new(channel);
1407            Some(
1408                client
1409                    .app_parameters(tonic::Request::new(AppParametersRequest {}))
1410                    .await?
1411                    .into_inner()
1412                    .try_into()?,
1413            )
1414        } else {
1415            None
1416        };
1417
1418        // Cloning the SCT is cheap because it's a copy-on-write structure, so we move an owned copy
1419        // into the spawned thread. This means that if for any reason the thread panics or throws an
1420        // error, the changes to the SCT will be discarded, just like any changes to the database,
1421        // so the two stay transactionally in sync, even in the case of errors. This would not be
1422        // the case if we `std::mem::take` the SCT and move it into the spawned thread, because then
1423        // an error would mean the updated version would never be put back, and the outcome would be
1424        // a cleared SCT but a non-empty database.
1425        let mut new_sct = sct.clone();
1426
1427        *sct = spawn_blocking(move || {
1428            let mut lock = pool.get()?;
1429            let mut dbtx = lock.transaction()?;
1430
1431            if let Some(params) = new_app_parameters {
1432                let params_bytes = params.encode_to_vec();
1433                // We expect app_params to be present already but may as well use an upsert
1434                dbtx.execute(
1435                    "INSERT INTO kv (k, v) VALUES ('app_params', ?1)
1436                    ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1437                    [&params_bytes[..]],
1438                )?;
1439            }
1440
1441            // Insert new note records into storage
1442            for note_record in filtered_block.new_notes.values() {
1443                let note_commitment = note_record.note_commitment.0.to_bytes().to_vec();
1444                let height_created = filtered_block.height as i64;
1445                let address_index = note_record.address_index.to_bytes().to_vec();
1446                let nullifier = note_record.nullifier.to_bytes().to_vec();
1447                let position = (u64::from(note_record.position)) as i64;
1448                let source = note_record.source.encode_to_vec();
1449                // Check if the note is from a transaction, if so, include the tx hash (id)
1450                let tx_hash = match note_record.source {
1451                    CommitmentSource::Transaction { id } => id,
1452                    _ => None,
1453                };
1454
1455                // Record the inner note data in the notes table
1456                Storage::record_note_inner(&dbtx, &note_record.note)?;
1457
1458                dbtx.execute(
1459                    "INSERT INTO spendable_notes
1460                    (note_commitment, nullifier, position, height_created, address_index, source, height_spent, tx_hash)
1461                    VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL, ?7)
1462                    ON CONFLICT (note_commitment)
1463                    DO UPDATE SET nullifier = excluded.nullifier,
1464                    position = excluded.position,
1465                    height_created = excluded.height_created,
1466                    address_index = excluded.address_index,
1467                    source = excluded.source,
1468                    height_spent = excluded.height_spent,
1469                    tx_hash = excluded.tx_hash",
1470                    (
1471                        &note_commitment,
1472                        &nullifier,
1473                        &position,
1474                        &height_created,
1475                        &address_index,
1476                        &source,
1477                        // height_spent is NULL because the note is newly discovered
1478                        &tx_hash,
1479                    ),
1480                )?;
1481            }
1482
1483            // Insert new swap records into storage
1484            for swap in filtered_block.new_swaps.values() {
1485                let swap_commitment = swap.swap_commitment.0.to_bytes().to_vec();
1486                let swap_bytes = swap.swap.encode_to_vec();
1487                let position = (u64::from(swap.position)) as i64;
1488                let nullifier = swap.nullifier.to_bytes().to_vec();
1489                let source = swap.source.encode_to_vec();
1490                let output_data = swap.output_data.encode_to_vec();
1491
1492                dbtx.execute(
1493                    "INSERT INTO swaps (swap_commitment, swap, position, nullifier, output_data, height_claimed, source)
1494                    VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6)
1495                    ON CONFLICT (swap_commitment)
1496                    DO UPDATE SET swap = excluded.swap,
1497                    position = excluded.position,
1498                    nullifier = excluded.nullifier,
1499                    output_data = excluded.output_data,
1500                    height_claimed = excluded.height_claimed,
1501                    source = excluded.source",
1502                    (
1503                        &swap_commitment,
1504                        &swap_bytes,
1505                        &position,
1506                        &nullifier,
1507                        &output_data,
1508                        // height_claimed is NULL because the swap is newly discovered
1509                        &source,
1510                    ),
1511                )?;
1512            }
1513
1514            // Update any rows of the table with matching nullifiers to have height_spent
1515            for nullifier in &filtered_block.spent_nullifiers {
1516                let height_spent = filtered_block.height as i64;
1517                let nullifier_bytes = nullifier.to_bytes().to_vec();
1518
1519                let spent_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1520                    "UPDATE spendable_notes SET height_spent = ?1 WHERE nullifier = ?2 RETURNING note_commitment"
1521                )?
1522                    .query_and_then(
1523                        (height_spent, &nullifier_bytes),
1524                        |row| {
1525                            let bytes: Vec<u8> = row.get("note_commitment")?;
1526                            StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1527                        },
1528                    )?
1529                    .next()
1530                    .transpose()?;
1531
1532                let swap_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1533                    "UPDATE swaps SET height_claimed = ?1 WHERE nullifier = ?2 RETURNING swap_commitment"
1534                )?
1535                    .query_and_then(
1536                        (height_spent, &nullifier_bytes),
1537                        |row| {
1538                            let bytes: Vec<u8> = row.get("swap_commitment")?;
1539                            StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1540                        },
1541                    )?
1542                    .next()
1543                    .transpose()?;
1544
1545                // Check denom type
1546                let spent_denom: String
1547                    = dbtx.prepare_cached(
1548                    "SELECT denom FROM assets
1549                        WHERE asset_id ==
1550                            (SELECT asset_id FROM notes
1551                             WHERE note_commitment ==
1552                                (SELECT note_commitment FROM spendable_notes WHERE nullifier = ?1))"
1553                )?
1554                    .query_and_then(
1555                        [&nullifier_bytes],
1556                        |row| row.get("denom"),
1557                    )?
1558                    .next()
1559                    .transpose()?
1560                    .unwrap_or("unknown".to_string());
1561
1562                // Mark spent notes as spent
1563                if let Some(spent_commitment) = spent_commitment {
1564                    tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "detected spent note commitment");
1565                    // Forget spent note commitments from the SCT unless they are delegation tokens,
1566                    // which must be saved to allow voting on proposals that might or might not be
1567                    // open presently
1568
1569                    if DelegationToken::from_str(&spent_denom).is_err() {
1570                        tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "forgetting spent note commitment");
1571                        new_sct.forget(spent_commitment);
1572                    }
1573                };
1574
1575                // Mark spent swaps as spent
1576                if let Some(spent_swap_commitment) = swap_commitment {
1577                    tracing::debug!(?nullifier, ?spent_swap_commitment, "detected and forgetting spent swap commitment");
1578                    new_sct.forget(spent_swap_commitment);
1579                };
1580            }
1581
1582            // Update SCT table with current SCT state
1583            new_sct.to_writer(&mut TreeStore(&mut dbtx))?;
1584
1585            // Record all transactions
1586            for transaction in transactions {
1587                let tx_bytes = transaction.encode_to_vec();
1588                // We have to create an explicit temporary borrow, because the sqlx api is bad (see above)
1589                let tx_hash_owned = sha2::Sha256::digest(&tx_bytes);
1590                let tx_hash = tx_hash_owned.as_slice();
1591                let tx_block_height = filtered_block.height as i64;
1592                let decrypted_memo = transaction.decrypt_memo(&fvk).ok();
1593                let memo_text = decrypted_memo.clone().map_or(None,|x| Some(x.text().to_string()));
1594                let return_address = decrypted_memo.map_or(None, |x| Some(x.return_address().to_vec()));
1595
1596                tracing::debug!(tx_hash = ?hex::encode(tx_hash), "recording extended transaction");
1597
1598                dbtx.execute(
1599                    "INSERT OR IGNORE INTO tx (tx_hash, tx_bytes, block_height, return_address, memo_text) VALUES (?1, ?2, ?3, ?4, ?5)",
1600                    (&tx_hash, &tx_bytes, tx_block_height, return_address, memo_text),
1601                )?;
1602
1603                // Associate all of the spent nullifiers with the transaction by hash.
1604                for nf in transaction.spent_nullifiers() {
1605                    let nf_bytes = nf.0.to_bytes().to_vec();
1606                    dbtx.execute(
1607                        "INSERT OR IGNORE INTO tx_by_nullifier (nullifier, tx_hash) VALUES (?1, ?2)",
1608                        (&nf_bytes, &tx_hash),
1609                    )?;
1610                }
1611            }
1612
1613            // Update FMD parameters if they've changed.
1614            if filtered_block.fmd_parameters.is_some() {
1615                let fmd_parameters_bytes =
1616                    &fmd::Parameters::encode_to_vec(&filtered_block.fmd_parameters.ok_or_else(|| anyhow::anyhow!("missing fmd parameters in filtered block"))?)[..];
1617
1618                dbtx.execute(
1619                    "INSERT INTO kv (k, v) VALUES ('fmd_params', ?1)
1620                    ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1621                    [&fmd_parameters_bytes],
1622                )?;
1623            }
1624
1625            // Update gas prices if they've changed.
1626            if filtered_block.gas_prices.is_some() {
1627                let gas_prices_bytes =
1628                    &GasPrices::encode_to_vec(&filtered_block.gas_prices.ok_or_else(|| anyhow::anyhow!("missing gas prices in filtered block"))?)[..];
1629
1630                dbtx.execute(
1631                    "INSERT INTO kv (k, v) VALUES ('gas_prices', ?1)
1632                    ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1633                    [&gas_prices_bytes],
1634                )?;
1635            }
1636
1637            // Record block height as latest synced height
1638            let latest_sync_height = filtered_block.height as i64;
1639            dbtx.execute("UPDATE sync_height SET height = ?1", [latest_sync_height])?;
1640
1641            // Commit the changes to the database
1642            dbtx.commit()?;
1643
1644            // IMPORTANT: NO PANICS OR ERRORS PAST THIS POINT
1645            // If there is a panic or error past this point, the database will be left out of
1646            // sync with the in-memory copy of the SCT, which means that it will become corrupted as
1647            // synchronization continues.
1648
1649            // It's critical to reset the uncommitted height here, since we've just
1650            // invalidated it by committing.
1651            uncommitted_height.lock().take();
1652
1653            // Broadcast all committed note records to channel
1654            // Done following tx.commit() to avoid notifying of a new SpendableNoteRecord before it is actually committed to the database
1655
1656            for note_record in filtered_block.new_notes.values() {
1657                // This will fail to be broadcast if there is no active receiver (such as on initial
1658                // sync) The error is ignored, as this isn't a problem, because if there is no
1659                // active receiver there is nothing to do
1660                let _ = scanned_notes_tx.send(note_record.clone());
1661            }
1662
1663            for nullifier in filtered_block.spent_nullifiers.iter() {
1664                // This will fail to be broadcast if there is no active receiver (such as on initial
1665                // sync) The error is ignored, as this isn't a problem, because if there is no
1666                // active receiver there is nothing to do
1667                let _ = scanned_nullifiers_tx.send(*nullifier);
1668            }
1669
1670            for swap_record in filtered_block.new_swaps.values() {
1671                // This will fail to be broadcast if there is no active receiver (such as on initial
1672                // sync) The error is ignored, as this isn't a problem, because if there is no
1673                // active receiver there is nothing to do
1674                let _ = scanned_swaps_tx.send(swap_record.clone());
1675            }
1676
1677            anyhow::Ok(new_sct)
1678        })
1679            .await??;
1680
1681        Ok(())
1682    }
1683
1684    pub async fn owned_position_ids(
1685        &self,
1686        position_state: Option<State>,
1687        trading_pair: Option<TradingPair>,
1688    ) -> anyhow::Result<Vec<position::Id>> {
1689        let pool = self.pool.clone();
1690
1691        let state_clause = match position_state {
1692            Some(state) => format!("position_state = \"{}\"", state),
1693            None => "".to_string(),
1694        };
1695
1696        let pair_clause = match trading_pair {
1697            Some(pair) => format!("trading_pair = \"{}\"", pair),
1698            None => "".to_string(),
1699        };
1700
1701        spawn_blocking(move || {
1702            let mut q = "SELECT position_id FROM positions".to_string();
1703            match (position_state.is_some(), trading_pair.is_some()) {
1704                (true, true) => {
1705                    q = q + " WHERE " + &state_clause + " AND " + &pair_clause;
1706                }
1707                (true, false) => {
1708                    q = q + " WHERE " + &state_clause;
1709                }
1710                (false, true) => {
1711                    q = q + " WHERE " + &pair_clause;
1712                }
1713                (false, false) => (),
1714            };
1715
1716            pool.get()?
1717                .prepare_cached(&q)?
1718                .query_and_then([], |row| {
1719                    let position_id: Vec<u8> = row.get("position_id")?;
1720                    Ok(position::Id(position_id.as_slice().try_into()?))
1721                })?
1722                .collect()
1723        })
1724        .await?
1725    }
1726
1727    pub async fn notes_by_sender(
1728        &self,
1729        return_address: &Address,
1730    ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
1731        let pool = self.pool.clone();
1732
1733        let query = "SELECT notes.note_commitment,
1734            spendable_notes.height_created,
1735            notes.address,
1736            notes.amount,
1737            notes.asset_id,
1738            notes.rseed,
1739            spendable_notes.address_index,
1740            spendable_notes.source,
1741            spendable_notes.height_spent,
1742            spendable_notes.nullifier,
1743            spendable_notes.position
1744            FROM notes
1745            JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1746            JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
1747            WHERE tx.return_address = ?1";
1748
1749        let return_address = return_address.to_vec();
1750
1751        let records = spawn_blocking(move || {
1752            pool.get()?
1753                .prepare(query)?
1754                .query_and_then([return_address], |record| record.try_into())?
1755                .collect::<anyhow::Result<Vec<_>>>()
1756        })
1757        .await??;
1758
1759        Ok(records)
1760    }
1761
1762    /// Get all transactions with a matching memo text. The `pattern` argument
1763    /// should include SQL wildcards, such as `%` and `_`, to match substrings,
1764    /// e.g. `%foo%`.
1765    pub async fn transactions_matching_memo(
1766        &self,
1767        pattern: String,
1768    ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction, String)>> {
1769        let pattern = pattern.to_owned();
1770        tracing::trace!(?pattern, "searching for memos matching");
1771        let pool = self.pool.clone();
1772
1773        spawn_blocking(move || {
1774            pool.get()?
1775                .prepare_cached("SELECT block_height, tx_hash, tx_bytes, memo_text FROM tx WHERE memo_text LIKE ?1 ESCAPE '\\'")?
1776                .query_and_then([pattern], |row| {
1777                    let block_height: u64 = row.get("block_height")?;
1778                    let tx_hash: Vec<u8> = row.get("tx_hash")?;
1779                    let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
1780                    let tx = Transaction::decode(tx_bytes.as_slice())?;
1781                    let memo_text: String = row.get("memo_text")?;
1782                    anyhow::Ok((block_height, tx_hash, tx, memo_text))
1783                })?
1784                .collect()
1785        })
1786        .await?
1787    }
1788}