cnidarium/storage.rs
use std::{path::PathBuf, sync::Arc};

use anyhow::{bail, ensure, Result};
use parking_lot::RwLock;
use rocksdb::{Options, DB};
use std::collections::HashMap;
use tokio::sync::watch;
use tracing::Span;

use crate::{
    cache::Cache,
    snapshot::Snapshot,
    store::{
        multistore::{self, MultistoreConfig},
        substore::{SubstoreConfig, SubstoreSnapshot, SubstoreStorage},
    },
};
use crate::{snapshot_cache::SnapshotCache, StagedWriteBatch, StateDelta};

mod temp;
pub use temp::TempStorage;

/// A handle for a storage instance, backed by RocksDB.
///
/// The handle is cheaply clonable; all clones share the same backing data store.
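///
/// For example (a minimal sketch; `storage` is assumed to be an existing handle):
///
/// ```ignore
/// let storage2 = storage.clone();
/// tokio::spawn(async move {
///     // Both handles read and write the same backing RocksDB instance.
///     let _snapshot = storage2.latest_snapshot();
/// });
/// ```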
#[derive(Clone)]
pub struct Storage(Arc<Inner>);

impl std::fmt::Debug for Storage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Storage").finish_non_exhaustive()
    }
}

// A private inner type to prevent the `TreeWriter` implementation
// from leaking outside of this crate.
struct Inner {
    dispatcher_tx: watch::Sender<(Snapshot, (jmt::Version, Arc<Cache>))>,
    snapshot_rx: watch::Receiver<Snapshot>,
    changes_rx: watch::Receiver<(jmt::Version, Arc<Cache>)>,
    snapshots: RwLock<SnapshotCache>,
    multistore_config: MultistoreConfig,
    /// A handle to the dispatcher task.
    /// This is used by `Storage::release` to wait for the task to terminate.
    jh_dispatcher: Option<tokio::task::JoinHandle<()>>,
    db: Arc<DB>,
}

impl Storage {
    /// Loads a storage instance from the given path, initializing it if necessary.
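    ///
    /// A minimal sketch; the path and prefix below are placeholders:
    ///
    /// ```ignore
    /// let storage = Storage::load(
    ///     "/tmp/cnidarium-example".into(),
    ///     vec!["example_prefix".to_string()],
    /// )
    /// .await?;
    ///
    /// // A freshly initialized store sits at the pre-genesis version.
    /// assert_eq!(storage.latest_version(), u64::MAX);
    /// ```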
    pub async fn load(path: PathBuf, default_prefixes: Vec<String>) -> Result<Self> {
        let span = Span::current();
        let db_path = path.clone();
        // Initialize the main storage instance.
        let prefixes = tokio::task::spawn_blocking(move || {
            span.in_scope(|| {
                let mut opts = Options::default();
                opts.create_if_missing(true);
                opts.create_missing_column_families(true);
                tracing::info!(?path, "opening rocksdb config column");

                // Hack(erwan): RocksDB requires us to specify all the column families
                // that we want to use upfront. This is problematic when we are initializing
                // a new database, because the call to `DBCommon<T>::list_cf` will fail
                // if the database manifest is not found. To work around this, we ignore
                // the error and assume that the database is empty.
                // Tracked in: https://github.com/rust-rocksdb/rust-rocksdb/issues/608
                let mut columns = DB::list_cf(&opts, path.clone()).unwrap_or_default();
                if columns.is_empty() {
                    columns.push("config".to_string());
                }

                let db = DB::open_cf(&opts, path, columns).expect("can open database");
                let cf_config = db
                    .cf_handle("config")
                    .expect("config column family is created if missing");
                let config_iter = db.iterator_cf(cf_config, rocksdb::IteratorMode::Start);
                let mut prefixes = Vec::new();
                tracing::info!("reading prefixes from config column family");
                for i in config_iter {
                    let (key, _) = i.expect("can read from iterator");
                    prefixes.push(String::from_utf8(key.to_vec()).expect("prefix is utf8"));
                }

                for prefix in default_prefixes {
                    if !prefixes.contains(&prefix) {
                        db.put_cf(cf_config, prefix.as_bytes(), b"")
                            .expect("can write to db");
                        prefixes.push(prefix);
                    }
                }

                std::mem::drop(db);
                prefixes
            })
        })
        .await?;

        Storage::init(db_path, prefixes).await
    }

    /// Initializes a new storage instance at the given path. Takes a list of default prefixes
    /// to initialize the storage configuration with.
    /// Here is a high-level overview of the initialization process:
    /// 1. Create a new RocksDB instance at the given path.
    /// 2. Read the prefix list and create a [`SubstoreConfig`] for each prefix.
    /// 3. Create a new [`MultistoreConfig`] from the supplied prefixes.
    /// 4. Initialize the substore cache with the latest version of each substore.
    /// 5. Spawn a dispatcher task that forwards new snapshots to subscribers.
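    ///
    /// A minimal usage sketch (the path and prefix are placeholders); unlike
    /// [`Storage::load`], which first recovers the prefix list from the `config`
    /// column family, `init` expects the full prefix list to be supplied directly:
    ///
    /// ```ignore
    /// let storage = Storage::init(
    ///     "/tmp/cnidarium-example".into(),
    ///     vec!["example_prefix".to_string()],
    /// )
    /// .await?;
    /// ```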
    pub async fn init(path: PathBuf, prefixes: Vec<String>) -> Result<Self> {
        let span = Span::current();

        tokio::task::spawn_blocking(move || {
            span.in_scope(|| {
                let mut substore_configs = Vec::new();
                tracing::info!("initializing global store config");
                let main_store = Arc::new(SubstoreConfig::new(""));
                for substore_prefix in prefixes {
                    tracing::info!(prefix = ?substore_prefix, "creating substore config for prefix");
                    if substore_prefix.is_empty() {
                        bail!("the empty prefix is reserved")
                    }
                    substore_configs.push(Arc::new(SubstoreConfig::new(substore_prefix)));
                }

                let multistore_config = MultistoreConfig {
                    main_store: main_store.clone(),
                    substores: substore_configs.clone(),
                };

                let mut substore_columns: Vec<&String> = substore_configs
                    .iter()
                    .flat_map(|config| config.columns())
                    .collect();
                let mut columns: Vec<&String> = main_store.columns().collect();
                columns.append(&mut substore_columns);

                tracing::info!(?path, "opening rocksdb");
                let cf_config_string = "config".to_string();
                // RocksDB setup: define options, collect all the columns, and open the database.
                // Each substore defines a prefix and its own set of columns.
                // See [`crate::store::SubstoreConfig`] for more details.
                let mut opts = Options::default();
                opts.create_if_missing(true);
                opts.create_missing_column_families(true);
                columns.push(&cf_config_string);

                let db = DB::open_cf(&opts, path, columns)?;
                let shared_db = Arc::new(db);

                // Initialize the substore cache with the latest version of each substore.
                // Note: for compatibility reasons with Tendermint/CometBFT, we set the "pre-genesis"
                // jmt version to be u64::MAX, corresponding to -1 mod 2^64.
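                // The first regular commit then wraps around to version 0:
                // `u64::MAX.wrapping_add(1) == 0`.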
                let jmt_version = main_store
                    .latest_version_from_db(&shared_db)?
                    .unwrap_or(u64::MAX);

                let mut multistore_cache =
                    multistore::MultistoreCache::from_config(multistore_config.clone());

                for substore_config in substore_configs {
                    let substore_version = substore_config
                        .latest_version_from_db(&shared_db)?
                        .unwrap_or(u64::MAX);

                    multistore_cache.set_version(substore_config.clone(), substore_version);
                    tracing::debug!(
                        substore_prefix = ?substore_config.prefix,
                        ?substore_version,
                        "initializing substore"
                    );
                }

                multistore_cache.set_version(main_store, jmt_version);
                tracing::debug!(?jmt_version, "initializing main store");

                let latest_snapshot =
                    Snapshot::new(shared_db.clone(), jmt_version, multistore_cache);

                // A concurrent-safe ring buffer of the latest 10 snapshots.
                let snapshots = RwLock::new(SnapshotCache::new(latest_snapshot.clone(), 10));

                // Setup a dispatcher task that acts as an intermediary between the storage
                // and the rest of the system. Its purpose is to forward new snapshots to
                // subscribers.
                //
                // If we were to send snapshots directly to subscribers, a slow subscriber could
                // hold a lock on the watch channel for too long, and block the consensus-critical
                // commit logic, which needs to acquire a write lock on the watch channel.
                //
                // Instead, we "proxy" through a dispatcher task that copies values from one
                // channel to the other, ensuring that if an API consumer misuses the watch
                // channels, it will only affect other subscribers, not the commit logic.

                let (snapshot_tx, snapshot_rx) = watch::channel(latest_snapshot.clone());
                // Note: this will never be seen by consumers, since we mark the current value as seen
                // before returning the receiver.
                let dummy_cache = (u64::MAX, Arc::new(Cache::default()));
                let (changes_tx, changes_rx) = watch::channel(dummy_cache.clone());
                let (tx_dispatcher, mut rx_dispatcher) =
                    watch::channel((latest_snapshot, dummy_cache));

                let jh_dispatcher = tokio::spawn(async move {
                    tracing::info!("snapshot dispatcher task has started");
                    // If the sender is dropped, the task will terminate.
                    while rx_dispatcher.changed().await.is_ok() {
                        tracing::debug!("dispatcher has received a new snapshot");
                        let (snapshot, changes) = rx_dispatcher.borrow_and_update().clone();
                        // [`watch::Sender<T>::send`] only returns an error if there are no
                        // receivers, so we can safely ignore the result here.
                        let _ = snapshot_tx.send(snapshot);
                        let _ = changes_tx.send(changes);
                    }
                    tracing::info!("dispatcher task has terminated")
                });

                Ok(Self(Arc::new(Inner {
                    // We don't need to wrap the task in a `CancelOnDrop<T>` because
                    // the task will stop when the sender is dropped. However, certain
                    // test scenarios require us to wait until all resources are released.
                    jh_dispatcher: Some(jh_dispatcher),
                    dispatcher_tx: tx_dispatcher,
                    snapshot_rx,
                    changes_rx,
                    multistore_config,
                    snapshots,
                    db: shared_db,
                })))
            })
        })
        .await?
    }

    /// Returns the latest version (block height) of the tree recorded by the
    /// `Storage`.
    ///
    /// If the tree is empty and has not been initialized, returns `u64::MAX`.
    pub fn latest_version(&self) -> jmt::Version {
        self.latest_snapshot().version()
    }

    /// Returns a [`watch::Receiver`] that can be used to subscribe to new state versions.
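    ///
    /// A minimal subscriber sketch (illustrative only; error handling is elided):
    ///
    /// ```ignore
    /// let mut rx = storage.subscribe();
    /// tokio::spawn(async move {
    ///     while rx.changed().await.is_ok() {
    ///         let version = rx.borrow().version();
    ///         tracing::info!(?version, "observed new snapshot");
    ///     }
    /// });
    /// ```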
    pub fn subscribe(&self) -> watch::Receiver<Snapshot> {
        let mut rx = self.0.snapshot_rx.clone();
        // Mark the current value as seen, so that the user of the receiver
        // will only be notified of *subsequent* values.
        rx.borrow_and_update();
        rx
    }

    /// Returns a [`watch::Receiver`] that can be used to subscribe to state changes.
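    ///
    /// A minimal sketch mirroring [`Storage::subscribe`], but observing change sets:
    ///
    /// ```ignore
    /// let mut rx = storage.subscribe_changes();
    /// tokio::spawn(async move {
    ///     while rx.changed().await.is_ok() {
    ///         let (version, _changes) = rx.borrow_and_update().clone();
    ///         tracing::debug!(?version, "observed new change set");
    ///     }
    /// });
    /// ```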
    pub fn subscribe_changes(&self) -> watch::Receiver<(jmt::Version, Arc<Cache>)> {
        let mut rx = self.0.changes_rx.clone();
        // Mark the current value as seen, so that the user of the receiver
        // will only be notified of *subsequent* values.
        rx.borrow_and_update();
        rx
    }

    /// Returns a new [`Snapshot`] on top of the latest version of the tree.
    pub fn latest_snapshot(&self) -> Snapshot {
        self.0.snapshots.read().latest()
    }

    /// Fetches the [`Snapshot`] corresponding to the supplied `jmt::Version` from
    /// the [`SnapshotCache`]. Returns `None` if no match was found.
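    ///
    /// For example (illustrative; `42` is an arbitrary version number):
    ///
    /// ```ignore
    /// if let Some(snapshot) = storage.snapshot(42) {
    ///     assert_eq!(snapshot.version(), 42);
    /// }
    /// ```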
    pub fn snapshot(&self, version: jmt::Version) -> Option<Snapshot> {
        self.0.snapshots.read().get(version)
    }

    /// Prepares a commit for the provided [`StateDelta`], returning a [`StagedWriteBatch`].
    /// The batch can be committed to the database using the [`Storage::commit_batch`] method.
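    ///
    /// A minimal two-phase sketch, assuming the delta forks the latest snapshot:
    ///
    /// ```ignore
    /// let delta = StateDelta::new(storage.latest_snapshot());
    /// // ... stage writes on `delta` ...
    /// let batch = storage.prepare_commit(delta).await?;
    /// let root_hash = storage.commit_batch(batch)?;
    /// ```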
    pub async fn prepare_commit(&self, delta: StateDelta<Snapshot>) -> Result<StagedWriteBatch> {
        // Extract the snapshot and the changes from the state delta
        let (snapshot, changes) = delta.flatten();
        let prev_snapshot_version = snapshot.version();

        // We use wrapping_add here so that we can write `new_version = 0` by
        // overflowing `PRE_GENESIS_VERSION`.
        let prev_storage_version = self.latest_version();
        let next_storage_version = prev_storage_version.wrapping_add(1);
        tracing::debug!(prev_storage_version, next_storage_version);

        ensure!(
            prev_storage_version == prev_snapshot_version,
            "trying to prepare a commit for a delta forked from version {}, but the latest version is {}",
            prev_snapshot_version,
            prev_storage_version
        );

        self.prepare_commit_inner(snapshot, changes, next_storage_version, false)
            .await
    }

    async fn prepare_commit_inner(
        &self,
        snapshot: Snapshot,
        cache: Cache,
        version: jmt::Version,
        perform_migration: bool,
    ) -> Result<StagedWriteBatch> {
        tracing::debug!(new_jmt_version = ?version, "preparing to commit state delta");
        // Save a copy of the changes to send to subscribers later.
        let changes = Arc::new(cache.clone_changes());

        let mut changes_by_substore = cache.shard_by_prefix(&self.0.multistore_config);
        #[allow(clippy::disallowed_types)]
        let mut substore_roots = HashMap::new();
        let mut multistore_versions =
            multistore::MultistoreCache::from_config(self.0.multistore_config.clone());

        let db = self.0.db.clone();
        let rocksdb_snapshot = snapshot.0.snapshot.clone();

        let mut new_versions = vec![];

        // We use a single write batch to commit all the substores at once. Each task will append
        // its own changes to the batch, and we will commit it at the end.
        let mut write_batch = rocksdb::WriteBatch::default();

        // Note(erwan): Here, we spawn a commit task for each substore.
        // The substore keyspaces are disjoint, so conceptually it is
        // fine to rewrite it using a [`tokio::task::JoinSet`].
        // The reason this isn't done is because `rocksdb::WriteBatch`
        // is _not_ thread-safe.
        //
        // This means that to spin up N tasks, we would need to use a
        // single batch wrapped in a mutex, or use N batches, and find
        // a way to commit to them atomically. This isn't supported by
        // RocksDB, which leaves one option: to iterate over each entry
        // in each batch, and merge them together. At this point, this
        // is probably not worth it.
        //
        // Another option is to trade atomicity for parallelism by producing
        // N batches, and committing them in distinct atomic writes. This is
        // potentially faster, but it is also more dangerous, because if one
        // of the writes fails, we are left with a partially committed state.
        //
        // The current implementation leans on the fact that the number of
        // substores is small, and that the synchronization overhead of a joinset
        // would exceed its benefits. This works well for now.
        for config in self.0.multistore_config.iter() {
            tracing::debug!(substore_prefix = ?config.prefix, "processing substore");
            // If the substore is empty, we need to fetch its initialized version from the cache.
            let old_substore_version = config
                .latest_version_from_snapshot(&db, &rocksdb_snapshot)?
                .unwrap_or_else(|| {
                    tracing::debug!("substore is empty, fetching initialized version from cache");
                    snapshot
                        .substore_version(config)
                        .expect("prefix should be initialized")
                });

            let Some(changeset) = changes_by_substore.remove(config) else {
                tracing::debug!(prefix = config.prefix, "no changes for substore, skipping");
                multistore_versions.set_version(config.clone(), old_substore_version);
                continue;
            };

            let new_version = if perform_migration {
                old_substore_version
            } else {
                old_substore_version.wrapping_add(1)
            };
            new_versions.push(new_version);
            let substore_snapshot = SubstoreSnapshot {
                config: config.clone(),
                rocksdb_snapshot: rocksdb_snapshot.clone(),
                version: new_version,
                db: db.clone(),
            };

            let substore_storage = SubstoreStorage { substore_snapshot };

            // Commit the substore and collect its root hash
            let (root_hash, substore_batch) = substore_storage
                .commit(changeset, write_batch, new_version, perform_migration)
                .await?;
            write_batch = substore_batch;

            tracing::debug!(
                ?root_hash,
                prefix = config.prefix,
                ?version,
                "added substore to write batch"
            );
            substore_roots.insert(config.clone(), (root_hash, new_version));

            tracing::debug!(
                ?root_hash,
                prefix = ?config.prefix,
                ?new_version,
                "updating substore version"
            );
            multistore_versions.set_version(config.clone(), new_version);
        }

        // Add substore roots to the main store changeset
        let main_store_config = self.0.multistore_config.main_store.clone();
        let mut main_store_changes = changes_by_substore
            .remove(&main_store_config)
            .unwrap_or_else(|| {
                tracing::debug!("no changes for main store, creating empty changeset");
                Cache::default()
            });

        for (config, (root_hash, _)) in substore_roots.iter() {
            main_store_changes
                .unwritten_changes
                .insert(config.prefix.to_string(), Some(root_hash.0.to_vec()));
        }

        // Commit the main store and collect the global root hash
        let main_store_snapshot = SubstoreSnapshot {
            config: main_store_config.clone(),
            rocksdb_snapshot: snapshot.0.snapshot.clone(),
            version,
            db: self.0.db.clone(),
        };

        let main_store_storage = SubstoreStorage {
            substore_snapshot: main_store_snapshot,
        };

        let (global_root_hash, write_batch) = main_store_storage
            .commit(main_store_changes, write_batch, version, perform_migration)
            .await?;
        tracing::debug!(
            ?global_root_hash,
            ?version,
            "added main store to write batch"
        );

        tracing::debug!(?global_root_hash, version = ?version, "updating main store version");
        let main_store_config = self.0.multistore_config.main_store.clone();
        multistore_versions.set_version(main_store_config, version);

        Ok(StagedWriteBatch {
            write_batch,
            version,
            multistore_versions,
            root_hash: global_root_hash,
            substore_roots,
            perform_migration,
            changes,
        })
    }

    /// Commits the provided [`StateDelta`] to persistent storage as the latest
    /// version of the chain state.
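    ///
    /// A one-shot sketch, assuming the `StateWrite` trait is in scope for `put_raw`
    /// and that the delta forks the latest snapshot:
    ///
    /// ```ignore
    /// let mut delta = StateDelta::new(storage.latest_snapshot());
    /// delta.put_raw("demo/key".to_string(), b"demo value".to_vec());
    /// let root_hash = storage.commit(delta).await?;
    /// ```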
    pub async fn commit(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
        let batch = self.prepare_commit(delta).await?;
        self.commit_batch(batch)
    }

    /// Commits the supplied [`StagedWriteBatch`] to persistent storage.
    ///
    /// # Migrations
    /// In the case of chain state migrations, we need to commit the new state
    /// without incrementing the version. If `perform_migration` is `true`, the
    /// snapshot will _not_ be written to the snapshot cache, and no subscribers
    /// will be notified. Substore versions will not be updated.
    pub fn commit_batch(&self, batch: StagedWriteBatch) -> Result<crate::RootHash> {
        let StagedWriteBatch {
            write_batch,
            version,
            multistore_versions,
            root_hash: global_root_hash,
            substore_roots,
            perform_migration,
            changes,
        } = batch;

        let db = self.0.db.clone();

        // check that the version of the batch being committed is the correct next version
        let old_version = self.latest_version();
        let expected_new_version = if perform_migration {
            old_version
        } else {
            old_version.wrapping_add(1)
        };

        ensure!(
            expected_new_version == version,
            "new version mismatch: expected {} but got {}",
            expected_new_version,
            version
        );

        // also check that each of the substore versions are the correct next version
        let snapshot = self.latest_snapshot();

        // Warning: we MUST check version coherence for **every** substore.
        // These checks are a second line of defense. They must consider
        // the case when two deltas affect distinct substores.
        //
        // version: (m, ss_1, ss_2)
        // D_0: (_, 1, 0) <- initial state
        // D_1: (A, 1, 1) <- multiwrite to ss_1 AND ss_2
        // D_1*: (A, 1, 0) <- isolate write to ss_1
        //
        // A comprehensive check lets us catch the stale write D_1* even if
        // locally it does not directly affect the second substore at all,
        // and even if the main version check passes (spuriously, or because of
        // a migration).
        for (substore_config, new_version) in &multistore_versions.substores {
            if substore_config.prefix.is_empty() {
                // this is the main store, ignore
                continue;
            }

            let old_substore_version = snapshot
                .substore_version(substore_config)
                .expect("substores must be initialized at startup");

            // if the substore exists in `substore_roots`, there have been updates to the substore.
            // if `perform_migration` is false and there are updates, the next version should be previous + 1.
            // otherwise, the version should remain the same.
            let expected_substore_version =
                if substore_roots.get(substore_config).is_some() && !perform_migration {
                    old_substore_version.wrapping_add(1)
                } else {
                    old_substore_version
                };

            ensure!(
                expected_substore_version == *new_version,
                "substore new version mismatch for substore with prefix {}: expected {} but got {}",
                substore_config.prefix,
                expected_substore_version,
                new_version
            );
        }

        tracing::debug!(new_jmt_version = ?batch.version, "committing batch to db");

        db.write(write_batch).expect("can write to db");
        tracing::debug!(
            ?global_root_hash,
            ?version,
            "committed main store and substores to db"
        );

        // If we're not performing a migration, we should update the snapshot cache
        if !perform_migration {
            tracing::debug!("updating snapshot cache");

            let latest_snapshot = Snapshot::new(db.clone(), version, multistore_versions);
            // Obtain a write lock to the snapshot cache, and push the latest snapshot
            // available. The lock guard is implicitly dropped immediately.
            self.0
                .snapshots
                .write()
                .try_push(latest_snapshot.clone())
                .expect("should process snapshots with consecutive jmt versions");

            tracing::debug!(?version, "dispatching snapshot");

            // Send fails if the channel is closed (i.e., if there are no receivers);
            // in this case, we should ignore the error, since we have no one to notify.
            let _ = self
                .0
                .dispatcher_tx
                .send((latest_snapshot, (version, changes)));
        } else {
            tracing::debug!("skipping snapshot cache update");
        }

        Ok(global_root_hash)
    }

    #[cfg(feature = "migration")]
    /// Commits the provided [`StateDelta`] to persistent storage without increasing the version
    /// of the chain state, and skips the snapshot cache update.
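    ///
    /// A minimal sketch (requires the `migration` feature; the commit lands at the
    /// current version rather than the next one):
    ///
    /// ```ignore
    /// let delta = StateDelta::new(storage.latest_snapshot());
    /// // ... rewrite existing keys in place ...
    /// let root_hash = storage.commit_in_place(delta).await?;
    /// ```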
    pub async fn commit_in_place(&self, delta: StateDelta<Snapshot>) -> Result<crate::RootHash> {
        let (snapshot, changes) = delta.flatten();
        let old_version = self.latest_version();
        let batch = self
            .prepare_commit_inner(snapshot, changes, old_version, true)
            .await?;
        self.commit_batch(batch)
    }

    /// Returns the internal handle to RocksDB; this is useful for testing adjacent storage crates.
    #[cfg(test)]
    pub(crate) fn db(&self) -> Arc<DB> {
        self.0.db.clone()
    }

    /// Shuts down the database and the dispatcher task, and waits for all resources to be reclaimed.
    /// Panics if there are still outstanding references to the `Inner` storage.
    pub async fn release(mut self) {
        if let Some(inner) = Arc::get_mut(&mut self.0) {
            inner.shutdown().await;
            inner.snapshots.write().clear();
            // `Inner` is dropped once the call completes.
        } else {
            panic!("Unable to get mutable reference to Inner");
        }
    }
}

impl Inner {
    pub(crate) async fn shutdown(&mut self) {
        if let Some(jh) = self.jh_dispatcher.take() {
            jh.abort();
            let _ = jh.await;
        }
    }
}