penumbra_sdk_view/
storage.rs

1use std::{collections::BTreeMap, num::NonZeroU64, str::FromStr, sync::Arc, time::Duration};
2
3use anyhow::{anyhow, Context};
4use camino::Utf8Path;
5use decaf377::Fq;
6use once_cell::sync::Lazy;
7use parking_lot::Mutex;
8use penumbra_sdk_auction::auction::AuctionId;
9use r2d2_sqlite::{
10    rusqlite::{OpenFlags, OptionalExtension},
11    SqliteConnectionManager,
12};
13use sha2::{Digest, Sha256};
14use tap::{Tap, TapFallible};
15use tokio::{
16    sync::broadcast::{self, error::RecvError},
17    task::spawn_blocking,
18};
19use tracing::{error_span, Instrument};
20use url::Url;
21
22use penumbra_sdk_app::params::AppParameters;
23use penumbra_sdk_asset::{asset, asset::Id, asset::Metadata, Value};
24use penumbra_sdk_dex::{
25    lp::position::{self, Position, State},
26    TradingPair,
27};
28use penumbra_sdk_fee::GasPrices;
29use penumbra_sdk_keys::{keys::AddressIndex, Address, FullViewingKey};
30use penumbra_sdk_num::Amount;
31use penumbra_sdk_proto::{
32    core::app::v1::{
33        query_service_client::QueryServiceClient as AppQueryServiceClient, AppParametersRequest,
34    },
35    DomainType,
36};
37use penumbra_sdk_sct::{CommitmentSource, Nullifier};
38use penumbra_sdk_shielded_pool::{fmd, note, Note, Rseed};
39use penumbra_sdk_stake::{DelegationToken, IdentityKey};
40use penumbra_sdk_tct::{self as tct, builder::epoch::Root};
41use penumbra_sdk_transaction::Transaction;
42use sct::TreeStore;
43use tct::StateCommitment;
44
45use crate::{sync::FilteredBlock, SpendableNoteRecord, SwapRecord};
46
47mod sct;
48
49#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
50pub struct BalanceEntry {
51    pub id: Id,
52    pub amount: u128,
53    pub address_index: AddressIndex,
54}
55
56/// The hash of the schema for the database.
57static SCHEMA_HASH: Lazy<String> =
58    Lazy::new(|| hex::encode(Sha256::digest(include_str!("storage/schema.sql"))));
59
60#[derive(Clone)]
61pub struct Storage {
62    pool: r2d2::Pool<SqliteConnectionManager>,
63
64    /// This allows an optimization where we only commit to the database after
65    /// scanning a nonempty block.
66    ///
67    /// If this is `Some`, we have uncommitted empty blocks up to the inner height.
68    /// If this is `None`, we don't.
69    ///
70    /// Using a `NonZeroU64` ensures that `Option<NonZeroU64>` fits in 8 bytes.
71    uncommitted_height: Arc<Mutex<Option<NonZeroU64>>>,
72
73    scanned_notes_tx: tokio::sync::broadcast::Sender<SpendableNoteRecord>,
74    scanned_nullifiers_tx: tokio::sync::broadcast::Sender<Nullifier>,
75    scanned_swaps_tx: tokio::sync::broadcast::Sender<SwapRecord>,
76}
77
78impl Storage {
79    /// If the database at `storage_path` exists, [`Self::load`] it, otherwise, [`Self::initialize`] it.
80    #[tracing::instrument(
81        skip_all,
82        fields(
83            path = ?storage_path.as_ref().map(|p| p.as_ref().as_str()),
84            url = %node,
85        )
86    )]
87    pub async fn load_or_initialize(
88        storage_path: Option<impl AsRef<Utf8Path>>,
89        fvk: &FullViewingKey,
90        node: Url,
91    ) -> anyhow::Result<Self> {
92        if let Some(path) = storage_path.as_ref().map(AsRef::as_ref) {
93            if path.exists() {
94                tracing::debug!(?path, "database exists");
95                return Self::load(path).await;
96            } else {
97                tracing::debug!(?path, "database does not exist");
98            }
99        };
100
101        let mut client = AppQueryServiceClient::connect(node.to_string())
102            .instrument(error_span!("connecting_to_endpoint"))
103            .await
104            .tap_err(|error| {
105                tracing::error!(?error, "failed to connect to app query service endpoint")
106            })?
107            .tap(|_| tracing::debug!("connected to app query service endpoint"));
108        let params = client
109            .app_parameters(tonic::Request::new(AppParametersRequest {}))
110            .instrument(error_span!("getting_app_parameters"))
111            .await?
112            .into_inner()
113            .try_into()?;
114
115        Self::initialize(storage_path, fvk.clone(), params).await
116    }
117
118    fn connect(
119        path: Option<impl AsRef<Utf8Path>>,
120    ) -> anyhow::Result<r2d2::Pool<SqliteConnectionManager>> {
121        if let Some(path) = path {
122            let manager = SqliteConnectionManager::file(path.as_ref())
123                .with_flags(
124                    // Don't allow opening URIs, because they can change the behavior of the database; we
125                    // just want to open normal filepaths.
126                    OpenFlags::default() & !OpenFlags::SQLITE_OPEN_URI,
127                )
128                .with_init(|conn| {
129                    // "NORMAL" will be consistent, but maybe not durable -- this is fine,
130                    // since all our data is being synced from the chain, so if we lose a dbtx,
131                    // it's like we're resuming sync from a previous height.
132                    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
133                    // We use `prepare_cached` a fair amount: this is an overestimate of the number
134                    // of cached prepared statements likely to be used.
135                    conn.set_prepared_statement_cache_capacity(32);
136                    Ok(())
137                });
138            Ok(r2d2::Pool::builder()
139                // We set max_size=1 to avoid "database is locked" sqlite errors,
140                // when accessing across multiple threads.
141                .max_size(1)
142                .build(manager)?)
143        } else {
144            let manager = SqliteConnectionManager::memory();
145            // Max size needs to be set to 1, otherwise a new in-memory database is created for each
146            // connection to the pool, which results in very confusing errors.
147            //
148            // Lifetimes and timeouts are likewise configured to their maximum values, since
149            // the in-memory database will disappear on connection close.
150            Ok(r2d2::Pool::builder()
151                .max_size(1)
152                .min_idle(Some(1))
153                .max_lifetime(Some(Duration::MAX))
154                .idle_timeout(Some(Duration::MAX))
155                .build(manager)?)
156        }
157    }
158
159    pub async fn load(path: impl AsRef<Utf8Path>) -> anyhow::Result<Self> {
160        let storage = Self {
161            pool: Self::connect(Some(path))?,
162            uncommitted_height: Arc::new(Mutex::new(None)),
163            scanned_notes_tx: broadcast::channel(128).0,
164            scanned_nullifiers_tx: broadcast::channel(512).0,
165            scanned_swaps_tx: broadcast::channel(128).0,
166        };
167
168        spawn_blocking(move || {
169            // Check the version of the software used when first initializing this database.
170            // If it doesn't match the current version, we should report the error to the user.
171            let actual_schema_hash: String = storage
172                .pool
173                .get()?
174                .query_row("SELECT schema_hash FROM schema_hash", (), |row| {
175                    row.get("schema_hash")
176                })
177                .context("failed to query database schema version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
178
179            if actual_schema_hash != *SCHEMA_HASH {
180                let database_client_version: String = storage
181                    .pool
182                    .get()?
183                    .query_row("SELECT client_version FROM client_version", (), |row| {
184                        row.get("client_version")
185                    })
186                    .context("failed to query client version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
187
188                anyhow::bail!(
189                    "can't load view database created by client version {} using client version {}: they have different schemata, so you need to reset your view database and resynchronize by running pcli view reset",
190                    database_client_version,
191                    env!("CARGO_PKG_VERSION"),
192                );
193            }
194
195            Ok(storage)
196        })
197            .await?
198    }
199
200    pub async fn initialize(
201        storage_path: Option<impl AsRef<Utf8Path>>,
202        fvk: FullViewingKey,
203        params: AppParameters,
204    ) -> anyhow::Result<Self> {
205        tracing::debug!(storage_path = ?storage_path.as_ref().map(AsRef::as_ref), ?fvk, ?params);
206
207        // Connect to the database (or create it)
208        let pool = Self::connect(storage_path)?;
209
210        let out = spawn_blocking(move || {
211            // In one database transaction, populate everything
212            let mut conn = pool.get()?;
213            let tx = conn.transaction()?;
214
215            // Create the tables
216            tx.execute_batch(include_str!("storage/schema.sql"))?;
217
218            let params_bytes = params.encode_to_vec();
219            tx.execute(
220                "INSERT INTO kv (k, v) VALUES ('app_params', ?1)",
221                [&params_bytes[..]],
222            )?;
223
224            let fvk_bytes = fvk.encode_to_vec();
225            tx.execute("INSERT INTO kv (k, v) VALUES ('fvk', ?1)", [&fvk_bytes[..]])?;
226
227            // Insert -1 as a signaling value for pre-genesis.
228            // We just have to be careful to treat negative values as None
229            // in last_sync_height.
230            tx.execute("INSERT INTO sync_height (height) VALUES (-1)", ())?;
231
232            // Insert the schema hash into the database
233            tx.execute(
234                "INSERT INTO schema_hash (schema_hash) VALUES (?1)",
235                [&*SCHEMA_HASH],
236            )?;
237
238            // Insert the client version into the database
239            tx.execute(
240                "INSERT INTO client_version (client_version) VALUES (?1)",
241                [env!("CARGO_PKG_VERSION")],
242            )?;
243
244            tx.commit()?;
245            drop(conn);
246
247            anyhow::Ok(Storage {
248                pool,
249                uncommitted_height: Arc::new(Mutex::new(None)),
250                scanned_notes_tx: broadcast::channel(128).0,
251                scanned_nullifiers_tx: broadcast::channel(512).0,
252                scanned_swaps_tx: broadcast::channel(128).0,
253            })
254        })
255        .await??;
256
257        out.update_epoch(0, None, Some(0)).await?;
258
259        Ok(out)
260    }
261
262    /// Loads asset metadata from a JSON file and use to update the database.
263    pub async fn load_asset_metadata(
264        &self,
265        registry_path: impl AsRef<Utf8Path>,
266    ) -> anyhow::Result<()> {
267        tracing::debug!(registry_path = ?registry_path.as_ref(), "loading asset metadata");
268        let registry_path = registry_path.as_ref();
269        // Parse into a serde_json::Value first so we can get the bits we care about
270        let mut registry_json: serde_json::Value = serde_json::from_str(
271            std::fs::read_to_string(registry_path)
272                .context("failed to read file")?
273                .as_str(),
274        )
275        .context("failed to parse JSON")?;
276
277        let registry: BTreeMap<String, Metadata> = serde_json::value::from_value(
278            registry_json
279                .get_mut("assetById")
280                .ok_or_else(|| anyhow::anyhow!("missing assetById"))?
281                .take(),
282        )
283        .context("could not parse asset registry")?;
284
285        for metadata in registry.into_values() {
286            self.record_asset(metadata).await?;
287        }
288
289        Ok(())
290    }
291
292    /// Query for account balance by address
293    pub async fn balances(
294        &self,
295        address_index: Option<AddressIndex>,
296        asset_id: Option<asset::Id>,
297    ) -> anyhow::Result<Vec<BalanceEntry>> {
298        let pool = self.pool.clone();
299
300        spawn_blocking(move || {
301            let query = "SELECT notes.asset_id, notes.amount, spendable_notes.address_index
302                FROM    notes
303                JOIN    spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
304                WHERE   spendable_notes.height_spent IS NULL";
305
306            tracing::debug!(?query);
307
308            // Combine notes of the same asset/address index together
309            let mut balances: BTreeMap<AddressIndex, BTreeMap<asset::Id, Amount>> = BTreeMap::new();
310
311            for result in pool.get()?.prepare_cached(query)?.query_map([], |row| {
312                let asset_id = row.get::<&str, Vec<u8>>("asset_id")?;
313                let amount = row.get::<&str, Vec<u8>>("amount")?;
314                let address_index = row.get::<&str, Vec<u8>>("address_index")?;
315
316                Ok((asset_id, amount, address_index))
317            })? {
318                let (id, amount, index) = result?;
319
320                let id = Id::try_from(id.as_slice())?;
321
322                let amount: Amount = Amount::from_be_bytes(
323                    amount
324                        .as_slice()
325                        .try_into()
326                        .expect("amount slice of incorrect length"),
327                );
328
329                let index = AddressIndex::try_from(index.as_slice())?;
330
331                // Skip this entry if not captured by address index filter
332                if let Some(address_index) = address_index {
333                    if address_index != index {
334                        continue;
335                    }
336                }
337                if let Some(asset_id) = asset_id {
338                    if asset_id != id {
339                        continue;
340                    }
341                }
342
343                balances
344                    .entry(index)
345                    .or_insert_with(BTreeMap::new)
346                    .entry(id)
347                    .and_modify(|e| *e += amount)
348                    .or_insert(amount);
349            }
350
351            let entries = balances
352                .into_iter()
353                .flat_map(|(index, assets)| {
354                    assets.into_iter().map(move |(id, amount)| BalanceEntry {
355                        id,
356                        amount: amount.into(),
357                        address_index: index,
358                    })
359                })
360                .collect::<Vec<_>>();
361            Ok(entries)
362        })
363        .await?
364    }
365
366    /// Query for a note by its note commitment, optionally waiting until the note is detected.
367    pub async fn note_by_commitment(
368        &self,
369        note_commitment: tct::StateCommitment,
370        await_detection: bool,
371    ) -> anyhow::Result<SpendableNoteRecord> {
372        // Start subscribing now, before querying for whether we already
373        // have the record, so that we can't miss it if we race a write.
374        let mut rx = self.scanned_notes_tx.subscribe();
375
376        let pool = self.pool.clone();
377
378        if let Some(record) = spawn_blocking(move || {
379            // Check if we already have the record
380            pool.get()?
381                .prepare(&format!(
382                    "SELECT
383                        notes.note_commitment,
384                        spendable_notes.height_created,
385                        notes.address,
386                        notes.amount,
387                        notes.asset_id,
388                        notes.rseed,
389                        spendable_notes.address_index,
390                        spendable_notes.source,
391                        spendable_notes.height_spent,
392                        spendable_notes.nullifier,
393                        spendable_notes.position,
394                        tx.return_address
395                    FROM notes
396                    JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
397                    LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
398                    WHERE notes.note_commitment = x'{}'",
399                    hex::encode(note_commitment.0.to_bytes())
400                ))?
401                .query_and_then((), |record| record.try_into())?
402                .next()
403                .transpose()
404        })
405        .await??
406        {
407            return Ok(record);
408        }
409
410        if !await_detection {
411            anyhow::bail!("Note commitment {} not found", note_commitment);
412        }
413
414        // Otherwise, wait for newly detected notes and check whether they're
415        // the requested one.
416
417        loop {
418            match rx.recv().await {
419                Ok(record) => {
420                    if record.note_commitment == note_commitment {
421                        return Ok(record);
422                    }
423                }
424
425                Err(e) => match e {
426                    RecvError::Closed => {
427                        anyhow::bail!(
428                            "Receiver error during note detection: closed (no more active senders)"
429                        );
430                    }
431                    RecvError::Lagged(count) => {
432                        anyhow::bail!(
433                            "Receiver error during note detection: lagged (by {:?} messages)",
434                            count
435                        );
436                    }
437                },
438            };
439        }
440    }
441
442    /// Query for a swap by its swap commitment, optionally waiting until the note is detected.
443    pub async fn swap_by_commitment(
444        &self,
445        swap_commitment: tct::StateCommitment,
446        await_detection: bool,
447    ) -> anyhow::Result<SwapRecord> {
448        // Start subscribing now, before querying for whether we already
449        // have the record, so that we can't miss it if we race a write.
450        let mut rx = self.scanned_swaps_tx.subscribe();
451
452        let pool = self.pool.clone();
453
454        if let Some(record) = spawn_blocking(move || {
455            // Check if we already have the swap record
456            pool.get()?
457                .prepare(&format!(
458                    "SELECT * FROM swaps WHERE swaps.swap_commitment = x'{}'",
459                    hex::encode(swap_commitment.0.to_bytes())
460                ))?
461                .query_and_then((), |record| record.try_into())?
462                .next()
463                .transpose()
464        })
465        .await??
466        {
467            return Ok(record);
468        }
469
470        if !await_detection {
471            anyhow::bail!("swap commitment {} not found", swap_commitment);
472        }
473
474        // Otherwise, wait for newly detected swaps and check whether they're
475        // the requested one.
476
477        loop {
478            match rx.recv().await {
479                Ok(record) => {
480                    if record.swap_commitment == swap_commitment {
481                        return Ok(record);
482                    }
483                }
484
485                Err(e) => match e {
486                    RecvError::Closed => {
487                        anyhow::bail!(
488                            "Receiver error during swap detection: closed (no more active senders)"
489                        );
490                    }
491                    RecvError::Lagged(count) => {
492                        anyhow::bail!(
493                            "Receiver error during swap detection: lagged (by {:?} messages)",
494                            count
495                        );
496                    }
497                },
498            };
499        }
500    }
501
502    /// Query for all unclaimed swaps.
503    pub async fn unclaimed_swaps(&self) -> anyhow::Result<Vec<SwapRecord>> {
504        let pool = self.pool.clone();
505
506        let records = spawn_blocking(move || {
507            // Check if we already have the swap record
508            pool.get()?
509                .prepare("SELECT * FROM swaps WHERE swaps.height_claimed is NULL")?
510                .query_and_then((), |record| record.try_into())?
511                .collect::<anyhow::Result<Vec<_>>>()
512        })
513        .await??;
514
515        Ok(records)
516    }
517
518    /// Query for a nullifier's status, optionally waiting until the nullifier is detected.
519    pub async fn nullifier_status(
520        &self,
521        nullifier: Nullifier,
522        await_detection: bool,
523    ) -> anyhow::Result<bool> {
524        // Start subscribing now, before querying for whether we already have the nullifier, so we
525        // can't miss it if we race a write.
526        let mut rx = self.scanned_nullifiers_tx.subscribe();
527
528        // Clone the pool handle so that the returned future is 'static
529        let pool = self.pool.clone();
530
531        let nullifier_bytes = nullifier.0.to_bytes().to_vec();
532
533        // Check if we already have the nullifier in the set of spent notes
534        if let Some(height_spent) = spawn_blocking(move || {
535            pool.get()?
536                .prepare_cached("SELECT height_spent FROM spendable_notes WHERE nullifier = ?1")?
537                .query_and_then([nullifier_bytes], |row| {
538                    let height_spent: Option<u64> = row.get("height_spent")?;
539                    anyhow::Ok(height_spent)
540                })?
541                .next()
542                .transpose()
543        })
544        .await??
545        {
546            let spent = height_spent.is_some();
547
548            // If we're awaiting detection and the nullifier isn't yet spent, don't return just yet
549            if !await_detection || spent {
550                return Ok(spent);
551            }
552        }
553
554        // After checking the database, if we didn't find it, return `false` unless we are to
555        // await detection
556        if !await_detection {
557            return Ok(false);
558        }
559
560        // Otherwise, wait for newly detected nullifiers and check whether they're the requested
561        // one.
562        loop {
563            let new_nullifier = rx.recv().await.context("change subscriber failed")?;
564
565            if new_nullifier == nullifier {
566                return Ok(true);
567            }
568        }
569    }
570
571    /// The last block height we've scanned to, if any.
572    pub async fn last_sync_height(&self) -> anyhow::Result<Option<u64>> {
573        // Check if we have uncommitted blocks beyond the database height.
574        if let Some(height) = *self.uncommitted_height.lock() {
575            return Ok(Some(height.get()));
576        }
577
578        let pool = self.pool.clone();
579
580        spawn_blocking(move || {
581            let height: Option<i64> = pool
582                .get()?
583                .prepare_cached("SELECT height FROM sync_height ORDER BY height DESC LIMIT 1")?
584                .query_row([], |row| row.get::<_, Option<i64>>(0))?;
585
586            anyhow::Ok(u64::try_from(height.ok_or_else(|| anyhow!("missing sync height"))?).ok())
587        })
588        .await?
589    }
590
591    pub async fn app_params(&self) -> anyhow::Result<AppParameters> {
592        let pool = self.pool.clone();
593
594        spawn_blocking(move || {
595            let params_bytes = pool
596                .get()?
597                .prepare_cached("SELECT v FROM kv WHERE k IS 'app_params' LIMIT 1")?
598                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
599                .ok_or_else(|| anyhow!("missing app_params in kv table"))?;
600
601            AppParameters::decode(params_bytes.as_slice())
602        })
603        .await?
604    }
605
606    pub async fn gas_prices(&self) -> anyhow::Result<GasPrices> {
607        let pool = self.pool.clone();
608
609        spawn_blocking(move || {
610            let bytes = pool
611                .get()?
612                .prepare_cached("SELECT v FROM kv WHERE k IS 'gas_prices' LIMIT 1")?
613                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
614                .ok_or_else(|| anyhow!("missing gas_prices in kv table"))?;
615
616            GasPrices::decode(bytes.as_slice())
617        })
618        .await?
619    }
620
621    pub async fn fmd_parameters(&self) -> anyhow::Result<fmd::Parameters> {
622        let pool = self.pool.clone();
623
624        spawn_blocking(move || {
625            let bytes = pool
626                .get()?
627                .prepare_cached("SELECT v FROM kv WHERE k IS 'fmd_params' LIMIT 1")?
628                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
629                .ok_or_else(|| anyhow!("missing fmd_params in kv table"))?;
630
631            fmd::Parameters::decode(bytes.as_slice())
632        })
633        .await?
634    }
635
636    pub async fn full_viewing_key(&self) -> anyhow::Result<FullViewingKey> {
637        let pool = self.pool.clone();
638
639        spawn_blocking(move || {
640            let bytes = pool
641                .get()?
642                .prepare_cached("SELECT v FROM kv WHERE k is 'fvk' LIMIT 1")?
643                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
644                .ok_or_else(|| anyhow!("missing fvk in kv table"))?;
645
646            FullViewingKey::decode(bytes.as_slice())
647        })
648        .await?
649    }
650
651    pub async fn state_commitment_tree(&self) -> anyhow::Result<tct::Tree> {
652        let pool = self.pool.clone();
653        spawn_blocking(move || {
654            tct::Tree::from_reader(&mut TreeStore(&mut pool.get()?.transaction()?))
655        })
656        .await?
657    }
658
659    /// Returns a tuple of (block height, transaction hash) for all transactions in a given range of block heights.
660    pub async fn transaction_hashes(
661        &self,
662        start_height: Option<u64>,
663        end_height: Option<u64>,
664    ) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
665        let starting_block = start_height.unwrap_or(0) as i64;
666        let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
667
668        let pool = self.pool.clone();
669
670        spawn_blocking(move || {
671            pool.get()?
672                .prepare_cached(
673                    "SELECT block_height, tx_hash
674                    FROM tx
675                    WHERE block_height BETWEEN ?1 AND ?2",
676                )?
677                .query_and_then([starting_block, ending_block], |row| {
678                    let block_height: u64 = row.get("block_height")?;
679                    let tx_hash: Vec<u8> = row.get("tx_hash")?;
680                    anyhow::Ok((block_height, tx_hash))
681                })?
682                .collect()
683        })
684        .await?
685    }
686
687    /// Returns a tuple of (block height, transaction hash, transaction) for all transactions in a given range of block heights.
688    pub async fn transactions(
689        &self,
690        start_height: Option<u64>,
691        end_height: Option<u64>,
692    ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction)>> {
693        let starting_block = start_height.unwrap_or(0) as i64;
694        let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
695
696        let pool = self.pool.clone();
697
698        spawn_blocking(move || {
699            pool.get()?
700                .prepare_cached(
701                    "SELECT block_height, tx_hash, tx_bytes
702                    FROM tx
703                    WHERE block_height BETWEEN ?1 AND ?2",
704                )?
705                .query_and_then([starting_block, ending_block], |row| {
706                    let block_height: u64 = row.get("block_height")?;
707                    let tx_hash: Vec<u8> = row.get("tx_hash")?;
708                    let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
709                    let tx = Transaction::decode(tx_bytes.as_slice())?;
710                    anyhow::Ok((block_height, tx_hash, tx))
711                })?
712                .collect()
713        })
714        .await?
715    }
716
717    pub async fn transaction_by_hash(
718        &self,
719        tx_hash: &[u8],
720    ) -> anyhow::Result<Option<(u64, Transaction)>> {
721        let pool = self.pool.clone();
722        let tx_hash = tx_hash.to_vec();
723
724        spawn_blocking(move || {
725            if let Some((block_height, tx_bytes)) = pool
726                .get()?
727                .prepare_cached("SELECT block_height, tx_bytes FROM tx WHERE tx_hash = ?1")?
728                .query_row([tx_hash], |row| {
729                    let block_height: u64 = row.get("block_height")?;
730                    let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
731                    Ok((block_height, tx_bytes))
732                })
733                .optional()?
734            {
735                let tx = Transaction::decode(tx_bytes.as_slice())?;
736                Ok(Some((block_height, tx)))
737            } else {
738                Ok(None)
739            }
740        })
741        .await?
742    }
743
744    // Query for a note by its note commitment, optionally waiting until the note is detected.
745    pub async fn note_by_nullifier(
746        &self,
747        nullifier: Nullifier,
748        await_detection: bool,
749    ) -> anyhow::Result<SpendableNoteRecord> {
750        // Start subscribing now, before querying for whether we already
751        // have the record, so that we can't miss it if we race a write.
752        let mut rx = self.scanned_notes_tx.subscribe();
753
754        // Clone the pool handle so that the returned future is 'static
755        let pool = self.pool.clone();
756
757        let nullifier_bytes = nullifier.to_bytes().to_vec();
758
759        if let Some(record) = spawn_blocking(move || {
760            let record = pool
761                .get()?
762                .prepare(&format!(
763                    "SELECT
764                        notes.note_commitment,
765                        spendable_notes.height_created,
766                        notes.address,
767                        notes.amount,
768                        notes.asset_id,
769                        notes.rseed,
770                        spendable_notes.address_index,
771                        spendable_notes.source,
772                        spendable_notes.height_spent,
773                        spendable_notes.nullifier,
774                        spendable_notes.position,
775                        tx.return_address
776                    FROM notes
777                    JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
778                    LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
779                    WHERE hex(spendable_notes.nullifier) = \"{}\"",
780                    hex::encode_upper(nullifier_bytes)
781                ))?
782                .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
783                .next()
784                .transpose()?;
785
786            anyhow::Ok(record)
787        })
788        .await??
789        {
790            return Ok(record);
791        }
792
793        if !await_detection {
794            anyhow::bail!("Note commitment for nullifier {:?} not found", nullifier);
795        }
796
797        // Otherwise, wait for newly detected notes and check whether they're
798        // the requested one.
799
800        loop {
801            match rx.recv().await {
802                Ok(record) => {
803                    if record.nullifier == nullifier {
804                        return Ok(record);
805                    }
806                }
807
808                Err(e) => match e {
809                    RecvError::Closed => {
810                        anyhow::bail!(
811                            "Receiver error during note detection: closed (no more active senders)"
812                        );
813                    }
814                    RecvError::Lagged(count) => {
815                        anyhow::bail!(
816                            "Receiver error during note detection: lagged (by {:?} messages)",
817                            count
818                        );
819                    }
820                },
821            };
822        }
823    }
824
825    pub async fn all_assets(&self) -> anyhow::Result<Vec<Metadata>> {
826        let pool = self.pool.clone();
827
828        spawn_blocking(move || {
829            pool.get()?
830                .prepare_cached("SELECT metadata FROM assets")?
831                .query_and_then([], |row| {
832                    let metadata_json = row.get::<_, String>("metadata")?;
833                    let denom_metadata = serde_json::from_str(&metadata_json)?;
834
835                    anyhow::Ok(denom_metadata)
836                })?
837                .collect()
838        })
839        .await?
840    }
841
842    pub async fn asset_by_id(&self, id: &Id) -> anyhow::Result<Option<Metadata>> {
843        let id = id.to_bytes().to_vec();
844
845        let pool = self.pool.clone();
846
847        spawn_blocking(move || {
848            pool.get()?
849                .prepare_cached("SELECT metadata FROM assets WHERE asset_id = ?1")?
850                .query_and_then([id], |row| {
851                    let metadata_json = row.get::<_, String>("metadata")?;
852                    let denom_metadata = serde_json::from_str(&metadata_json)?;
853                    anyhow::Ok(denom_metadata)
854                })?
855                .next()
856                .transpose()
857        })
858        .await?
859    }
860
861    // Get assets whose denoms match the given SQL LIKE pattern, with the `_` and `%` wildcards,
862    // where `\` is the escape character.
863    pub async fn assets_matching(&self, pattern: String) -> anyhow::Result<Vec<Metadata>> {
864        let pattern = pattern.to_owned();
865
866        let pool = self.pool.clone();
867
868        spawn_blocking(move || {
869            pool.get()?
870                .prepare_cached("SELECT metadata FROM assets WHERE denom LIKE ?1 ESCAPE '\\'")?
871                .query_and_then([pattern], |row| {
872                    let metadata_json = row.get::<_, String>("metadata")?;
873                    let denom_metadata = serde_json::from_str(&metadata_json)?;
874                    anyhow::Ok(denom_metadata)
875                })?
876                .collect()
877        })
878        .await?
879    }
880
881    pub async fn notes(
882        &self,
883        include_spent: bool,
884        asset_id: Option<asset::Id>,
885        address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
886        amount_to_spend: Option<Amount>,
887    ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
888        // If set, return spent notes as well as unspent notes.
889        // bool include_spent = 2;
890        let spent_clause = match include_spent {
891            false => "NULL",
892            true => "height_spent",
893        };
894
895        // If set, only return notes with the specified asset id.
896        // core.crypto.v1.AssetId asset_id = 3;
897        let asset_clause = asset_id
898            .map(|id| format!("x'{}'", hex::encode(id.to_bytes())))
899            .unwrap_or_else(|| "asset_id".to_string());
900
901        // If set, only return notes with the specified address index.
902        // crypto.AddressIndex address_index = 4;
903        // This isn't what we want any more, we need to be indexing notes
904        // by *account*, not just by address index.
905        // For now, just do filtering in software.
906        /*
907        let address_clause = address_index
908            .map(|d| format!("x'{}'", hex::encode(d.to_bytes())))
909            .unwrap_or_else(|| "address_index".to_string());
910         */
911        let address_clause = "address_index".to_string();
912
913        // If set, stop returning notes once the total exceeds this amount.
914        //
915        // Ignored if `asset_id` is unset or if `include_spent` is set.
916        // uint64 amount_to_spend = 5;
917        //TODO: figure out a clever way to only return notes up to the sum using SQL
918        let amount_cutoff = (amount_to_spend.is_some()) && !(include_spent || asset_id.is_none());
919        let mut amount_total = Amount::zero();
920
921        let pool = self.pool.clone();
922
923        spawn_blocking(move || {
924            let mut output: Vec<SpendableNoteRecord> = Vec::new();
925
926            for result in pool
927                .get()?
928                .prepare(&format!(
929                    "SELECT notes.note_commitment,
930                        spendable_notes.height_created,
931                        notes.address,
932                        notes.amount,
933                        notes.asset_id,
934                        notes.rseed,
935                        spendable_notes.address_index,
936                        spendable_notes.source,
937                        spendable_notes.height_spent,
938                        spendable_notes.nullifier,
939                        spendable_notes.position,
940                        tx.return_address
941                FROM notes
942                JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
943                LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
944                WHERE spendable_notes.height_spent IS {spent_clause}
945                AND notes.asset_id IS {asset_clause}
946                AND spendable_notes.address_index IS {address_clause}"
947                ))?
948                .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
949            {
950                let record = result?;
951
952                // Skip notes that don't match the account, since we're
953                // not doing account filtering in SQL as a temporary hack (see above)
954                if let Some(address_index) = address_index {
955                    if record.address_index.account != address_index.account {
956                        continue;
957                    }
958                }
959                let amount = record.note.amount();
960
961                // Only display notes of value > 0
962
963                if amount.value() > 0 {
964                    output.push(record);
965                }
966
967                // If we're tracking amounts, accumulate the value of the note
968                // and check if we should break out of the loop.
969                if amount_cutoff {
970                    // We know all the notes are of the same type, so adding raw quantities makes sense.
971                    amount_total += amount;
972                    if amount_total >= amount_to_spend.unwrap_or_default() {
973                        break;
974                    }
975                }
976            }
977
978            if amount_total < amount_to_spend.unwrap_or_default() {
979                anyhow::bail!(
980                    "requested amount of {} exceeds total of {}",
981                    amount_to_spend.unwrap_or_default(),
982                    amount_total
983                );
984            }
985
986            anyhow::Ok(output)
987        })
988        .await?
989    }
990
991    pub async fn notes_for_voting(
992        &self,
993        address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
994        votable_at_height: u64,
995    ) -> anyhow::Result<Vec<(SpendableNoteRecord, IdentityKey)>> {
996        // If set, only return notes with the specified address index.
997        // crypto.AddressIndex address_index = 3;
998        let address_clause = address_index
999            .map(|d| format!("x'{}'", hex::encode(d.to_bytes())))
1000            .unwrap_or_else(|| "address_index".to_string());
1001
1002        let pool = self.pool.clone();
1003
1004        spawn_blocking(move || {
1005            let mut lock = pool.get()?;
1006            let dbtx = lock.transaction()?;
1007
1008            let spendable_note_records: Vec<SpendableNoteRecord> = dbtx
1009                .prepare(&format!(
1010                    "SELECT notes.note_commitment,
1011                        spendable_notes.height_created,
1012                        notes.address,
1013                        notes.amount,
1014                        notes.asset_id,
1015                        notes.rseed,
1016                        spendable_notes.address_index,
1017                        spendable_notes.source,
1018                        spendable_notes.height_spent,
1019                        spendable_notes.nullifier,
1020                        spendable_notes.position
1021                    FROM
1022                        notes JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1023                    WHERE
1024                        spendable_notes.address_index IS {address_clause}
1025                        AND notes.asset_id IN (
1026                            SELECT asset_id FROM assets WHERE denom LIKE '_delegation\\_%' ESCAPE '\\'
1027                        )
1028                        AND ((spendable_notes.height_spent IS NULL) OR (spendable_notes.height_spent > {votable_at_height}))
1029                        AND (spendable_notes.height_created < {votable_at_height})
1030                    ",
1031                ))?
1032                .query_and_then((), |row| row.try_into())?
1033                .collect::<anyhow::Result<Vec<_>>>()?;
1034
1035            // TODO: this could be internalized into the SQL query in principle, but it's easier to
1036            // do it this way; if it becomes slow, we can do it better
1037            let mut results = Vec::new();
1038            for record in spendable_note_records {
1039                let asset_id = record.note.asset_id().to_bytes().to_vec();
1040                let denom: String = dbtx.query_row_and_then(
1041                    "SELECT denom FROM assets WHERE asset_id = ?1",
1042                    [asset_id],
1043                    |row| row.get("denom"),
1044                )?;
1045
1046                let identity_key = DelegationToken::from_str(&denom)
1047                    .context("invalid delegation token denom")?
1048                    .validator();
1049
1050                results.push((record, identity_key));
1051            }
1052
1053            Ok(results)
1054        }).await?
1055    }
1056
1057    #[tracing::instrument(skip(self))]
1058    pub async fn record_asset(&self, asset: Metadata) -> anyhow::Result<()> {
1059        tracing::debug!(?asset);
1060
1061        let asset_id = asset.id().to_bytes().to_vec();
1062        let denom = asset.base_denom().denom;
1063        let metadata_json = serde_json::to_string(&asset)?;
1064
1065        let pool = self.pool.clone();
1066
1067        spawn_blocking(move || {
1068            pool.get()?
1069                .execute(
1070                    "INSERT OR REPLACE INTO assets (asset_id, denom, metadata) VALUES (?1, ?2, ?3)",
1071                    (asset_id, denom, metadata_json),
1072                )
1073                .map_err(anyhow::Error::from)
1074        })
1075        .await??;
1076
1077        Ok(())
1078    }
1079
1080    pub async fn record_auction_with_state(
1081        &self,
1082        auction_id: AuctionId,
1083        auction_state: u64,
1084    ) -> anyhow::Result<()> {
1085        let auction_id = auction_id.0.to_vec();
1086
1087        let pool = self.pool.clone();
1088
1089        spawn_blocking(move || {
1090            let mut lock = pool.get()?;
1091            let tx = lock.transaction()?;
1092            tx.execute(
1093                "INSERT OR IGNORE INTO auctions (auction_id, auction_state, note_commitment) VALUES (?1, ?2, NULL)",
1094                (auction_id.clone(), auction_state),
1095            )?;
1096            tx.execute(
1097                "UPDATE auctions SET auction_state = ?2 WHERE auction_id = ?1",
1098                (auction_id, auction_state),
1099            )
1100                .map_err(anyhow::Error::from)?;
1101
1102            tx.commit()?;
1103            Ok::<(), anyhow::Error>(())
1104            })
1105            .await??;
1106
1107        Ok(())
1108    }
1109
1110    pub async fn update_auction_with_note_commitment(
1111        &self,
1112        auction_id: AuctionId,
1113        note_commitment: StateCommitment,
1114    ) -> anyhow::Result<()> {
1115        let auction_id = auction_id.0.to_vec();
1116        let blob_nc = note_commitment.0.to_bytes().to_vec();
1117
1118        let pool = self.pool.clone();
1119
1120        spawn_blocking(move || {
1121            pool.get()?
1122                .execute(
1123                    "UPDATE auctions SET (note_commitment) = ?1 WHERE auction_id = ?2",
1124                    (blob_nc, auction_id),
1125                )
1126                .map_err(anyhow::Error::from)
1127        })
1128        .await??;
1129
1130        Ok(())
1131    }
1132
1133    pub async fn fetch_auctions_by_account(
1134        &self,
1135        account_filter: Option<AddressIndex>,
1136        include_inactive: bool,
1137    ) -> anyhow::Result<Vec<(AuctionId, SpendableNoteRecord, u64 /* local seqnum */)>> {
1138        let account_clause = account_filter
1139            .map(|idx| {
1140                format!(
1141                    "AND spendable_notes.address_index = x'{}'",
1142                    hex::encode(idx.to_bytes())
1143                )
1144            })
1145            .unwrap_or_else(|| "".to_string());
1146
1147        let active_clause = if !include_inactive {
1148            "AND auctions.auction_state = 0"
1149        } else {
1150            ""
1151        };
1152
1153        let query = format!(
1154            "SELECT auctions.auction_id, spendable_notes.*, notes.*, auctions.auction_state
1155                 FROM auctions
1156                 JOIN spendable_notes ON auctions.note_commitment = spendable_notes.note_commitment
1157                 JOIN notes ON auctions.note_commitment = notes.note_commitment
1158                 WHERE 1 = 1
1159                 {account_clause}
1160                 {active_clause}",
1161            account_clause = account_clause,
1162            active_clause = active_clause,
1163        );
1164
1165        let pool = self.pool.clone();
1166
1167        spawn_blocking(move || {
1168            let mut conn = pool.get()?;
1169            let tx = conn.transaction()?;
1170
1171            let spendable_note_records: Vec<(AuctionId, SpendableNoteRecord, u64)> = tx
1172                .prepare(&query)?
1173                .query_and_then((), |row| {
1174                    let raw_auction_id: Vec<u8> = row.get("auction_id")?;
1175                    let array_auction_id: [u8; 32] = raw_auction_id
1176                        .try_into()
1177                        .map_err(|_| anyhow!("auction id must be 32 bytes"))?;
1178                    let auction_id = AuctionId(array_auction_id);
1179                    let spendable_note_record: SpendableNoteRecord = row.try_into()?;
1180                    let local_seq: u64 = row.get("auction_state")?;
1181                    Ok((auction_id, spendable_note_record, local_seq))
1182                })?
1183                .collect::<anyhow::Result<Vec<_>>>()?;
1184
1185            Ok(spendable_note_records)
1186        })
1187        .await?
1188    }
1189
1190    pub async fn record_position(&self, position: Position) -> anyhow::Result<()> {
1191        let position_id = position.id().0.to_vec();
1192
1193        let position_state = position.state.to_string();
1194        let trading_pair = position.phi.pair.to_string();
1195
1196        let pool = self.pool.clone();
1197
1198        spawn_blocking(move || {
1199            pool.get()?
1200                .execute(
1201                    "INSERT OR REPLACE INTO positions (position_id, position_state, trading_pair) VALUES (?1, ?2, ?3)",
1202                    (position_id, position_state, trading_pair),
1203                )
1204                .map_err(anyhow::Error::from)
1205        })
1206            .await??;
1207
1208        Ok(())
1209    }
1210
1211    pub async fn update_position(
1212        &self,
1213        position_id: position::Id,
1214        position_state: position::State,
1215    ) -> anyhow::Result<()> {
1216        let position_id = position_id.0.to_vec();
1217        let position_state = position_state.to_string();
1218
1219        let pool = self.pool.clone();
1220
1221        spawn_blocking(move || {
1222            pool.get()?
1223                .execute(
1224                    "UPDATE positions SET (position_state) = ?1 WHERE position_id = ?2",
1225                    (position_state, position_id),
1226                )
1227                .map_err(anyhow::Error::from)
1228        })
1229        .await??;
1230
1231        Ok(())
1232    }
1233
1234    pub async fn record_empty_block(&self, height: u64) -> anyhow::Result<()> {
1235        // Check that the incoming block height follows the latest recorded height
1236        let last_sync_height = self.last_sync_height().await?.ok_or_else(|| {
1237            anyhow::anyhow!("invalid: tried to record empty block as genesis block")
1238        })?;
1239
1240        if height != last_sync_height + 1 {
1241            anyhow::bail!(
1242                "Wrong block height {} for latest sync height {}",
1243                height,
1244                last_sync_height
1245            );
1246        }
1247
1248        *self.uncommitted_height.lock() = Some(height.try_into()?);
1249        Ok(())
1250    }
1251
1252    fn record_note_inner(
1253        dbtx: &r2d2_sqlite::rusqlite::Transaction<'_>,
1254        note: &Note,
1255    ) -> anyhow::Result<()> {
1256        let note_commitment = note.commit().0.to_bytes().to_vec();
1257        let address = note.address().to_vec();
1258        let amount = u128::from(note.amount()).to_be_bytes().to_vec();
1259        let asset_id = note.asset_id().to_bytes().to_vec();
1260        let rseed = note.rseed().to_bytes().to_vec();
1261
1262        dbtx.execute(
1263            "INSERT INTO notes (note_commitment, address, amount, asset_id, rseed)
1264                VALUES (?1, ?2, ?3, ?4, ?5)
1265                ON CONFLICT (note_commitment)
1266                DO UPDATE SET
1267                address = excluded.address,
1268                amount = excluded.amount,
1269                asset_id = excluded.asset_id,
1270                rseed = excluded.rseed",
1271            (note_commitment, address, amount, asset_id, rseed),
1272        )?;
1273
1274        Ok(())
1275    }
1276
1277    pub async fn give_advice(&self, note: Note) -> anyhow::Result<()> {
1278        let pool = self.pool.clone();
1279        let mut lock = pool.get()?;
1280        let dbtx = lock.transaction()?;
1281
1282        Storage::record_note_inner(&dbtx, &note)?;
1283
1284        dbtx.commit()?;
1285
1286        Ok(())
1287    }
1288
1289    /// Return advice about note contents for use in scanning.
1290    ///
1291    /// Given a list of note commitments, this method checks whether any of them
1292    /// correspond to notes that have been recorded in the database but not yet
1293    /// observed during scanning.
1294    pub async fn scan_advice(
1295        &self,
1296        note_commitments: Vec<note::StateCommitment>,
1297    ) -> anyhow::Result<BTreeMap<note::StateCommitment, Note>> {
1298        if note_commitments.is_empty() {
1299            return Ok(BTreeMap::new());
1300        }
1301
1302        let pool = self.pool.clone();
1303
1304        // This query gives advice about notes which are known but which have not already been recorded as spendable,
1305        // in part to avoid revealing information about which notes have been spent.
1306
1307        spawn_blocking(move || {
1308            pool.get()?
1309                .prepare(&format!(
1310                    "SELECT notes.note_commitment,
1311                        notes.address,
1312                        notes.amount,
1313                        notes.asset_id,
1314                        notes.rseed
1315                    FROM notes
1316                    LEFT OUTER JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1317                    WHERE (spendable_notes.note_commitment IS NULL) AND (notes.note_commitment IN ({}))",
1318                    note_commitments
1319                        .iter()
1320                        .map(|cm| format!("x'{}'", hex::encode(cm.0.to_bytes())))
1321                        .collect::<Vec<_>>()
1322                        .join(", ")
1323                ))?
1324                .query_and_then((), |row| {
1325                    let address = Address::try_from(row.get::<_, Vec<u8>>("address")?)?;
1326                    let amount = row.get::<_, [u8; 16]>("amount")?;
1327                    let amount_u128: u128 = u128::from_be_bytes(amount);
1328                    let asset_id = asset::Id(Fq::from_bytes_checked(&row.get::<_, [u8; 32]>("asset_id")?).expect("asset id malformed"));
1329                    let rseed = Rseed(row.get::<_, [u8; 32]>("rseed")?);
1330                    let note = Note::from_parts(
1331                        address,
1332                        Value {
1333                            amount: amount_u128.into(),
1334                            asset_id,
1335                        },
1336                        rseed,
1337                    )?;
1338                    anyhow::Ok((note.commit(), note))
1339                })?
1340                .collect::<anyhow::Result<BTreeMap<_, _>>>()
1341        }).await?
1342    }
1343
1344    /// Filters for nullifiers whose notes we control
1345    pub async fn filter_nullifiers(
1346        &self,
1347        nullifiers: Vec<Nullifier>,
1348    ) -> anyhow::Result<Vec<Nullifier>> {
1349        if nullifiers.is_empty() {
1350            return Ok(Vec::new());
1351        }
1352
1353        let pool = self.pool.clone();
1354
1355        spawn_blocking(move || {
1356            pool.get()?
1357                .prepare(&format!(
1358                    "SELECT nullifier FROM (SELECT nullifier FROM spendable_notes UNION SELECT nullifier FROM swaps UNION SELECT nullifier FROM tx_by_nullifier) WHERE nullifier IN ({})",
1359                    nullifiers
1360                        .iter()
1361                        .map(|x| format!("x'{}'", hex::encode(x.0.to_bytes())))
1362                        .collect::<Vec<String>>()
1363                        .join(",")
1364                ))?
1365                .query_and_then((), |row| {
1366                    let nullifier: Vec<u8> = row.get("nullifier")?;
1367                    nullifier.as_slice().try_into()
1368                })?
1369                .collect()
1370        })
1371            .await?
1372    }
1373
1374    pub async fn record_block(
1375        &self,
1376        filtered_block: FilteredBlock,
1377        transactions: Vec<Transaction>,
1378        sct: &mut tct::Tree,
1379        channel: tonic::transport::Channel,
1380    ) -> anyhow::Result<()> {
1381        //Check that the incoming block height follows the latest recorded height
1382        let last_sync_height = self.last_sync_height().await?;
1383
1384        let correct_height = match last_sync_height {
1385            // Require that the new block follows the last one we scanned.
1386            Some(cur_height) => filtered_block.height == cur_height + 1,
1387            // Require that the new block represents the initial chain state.
1388            None => filtered_block.height == 0,
1389        };
1390
1391        if !correct_height {
1392            anyhow::bail!(
1393                "Wrong block height {} for latest sync height {:?}",
1394                filtered_block.height,
1395                last_sync_height
1396            );
1397        }
1398
1399        let pool = self.pool.clone();
1400        let uncommitted_height = self.uncommitted_height.clone();
1401        let scanned_notes_tx = self.scanned_notes_tx.clone();
1402        let scanned_nullifiers_tx = self.scanned_nullifiers_tx.clone();
1403        let scanned_swaps_tx = self.scanned_swaps_tx.clone();
1404
1405        let fvk = self.full_viewing_key().await?;
1406
1407        // If the app parameters have changed, update them.
1408        let new_app_parameters: Option<AppParameters> = if filtered_block.app_parameters_updated {
1409            // Fetch the latest parameters
1410            let mut client = AppQueryServiceClient::new(channel);
1411            Some(
1412                client
1413                    .app_parameters(tonic::Request::new(AppParametersRequest {}))
1414                    .await?
1415                    .into_inner()
1416                    .try_into()?,
1417            )
1418        } else {
1419            None
1420        };
1421
1422        // Cloning the SCT is cheap because it's a copy-on-write structure, so we move an owned copy
1423        // into the spawned thread. This means that if for any reason the thread panics or throws an
1424        // error, the changes to the SCT will be discarded, just like any changes to the database,
1425        // so the two stay transactionally in sync, even in the case of errors. This would not be
1426        // the case if we `std::mem::take` the SCT and move it into the spawned thread, because then
1427        // an error would mean the updated version would never be put back, and the outcome would be
1428        // a cleared SCT but a non-empty database.
1429        let mut new_sct = sct.clone();
1430
1431        *sct = spawn_blocking(move || {
1432            let mut lock = pool.get()?;
1433            let mut dbtx = lock.transaction()?;
1434
1435            if let Some(params) = new_app_parameters {
1436                let params_bytes = params.encode_to_vec();
1437                // We expect app_params to be present already but may as well use an upsert
1438                dbtx.execute(
1439                    "INSERT INTO kv (k, v) VALUES ('app_params', ?1)
1440                    ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1441                    [&params_bytes[..]],
1442                )?;
1443            }
1444
1445            // Insert new note records into storage
1446            for note_record in filtered_block.new_notes.values() {
1447                let note_commitment = note_record.note_commitment.0.to_bytes().to_vec();
1448                let height_created = filtered_block.height as i64;
1449                let address_index = note_record.address_index.to_bytes().to_vec();
1450                let nullifier = note_record.nullifier.to_bytes().to_vec();
1451                let position = (u64::from(note_record.position)) as i64;
1452                let source = note_record.source.encode_to_vec();
1453                // Check if the note is from a transaction, if so, include the tx hash (id)
1454                let tx_hash = match note_record.source {
1455                    CommitmentSource::Transaction { id } => id,
1456                    _ => None,
1457                };
1458
1459                // Record the inner note data in the notes table
1460                Storage::record_note_inner(&dbtx, &note_record.note)?;
1461
1462                dbtx.execute(
1463                    "INSERT INTO spendable_notes
1464                    (note_commitment, nullifier, position, height_created, address_index, source, height_spent, tx_hash)
1465                    VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL, ?7)
1466                    ON CONFLICT (note_commitment)
1467                    DO UPDATE SET nullifier = excluded.nullifier,
1468                    position = excluded.position,
1469                    height_created = excluded.height_created,
1470                    address_index = excluded.address_index,
1471                    source = excluded.source,
1472                    height_spent = excluded.height_spent,
1473                    tx_hash = excluded.tx_hash",
1474                    (
1475                        &note_commitment,
1476                        &nullifier,
1477                        &position,
1478                        &height_created,
1479                        &address_index,
1480                        &source,
1481                        // height_spent is NULL because the note is newly discovered
1482                        &tx_hash,
1483                    ),
1484                )?;
1485            }
1486
1487            // Insert new swap records into storage
1488            for swap in filtered_block.new_swaps.values() {
1489                let swap_commitment = swap.swap_commitment.0.to_bytes().to_vec();
1490                let swap_bytes = swap.swap.encode_to_vec();
1491                let position = (u64::from(swap.position)) as i64;
1492                let nullifier = swap.nullifier.to_bytes().to_vec();
1493                let source = swap.source.encode_to_vec();
1494                let output_data = swap.output_data.encode_to_vec();
1495
1496                dbtx.execute(
1497                    "INSERT INTO swaps (swap_commitment, swap, position, nullifier, output_data, height_claimed, source)
1498                    VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6)
1499                    ON CONFLICT (swap_commitment)
1500                    DO UPDATE SET swap = excluded.swap,
1501                    position = excluded.position,
1502                    nullifier = excluded.nullifier,
1503                    output_data = excluded.output_data,
1504                    height_claimed = excluded.height_claimed,
1505                    source = excluded.source",
1506                    (
1507                        &swap_commitment,
1508                        &swap_bytes,
1509                        &position,
1510                        &nullifier,
1511                        &output_data,
1512                        // height_claimed is NULL because the swap is newly discovered
1513                        &source,
1514                    ),
1515                )?;
1516            }
1517
1518            // Update any rows of the table with matching nullifiers to have height_spent
1519            for nullifier in &filtered_block.spent_nullifiers {
1520                let height_spent = filtered_block.height as i64;
1521                let nullifier_bytes = nullifier.to_bytes().to_vec();
1522
1523                let spent_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1524                    "UPDATE spendable_notes SET height_spent = ?1 WHERE nullifier = ?2 RETURNING note_commitment"
1525                )?
1526                    .query_and_then(
1527                        (height_spent, &nullifier_bytes),
1528                        |row| {
1529                            let bytes: Vec<u8> = row.get("note_commitment")?;
1530                            StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1531                        },
1532                    )?
1533                    .next()
1534                    .transpose()?;
1535
1536                let swap_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1537                    "UPDATE swaps SET height_claimed = ?1 WHERE nullifier = ?2 RETURNING swap_commitment"
1538                )?
1539                    .query_and_then(
1540                        (height_spent, &nullifier_bytes),
1541                        |row| {
1542                            let bytes: Vec<u8> = row.get("swap_commitment")?;
1543                            StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1544                        },
1545                    )?
1546                    .next()
1547                    .transpose()?;
1548
1549                // Check denom type
1550                let spent_denom: String
1551                    = dbtx.prepare_cached(
1552                    "SELECT denom FROM assets
1553                        WHERE asset_id ==
1554                            (SELECT asset_id FROM notes
1555                             WHERE note_commitment ==
1556                                (SELECT note_commitment FROM spendable_notes WHERE nullifier = ?1))"
1557                )?
1558                    .query_and_then(
1559                        [&nullifier_bytes],
1560                        |row| row.get("denom"),
1561                    )?
1562                    .next()
1563                    .transpose()?
1564                    .unwrap_or("unknown".to_string());
1565
1566                // Mark spent notes as spent
1567                if let Some(spent_commitment) = spent_commitment {
1568                    tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "detected spent note commitment");
1569                    // Forget spent note commitments from the SCT unless they are delegation tokens,
1570                    // which must be saved to allow voting on proposals that might or might not be
1571                    // open presently
1572
1573                    if DelegationToken::from_str(&spent_denom).is_err() {
1574                        tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "forgetting spent note commitment");
1575                        new_sct.forget(spent_commitment);
1576                    }
1577                };
1578
1579                // Mark spent swaps as spent
1580                if let Some(spent_swap_commitment) = swap_commitment {
1581                    tracing::debug!(?nullifier, ?spent_swap_commitment, "detected and forgetting spent swap commitment");
1582                    new_sct.forget(spent_swap_commitment);
1583                };
1584            }
1585
1586            // Update SCT table with current SCT state
1587            new_sct.to_writer(&mut TreeStore(&mut dbtx))?;
1588
1589            // Record all transactions
1590            for transaction in transactions {
1591                let tx_bytes = transaction.encode_to_vec();
1592                // We have to create an explicit temporary borrow, because the sqlx api is bad (see above)
1593                let tx_hash_owned = sha2::Sha256::digest(&tx_bytes);
1594                let tx_hash = tx_hash_owned.as_slice();
1595                let tx_block_height = filtered_block.height as i64;
1596                let decrypted_memo = transaction.decrypt_memo(&fvk).ok();
1597                let memo_text = decrypted_memo.clone().map_or(None,|x| Some(x.text().to_string()));
1598                let return_address = decrypted_memo.map_or(None, |x| Some(x.return_address().to_vec()));
1599
1600                tracing::debug!(tx_hash = ?hex::encode(tx_hash), "recording extended transaction");
1601
1602                dbtx.execute(
1603                    "INSERT OR IGNORE INTO tx (tx_hash, tx_bytes, block_height, return_address, memo_text) VALUES (?1, ?2, ?3, ?4, ?5)",
1604                    (&tx_hash, &tx_bytes, tx_block_height, return_address, memo_text),
1605                )?;
1606
1607                // Associate all of the spent nullifiers with the transaction by hash.
1608                for nf in transaction.spent_nullifiers() {
1609                    let nf_bytes = nf.0.to_bytes().to_vec();
1610                    dbtx.execute(
1611                        "INSERT OR IGNORE INTO tx_by_nullifier (nullifier, tx_hash) VALUES (?1, ?2)",
1612                        (&nf_bytes, &tx_hash),
1613                    )?;
1614                }
1615            }
1616
1617            // Update FMD parameters if they've changed.
1618            if filtered_block.fmd_parameters.is_some() {
1619                let fmd_parameters_bytes =
1620                    &fmd::Parameters::encode_to_vec(&filtered_block.fmd_parameters.ok_or_else(|| anyhow::anyhow!("missing fmd parameters in filtered block"))?)[..];
1621
1622                dbtx.execute(
1623                    "INSERT INTO kv (k, v) VALUES ('fmd_params', ?1)
1624                    ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1625                    [&fmd_parameters_bytes],
1626                )?;
1627            }
1628
1629            // Update gas prices if they've changed.
1630            if filtered_block.gas_prices.is_some() {
1631                let gas_prices_bytes =
1632                    &GasPrices::encode_to_vec(&filtered_block.gas_prices.ok_or_else(|| anyhow::anyhow!("missing gas prices in filtered block"))?)[..];
1633
1634                dbtx.execute(
1635                    "INSERT INTO kv (k, v) VALUES ('gas_prices', ?1)
1636                    ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1637                    [&gas_prices_bytes],
1638                )?;
1639            }
1640
1641            // Record block height as latest synced height
1642            let latest_sync_height = filtered_block.height as i64;
1643            dbtx.execute("UPDATE sync_height SET height = ?1", [latest_sync_height])?;
1644
1645            // Commit the changes to the database
1646            dbtx.commit()?;
1647
1648            // IMPORTANT: NO PANICS OR ERRORS PAST THIS POINT
1649            // If there is a panic or error past this point, the database will be left in out of
1650            // sync with the in-memory copy of the SCT, which means that it will become corrupted as
1651            // synchronization continues.
1652
1653            // It's critical to reset the uncommitted height here, since we've just
1654            // invalidated it by committing.
1655            uncommitted_height.lock().take();
1656
1657            // Broadcast all committed note records to channel
1658            // Done following tx.commit() to avoid notifying of a new SpendableNoteRecord before it is actually committed to the database
1659
1660            for note_record in filtered_block.new_notes.values() {
1661                // This will fail to be broadcast if there is no active receiver (such as on initial
1662                // sync) The error is ignored, as this isn't a problem, because if there is no
1663                // active receiver there is nothing to do
1664                let _ = scanned_notes_tx.send(note_record.clone());
1665            }
1666
1667            for nullifier in filtered_block.spent_nullifiers.iter() {
1668                // This will fail to be broadcast if there is no active receiver (such as on initial
1669                // sync) The error is ignored, as this isn't a problem, because if there is no
1670                // active receiver there is nothing to do
1671                let _ = scanned_nullifiers_tx.send(*nullifier);
1672            }
1673
1674            for swap_record in filtered_block.new_swaps.values() {
1675                // This will fail to be broadcast if there is no active receāˆ‘iver (such as on initial
1676                // sync) The error is ignored, as this isn't a problem, because if there is no
1677                // active receiver there is nothing to do
1678                let _ = scanned_swaps_tx.send(swap_record.clone());
1679            }
1680
1681            anyhow::Ok(new_sct)
1682        })
1683            .await??;
1684
1685        Ok(())
1686    }
1687
1688    pub async fn owned_position_ids(
1689        &self,
1690        position_state: Option<State>,
1691        trading_pair: Option<TradingPair>,
1692    ) -> anyhow::Result<Vec<position::Id>> {
1693        let pool = self.pool.clone();
1694
1695        let state_clause = match position_state {
1696            Some(state) => format!("position_state = \"{}\"", state),
1697            None => "".to_string(),
1698        };
1699
1700        let pair_clause = match trading_pair {
1701            Some(pair) => format!("trading_pair = \"{}\"", pair),
1702            None => "".to_string(),
1703        };
1704
1705        spawn_blocking(move || {
1706            let mut q = "SELECT position_id FROM positions".to_string();
1707            match (position_state.is_some(), trading_pair.is_some()) {
1708                (true, true) => {
1709                    q = q + " WHERE " + &state_clause + " AND " + &pair_clause;
1710                }
1711                (true, false) => {
1712                    q = q + " WHERE " + &state_clause;
1713                }
1714                (false, true) => {
1715                    q = q + " WHERE " + &pair_clause;
1716                }
1717                (false, false) => (),
1718            };
1719
1720            pool.get()?
1721                .prepare_cached(&q)?
1722                .query_and_then([], |row| {
1723                    let position_id: Vec<u8> = row.get("position_id")?;
1724                    Ok(position::Id(position_id.as_slice().try_into()?))
1725                })?
1726                .collect()
1727        })
1728        .await?
1729    }
1730
1731    pub async fn notes_by_sender(
1732        &self,
1733        return_address: &Address,
1734    ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
1735        let pool = self.pool.clone();
1736
1737        let query = "SELECT notes.note_commitment,
1738            spendable_notes.height_created,
1739            notes.address,
1740            notes.amount,
1741            notes.asset_id,
1742            notes.rseed,
1743            spendable_notes.address_index,
1744            spendable_notes.source,
1745            spendable_notes.height_spent,
1746            spendable_notes.nullifier,
1747            spendable_notes.position
1748            FROM notes
1749            JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1750            JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
1751            WHERE tx.return_address = ?1";
1752
1753        let return_address = return_address.to_vec();
1754
1755        let records = spawn_blocking(move || {
1756            pool.get()?
1757                .prepare(query)?
1758                .query_and_then([return_address], |record| record.try_into())?
1759                .collect::<anyhow::Result<Vec<_>>>()
1760        })
1761        .await??;
1762
1763        Ok(records)
1764    }
1765
1766    /// Get all transactions with a matching memo text. The `pattern` argument
1767    /// should include SQL wildcards, such as `%` and `_`, to match substrings,
1768    /// e.g. `%foo%`.
1769    pub async fn transactions_matching_memo(
1770        &self,
1771        pattern: String,
1772    ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction, String)>> {
1773        let pattern = pattern.to_owned();
1774        tracing::trace!(?pattern, "searching for memos matching");
1775        let pool = self.pool.clone();
1776
1777        spawn_blocking(move || {
1778            pool.get()?
1779                .prepare_cached("SELECT block_height, tx_hash, tx_bytes, memo_text FROM tx WHERE memo_text LIKE ?1 ESCAPE '\\'")?
1780                .query_and_then([pattern], |row| {
1781                    let block_height: u64 = row.get("block_height")?;
1782                    let tx_hash: Vec<u8> = row.get("tx_hash")?;
1783                    let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
1784                    let tx = Transaction::decode(tx_bytes.as_slice())?;
1785                    let memo_text: String = row.get("memo_text")?;
1786                    anyhow::Ok((block_height, tx_hash, tx, memo_text))
1787                })?
1788                .collect()
1789        })
1790        .await?
1791    }
1792
1793    /// Update information about an epoch.
1794    pub async fn update_epoch(
1795        &self,
1796        epoch: u64,
1797        root: Option<Root>,
1798        start_height: Option<u64>,
1799    ) -> anyhow::Result<()> {
1800        let pool = self.pool.clone();
1801
1802        spawn_blocking(move || {
1803            pool.get()?
1804                .execute(
1805                    r#"
1806                    INSERT INTO epochs(epoch_index, root, start_height)
1807                    VALUES (?1, ?2, ?3)
1808                    ON CONFLICT(epoch_index)
1809                    DO UPDATE SET
1810                        root = COALESCE(?2, root),
1811                        start_height = COALESCE(?3, start_height)
1812                    "#,
1813                    (epoch, root.map(|x| x.encode_to_vec()), start_height),
1814                )
1815                .map_err(anyhow::Error::from)
1816        })
1817        .await??;
1818
1819        Ok(())
1820    }
1821
1822    /// Fetch information about the current epoch.
1823    ///
1824    /// This will return the root of the epoch, if present,
1825    /// and the start height of the epoch, if present.
1826    pub async fn get_epoch(&self, epoch: u64) -> anyhow::Result<(Option<Root>, Option<u64>)> {
1827        let pool = self.pool.clone();
1828
1829        spawn_blocking(move || {
1830            pool.get()?
1831                .query_row_and_then(
1832                    r#"
1833                    SELECT root, start_height
1834                    FROM epochs
1835                    WHERE epoch_index = ?1
1836                    "#,
1837                    (epoch,),
1838                    |row| {
1839                        let root_raw: Option<Vec<u8>> = row.get("root")?;
1840                        let start_height: Option<u64> = row.get("start_height")?;
1841                        let root = root_raw.map(|x| Root::decode(x.as_slice())).transpose()?;
1842                        anyhow::Ok((root, start_height))
1843                    },
1844                )
1845                .map_err(anyhow::Error::from)
1846        })
1847        .await?
1848    }
1849}