cnidarium/
future.rs

1//! Concrete futures types used by the storage crate.
2
3use anyhow::Result;
4use futures::{
5    future::{Either, Ready},
6    stream::Peekable,
7    Stream,
8};
9use parking_lot::RwLock;
10use pin_project::pin_project;
11use smallvec::SmallVec;
12use std::{
13    future::Future,
14    ops::Bound,
15    pin::Pin,
16    sync::Arc,
17    task::{ready, Context, Poll},
18};
19
20use crate::Cache;
21
/// Future representing a read from a state snapshot.
///
/// Wraps the [`tokio::task::JoinHandle`] of a spawned task performing the
/// actual read; its `Future` implementation unwraps the join result and
/// yields the task's `Result<Option<Vec<u8>>>`.
#[pin_project]
pub struct SnapshotFuture(#[pin] pub(crate) tokio::task::JoinHandle<Result<Option<Vec<u8>>>>);
25
26impl Future for SnapshotFuture {
27    type Output = Result<Option<Vec<u8>>>;
28    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
29        let this = self.project();
30        match this.0.poll(cx) {
31            Poll::Ready(result) => {
32                Poll::Ready(result.expect("unrecoverable join error from tokio task"))
33            }
34            Poll::Pending => Poll::Pending,
35        }
36    }
37}
38
/// Future representing a read from an in-memory cache over an underlying state.
///
/// On a cache hit the future resolves immediately with the cached value
/// (`Either::Left`); on a miss it polls the underlying read future `F`
/// (`Either::Right`).
#[pin_project]
pub struct CacheFuture<F> {
    #[pin]
    inner: Either<Ready<Result<Option<Vec<u8>>>>, F>,
}
45
46impl<F> CacheFuture<F> {
47    pub(crate) fn hit(value: Option<Vec<u8>>) -> Self {
48        Self {
49            inner: Either::Left(futures::future::ready(Ok(value))),
50        }
51    }
52
53    pub(crate) fn miss(underlying: F) -> Self {
54        Self {
55            inner: Either::Right(underlying),
56        }
57    }
58}
59
60impl<F> Future for CacheFuture<F>
61where
62    F: Future<Output = Result<Option<Vec<u8>>>>,
63{
64    type Output = Result<Option<Vec<u8>>>;
65    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66        let this = self.project();
67        this.inner.poll(cx)
68    }
69}
70
/// A prefix stream of nonverifiable (raw byte key) key-value pairs that
/// interleaves items from an underlying storage stream with pending writes
/// held in in-memory cache layers.
#[pin_project]
pub struct StateDeltaNonconsensusPrefixRawStream<St>
where
    St: Stream<Item = Result<(Vec<u8>, Vec<u8>)>>,
{
    /// The underlying storage stream, peekable so the next item can be
    /// inspected without consuming it.
    #[pin]
    pub(crate) underlying: Peekable<St>,
    /// Stacked cache layers; each is `None` once that layer has been applied.
    pub(crate) layers: Vec<Arc<RwLock<Option<Cache>>>>,
    /// The newest (leaf) cache layer, stored separately from `layers`.
    pub(crate) leaf_cache: Arc<RwLock<Option<Cache>>>,
    /// The last key yielded, used to guarantee forward progress across polls.
    pub(crate) last_key: Option<Vec<u8>>,
    /// The byte prefix all yielded keys must start with.
    pub(crate) prefix: Vec<u8>,
}
83
impl<St> Stream for StateDeltaNonconsensusPrefixRawStream<St>
where
    St: Stream<Item = Result<(Vec<u8>, Vec<u8>)>>,
{
    type Item = Result<(Vec<u8>, Vec<u8>)>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // This implementation interleaves items from the underlying stream with
        // items in cache layers.  To do this, it tracks the last key it
        // returned, then, for each item in the underlying stream, searches for
        // cached keys that lie between the last-returned key and the item's key,
        // checking whether the cached key represents a deletion requiring further
        // scanning.  This process is illustrated as follows:
        //
        //         ◇ skip                 ◇ skip           ▲ yield          ▲ yield           ▲ yield
        //         │                      │                │                │                 │
        //         ░ pick ──────────────▶ ░ pick ────────▶ █ pick ────────▶ █ pick ─────────▶ █ pick
        //         ▲                      ▲                ▲                ▲                 ▲
        //      ▲  │                 ▲    │          ▲     │         ▲      │        ▲        │
        // write│  │                 │    │          │     │         │      │        │        │
        // layer│  │   █             │    │ █        │     │█        │      █        │      █ │
        //      │  │ ░               │    ░          │    ░│         │    ░          │    ░   │
        //      │  ░                 │  ░            │  ░  │         │  ░            │  ░     │
        //      │    █               │    █          │    █│         │    █          │    █   │
        //      │  █     █           │  █     █      │  █  │  █      │  █     █      │  █     █
        //      │     █              │     █         │     █         │     █         │     █
        //     ─┼(─────]────keys─▶  ─┼──(───]────▶  ─┼────(─]────▶  ─┼─────(]────▶  ─┼──────(──]─▶
        //      │   ▲  █  █          │      █  █     │      █  █     │      █  █     │      █  █
        //          │
        //          │search range of key-value pairs in cache layers that could
        //          │affect whether to yield the next item in the underlying stream

        // Optimization: ensure we have a peekable item in the underlying stream before continuing.
        let mut this = self.project();
        ready!(this.underlying.as_mut().poll_peek(cx));

        // Now that we're ready to interleave the next underlying item with any
        // cache layers, lock them all for the duration of the method, using a
        // SmallVec to (hopefully) store all the guards on the stack.
        let mut layer_guards = SmallVec::<[_; 8]>::new();
        for layer in this.layers.iter() {
            layer_guards.push(layer.read());
        }
        // Tacking the leaf cache onto the list is important to not miss any values.
        // It's stored separately from `layers`, but here it is simply treated as
        // the newest layer.  NOTE(review): the original comment was truncated
        // mid-sentence ("so that the contents of the") — confirm the intended
        // rationale for the separate storage.
        layer_guards.push(this.leaf_cache.read());

        loop {
            // Obtain a reference to the next key-value pair from the underlying stream.
            let peeked = match ready!(this.underlying.as_mut().poll_peek(cx)) {
                // If we get an underlying error, bubble it up immediately.
                Some(Err(_e)) => return this.underlying.poll_next(cx),
                // Otherwise, pass through the peeked value.
                Some(Ok(pair)) => Some(pair),
                None => None,
            };

            // To determine whether or not we should return the peeked value, we
            // need to search the cache layers for keys that are between the last
            // key we returned (exclusive, so we make forward progress on the
            // stream) and the peeked key (inclusive, because we need to find out
            // whether or not there was a covering deletion).
            let search_range = (
                this.last_key
                    .as_ref()
                    .map(Bound::Excluded)
                    .unwrap_or(Bound::Included(this.prefix)),
                peeked
                    .map(|(k, _)| Bound::Included(k))
                    .unwrap_or(Bound::Unbounded),
            );

            // It'd be slightly cleaner to initialize `leftmost_pair` with the
            // peeked contents, but that would taint `leftmost_pair` with a
            // `peeked` borrow, and we may need to mutate the underlying stream
            // later.  Instead, initialize it with `None` to only search the
            // cache layers, and compare at the end.
            let mut leftmost_pair = None;
            for layer in layer_guards.iter() {
                // Find this layer's leftmost key-value pair in the search range.
                // The `expect` encodes the invariant that a layer's `Cache` is
                // present until the layer is applied.
                let found_pair = layer
                    .as_ref()
                    .expect("layer must not have been applied")
                    .nonverifiable_changes
                    .range::<Vec<u8>, _>(search_range)
                    .take_while(|(k, _v)| k.starts_with(this.prefix))
                    .next();

                // Check whether the new pair, if any, is the new leftmost pair.
                match (leftmost_pair, found_pair) {
                    // We want to replace the pair even when the key is equal,
                    // so that we always prefer a newer value over an older value
                    // (layers are iterated oldest-first, leaf cache last).
                    (Some((leftmost_k, _)), Some((k, v))) if k <= leftmost_k => {
                        leftmost_pair = Some((k, v));
                    }
                    (None, Some((k, v))) => {
                        leftmost_pair = Some((k, v));
                    }
                    _ => {}
                }
            }

            // Overwrite a Vec, attempting to reuse its existing allocation.
            let overwrite_in_place = |dst: &mut Option<Vec<u8>>, src: &[u8]| {
                if let Some(ref mut dst) = dst {
                    dst.clear();
                    dst.extend_from_slice(src);
                } else {
                    *dst = Some(src.to_vec());
                }
            };

            match (leftmost_pair, peeked) {
                (Some((k, v)), peeked) => {
                    // Since we searched for cached keys less than or equal to
                    // the peeked key, we know that the cached pair takes
                    // priority over the peeked pair.
                    //
                    // If the keys are exactly equal, we advance the underlying stream
                    // (this cannot be `Pending`, since we peeked a ready item above).
                    if peeked.map(|(kp, _)| kp) == Some(k) {
                        let _ = this.underlying.as_mut().poll_next(cx);
                    }
                    overwrite_in_place(this.last_key, k);
                    if let Some(v) = v {
                        // If the value is Some, we have a key-value pair to yield.
                        return Poll::Ready(Some(Ok((k.clone(), v.clone()))));
                    } else {
                        // If the value is None, this pair represents a deletion,
                        // so continue looping until we find a non-deleted pair.
                        continue;
                    }
                }
                (None, Some(_)) => {
                    // There's no cache hit before the peeked pair, so we want
                    // to extract and return it from the underlying stream.
                    let Poll::Ready(Some(Ok((k, v)))) = this.underlying.as_mut().poll_next(cx)
                    else {
                        unreachable!("peeked stream must yield peeked item");
                    };
                    overwrite_in_place(this.last_key, &k);
                    return Poll::Ready(Some(Ok((k, v))));
                }
                (None, None) => {
                    // Terminate the stream, no more items are available.
                    return Poll::Ready(None);
                }
            }
        }
    }
}
234
235// This implementation is almost exactly the same as the one above, but with
236// minor tweaks to work with string keys and to read different fields from the cache.
237// Update them together.
238
/// A prefix stream of verifiable (string key) key-value pairs that interleaves
/// items from an underlying storage stream with pending writes held in
/// in-memory cache layers.
#[pin_project]
pub struct StateDeltaPrefixRawStream<St>
where
    St: Stream<Item = Result<(String, Vec<u8>)>>,
{
    /// The underlying storage stream, peekable so the next item can be
    /// inspected without consuming it.
    #[pin]
    pub(crate) underlying: Peekable<St>,
    /// Stacked cache layers; each is `None` once that layer has been applied.
    pub(crate) layers: Vec<Arc<RwLock<Option<Cache>>>>,
    /// The newest (leaf) cache layer, stored separately from `layers`.
    pub(crate) leaf_cache: Arc<RwLock<Option<Cache>>>,
    /// The last key yielded, used to guarantee forward progress across polls.
    pub(crate) last_key: Option<String>,
    /// The string prefix all yielded keys must start with.
    pub(crate) prefix: String,
}
251
impl<St> Stream for StateDeltaPrefixRawStream<St>
where
    St: Stream<Item = Result<(String, Vec<u8>)>>,
{
    type Item = Result<(String, Vec<u8>)>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // This implementation interleaves items from the underlying stream with
        // items in cache layers.  To do this, it tracks the last key it
        // returned, then, for each item in the underlying stream, searches for
        // cached keys that lie between the last-returned key and the item's key,
        // checking whether the cached key represents a deletion requiring further
        // scanning.  This process is illustrated as follows:
        //
        //         ◇ skip                 ◇ skip           ▲ yield          ▲ yield           ▲ yield
        //         │                      │                │                │                 │
        //         ░ pick ──────────────▶ ░ pick ────────▶ █ pick ────────▶ █ pick ─────────▶ █ pick
        //         ▲                      ▲                ▲                ▲                 ▲
        //      ▲  │                 ▲    │          ▲     │         ▲      │        ▲        │
        // write│  │                 │    │          │     │         │      │        │        │
        // layer│  │   █             │    │ █        │     │█        │      █        │      █ │
        //      │  │ ░               │    ░          │    ░│         │    ░          │    ░   │
        //      │  ░                 │  ░            │  ░  │         │  ░            │  ░     │
        //      │    █               │    █          │    █│         │    █          │    █   │
        //      │  █     █           │  █     █      │  █  │  █      │  █     █      │  █     █
        //      │     █              │     █         │     █         │     █         │     █
        //     ─┼(─────]────keys─▶  ─┼──(───]────▶  ─┼────(─]────▶  ─┼─────(]────▶  ─┼──────(──]─▶
        //      │   ▲  █  █          │      █  █     │      █  █     │      █  █     │      █  █
        //          │
        //          │search range of key-value pairs in cache layers that could
        //          │affect whether to yield the next item in the underlying stream

        // Optimization: ensure we have a peekable item in the underlying stream before continuing.
        let mut this = self.project();
        ready!(this.underlying.as_mut().poll_peek(cx));

        // Now that we're ready to interleave the next underlying item with any
        // cache layers, lock them all for the duration of the method, using a
        // SmallVec to (hopefully) store all the guards on the stack.
        let mut layer_guards = SmallVec::<[_; 8]>::new();
        for layer in this.layers.iter() {
            layer_guards.push(layer.read());
        }
        // Tacking the leaf cache onto the list is important to not miss any values.
        // It's stored separately from `layers`, but here it is simply treated as
        // the newest layer.  NOTE(review): the original comment was truncated
        // mid-sentence ("so that the contents of the") — confirm the intended
        // rationale for the separate storage.
        layer_guards.push(this.leaf_cache.read());

        loop {
            // Obtain a reference to the next key-value pair from the underlying stream.
            let peeked = match ready!(this.underlying.as_mut().poll_peek(cx)) {
                // If we get an underlying error, bubble it up immediately.
                Some(Err(_e)) => return this.underlying.poll_next(cx),
                // Otherwise, pass through the peeked value.
                Some(Ok(pair)) => Some(pair),
                None => None,
            };

            // To determine whether or not we should return the peeked value, we
            // need to search the cache layers for keys that are between the last
            // key we returned (exclusive, so we make forward progress on the
            // stream) and the peeked key (inclusive, because we need to find out
            // whether or not there was a covering deletion).
            let search_range = (
                this.last_key
                    .as_ref()
                    .map(Bound::Excluded)
                    .unwrap_or(Bound::Included(this.prefix)),
                peeked
                    .map(|(k, _)| Bound::Included(k))
                    .unwrap_or(Bound::Unbounded),
            );

            // It'd be slightly cleaner to initialize `leftmost_pair` with the
            // peeked contents, but that would taint `leftmost_pair` with a
            // `peeked` borrow, and we may need to mutate the underlying stream
            // later.  Instead, initialize it with `None` to only search the
            // cache layers, and compare at the end.
            let mut leftmost_pair = None;
            for layer in layer_guards.iter() {
                // Find this layer's leftmost key-value pair in the search range.
                // The `expect` encodes the invariant that a layer's `Cache` is
                // present until the layer is applied.
                let found_pair = layer
                    .as_ref()
                    .expect("layer must not have been applied")
                    .unwritten_changes
                    .range::<String, _>(search_range)
                    .take_while(|(k, _v)| k.starts_with(this.prefix.as_str()))
                    .next();

                // Check whether the new pair, if any, is the new leftmost pair.
                match (leftmost_pair, found_pair) {
                    // We want to replace the pair even when the key is equal,
                    // so that we always prefer a newer value over an older value
                    // (layers are iterated oldest-first, leaf cache last).
                    (Some((leftmost_k, _)), Some((k, v))) if k <= leftmost_k => {
                        leftmost_pair = Some((k, v));
                    }
                    (None, Some((k, v))) => {
                        leftmost_pair = Some((k, v));
                    }
                    _ => {}
                }
            }

            // Overwrite a String, attempting to reuse its existing allocation.
            let overwrite_in_place = |dst: &mut Option<String>, src: &str| {
                if let Some(ref mut dst) = dst {
                    dst.clear();
                    dst.push_str(src);
                } else {
                    *dst = Some(src.to_owned());
                }
            };

            match (leftmost_pair, peeked) {
                (Some((k, v)), peeked) => {
                    // Since we searched for cached keys less than or equal to
                    // the peeked key, we know that the cached pair takes
                    // priority over the peeked pair.
                    //
                    // If the keys are exactly equal, we advance the underlying stream
                    // (this cannot be `Pending`, since we peeked a ready item above).
                    if peeked.map(|(kp, _)| kp) == Some(k) {
                        let _ = this.underlying.as_mut().poll_next(cx);
                    }
                    overwrite_in_place(this.last_key, k);
                    if let Some(v) = v {
                        // If the value is Some, we have a key-value pair to yield.
                        return Poll::Ready(Some(Ok((k.clone(), v.clone()))));
                    } else {
                        // If the value is None, this pair represents a deletion,
                        // so continue looping until we find a non-deleted pair.
                        continue;
                    }
                }
                (None, Some(_)) => {
                    // There's no cache hit before the peeked pair, so we want
                    // to extract and return it from the underlying stream.
                    let Poll::Ready(Some(Ok((k, v)))) = this.underlying.as_mut().poll_next(cx)
                    else {
                        unreachable!("peeked stream must yield peeked item");
                    };
                    overwrite_in_place(this.last_key, &k);
                    return Poll::Ready(Some(Ok((k, v))));
                }
                (None, None) => {
                    // Terminate the stream, no more items are available.
                    return Poll::Ready(None);
                }
            }
        }
    }
}
402
403// This implementation is almost exactly the same as the one above, but with
404// minor tweaks to work with string keys and to read different fields from the cache.
405// Update them together.
406
/// A prefix stream of keys (without values) that interleaves keys from an
/// underlying storage stream with keys of pending writes held in in-memory
/// cache layers.
#[pin_project]
pub struct StateDeltaPrefixKeysStream<St>
where
    St: Stream<Item = Result<String>>,
{
    /// The underlying storage stream, peekable so the next item can be
    /// inspected without consuming it.
    #[pin]
    pub(crate) underlying: Peekable<St>,
    /// Stacked cache layers; each is `None` once that layer has been applied.
    pub(crate) layers: Vec<Arc<RwLock<Option<Cache>>>>,
    /// The newest (leaf) cache layer, stored separately from `layers`.
    pub(crate) leaf_cache: Arc<RwLock<Option<Cache>>>,
    /// The last key yielded, used to guarantee forward progress across polls.
    pub(crate) last_key: Option<String>,
    /// The string prefix all yielded keys must start with.
    pub(crate) prefix: String,
}
419
impl<St> Stream for StateDeltaPrefixKeysStream<St>
where
    St: Stream<Item = Result<String>>,
{
    type Item = Result<String>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // This implementation interleaves items from the underlying stream with
        // items in cache layers.  To do this, it tracks the last key it
        // returned, then, for each item in the underlying stream, searches for
        // cached keys that lie between the last-returned key and the item's key,
        // checking whether the cached key represents a deletion requiring further
        // scanning.  This process is illustrated as follows:
        //
        //         ◇ skip                 ◇ skip           ▲ yield          ▲ yield           ▲ yield
        //         │                      │                │                │                 │
        //         ░ pick ──────────────▶ ░ pick ────────▶ █ pick ────────▶ █ pick ─────────▶ █ pick
        //         ▲                      ▲                ▲                ▲                 ▲
        //      ▲  │                 ▲    │          ▲     │         ▲      │        ▲        │
        // write│  │                 │    │          │     │         │      │        │        │
        // layer│  │   █             │    │ █        │     │█        │      █        │      █ │
        //      │  │ ░               │    ░          │    ░│         │    ░          │    ░   │
        //      │  ░                 │  ░            │  ░  │         │  ░            │  ░     │
        //      │    █               │    █          │    █│         │    █          │    █   │
        //      │  █     █           │  █     █      │  █  │  █      │  █     █      │  █     █
        //      │     █              │     █         │     █         │     █         │     █
        //     ─┼(─────]────keys─▶  ─┼──(───]────▶  ─┼────(─]────▶  ─┼─────(]────▶  ─┼──────(──]─▶
        //      │   ▲  █  █          │      █  █     │      █  █     │      █  █     │      █  █
        //          │
        //          │search range of key-value pairs in cache layers that could
        //          │affect whether to yield the next item in the underlying stream

        // Optimization: ensure we have a peekable item in the underlying stream before continuing.
        let mut this = self.project();
        ready!(this.underlying.as_mut().poll_peek(cx));

        // Now that we're ready to interleave the next underlying item with any
        // cache layers, lock them all for the duration of the method, using a
        // SmallVec to (hopefully) store all the guards on the stack.
        let mut layer_guards = SmallVec::<[_; 8]>::new();
        for layer in this.layers.iter() {
            layer_guards.push(layer.read());
        }
        // Tacking the leaf cache onto the list is important to not miss any values.
        // It's stored separately from `layers`, but here it is simply treated as
        // the newest layer.  NOTE(review): the original comment was truncated
        // mid-sentence ("so that the contents of the") — confirm the intended
        // rationale for the separate storage.
        layer_guards.push(this.leaf_cache.read());

        loop {
            // Obtain a reference to the next key from the underlying stream.
            let peeked = match ready!(this.underlying.as_mut().poll_peek(cx)) {
                // If we get an underlying error, bubble it up immediately.
                Some(Err(_e)) => return this.underlying.poll_next(cx),
                // Otherwise, pass through the peeked value.
                Some(Ok(pair)) => Some(pair),
                None => None,
            };

            // To determine whether or not we should return the peeked value, we
            // need to search the cache layers for keys that are between the last
            // key we returned (exclusive, so we make forward progress on the
            // stream) and the peeked key (inclusive, because we need to find out
            // whether or not there was a covering deletion).
            let search_range = (
                this.last_key
                    .as_ref()
                    .map(Bound::Excluded)
                    .unwrap_or(Bound::Included(this.prefix)),
                peeked.map(Bound::Included).unwrap_or(Bound::Unbounded),
            );

            // It'd be slightly cleaner to initialize `leftmost_pair` with the
            // peeked contents, but that would taint `leftmost_pair` with a
            // `peeked` borrow, and we may need to mutate the underlying stream
            // later.  Instead, initialize it with `None` to only search the
            // cache layers, and compare at the end.
            let mut leftmost_pair = None;
            for layer in layer_guards.iter() {
                // Find this layer's leftmost key-value pair in the search range.
                // The `expect` encodes the invariant that a layer's `Cache` is
                // present until the layer is applied.
                let found_pair = layer
                    .as_ref()
                    .expect("layer must not have been applied")
                    .unwritten_changes
                    .range::<String, _>(search_range)
                    .take_while(|(k, _v)| k.starts_with(this.prefix.as_str()))
                    .next();

                // Check whether the new pair, if any, is the new leftmost pair.
                match (leftmost_pair, found_pair) {
                    // We want to replace the pair even when the key is equal,
                    // so that we always prefer a newer value over an older value
                    // (layers are iterated oldest-first, leaf cache last).
                    (Some((leftmost_k, _)), Some((k, v))) if k <= leftmost_k => {
                        leftmost_pair = Some((k, v));
                    }
                    (None, Some((k, v))) => {
                        leftmost_pair = Some((k, v));
                    }
                    _ => {}
                }
            }

            // Overwrite a String, attempting to reuse its existing allocation.
            let overwrite_in_place = |dst: &mut Option<String>, src: &str| {
                if let Some(ref mut dst) = dst {
                    dst.clear();
                    dst.push_str(src);
                } else {
                    *dst = Some(src.to_owned());
                }
            };

            match (leftmost_pair, peeked) {
                (Some((k, v)), peeked) => {
                    // Since we searched for cached keys less than or equal to
                    // the peeked key, we know that the cached pair takes
                    // priority over the peeked pair.
                    //
                    // If the keys are exactly equal, we advance the underlying stream
                    // (this cannot be `Pending`, since we peeked a ready item above).
                    if peeked == Some(k) {
                        let _ = this.underlying.as_mut().poll_next(cx);
                    }
                    overwrite_in_place(this.last_key, k);
                    if v.is_some() {
                        // If the value is Some, we have a (non-deleted) key to yield.
                        return Poll::Ready(Some(Ok(k.clone())));
                    } else {
                        // If the value is None, this pair represents a deletion,
                        // so continue looping until we find a non-deleted pair.
                        continue;
                    }
                }
                (None, Some(_)) => {
                    // There's no cache hit before the peeked key, so we want
                    // to extract and return it from the underlying stream.
                    let Poll::Ready(Some(Ok(k))) = this.underlying.as_mut().poll_next(cx) else {
                        unreachable!("peeked stream must yield peeked item");
                    };
                    overwrite_in_place(this.last_key, &k);
                    return Poll::Ready(Some(Ok(k)));
                }
                (None, None) => {
                    // Terminate the stream, no more items are available.
                    return Poll::Ready(None);
                }
            }
        }
    }
}
567
#[pin_project]
/// A stream of key-value pairs that interleaves a nonverifiable storage and caching layers.
// This implementation differs from [`StateDeltaNonconsensusPrefixRawStream`] in how
// it specifies the search space for the cache.
pub struct StateDeltaNonconsensusRangeRawStream<St>
where
    St: Stream<Item = Result<(Vec<u8>, Vec<u8>)>>,
{
    /// The underlying storage stream, peekable so the next item can be
    /// inspected without consuming it.
    #[pin]
    pub(crate) underlying: Peekable<St>,
    /// Stacked cache layers; each is `None` once that layer has been applied.
    pub(crate) layers: Vec<Arc<RwLock<Option<Cache>>>>,
    /// The newest (leaf) cache layer, stored separately from `layers`.
    pub(crate) leaf_cache: Arc<RwLock<Option<Cache>>>,
    /// The last key yielded, used to guarantee forward progress across polls.
    pub(crate) last_key: Option<Vec<u8>>,
    /// Optional key prefix; `None` is treated as an empty prefix.
    pub(crate) prefix: Option<Vec<u8>>,
    // (start, end) bounds of the range, appended to the prefix when searching
    // the cache; `None` means unbounded on that side.  NOTE(review): end-bound
    // inclusivity is established in `poll_next`, outside this view — confirm.
    pub(crate) range: (Option<Vec<u8>>, Option<Vec<u8>>),
}
584
585impl<St> Stream for StateDeltaNonconsensusRangeRawStream<St>
586where
587    St: Stream<Item = Result<(Vec<u8>, Vec<u8>)>>,
588{
589    type Item = Result<(Vec<u8>, Vec<u8>)>;
590
591    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
592        // This implementation interleaves items from the underlying stream with
593        // items in cache layers.  To do this, it tracks the last key it
594        // returned, then, for each item in the underlying stream, searches for
595        // cached keys that lie between the last-returned key and the item's key,
596        // checking whether the cached key represents a deletion requiring further
        // scanning.  This process is illustrated as follows:
        //
        //         ◇ skip                 ◇ skip           ▲ yield          ▲ yield           ▲ yield
        //         │                      │                │                │                 │
        //         ░ pick ──────────────▶ ░ pick ────────▶ █ pick ────────▶ █ pick ─────────▶ █ pick
        //         ▲                      ▲                ▲                ▲                 ▲
        //      ▲  │                 ▲    │          ▲     │         ▲      │        ▲        │
        // write│  │                 │    │          │     │         │      │        │        │
        // layer│  │   █             │    │ █        │     │█        │      █        │      █ │
        //      │  │ ░               │    ░          │    ░│         │    ░          │    ░   │
        //      │  ░                 │  ░            │  ░  │         │  ░            │  ░     │
        //      │    █               │    █          │    █│         │    █          │    █   │
        //      │  █     █           │  █     █      │  █  │  █      │  █     █      │  █     █
        //      │     █              │     █         │     █         │     █         │     █
        //     ─┼(─────]────keys─▶  ─┼──(───]────▶  ─┼────(─]────▶  ─┼─────(]────▶  ─┼──────(──]─▶
        //      │   ▲  █  █          │      █  █     │      █  █     │      █  █     │      █  █
        //          │
        //          │search range of key-value pairs in cache layers that could
        //          │affect whether to yield the next item in the underlying stream

        // Optimization: ensure we have a peekable item in the underlying stream before continuing.
        // `ready!` propagates `Poll::Pending` here; `poll_peek` has already
        // registered the waker with `cx`, so we will be re-polled when the
        // underlying stream makes progress.
        let mut this = self.project();
        ready!(this.underlying.as_mut().poll_peek(cx));
        // Now that we're ready to interleave the next underlying item with any
        // cache layers, lock them all for the duration of the method, using a
        // SmallVec to (hopefully) store all the guards on the stack.
        let mut layer_guards = SmallVec::<[_; 8]>::new();
        for layer in this.layers.iter() {
            layer_guards.push(layer.read());
        }
        // Tacking the leaf cache onto the list is important to not miss any values.
        // It's stored separately so that the contents of the leaf cache can be
        // accessed independently of the stack of committed layers.
        // NOTE(review): the original comment was truncated mid-sentence here;
        // confirm the intended rationale for the separate storage.
        layer_guards.push(this.leaf_cache.read());

        // Empty fallbacks so the optional prefix/range fields below can be
        // borrowed uniformly as `&Vec<u8>` whether or not they were set.
        let (binding_prefix, binding_start, binding_end) = (Vec::new(), Vec::new(), Vec::new());
        let prefix = this.prefix.as_ref().unwrap_or(&binding_prefix);
        let start = this.range.0.as_ref().unwrap_or(&binding_start);
        let end = this.range.1.as_ref().unwrap_or(&binding_end);

        // Materialize the absolute endpoints of the requested key range,
        // i.e. `prefix ++ start` and `prefix ++ end`.
        let mut prefix_start = Vec::with_capacity(prefix.len() + start.len());
        let mut prefix_end = Vec::with_capacity(prefix.len() + end.len());

        prefix_start.extend(prefix);
        prefix_start.extend(start);
        prefix_end.extend(prefix);
        prefix_end.extend(end);

        loop {
            // Obtain a reference to the next key-value pair from the underlying stream.
            let peeked = match ready!(this.underlying.as_mut().poll_peek(cx)) {
                // If we get an underlying error, bubble it up immediately.
                // (`poll_next` is ready here, since we just peeked a ready item.)
                Some(Err(_e)) => return this.underlying.poll_next(cx),
                // Otherwise, pass through the peeked value.
                Some(Ok(pair)) => Some(pair),
                None => None,
            };

            // We want to decide which key to return next, so we have to inspect the cache layers.
            // To do this, we have to define a search space so that we cover updates and new insertions
            // that could affect the next key to return.
            //
            // Lower bound: strictly after the last key we yielded (guarantees
            // forward progress), or the start of the prefixed range on the
            // first iteration.
            let lower_bound = match this.last_key.as_ref() {
                Some(k) => Bound::Excluded(k),
                None => Bound::Included(prefix_start.as_ref()),
            };

            // Upper bound: up to and including the peeked underlying key (any
            // cached key beyond it cannot be the next item), or — when the
            // underlying stream is exhausted — up to the prefixed range end,
            // unbounded if no end was requested.
            let upper_bound = match peeked {
                Some((k, _v)) => Bound::Included(k),
                None => this
                    .range
                    .1
                    .as_ref()
                    .map_or(Bound::Unbounded, |_| Bound::Excluded(prefix_end.as_ref())),
            };

            let search_range = (lower_bound, upper_bound);

            tracing::debug!(
                "searching cache layers for key-value pairs in range {:?}",
                search_range
            );

            // It'd be slightly cleaner to initialize `leftmost_pair` with the
            // peeked contents, but that would taint `leftmost_pair` with a
            // `peeked` borrow, and we may need to mutate the underlying stream
            // later.  Instead, initialize it with `None` to only search the
            // cache layers, and compare at the end.
            let mut leftmost_pair = None;
            for layer in layer_guards.iter() {
                // Find this layer's leftmost key-value pair in the search range.
                let found_pair = layer
                    .as_ref()
                    .expect("layer must not have been applied")
                    .nonverifiable_changes
                    .range::<Vec<u8>, _>(search_range)
                    .take_while(|(k, v)| {
                        tracing::debug!(?v, ?k, "found key-value pair in cache layer");
                        // Stop scanning once keys leave the prefix, and never
                        // look past the peeked underlying key.
                        match peeked {
                            Some((peeked_k, _)) => {
                                k.starts_with(prefix.as_slice()) && k <= &peeked_k
                            }
                            None => k.starts_with(prefix.as_slice()),
                        }
                    })
                    .next();

                // Check whether the new pair, if any, is the new leftmost pair.
                match (leftmost_pair, found_pair) {
                    // We want to replace the pair even when the key is equal,
                    // so that we always prefer a newer value over an older value.
                    // (Later entries in `layer_guards` are more recent; the leaf
                    // cache, pushed last, is the newest — TODO confirm ordering.)
                    (Some((leftmost_k, _)), Some((k, v))) if k <= leftmost_k => {
                        leftmost_pair = Some((k, v));
                    }
                    (None, Some((k, v))) => {
                        leftmost_pair = Some((k, v));
                    }
                    _ => {}
                }
            }

            // Overwrite a Vec, attempting to reuse its existing allocation.
            let overwrite_in_place = |dst: &mut Option<Vec<u8>>, src: &[u8]| {
                if let Some(ref mut dst) = dst {
                    dst.clear();
                    dst.extend_from_slice(src);
                } else {
                    *dst = Some(src.to_vec());
                }
            };

            match (leftmost_pair, peeked) {
                (Some((k, v)), peeked) => {
                    // Since we searched for cached keys less than or equal to
                    // the peeked key, we know that the cached pair takes
                    // priority over the peeked pair.
                    //
                    // If the keys are exactly equal, we advance the underlying stream.
                    // The result is discarded: we already peeked it as ready, and
                    // the cached value shadows the underlying one for this key.
                    if peeked.map(|(kp, _)| kp) == Some(k) {
                        let _ = this.underlying.as_mut().poll_next(cx);
                    }
                    // Record the key we're about to yield (or skip) so the next
                    // iteration's lower bound excludes it.
                    overwrite_in_place(this.last_key, k);
                    if let Some(v) = v {
                        // If the value is Some, we have a key-value pair to yield.
                        return Poll::Ready(Some(Ok((k.clone(), v.clone()))));
                    } else {
                        // If the value is None, this pair represents a deletion,
                        // so continue looping until we find a non-deleted pair.
                        continue;
                    }
                }
                (None, Some(_)) => {
                    // There's no cache hit before the peeked pair, so we want
                    // to extract and return it from the underlying stream.
                    // Safe to `unreachable!` otherwise: a successful peek
                    // guarantees the next `poll_next` is `Ready(Some(Ok(..)))`.
                    let Poll::Ready(Some(Ok((k, v)))) = this.underlying.as_mut().poll_next(cx)
                    else {
                        unreachable!("peeked stream must yield peeked item");
                    };
                    overwrite_in_place(this.last_key, &k);
                    return Poll::Ready(Some(Ok((k, v))));
                }
                (None, None) => {
                    // Terminate the stream, no more items are available.
                    return Poll::Ready(None);
                }
            }
        }
    }
}