1use std::iter;
2use std::{any::Any, sync::Arc};
3
4use anyhow::Result;
5use async_trait::async_trait;
6use ibc_types::core::commitment::MerkleProof;
7use tokio::sync::mpsc;
8use tracing::Span;
9
10#[cfg(feature = "metrics")]
11use crate::metrics;
12use crate::store::multistore::{self, MultistoreCache};
13use crate::{store, StateRead};
14
15mod rocks_wrapper;
16
17pub(crate) use rocks_wrapper::RocksDbSnapshot;
18
19#[derive(Clone)]
26pub struct Snapshot(pub(crate) Arc<Inner>);
27
28#[derive(Debug)]
30pub(crate) struct Inner {
31 pub(crate) multistore_cache: MultistoreCache,
33 pub(crate) snapshot: Arc<RocksDbSnapshot>,
35 pub(crate) version: jmt::Version,
37 pub(crate) db: Arc<rocksdb::DB>,
39}
40
41impl Snapshot {
42 pub(crate) fn new(
44 db: Arc<rocksdb::DB>,
45 version: jmt::Version,
46 multistore_cache: multistore::MultistoreCache,
47 ) -> Self {
48 Self(Arc::new(Inner {
49 snapshot: Arc::new(RocksDbSnapshot::new(db.clone())),
50 version,
51 db,
52 multistore_cache,
53 }))
54 }
55
56 pub fn version(&self) -> jmt::Version {
57 self.0.version
58 }
59
60 pub async fn get_with_proof(&self, key: Vec<u8>) -> Result<(Option<Vec<u8>>, MerkleProof)> {
64 if key.is_empty() {
65 anyhow::bail!("empty keys are not allowed")
66 }
67
68 let span = tracing::Span::current();
69 let rocksdb_snapshot = self.0.snapshot.clone();
70 let db = self.0.db.clone();
71 let mut proofs = vec![];
72
73 let (substore_key, substore_config) = self.0.multistore_cache.config.route_key_bytes(&key);
74 let substore_key_bytes = substore_key.to_vec();
75 let substore_version = self.substore_version(&substore_config).unwrap_or(u64::MAX);
76 let key_to_substore_root = substore_config.prefix.clone();
77
78 let substore = store::substore::SubstoreSnapshot {
79 config: substore_config,
80 rocksdb_snapshot: rocksdb_snapshot.clone(),
81 version: substore_version,
82 db: db.clone(),
83 };
84
85 let (substore_value, substore_commitment_proof) = tokio::task::spawn_blocking({
86 let span = span.clone();
87 move || span.in_scope(|| substore.get_with_proof(substore_key_bytes))
88 })
89 .await??;
90
91 proofs.push(substore_commitment_proof);
92
93 if !key_to_substore_root.is_empty() {
95 let main_store_config = self.0.multistore_cache.config.main_store.clone();
96 let main_version = self
97 .substore_version(&main_store_config)
98 .unwrap_or(u64::MAX);
99 let mainstore = store::substore::SubstoreSnapshot {
100 config: main_store_config,
101 rocksdb_snapshot,
102 version: main_version,
103 db,
104 };
105
106 let (_, main_commitment_proof) = tokio::task::spawn_blocking({
107 let span = span.clone();
108 move || span.in_scope(|| mainstore.get_with_proof(key_to_substore_root.into()))
109 })
110 .await??;
111
112 proofs.push(main_commitment_proof);
113 }
114
115 Ok((
116 substore_value,
117 MerkleProof {
118 proofs: proofs.clone(),
119 },
120 ))
121 }
122
123 pub fn prefix_version(&self, prefix: &str) -> Result<Option<jmt::Version>> {
124 let Some(config) = self
125 .0
126 .multistore_cache
127 .config
128 .find_substore(prefix.as_bytes())
129 else {
130 anyhow::bail!("rquested a version for a prefix that does not exist (prefix={prefix})")
131 };
132
133 Ok(self.substore_version(&config))
134 }
135
136 pub async fn prefix_root_hash(&self, prefix: &str) -> Result<crate::RootHash> {
142 let span = tracing::Span::current();
143 let rocksdb_snapshot = self.0.snapshot.clone();
144 let db = self.0.db.clone();
145
146 let Some(config) = self
147 .0
148 .multistore_cache
149 .config
150 .find_substore(prefix.as_bytes())
151 else {
152 anyhow::bail!("requested a root for a substore that does not exist (prefix={prefix})")
153 };
154
155 let version = self
156 .substore_version(&config)
157 .expect("the substore exists and has been initialized");
158
159 let substore = store::substore::SubstoreSnapshot {
160 config,
161 rocksdb_snapshot,
162 version,
163 db,
164 };
165
166 tracing::debug!(
167 prefix = substore.config.prefix,
168 version = substore.version,
169 "fetching root hash for substore"
170 );
171
172 tokio::task::spawn_blocking(move || span.in_scope(|| substore.root_hash())).await?
173 }
174
175 pub async fn root_hash(&self) -> Result<crate::RootHash> {
176 self.prefix_root_hash("").await
177 }
178
179 pub(crate) fn substore_version(
180 &self,
181 prefix: &Arc<store::substore::SubstoreConfig>,
182 ) -> Option<jmt::Version> {
183 self.0.multistore_cache.get_version(prefix)
184 }
185}
186
187#[async_trait]
188impl StateRead for Snapshot {
189 type GetRawFut = crate::future::SnapshotFuture;
190 type PrefixRawStream =
191 tokio_stream::wrappers::ReceiverStream<anyhow::Result<(String, Vec<u8>)>>;
192 type PrefixKeysStream = tokio_stream::wrappers::ReceiverStream<anyhow::Result<String>>;
193 type NonconsensusPrefixRawStream =
194 tokio_stream::wrappers::ReceiverStream<anyhow::Result<(Vec<u8>, Vec<u8>)>>;
195 type NonconsensusRangeRawStream =
196 tokio_stream::wrappers::ReceiverStream<anyhow::Result<(Vec<u8>, Vec<u8>)>>;
197
198 fn get_raw(&self, key: &str) -> Self::GetRawFut {
200 let span = Span::current();
201 let (key, config) = self.0.multistore_cache.config.route_key_str(key);
202
203 let rocksdb_snapshot = self.0.snapshot.clone();
204 let db = self.0.db.clone();
205
206 let version = self
207 .substore_version(&config)
208 .expect("the substore exists and has been initialized");
209
210 let substore = store::substore::SubstoreSnapshot {
211 config,
212 rocksdb_snapshot,
213 version,
214 db,
215 };
216 let key_hash = jmt::KeyHash::with::<sha2::Sha256>(key);
217
218 crate::future::SnapshotFuture(tokio::task::spawn_blocking(move || {
219 span.in_scope(|| {
220 let _start = std::time::Instant::now();
221 let rsp = substore.get_jmt(key_hash);
222 #[cfg(feature = "metrics")]
223 metrics::histogram!(metrics::STORAGE_GET_RAW_DURATION).record(_start.elapsed());
224 rsp
225 })
226 }))
227 }
228
229 fn nonverifiable_get_raw(&self, key: &[u8]) -> Self::GetRawFut {
231 let span = Span::current();
232 let (key, config) = self.0.multistore_cache.config.route_key_bytes(key);
233
234 let rocksdb_snapshot = self.0.snapshot.clone();
235 let db = self.0.db.clone();
236
237 let version = self
238 .substore_version(&config)
239 .expect("the substore exists and has been initialized");
240
241 let substore = store::substore::SubstoreSnapshot {
242 config,
243 rocksdb_snapshot,
244 version,
245 db,
246 };
247 let key: Vec<u8> = key.to_vec();
248
249 crate::future::SnapshotFuture(tokio::task::spawn_blocking(move || {
250 span.in_scope(|| {
251 let _start = std::time::Instant::now();
252
253 let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
254 let rsp = substore
255 .rocksdb_snapshot
256 .get_cf(cf_nonverifiable, key)
257 .map_err(Into::into);
258 #[cfg(feature = "metrics")]
259 metrics::histogram!(metrics::STORAGE_NONCONSENSUS_GET_RAW_DURATION)
260 .record(_start.elapsed());
261 rsp
262 })
263 }))
264 }
265
266 fn prefix_raw(&self, prefix: &str) -> Self::PrefixRawStream {
268 let span = Span::current();
269
270 let rocksdb_snapshot = self.0.snapshot.clone();
271 let db = self.0.db.clone();
272
273 let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix);
274 tracing::trace!(substore_key = prefix_truncated, substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore");
275 let substore_prefix = config.prefix.clone();
276
277 let version = self
278 .substore_version(&config)
279 .expect("the substore exists and has been initialized");
280
281 let substore = store::substore::SubstoreSnapshot {
282 config,
283 rocksdb_snapshot,
284 version,
285 db,
286 };
287
288 let mut options = rocksdb::ReadOptions::default();
289 options.set_iterate_range(rocksdb::PrefixRange(prefix_truncated.as_bytes()));
290 let mode = rocksdb::IteratorMode::Start;
291 let (tx_prefix_item, rx_prefix_query) = mpsc::channel(10);
292
293 tokio::task::spawn_blocking(move || {
297 span.in_scope(|| {
298 let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
299 let jmt_keys_iterator =
300 substore
301 .rocksdb_snapshot
302 .iterator_cf_opt(cf_jmt_keys, options, mode);
303
304 for tuple in jmt_keys_iterator {
305 let (key_preimage, _) = tuple?;
307 let substore_key = std::str::from_utf8(key_preimage.as_ref())
308 .expect("saved jmt keys are utf-8 strings");
309 let key_hash = jmt::KeyHash::with::<sha2::Sha256>(substore_key.as_bytes());
310
311 let full_key = if substore_prefix.is_empty() {
312 substore_key.to_string()
313 } else {
314 format!("{substore_prefix}/{substore_key}").to_string()
315 };
316
317 let v = substore
318 .get_jmt(key_hash)?
319 .expect("keys in jmt_keys should have a corresponding value in jmt");
320
321 tx_prefix_item.blocking_send(Ok((full_key, v)))?;
322 }
323 anyhow::Ok(())
324 })
325 });
326
327 tokio_stream::wrappers::ReceiverStream::new(rx_prefix_query)
328 }
329
330 fn prefix_keys(&self, prefix: &str) -> Self::PrefixKeysStream {
334 let span = Span::current();
335
336 let rocksdb_snapshot = self.0.snapshot.clone();
337 let db = self.0.db.clone();
338
339 let (prefix_truncated, config) = self.0.multistore_cache.config.match_prefix_str(prefix);
340
341 let version = self
342 .substore_version(&config)
343 .expect("the substore exists and has been initialized");
344
345 let substore = store::substore::SubstoreSnapshot {
346 config,
347 rocksdb_snapshot,
348 version,
349 db,
350 };
351
352 let mut options = rocksdb::ReadOptions::default();
353 options.set_iterate_range(rocksdb::PrefixRange(prefix_truncated.as_bytes()));
354 let mode = rocksdb::IteratorMode::Start;
355 let (tx_prefix_keys, rx_prefix_keys) = mpsc::channel(10);
356
357 tokio::task::spawn_blocking(move || {
358 span.in_scope(|| {
359 let cf_jmt_keys = substore.config.cf_jmt_keys(&substore.db);
360 let iter = substore
361 .rocksdb_snapshot
362 .iterator_cf_opt(cf_jmt_keys, options, mode);
363
364 let substore_prefix = &substore.config.prefix;
365
366 for key_and_keyhash in iter {
367 let (raw_preimage, _) = key_and_keyhash?;
368 let preimage = std::str::from_utf8(raw_preimage.as_ref())
369 .expect("saved jmt keys are utf-8 strings");
370
371 let full_key = if substore_prefix.is_empty() {
372 preimage.to_string()
373 } else {
374 format!("{substore_prefix}/{preimage}").to_string()
375 };
376
377 tx_prefix_keys.blocking_send(Ok(full_key))?;
378 }
379 anyhow::Ok(())
380 })
381 });
382
383 tokio_stream::wrappers::ReceiverStream::new(rx_prefix_keys)
384 }
385
386 fn nonverifiable_prefix_raw(&self, prefix: &[u8]) -> Self::NonconsensusPrefixRawStream {
388 let span = Span::current();
389 let rocksdb_snapshot = self.0.snapshot.clone();
390 let db = self.0.db.clone();
391
392 let (truncated_prefix, config) = self.0.multistore_cache.config.match_prefix_bytes(prefix);
393 tracing::trace!(substore_key = ?truncated_prefix, substore_prefix = config.prefix, prefix_supplied = ?prefix, "matched prefix, fetching substore");
394 let version = self
395 .substore_version(&config)
396 .expect("the substore exists and has been initialized");
397
398 let substore = store::substore::SubstoreSnapshot {
399 config,
400 rocksdb_snapshot,
401 version,
402 db,
403 };
404
405 let mut options = rocksdb::ReadOptions::default();
406 options.set_iterate_range(rocksdb::PrefixRange(truncated_prefix));
407 let mode = rocksdb::IteratorMode::Start;
408
409 let (tx_prefix_query, rx_prefix_query) = mpsc::channel(10);
410
411 tokio::task::spawn_blocking(move || {
412 span.in_scope(|| {
413 let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
414 let iter =
415 substore
416 .rocksdb_snapshot
417 .iterator_cf_opt(cf_nonverifiable, options, mode);
418 let substore_prefix = substore.config.prefix.as_bytes().to_vec();
419 for i in iter {
420 let (boxed_key, boxed_value) = i?;
421 let key: Vec<u8> = boxed_key.into();
422 let value: Vec<u8> = boxed_value.into();
423
424 let mut full_key: Vec<u8> = vec![];
427 if substore_prefix.is_empty() {
428 full_key.extend(key);
429 } else {
430 full_key.extend(substore_prefix.clone());
431 full_key.extend(iter::once(b'/'));
432 full_key.extend(key);
433 }
434
435 tx_prefix_query.blocking_send(Ok((full_key, value)))?;
436 }
437 anyhow::Ok(())
438 })
439 });
440
441 tokio_stream::wrappers::ReceiverStream::new(rx_prefix_query)
442 }
443
444 fn nonverifiable_range_raw(
448 &self,
449 prefix: Option<&[u8]>,
450 range: impl std::ops::RangeBounds<Vec<u8>>,
451 ) -> anyhow::Result<Self::NonconsensusRangeRawStream> {
452 let span = Span::current();
453 let rocksdb_snapshot = self.0.snapshot.clone();
454 let db = self.0.db.clone();
455
456 let (prefix, config) = self
457 .0
458 .multistore_cache
459 .config
460 .route_key_bytes(prefix.unwrap_or_default());
461
462 let version = self
463 .substore_version(&config)
464 .expect("the substore exists and has been initialized");
465
466 let substore = store::substore::SubstoreSnapshot {
467 config,
468 rocksdb_snapshot,
469 version,
470 db,
471 };
472
473 let (_range, (start, end)) = crate::utils::convert_bounds(range)?;
474 let mut options = rocksdb::ReadOptions::default();
475
476 let (start, end) = (start.unwrap_or_default(), end.unwrap_or_default());
477 let end_is_empty = end.is_empty();
478
479 let mut prefix_start = Vec::with_capacity(prefix.len() + start.len());
480 let mut prefix_end = Vec::with_capacity(prefix.len() + end.len());
481
482 prefix_start.extend(prefix);
483 prefix_start.extend(start);
484 prefix_end.extend(prefix);
485 prefix_end.extend(end);
486
487 tracing::debug!(
488 ?prefix_start,
489 ?prefix_end,
490 ?prefix,
491 "nonverifiable_range_raw"
492 );
493
494 options.set_iterate_lower_bound(prefix_start);
495
496 if !end_is_empty {
507 options.set_iterate_upper_bound(prefix_end);
508 }
509
510 let mode = rocksdb::IteratorMode::Start;
511 let prefix = prefix.to_vec();
512
513 let (tx, rx) = mpsc::channel::<Result<(Vec<u8>, Vec<u8>)>>(10);
514 tokio::task::spawn_blocking(move || {
515 span.in_scope(|| {
516 let cf_nonverifiable = substore.config.cf_nonverifiable(&substore.db);
517 let iter =
518 substore
519 .rocksdb_snapshot
520 .iterator_cf_opt(cf_nonverifiable, options, mode);
521
522 for i in iter {
523 let (key, value) = i?;
524
525 if !prefix.is_empty() && !key.starts_with(&prefix) {
530 break;
531 }
532 tx.blocking_send(Ok((key.into(), value.into())))?;
533 }
534 Ok::<(), anyhow::Error>(())
535 })
536 });
537
538 Ok(tokio_stream::wrappers::ReceiverStream::new(rx))
539 }
540
541 fn object_get<T: Any + Send + Sync + Clone>(&self, _key: &str) -> Option<T> {
542 None
544 }
545
546 fn object_type(&self, _key: &str) -> Option<std::any::TypeId> {
547 None
549 }
550}
551
552impl std::fmt::Debug for Snapshot {
553 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
554 f.debug_struct("Snapshot")
555 .field("version", &self.0.version)
556 .finish_non_exhaustive()
557 }
558}