cnidarium/
snapshot_cache.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
use crate::Snapshot;
use std::{cmp, collections::VecDeque};

/// A circular cache for storing [`Snapshot`]s.
///
/// # Usage
///
/// [`Snapshot`]s are inserted in the cache using the [`try_push`] method. If the
/// cache is full, the oldest entry will be evicted to make space for the newer
/// entry.
///
/// # Constraints
///
/// [`Snapshot`]s must be inserted sequentially relative to their [`jmt::Version`]
/// numbers, and have consecutive version numbers.
///
/// [`try_push`]: SnapshotCache::try_push
pub struct SnapshotCache {
    /// The cached [`Snapshot`]s. New entries are pushed at the front and evicted
    /// from the back, so the front always holds the most recent snapshot.
    cache: VecDeque<Snapshot>,
    /// The maximum number of entries kept in the cache (clamped to at least 1 on
    /// construction).
    max_size: usize,
}

impl SnapshotCache {
    /// Creates a [`SnapshotCache`] that holds at most `max_size` entries, seeded with the
    /// `initial` [`Snapshot`].
    ///
    /// A `max_size` of zero is clamped to 1, so the cache can always hold the latest snapshot.
    pub fn new(initial: Snapshot, max_size: usize) -> Self {
        let max_size = cmp::max(max_size, 1);
        let mut cache = VecDeque::with_capacity(max_size);
        cache.push_front(initial);

        Self { cache, max_size }
    }

    /// Attempts to insert a [`Snapshot`] entry into the cache. If the cache is full, the oldest
    /// entry will be evicted to make space.
    ///
    /// [`Snapshot`]s must be inserted sequentially relative to their `jmt::Version`s and have
    /// consecutive version numbers. The successor of the pre-genesis version (`u64::MAX`) is
    /// version `0`, which is why the comparison uses wrapping arithmetic.
    ///
    /// If the cache has been emptied with [`SnapshotCache::clear`], there is no latest entry to
    /// compare against and the supplied `snapshot` is accepted as the new baseline.
    ///
    /// ## Errors
    ///
    /// The method will return an error if the supplied `snapshot` has a version number that is:
    ///
    /// - stale i.e. older than (or equal to) the latest snapshot
    ///
    /// - skipping a version i.e. the difference between their version numbers is greater than 1
    pub fn try_push(&mut self, snapshot: Snapshot) -> anyhow::Result<()> {
        // Only enforce the "exactly one version ahead" rule when there is a latest entry;
        // going through `latest()` here would panic on a cleared cache.
        if let Some(latest) = self.cache.front() {
            if latest.version().wrapping_add(1) != snapshot.version() {
                anyhow::bail!("snapshot_cache: trying to insert stale snapshots.");
            }
        }

        // Evict the oldest entry (at the back) before inserting at the front.
        if self.cache.len() >= self.max_size {
            self.cache.pop_back();
        }

        self.cache.push_front(snapshot);
        Ok(())
    }

    /// Returns the latest inserted `Snapshot`.
    ///
    /// ## Panics
    ///
    /// Panics if the cache is empty, which can only happen after a call to
    /// [`SnapshotCache::clear`] that has not been followed by a successful
    /// [`SnapshotCache::try_push`].
    pub fn latest(&self) -> Snapshot {
        self.cache
            .front()
            .cloned()
            .expect("snapshot_cache cannot be empty")
    }

    /// Attempts to fetch a [`Snapshot`] with a matching `jmt::Version`, and returns `None` if none
    /// was found (including when the cache is empty).
    pub fn get(&self, version: jmt::Version) -> Option<Snapshot> {
        // Borrow the front entry instead of calling `latest()`: no clone, and no panic when
        // the cache has been cleared.
        let latest_version = self.cache.front()?.version();

        // We compute the offset assuming that snapshot entries are cached
        // such that the delta between entries is always 1. Wrapping subtraction makes the
        // pre-genesis version (`u64::MAX`) sit right behind version 0.
        let delta = latest_version.wrapping_sub(version);

        // Bounds-check in `u64` so the narrowing cast below can never truncate, even on
        // targets where `usize` is narrower than 64 bits.
        if delta >= self.cache.len() as u64 {
            return None;
        }
        let offset = delta as usize;

        // Confirm the version before cloning, so a mismatch costs nothing.
        self.cache
            .get(offset)
            .filter(|candidate| candidate.version() == version)
            .cloned()
    }

    /// Empties the cache.
    ///
    /// After this call [`SnapshotCache::get`] returns `None` for every version and
    /// [`SnapshotCache::latest`] panics until a new snapshot is inserted with
    /// [`SnapshotCache::try_push`].
    pub fn clear(&mut self) {
        self.cache.clear();
    }
}

#[cfg(test)]
mod test {

    use crate::snapshot::Snapshot;
    use crate::snapshot_cache::SnapshotCache;
    use crate::storage::Storage;
    use crate::store::multistore::MultistoreCache;
    use tempfile::TempDir;

    /// Creates a storage backend for testing, rooted in a fresh temporary directory.
    ///
    /// The [`TempDir`] guard is returned alongside the storage: the directory is deleted when
    /// the guard is dropped, so callers must keep it alive for as long as they use the storage.
    async fn create_storage_instance() -> (Storage, TempDir) {
        let dir = tempfile::tempdir().expect("unable to create tempdir");
        let file_path = dir.path().join("snapshot-cache-testing.db");

        let storage = Storage::load(file_path, vec![])
            .await
            .expect("unable to load storage");

        (storage, dir)
    }

