penumbra_sdk_view/
storage.rs

1use std::{collections::BTreeMap, num::NonZeroU64, str::FromStr, sync::Arc, time::Duration};
2
3use anyhow::{anyhow, Context};
4use camino::Utf8Path;
5use decaf377::Fq;
6use once_cell::sync::Lazy;
7use parking_lot::Mutex;
8use penumbra_sdk_auction::auction::AuctionId;
9use r2d2_sqlite::{
10    rusqlite::{OpenFlags, OptionalExtension},
11    SqliteConnectionManager,
12};
13use sha2::{Digest, Sha256};
14use tap::{Tap, TapFallible};
15use tokio::{
16    sync::broadcast::{self, error::RecvError},
17    task::spawn_blocking,
18};
19use tracing::{error_span, Instrument};
20use url::Url;
21
22use penumbra_sdk_app::params::AppParameters;
23use penumbra_sdk_asset::{asset, asset::Id, asset::Metadata, Value};
24use penumbra_sdk_dex::{
25    lp::position::{self, Position, State},
26    TradingPair,
27};
28use penumbra_sdk_fee::GasPrices;
29use penumbra_sdk_keys::{keys::AddressIndex, Address, FullViewingKey};
30use penumbra_sdk_num::Amount;
31use penumbra_sdk_proto::{
32    core::app::v1::{
33        query_service_client::QueryServiceClient as AppQueryServiceClient, AppParametersRequest,
34    },
35    DomainType,
36};
37use penumbra_sdk_sct::{CommitmentSource, Nullifier};
38use penumbra_sdk_shielded_pool::{fmd, note, Note, Rseed};
39use penumbra_sdk_stake::{DelegationToken, IdentityKey};
40use penumbra_sdk_tct::{self as tct, builder::epoch::Root};
41use penumbra_sdk_transaction::Transaction;
42use sct::TreeStore;
43use tct::StateCommitment;
44
45use crate::{sync::FilteredBlock, SpendableNoteRecord, SwapRecord};
46
47mod sct;
48
49#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
50pub struct BalanceEntry {
51    pub id: Id,
52    pub amount: u128,
53    pub address_index: AddressIndex,
54}
55
56/// The hash of the schema for the database.
57static SCHEMA_HASH: Lazy<String> =
58    Lazy::new(|| hex::encode(Sha256::digest(include_str!("storage/schema.sql"))));
59
60#[derive(Clone)]
61pub struct Storage {
62    pool: r2d2::Pool<SqliteConnectionManager>,
63
64    /// This allows an optimization where we only commit to the database after
65    /// scanning a nonempty block.
66    ///
67    /// If this is `Some`, we have uncommitted empty blocks up to the inner height.
68    /// If this is `None`, we don't.
69    ///
70    /// Using a `NonZeroU64` ensures that `Option<NonZeroU64>` fits in 8 bytes.
71    uncommitted_height: Arc<Mutex<Option<NonZeroU64>>>,
72
73    scanned_notes_tx: tokio::sync::broadcast::Sender<SpendableNoteRecord>,
74    scanned_nullifiers_tx: tokio::sync::broadcast::Sender<Nullifier>,
75    scanned_swaps_tx: tokio::sync::broadcast::Sender<SwapRecord>,
76}
77
78impl Storage {
79    /// If the database at `storage_path` exists, [`Self::load`] it, otherwise, [`Self::initialize`] it.
80    #[tracing::instrument(
81        skip_all,
82        fields(
83            path = ?storage_path.as_ref().map(|p| p.as_ref().as_str()),
84            url = %node,
85        )
86    )]
87    pub async fn load_or_initialize(
88        storage_path: Option<impl AsRef<Utf8Path>>,
89        fvk: &FullViewingKey,
90        node: Url,
91    ) -> anyhow::Result<Self> {
92        if let Some(path) = storage_path.as_ref().map(AsRef::as_ref) {
93            if path.exists() {
94                tracing::debug!(?path, "database exists");
95                return Self::load(path).await;
96            } else {
97                tracing::debug!(?path, "database does not exist");
98            }
99        };
100
101        let mut client = AppQueryServiceClient::connect(node.to_string())
102            .instrument(error_span!("connecting_to_endpoint"))
103            .await
104            .tap_err(|error| {
105                tracing::error!(?error, "failed to connect to app query service endpoint")
106            })?
107            .tap(|_| tracing::debug!("connected to app query service endpoint"));
108        let params = client
109            .app_parameters(tonic::Request::new(AppParametersRequest {}))
110            .instrument(error_span!("getting_app_parameters"))
111            .await?
112            .into_inner()
113            .try_into()?;
114
115        Self::initialize(storage_path, fvk.clone(), params).await
116    }
117
118    fn connect(
119        path: Option<impl AsRef<Utf8Path>>,
120    ) -> anyhow::Result<r2d2::Pool<SqliteConnectionManager>> {
121        if let Some(path) = path {
122            let manager = SqliteConnectionManager::file(path.as_ref())
123                .with_flags(
124                    // Don't allow opening URIs, because they can change the behavior of the database; we
125                    // just want to open normal filepaths.
126                    OpenFlags::default() & !OpenFlags::SQLITE_OPEN_URI,
127                )
128                .with_init(|conn| {
129                    // "NORMAL" will be consistent, but maybe not durable -- this is fine,
130                    // since all our data is being synced from the chain, so if we lose a dbtx,
131                    // it's like we're resuming sync from a previous height.
132                    conn.execute_batch("PRAGMA journal_mode=WAL; PRAGMA synchronous=NORMAL;")?;
133                    // We use `prepare_cached` a fair amount: this is an overestimate of the number
134                    // of cached prepared statements likely to be used.
135                    conn.set_prepared_statement_cache_capacity(32);
136                    Ok(())
137                });
138            Ok(r2d2::Pool::builder()
139                // We set max_size=1 to avoid "database is locked" sqlite errors,
140                // when accessing across multiple threads.
141                .max_size(1)
142                .build(manager)?)
143        } else {
144            let manager = SqliteConnectionManager::memory();
145            // Max size needs to be set to 1, otherwise a new in-memory database is created for each
146            // connection to the pool, which results in very confusing errors.
147            //
148            // Lifetimes and timeouts are likewise configured to their maximum values, since
149            // the in-memory database will disappear on connection close.
150            Ok(r2d2::Pool::builder()
151                .max_size(1)
152                .min_idle(Some(1))
153                .max_lifetime(Some(Duration::MAX))
154                .idle_timeout(Some(Duration::MAX))
155                .build(manager)?)
156        }
157    }
158
159    pub async fn load(path: impl AsRef<Utf8Path>) -> anyhow::Result<Self> {
160        let storage = Self {
161            pool: Self::connect(Some(path))?,
162            uncommitted_height: Arc::new(Mutex::new(None)),
163            scanned_notes_tx: broadcast::channel(128).0,
164            scanned_nullifiers_tx: broadcast::channel(512).0,
165            scanned_swaps_tx: broadcast::channel(128).0,
166        };
167
168        spawn_blocking(move || {
169            // Check the version of the software used when first initializing this database.
170            // If it doesn't match the current version, we should report the error to the user.
171            let actual_schema_hash: String = storage
172                .pool
173                .get()?
174                .query_row("SELECT schema_hash FROM schema_hash", (), |row| {
175                    row.get("schema_hash")
176                })
177                .context("failed to query database schema version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
178
179            if actual_schema_hash != *SCHEMA_HASH {
180                let database_client_version: String = storage
181                    .pool
182                    .get()?
183                    .query_row("SELECT client_version FROM client_version", (), |row| {
184                        row.get("client_version")
185                    })
186                    .context("failed to query client version: the database was probably created by an old client version, and needs to be reset and resynchronized")?;
187
188                anyhow::bail!(
189                    "can't load view database created by client version {} using client version {}: they have different schemata, so you need to reset your view database and resynchronize by running pcli view reset",
190                    database_client_version,
191                    env!("CARGO_PKG_VERSION"),
192                );
193            }
194
195            Ok(storage)
196        })
197            .await?
198    }
199
200    pub async fn initialize(
201        storage_path: Option<impl AsRef<Utf8Path>>,
202        fvk: FullViewingKey,
203        params: AppParameters,
204    ) -> anyhow::Result<Self> {
205        tracing::debug!(storage_path = ?storage_path.as_ref().map(AsRef::as_ref), ?fvk, ?params);
206
207        // Connect to the database (or create it)
208        let pool = Self::connect(storage_path)?;
209
210        let out = spawn_blocking(move || {
211            // In one database transaction, populate everything
212            let mut conn = pool.get()?;
213            let tx = conn.transaction()?;
214
215            // Create the tables
216            tx.execute_batch(include_str!("storage/schema.sql"))?;
217
218            let params_bytes = params.encode_to_vec();
219            tx.execute(
220                "INSERT INTO kv (k, v) VALUES ('app_params', ?1)",
221                [&params_bytes[..]],
222            )?;
223
224            let fvk_bytes = fvk.encode_to_vec();
225            tx.execute("INSERT INTO kv (k, v) VALUES ('fvk', ?1)", [&fvk_bytes[..]])?;
226
227            // Insert -1 as a signaling value for pre-genesis.
228            // We just have to be careful to treat negative values as None
229            // in last_sync_height.
230            tx.execute("INSERT INTO sync_height (height) VALUES (-1)", ())?;
231
232            // Insert the schema hash into the database
233            tx.execute(
234                "INSERT INTO schema_hash (schema_hash) VALUES (?1)",
235                [&*SCHEMA_HASH],
236            )?;
237
238            // Insert the client version into the database
239            tx.execute(
240                "INSERT INTO client_version (client_version) VALUES (?1)",
241                [env!("CARGO_PKG_VERSION")],
242            )?;
243
244            tx.commit()?;
245            drop(conn);
246
247            anyhow::Ok(Storage {
248                pool,
249                uncommitted_height: Arc::new(Mutex::new(None)),
250                scanned_notes_tx: broadcast::channel(128).0,
251                scanned_nullifiers_tx: broadcast::channel(512).0,
252                scanned_swaps_tx: broadcast::channel(128).0,
253            })
254        })
255        .await??;
256
257        out.update_epoch(0, None, Some(0)).await?;
258
259        Ok(out)
260    }
261
262    /// Loads asset metadata from a JSON file and use to update the database.
263    pub async fn load_asset_metadata(
264        &self,
265        registry_path: impl AsRef<Utf8Path>,
266    ) -> anyhow::Result<()> {
267        tracing::debug!(registry_path = ?registry_path.as_ref(), "loading asset metadata");
268        let registry_path = registry_path.as_ref();
269        // Parse into a serde_json::Value first so we can get the bits we care about
270        let mut registry_json: serde_json::Value = serde_json::from_str(
271            std::fs::read_to_string(registry_path)
272                .context("failed to read file")?
273                .as_str(),
274        )
275        .context("failed to parse JSON")?;
276
277        let registry: BTreeMap<String, Metadata> = serde_json::value::from_value(
278            registry_json
279                .get_mut("assetById")
280                .ok_or_else(|| anyhow::anyhow!("missing assetById"))?
281                .take(),
282        )
283        .context("could not parse asset registry")?;
284
285        for metadata in registry.into_values() {
286            self.record_asset(metadata).await?;
287        }
288
289        Ok(())
290    }
291
292    /// Query for account balance by address
293    pub async fn balances(
294        &self,
295        address_index: Option<AddressIndex>,
296        asset_id: Option<asset::Id>,
297    ) -> anyhow::Result<Vec<BalanceEntry>> {
298        let pool = self.pool.clone();
299
300        spawn_blocking(move || {
301            let query = "SELECT notes.asset_id, notes.amount, spendable_notes.address_index
302                FROM    notes
303                JOIN    spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
304                WHERE   spendable_notes.height_spent IS NULL";
305
306            tracing::debug!(?query);
307
308            // Combine notes of the same asset/address index together
309            let mut balances: BTreeMap<AddressIndex, BTreeMap<asset::Id, Amount>> = BTreeMap::new();
310
311            for result in pool.get()?.prepare_cached(query)?.query_map([], |row| {
312                let asset_id = row.get::<&str, Vec<u8>>("asset_id")?;
313                let amount = row.get::<&str, Vec<u8>>("amount")?;
314                let address_index = row.get::<&str, Vec<u8>>("address_index")?;
315
316                Ok((asset_id, amount, address_index))
317            })? {
318                let (id, amount, index) = result?;
319
320                let id = Id::try_from(id.as_slice())?;
321
322                let amount: Amount = Amount::from_be_bytes(
323                    amount
324                        .as_slice()
325                        .try_into()
326                        .expect("amount slice of incorrect length"),
327                );
328
329                let index = AddressIndex::try_from(index.as_slice())?;
330
331                // Skip this entry if not captured by address index filter
332                if let Some(address_index) = address_index {
333                    if address_index != index {
334                        continue;
335                    }
336                }
337                if let Some(asset_id) = asset_id {
338                    if asset_id != id {
339                        continue;
340                    }
341                }
342
343                balances
344                    .entry(index)
345                    .or_insert_with(BTreeMap::new)
346                    .entry(id)
347                    .and_modify(|e| *e += amount)
348                    .or_insert(amount);
349            }
350
351            let entries = balances
352                .into_iter()
353                .flat_map(|(index, assets)| {
354                    assets.into_iter().map(move |(id, amount)| BalanceEntry {
355                        id,
356                        amount: amount.into(),
357                        address_index: index,
358                    })
359                })
360                .collect::<Vec<_>>();
361            Ok(entries)
362        })
363        .await?
364    }
365
366    /// Query for a note by its note commitment, optionally waiting until the note is detected.
367    pub async fn note_by_commitment(
368        &self,
369        note_commitment: tct::StateCommitment,
370        await_detection: bool,
371    ) -> anyhow::Result<SpendableNoteRecord> {
372        // Start subscribing now, before querying for whether we already
373        // have the record, so that we can't miss it if we race a write.
374        let mut rx = self.scanned_notes_tx.subscribe();
375
376        let pool = self.pool.clone();
377
378        if let Some(record) = spawn_blocking(move || {
379            // Check if we already have the record
380            pool.get()?
381                .prepare(&format!(
382                    "SELECT
383                        notes.note_commitment,
384                        spendable_notes.height_created,
385                        notes.address,
386                        notes.amount,
387                        notes.asset_id,
388                        notes.rseed,
389                        spendable_notes.address_index,
390                        spendable_notes.source,
391                        spendable_notes.height_spent,
392                        spendable_notes.nullifier,
393                        spendable_notes.position,
394                        tx.return_address
395                    FROM notes
396                    JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
397                    LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
398                    WHERE notes.note_commitment = x'{}'",
399                    hex::encode(note_commitment.0.to_bytes())
400                ))?
401                .query_and_then((), |record| record.try_into())?
402                .next()
403                .transpose()
404        })
405        .await??
406        {
407            return Ok(record);
408        }
409
410        if !await_detection {
411            anyhow::bail!("Note commitment {} not found", note_commitment);
412        }
413
414        // Otherwise, wait for newly detected notes and check whether they're
415        // the requested one.
416
417        loop {
418            match rx.recv().await {
419                Ok(record) => {
420                    if record.note_commitment == note_commitment {
421                        return Ok(record);
422                    }
423                }
424
425                Err(e) => match e {
426                    RecvError::Closed => {
427                        anyhow::bail!(
428                            "Receiver error during note detection: closed (no more active senders)"
429                        );
430                    }
431                    RecvError::Lagged(count) => {
432                        anyhow::bail!(
433                            "Receiver error during note detection: lagged (by {:?} messages)",
434                            count
435                        );
436                    }
437                },
438            };
439        }
440    }
441
442    /// Query for a swap by its swap commitment, optionally waiting until the note is detected.
443    pub async fn swap_by_commitment(
444        &self,
445        swap_commitment: tct::StateCommitment,
446        await_detection: bool,
447    ) -> anyhow::Result<SwapRecord> {
448        // Start subscribing now, before querying for whether we already
449        // have the record, so that we can't miss it if we race a write.
450        let mut rx = self.scanned_swaps_tx.subscribe();
451
452        let pool = self.pool.clone();
453
454        if let Some(record) = spawn_blocking(move || {
455            // Check if we already have the swap record
456            pool.get()?
457                .prepare(&format!(
458                    "SELECT * FROM swaps WHERE swaps.swap_commitment = x'{}'",
459                    hex::encode(swap_commitment.0.to_bytes())
460                ))?
461                .query_and_then((), |record| record.try_into())?
462                .next()
463                .transpose()
464        })
465        .await??
466        {
467            return Ok(record);
468        }
469
470        if !await_detection {
471            anyhow::bail!("swap commitment {} not found", swap_commitment);
472        }
473
474        // Otherwise, wait for newly detected swaps and check whether they're
475        // the requested one.
476
477        loop {
478            match rx.recv().await {
479                Ok(record) => {
480                    if record.swap_commitment == swap_commitment {
481                        return Ok(record);
482                    }
483                }
484
485                Err(e) => match e {
486                    RecvError::Closed => {
487                        anyhow::bail!(
488                            "Receiver error during swap detection: closed (no more active senders)"
489                        );
490                    }
491                    RecvError::Lagged(count) => {
492                        anyhow::bail!(
493                            "Receiver error during swap detection: lagged (by {:?} messages)",
494                            count
495                        );
496                    }
497                },
498            };
499        }
500    }
501
502    /// Query for all unclaimed swaps.
503    pub async fn unclaimed_swaps(&self) -> anyhow::Result<Vec<SwapRecord>> {
504        let pool = self.pool.clone();
505
506        let records = spawn_blocking(move || {
507            // Check if we already have the swap record
508            pool.get()?
509                .prepare("SELECT * FROM swaps WHERE swaps.height_claimed is NULL")?
510                .query_and_then((), |record| record.try_into())?
511                .collect::<anyhow::Result<Vec<_>>>()
512        })
513        .await??;
514
515        Ok(records)
516    }
517
518    /// Query for a nullifier's status, optionally waiting until the nullifier is detected.
519    pub async fn nullifier_status(
520        &self,
521        nullifier: Nullifier,
522        await_detection: bool,
523    ) -> anyhow::Result<bool> {
524        // Start subscribing now, before querying for whether we already have the nullifier, so we
525        // can't miss it if we race a write.
526        let mut rx = self.scanned_nullifiers_tx.subscribe();
527
528        // Clone the pool handle so that the returned future is 'static
529        let pool = self.pool.clone();
530
531        let nullifier_bytes = nullifier.0.to_bytes().to_vec();
532
533        // Check if we already have the nullifier in the set of spent notes
534        if let Some(height_spent) = spawn_blocking(move || {
535            pool.get()?
536                .prepare_cached("SELECT height_spent FROM spendable_notes WHERE nullifier = ?1")?
537                .query_and_then([nullifier_bytes], |row| {
538                    let height_spent: Option<u64> = row.get("height_spent")?;
539                    anyhow::Ok(height_spent)
540                })?
541                .next()
542                .transpose()
543        })
544        .await??
545        {
546            let spent = height_spent.is_some();
547
548            // If we're awaiting detection and the nullifier isn't yet spent, don't return just yet
549            if !await_detection || spent {
550                return Ok(spent);
551            }
552        }
553
554        // After checking the database, if we didn't find it, return `false` unless we are to
555        // await detection
556        if !await_detection {
557            return Ok(false);
558        }
559
560        // Otherwise, wait for newly detected nullifiers and check whether they're the requested
561        // one.
562        loop {
563            let new_nullifier = rx.recv().await.context("change subscriber failed")?;
564
565            if new_nullifier == nullifier {
566                return Ok(true);
567            }
568        }
569    }
570
571    /// The last block height we've scanned to, if any.
572    pub async fn last_sync_height(&self) -> anyhow::Result<Option<u64>> {
573        // Check if we have uncommitted blocks beyond the database height.
574        if let Some(height) = *self.uncommitted_height.lock() {
575            return Ok(Some(height.get()));
576        }
577
578        let pool = self.pool.clone();
579
580        spawn_blocking(move || {
581            let height: Option<i64> = pool
582                .get()?
583                .prepare_cached("SELECT height FROM sync_height ORDER BY height DESC LIMIT 1")?
584                .query_row([], |row| row.get::<_, Option<i64>>(0))?;
585
586            anyhow::Ok(u64::try_from(height.ok_or_else(|| anyhow!("missing sync height"))?).ok())
587        })
588        .await?
589    }
590
591    pub async fn app_params(&self) -> anyhow::Result<AppParameters> {
592        let pool = self.pool.clone();
593
594        spawn_blocking(move || {
595            let params_bytes = pool
596                .get()?
597                .prepare_cached("SELECT v FROM kv WHERE k IS 'app_params' LIMIT 1")?
598                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
599                .ok_or_else(|| anyhow!("missing app_params in kv table"))?;
600
601            AppParameters::decode(params_bytes.as_slice())
602        })
603        .await?
604    }
605
606    pub async fn gas_prices(&self) -> anyhow::Result<GasPrices> {
607        let pool = self.pool.clone();
608
609        spawn_blocking(move || {
610            let bytes = pool
611                .get()?
612                .prepare_cached("SELECT v FROM kv WHERE k IS 'gas_prices' LIMIT 1")?
613                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
614                .ok_or_else(|| anyhow!("missing gas_prices in kv table"))?;
615
616            GasPrices::decode(bytes.as_slice())
617        })
618        .await?
619    }
620
621    pub async fn fmd_parameters(&self) -> anyhow::Result<fmd::Parameters> {
622        let pool = self.pool.clone();
623
624        spawn_blocking(move || {
625            let bytes = pool
626                .get()?
627                .prepare_cached("SELECT v FROM kv WHERE k IS 'fmd_params' LIMIT 1")?
628                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
629                .ok_or_else(|| anyhow!("missing fmd_params in kv table"))?;
630
631            fmd::Parameters::decode(bytes.as_slice())
632        })
633        .await?
634    }
635
636    pub async fn full_viewing_key(&self) -> anyhow::Result<FullViewingKey> {
637        let pool = self.pool.clone();
638
639        spawn_blocking(move || {
640            let bytes = pool
641                .get()?
642                .prepare_cached("SELECT v FROM kv WHERE k is 'fvk' LIMIT 1")?
643                .query_row([], |row| row.get::<_, Option<Vec<u8>>>("v"))?
644                .ok_or_else(|| anyhow!("missing fvk in kv table"))?;
645
646            FullViewingKey::decode(bytes.as_slice())
647        })
648        .await?
649    }
650
651    pub async fn state_commitment_tree(&self) -> anyhow::Result<tct::Tree> {
652        let pool = self.pool.clone();
653        spawn_blocking(move || {
654            tct::Tree::from_reader(&mut TreeStore(&mut pool.get()?.transaction()?))
655        })
656        .await?
657    }
658
659    /// Returns a tuple of (block height, transaction hash) for all transactions in a given range of block heights.
660    pub async fn transaction_hashes(
661        &self,
662        start_height: Option<u64>,
663        end_height: Option<u64>,
664    ) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
665        let starting_block = start_height.unwrap_or(0) as i64;
666        let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
667
668        let pool = self.pool.clone();
669
670        spawn_blocking(move || {
671            pool.get()?
672                .prepare_cached(
673                    "SELECT block_height, tx_hash
674                    FROM tx
675                    WHERE block_height BETWEEN ?1 AND ?2",
676                )?
677                .query_and_then([starting_block, ending_block], |row| {
678                    let block_height: u64 = row.get("block_height")?;
679                    let tx_hash: Vec<u8> = row.get("tx_hash")?;
680                    anyhow::Ok((block_height, tx_hash))
681                })?
682                .collect()
683        })
684        .await?
685    }
686
687    /// Returns a tuple of (block height, transaction hash, transaction) for all transactions in a given range of block heights.
688    pub async fn transactions(
689        &self,
690        start_height: Option<u64>,
691        end_height: Option<u64>,
692    ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction)>> {
693        let starting_block = start_height.unwrap_or(0) as i64;
694        let ending_block = end_height.unwrap_or(self.last_sync_height().await?.unwrap_or(0)) as i64;
695
696        let pool = self.pool.clone();
697
698        spawn_blocking(move || {
699            pool.get()?
700                .prepare_cached(
701                    "SELECT block_height, tx_hash, tx_bytes
702                    FROM tx
703                    WHERE block_height BETWEEN ?1 AND ?2",
704                )?
705                .query_and_then([starting_block, ending_block], |row| {
706                    let block_height: u64 = row.get("block_height")?;
707                    let tx_hash: Vec<u8> = row.get("tx_hash")?;
708                    let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
709                    let tx = Transaction::decode(tx_bytes.as_slice())?;
710                    anyhow::Ok((block_height, tx_hash, tx))
711                })?
712                .collect()
713        })
714        .await?
715    }
716
717    pub async fn transaction_by_hash(
718        &self,
719        tx_hash: &[u8],
720    ) -> anyhow::Result<Option<(u64, Transaction)>> {
721        let pool = self.pool.clone();
722        let tx_hash = tx_hash.to_vec();
723
724        spawn_blocking(move || {
725            if let Some((block_height, tx_bytes)) = pool
726                .get()?
727                .prepare_cached("SELECT block_height, tx_bytes FROM tx WHERE tx_hash = ?1")?
728                .query_row([tx_hash], |row| {
729                    let block_height: u64 = row.get("block_height")?;
730                    let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
731                    Ok((block_height, tx_bytes))
732                })
733                .optional()?
734            {
735                let tx = Transaction::decode(tx_bytes.as_slice())?;
736                Ok(Some((block_height, tx)))
737            } else {
738                Ok(None)
739            }
740        })
741        .await?
742    }
743
744    // Query for a note by its note commitment, optionally waiting until the note is detected.
745    pub async fn note_by_nullifier(
746        &self,
747        nullifier: Nullifier,
748        await_detection: bool,
749    ) -> anyhow::Result<SpendableNoteRecord> {
750        // Start subscribing now, before querying for whether we already
751        // have the record, so that we can't miss it if we race a write.
752        let mut rx = self.scanned_notes_tx.subscribe();
753
754        // Clone the pool handle so that the returned future is 'static
755        let pool = self.pool.clone();
756
757        let nullifier_bytes = nullifier.to_bytes().to_vec();
758
759        if let Some(record) = spawn_blocking(move || {
760            let record = pool
761                .get()?
762                .prepare(&format!(
763                    "SELECT
764                        notes.note_commitment,
765                        spendable_notes.height_created,
766                        notes.address,
767                        notes.amount,
768                        notes.asset_id,
769                        notes.rseed,
770                        spendable_notes.address_index,
771                        spendable_notes.source,
772                        spendable_notes.height_spent,
773                        spendable_notes.nullifier,
774                        spendable_notes.position,
775                        tx.return_address
776                    FROM notes
777                    JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
778                    LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
779                    WHERE hex(spendable_notes.nullifier) = \"{}\"",
780                    hex::encode_upper(nullifier_bytes)
781                ))?
782                .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
783                .next()
784                .transpose()?;
785
786            anyhow::Ok(record)
787        })
788        .await??
789        {
790            return Ok(record);
791        }
792
793        if !await_detection {
794            anyhow::bail!("Note commitment for nullifier {:?} not found", nullifier);
795        }
796
797        // Otherwise, wait for newly detected notes and check whether they're
798        // the requested one.
799
800        loop {
801            match rx.recv().await {
802                Ok(record) => {
803                    if record.nullifier == nullifier {
804                        return Ok(record);
805                    }
806                }
807
808                Err(e) => match e {
809                    RecvError::Closed => {
810                        anyhow::bail!(
811                            "Receiver error during note detection: closed (no more active senders)"
812                        );
813                    }
814                    RecvError::Lagged(count) => {
815                        anyhow::bail!(
816                            "Receiver error during note detection: lagged (by {:?} messages)",
817                            count
818                        );
819                    }
820                },
821            };
822        }
823    }
824
825    pub async fn all_assets(&self) -> anyhow::Result<Vec<Metadata>> {
826        let pool = self.pool.clone();
827
828        spawn_blocking(move || {
829            pool.get()?
830                .prepare_cached("SELECT metadata FROM assets")?
831                .query_and_then([], |row| {
832                    let metadata_json = row.get::<_, String>("metadata")?;
833                    let denom_metadata = serde_json::from_str(&metadata_json)?;
834
835                    anyhow::Ok(denom_metadata)
836                })?
837                .collect()
838        })
839        .await?
840    }
841
842    pub async fn asset_by_id(&self, id: &Id) -> anyhow::Result<Option<Metadata>> {
843        let id = id.to_bytes().to_vec();
844
845        let pool = self.pool.clone();
846
847        spawn_blocking(move || {
848            pool.get()?
849                .prepare_cached("SELECT metadata FROM assets WHERE asset_id = ?1")?
850                .query_and_then([id], |row| {
851                    let metadata_json = row.get::<_, String>("metadata")?;
852                    let denom_metadata = serde_json::from_str(&metadata_json)?;
853                    anyhow::Ok(denom_metadata)
854                })?
855                .next()
856                .transpose()
857        })
858        .await?
859    }
860
861    // Get assets whose denoms match the given SQL LIKE pattern, with the `_` and `%` wildcards,
862    // where `\` is the escape character.
863    pub async fn assets_matching(&self, pattern: String) -> anyhow::Result<Vec<Metadata>> {
864        let pattern = pattern.to_owned();
865
866        let pool = self.pool.clone();
867
868        spawn_blocking(move || {
869            pool.get()?
870                .prepare_cached("SELECT metadata FROM assets WHERE denom LIKE ?1 ESCAPE '\\'")?
871                .query_and_then([pattern], |row| {
872                    let metadata_json = row.get::<_, String>("metadata")?;
873                    let denom_metadata = serde_json::from_str(&metadata_json)?;
874                    anyhow::Ok(denom_metadata)
875                })?
876                .collect()
877        })
878        .await?
879    }
880
881    pub async fn notes(
882        &self,
883        include_spent: bool,
884        asset_id: Option<asset::Id>,
885        address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
886        amount_to_spend: Option<Amount>,
887    ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
888        // If set, return spent notes as well as unspent notes.
889        // bool include_spent = 2;
890        let spent_clause = match include_spent {
891            false => "NULL",
892            true => "height_spent",
893        };
894
895        // If set, only return notes with the specified asset id.
896        // core.crypto.v1.AssetId asset_id = 3;
897        let asset_clause = asset_id
898            .map(|id| format!("x'{}'", hex::encode(id.to_bytes())))
899            .unwrap_or_else(|| "asset_id".to_string());
900
901        // If set, only return notes with the specified address index.
902        // crypto.AddressIndex address_index = 4;
903        // This isn't what we want any more, we need to be indexing notes
904        // by *account*, not just by address index.
905        // For now, just do filtering in software.
906        /*
907        let address_clause = address_index
908            .map(|d| format!("x'{}'", hex::encode(d.to_bytes())))
909            .unwrap_or_else(|| "address_index".to_string());
910         */
911        let address_clause = "address_index".to_string();
912
913        // If set, stop returning notes once the total exceeds this amount.
914        //
915        // Ignored if `asset_id` is unset or if `include_spent` is set.
916        // uint64 amount_to_spend = 5;
917        //TODO: figure out a clever way to only return notes up to the sum using SQL
918        let amount_cutoff = (amount_to_spend.is_some()) && !(include_spent || asset_id.is_none());
919        let mut amount_total = Amount::zero();
920
921        let pool = self.pool.clone();
922
923        spawn_blocking(move || {
924            let mut output: Vec<SpendableNoteRecord> = Vec::new();
925
926            for result in pool
927                .get()?
928                .prepare(&format!(
929                    "SELECT notes.note_commitment,
930                        spendable_notes.height_created,
931                        notes.address,
932                        notes.amount,
933                        notes.asset_id,
934                        notes.rseed,
935                        spendable_notes.address_index,
936                        spendable_notes.source,
937                        spendable_notes.height_spent,
938                        spendable_notes.nullifier,
939                        spendable_notes.position,
940                        tx.return_address
941                FROM notes
942                JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
943                LEFT JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
944                WHERE spendable_notes.height_spent IS {spent_clause}
945                AND notes.asset_id IS {asset_clause}
946                AND spendable_notes.address_index IS {address_clause}"
947                ))?
948                .query_and_then((), |row| SpendableNoteRecord::try_from(row))?
949            {
950                let record = result?;
951
952                // Skip notes that don't match the account, since we're
953                // not doing account filtering in SQL as a temporary hack (see above)
954                if let Some(address_index) = address_index {
955                    if record.address_index.account != address_index.account {
956                        continue;
957                    }
958                }
959                let amount = record.note.amount();
960
961                // Only display notes of value > 0
962
963                if amount.value() > 0 {
964                    output.push(record);
965                }
966
967                // If we're tracking amounts, accumulate the value of the note
968                // and check if we should break out of the loop.
969                if amount_cutoff {
970                    // We know all the notes are of the same type, so adding raw quantities makes sense.
971                    amount_total += amount;
972                    if amount_total >= amount_to_spend.unwrap_or_default() {
973                        break;
974                    }
975                }
976            }
977
978            if amount_total < amount_to_spend.unwrap_or_default() {
979                anyhow::bail!(
980                    "requested amount of {} exceeds total of {}",
981                    amount_to_spend.unwrap_or_default(),
982                    amount_total
983                );
984            }
985
986            anyhow::Ok(output)
987        })
988        .await?
989    }
990
991    /// Return all notes that are eligible for voting at `votable_at_height`.
992    /// If an `address_index` is provided, only notes from within that subaccount
993    /// will be returned; otherwise, all voting notes across all subaccounts will
994    /// returned. If you want just the main account, then set `Some(0)` in the caller.
995    pub async fn notes_for_voting(
996        &self,
997        address_index: Option<penumbra_sdk_keys::keys::AddressIndex>,
998        votable_at_height: u64,
999    ) -> anyhow::Result<Vec<(SpendableNoteRecord, IdentityKey)>> {
1000        let pool = self.pool.clone();
1001
1002        spawn_blocking(move || {
1003            let mut lock = pool.get()?;
1004            let dbtx = lock.transaction()?;
1005
1006            let spendable_note_records: Vec<SpendableNoteRecord> = dbtx
1007                .prepare(&format!(
1008                    "SELECT notes.note_commitment,
1009                        spendable_notes.height_created,
1010                        notes.address,
1011                        notes.amount,
1012                        notes.asset_id,
1013                        notes.rseed,
1014                        spendable_notes.address_index,
1015                        spendable_notes.source,
1016                        spendable_notes.height_spent,
1017                        spendable_notes.nullifier,
1018                        spendable_notes.position
1019                    FROM
1020                        notes JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1021                    WHERE
1022                        notes.asset_id IN (
1023                            SELECT asset_id FROM assets WHERE denom LIKE '_delegation\\_%' ESCAPE '\\'
1024                        )
1025                        AND ((spendable_notes.height_spent IS NULL) OR (spendable_notes.height_spent > {votable_at_height}))
1026                        AND (spendable_notes.height_created < {votable_at_height})
1027                    ",
1028                ))?
1029                .query_and_then((), |row| row.try_into())?
1030                .collect::<anyhow::Result<Vec<_>>>()?;
1031
1032            // TODO: this could be internalized into the SQL query in principle, but it's easier to
1033            // do it this way; if it becomes slow, we can do it better
1034            let mut results = Vec::new();
1035            for record in spendable_note_records {
1036                // Skip notes that don't match the account index, if declared.
1037                if matches!(address_index, Some(a) if a.account != record.address_index.account) {
1038                      continue;
1039                }
1040                let asset_id = record.note.asset_id().to_bytes().to_vec();
1041                let denom: String = dbtx.query_row_and_then(
1042                    "SELECT denom FROM assets WHERE asset_id = ?1",
1043                    [asset_id],
1044                    |row| row.get("denom"),
1045                )?;
1046
1047                let identity_key = DelegationToken::from_str(&denom)
1048                    .context("invalid delegation token denom")?
1049                    .validator();
1050
1051                results.push((record, identity_key));
1052            }
1053
1054            Ok(results)
1055        }).await?
1056    }
1057
1058    #[tracing::instrument(skip(self))]
1059    pub async fn record_asset(&self, asset: Metadata) -> anyhow::Result<()> {
1060        tracing::debug!(?asset);
1061
1062        let asset_id = asset.id().to_bytes().to_vec();
1063        let denom = asset.base_denom().denom;
1064        let metadata_json = serde_json::to_string(&asset)?;
1065
1066        let pool = self.pool.clone();
1067
1068        spawn_blocking(move || {
1069            pool.get()?
1070                .execute(
1071                    "INSERT OR REPLACE INTO assets (asset_id, denom, metadata) VALUES (?1, ?2, ?3)",
1072                    (asset_id, denom, metadata_json),
1073                )
1074                .map_err(anyhow::Error::from)
1075        })
1076        .await??;
1077
1078        Ok(())
1079    }
1080
1081    pub async fn record_auction_with_state(
1082        &self,
1083        auction_id: AuctionId,
1084        auction_state: u64,
1085    ) -> anyhow::Result<()> {
1086        let auction_id = auction_id.0.to_vec();
1087
1088        let pool = self.pool.clone();
1089
1090        spawn_blocking(move || {
1091            let mut lock = pool.get()?;
1092            let tx = lock.transaction()?;
1093            tx.execute(
1094                "INSERT OR IGNORE INTO auctions (auction_id, auction_state, note_commitment) VALUES (?1, ?2, NULL)",
1095                (auction_id.clone(), auction_state),
1096            )?;
1097            tx.execute(
1098                "UPDATE auctions SET auction_state = ?2 WHERE auction_id = ?1",
1099                (auction_id, auction_state),
1100            )
1101                .map_err(anyhow::Error::from)?;
1102
1103            tx.commit()?;
1104            Ok::<(), anyhow::Error>(())
1105            })
1106            .await??;
1107
1108        Ok(())
1109    }
1110
1111    pub async fn update_auction_with_note_commitment(
1112        &self,
1113        auction_id: AuctionId,
1114        note_commitment: StateCommitment,
1115    ) -> anyhow::Result<()> {
1116        let auction_id = auction_id.0.to_vec();
1117        let blob_nc = note_commitment.0.to_bytes().to_vec();
1118
1119        let pool = self.pool.clone();
1120
1121        spawn_blocking(move || {
1122            pool.get()?
1123                .execute(
1124                    "UPDATE auctions SET (note_commitment) = ?1 WHERE auction_id = ?2",
1125                    (blob_nc, auction_id),
1126                )
1127                .map_err(anyhow::Error::from)
1128        })
1129        .await??;
1130
1131        Ok(())
1132    }
1133
1134    pub async fn fetch_auctions_by_account(
1135        &self,
1136        account_filter: Option<AddressIndex>,
1137        include_inactive: bool,
1138    ) -> anyhow::Result<Vec<(AuctionId, SpendableNoteRecord, u64 /* local seqnum */)>> {
1139        let account_clause = account_filter
1140            .map(|idx| {
1141                format!(
1142                    "AND spendable_notes.address_index = x'{}'",
1143                    hex::encode(idx.to_bytes())
1144                )
1145            })
1146            .unwrap_or_else(|| "".to_string());
1147
1148        let active_clause = if !include_inactive {
1149            "AND auctions.auction_state = 0"
1150        } else {
1151            ""
1152        };
1153
1154        let query = format!(
1155            "SELECT auctions.auction_id, spendable_notes.*, notes.*, auctions.auction_state
1156                 FROM auctions
1157                 JOIN spendable_notes ON auctions.note_commitment = spendable_notes.note_commitment
1158                 JOIN notes ON auctions.note_commitment = notes.note_commitment
1159                 WHERE 1 = 1
1160                 {account_clause}
1161                 {active_clause}",
1162            account_clause = account_clause,
1163            active_clause = active_clause,
1164        );
1165
1166        let pool = self.pool.clone();
1167
1168        spawn_blocking(move || {
1169            let mut conn = pool.get()?;
1170            let tx = conn.transaction()?;
1171
1172            let spendable_note_records: Vec<(AuctionId, SpendableNoteRecord, u64)> = tx
1173                .prepare(&query)?
1174                .query_and_then((), |row| {
1175                    let raw_auction_id: Vec<u8> = row.get("auction_id")?;
1176                    let array_auction_id: [u8; 32] = raw_auction_id
1177                        .try_into()
1178                        .map_err(|_| anyhow!("auction id must be 32 bytes"))?;
1179                    let auction_id = AuctionId(array_auction_id);
1180                    let spendable_note_record: SpendableNoteRecord = row.try_into()?;
1181                    let local_seq: u64 = row.get("auction_state")?;
1182                    Ok((auction_id, spendable_note_record, local_seq))
1183                })?
1184                .collect::<anyhow::Result<Vec<_>>>()?;
1185
1186            Ok(spendable_note_records)
1187        })
1188        .await?
1189    }
1190
1191    pub async fn record_position(&self, position: Position) -> anyhow::Result<()> {
1192        let position_id = position.id().0.to_vec();
1193
1194        let position_state = position.state.to_string();
1195        let trading_pair = position.phi.pair.to_string();
1196
1197        let pool = self.pool.clone();
1198
1199        spawn_blocking(move || {
1200            pool.get()?
1201                .execute(
1202                    "INSERT OR REPLACE INTO positions (position_id, position_state, trading_pair) VALUES (?1, ?2, ?3)",
1203                    (position_id, position_state, trading_pair),
1204                )
1205                .map_err(anyhow::Error::from)
1206        })
1207            .await??;
1208
1209        Ok(())
1210    }
1211
1212    pub async fn update_position(
1213        &self,
1214        position_id: position::Id,
1215        position_state: position::State,
1216    ) -> anyhow::Result<()> {
1217        let position_id = position_id.0.to_vec();
1218        let position_state = position_state.to_string();
1219
1220        let pool = self.pool.clone();
1221
1222        spawn_blocking(move || {
1223            pool.get()?
1224                .execute(
1225                    "UPDATE positions SET (position_state) = ?1 WHERE position_id = ?2",
1226                    (position_state, position_id),
1227                )
1228                .map_err(anyhow::Error::from)
1229        })
1230        .await??;
1231
1232        Ok(())
1233    }
1234
1235    pub async fn update_position_with_account(
1236        &self,
1237        position_id: position::Id,
1238        account: u32,
1239    ) -> anyhow::Result<()> {
1240        let pool = self.pool.clone();
1241
1242        spawn_blocking(move || {
1243            pool.get()?
1244                .execute(
1245                    "UPDATE positions SET (account) = ?1 WHERE position_id = ?2",
1246                    (i64::from(account), position_id.0),
1247                )
1248                .map_err(anyhow::Error::from)
1249        })
1250        .await??;
1251
1252        Ok(())
1253    }
1254
1255    pub async fn record_empty_block(&self, height: u64) -> anyhow::Result<()> {
1256        // Check that the incoming block height follows the latest recorded height
1257        let last_sync_height = self.last_sync_height().await?.ok_or_else(|| {
1258            anyhow::anyhow!("invalid: tried to record empty block as genesis block")
1259        })?;
1260
1261        if height != last_sync_height + 1 {
1262            anyhow::bail!(
1263                "Wrong block height {} for latest sync height {}",
1264                height,
1265                last_sync_height
1266            );
1267        }
1268
1269        *self.uncommitted_height.lock() = Some(height.try_into()?);
1270        Ok(())
1271    }
1272
1273    fn record_note_inner(
1274        dbtx: &r2d2_sqlite::rusqlite::Transaction<'_>,
1275        note: &Note,
1276    ) -> anyhow::Result<()> {
1277        let note_commitment = note.commit().0.to_bytes().to_vec();
1278        let address = note.address().to_vec();
1279        let amount = u128::from(note.amount()).to_be_bytes().to_vec();
1280        let asset_id = note.asset_id().to_bytes().to_vec();
1281        let rseed = note.rseed().to_bytes().to_vec();
1282
1283        dbtx.execute(
1284            "INSERT INTO notes (note_commitment, address, amount, asset_id, rseed)
1285                VALUES (?1, ?2, ?3, ?4, ?5)
1286                ON CONFLICT (note_commitment)
1287                DO UPDATE SET
1288                address = excluded.address,
1289                amount = excluded.amount,
1290                asset_id = excluded.asset_id,
1291                rseed = excluded.rseed",
1292            (note_commitment, address, amount, asset_id, rseed),
1293        )?;
1294
1295        Ok(())
1296    }
1297
1298    pub async fn give_advice(&self, note: Note) -> anyhow::Result<()> {
1299        let pool = self.pool.clone();
1300        let mut lock = pool.get()?;
1301        let dbtx = lock.transaction()?;
1302
1303        Storage::record_note_inner(&dbtx, &note)?;
1304
1305        dbtx.commit()?;
1306
1307        Ok(())
1308    }
1309
1310    /// Return advice about note contents for use in scanning.
1311    ///
1312    /// Given a list of note commitments, this method checks whether any of them
1313    /// correspond to notes that have been recorded in the database but not yet
1314    /// observed during scanning.
1315    pub async fn scan_advice(
1316        &self,
1317        note_commitments: Vec<note::StateCommitment>,
1318    ) -> anyhow::Result<BTreeMap<note::StateCommitment, Note>> {
1319        if note_commitments.is_empty() {
1320            return Ok(BTreeMap::new());
1321        }
1322
1323        let pool = self.pool.clone();
1324
1325        // This query gives advice about notes which are known but which have not already been recorded as spendable,
1326        // in part to avoid revealing information about which notes have been spent.
1327
1328        spawn_blocking(move || {
1329            pool.get()?
1330                .prepare(&format!(
1331                    "SELECT notes.note_commitment,
1332                        notes.address,
1333                        notes.amount,
1334                        notes.asset_id,
1335                        notes.rseed
1336                    FROM notes
1337                    LEFT OUTER JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1338                    WHERE (spendable_notes.note_commitment IS NULL) AND (notes.note_commitment IN ({}))",
1339                    note_commitments
1340                        .iter()
1341                        .map(|cm| format!("x'{}'", hex::encode(cm.0.to_bytes())))
1342                        .collect::<Vec<_>>()
1343                        .join(", ")
1344                ))?
1345                .query_and_then((), |row| {
1346                    let address = Address::try_from(row.get::<_, Vec<u8>>("address")?)?;
1347                    let amount = row.get::<_, [u8; 16]>("amount")?;
1348                    let amount_u128: u128 = u128::from_be_bytes(amount);
1349                    let asset_id = asset::Id(Fq::from_bytes_checked(&row.get::<_, [u8; 32]>("asset_id")?).expect("asset id malformed"));
1350                    let rseed = Rseed(row.get::<_, [u8; 32]>("rseed")?);
1351                    let note = Note::from_parts(
1352                        address,
1353                        Value {
1354                            amount: amount_u128.into(),
1355                            asset_id,
1356                        },
1357                        rseed,
1358                    )?;
1359                    anyhow::Ok((note.commit(), note))
1360                })?
1361                .collect::<anyhow::Result<BTreeMap<_, _>>>()
1362        }).await?
1363    }
1364
1365    /// Filters for nullifiers whose notes we control
1366    pub async fn filter_nullifiers(
1367        &self,
1368        nullifiers: Vec<Nullifier>,
1369    ) -> anyhow::Result<Vec<Nullifier>> {
1370        if nullifiers.is_empty() {
1371            return Ok(Vec::new());
1372        }
1373
1374        let pool = self.pool.clone();
1375
1376        spawn_blocking(move || {
1377            pool.get()?
1378                .prepare(&format!(
1379                    "SELECT nullifier FROM (SELECT nullifier FROM spendable_notes UNION SELECT nullifier FROM swaps UNION SELECT nullifier FROM tx_by_nullifier) WHERE nullifier IN ({})",
1380                    nullifiers
1381                        .iter()
1382                        .map(|x| format!("x'{}'", hex::encode(x.0.to_bytes())))
1383                        .collect::<Vec<String>>()
1384                        .join(",")
1385                ))?
1386                .query_and_then((), |row| {
1387                    let nullifier: Vec<u8> = row.get("nullifier")?;
1388                    nullifier.as_slice().try_into()
1389                })?
1390                .collect()
1391        })
1392            .await?
1393    }
1394
1395    pub async fn record_block(
1396        &self,
1397        filtered_block: FilteredBlock,
1398        transactions: Vec<Transaction>,
1399        sct: &mut tct::Tree,
1400        channel: tonic::transport::Channel,
1401    ) -> anyhow::Result<()> {
1402        //Check that the incoming block height follows the latest recorded height
1403        let last_sync_height = self.last_sync_height().await?;
1404
1405        let correct_height = match last_sync_height {
1406            // Require that the new block follows the last one we scanned.
1407            Some(cur_height) => filtered_block.height == cur_height + 1,
1408            // Require that the new block represents the initial chain state.
1409            None => filtered_block.height == 0,
1410        };
1411
1412        if !correct_height {
1413            anyhow::bail!(
1414                "Wrong block height {} for latest sync height {:?}",
1415                filtered_block.height,
1416                last_sync_height
1417            );
1418        }
1419
1420        let pool = self.pool.clone();
1421        let uncommitted_height = self.uncommitted_height.clone();
1422        let scanned_notes_tx = self.scanned_notes_tx.clone();
1423        let scanned_nullifiers_tx = self.scanned_nullifiers_tx.clone();
1424        let scanned_swaps_tx = self.scanned_swaps_tx.clone();
1425
1426        let fvk = self.full_viewing_key().await?;
1427
1428        // If the app parameters have changed, update them.
1429        let new_app_parameters: Option<AppParameters> = if filtered_block.app_parameters_updated {
1430            // Fetch the latest parameters
1431            let mut client = AppQueryServiceClient::new(channel);
1432            Some(
1433                client
1434                    .app_parameters(tonic::Request::new(AppParametersRequest {}))
1435                    .await?
1436                    .into_inner()
1437                    .try_into()?,
1438            )
1439        } else {
1440            None
1441        };
1442
1443        // Cloning the SCT is cheap because it's a copy-on-write structure, so we move an owned copy
1444        // into the spawned thread. This means that if for any reason the thread panics or throws an
1445        // error, the changes to the SCT will be discarded, just like any changes to the database,
1446        // so the two stay transactionally in sync, even in the case of errors. This would not be
1447        // the case if we `std::mem::take` the SCT and move it into the spawned thread, because then
1448        // an error would mean the updated version would never be put back, and the outcome would be
1449        // a cleared SCT but a non-empty database.
1450        let mut new_sct = sct.clone();
1451
1452        *sct = spawn_blocking(move || {
1453            let mut lock = pool.get()?;
1454            let mut dbtx = lock.transaction()?;
1455
1456            if let Some(params) = new_app_parameters {
1457                let params_bytes = params.encode_to_vec();
1458                // We expect app_params to be present already but may as well use an upsert
1459                dbtx.execute(
1460                    "INSERT INTO kv (k, v) VALUES ('app_params', ?1)
1461                    ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1462                    [&params_bytes[..]],
1463                )?;
1464            }
1465
1466            // Insert new note records into storage
1467            for note_record in filtered_block.new_notes.values() {
1468                let note_commitment = note_record.note_commitment.0.to_bytes().to_vec();
1469                let height_created = filtered_block.height as i64;
1470                let address_index = note_record.address_index.to_bytes().to_vec();
1471                let nullifier = note_record.nullifier.to_bytes().to_vec();
1472                let position = (u64::from(note_record.position)) as i64;
1473                let source = note_record.source.encode_to_vec();
1474                // Check if the note is from a transaction, if so, include the tx hash (id)
1475                let tx_hash = match note_record.source {
1476                    CommitmentSource::Transaction { id } => id,
1477                    _ => None,
1478                };
1479
1480                // Record the inner note data in the notes table
1481                Storage::record_note_inner(&dbtx, &note_record.note)?;
1482
1483                dbtx.execute(
1484                    "INSERT INTO spendable_notes
1485                    (note_commitment, nullifier, position, height_created, address_index, source, height_spent, tx_hash)
1486                    VALUES (?1, ?2, ?3, ?4, ?5, ?6, NULL, ?7)
1487                    ON CONFLICT (note_commitment)
1488                    DO UPDATE SET nullifier = excluded.nullifier,
1489                    position = excluded.position,
1490                    height_created = excluded.height_created,
1491                    address_index = excluded.address_index,
1492                    source = excluded.source,
1493                    height_spent = excluded.height_spent,
1494                    tx_hash = excluded.tx_hash",
1495                    (
1496                        &note_commitment,
1497                        &nullifier,
1498                        &position,
1499                        &height_created,
1500                        &address_index,
1501                        &source,
1502                        // height_spent is NULL because the note is newly discovered
1503                        &tx_hash,
1504                    ),
1505                )?;
1506            }
1507
1508            // Insert new swap records into storage
1509            for swap in filtered_block.new_swaps.values() {
1510                let swap_commitment = swap.swap_commitment.0.to_bytes().to_vec();
1511                let swap_bytes = swap.swap.encode_to_vec();
1512                let position = (u64::from(swap.position)) as i64;
1513                let nullifier = swap.nullifier.to_bytes().to_vec();
1514                let source = swap.source.encode_to_vec();
1515                let output_data = swap.output_data.encode_to_vec();
1516
1517                dbtx.execute(
1518                    "INSERT INTO swaps (swap_commitment, swap, position, nullifier, output_data, height_claimed, source)
1519                    VALUES (?1, ?2, ?3, ?4, ?5, NULL, ?6)
1520                    ON CONFLICT (swap_commitment)
1521                    DO UPDATE SET swap = excluded.swap,
1522                    position = excluded.position,
1523                    nullifier = excluded.nullifier,
1524                    output_data = excluded.output_data,
1525                    height_claimed = excluded.height_claimed,
1526                    source = excluded.source",
1527                    (
1528                        &swap_commitment,
1529                        &swap_bytes,
1530                        &position,
1531                        &nullifier,
1532                        &output_data,
1533                        // height_claimed is NULL because the swap is newly discovered
1534                        &source,
1535                    ),
1536                )?;
1537            }
1538
1539            // Update any rows of the table with matching nullifiers to have height_spent
1540            for nullifier in &filtered_block.spent_nullifiers {
1541                let height_spent = filtered_block.height as i64;
1542                let nullifier_bytes = nullifier.to_bytes().to_vec();
1543
1544                let spent_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1545                    "UPDATE spendable_notes SET height_spent = ?1 WHERE nullifier = ?2 RETURNING note_commitment"
1546                )?
1547                    .query_and_then(
1548                        (height_spent, &nullifier_bytes),
1549                        |row| {
1550                            let bytes: Vec<u8> = row.get("note_commitment")?;
1551                            StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1552                        },
1553                    )?
1554                    .next()
1555                    .transpose()?;
1556
1557                let swap_commitment: Option<StateCommitment> = dbtx.prepare_cached(
1558                    "UPDATE swaps SET height_claimed = ?1 WHERE nullifier = ?2 RETURNING swap_commitment"
1559                )?
1560                    .query_and_then(
1561                        (height_spent, &nullifier_bytes),
1562                        |row| {
1563                            let bytes: Vec<u8> = row.get("swap_commitment")?;
1564                            StateCommitment::try_from(&bytes[..]).context("invalid commitment bytes")
1565                        },
1566                    )?
1567                    .next()
1568                    .transpose()?;
1569
1570                // Check denom type
1571                let spent_denom: String
1572                    = dbtx.prepare_cached(
1573                    "SELECT denom FROM assets
1574                        WHERE asset_id ==
1575                            (SELECT asset_id FROM notes
1576                             WHERE note_commitment ==
1577                                (SELECT note_commitment FROM spendable_notes WHERE nullifier = ?1))"
1578                )?
1579                    .query_and_then(
1580                        [&nullifier_bytes],
1581                        |row| row.get("denom"),
1582                    )?
1583                    .next()
1584                    .transpose()?
1585                    .unwrap_or("unknown".to_string());
1586
1587                // Mark spent notes as spent
1588                if let Some(spent_commitment) = spent_commitment {
1589                    tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "detected spent note commitment");
1590                    // Forget spent note commitments from the SCT unless they are delegation tokens,
1591                    // which must be saved to allow voting on proposals that might or might not be
1592                    // open presently
1593
1594                    if DelegationToken::from_str(&spent_denom).is_err() {
1595                        tracing::debug!(?nullifier, ?spent_commitment, ?spent_denom, "forgetting spent note commitment");
1596                        new_sct.forget(spent_commitment);
1597                    }
1598                };
1599
1600                // Mark spent swaps as spent
1601                if let Some(spent_swap_commitment) = swap_commitment {
1602                    tracing::debug!(?nullifier, ?spent_swap_commitment, "detected and forgetting spent swap commitment");
1603                    new_sct.forget(spent_swap_commitment);
1604                };
1605            }
1606
1607            // Update SCT table with current SCT state
1608            new_sct.to_writer(&mut TreeStore(&mut dbtx))?;
1609
1610            // Record all transactions
1611            for transaction in transactions {
1612                let tx_bytes = transaction.encode_to_vec();
1613                // We have to create an explicit temporary borrow, because the sqlx api is bad (see above)
1614                let tx_hash_owned = sha2::Sha256::digest(&tx_bytes);
1615                let tx_hash = tx_hash_owned.as_slice();
1616                let tx_block_height = filtered_block.height as i64;
1617                let decrypted_memo = transaction.decrypt_memo(&fvk).ok();
1618                let memo_text = decrypted_memo.clone().map_or(None,|x| Some(x.text().to_string()));
1619                let return_address = decrypted_memo.map_or(None, |x| Some(x.return_address().to_vec()));
1620
1621                tracing::debug!(tx_hash = ?hex::encode(tx_hash), "recording extended transaction");
1622
1623                dbtx.execute(
1624                    "INSERT OR IGNORE INTO tx (tx_hash, tx_bytes, block_height, return_address, memo_text) VALUES (?1, ?2, ?3, ?4, ?5)",
1625                    (&tx_hash, &tx_bytes, tx_block_height, return_address, memo_text),
1626                )?;
1627
1628                // Associate all of the spent nullifiers with the transaction by hash.
1629                for nf in transaction.spent_nullifiers() {
1630                    let nf_bytes = nf.0.to_bytes().to_vec();
1631                    dbtx.execute(
1632                        "INSERT OR IGNORE INTO tx_by_nullifier (nullifier, tx_hash) VALUES (?1, ?2)",
1633                        (&nf_bytes, &tx_hash),
1634                    )?;
1635                }
1636            }
1637
1638            // Update FMD parameters if they've changed.
1639            if filtered_block.fmd_parameters.is_some() {
1640                let fmd_parameters_bytes =
1641                    &fmd::Parameters::encode_to_vec(&filtered_block.fmd_parameters.ok_or_else(|| anyhow::anyhow!("missing fmd parameters in filtered block"))?)[..];
1642
1643                dbtx.execute(
1644                    "INSERT INTO kv (k, v) VALUES ('fmd_params', ?1)
1645                    ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1646                    [&fmd_parameters_bytes],
1647                )?;
1648            }
1649
1650            // Update gas prices if they've changed.
1651            if filtered_block.gas_prices.is_some() {
1652                let gas_prices_bytes =
1653                    &GasPrices::encode_to_vec(&filtered_block.gas_prices.ok_or_else(|| anyhow::anyhow!("missing gas prices in filtered block"))?)[..];
1654
1655                dbtx.execute(
1656                    "INSERT INTO kv (k, v) VALUES ('gas_prices', ?1)
1657                    ON CONFLICT(k) DO UPDATE SET v = excluded.v",
1658                    [&gas_prices_bytes],
1659                )?;
1660            }
1661
1662            // Record block height as latest synced height
1663            let latest_sync_height = filtered_block.height as i64;
1664            dbtx.execute("UPDATE sync_height SET height = ?1", [latest_sync_height])?;
1665
1666            // Commit the changes to the database
1667            dbtx.commit()?;
1668
1669            // IMPORTANT: NO PANICS OR ERRORS PAST THIS POINT
1670            // If there is a panic or error past this point, the database will be left in out of
1671            // sync with the in-memory copy of the SCT, which means that it will become corrupted as
1672            // synchronization continues.
1673
1674            // It's critical to reset the uncommitted height here, since we've just
1675            // invalidated it by committing.
1676            uncommitted_height.lock().take();
1677
1678            // Broadcast all committed note records to channel
1679            // Done following tx.commit() to avoid notifying of a new SpendableNoteRecord before it is actually committed to the database
1680
1681            for note_record in filtered_block.new_notes.values() {
1682                // This will fail to be broadcast if there is no active receiver (such as on initial
1683                // sync) The error is ignored, as this isn't a problem, because if there is no
1684                // active receiver there is nothing to do
1685                let _ = scanned_notes_tx.send(note_record.clone());
1686            }
1687
1688            for nullifier in filtered_block.spent_nullifiers.iter() {
1689                // This will fail to be broadcast if there is no active receiver (such as on initial
1690                // sync) The error is ignored, as this isn't a problem, because if there is no
1691                // active receiver there is nothing to do
1692                let _ = scanned_nullifiers_tx.send(*nullifier);
1693            }
1694
1695            for swap_record in filtered_block.new_swaps.values() {
1696                // This will fail to be broadcast if there is no active receāˆ‘iver (such as on initial
1697                // sync) The error is ignored, as this isn't a problem, because if there is no
1698                // active receiver there is nothing to do
1699                let _ = scanned_swaps_tx.send(swap_record.clone());
1700            }
1701
1702            anyhow::Ok(new_sct)
1703        })
1704            .await??;
1705
1706        Ok(())
1707    }
1708
1709    pub async fn owned_position_ids(
1710        &self,
1711        position_state: Option<State>,
1712        trading_pair: Option<TradingPair>,
1713        address_index: Option<AddressIndex>,
1714    ) -> anyhow::Result<Vec<position::Id>> {
1715        let pool = self.pool.clone();
1716
1717        let state_clause = match position_state {
1718            Some(state) => format!("position_state = \"{}\"", state),
1719            None => "true".to_string(),
1720        };
1721
1722        let pair_clause = match trading_pair {
1723            Some(pair) => format!("trading_pair = \"{}\"", pair),
1724            None => "true".to_string(),
1725        };
1726
1727        let account_clause = match address_index {
1728            Some(index) => format!("account = {}", index.account),
1729            None => "true".to_string(),
1730        };
1731
1732        spawn_blocking(move || {
1733            let q = format!(
1734                "SELECT position_id FROM positions WHERE {} AND {} AND {}",
1735                state_clause, pair_clause, account_clause
1736            );
1737
1738            pool.get()?
1739                .prepare_cached(&q)?
1740                .query_and_then([], |row| {
1741                    let position_id: Vec<u8> = row.get("position_id")?;
1742                    Ok(position::Id(position_id.as_slice().try_into()?))
1743                })?
1744                .collect()
1745        })
1746        .await?
1747    }
1748
1749    pub async fn notes_by_sender(
1750        &self,
1751        return_address: &Address,
1752    ) -> anyhow::Result<Vec<SpendableNoteRecord>> {
1753        let pool = self.pool.clone();
1754
1755        let query = "SELECT notes.note_commitment,
1756            spendable_notes.height_created,
1757            notes.address,
1758            notes.amount,
1759            notes.asset_id,
1760            notes.rseed,
1761            spendable_notes.address_index,
1762            spendable_notes.source,
1763            spendable_notes.height_spent,
1764            spendable_notes.nullifier,
1765            spendable_notes.position
1766            FROM notes
1767            JOIN spendable_notes ON notes.note_commitment = spendable_notes.note_commitment
1768            JOIN tx ON spendable_notes.tx_hash = tx.tx_hash
1769            WHERE tx.return_address = ?1";
1770
1771        let return_address = return_address.to_vec();
1772
1773        let records = spawn_blocking(move || {
1774            pool.get()?
1775                .prepare(query)?
1776                .query_and_then([return_address], |record| record.try_into())?
1777                .collect::<anyhow::Result<Vec<_>>>()
1778        })
1779        .await??;
1780
1781        Ok(records)
1782    }
1783
1784    /// Get all transactions with a matching memo text. The `pattern` argument
1785    /// should include SQL wildcards, such as `%` and `_`, to match substrings,
1786    /// e.g. `%foo%`.
1787    pub async fn transactions_matching_memo(
1788        &self,
1789        pattern: String,
1790    ) -> anyhow::Result<Vec<(u64, Vec<u8>, Transaction, String)>> {
1791        let pattern = pattern.to_owned();
1792        tracing::trace!(?pattern, "searching for memos matching");
1793        let pool = self.pool.clone();
1794
1795        spawn_blocking(move || {
1796            pool.get()?
1797                .prepare_cached("SELECT block_height, tx_hash, tx_bytes, memo_text FROM tx WHERE memo_text LIKE ?1 ESCAPE '\\'")?
1798                .query_and_then([pattern], |row| {
1799                    let block_height: u64 = row.get("block_height")?;
1800                    let tx_hash: Vec<u8> = row.get("tx_hash")?;
1801                    let tx_bytes: Vec<u8> = row.get("tx_bytes")?;
1802                    let tx = Transaction::decode(tx_bytes.as_slice())?;
1803                    let memo_text: String = row.get("memo_text")?;
1804                    anyhow::Ok((block_height, tx_hash, tx, memo_text))
1805                })?
1806                .collect()
1807        })
1808        .await?
1809    }
1810
1811    /// Update information about an epoch.
1812    pub async fn update_epoch(
1813        &self,
1814        epoch: u64,
1815        root: Option<Root>,
1816        start_height: Option<u64>,
1817    ) -> anyhow::Result<()> {
1818        let pool = self.pool.clone();
1819
1820        spawn_blocking(move || {
1821            pool.get()?
1822                .execute(
1823                    r#"
1824                    INSERT INTO epochs(epoch_index, root, start_height)
1825                    VALUES (?1, ?2, ?3)
1826                    ON CONFLICT(epoch_index)
1827                    DO UPDATE SET
1828                        root = COALESCE(?2, root),
1829                        start_height = COALESCE(?3, start_height)
1830                    "#,
1831                    (epoch, root.map(|x| x.encode_to_vec()), start_height),
1832                )
1833                .map_err(anyhow::Error::from)
1834        })
1835        .await??;
1836
1837        Ok(())
1838    }
1839
1840    /// Fetch information about the current epoch.
1841    ///
1842    /// This will return the root of the epoch, if present,
1843    /// and the start height of the epoch, if present.
1844    pub async fn get_epoch(&self, epoch: u64) -> anyhow::Result<(Option<Root>, Option<u64>)> {
1845        let pool = self.pool.clone();
1846
1847        spawn_blocking(move || {
1848            pool.get()?
1849                .query_row_and_then(
1850                    r#"
1851                    SELECT root, start_height
1852                    FROM epochs
1853                    WHERE epoch_index = ?1
1854                    "#,
1855                    (epoch,),
1856                    |row| {
1857                        let root_raw: Option<Vec<u8>> = row.get("root")?;
1858                        let start_height: Option<u64> = row.get("start_height")?;
1859                        let root = root_raw.map(|x| Root::decode(x.as_slice())).transpose()?;
1860                        anyhow::Ok((root, start_height))
1861                    },
1862                )
1863                .map_err(anyhow::Error::from)
1864        })
1865        .await?
1866    }
1867}