cnidarium/store/substore.rs

use std::{
    fmt::{Display, Formatter},
    sync::Arc,
};

use anyhow::Result;
use borsh::BorshDeserialize;
use jmt::{
    storage::{HasPreimage, LeafNode, Node, NodeKey, TreeReader, TreeWriter},
    KeyHash, RootHash,
};
use rocksdb::{ColumnFamily, IteratorMode, ReadOptions};
use tracing::Span;

use crate::{snapshot::RocksDbSnapshot, Cache};

/// Specifies the configuration of a substore, which is a prefixed subset of
/// the main store with its own merkle tree, nonverifiable data, preimage index, etc.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct SubstoreConfig {
    /// The prefix of the substore. If empty, it is the root-level store config.
    pub prefix: String,
    /// The prefix of the substore including the trailing slash.
    pub prefix_with_delimiter: String,
    /// name: "substore-{prefix}-jmt"
    /// role: persists the logical structure of the JMT
    /// maps: `storage::DbNodeKey` to `jmt::Node`
    // note: `DbNodeKey` is a newtype around `NodeKey` that serializes the key
    // so that it maps to a lexicographical ordering with ascending `jmt::Version`.
    cf_jmt: String,
    /// name: "substore-{prefix}-jmt-keys"
    /// role: JMT key index.
    /// maps: key preimages to their keyhash.
    cf_jmt_keys: String,
    /// name: "substore-{prefix}-jmt-values"
    /// role: stores the actual values that JMT leaves point to.
    /// maps: KeyHash || BE(version) to an `Option<Vec<u8>>`
    cf_jmt_values: String,
    /// name: "substore-{prefix}-jmt-keys-by-keyhash"
    /// role: indexes JMT keys by their keyhash.
    /// maps: keyhashes to their preimage.
    cf_jmt_keys_by_keyhash: String,
    /// name: "substore-{prefix}-nonverifiable"
    /// role: auxiliary data that is not part of our merkle tree, and thus not strictly
    /// part of consensus.
    /// maps: arbitrary keys to arbitrary values.
    cf_nonverifiable: String,
}

impl SubstoreConfig {
    pub fn new(prefix: impl ToString) -> Self {
        let prefix = prefix.to_string();
        Self {
            cf_jmt: format!("substore-{}-jmt", prefix),
            cf_jmt_keys: format!("substore-{}-jmt-keys", prefix),
            cf_jmt_values: format!("substore-{}-jmt-values", prefix),
            cf_jmt_keys_by_keyhash: format!("substore-{}-jmt-keys-by-keyhash", prefix),
            cf_nonverifiable: format!("substore-{}-nonverifiable", prefix),
            prefix_with_delimiter: format!("{}/", prefix),
            prefix,
        }
    }
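
    // For illustration, with a hypothetical prefix "ibc", `SubstoreConfig::new("ibc")`
    // derives the following names from the format strings above:
    //
    //   cf_jmt                 = "substore-ibc-jmt"
    //   cf_jmt_keys            = "substore-ibc-jmt-keys"
    //   cf_jmt_values          = "substore-ibc-jmt-values"
    //   cf_jmt_keys_by_keyhash = "substore-ibc-jmt-keys-by-keyhash"
    //   cf_nonverifiable       = "substore-ibc-nonverifiable"
    //   prefix_with_delimiter  = "ibc/"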

    /// Returns an iterator over all column families in this substore.
    /// Note(erwan): This is verbose, but very lightweight.
    pub fn columns(&self) -> impl Iterator<Item = &String> {
        std::iter::once(&self.cf_jmt)
            .chain(std::iter::once(&self.cf_jmt_keys))
            .chain(std::iter::once(&self.cf_jmt_values))
            .chain(std::iter::once(&self.cf_jmt_keys_by_keyhash))
            .chain(std::iter::once(&self.cf_nonverifiable))
    }

    pub fn cf_jmt<'s>(&self, db_handle: &'s Arc<rocksdb::DB>) -> &'s ColumnFamily {
        let column = self.cf_jmt.as_str();
        db_handle.cf_handle(column).unwrap_or_else(|| {
            panic!(
                "jmt column family not found for prefix: {}, substore: {}",
                column, self.prefix
            )
        })
    }

    pub fn cf_jmt_values<'s>(&self, db_handle: &'s Arc<rocksdb::DB>) -> &'s ColumnFamily {
        let column = self.cf_jmt_values.as_str();
        db_handle.cf_handle(column).unwrap_or_else(|| {
            panic!(
                "jmt-values column family not found for prefix: {}, substore: {}",
                column, self.prefix
            )
        })
    }

    pub fn cf_jmt_keys_by_keyhash<'s>(&self, db_handle: &'s Arc<rocksdb::DB>) -> &'s ColumnFamily {
        let column = self.cf_jmt_keys_by_keyhash.as_str();
        db_handle.cf_handle(column).unwrap_or_else(|| {
            panic!(
                "jmt-keys-by-keyhash column family not found for prefix: {}, substore: {}",
                column, self.prefix
            )
        })
    }

    pub fn cf_jmt_keys<'s>(&self, db_handle: &'s Arc<rocksdb::DB>) -> &'s ColumnFamily {
        let column = self.cf_jmt_keys.as_str();
        db_handle.cf_handle(column).unwrap_or_else(|| {
            panic!(
                "jmt-keys column family not found for prefix: {}, substore: {}",
                column, self.prefix
            )
        })
    }

    pub fn cf_nonverifiable<'s>(&self, db_handle: &'s Arc<rocksdb::DB>) -> &'s ColumnFamily {
        let column = self.cf_nonverifiable.as_str();
        db_handle.cf_handle(column).unwrap_or_else(|| {
            panic!(
                "nonverifiable column family not found for prefix: {}, substore: {}",
                column, self.prefix
            )
        })
    }

    pub fn latest_version_from_db(
        &self,
        db_handle: &Arc<rocksdb::DB>,
    ) -> Result<Option<jmt::Version>> {
        Ok(self
            .get_rightmost_leaf_from_db(db_handle)?
            .map(|(node_key, _)| node_key.version()))
    }

    pub fn latest_version_from_snapshot(
        &self,
        db_handle: &Arc<rocksdb::DB>,
        snapshot: &RocksDbSnapshot,
    ) -> Result<Option<jmt::Version>> {
        Ok(self
            .get_rightmost_leaf_from_snapshot(db_handle, snapshot)?
            .map(|(node_key, _)| node_key.version()))
    }

    // TODO(erwan): having two different implementations of this is a bit weird and should
    // be refactored, or remodeled. The DB version is only used during initialization, before
    // a `Snapshot` is available.
    fn get_rightmost_leaf_from_db(
        &self,
        db_handle: &Arc<rocksdb::DB>,
    ) -> Result<Option<(NodeKey, LeafNode)>> {
        let cf_jmt = self.cf_jmt(db_handle);
        let mut iter = db_handle.raw_iterator_cf(cf_jmt);
        iter.seek_to_last();

        if iter.valid() {
            let node_key =
                DbNodeKey::decode(iter.key().expect("all DB entries should have a key"))?
                    .into_inner();
            let node =
                Node::try_from_slice(iter.value().expect("all DB entries should have a value"))?;

            if let Node::Leaf(leaf_node) = node {
                return Ok(Some((node_key, leaf_node)));
            }
        } else {
            // There are no keys in the database
        }

        Ok(None)
    }

    fn get_rightmost_leaf_from_snapshot(
        &self,
        db_handle: &Arc<rocksdb::DB>,
        snapshot: &RocksDbSnapshot,
    ) -> Result<Option<(NodeKey, LeafNode)>> {
        let cf_jmt = self.cf_jmt(db_handle);
        let mut iter = snapshot.iterator_cf(cf_jmt, IteratorMode::End);
        let Some((raw_key, raw_value)) = iter.next().transpose()? else {
            return Ok(None);
        };

        let node_key = DbNodeKey::decode(&raw_key)?.into_inner();
        let Node::Leaf(leaf) = Node::try_from_slice(&raw_value)? else {
            return Ok(None);
        };
        Ok(Some((node_key, leaf)))
    }
}

impl Display for SubstoreConfig {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "SubstoreConfig(prefix={})", self.prefix)
    }
}

/// A read-only view into a substore at a specific state version.
///
/// A [`SubstoreSnapshot`] is lightweight and cheap to create; it can be
/// instantiated on demand when a read-only view of a substore's state is
/// needed.
pub struct SubstoreSnapshot {
    pub(crate) config: Arc<SubstoreConfig>,
    pub(crate) rocksdb_snapshot: Arc<RocksDbSnapshot>,
    pub(crate) version: jmt::Version,
    pub(crate) db: Arc<rocksdb::DB>,
}

impl SubstoreSnapshot {
    pub fn root_hash(&self) -> Result<crate::RootHash> {
        let version = self.version();
        let tree = jmt::Sha256Jmt::new(self);
        Ok(tree
            .get_root_hash_option(version)?
            .unwrap_or(jmt::RootHash([0; 32])))
    }

    pub fn version(&self) -> jmt::Version {
        self.version
    }

    /// Returns some value corresponding to the key, along with an ICS23 existence proof
    /// up to the current JMT root hash. If the key is not present, returns `None` and a
    /// non-existence proof.
    pub(crate) fn get_with_proof(
        &self,
        key: Vec<u8>,
    ) -> Result<(Option<Vec<u8>>, ics23::CommitmentProof)> {
        let version = self.version();
        let tree = jmt::Sha256Jmt::new(self);
        tree.get_with_ics23_proof(key, version)
    }

    /// Helper function used by `get_raw` and `prefix_raw`.
    ///
    /// Reads from the JMT will fail if the root is missing; this method
    /// special-cases the empty tree case so that reads on an empty tree just
    /// return None.
    pub fn get_jmt(&self, key: jmt::KeyHash) -> Result<Option<Vec<u8>>> {
        let tree = jmt::Sha256Jmt::new(self);
        match tree.get(key, self.version()) {
            Ok(Some(value)) => {
                tracing::trace!(substore = ?self.config.prefix, version = ?self.version(), ?key, value = ?hex::encode(&value), "read from tree");
                Ok(Some(value))
            }
            Ok(None) => {
                tracing::trace!(substore = ?self.config.prefix, version = ?self.version(), ?key, "key not found in tree");
                Ok(None)
            }
            // This allows using the Overlay on an empty database without
            // errors. We only skip the `MissingRootError` if the `version` is
            // `u64::MAX`, the pre-genesis version. Otherwise, a missing root
            // actually does indicate a problem.
            Err(e)
                if e.downcast_ref::<jmt::MissingRootError>().is_some()
                    && self.version() == u64::MAX =>
            {
                tracing::trace!(substore = ?self.config.prefix, version = ?self.version(), "no data available at this version");
                Ok(None)
            }
            Err(e) => Err(e),
        }
    }
}
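
// A minimal usage sketch (hypothetical key, assuming a `SubstoreSnapshot` named
// `snapshot` is in scope): hash the key preimage with SHA-256, then look it up in
// the tree at the snapshot's version.
//
//     let key_hash = jmt::KeyHash::with::<sha2::Sha256>(b"some/key");
//     let maybe_value = snapshot.get_jmt(key_hash)?;
//
// This mirrors how `SubstoreStorage::commit` derives keyhashes from key preimages below.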

impl TreeReader for SubstoreSnapshot {
    /// Gets a value by identifier, returning the newest value whose version is *less than or
    /// equal to* the specified version. Returns `None` if the value does not exist.
    fn get_value_option(
        &self,
        max_version: jmt::Version,
        key_hash: KeyHash,
    ) -> Result<Option<jmt::OwnedValue>> {
        let cf_jmt_values = self.config.cf_jmt_values(&self.db);

        // Prefix ranges exclude the upper bound in the iterator result.
        // This means that when requesting the largest possible version, there
        // is no way to specify a range that is inclusive of `u64::MAX`.
        if max_version == u64::MAX {
            let k = VersionedKeyHash {
                version: u64::MAX,
                key_hash,
            };

            if let Some(v) = self.rocksdb_snapshot.get_cf(cf_jmt_values, k.encode())? {
                let maybe_value: Option<Vec<u8>> = BorshDeserialize::try_from_slice(v.as_ref())?;
                return Ok(maybe_value);
            }
        }

        let mut lower_bound = key_hash.0.to_vec();
        lower_bound.extend_from_slice(&0u64.to_be_bytes());

        let mut upper_bound = key_hash.0.to_vec();
        // The upper bound is excluded from the iteration results.
        upper_bound.extend_from_slice(&(max_version.saturating_add(1)).to_be_bytes());

        let mut readopts = ReadOptions::default();
        readopts.set_iterate_lower_bound(lower_bound);
        readopts.set_iterate_upper_bound(upper_bound);
        let mut iterator =
            self.rocksdb_snapshot
                .iterator_cf_opt(cf_jmt_values, readopts, IteratorMode::End);

        let Some(tuple) = iterator.next() else {
            return Ok(None);
        };

        let (_key, v) = tuple?;
        let maybe_value = BorshDeserialize::try_from_slice(v.as_ref())?;
        Ok(maybe_value)
    }
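
    // Concretely, values are keyed as `key_hash || BE(version)` (see `VersionedKeyHash`),
    // so for a hypothetical `max_version = 5` the scan above covers the half-open range
    // `[key_hash || BE(0), key_hash || BE(6))`, and iterating from the end yields the
    // newest value written at a version <= 5.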

    /// Gets node given a node key. Returns `None` if the node does not exist.
    fn get_node_option(&self, node_key: &NodeKey) -> Result<Option<Node>> {
        let db_node_key = DbNodeKey::from(node_key.clone());
        tracing::trace!(?node_key);

        let cf_jmt = self.config.cf_jmt(&self.db);
        let value = self
            .rocksdb_snapshot
            .get_cf(cf_jmt, db_node_key.encode()?)?
            .map(|db_slice| Node::try_from_slice(&db_slice))
            .transpose()?;

        tracing::trace!(?node_key, ?value);
        Ok(value)
    }

    fn get_rightmost_leaf(&self) -> Result<Option<(NodeKey, LeafNode)>> {
        let cf_jmt = self.config.cf_jmt(&self.db);
        let mut iter = self.rocksdb_snapshot.raw_iterator_cf(cf_jmt);
        iter.seek_to_last();

        if iter.valid() {
            let node_key =
                DbNodeKey::decode(iter.key().expect("all DB entries should have a key"))?
                    .into_inner();
            let node =
                Node::try_from_slice(iter.value().expect("all DB entries should have a value"))?;

            if let Node::Leaf(leaf_node) = node {
                return Ok(Some((node_key, leaf_node)));
            }
        } else {
            // There are no keys in the database
        }

        Ok(None)
    }
}

impl HasPreimage for SubstoreSnapshot {
    fn preimage(&self, key_hash: KeyHash) -> Result<Option<Vec<u8>>> {
        let cf_jmt_keys_by_keyhash = self.config.cf_jmt_keys_by_keyhash(&self.db);

        Ok(self
            .rocksdb_snapshot
            .get_cf(cf_jmt_keys_by_keyhash, key_hash.0)?)
    }
}

pub struct SubstoreStorage {
    pub(crate) substore_snapshot: SubstoreSnapshot,
}

impl SubstoreStorage {
    pub async fn commit(
        self,
        cache: Cache,
        mut write_batch: rocksdb::WriteBatch,
        write_version: jmt::Version,
        perform_migration: bool,
    ) -> Result<(RootHash, rocksdb::WriteBatch)> {
        let span = Span::current();

        tokio::task::spawn_blocking(move || {
            span.in_scope(|| {
                let jmt = jmt::Sha256Jmt::new(&self.substore_snapshot);
                let unwritten_changes: Vec<_> = cache
                    .unwritten_changes
                    .into_iter()
                    .map(|(key, some_value)| (KeyHash::with::<sha2::Sha256>(&key), key, some_value))
                    .collect();

                let cf_jmt_keys = self
                    .substore_snapshot
                    .config
                    .cf_jmt_keys(&self.substore_snapshot.db);
                let cf_jmt_keys_by_keyhash = self
                    .substore_snapshot
                    .config
                    .cf_jmt_keys_by_keyhash(&self.substore_snapshot.db);
                let cf_jmt = self
                    .substore_snapshot
                    .config
                    .cf_jmt(&self.substore_snapshot.db);
                let cf_jmt_values = self
                    .substore_snapshot
                    .config
                    .cf_jmt_values(&self.substore_snapshot.db);

                /* Keyhash and pre-image indices */
                for (keyhash, key_preimage, value) in unwritten_changes.iter() {
                    match value {
                        // Key inserted or updated, so we add it to the keyhash index.
                        Some(_) => {
                            write_batch.put_cf(cf_jmt_keys, key_preimage, keyhash.0);
                            write_batch.put_cf(cf_jmt_keys_by_keyhash, keyhash.0, key_preimage);
                        }
                        // Key deleted, so we delete its preimage and keyhash index entries.
                        None => {
                            write_batch.delete_cf(cf_jmt_keys, key_preimage);
                            write_batch.delete_cf(cf_jmt_keys_by_keyhash, keyhash.0);
                        }
                    };
                }

                // We only track the keyhash and possible values; at the time of writing,
                // `rustfmt` panics on inlining the closure, so we use a helper to skip the key.
                let skip_key = |(keyhash, _key, some_value)| (keyhash, some_value);

                let (root_hash, batch) = if perform_migration {
                    // TODO(erwan): this should be feature-gated behind `migration`,
                    // activating `jmt/migration` more judiciously.
                    jmt.append_value_set(unwritten_changes.into_iter().map(skip_key), write_version)?
                } else {
                    jmt.put_value_set(unwritten_changes.into_iter().map(skip_key), write_version)?
                };

                /* JMT nodes and values */
                for (node_key, node) in batch.node_batch.nodes() {
                    let db_node_key_bytes = DbNodeKey::encode_from_node_key(node_key)?;
                    let value_bytes = borsh::to_vec(node)?;
                    tracing::trace!(?db_node_key_bytes, value_bytes = ?hex::encode(&value_bytes));
                    write_batch.put_cf(cf_jmt, db_node_key_bytes, value_bytes);
                }

                for ((version, key_hash), some_value) in batch.node_batch.values() {
                    let key_bytes = VersionedKeyHash::encode_from_keyhash(key_hash, version);
                    let value_bytes = borsh::to_vec(some_value)?;
                    tracing::trace!(?key_bytes, value_bytes = ?hex::encode(&value_bytes));
                    write_batch.put_cf(cf_jmt_values, key_bytes, value_bytes);
                }

                tracing::trace!(?root_hash, "accumulated node changes in the write batch");

                for (k, v) in cache.nonverifiable_changes.into_iter() {
                    let cf_nonverifiable = self
                        .substore_snapshot
                        .config
                        .cf_nonverifiable(&self.substore_snapshot.db);
                    match v {
                        Some(v) => {
                            tracing::trace!(key = ?crate::EscapedByteSlice(&k), value = ?crate::EscapedByteSlice(&v), "put nonverifiable key");
                            write_batch.put_cf(cf_nonverifiable, k, &v);
                        }
                        None => {
                            write_batch.delete_cf(cf_nonverifiable, k);
                        }
                    };
                }

                Ok((root_hash, write_batch))
            })
        })
        .await?
    }
}
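
// After `commit` returns, the accumulated write batch contains, for this substore:
//
//   cf_jmt_keys:            key preimage           -> keyhash
//   cf_jmt_keys_by_keyhash: keyhash                -> key preimage
//   cf_jmt:                 `DbNodeKey`            -> borsh-encoded `jmt::Node`
//   cf_jmt_values:          keyhash || BE(version) -> borsh-encoded `Option<Vec<u8>>`
//   cf_nonverifiable:       arbitrary key          -> arbitrary value
//
// The batch is handed back to the caller along with the new root hash, rather than
// being written to RocksDB here.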

impl TreeWriter for SubstoreStorage {
    fn write_node_batch(&self, _node_batch: &jmt::storage::NodeBatch) -> Result<()> {
        // The "write" part of the `TreeReader + TreeWriter` jmt architecture does not work
        // well with a deferred write strategy.
        // What we would like to do is to accumulate the changes in a write batch, and then
        // commit them all at once. This isn't possible to do easily because the `TreeWriter`
        // trait rightfully does not expose RocksDB-specific types in its API.
        //
        // The alternative is to use interior mutability, but the semantics become
        // so implementation-specific that we lose the benefits of the trait abstraction.
        unimplemented!("We inline the tree writing logic in the `commit` method")
    }
}

/// An ordered node key is a node key that is encoded in a way that
/// preserves the order of the node keys in the database.
pub struct DbNodeKey(pub NodeKey);

impl DbNodeKey {
    pub fn from(node_key: NodeKey) -> Self {
        DbNodeKey(node_key)
    }

    pub fn into_inner(self) -> NodeKey {
        self.0
    }

    pub fn encode(&self) -> Result<Vec<u8>> {
        Self::encode_from_node_key(&self.0)
    }

    pub fn encode_from_node_key(node_key: &NodeKey) -> Result<Vec<u8>> {
        let mut bytes = Vec::new();
        bytes.extend_from_slice(&node_key.version().to_be_bytes()); // encode version as big-endian
        let rest = borsh::to_vec(node_key)?;
        bytes.extend_from_slice(&rest);
        Ok(bytes)
    }

    pub fn decode(bytes: impl AsRef<[u8]>) -> Result<Self> {
        if bytes.as_ref().len() < 8 {
            anyhow::bail!("byte slice is too short")
        }
        // Ignore the bytes that encode the version
        let node_key_slice = bytes.as_ref()[8..].to_vec();
        let node_key = borsh::BorshDeserialize::try_from_slice(&node_key_slice)?;
        Ok(DbNodeKey(node_key))
    }
}

/// Represents a JMT key hash at a specific `jmt::Version`.
/// This is used to index the JMT values in RocksDB.
#[derive(Clone, Debug)]
pub struct VersionedKeyHash {
    pub key_hash: KeyHash,
    pub version: jmt::Version,
}

impl VersionedKeyHash {
    pub fn encode(&self) -> Vec<u8> {
        VersionedKeyHash::encode_from_keyhash(&self.key_hash, &self.version)
    }

    pub fn encode_from_keyhash(key_hash: &KeyHash, version: &jmt::Version) -> Vec<u8> {
        let mut buf: Vec<u8> = key_hash.0.to_vec();
        buf.extend_from_slice(&version.to_be_bytes());
        buf
    }

    #[allow(dead_code)]
    pub fn decode(buf: Vec<u8>) -> Result<Self> {
        if buf.len() != 40 {
            Err(anyhow::anyhow!(
                "could not decode buffer into VersionedKeyHash (invalid size)"
            ))
        } else {
            let raw_key_hash: [u8; 32] = buf[0..32]
                .try_into()
                .expect("buffer is at least 40 bytes wide");
            let key_hash = KeyHash(raw_key_hash);

            let raw_version: [u8; 8] = buf[32..40]
                .try_into()
                .expect("buffer is at least 40 bytes wide");
            let version: u64 = u64::from_be_bytes(raw_version);

            Ok(VersionedKeyHash { version, key_hash })
        }
    }
}
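
// A minimal sketch of round-trip checks for the key encodings above. It assumes
// `NodeKey::new_empty_path` and the derived equality impls on `NodeKey` and
// `KeyHash` are available in the `jmt` crate.
#[cfg(test)]
mod key_encoding_tests {
    use super::*;

    #[test]
    fn db_node_key_orders_by_version() {
        // Hypothetical version, empty nibble path.
        let node_key = NodeKey::new_empty_path(42);
        let encoded = DbNodeKey::encode_from_node_key(&node_key).expect("encoding succeeds");
        // The first 8 bytes are the big-endian version, so RocksDB's lexicographic
        // key order groups nodes by ascending version.
        assert_eq!(&encoded[..8], &42u64.to_be_bytes());
        let decoded = DbNodeKey::decode(&encoded).expect("decoding succeeds");
        assert_eq!(decoded.into_inner(), node_key);
    }

    #[test]
    fn versioned_key_hash_roundtrip() {
        let original = VersionedKeyHash {
            key_hash: KeyHash([7u8; 32]),
            version: 5,
        };
        let encoded = original.encode();
        // Layout: 32-byte keyhash followed by the big-endian version, 40 bytes in total.
        assert_eq!(encoded.len(), 40);
        let decoded = VersionedKeyHash::decode(encoded).expect("decoding succeeds");
        assert_eq!(decoded.key_hash, original.key_hash);
        assert_eq!(decoded.version, original.version);
    }
}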