    /// A `SnapshotCache` constructed with zero capacity defaults to a capacity of one.
    #[tokio::test]
    async fn fail_zero_capacity() {
        let (storage, _tmp_dir) = create_storage_instance().await;
        let db = storage.db();
        // The tests in this module expect a freshly loaded storage to report the pre-genesis
        // version, `u64::MAX`.
        let snapshot = storage.latest_snapshot();
        let mut cache = SnapshotCache::new(snapshot, 0);

        // Check that the cache has a capacity at least 1
        assert!(cache.get(u64::MAX).is_some());
        let new_snapshot = Snapshot::new(db, 0, MultistoreCache::default());
        cache
            .try_push(new_snapshot)
            .expect("should not fail to insert a new entry");

        // Check that the cache has a capacity of exactly 1
        assert!(cache.get(u64::MAX).is_none());
        assert!(cache.get(0).is_some());
    }

    /// Fails to insert snapshot entries that are not newer than the latest.
    #[tokio::test]
    async fn fail_insert_stale_snapshot() {
        let (storage, _tmp_dir) = create_storage_instance().await;
        let db_handle = storage.db();
        let snapshot = Snapshot::new(db_handle.clone(), 1, MultistoreCache::default());
        let mut cache = SnapshotCache::new(snapshot, 1);

        // A snapshot strictly older than the latest one must be rejected.
        let older_snapshot = Snapshot::new(db_handle.clone(), 0, MultistoreCache::default());
        cache
            .try_push(older_snapshot)
            .expect_err("should fail to insert a stale entry in the snapshot cache");

        // Re-inserting the latest version is stale as well.
        let duplicate_snapshot = Snapshot::new(db_handle, 1, MultistoreCache::default());
        cache
            .try_push(duplicate_snapshot)
            .expect_err("should fail to insert a duplicate entry in the snapshot cache");

        // Rejected insertions must leave the cache untouched.
        assert_eq!(cache.latest().version(), 1);
    }

    /// Fails to insert snapshot entries that have a version gap.
    #[tokio::test]
    async fn fail_insert_gapped_snapshot() {
        let (storage, _tmp_dir) = create_storage_instance().await;
        let db_handle = storage.db();
        let snapshot = Snapshot::new(db_handle.clone(), 0, MultistoreCache::default());
        let mut cache = SnapshotCache::new(snapshot, 2);
        let snapshot = Snapshot::new(db_handle, 2, MultistoreCache::default());
        cache
            .try_push(snapshot)
            .expect_err("should fail to insert snapshot with skipped version number");
    }

    /// Checks that we handle pre-genesis `jmt::Version` correctly.
    #[tokio::test]
    async fn cache_manage_pre_genesis() {
        let (storage, _tmp_dir) = create_storage_instance().await;
        let db_handle = storage.db();
        let snapshot = storage.latest_snapshot();

        // Create a cache of size 10, populated with one entry with version: u64::MAX
        let mut cache = SnapshotCache::new(snapshot, 10);

        // Fill the entire cache by inserting 9 more entries.
        for i in 0..9 {
            let snapshot = Snapshot::new(db_handle.clone(), i, MultistoreCache::default());
            cache
                .try_push(snapshot)
                .expect("should not fail to insert a new entry");
        }

        // The cache is full, check that the oldest entry is still in the cache.
        assert!(cache.get(u64::MAX).is_some());

        // Push another snapshot in the cache, this should cause eviction of the oldest entry
        // alone.
        let new_snapshot = Snapshot::new(db_handle, 9, MultistoreCache::default());
        cache
            .try_push(new_snapshot)
            .expect("should not fail to insert a new entry");

        // Check that the pre-genesis entry has been evicted!
        assert!(cache.get(u64::MAX).is_none());

        // Check that all the other entries are still in the cache.
        for i in 0..10 {
            assert!(cache.get(i).is_some());
        }
    }

    /// Checks that inserting on a full cache exclusively evicts the oldest snapshots.
    #[tokio::test]
    async fn drop_oldest_snapshot() {
        let (storage, _tmp_dir) = create_storage_instance().await;
        let db_handle = storage.db();
        let snapshot = Snapshot::new(db_handle.clone(), 0, MultistoreCache::default());

        // Create a cache of size 10, populated with a snapshot at version 0.
        let mut cache = SnapshotCache::new(snapshot, 10);

        // Saturate the cache by inserting 9 more entries.
        for i in 1..10 {
            let snapshot = Snapshot::new(db_handle.clone(), i, MultistoreCache::default());
            cache
                .try_push(snapshot)
                .expect("should be able to insert new entries");
        }

        // Check that the oldest value is still present:
        assert!(cache.get(0).is_some());

        // Insert a new value that should overflow the cache.
        let snapshot = Snapshot::new(db_handle, 10, MultistoreCache::default());
        cache
            .try_push(snapshot)
            .expect("should be able to insert a new entry");

        // Check that the oldest value has been dropped.
        assert!(cache.get(0).is_none());

        // Check that the front of the cache is the latest inserted snapshot.
        assert_eq!(cache.latest().version(), 10);

        // Check that all the other snapshots are still present in the cache.
        for i in 1..11 {
            assert!(cache.get(i).is_some());
        }
    }
}