cnidarium/
storage.rs

1use std::{path::PathBuf, sync::Arc};
2
3use anyhow::{bail, ensure, Result};
4use parking_lot::RwLock;
5use rocksdb::{Options, DB};
6use std::collections::HashMap;
7use tokio::sync::watch;
8use tracing::Span;
9
10use crate::{
11    cache::Cache,
12    snapshot::Snapshot,
13    store::{
14        multistore::{self, MultistoreConfig},
15        substore::{SubstoreConfig, SubstoreSnapshot, SubstoreStorage},
16    },
17};
18use crate::{snapshot_cache::SnapshotCache, StagedWriteBatch, StateDelta};
19
20mod temp;
21pub use temp::TempStorage;
22
23/// A handle for a storage instance, backed by RocksDB.
24///
25/// The handle is cheaply clonable; all clones share the same backing data store.
26#[derive(Clone)]
27pub struct Storage(Arc<Inner>);
28
29impl std::fmt::Debug for Storage {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        f.debug_struct("Storage").finish_non_exhaustive()
32    }
33}
34
35// A private inner element to prevent the `TreeWriter` implementation
36// from leaking outside of this crate.
37struct Inner {
38    dispatcher_tx: watch::Sender<(Snapshot, (jmt::Version, Arc<Cache>))>,
39    snapshot_rx: watch::Receiver<Snapshot>,
40    changes_rx: watch::Receiver<(jmt::Version, Arc<Cache>)>,
41    snapshots: RwLock<SnapshotCache>,
42    multistore_config: MultistoreConfig,
43    /// A handle to the dispatcher task.
44    /// This is used by `Storage::release` to wait for the task to terminate.
45    jh_dispatcher: Option<tokio::task::JoinHandle<()>>,
46    db: Arc<DB>,
47}
48
49impl Storage {
50    /// Loads a storage instance from the given path, initializing it if necessary.
51    pub async fn load(path: PathBuf, default_prefixes: Vec<String>) -> Result<Self> {
52        let span = Span::current();
53        let db_path = path.clone();
54        // initializing main storage instance.
55        let prefixes = tokio::task::spawn_blocking(move || {
56            span.in_scope(|| {
57                let mut opts = Options::default();
58                opts.create_if_missing(true);
59                opts.create_missing_column_families(true);
60                tracing::info!(?path, "opening rocksdb config column");
61
62                // Hack(erwan): RocksDB requires us to specify all the column families
63                // that we want to use upfront. This is problematic when we are initializing
64                // a new database, because the call to `DBCommon<T>::list_cf` will fail
65                // if the database manifest is not found. To work around this, we ignore
66                // the error and assume that the database is empty.
67                // Tracked in: https://github.com/rust-rocksdb/rust-rocksdb/issues/608
68                let mut columns = DB::list_cf(&opts, path.clone()).unwrap_or_default();
69                if columns.is_empty() {
70                    columns.push("config".to_string());
71                }
72
73                let db = DB::open_cf(&opts, path, columns).expect("can open database");
74                let cf_config = db
75                    .cf_handle("config")
76                    .expect("config column family is created if missing");
77                let config_iter = db.iterator_cf(cf_config, rocksdb::IteratorMode::Start);
78                let mut prefixes = Vec::new();
79                tracing::info!("reading prefixes from config column family");
80                for i in config_iter {
81                    let (key, _) = i.expect("can read from iterator");
82                    prefixes.push(String::from_utf8(key.to_vec()).expect("prefix is utf8"));
83                }
84
85                for prefix in default_prefixes {
86                    if !prefixes.contains(&prefix) {
87                        db.put_cf(cf_config, prefix.as_bytes(), b"")
88                            .expect("can write to db");
89                        prefixes.push(prefix);
90                    }
91                }
92
93                std::mem::drop(db);
94                prefixes
95            })
96        })
97        .await?;
98
99        Storage::init(db_path, prefixes).await
100    }
101
102    /// Initializes a new storage instance at the given path. Takes a list of default prefixes
103    /// to initialize the storage configuration with.
104    /// Here is a high-level overview of the initialization process:
105    /// 1. Create a new RocksDB instance at the given path.
106    /// 2. Read the prefix list and create a [`SubstoreConfig`] for each prefix.
107    /// 3. Create a new [`MultistoreConfig`] from supplied prefixes.
108    /// 4. Initialize the substore cache with the latest version of each substore.
109    /// 5. Spawn a dispatcher task that forwards new snapshots to subscribers.
110    pub async fn init(path: PathBuf, prefixes: Vec<String>) -> Result<Self> {
111        let span = Span::current();
112
113        tokio::task
114            ::spawn_blocking(move || {
115                span.in_scope(|| {
116                    let mut substore_configs = Vec::new();
117                    tracing::info!("initializing global store config");
118                    let main_store = Arc::new(SubstoreConfig::new(""));
119                    for substore_prefix in prefixes {
120                        tracing::info!(prefix = ?substore_prefix, "creating substore config for prefix");
121                        if substore_prefix.is_empty() {
122                            bail!("the empty prefix is reserved")
123                        }
124                        substore_configs.push(Arc::new(SubstoreConfig::new(substore_prefix)));
125                    }
126
127                    let multistore_config = MultistoreConfig {
128                        main_store: main_store.clone(),
129                        substores: substore_configs.clone(),
130                    };
131
132                    let mut substore_columns: Vec<&String> = substore_configs
133                        .iter()
134                        .flat_map(|config| config.columns())
135                        .collect();
136                    let mut columns: Vec<&String> = main_store.columns().collect();
137                    columns.append(&mut substore_columns);
138
139                    tracing::info!(?path, "opening rocksdb");
140                    let cf_config_string = "config".to_string();
141                    // RocksDB setup: define options, collect all the columns, and open the database.
142                    // Each substore defines a prefix and its own set of columns.
143                    // See [`crate::store::SubstoreConfig`] for more details.
144                    let mut opts = Options::default();
145                    opts.create_if_missing(true);
146                    opts.create_missing_column_families(true);
147                    columns.push(&cf_config_string);
148
149                    let db = DB::open_cf(&opts, path, columns)?;
150                    let shared_db = Arc::new(db);
151
152                    // Initialize the substore cache with the latest version of each substore.
153                    // Note: for compatibility reasons with Tendermint/CometBFT, we set the "pre-genesis"
154                    // jmt version to be u64::MAX, corresponding to -1 mod 2^64.
155                    let jmt_version = main_store
156                        .latest_version_from_db(&shared_db)?
157                        .unwrap_or(u64::MAX);
158
159                    let mut multistore_cache =
160                        multistore::MultistoreCache::from_config(multistore_config.clone());
161
162                    for substore_config in substore_configs {
163                        let substore_version = substore_config
164                            .latest_version_from_db(&shared_db)?
165                            .unwrap_or(u64::MAX);
166
167                        multistore_cache.set_version(substore_config.clone(), substore_version);
168                        tracing::debug!(
169                            substore_prefix = ?substore_config.prefix,
170                            ?substore_version,
171                            "initializing substore"
172                        );
173                    }
174
175                    multistore_cache.set_version(main_store, jmt_version);
176                    tracing::debug!(?jmt_version, "initializing main store");
177
178                    let latest_snapshot =
179                        Snapshot::new(shared_db.clone(), jmt_version, multistore_cache);
180
181                    // A concurrent-safe ring buffer of the latest 10 snapshots.
182                    let snapshots = RwLock::new(SnapshotCache::new(latest_snapshot.clone(), 10));
183
184                    // Setup a dispatcher task that acts as an intermediary between the storage
185                    // and the rest of the system. Its purpose is to forward new snapshots to
186                    // subscribers.
187                    //
188                    // If we were to send snapshots directly to subscribers, a slow subscriber could
189                    // hold a lock on the watch channel for too long, and block the consensus-critical
190                    // commit logic, which needs to acquire a write lock on the watch channel.
191                    //
192                    // Instead, we "proxy" through a dispatcher task that copies values from one 
193                    // channel to the other, ensuring that if an API consumer misuses the watch 
194                    // channels, it will only affect other subscribers, not the commit logic.
195
196                    let (snapshot_tx, snapshot_rx) = watch::channel(latest_snapshot.clone());
197                    // Note: this will never be seen by consumers, since we mark the current value as seen
198                    // before returning the receiver.
199                    let dummy_cache = (u64::MAX, Arc::new(Cache::default()));
200                    let (changes_tx, changes_rx) = watch::channel(dummy_cache.clone());
201                    let (tx_dispatcher, mut rx_dispatcher) = watch::channel((latest_snapshot, dummy_cache));
202
203                    let jh_dispatcher = tokio::spawn(async move {
204                        tracing::info!("snapshot dispatcher task has started");
205                        // If the sender is dropped, the task will terminate.
206                        while rx_dispatcher.changed().await.is_ok() {
207                            tracing::debug!("dispatcher has received a new snapshot");
208                            let (snapshot, changes) = rx_dispatcher.borrow_and_update().clone();
209                            // [`watch::Sender<T>::send`] only returns an error if there are no
210                            // receivers, so we can safely ignore the result here.
211                            let _ = snapshot_tx.send(snapshot);
212                            let _ = changes_tx.send(changes);
213                        }
214                        tracing::info!("dispatcher task has terminated")
215                    });
216
217                    Ok(Self(Arc::new(Inner {
218                        // We don't need to wrap the task in a `CancelOnDrop<T>` because
219                        // the task will stop when the sender is dropped. However, certain
220                        // test scenarios require us to wait that all resources are released.
221                        jh_dispatcher: Some(jh_dispatcher),
222                        dispatcher_tx: tx_dispatcher,
223                        snapshot_rx,
224                        changes_rx,
225                        multistore_config,
226                        snapshots,
227                        db: shared_db,
228                    })))
229                })
230            })
231            .await?
232    }
233
234    /// Returns the latest version (block height) of the tree recorded by the
235    /// `Storage`.
236    ///
237    /// If the tree is empty and has not been initialized, returns `u64::MAX`.
238    pub fn latest_version(&self) -> jmt::Version {
239        self.latest_snapshot().version()
240    }
241
242    /// Returns a [`watch::Receiver`] that can be used to subscribe to new state versions.
243    pub fn subscribe(&self) -> watch::Receiver<Snapshot> {
244        let mut rx = self.0.snapshot_rx.clone();
245        // Mark the current value as seen, so that the user of the receiver
246        // will only be notified of *subsequent* values.
247        rx.borrow_and_update();
248        rx
249    }
250
251    /// Returns a [`watch::Receiver`] that can be used to subscribe to state changes.
252    pub fn subscribe_changes(&self) -> watch::Receiver<(jmt::Version, Arc<Cache>)> {
253        let mut rx = self.0.changes_rx.clone();
254        // Mark the current value as seen, so that the user of the receiver
255        // will only be notified of *subsequent* values.
256        rx.borrow_and_update();
257        rx
258    }
259
260    /// Returns a new [`Snapshot`] on top of the latest version of the tree.
261    pub fn latest_snapshot(&self) -> Snapshot {
262        self.0.snapshots.read().latest()
263    }
264
265    /// Fetches the [`Snapshot`] corresponding to the supplied `jmt::Version` from
266    /// the [`SnapshotCache`]. Returns `None` if no match was found.
267    pub fn snapshot(&self, version: jmt::Version) -> Option<Snapshot> {
268        self.0.snapshots.read().get(version)
269    }
270
271    /// Prepares a commit for the provided [`StateDelta`], returning a [`StagedWriteBatch`].
272    /// The batch can be committed to the database using the [`Storage::commit_batch`] method.
273    pub async fn prepare_commit(&self, delta: StateDelta<Snapshot>) -> Result<StagedWriteBatch> {
274        // Extract the snapshot and the changes from the state delta
275        let (snapshot, changes) = delta.flatten();
276        let prev_snapshot_version = snapshot.version();
277
278        // We use wrapping_add here so that we can write `new_version = 0` by
279        // overflowing `PRE_GENESIS_VERSION`.
280        let prev_storage_version = self.latest_version();
281        let next_storage_version = prev_storage_version.wrapping_add(1);
282        tracing::debug!(prev_storage_version, next_storage_version);
283
284        ensure!(
285            prev_storage_version == prev_snapshot_version,
286            "trying to prepare a commit for a delta forked from version {}, but the latest version is {}",
287            prev_snapshot_version,
288            prev_storage_version
289        );
290
291        self.prepare_commit_inner(snapshot, changes, next_storage_version, false)
292            .await
293    }
294
295    async fn prepare_commit_inner(
296        &self,
297        snapshot: Snapshot,
298        cache: Cache,
299        version: jmt::Version,
300        perform_migration: bool,
301    ) -> Result<StagedWriteBatch> {
302        tracing::debug!(new_jmt_version = ?version, "preparing to commit state delta");
303        // Save a copy of the changes to send to subscribers later.
304        let changes = Arc::new(cache.clone_changes());
305
306        let mut changes_by_substore = cache.shard_by_prefix(&self.0.multistore_config);
307        #[allow(clippy::disallowed_types)]
308        let mut substore_roots = HashMap::new();
309        let mut multistore_versions =
310            multistore::MultistoreCache::from_config(self.0.multistore_config.clone());
311
312        let db = self.0.db.clone();
313        let rocksdb_snapshot = snapshot.0.snapshot.clone();
314
315        let mut new_versions = vec![];
316
317        // We use a single write batch to commit all the substores at once. Each task will append
318        // its own changes to the batch, and we will commit it at the end.
319        let mut write_batch = rocksdb::WriteBatch::default();
320
321        //  Note(erwan): Here, we spawn a commit task for each substore.
322        //  The substore keyspaces are disjoint, so conceptually it is
323        //  fine to rewrite it using a [`tokio::task::JoinSet`].
324        //  The reason this isn't done is because `rocksdb::WriteBatch`
325        //  is _not_ thread-safe.
326        //
327        //  This means that to spin-up N tasks, we would need to use a
328        //  single batch wrapped in a mutex, or use N batches, and find
329        //  a way to commit to them atomically. This isn't supported by
330        //  RocksDB which leaves one option: to iterate over each entry
331        //  in each batch, and merge them together. At this point, this
332        //  is probably not worth it.
333        //
334        //  Another option is to trade atomicity for parallelism by producing
335        //  N batches, and committing them in distinct atomic writes. This is
336        //  potentially faster, but it is also more dangerous, because if one
337        //  of the writes fails, we are left with a partially committed state.
338        //
339        //  The current implementation leans on the fact that the number of
340        //  substores is small, and that the synchronization overhead of a joinset
341        //  would exceed its benefits. This works well for now.
342        for config in self.0.multistore_config.iter() {
343            tracing::debug!(substore_prefix = ?config.prefix, "processing substore");
344            // If the substore is empty, we need to fetch its initialized version from the cache.
345            let old_substore_version = config
346                .latest_version_from_snapshot(&db, &rocksdb_snapshot)?
347                .unwrap_or_else(|| {
348                    tracing::debug!("substore is empty, fetching initialized version from cache");
349                    snapshot
350                        .substore_version(config)
351                        .expect("prefix should be initialized")
352                });
353
354            let Some(changeset) = changes_by_substore.remove(config) else {
355                tracing::debug!(prefix = config.prefix, "no changes for substore, skipping");
356                multistore_versions.set_version(config.clone(), old_substore_version);
357                continue;
358            };
359
360            let new_version = if perform_migration {
361                old_substore_version
362            } else {
363                old_substore_version.wrapping_add(1)
364            };
365            new_versions.push(new_version);
366            let substore_snapshot = SubstoreSnapshot {
367                config: config.clone(),
368                rocksdb_snapshot: rocksdb_snapshot.clone(),
369                version: new_version,
370                db: db.clone(),
371            };
372
373            let substore_storage = SubstoreStorage { substore_snapshot };
374
375            // Commit the substore and collect its root hash
376            let (root_hash, substore_batch) = substore_storage
377                .commit(changeset, write_batch, new_version, perform_migration)
378                .await?;
379            write_batch = substore_batch;
380
381            tracing::debug!(
382                ?root_hash,
383                prefix = config.prefix,
384                ?version,
385                "added substore to write batch"
386            );
387            substore_roots.insert(config.clone(), (root_hash, new_version));
388
389            tracing::debug!(
390                ?root_hash,
391                prefix = ?config.prefix,
392                ?new_version,
393                "updating substore version"
394            );
395            multistore_versions.set_version(config.clone(), new_version);
396        }
397
398        // Add substore roots to the main store changeset
399        let main_store_config = self.0.multistore_config.main_store.clone();
400        let mut main_store_changes = changes_by_substore
401            .remove(&main_store_config)
402            .unwrap_or_else(|| {
403                tracing::debug!("no changes for main store, creating empty changeset");
404                Cache::default()
405            });
406
407        for (config, (root_hash, _)) in substore_roots.iter() {
408            main_store_changes
409                .unwritten_changes
410                .insert(config.prefix.to_string(), Some(root_hash.0.to_vec()));
411        }
412
413        // Commit the main store and collect the global root hash
414        let main_store_snapshot = SubstoreSnapshot {
415            config: main_store_config.clone(),
416            rocksdb_snapshot: snapshot.0.snapshot.clone(),
417            version,
418            db: self.0.db.clone(),
419        };
420
421        let main_store_storage = SubstoreStorage {
422            substore_snapshot: main_store_snapshot,
423        };
424
425        let (global_root_hash, write_batch) = main_store_storage
426            .commit(main_store_changes, write_batch, version, perform_migration)
427            .await?;
428        tracing::debug!(
429            ?global_root_hash,
430            ?version,
431            "added main store to write batch"
432        );
433
434        tracing::debug!(?global_root_hash, version = ?version, "updating main store version");
435        let main_store_config = self.0.multistore_config.main_store.clone();
436        multistore_versions.set_version(main_store_config, version);
437
438        Ok(StagedWriteBatch {
439            write_batch,
440            version,
441            multistore_versions,
442            root_hash: global_root_hash,
443            substore_roots,
444            perform_migration,
445            changes,
446        })
447    }
448
449    /// Commits the provided [`StateDelta`] to persistent storage as the latest
450    /// version of the chain state.
451    pub async fn commit(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
452        let batch = self.prepare_commit(delta).await?;
453        self.commit_batch(batch)
454    }
455
456    /// Commits the supplied [`StagedWriteBatch`] to persistent storage.
457    ///
458    /// # Migrations
459    /// In the case of chain state migrations we need to commit the new state
460    /// without incrementing the version. If `perform_migration` is `true` the
461    /// snapshot will _not_ be written to the snapshot cache, and no subscribers
462    /// will be notified. Substore versions will not be updated.
463    pub fn commit_batch(&self, batch: StagedWriteBatch) -> Result<crate::RootHash> {
464        let StagedWriteBatch {
465            write_batch,
466            version,
467            multistore_versions,
468            root_hash: global_root_hash,
469            substore_roots,
470            perform_migration,
471            changes,
472        } = batch;
473
474        let db = self.0.db.clone();
475
476        // check that the version of the batch being committed is the correct next version
477        let old_version = self.latest_version();
478        let expected_new_version = if perform_migration {
479            old_version
480        } else {
481            old_version.wrapping_add(1)
482        };
483
484        ensure!(
485            expected_new_version == version,
486            "new version mismatch: expected {} but got {}",
487            expected_new_version,
488            version
489        );
490
491        // also check that each of the substore versions are the correct next version
492        let snapshot = self.latest_snapshot();
493
494        // Warning: we MUST check version coherence for **every** substore.
495        // These checks are a second line of defense. They must consider
496        // the case when two deltas effect distinct substores.
497        //
498        // version: (m,   ss_1, ss_2)
499        // D_0:     (_,      1,    0) <- initial state
500        // D_1:     (A,      1,    1) <- multiwrite to ss_1 AND ss_2
501        // D_1*:    (A,      1,    0) <- isolate write to ss_1
502        //
503        // A comprehensive check lets us catch the stale write D_1* even if
504        // locally it does not directly effect the second substore at all.
505        // And even if the main version check passes (spuriously, or because of
506        // a migration).
507        for (substore_config, new_version) in &multistore_versions.substores {
508            if substore_config.prefix.is_empty() {
509                // this is the main store, ignore
510                continue;
511            }
512
513            let old_substore_version = snapshot
514                .substore_version(substore_config)
515                .expect("substores must be initialized at startup");
516
517            // if the substore exists in `substore_roots`, there have been updates to the substore.
518            // if `perform_migration` is false and there are updates, the next version should be previous + 1.
519            // otherwise, the version should remain the same.
520            let expected_substore_version =
521                if substore_roots.get(substore_config).is_some() && !perform_migration {
522                    old_substore_version.wrapping_add(1)
523                } else {
524                    old_substore_version
525                };
526
527            ensure!(
528                expected_substore_version == *new_version,
529                "substore new version mismatch for substore with prefix {}: expected {} but got {}",
530                substore_config.prefix,
531                expected_substore_version,
532                new_version
533            );
534        }
535
536        tracing::debug!(new_jmt_version = ?batch.version, "committing batch to db");
537
538        db.write(write_batch).expect("can write to db");
539        tracing::debug!(
540            ?global_root_hash,
541            ?version,
542            "committed main store and substores to db"
543        );
544
545        // If we're not performing a migration, we should update the snapshot cache
546        if !perform_migration {
547            tracing::debug!("updating snapshot cache");
548
549            let latest_snapshot = Snapshot::new(db.clone(), version, multistore_versions);
550            // Obtain a write lock to the snapshot cache, and push the latest snapshot
551            // available. The lock guard is implicitly dropped immediately.
552            self.0
553                .snapshots
554                .write()
555                .try_push(latest_snapshot.clone())
556                .expect("should process snapshots with consecutive jmt versions");
557
558            tracing::debug!(?version, "dispatching snapshot");
559
560            // Send fails if the channel is closed (i.e., if there are no receivers);
561            // in this case, we should ignore the error, we have no one to notify.
562            let _ = self
563                .0
564                .dispatcher_tx
565                .send((latest_snapshot, (version, changes)));
566        } else {
567            tracing::debug!("skipping snapshot cache update");
568        }
569
570        Ok(global_root_hash)
571    }
572
573    #[cfg(feature = "migration")]
574    /// Commit the provided [`StateDelta`] to persistent storage without increasing the version
575    /// of the chain state, and skips the snapshot cache update.
576    pub async fn commit_in_place(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
577        let (snapshot, changes) = delta.flatten();
578        let old_version = self.latest_version();
579        let batch = self
580            .prepare_commit_inner(snapshot, changes, old_version, true)
581            .await?;
582        self.commit_batch(batch)
583    }
584
585    /// Returns the internal handle to RocksDB, this is useful to test adjacent storage crates.
586    #[cfg(test)]
587    pub(crate) fn db(&self) -> Arc<DB> {
588        self.0.db.clone()
589    }
590
591    /// Shuts down the database and the dispatcher task, and waits for all resources to be reclaimed.
592    /// Panics if there are still outstanding references to the `Inner` storage.
593    pub async fn release(mut self) {
594        if let Some(inner) = Arc::get_mut(&mut self.0) {
595            inner.shutdown().await;
596            inner.snapshots.write().clear();
597            // `Inner` is dropped once the call completes.
598        } else {
599            panic!("Unable to get mutable reference to Inner");
600        }
601    }
602}
603
604impl Inner {
605    pub(crate) async fn shutdown(&mut self) {
606        if let Some(jh) = self.jh_dispatcher.take() {
607            jh.abort();
608            let _ = jh.await;
609        }
610    }
611}