cnidarium/
snapshot.rs

1use std::iter;
2use std::{any::Any, sync::Arc};
3
4use anyhow::Result;
5use async_trait::async_trait;
6use ibc_types::core::commitment::MerkleProof;
7use tokio::sync::mpsc;
8use tracing::Span;
9
10#[cfg(feature = "metrics")]
11use crate::metrics;
12use crate::store::multistore::{self, MultistoreCache};
13use crate::{store, StateRead};
14
15mod rocks_wrapper;
16
17pub(crate) use rocks_wrapper::RocksDbSnapshot;
18
19/// A snapshot of the underlying storage at a specific state version, suitable
20/// for read-only access by multiple threads, e.g., RPC calls.
21///
22/// Snapshots are cheap to create and clone.  Internally, they're implemented as
23/// a wrapper around a [RocksDB snapshot](https://github.com/facebook/rocksdb/wiki/Snapshot)
24/// with a pinned JMT version number for the snapshot.
25#[derive(Clone)]
26pub struct Snapshot(pub(crate) Arc<Inner>);
27
28// We don't want to expose the `TreeReader` implementation outside of this crate.
29#[derive(Debug)]
30pub(crate) struct Inner {
31    /// Tracks the latest version of each substore, and routes keys to the correct substore.
32    pub(crate) multistore_cache: MultistoreCache,
33    /// A handle to the underlying RocksDB snapshot.
34    pub(crate) snapshot: Arc<RocksDbSnapshot>,
35    /// The version of the main JMT tree.
36    pub(crate) version: jmt::Version,
37    // Used to retrieve column family handles.
38    pub(crate) db: Arc<rocksdb::DB>,
39}
40
41impl Snapshot {
42    /// Creates a new `Snapshot` with the given version and substore configs.
43    pub(crate) fn new(
44        db: Arc<rocksdb::DB>,
45        version: jmt::Version,
46        multistore_cache: multistore::MultistoreCache,
47    ) -> Self {
48        Self(Arc::new(Inner {
49            snapshot: Arc::new(RocksDbSnapshot::new(db.clone())),
50            version,
51            db,
52            multistore_cache,
53        }))
54    }
55
56    pub fn version(&self) -> jmt::Version {
57        self.0.version
58    }
59
60    /// Returns some value corresponding to the key, along with an ICS23 existence proof
61    /// up to the current JMT root hash. If the key is not present, returns `None` and a
62    /// non-existence proof.
63    pub async fn get_with_proof(&self, key: Vec<u8>) -> Result<(Option<Vec<u8>>, MerkleProof)> {
64        if key.is_empty() {
65            anyhow::bail!("empty keys are not allowed")
66        }
67
68        let span = tracing::Span::current();
69        let rocksdb_snapshot = self.0.snapshot.clone();
70        let db = self.0.db.clone();
71        let mut proofs = vec![];
72
73        let (substore_key, substore_config) = self.0.multistore_cache.config.route_key_bytes(&key);
74        let substore_key_bytes = substore_key.to_vec();
75        let substore_version = self.substore_version(&substore_config).unwrap_or(u64::MAX);
76        let key_to_substore_root = substore_config.prefix.clone();
77
78        let substore = store::substore::SubstoreSnapshot {
79            config: substore_config,
80            rocksdb_snapshot: rocksdb_snapshot.clone(),
81            version: substore_version,
82            db: db.clone(),
83        };
84
85        let (substore_value, substore_commitment_proof) = tokio::task::spawn_blocking({
86            let span = span.clone();
87            move || span.in_scope(|| substore.get_with_proof(substore_key_bytes))
88        })
89        .await??;
90
91        proofs.push(substore_commitment_proof);
92
93        // in the case where we request a proof for a key that is in a substore, also get a proof from the root to the substore key.
94        if !key_to_substore_root.is_empty() {
95            let main_store_config = self.0.multistore_cache.config.main_store.clone();
96            let main_version = self
97                .substore_version(&main_store_config)
98                .unwrap_or(u64::MAX);
99            let mainstore = store::substore::SubstoreSnapshot {
100                config: main_store_config,
101                rocksdb_snapshot,
102                version: main_version,
103                db,
104            };
105
106            let (_, main_commitment_proof) = tokio::task::spawn_blocking({
107                let span = span.clone();
108                move || span.in_scope(|| mainstore.get_with_proof(key_to_substore_root.into()))
109            })
110            .await??;
111
112            proofs.push(main_commitment_proof);
113        }
114
115        Ok((
116            substore_value,
117            MerkleProof {
118                proofs: proofs.clone(),
119            },
120        ))
121    }
122
123    pub fn prefix_version(&self, prefix: &str) -> Result<Option<jmt::Version>> {
124        let Some(config) = self
125            .0
126            .multistore_cache
127            .config
128            .find_substore(prefix.as_bytes())
129        else {
130            anyhow::bail!("rquested a version for a prefix that does not exist (prefix={prefix})")
131        };
132
133        Ok(self.substore_version(&config))
134    }
135
136    /// Returns the root hash of the subtree corresponding to the given prefix.
137    /// If the prefix is empty, the root hash of the main tree is returned.
138    ///
139    /// # Errors
140    /// Returns an error if the supplied prefix does not correspond to a known substore.
141    pub async fn prefix_root_hash(&self, prefix: &str) -> Result<crate::RootHash> {
142        let span = tracing::Span::current();
143        let rocksdb_snapshot = self.0.snapshot.clone();
144        let db = self.0.db.clone();
145
146        let Some(config) = self
147            .0
148            .multistore_cache
149            .config
150            .find_substore(prefix.as_bytes())
151        else {
152            anyhow::bail!("requested a root for a substore that does not exist (prefix={prefix})")
153        };
154
155        let version = self
156            .substore_version(&config)
157            .expect("the substore exists and has been initialized");
158
159        let substore = store::substore::SubstoreSnapshot {
160            config,
161            rocksdb_snapshot,
162            version,
163            db,
164        };
165
166        tracing::debug!(
167            prefix = substore.config.prefix,
168            version = substore.version,
169            "fetching root hash for substore"
170        );
171
172        tokio::task::spawn_blocking(move || span.in_scope(|| substore.root_hash())).await?
173    }
174
175    pub async fn root_hash(&self) -> Result<crate::RootHash> {
176        self.prefix_root_hash("").await
177    }
178
179    pub(crate) fn substore_version(
180        &self,
181        prefix: &Arc<store::substore::SubstoreConfig>,
182    ) -> Option<jmt::Version> {
183        self.0.multistore_cache.get_version(prefix)
184    }
185}
186
187#[async_trait]
188impl StateRead for Snapshot {
189    type GetRawFut = crate::future::SnapshotFuture;
190    type PrefixRawStream =
191        tokio_stream::wrappers::ReceiverStream<anyhow::Result<(String, Vec<u8>)>>;
192    type PrefixKeysStream = tokio_stream::wrappers::ReceiverStream<anyhow::Result<String>>;
193    type NonconsensusPrefixRawStream =
194        tokio_stream::wrappers::ReceiverStream<anyhow::Result<(Vec<u8>, Vec<u8>)>>;
195    type NonconsensusRangeRawStream =
196        tokio_stream::wrappers::ReceiverStream<anyhow::Result<(Vec<u8>, Vec<u8>)>>;
197
198    /// Fetch a key from the JMT.
199    fn get_raw(&self, key: &str) -> Self::GetRawFut {
200        let span = Span::current();
201        let (key, config) = self.0.multistore_cache.config.route_key_str(key);
202
203        let rocksdb_snapshot = self.0.snapshot.clone();
204        let db = self.0.db.clone();
205
206        let version = self
207            .substore_version(&config)
208            .expect("the substore exists and has been initialized");
209
210        let substore = store::substore::SubstoreSnapshot {
211            config,
212            rocksdb_snapshot,
213            version,
214            db,
215        };
216        let key_hash = jmt::KeyHash::with::<sha2::Sha256>(key);
217
218        crate::future::SnapshotFuture(tokio::task::spawn_blocking(move || {
219            span.in_scope(|| {
220                let _start = std::time::Instant::now();
221                let rsp = substore.get_jmt(key_hash);
222                #[cfg(feature = "metrics")]
223                metrics::histogram!(metrics::STORAGE_GET_RAW_DURATION).record(_start.elapsed());
224                rsp
225            })
226        }))
227    }
228
229    /// Fetch a key from nonverifiable storage.
230    fn nonverifiable_get_raw(&self, key: &[u8]) -> Self::GetRawFut {
231        let span = Span::current();
232        let (key, config) = self.0.multistore_cache.config.route_key_bytes(key);
233
234        let rocksdb_snapshot = self.0.snapshot.clone();
235        let db = self.0.db.clone();
236
237        let version = self
238            .substore_version(&config)
239            .expect("the substore exists and has been initialized");
240
241        let substore = store::substore::SubstoreSnapshot {
242            config,
243            rocksdb_snapshot,
244            version,
245            db,
246        };
247        let key: Vec<u8> = key.to_vec();
248
249        crate::future::SnapshotFuture(tokio::task::spawn_blocking(move || {
250            span.in_scope(|| {
251                let _start = std::time::Instant::now();
252
253                let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
254                let rsp = substore
255                    .rocksdb_snapshot
256                    .get_cf(cf_nonverifiable, key)
257                    .map_err(Into::into);
258                #[cfg(feature = "metrics")]
259                metrics::histogram!(metrics::STORAGE_NONCONSENSUS_GET_RAW_DURATION)
260                    .record(_start.elapsed());
261                rsp
262            })
263        }))
264    }
265
266    /// Returns a stream of all key-value pairs with the given prefix.
267    fn prefix_raw(&self, prefix: &str) -> Self::PrefixRawStream {
268        let span = Span::current();
269
270        let rocksdb_snapshot = self.0.snapshot.clone();
271        let db = self.0.db.clone();
272
273        let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix);
274        tracing::trace!(substore_key = prefix_truncated,  substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore");
275        let substore_prefix = config.prefix.clone();
276
277        let version = self
278            .substore_version(&config)
279            .expect("the substore exists and has been initialized");
280
281        let substore = store::substore::SubstoreSnapshot {
282            config,
283            rocksdb_snapshot,
284            version,
285            db,
286        };
287
288        let mut options = rocksdb::ReadOptions::default();
289        options.set_iterate_range(rocksdb::PrefixRange(prefix_truncated.as_bytes()));
290        let mode = rocksdb::IteratorMode::Start;
291        let (tx_prefix_item, rx_prefix_query) = mpsc::channel(10);
292
293        // Since the JMT keys are hashed, we can't use a prefix iterator directly.
294        // We need to first prefix range the key preimages column family, then use the hashed matches to fetch the values
295        // from the JMT column family.
296        tokio::task::spawn_blocking(move || {
297            span.in_scope(|| {
298                let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
299                let jmt_keys_iterator =
300                    substore
301                        .rocksdb_snapshot
302                        .iterator_cf_opt(cf_jmt_keys, options, mode);
303
304                for tuple in jmt_keys_iterator {
305                    // For each key that matches the prefix, fetch the value from the JMT column family.
306                    let (key_preimage, _) = tuple?;
307                    let substore_key = std::str::from_utf8(key_preimage.as_ref())
308                        .expect("saved jmt keys are utf-8 strings");
309                    let key_hash = jmt::KeyHash::with::<sha2::Sha256>(substore_key.as_bytes());
310
311                    let full_key = if substore_prefix.is_empty() {
312                        substore_key.to_string()
313                    } else {
314                        format!("{substore_prefix}/{substore_key}").to_string()
315                    };
316
317                    let v = substore
318                        .get_jmt(key_hash)?
319                        .expect("keys in jmt_keys should have a corresponding value in jmt");
320
321                    tx_prefix_item.blocking_send(Ok((full_key, v)))?;
322                }
323                anyhow::Ok(())
324            })
325        });
326
327        tokio_stream::wrappers::ReceiverStream::new(rx_prefix_query)
328    }
329
330    // NOTE: this implementation is almost the same as the above, but without
331    // fetching the values. not totally clear if this could be combined, or if that would
332    // be better overall.
333    fn prefix_keys(&self, prefix: &str) -> Self::PrefixKeysStream {
334        let span = Span::current();
335
336        let rocksdb_snapshot = self.0.snapshot.clone();
337        let db = self.0.db.clone();
338
339        let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix);
340
341        let version = self
342            .substore_version(&config)
343            .expect("the substore exists and has been initialized");
344
345        let substore = store::substore::SubstoreSnapshot {
346            config,
347            rocksdb_snapshot,
348            version,
349            db,
350        };
351
352        let mut options = rocksdb::ReadOptions::default();
353        options.set_iterate_range(rocksdb::PrefixRange(prefix_truncated.as_bytes()));
354        let mode = rocksdb::IteratorMode::Start;
355        let (tx_prefix_keys, rx_prefix_keys) = mpsc::channel(10);
356
357        tokio::task::spawn_blocking(move || {
358            span.in_scope(|| {
359                let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
360                let iter = substore
361                    .rocksdb_snapshot
362                    .iterator_cf_opt(cf_jmt_keys, options, mode);
363
364                let substore_prefix = &substore.config.prefix;
365
366                for key_and_keyhash in iter {
367                    let (raw_preimage, _) = key_and_keyhash?;
368                    let preimage = std::str::from_utf8(raw_preimage.as_ref())
369                        .expect("saved jmt keys are utf-8 strings");
370
371                    let full_key = if substore_prefix.is_empty() {
372                        preimage.to_string()
373                    } else {
374                        format!("{substore_prefix}/{preimage}").to_string()
375                    };
376
377                    tx_prefix_keys.blocking_send(Ok(full_key))?;
378                }
379                anyhow::Ok(())
380            })
381        });
382
383        tokio_stream::wrappers::ReceiverStream::new(rx_prefix_keys)
384    }
385
386    /// Returns a stream of all key-value pairs with the given prefix, from nonverifiable storage.
387    fn nonverifiable_prefix_raw(&self, prefix: &[u8]) -> Self::NonconsensusPrefixRawStream {
388        let span = Span::current();
389        let rocksdb_snapshot = self.0.snapshot.clone();
390        let db = self.0.db.clone();
391
392        let (truncated_prefix, config) = self.0.multistore_cache.config.match_prefix_bytes(prefix);
393        tracing::trace!(substore_key = ?truncated_prefix,  substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore");
394        let version = self
395            .substore_version(&config)
396            .expect("the substore exists and has been initialized");
397
398        let substore = store::substore::SubstoreSnapshot {
399            config,
400            rocksdb_snapshot,
401            version,
402            db,
403        };
404
405        let mut options = rocksdb::ReadOptions::default();
406        options.set_iterate_range(rocksdb::PrefixRange(truncated_prefix));
407        let mode = rocksdb::IteratorMode::Start;
408
409        let (tx_prefix_query, rx_prefix_query) = mpsc::channel(10);
410
411        tokio::task::spawn_blocking(move || {
412            span.in_scope(|| {
413                let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
414                let iter =
415                    substore
416                        .rocksdb_snapshot
417                        .iterator_cf_opt(cf_nonverifiable, options, mode);
418                let substore_prefix = substore.config.prefix.as_bytes().to_vec();
419                for i in iter {
420                    let (boxed_key, boxed_value) = i?;
421                    let key: Vec<u8> = boxed_key.into();
422                    let value: Vec<u8> = boxed_value.into();
423
424                    // Costly to do on every iteration, but this should be dwarfed by the
425                    // context switch to the tokio runtime.
426                    let mut full_key: Vec<u8> = vec![];
427                    if substore_prefix.is_empty() {
428                        full_key.extend(key);
429                    } else {
430                        full_key.extend(substore_prefix.clone());
431                        full_key.extend(iter::once(b'/'));
432                        full_key.extend(key);
433                    }
434
435                    tx_prefix_query.blocking_send(Ok((full_key, value)))?;
436                }
437                anyhow::Ok(())
438            })
439        });
440
441        tokio_stream::wrappers::ReceiverStream::new(rx_prefix_query)
442    }
443
444    /// Returns a stream of all key-value pairs with the given prefix, and range
445    /// from nonverifiable storage.
446    /// **Important**: Only supports range queries over the main store.
447    fn nonverifiable_range_raw(
448        &self,
449        prefix: Option<&[u8]>,
450        range: impl std::ops::RangeBounds<Vec<u8>>,
451    ) -> anyhow::Result<Self::NonconsensusRangeRawStream> {
452        let span = Span::current();
453        let rocksdb_snapshot = self.0.snapshot.clone();
454        let db = self.0.db.clone();
455
456        let (prefix, config) = self
457            .0
458            .multistore_cache
459            .config
460            .route_key_bytes(prefix.unwrap_or_default());
461
462        let version = self
463            .substore_version(&config)
464            .expect("the substore exists and has been initialized");
465
466        let substore = store::substore::SubstoreSnapshot {
467            config,
468            rocksdb_snapshot,
469            version,
470            db,
471        };
472
473        let (_range, (start, end)) = crate::utils::convert_bounds(range)?;
474        let mut options = rocksdb::ReadOptions::default();
475
476        let (start, end) = (start.unwrap_or_default(), end.unwrap_or_default());
477        let end_is_empty = end.is_empty();
478
479        let mut prefix_start = Vec::with_capacity(prefix.len() + start.len());
480        let mut prefix_end = Vec::with_capacity(prefix.len() + end.len());
481
482        prefix_start.extend(prefix);
483        prefix_start.extend(start);
484        prefix_end.extend(prefix);
485        prefix_end.extend(end);
486
487        tracing::debug!(
488            ?prefix_start,
489            ?prefix_end,
490            ?prefix,
491            "nonverifiable_range_raw"
492        );
493
494        options.set_iterate_lower_bound(prefix_start);
495
496        // Our range queries implementation relies on forward iteration, which
497        // means that if the upper key is unbounded and a prefix has been set
498        // we cannot set the upper bound to the prefix. This is because the
499        // prefix is used as a lower bound for the iterator, and the upper bound
500        // is used to stop the iteration.
501        // If we set the upper bound to the prefix, we would get a range consisting of:
502        // ```
503        // "compactblock/001" to "compactblock/"
504        // ```
505        // which would not return anything.
506        if !end_is_empty {
507            options.set_iterate_upper_bound(prefix_end);
508        }
509
510        let mode = rocksdb::IteratorMode::Start;
511        let prefix = prefix.to_vec();
512
513        let (tx, rx) = mpsc::channel::<Result<(Vec<u8>, Vec<u8>)>>(10);
514        tokio::task::spawn_blocking(move || {
515            span.in_scope(|| {
516                let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
517                let iter =
518                    substore
519                        .rocksdb_snapshot
520                        .iterator_cf_opt(cf_nonverifiable, options, mode);
521
522                for i in iter {
523                    let (key, value) = i?;
524
525                    // This is a bit of a hack, but RocksDB doesn't let us express the "prefixed range-queries",
526                    // that we want to support. In particular, we want to be able to do a prefix query that starts
527                    // at a particular key, and does not have an upper bound. Since we can't create an iterator that
528                    // cover this range, we have to filter out the keys that don't match the prefix.
529                    if !prefix.is_empty() && !key.starts_with(&prefix) {
530                        break;
531                    }
532                    tx.blocking_send(Ok((key.into(), value.into())))?;
533                }
534                Ok::<(), anyhow::Error>(())
535            })
536        });
537
538        Ok(tokio_stream::wrappers::ReceiverStream::new(rx))
539    }
540
541    fn object_get<T: Any + Send + Sync + Clone>(&self, _key: &str) -> Option<T> {
542        // No-op -- this will never be called internally, and `Snapshot` is not exposed in public API
543        None
544    }
545
546    fn object_type(&self, _key: &str) -> Option<std::any::TypeId> {
547        // No-op -- this will never be called internally, and `Snapshot` is not exposed in public API
548        None
549    }
550}
551
552impl std::fmt::Debug for Snapshot {
553    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
554        f.debug_struct("Snapshot")
555            .field("version", &self.0.version)
556            .finish_non_exhaustive()
557    }
558}