1use std::{
2 fmt::{Display, Formatter},
3 sync::Arc,
4};
5
6use anyhow::Result;
7use borsh::BorshDeserialize;
8use jmt::{
9 storage::{HasPreimage, LeafNode, Node, NodeKey, TreeReader},
10 KeyHash, RootHash,
11};
12use rocksdb::{ColumnFamily, IteratorMode, ReadOptions};
13use tracing::Span;
14
15use crate::{snapshot::RocksDbSnapshot, Cache};
16
17use jmt::storage::TreeWriter;
18
/// Names of the RocksDB column families that back a single substore.
///
/// All column family names are derived from the substore prefix by
/// [`SubstoreConfig::new`].
///
/// NOTE: the derived `PartialOrd`/`Ord` compare fields in declaration order,
/// so reordering the fields below changes how configs sort.
#[derive(Debug, Eq, PartialEq, PartialOrd, Ord, Hash)]
pub struct SubstoreConfig {
    /// The substore prefix, exactly as it was passed to [`SubstoreConfig::new`].
    pub prefix: String,
    /// The prefix followed by a `/` delimiter.
    pub prefix_with_delimiter: String,
    /// Column family `substore-{prefix}-jmt`; `commit` stores borsh-serialized tree
    /// nodes here under keys produced by [`DbNodeKey`].
    cf_jmt: String,
    /// Column family `substore-{prefix}-jmt-keys`; `commit` maps key preimages to
    /// their key hashes here.
    cf_jmt_keys: String,
    /// Column family `substore-{prefix}-jmt-values`; `commit` stores borsh-serialized
    /// optional values here under keys produced by [`VersionedKeyHash`].
    cf_jmt_values: String,
    /// Column family `substore-{prefix}-jmt-keys-by-keyhash`; `commit` maps key
    /// hashes back to their key preimages here.
    cf_jmt_keys_by_keyhash: String,
    /// Column family `substore-{prefix}-nonverifiable`; `commit` writes the cache's
    /// nonverifiable key/value changes here.
    cf_nonverifiable: String,
}
51
52impl SubstoreConfig {
53 pub fn new(prefix: impl ToString) -> Self {
54 let prefix = prefix.to_string();
55 Self {
56 cf_jmt: format!("substore-{}-jmt", prefix),
57 cf_jmt_keys: format!("substore-{}-jmt-keys", prefix),
58 cf_jmt_values: format!("substore-{}-jmt-values", prefix),
59 cf_jmt_keys_by_keyhash: format!("substore-{}-jmt-keys-by-keyhash", prefix),
60 cf_nonverifiable: format!("substore-{}-nonverifiable", prefix),
61 prefix_with_delimiter: format!("{}/", prefix),
62 prefix,
63 }
64 }
65
66 pub fn columns(&self) -> impl Iterator<Item = &String> {
69 std::iter::once(&self.cf_jmt)
70 .chain(std::iter::once(&self.cf_jmt_keys))
71 .chain(std::iter::once(&self.cf_jmt_values))
72 .chain(std::iter::once(&self.cf_jmt_keys_by_keyhash))
73 .chain(std::iter::once(&self.cf_nonverifiable))
74 }
75
76 pub fn cf_jmt<'s>(&self, db_handle: &'s Arc<rocksdb::DB>) -> &'s ColumnFamily {
77 let column = self.cf_jmt.as_str();
78 db_handle.cf_handle(column).unwrap_or_else(|| {
79 panic!(
80 "jmt column family not found for prefix: {}, substore: {}",
81 column, self.prefix
82 )
83 })
84 }
85
86 pub fn cf_jmt_values<'s>(&self, db_handle: &'s Arc<rocksdb::DB>) -> &'s ColumnFamily {
87 let column = self.cf_jmt_values.as_str();
88 db_handle.cf_handle(column).unwrap_or_else(|| {
89 panic!(
90 "jmt-values column family not found for prefix: {}, substore: {}",
91 column, self.prefix
92 )
93 })
94 }
95
96 pub fn cf_jmt_keys_by_keyhash<'s>(&self, db_handle: &'s Arc<rocksdb::DB>) -> &'s ColumnFamily {
97 let column = self.cf_jmt_keys_by_keyhash.as_str();
98 db_handle.cf_handle(column).unwrap_or_else(|| {
99 panic!(
100 "jmt-keys-by-keyhash column family not found for prefix: {}, substore: {}",
101 column, self.prefix
102 )
103 })
104 }
105
106 pub fn cf_jmt_keys<'s>(&self, db_handle: &'s Arc<rocksdb::DB>) -> &'s ColumnFamily {
107 let column = self.cf_jmt_keys.as_str();
108 db_handle.cf_handle(column).unwrap_or_else(|| {
109 panic!(
110 "jmt-keys column family not found for prefix: {}, substore: {}",
111 column, self.prefix
112 )
113 })
114 }
115
116 pub fn cf_nonverifiable<'s>(&self, db_handle: &'s Arc<rocksdb::DB>) -> &'s ColumnFamily {
117 let column = self.cf_nonverifiable.as_str();
118 db_handle.cf_handle(column).unwrap_or_else(|| {
119 panic!(
120 "nonverifiable column family not found for prefix: {}, substore: {}",
121 column, self.prefix
122 )
123 })
124 }
125
126 pub fn latest_version_from_db(
127 &self,
128 db_handle: &Arc<rocksdb::DB>,
129 ) -> Result<Option<jmt::Version>> {
130 Ok(self
131 .get_rightmost_leaf_from_db(db_handle)?
132 .map(|(node_key, _)| node_key.version()))
133 }
134
135 pub fn latest_version_from_snapshot(
136 &self,
137 db_handle: &Arc<rocksdb::DB>,
138 snapshot: &RocksDbSnapshot,
139 ) -> Result<Option<jmt::Version>> {
140 Ok(self
141 .get_rightmost_leaf_from_snapshot(db_handle, snapshot)?
142 .map(|(node_key, _)| node_key.version()))
143 }
144
145 fn get_rightmost_leaf_from_db(
149 &self,
150 db_handle: &Arc<rocksdb::DB>,
151 ) -> Result<Option<(NodeKey, LeafNode)>> {
152 let cf_jmt = self.cf_jmt(db_handle);
153 let mut iter = db_handle.raw_iterator_cf(cf_jmt);
154 iter.seek_to_last();
155
156 if iter.valid() {
157 let node_key =
158 DbNodeKey::decode(iter.key().expect("all DB entries should have a key"))?
159 .into_inner();
160 let node =
161 Node::try_from_slice(iter.value().expect("all DB entries should have a value"))?;
162
163 if let Node::Leaf(leaf_node) = node {
164 return Ok(Some((node_key, leaf_node)));
165 }
166 } else {
167 }
169
170 Ok(None)
171 }
172
173 fn get_rightmost_leaf_from_snapshot(
174 &self,
175 db_handle: &Arc<rocksdb::DB>,
176 snapshot: &RocksDbSnapshot,
177 ) -> Result<Option<(NodeKey, LeafNode)>> {
178 let cf_jmt = self.cf_jmt(db_handle);
179 let mut iter = snapshot.iterator_cf(cf_jmt, IteratorMode::End);
180 let Some((raw_key, raw_value)) = iter.next().transpose()? else {
181 return Ok(None);
182 };
183
184 let node_key = DbNodeKey::decode(&raw_key)?.into_inner();
185 let Node::Leaf(leaf) = Node::try_from_slice(&raw_value)? else {
186 return Ok(None);
187 };
188 Ok(Some((node_key, leaf)))
189 }
190}
191
192impl Display for SubstoreConfig {
193 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
194 write!(f, "SubstoreConfig(prefix={})", self.prefix)
195 }
196}
197
/// A view of one substore at a fixed version, read through a RocksDB snapshot.
pub struct SubstoreSnapshot {
    /// Column family names for the substore this snapshot belongs to.
    pub(crate) config: Arc<SubstoreConfig>,
    /// The RocksDB snapshot that all reads in this type go through.
    pub(crate) rocksdb_snapshot: Arc<RocksDbSnapshot>,
    /// The tree version that reads are evaluated at.
    pub(crate) version: jmt::Version,
    /// Database handle, used to resolve column family handles.
    pub(crate) db: Arc<rocksdb::DB>,
}
209
210impl SubstoreSnapshot {
211 pub fn root_hash(&self) -> Result<crate::RootHash> {
212 let version = self.version();
213 let tree = jmt::Sha256Jmt::new(self);
214 Ok(tree
215 .get_root_hash_option(version)?
216 .unwrap_or(jmt::RootHash([0; 32])))
217 }
218
219 pub fn version(&self) -> jmt::Version {
220 self.version
221 }
222
223 pub(crate) fn get_with_proof(
227 &self,
228 key: Vec<u8>,
229 ) -> Result<(Option<Vec<u8>>, ics23::CommitmentProof)> {
230 let version = self.version();
231 let tree = jmt::Sha256Jmt::new(self);
232 tree.get_with_ics23_proof(key, version)
233 }
234
235 pub fn get_jmt(&self, key: jmt::KeyHash) -> Result<Option<Vec<u8>>> {
241 let tree = jmt::Sha256Jmt::new(self);
242 match tree.get(key, self.version()) {
243 Ok(Some(value)) => {
244 tracing::trace!(substore = ?self.config.prefix, version = ?self.version(), ?key, value = ?hex::encode(&value), "read from tree");
245 Ok(Some(value))
246 }
247 Ok(None) => {
248 tracing::trace!(substore = ?self.config.prefix, version = ?self.version(), ?key, "key not found in tree");
249 Ok(None)
250 }
251 Err(e)
256 if e.downcast_ref::<jmt::MissingRootError>().is_some()
257 && self.version() == u64::MAX =>
258 {
259 tracing::trace!(substore = ?self.config.prefix, version = ?self.version(), "no data available at this version");
260 Ok(None)
261 }
262 Err(e) => Err(e),
263 }
264 }
265}
266
267impl TreeReader for SubstoreSnapshot {
268 fn get_value_option(
271 &self,
272 max_version: jmt::Version,
273 key_hash: KeyHash,
274 ) -> Result<Option<jmt::OwnedValue>> {
275 let cf_jmt_values = self.config.cf_jmt_values(&self.db);
276
277 if max_version == u64::MAX {
281 let k = VersionedKeyHash {
282 version: u64::MAX,
283 key_hash,
284 };
285
286 if let Some(v) = self.rocksdb_snapshot.get_cf(cf_jmt_values, k.encode())? {
287 let maybe_value: Option<Vec<u8>> = BorshDeserialize::try_from_slice(v.as_ref())?;
288 return Ok(maybe_value);
289 }
290 }
291
292 let mut lower_bound = key_hash.0.to_vec();
293 lower_bound.extend_from_slice(&0u64.to_be_bytes());
294
295 let mut upper_bound = key_hash.0.to_vec();
296 upper_bound.extend_from_slice(&(max_version.saturating_add(1)).to_be_bytes());
298
299 let mut readopts = ReadOptions::default();
300 readopts.set_iterate_lower_bound(lower_bound);
301 readopts.set_iterate_upper_bound(upper_bound);
302 let mut iterator =
303 self.rocksdb_snapshot
304 .iterator_cf_opt(cf_jmt_values, readopts, IteratorMode::End);
305
306 let Some(tuple) = iterator.next() else {
307 return Ok(None);
308 };
309
310 let (_key, v) = tuple?;
311 let maybe_value = BorshDeserialize::try_from_slice(v.as_ref())?;
312 Ok(maybe_value)
313 }
314
315 fn get_node_option(&self, node_key: &NodeKey) -> Result<Option<Node>> {
317 let db_node_key = DbNodeKey::from(node_key.clone());
318 tracing::trace!(?node_key);
319
320 let cf_jmt = self.config.cf_jmt(&self.db);
321 let value = self
322 .rocksdb_snapshot
323 .get_cf(cf_jmt, db_node_key.encode()?)?
324 .map(|db_slice| Node::try_from_slice(&db_slice))
325 .transpose()?;
326
327 tracing::trace!(?node_key, ?value);
328 Ok(value)
329 }
330
331 fn get_rightmost_leaf(&self) -> Result<Option<(NodeKey, LeafNode)>> {
332 let cf_jmt = self.config.cf_jmt(&self.db);
333 let mut iter = self.rocksdb_snapshot.raw_iterator_cf(cf_jmt);
334 iter.seek_to_last();
335
336 if iter.valid() {
337 let node_key =
338 DbNodeKey::decode(iter.key().expect("all DB entries should have a key"))?
339 .into_inner();
340 let node =
341 Node::try_from_slice(iter.value().expect("all DB entries should have a value"))?;
342
343 if let Node::Leaf(leaf_node) = node {
344 return Ok(Some((node_key, leaf_node)));
345 }
346 } else {
347 }
349
350 Ok(None)
351 }
352}
353
354impl HasPreimage for SubstoreSnapshot {
355 fn preimage(&self, key_hash: KeyHash) -> Result<Option<Vec<u8>>> {
356 let cf_jmt_keys_by_keyhash = self.config.cf_jmt_keys_by_keyhash(&self.db);
357
358 Ok(self
359 .rocksdb_snapshot
360 .get_cf(cf_jmt_keys_by_keyhash, key_hash.0)?)
361 }
362}
363
/// Write-side wrapper around a [`SubstoreSnapshot`].
///
/// `commit` reads the existing tree through the wrapped snapshot while it
/// stages new writes into a RocksDB write batch.
pub struct SubstoreStorage {
    /// The snapshot that tree reads are served from during `commit`.
    pub(crate) substore_snapshot: SubstoreSnapshot,
}
367
368impl SubstoreStorage {
369 pub async fn commit(
370 self,
371 cache: Cache,
372 mut write_batch: rocksdb::WriteBatch,
373 write_version: jmt::Version,
374 perform_migration: bool,
375 ) -> Result<(RootHash, rocksdb::WriteBatch)> {
376 let span = Span::current();
377
378 tokio::task
379 ::spawn_blocking(move || {
380 span.in_scope(|| {
381 let jmt = jmt::Sha256Jmt::new(&self.substore_snapshot);
382 let unwritten_changes: Vec<_> = cache
383 .unwritten_changes
384 .into_iter()
385 .map(|(key, some_value)| (KeyHash::with::<sha2::Sha256>(&key), key, some_value))
386 .collect();
387
388 let cf_jmt_keys = self.substore_snapshot.config.cf_jmt_keys(&self.substore_snapshot.db);
389 let cf_jmt_keys_by_keyhash = self.substore_snapshot.config.cf_jmt_keys_by_keyhash(&self.substore_snapshot.db);
390 let cf_jmt = self.substore_snapshot.config.cf_jmt(&self.substore_snapshot.db);
391 let cf_jmt_values = self.substore_snapshot.config.cf_jmt_values(&self.substore_snapshot.db);
392
393 for (keyhash, key_preimage, value) in unwritten_changes.iter() {
395 match value {
396 Some(_) => { write_batch.put_cf(cf_jmt_keys, key_preimage, keyhash.0);
398 write_batch
399 .put_cf(cf_jmt_keys_by_keyhash, keyhash.0, key_preimage)
400 }
401 None => { write_batch.delete_cf(cf_jmt_keys, key_preimage);
403 write_batch.delete_cf(cf_jmt_keys_by_keyhash, keyhash.0);
404 }
405 };
406 }
407
408 let skip_key = |(keyhash, _key, some_value)| (keyhash, some_value);
411
412 let (root_hash, batch) = if perform_migration {
413 jmt.append_value_set(unwritten_changes.into_iter().map(skip_key), write_version)?
416 } else {
417 jmt.put_value_set(unwritten_changes.into_iter().map(skip_key), write_version)?
418 };
419
420 for (node_key, node) in batch.node_batch.nodes() {
422 let db_node_key_bytes= DbNodeKey::encode_from_node_key(node_key)?;
423 let value_bytes = borsh::to_vec(node)?;
424 tracing::trace!(?db_node_key_bytes, value_bytes = ?hex::encode(&value_bytes));
425 write_batch.put_cf(cf_jmt, db_node_key_bytes, value_bytes);
426 }
427
428
429 for ((version, key_hash), some_value) in batch.node_batch.values() {
430 let key_bytes = VersionedKeyHash::encode_from_keyhash(key_hash, version);
431 let value_bytes = borsh::to_vec(some_value)?;
432 tracing::trace!(?key_bytes, value_bytes = ?hex::encode(&value_bytes));
433 write_batch.put_cf(cf_jmt_values, key_bytes, value_bytes);
434 }
435
436 tracing::trace!(?root_hash, "accumulated node changes in the write batch");
437
438
439 for (k, v) in cache.nonverifiable_changes.into_iter() {
440 let cf_nonverifiable = self.substore_snapshot.config.cf_nonverifiable(&self.substore_snapshot.db);
441 match v {
442 Some(v) => {
443 tracing::trace!(key = ?crate::EscapedByteSlice(&k), value = ?crate::EscapedByteSlice(&v), "put nonverifiable key");
444 write_batch.put_cf(cf_nonverifiable, k, &v);
445 }
446 None => {
447 write_batch.delete_cf(cf_nonverifiable, k);
448 }
449 };
450 }
451
452 Ok((root_hash, write_batch))
453 })
454 })
455 .await?
456 }
457}
458
impl TreeWriter for SubstoreStorage {
    /// Not supported: always panics via `unimplemented!`.
    ///
    /// As the panic message states, tree writes are performed inline by
    /// `SubstoreStorage::commit` instead of through this trait method.
    fn write_node_batch(&self, _node_batch: &jmt::storage::NodeBatch) -> Result<()> {
        unimplemented!("We inline the tree writing logic in the `commit` method")
    }
}
472
/// Wrapper around a tree `NodeKey` that provides its on-disk key encoding:
/// the key's version as 8 big-endian bytes, followed by the borsh-serialized key.
pub struct DbNodeKey(pub NodeKey);
476
477impl DbNodeKey {
478 pub fn from(node_key: NodeKey) -> Self {
479 DbNodeKey(node_key)
480 }
481
482 pub fn into_inner(self) -> NodeKey {
483 self.0
484 }
485
486 pub fn encode(&self) -> Result<Vec<u8>> {
487 Self::encode_from_node_key(&self.0)
488 }
489
490 pub fn encode_from_node_key(node_key: &NodeKey) -> Result<Vec<u8>> {
491 let mut bytes = Vec::new();
492 bytes.extend_from_slice(&node_key.version().to_be_bytes()); let rest = borsh::to_vec(node_key)?;
494 bytes.extend_from_slice(&rest);
495 Ok(bytes)
496 }
497
498 pub fn decode(bytes: impl AsRef<[u8]>) -> Result<Self> {
499 if bytes.as_ref().len() < 8 {
500 anyhow::bail!("byte slice is too short")
501 }
502 let node_key_slice = bytes.as_ref()[8..].to_vec();
504 let node_key = borsh::BorshDeserialize::try_from_slice(&node_key_slice)?;
505 Ok(DbNodeKey(node_key))
506 }
507}
508
/// A key hash paired with a tree version.
///
/// The encoded form is 40 bytes: the 32-byte key hash followed by the version
/// as 8 big-endian bytes.
#[derive(Clone, Debug)]
pub struct VersionedKeyHash {
    /// The 32-byte hash of the key.
    pub key_hash: KeyHash,
    /// The version associated with the key hash.
    pub version: jmt::Version,
}
516
517impl VersionedKeyHash {
518 pub fn encode(&self) -> Vec<u8> {
519 VersionedKeyHash::encode_from_keyhash(&self.key_hash, &self.version)
520 }
521
522 pub fn encode_from_keyhash(key_hash: &KeyHash, version: &jmt::Version) -> Vec<u8> {
523 let mut buf: Vec<u8> = key_hash.0.to_vec();
524 buf.extend_from_slice(&version.to_be_bytes());
525 buf
526 }
527
528 #[allow(dead_code)]
529 pub fn decode(buf: Vec<u8>) -> Result<Self> {
530 if buf.len() != 40 {
531 Err(anyhow::anyhow!(
532 "could not decode buffer into VersionedKey (invalid size)"
533 ))
534 } else {
535 let raw_key_hash: [u8; 32] = buf[0..32]
536 .try_into()
537 .expect("buffer is at least 40 bytes wide");
538 let key_hash = KeyHash(raw_key_hash);
539
540 let raw_version: [u8; 8] = buf[32..40]
541 .try_into()
542 .expect("buffer is at least 40 bytes wide");
543 let version: u64 = u64::from_be_bytes(raw_version);
544
545 Ok(VersionedKeyHash { version, key_hash })
546 }
547 }
548}