1use std::{any::Any, sync::Arc};
2
3use futures::StreamExt;
4use parking_lot::RwLock;
5use tendermint::abci;
6
7use crate::{
8 future::{
9 CacheFuture, StateDeltaNonconsensusPrefixRawStream, StateDeltaNonconsensusRangeRawStream,
10 StateDeltaPrefixKeysStream, StateDeltaPrefixRawStream,
11 },
12 utils, Cache, EscapedByteSlice, StateRead, StateWrite,
13};
14
15#[derive(Debug)]
44pub struct StateDelta<S: StateRead> {
45 state: Arc<RwLock<Option<S>>>,
51 layers: Vec<Arc<RwLock<Option<Cache>>>>,
57 leaf_cache: Arc<RwLock<Option<Cache>>>,
63}
64
65impl<S: StateRead> StateDelta<S> {
66 pub fn new(state: S) -> Self {
68 Self {
69 state: Arc::new(RwLock::new(Some(state))),
70 layers: Vec::default(),
71 leaf_cache: Arc::new(RwLock::new(Some(Cache::default()))),
72 }
73 }
74
75 pub fn fork(&mut self) -> Self {
77 if self
84 .leaf_cache
85 .read()
86 .as_ref()
87 .expect("unable to get ref to leaf cache, storage not initialized?")
88 .is_dirty()
89 {
90 let new_layer = std::mem::replace(
91 &mut self.leaf_cache,
92 Arc::new(RwLock::new(Some(Cache::default()))),
93 );
94 self.layers.push(new_layer);
95 }
96
97 Self {
98 state: self.state.clone(),
99 layers: self.layers.clone(),
100 leaf_cache: Arc::new(RwLock::new(Some(Cache::default()))),
101 }
102 }
103
104 pub fn flatten(self) -> (S, Cache) {
111 tracing::trace!("flattening branch");
112 let state = self
115 .state
116 .write()
117 .take()
118 .expect("apply must be called only once");
119
120 let mut changes = Cache::default();
123 for layer in self.layers {
124 let cache = layer
125 .write()
126 .take()
127 .expect("cache must not have already been applied");
128 changes.merge(cache);
129 }
130 changes.merge(
132 self.leaf_cache
133 .write()
134 .take()
135 .expect("unable to take leaf cache, was it already applied?"),
136 );
137
138 (state, changes)
139 }
140}
141
142impl<S: StateRead + StateWrite> StateDelta<S> {
143 pub fn apply(self) -> (S, Vec<abci::Event>) {
147 let (mut state, mut changes) = self.flatten();
148 let events = changes.take_events();
149
150 changes.apply_to(&mut state);
152
153 (state, events)
155 }
156}
157
158impl<S: StateRead + StateWrite> StateDelta<Arc<S>> {
159 pub fn try_apply(self) -> anyhow::Result<(S, Vec<abci::Event>)> {
160 let (arc_state, mut changes) = self.flatten();
161 let events = std::mem::take(&mut changes.events);
162
163 if let Ok(mut state) = Arc::try_unwrap(arc_state) {
164 changes.apply_to(&mut state);
166
167 Ok((state, events))
169 } else {
170 Err(anyhow::anyhow!("did not have unique ownership of Arc<S>"))
171 }
172 }
173}
174
175impl<S: StateRead> StateRead for StateDelta<S> {
176 type GetRawFut = CacheFuture<S::GetRawFut>;
177 type PrefixRawStream = StateDeltaPrefixRawStream<S::PrefixRawStream>;
178 type PrefixKeysStream = StateDeltaPrefixKeysStream<S::PrefixKeysStream>;
179 type NonconsensusPrefixRawStream =
180 StateDeltaNonconsensusPrefixRawStream<S::NonconsensusPrefixRawStream>;
181 type NonconsensusRangeRawStream =
182 StateDeltaNonconsensusRangeRawStream<S::NonconsensusRangeRawStream>;
183
184 fn get_raw(&self, key: &str) -> Self::GetRawFut {
185 if let Some(entry) = self
187 .leaf_cache
188 .read()
189 .as_ref()
190 .expect("delta must not have been applied")
191 .unwritten_changes
192 .get(key)
193 {
194 return CacheFuture::hit(entry.clone());
195 }
196
197 for layer in self.layers.iter().rev() {
199 if let Some(entry) = layer
200 .read()
201 .as_ref()
202 .expect("delta must not have been applied")
203 .unwritten_changes
204 .get(key)
205 {
206 return CacheFuture::hit(entry.clone());
207 }
208 }
209
210 CacheFuture::miss(
212 self.state
213 .read()
214 .as_ref()
215 .expect("delta must not have been applied")
216 .get_raw(key),
217 )
218 }
219
220 fn nonverifiable_get_raw(&self, key: &[u8]) -> Self::GetRawFut {
221 if let Some(entry) = self
223 .leaf_cache
224 .read()
225 .as_ref()
226 .expect("delta must not have been applied")
227 .nonverifiable_changes
228 .get(key)
229 {
230 return CacheFuture::hit(entry.clone());
231 }
232
233 for layer in self.layers.iter().rev() {
235 if let Some(entry) = layer
236 .read()
237 .as_ref()
238 .expect("delta must not have been applied")
239 .nonverifiable_changes
240 .get(key)
241 {
242 return CacheFuture::hit(entry.clone());
243 }
244 }
245
246 CacheFuture::miss(
248 self.state
249 .read()
250 .as_ref()
251 .expect("delta must not have been applied")
252 .nonverifiable_get_raw(key),
253 )
254 }
255
256 fn object_type(&self, key: &'static str) -> Option<std::any::TypeId> {
257 if let Some(entry) = self
259 .leaf_cache
260 .read()
261 .as_ref()
262 .expect("delta must not have been applied")
263 .ephemeral_objects
264 .get(key)
265 {
266 return entry.as_ref().map(|v| std::any::Any::type_id(&**v));
270 }
271
272 for layer in self.layers.iter().rev() {
274 if let Some(entry) = layer
275 .read()
276 .as_ref()
277 .expect("delta must not have been applied")
278 .ephemeral_objects
279 .get(key)
280 {
281 return entry.as_ref().map(|v| std::any::Any::type_id(&**v));
285 }
286 }
287
288 self.state
290 .read()
291 .as_ref()
292 .expect("delta must not have been applied")
293 .object_type(key)
294 }
295
296 fn object_get<T: std::any::Any + Send + Sync + Clone>(&self, key: &'static str) -> Option<T> {
297 if let Some(entry) = self
299 .leaf_cache
300 .read()
301 .as_ref()
302 .expect("delta must not have been applied")
303 .ephemeral_objects
304 .get(key)
305 {
306 return entry
307 .as_ref()
308 .map(|v| {
309 v.downcast_ref().unwrap_or_else(|| panic!("unexpected type for key \"{key}\" in `StateDelta::object_get`: expected type {}", std::any::type_name::<T>()))
310 })
311 .cloned();
312 }
313
314 for layer in self.layers.iter().rev() {
316 if let Some(entry) = layer
317 .read()
318 .as_ref()
319 .expect("delta must not have been applied")
320 .ephemeral_objects
321 .get(key)
322 {
323 return entry
324 .as_ref()
325 .map(|v| {
326 v.downcast_ref().unwrap_or_else(|| panic!("unexpected type for key \"{key}\" in `StateDelta::object_get`: expected type {}", std::any::type_name::<T>()))
327 }).cloned();
328 }
329 }
330
331 self.state
333 .read()
334 .as_ref()
335 .expect("delta must not have been applied")
336 .object_get(key)
337 }
338
339 fn prefix_raw(&self, prefix: &str) -> Self::PrefixRawStream {
340 let underlying = self
341 .state
342 .read()
343 .as_ref()
344 .expect("delta must not have been applied")
345 .prefix_raw(prefix)
346 .peekable();
347 StateDeltaPrefixRawStream {
348 underlying,
349 layers: self.layers.clone(),
350 leaf_cache: self.leaf_cache.clone(),
351 last_key: None,
352 prefix: prefix.to_owned(),
353 }
354 }
355
356 fn prefix_keys(&self, prefix: &str) -> Self::PrefixKeysStream {
357 let underlying = self
358 .state
359 .read()
360 .as_ref()
361 .expect("delta must not have been applied")
362 .prefix_keys(prefix)
363 .peekable();
364 StateDeltaPrefixKeysStream {
365 underlying,
366 layers: self.layers.clone(),
367 leaf_cache: self.leaf_cache.clone(),
368 last_key: None,
369 prefix: prefix.to_owned(),
370 }
371 }
372
373 fn nonverifiable_prefix_raw(&self, prefix: &[u8]) -> Self::NonconsensusPrefixRawStream {
374 let underlying = self
375 .state
376 .read()
377 .as_ref()
378 .expect("delta must not have been applied")
379 .nonverifiable_prefix_raw(prefix)
380 .peekable();
381 StateDeltaNonconsensusPrefixRawStream {
382 underlying,
383 layers: self.layers.clone(),
384 leaf_cache: self.leaf_cache.clone(),
385 last_key: None,
386 prefix: prefix.to_vec(),
387 }
388 }
389
390 fn nonverifiable_range_raw(
391 &self,
392 prefix: Option<&[u8]>,
393 range: impl std::ops::RangeBounds<Vec<u8>>,
394 ) -> anyhow::Result<Self::NonconsensusRangeRawStream> {
395 let (range, (start, end)) = utils::convert_bounds(range)?;
396 let underlying = self
397 .state
398 .read()
399 .as_ref()
400 .expect("delta must not have been applied")
401 .nonverifiable_range_raw(prefix, range)?
402 .peekable();
403 Ok(StateDeltaNonconsensusRangeRawStream {
404 underlying,
405 layers: self.layers.clone(),
406 leaf_cache: self.leaf_cache.clone(),
407 last_key: None,
408 prefix: prefix.map(|p| p.to_vec()),
409 range: (start, end),
410 })
411 }
412}
413
414impl<S: StateRead> StateWrite for StateDelta<S> {
415 fn put_raw(&mut self, key: String, value: jmt::OwnedValue) {
416 self.leaf_cache
417 .write()
418 .as_mut()
419 .expect("delta must not have been applied")
420 .unwritten_changes
421 .insert(key, Some(value));
422 }
423
424 fn delete(&mut self, key: String) {
425 self.leaf_cache
426 .write()
427 .as_mut()
428 .expect("delta must not have been applied")
429 .unwritten_changes
430 .insert(key, None);
431 }
432
433 fn nonverifiable_delete(&mut self, key: Vec<u8>) {
434 tracing::trace!(key = ?EscapedByteSlice(&key), "deleting key");
435 self.leaf_cache
436 .write()
437 .as_mut()
438 .expect("delta must not have been applied")
439 .nonverifiable_changes
440 .insert(key, None);
441 }
442
443 fn nonverifiable_put_raw(&mut self, key: Vec<u8>, value: Vec<u8>) {
444 tracing::trace!(key = ?EscapedByteSlice(&key), value = ?EscapedByteSlice(&value), "insert nonverifiable change");
445 self.leaf_cache
446 .write()
447 .as_mut()
448 .expect("delta must not have been applied")
449 .nonverifiable_changes
450 .insert(key, Some(value));
451 }
452
453 fn object_put<T: Clone + Any + Send + Sync>(&mut self, key: &'static str, value: T) {
454 if let Some(previous_type) = self.object_type(key) {
455 if std::any::TypeId::of::<T>() != previous_type {
456 panic!(
457 "unexpected type for key \"{key}\" in `StateDelta::object_put`: expected type {expected}",
458 expected = std::any::type_name::<T>(),
459 );
460 }
461 }
462 self.leaf_cache
463 .write()
464 .as_mut()
465 .expect("delta must not have been applied")
466 .ephemeral_objects
467 .insert(key, Some(Box::new(value)));
468 }
469
470 fn object_delete(&mut self, key: &'static str) {
471 self.leaf_cache
472 .write()
473 .as_mut()
474 .expect("delta must not have been applied")
475 .ephemeral_objects
476 .insert(key, None);
477 }
478
479 fn object_merge(
480 &mut self,
481 objects: std::collections::BTreeMap<&'static str, Option<Box<dyn Any + Send + Sync>>>,
482 ) {
483 self.leaf_cache
484 .write()
485 .as_mut()
486 .expect("delta must not have been applied")
487 .ephemeral_objects
488 .extend(objects);
489 }
490
491 fn record(&mut self, event: abci::Event) {
492 self.leaf_cache
493 .write()
494 .as_mut()
495 .expect("delta must not have been applied")
496 .events
497 .push(event)
498 }
499}
500
501pub trait ArcStateDeltaExt: Sized {
503 type S: StateRead;
504 fn try_begin_transaction(&'_ mut self) -> Option<StateDelta<&'_ mut StateDelta<Self::S>>>;
506}
507
508impl<S: StateRead> ArcStateDeltaExt for Arc<StateDelta<S>> {
509 type S = S;
510 fn try_begin_transaction(&'_ mut self) -> Option<StateDelta<&'_ mut StateDelta<S>>> {
511 Arc::get_mut(self).map(StateDelta::new)
512 }
513}