cnidarium/
delta.rs

1use std::{any::Any, sync::Arc};
2
3use futures::StreamExt;
4use parking_lot::RwLock;
5use tendermint::abci;
6
7use crate::{
8    future::{
9        CacheFuture, StateDeltaNonconsensusPrefixRawStream, StateDeltaNonconsensusRangeRawStream,
10        StateDeltaPrefixKeysStream, StateDeltaPrefixRawStream,
11    },
12    utils, Cache, EscapedByteSlice, StateRead, StateWrite,
13};
14
/// An arbitrarily-deeply nested stack of delta updates to an underlying state.
///
/// This API allows exploring a tree of possible execution paths concurrently,
/// before finally selecting one and applying it to the underlying state.
///
/// Using this API requires understanding its invariants.
///
/// On creation, `StateDelta::new` takes ownership of a `StateRead` instance
/// (it must also be `StateWrite` for `apply()` to be available), acquiring a
/// "write lock" over the underlying state (since `&mut S` is `StateWrite` if
/// `S: StateWrite`, it's possible to pass a unique reference).
///
/// The resulting `StateDelta` instance is a "leaf" state, and can be used for
/// reads and writes, following some execution path.
///
/// When two potential execution paths diverge, `delta.fork()` can be used to
/// fork the state update.  The new forked `StateDelta` will include all
/// previous state writes made to the original (and its ancestors).  Any writes
/// made to the original `StateDelta` after `fork()` is called will not be seen
/// by the forked state.
///
/// Finally, after some execution path has been selected, calling
/// `delta.apply()` on one of the possible state updates will commit the changes
/// to the underlying state instance, and invalidate all other delta updates in
/// the same family.  It is a programming error to use the other delta updates
/// after `apply()` has been called (accesses that reach the taken state or a
/// taken cache panic), but ideally this should not be a problem in practice:
/// the API is intended to explore a tree of possible execution paths; once one
/// has been selected, the others should be discarded.
#[derive(Debug)]
pub struct StateDelta<S: StateRead> {
    /// The underlying state instance.
    ///
    /// The Arc<_> allows it to be shared between different stacks of delta updates,
    /// and the RwLock<Option<_>> allows it to be taken out when it's time to commit
    /// the changes from one of the stacks. Once taken, the slot holds `None`.
    state: Arc<RwLock<Option<S>>>,
    /// A stack of intermediate delta updates, ordered from oldest (bottom) to
    /// newest (top): `fork()` pushes onto the end, reads scan the stack in
    /// reverse, and `flatten()` merges it front to back.
    ///
    /// We store all the layers directly, rather than using a recursive structure,
    /// so that the type doesn't depend on how many layers are involved. We're only
    /// duplicating the Arc<_>, so this should be cheap.
    layers: Vec<Arc<RwLock<Option<Cache>>>>,
    /// The final delta update in the stack, the one we're currently working on.
    /// Storing this separately allows us to avoid lock contention during writes.
    /// In fact, this data shouldn't usually be shared at all; the only reason it's
    /// wrapped this way is so that prefix streams can have 'static lifetimes.
    /// We option-wrap it so it can be chained with the layers; it is only ever
    /// `None` after `flatten()` has consumed this delta.
    leaf_cache: Arc<RwLock<Option<Cache>>>,
}
64
65impl<S: StateRead> StateDelta<S> {
66    /// Create a new tree of possible updates to an underlying `state`.
67    pub fn new(state: S) -> Self {
68        Self {
69            state: Arc::new(RwLock::new(Some(state))),
70            layers: Vec::default(),
71            leaf_cache: Arc::new(RwLock::new(Some(Cache::default()))),
72        }
73    }
74
75    /// Fork execution, returning a new child state that includes all previous changes.
76    pub fn fork(&mut self) -> Self {
77        // If we have writes in the leaf cache, we'll move them to a new layer,
78        // ensuring that the new child only sees writes made to this state
79        // *before* fork was called, and not after.
80        //
81        // Doing this only when the leaf cache is dirty means that we don't
82        // add empty layers in repeated fork() calls without intervening writes.
83        if self
84            .leaf_cache
85            .read()
86            .as_ref()
87            .expect("unable to get ref to leaf cache, storage not initialized?")
88            .is_dirty()
89        {
90            let new_layer = std::mem::replace(
91                &mut self.leaf_cache,
92                Arc::new(RwLock::new(Some(Cache::default()))),
93            );
94            self.layers.push(new_layer);
95        }
96
97        Self {
98            state: self.state.clone(),
99            layers: self.layers.clone(),
100            leaf_cache: Arc::new(RwLock::new(Some(Cache::default()))),
101        }
102    }
103
104    /// Flatten all changes in this branch of the tree into a single [`Cache`],
105    /// invalidating all other branches of the tree and releasing the underlying
106    /// state back to the caller.
107    ///
108    /// The [`apply`](Self::apply) method is a convenience wrapper around this
109    /// that applies the changes to the underlying state.
110    pub fn flatten(self) -> (S, Cache) {
111        tracing::trace!("flattening branch");
112        // Take ownership of the underlying state, immediately invalidating all
113        // other delta stacks in the same family.
114        let state = self
115            .state
116            .write()
117            .take()
118            .expect("apply must be called only once");
119
120        // Flatten the intermediate layers into a single cache, applying them from oldest
121        // (bottom) to newest (top), so that newer writes clobber old ones.
122        let mut changes = Cache::default();
123        for layer in self.layers {
124            let cache = layer
125                .write()
126                .take()
127                .expect("cache must not have already been applied");
128            changes.merge(cache);
129        }
130        // Last, apply the changes in the leaf cache.
131        changes.merge(
132            self.leaf_cache
133                .write()
134                .take()
135                .expect("unable to take leaf cache, was it already applied?"),
136        );
137
138        (state, changes)
139    }
140}
141
142impl<S: StateRead + StateWrite> StateDelta<S> {
143    /// Apply all changes in this branch of the tree to the underlying state,
144    /// releasing it back to the caller and invalidating all other branches of
145    /// the tree.
146    pub fn apply(self) -> (S, Vec<abci::Event>) {
147        let (mut state, mut changes) = self.flatten();
148        let events = changes.take_events();
149
150        // Apply the flattened changes to the underlying state.
151        changes.apply_to(&mut state);
152
153        // Finally, return ownership of the state back to the caller.
154        (state, events)
155    }
156}
157
158impl<S: StateRead + StateWrite> StateDelta<Arc<S>> {
159    pub fn try_apply(self) -> anyhow::Result<(S, Vec<abci::Event>)> {
160        let (arc_state, mut changes) = self.flatten();
161        let events = std::mem::take(&mut changes.events);
162
163        if let Ok(mut state) = Arc::try_unwrap(arc_state) {
164            // Apply the flattened changes to the underlying state.
165            changes.apply_to(&mut state);
166
167            // Finally, return ownership of the state back to the caller.
168            Ok((state, events))
169        } else {
170            Err(anyhow::anyhow!("did not have unique ownership of Arc<S>"))
171        }
172    }
173}
174
175impl<S: StateRead> StateRead for StateDelta<S> {
176    type GetRawFut = CacheFuture<S::GetRawFut>;
177    type PrefixRawStream = StateDeltaPrefixRawStream<S::PrefixRawStream>;
178    type PrefixKeysStream = StateDeltaPrefixKeysStream<S::PrefixKeysStream>;
179    type NonconsensusPrefixRawStream =
180        StateDeltaNonconsensusPrefixRawStream<S::NonconsensusPrefixRawStream>;
181    type NonconsensusRangeRawStream =
182        StateDeltaNonconsensusRangeRawStream<S::NonconsensusRangeRawStream>;
183
184    fn get_raw(&self, key: &str) -> Self::GetRawFut {
185        // Check if we have a cache hit in the leaf cache.
186        if let Some(entry) = self
187            .leaf_cache
188            .read()
189            .as_ref()
190            .expect("delta must not have been applied")
191            .unwritten_changes
192            .get(key)
193        {
194            return CacheFuture::hit(entry.clone());
195        }
196
197        // Iterate through the stack, top to bottom, to see if we have a cache hit.
198        for layer in self.layers.iter().rev() {
199            if let Some(entry) = layer
200                .read()
201                .as_ref()
202                .expect("delta must not have been applied")
203                .unwritten_changes
204                .get(key)
205            {
206                return CacheFuture::hit(entry.clone());
207            }
208        }
209
210        // If we got here, the key must be in the underlying state or not present at all.
211        CacheFuture::miss(
212            self.state
213                .read()
214                .as_ref()
215                .expect("delta must not have been applied")
216                .get_raw(key),
217        )
218    }
219
220    fn nonverifiable_get_raw(&self, key: &[u8]) -> Self::GetRawFut {
221        // Check if we have a cache hit in the leaf cache.
222        if let Some(entry) = self
223            .leaf_cache
224            .read()
225            .as_ref()
226            .expect("delta must not have been applied")
227            .nonverifiable_changes
228            .get(key)
229        {
230            return CacheFuture::hit(entry.clone());
231        }
232
233        // Iterate through the stack, top to bottom, to see if we have a cache hit.
234        for layer in self.layers.iter().rev() {
235            if let Some(entry) = layer
236                .read()
237                .as_ref()
238                .expect("delta must not have been applied")
239                .nonverifiable_changes
240                .get(key)
241            {
242                return CacheFuture::hit(entry.clone());
243            }
244        }
245
246        // If we got here, the key must be in the underlying state or not present at all.
247        CacheFuture::miss(
248            self.state
249                .read()
250                .as_ref()
251                .expect("delta must not have been applied")
252                .nonverifiable_get_raw(key),
253        )
254    }
255
256    fn object_type(&self, key: &'static str) -> Option<std::any::TypeId> {
257        // Check if we have a cache hit in the leaf cache.
258        if let Some(entry) = self
259            .leaf_cache
260            .read()
261            .as_ref()
262            .expect("delta must not have been applied")
263            .ephemeral_objects
264            .get(key)
265        {
266            // We have to explicitly call `Any::type_id(&**v)` here because this ensures that we are
267            // asking for the type of the `Any` *inside* the `Box<dyn Any>`, rather than the type of
268            // `Box<dyn Any>` itself.
269            return entry.as_ref().map(|v| std::any::Any::type_id(&**v));
270        }
271
272        // Iterate through the stack, top to bottom, to see if we have a cache hit.
273        for layer in self.layers.iter().rev() {
274            if let Some(entry) = layer
275                .read()
276                .as_ref()
277                .expect("delta must not have been applied")
278                .ephemeral_objects
279                .get(key)
280            {
281                // We have to explicitly call `Any::type_id(&**v)` here because this ensures that we are
282                // asking for the type of the `Any` *inside* the `Box<dyn Any>`, rather than the type of
283                // `Box<dyn Any>` itself.
284                return entry.as_ref().map(|v| std::any::Any::type_id(&**v));
285            }
286        }
287
288        // Fall through to the underlying store.
289        self.state
290            .read()
291            .as_ref()
292            .expect("delta must not have been applied")
293            .object_type(key)
294    }
295
296    fn object_get<T: std::any::Any + Send + Sync + Clone>(&self, key: &'static str) -> Option<T> {
297        // Check if we have a cache hit in the leaf cache.
298        if let Some(entry) = self
299            .leaf_cache
300            .read()
301            .as_ref()
302            .expect("delta must not have been applied")
303            .ephemeral_objects
304            .get(key)
305        {
306            return entry
307                .as_ref()
308                .map(|v| {
309                    v.downcast_ref().unwrap_or_else(|| panic!("unexpected type for key \"{key}\" in `StateDelta::object_get`: expected type {}", std::any::type_name::<T>()))
310                })
311                .cloned();
312        }
313
314        // Iterate through the stack, top to bottom, to see if we have a cache hit.
315        for layer in self.layers.iter().rev() {
316            if let Some(entry) = layer
317                .read()
318                .as_ref()
319                .expect("delta must not have been applied")
320                .ephemeral_objects
321                .get(key)
322            {
323                return entry
324                    .as_ref()
325                    .map(|v| {
326                    v.downcast_ref().unwrap_or_else(|| panic!("unexpected type for key \"{key}\" in `StateDelta::object_get`: expected type {}", std::any::type_name::<T>()))
327                }).cloned();
328            }
329        }
330
331        // Fall through to the underlying store.
332        self.state
333            .read()
334            .as_ref()
335            .expect("delta must not have been applied")
336            .object_get(key)
337    }
338
339    fn prefix_raw(&self, prefix: &str) -> Self::PrefixRawStream {
340        let underlying = self
341            .state
342            .read()
343            .as_ref()
344            .expect("delta must not have been applied")
345            .prefix_raw(prefix)
346            .peekable();
347        StateDeltaPrefixRawStream {
348            underlying,
349            layers: self.layers.clone(),
350            leaf_cache: self.leaf_cache.clone(),
351            last_key: None,
352            prefix: prefix.to_owned(),
353        }
354    }
355
356    fn prefix_keys(&self, prefix: &str) -> Self::PrefixKeysStream {
357        let underlying = self
358            .state
359            .read()
360            .as_ref()
361            .expect("delta must not have been applied")
362            .prefix_keys(prefix)
363            .peekable();
364        StateDeltaPrefixKeysStream {
365            underlying,
366            layers: self.layers.clone(),
367            leaf_cache: self.leaf_cache.clone(),
368            last_key: None,
369            prefix: prefix.to_owned(),
370        }
371    }
372
373    fn nonverifiable_prefix_raw(&self, prefix: &[u8]) -> Self::NonconsensusPrefixRawStream {
374        let underlying = self
375            .state
376            .read()
377            .as_ref()
378            .expect("delta must not have been applied")
379            .nonverifiable_prefix_raw(prefix)
380            .peekable();
381        StateDeltaNonconsensusPrefixRawStream {
382            underlying,
383            layers: self.layers.clone(),
384            leaf_cache: self.leaf_cache.clone(),
385            last_key: None,
386            prefix: prefix.to_vec(),
387        }
388    }
389
390    fn nonverifiable_range_raw(
391        &self,
392        prefix: Option<&[u8]>,
393        range: impl std::ops::RangeBounds<Vec<u8>>,
394    ) -> anyhow::Result<Self::NonconsensusRangeRawStream> {
395        let (range, (start, end)) = utils::convert_bounds(range)?;
396        let underlying = self
397            .state
398            .read()
399            .as_ref()
400            .expect("delta must not have been applied")
401            .nonverifiable_range_raw(prefix, range)?
402            .peekable();
403        Ok(StateDeltaNonconsensusRangeRawStream {
404            underlying,
405            layers: self.layers.clone(),
406            leaf_cache: self.leaf_cache.clone(),
407            last_key: None,
408            prefix: prefix.map(|p| p.to_vec()),
409            range: (start, end),
410        })
411    }
412}
413
414impl<S: StateRead> StateWrite for StateDelta<S> {
415    fn put_raw(&mut self, key: String, value: jmt::OwnedValue) {
416        self.leaf_cache
417            .write()
418            .as_mut()
419            .expect("delta must not have been applied")
420            .unwritten_changes
421            .insert(key, Some(value));
422    }
423
424    fn delete(&mut self, key: String) {
425        self.leaf_cache
426            .write()
427            .as_mut()
428            .expect("delta must not have been applied")
429            .unwritten_changes
430            .insert(key, None);
431    }
432
433    fn nonverifiable_delete(&mut self, key: Vec<u8>) {
434        tracing::trace!(key = ?EscapedByteSlice(&key), "deleting key");
435        self.leaf_cache
436            .write()
437            .as_mut()
438            .expect("delta must not have been applied")
439            .nonverifiable_changes
440            .insert(key, None);
441    }
442
443    fn nonverifiable_put_raw(&mut self, key: Vec<u8>, value: Vec<u8>) {
444        tracing::trace!(key = ?EscapedByteSlice(&key), value = ?EscapedByteSlice(&value), "insert nonverifiable change");
445        self.leaf_cache
446            .write()
447            .as_mut()
448            .expect("delta must not have been applied")
449            .nonverifiable_changes
450            .insert(key, Some(value));
451    }
452
453    fn object_put<T: Clone + Any + Send + Sync>(&mut self, key: &'static str, value: T) {
454        if let Some(previous_type) = self.object_type(key) {
455            if std::any::TypeId::of::<T>() != previous_type {
456                panic!(
457                    "unexpected type for key \"{key}\" in `StateDelta::object_put`: expected type {expected}",
458                    expected = std::any::type_name::<T>(),
459                );
460            }
461        }
462        self.leaf_cache
463            .write()
464            .as_mut()
465            .expect("delta must not have been applied")
466            .ephemeral_objects
467            .insert(key, Some(Box::new(value)));
468    }
469
470    fn object_delete(&mut self, key: &'static str) {
471        self.leaf_cache
472            .write()
473            .as_mut()
474            .expect("delta must not have been applied")
475            .ephemeral_objects
476            .insert(key, None);
477    }
478
479    fn object_merge(
480        &mut self,
481        objects: std::collections::BTreeMap<&'static str, Option<Box<dyn Any + Send + Sync>>>,
482    ) {
483        self.leaf_cache
484            .write()
485            .as_mut()
486            .expect("delta must not have been applied")
487            .ephemeral_objects
488            .extend(objects);
489    }
490
491    fn record(&mut self, event: abci::Event) {
492        self.leaf_cache
493            .write()
494            .as_mut()
495            .expect("delta must not have been applied")
496            .events
497            .push(event)
498    }
499}
500
501/// Extension trait providing `try_begin_transaction()` on `Arc<StateDelta<S>>`.
502pub trait ArcStateDeltaExt: Sized {
503    type S: StateRead;
504    /// Attempts to begin a transaction on this `Arc<State>`, returning `None` if the `Arc` is shared.
505    fn try_begin_transaction(&'_ mut self) -> Option<StateDelta<&'_ mut StateDelta<Self::S>>>;
506}
507
508impl<S: StateRead> ArcStateDeltaExt for Arc<StateDelta<S>> {
509    type S = S;
510    fn try_begin_transaction(&'_ mut self) -> Option<StateDelta<&'_ mut StateDelta<S>>> {
511        Arc::get_mut(self).map(StateDelta::new)
512    }
513}