1//! Concrete futures types used by the storage crate.
3use anyhow::Result;
4use futures::{
5 future::{Either, Ready},
6 stream::Peekable,
7 Stream,
9use parking_lot::RwLock;
10use pin_project::pin_project;
11use smallvec::SmallVec;
12use std::{
13 future::Future,
14 ops::Bound,
15 pin::Pin,
16 sync::Arc,
17 task::{ready, Context, Poll},
20use crate::Cache;
22/// Future representing a read from a state snapshot.
24pub struct SnapshotFuture(#[pin] pub(crate) tokio::task::JoinHandle<Result<Option<Vec<u8>>>>);
26impl Future for SnapshotFuture {
27 type Output = Result<Option<Vec<u8>>>;
28 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
29 let this = self.project();
30 match this.0.poll(cx) {
31 Poll::Ready(result) => {
32 Poll::Ready(result.expect("unrecoverable join error from tokio task"))
33 }
34 Poll::Pending => Poll::Pending,
35 }
36 }
39/// Future representing a read from an in-memory cache over an underlying state.
41pub struct CacheFuture<F> {
42 #[pin]
43 inner: Either<Ready<Result<Option<Vec<u8>>>>, F>,
46impl<F> CacheFuture<F> {
47 pub(crate) fn hit(value: Option<Vec<u8>>) -> Self {
48 Self {
49 inner: Either::Left(futures::future::ready(Ok(value))),
50 }
51 }
53 pub(crate) fn miss(underlying: F) -> Self {
54 Self {
55 inner: Either::Right(underlying),
56 }
57 }
60impl<F> Future for CacheFuture<F>
62 F: Future<Output = Result<Option<Vec<u8>>>>,
64 type Output = Result<Option<Vec<u8>>>;
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 let this = self.project();
67 this.inner.poll(cx)
68 }
72pub struct StateDeltaNonconsensusPrefixRawStream<St>
74 St: Stream<Item = Result<(Vec<u8>, Vec<u8>)>>,
76 #[pin]
77 pub(crate) underlying: Peekable<St>,
78 pub(crate) layers: Vec<Arc<RwLock<Option<Cache>>>>,
79 pub(crate) leaf_cache: Arc<RwLock<Option<Cache>>>,
80 pub(crate) last_key: Option<Vec<u8>>,
81 pub(crate) prefix: Vec<u8>,
84impl<St> Stream for StateDeltaNonconsensusPrefixRawStream<St>
86 St: Stream<Item = Result<(Vec<u8>, Vec<u8>)>>,
88 type Item = Result<(Vec<u8>, Vec<u8>)>;
90 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
91 // This implementation interleaves items from the underlying stream with
92 // items in cache layers. To do this, it tracks the last key it
93 // returned, then, for each item in the underlying stream, searches for
94 // cached keys that lie between the last-returned key and the item's key,
95 // checking whether the cached key represents a deletion requiring further
96 // scanning. This process is illustrated as follows:
97 //
98 // ◇ skip ◇ skip ▲ yield ▲ yield ▲ yield
99 // │ │ │ │ │
100 // ░ pick ──────────────▶ ░ pick ────────▶ █ pick ────────▶ █ pick ─────────▶ █ pick
101 // ▲ ▲ ▲ ▲ ▲
102 // ▲ │ ▲ │ ▲ │ ▲ │ ▲ │
103 // write│ │ │ │ │ │ │ │ │ │
104 // layer│ │ █ │ │ █ │ │█ │ █ │ █ │
105 // │ │ ░ │ ░ │ ░│ │ ░ │ ░ │
106 // │ ░ │ ░ │ ░ │ │ ░ │ ░ │
107 // │ █ │ █ │ █│ │ █ │ █ │
108 // │ █ █ │ █ █ │ █ │ █ │ █ █ │ █ █
109 // │ █ │ █ │ █ │ █ │ █
110 // ─┼(─────]────keys─▶ ─┼──(───]────▶ ─┼────(─]────▶ ─┼─────(]────▶ ─┼──────(──]─▶
111 // │ ▲ █ █ │ █ █ │ █ █ │ █ █ │ █ █
112 // │
113 // │search range of key-value pairs in cache layers that could
114 // │affect whether to yield the next item in the underlying stream
116 // Optimization: ensure we have a peekable item in the underlying stream before continuing.
117 let mut this = self.project();
118 ready!(this.underlying.as_mut().poll_peek(cx));
120 // Now that we're ready to interleave the next underlying item with any
121 // cache layers, lock them all for the duration of the method, using a
122 // SmallVec to (hopefully) store all the guards on the stack.
123 let mut layer_guards = SmallVec::<[_; 8]>::new();
124 for layer in this.layers.iter() {
125 layer_guards.push(;
126 }
127 // Tacking the leaf cache onto the list is important to not miss any values.
128 // It's stored separately so that the contents of the
129 layer_guards.push(;
131 loop {
132 // Obtain a reference to the next key-value pair from the underlying stream.
133 let peeked = match ready!(this.underlying.as_mut().poll_peek(cx)) {
134 // If we get an underlying error, bubble it up immediately.
135 Some(Err(_e)) => return this.underlying.poll_next(cx),
136 // Otherwise, pass through the peeked value.
137 Some(Ok(pair)) => Some(pair),
138 None => None,
139 };
141 // To determine whether or not we should return the peeked value, we
142 // need to search the cache layers for keys that are between the last
143 // key we returned (exclusive, so we make forward progress on the
144 // stream) and the peeked key (inclusive, because we need to find out
145 // whether or not there was a covering deletion).
146 let search_range = (
147 this.last_key
148 .as_ref()
149 .map(Bound::Excluded)
150 .unwrap_or(Bound::Included(this.prefix)),
151 peeked
152 .map(|(k, _)| Bound::Included(k))
153 .unwrap_or(Bound::Unbounded),
154 );
156 // It'd be slightly cleaner to initialize `leftmost_pair` with the
157 // peeked contents, but that would taint `leftmost_pair` with a
158 // `peeked` borrow, and we may need to mutate the underlying stream
159 // later. Instead, initialize it with `None` to only search the
160 // cache layers, and compare at the end.
161 let mut leftmost_pair = None;
162 for layer in layer_guards.iter() {
163 // Find this layer's leftmost key-value pair in the search range.
164 let found_pair = layer
165 .as_ref()
166 .expect("layer must not have been applied")
167 .nonverifiable_changes
168 .range::<Vec<u8>, _>(search_range)
169 .take_while(|(k, _v)| k.starts_with(this.prefix))
170 .next();
172 // Check whether the new pair, if any, is the new leftmost pair.
173 match (leftmost_pair, found_pair) {
174 // We want to replace the pair even when the key is equal,
175 // so that we always prefer a newer value over an older value.
176 (Some((leftmost_k, _)), Some((k, v))) if k <= leftmost_k => {
177 leftmost_pair = Some((k, v));
178 }
179 (None, Some((k, v))) => {
180 leftmost_pair = Some((k, v));
181 }
182 _ => {}
183 }
184 }
186 // Overwrite a Vec, attempting to reuse its existing allocation.
187 let overwrite_in_place = |dst: &mut Option<Vec<u8>>, src: &[u8]| {
188 if let Some(ref mut dst) = dst {
189 dst.clear();
190 dst.extend_from_slice(src);
191 } else {
192 *dst = Some(src.to_vec());
193 }
194 };
196 match (leftmost_pair, peeked) {
197 (Some((k, v)), peeked) => {
198 // Since we searched for cached keys less than or equal to
199 // the peeked key, we know that the cached pair takes
200 // priority over the peeked pair.
201 //
202 // If the keys are exactly equal, we advance the underlying stream.
203 if|(kp, _)| kp) == Some(k) {
204 let _ = this.underlying.as_mut().poll_next(cx);
205 }
206 overwrite_in_place(this.last_key, k);
207 if let Some(v) = v {
208 // If the value is Some, we have a key-value pair to yield.
209 return Poll::Ready(Some(Ok((k.clone(), v.clone()))));
210 } else {
211 // If the value is None, this pair represents a deletion,
212 // so continue looping until we find a non-deleted pair.
213 continue;
214 }
215 }
216 (None, Some(_)) => {
217 // There's no cache hit before the peeked pair, so we want
218 // to extract and return it from the underlying stream.
219 let Poll::Ready(Some(Ok((k, v)))) = this.underlying.as_mut().poll_next(cx)
220 else {
221 unreachable!("peeked stream must yield peeked item");
222 };
223 overwrite_in_place(this.last_key, &k);
224 return Poll::Ready(Some(Ok((k, v))));
225 }
226 (None, None) => {
227 // Terminate the stream, no more items are available.
228 return Poll::Ready(None);
229 }
230 }
231 }
232 }
235// This implementation is almost exactly the same as the one above, but with
236// minor tweaks to work with string keys and to read different fields from the cache.
237// Update them together.
240pub struct StateDeltaPrefixRawStream<St>
242 St: Stream<Item = Result<(String, Vec<u8>)>>,
244 #[pin]
245 pub(crate) underlying: Peekable<St>,
246 pub(crate) layers: Vec<Arc<RwLock<Option<Cache>>>>,
247 pub(crate) leaf_cache: Arc<RwLock<Option<Cache>>>,
248 pub(crate) last_key: Option<String>,
249 pub(crate) prefix: String,
252impl<St> Stream for StateDeltaPrefixRawStream<St>
254 St: Stream<Item = Result<(String, Vec<u8>)>>,
256 type Item = Result<(String, Vec<u8>)>;
258 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
259 // This implementation interleaves items from the underlying stream with
260 // items in cache layers. To do this, it tracks the last key it
261 // returned, then, for each item in the underlying stream, searches for
262 // cached keys that lie between the last-returned key and the item's key,
263 // checking whether the cached key represents a deletion requiring further
264 // scanning. This process is illustrated as follows:
265 //
266 // ◇ skip ◇ skip ▲ yield ▲ yield ▲ yield
267 // │ │ │ │ │
268 // ░ pick ──────────────▶ ░ pick ────────▶ █ pick ────────▶ █ pick ─────────▶ █ pick
269 // ▲ ▲ ▲ ▲ ▲
270 // ▲ │ ▲ │ ▲ │ ▲ │ ▲ │
271 // write│ │ │ │ │ │ │ │ │ │
272 // layer│ │ █ │ │ █ │ │█ │ █ │ █ │
273 // │ │ ░ │ ░ │ ░│ │ ░ │ ░ │
274 // │ ░ │ ░ │ ░ │ │ ░ │ ░ │
275 // │ █ │ █ │ █│ │ █ │ █ │
276 // │ █ █ │ █ █ │ █ │ █ │ █ █ │ █ █
277 // │ █ │ █ │ █ │ █ │ █
278 // ─┼(─────]────keys─▶ ─┼──(───]────▶ ─┼────(─]────▶ ─┼─────(]────▶ ─┼──────(──]─▶
279 // │ ▲ █ █ │ █ █ │ █ █ │ █ █ │ █ █
280 // │
281 // │search range of key-value pairs in cache layers that could
282 // │affect whether to yield the next item in the underlying stream
284 // Optimization: ensure we have a peekable item in the underlying stream before continuing.
285 let mut this = self.project();
286 ready!(this.underlying.as_mut().poll_peek(cx));
288 // Now that we're ready to interleave the next underlying item with any
289 // cache layers, lock them all for the duration of the method, using a
290 // SmallVec to (hopefully) store all the guards on the stack.
291 let mut layer_guards = SmallVec::<[_; 8]>::new();
292 for layer in this.layers.iter() {
293 layer_guards.push(;
294 }
295 // Tacking the leaf cache onto the list is important to not miss any values.
296 // It's stored separately so that the contents of the
297 layer_guards.push(;
299 loop {
300 // Obtain a reference to the next key-value pair from the underlying stream.
301 let peeked = match ready!(this.underlying.as_mut().poll_peek(cx)) {
302 // If we get an underlying error, bubble it up immediately.
303 Some(Err(_e)) => return this.underlying.poll_next(cx),
304 // Otherwise, pass through the peeked value.
305 Some(Ok(pair)) => Some(pair),
306 None => None,
307 };
309 // To determine whether or not we should return the peeked value, we
310 // need to search the cache layers for keys that are between the last
311 // key we returned (exclusive, so we make forward progress on the
312 // stream) and the peeked key (inclusive, because we need to find out
313 // whether or not there was a covering deletion).
314 let search_range = (
315 this.last_key
316 .as_ref()
317 .map(Bound::Excluded)
318 .unwrap_or(Bound::Included(this.prefix)),
319 peeked
320 .map(|(k, _)| Bound::Included(k))
321 .unwrap_or(Bound::Unbounded),
322 );
324 // It'd be slightly cleaner to initialize `leftmost_pair` with the
325 // peeked contents, but that would taint `leftmost_pair` with a
326 // `peeked` borrow, and we may need to mutate the underlying stream
327 // later. Instead, initialize it with `None` to only search the
328 // cache layers, and compare at the end.
329 let mut leftmost_pair = None;
330 for layer in layer_guards.iter() {
331 // Find this layer's leftmost key-value pair in the search range.
332 let found_pair = layer
333 .as_ref()
334 .expect("layer must not have been applied")
335 .unwritten_changes
336 .range::<String, _>(search_range)
337 .take_while(|(k, _v)| k.starts_with(this.prefix.as_str()))
338 .next();
340 // Check whether the new pair, if any, is the new leftmost pair.
341 match (leftmost_pair, found_pair) {
342 // We want to replace the pair even when the key is equal,
343 // so that we always prefer a newer value over an older value.
344 (Some((leftmost_k, _)), Some((k, v))) if k <= leftmost_k => {
345 leftmost_pair = Some((k, v));
346 }
347 (None, Some((k, v))) => {
348 leftmost_pair = Some((k, v));
349 }
350 _ => {}
351 }
352 }
354 // Overwrite a Vec, attempting to reuse its existing allocation.
355 let overwrite_in_place = |dst: &mut Option<String>, src: &str| {
356 if let Some(ref mut dst) = dst {
357 dst.clear();
358 dst.push_str(src);
359 } else {
360 *dst = Some(src.to_owned());
361 }
362 };
364 match (leftmost_pair, peeked) {
365 (Some((k, v)), peeked) => {
366 // Since we searched for cached keys less than or equal to
367 // the peeked key, we know that the cached pair takes
368 // priority over the peeked pair.
369 //
370 // If the keys are exactly equal, we advance the underlying stream.
371 if|(kp, _)| kp) == Some(k) {
372 let _ = this.underlying.as_mut().poll_next(cx);
373 }
374 overwrite_in_place(this.last_key, k);
375 if let Some(v) = v {
376 // If the value is Some, we have a key-value pair to yield.
377 return Poll::Ready(Some(Ok((k.clone(), v.clone()))));
378 } else {
379 // If the value is None, this pair represents a deletion,
380 // so continue looping until we find a non-deleted pair.
381 continue;
382 }
383 }
384 (None, Some(_)) => {
385 // There's no cache hit before the peeked pair, so we want
386 // to extract and return it from the underlying stream.
387 let Poll::Ready(Some(Ok((k, v)))) = this.underlying.as_mut().poll_next(cx)
388 else {
389 unreachable!("peeked stream must yield peeked item");
390 };
391 overwrite_in_place(this.last_key, &k);
392 return Poll::Ready(Some(Ok((k, v))));
393 }
394 (None, None) => {
395 // Terminate the stream, no more items are available.
396 return Poll::Ready(None);
397 }
398 }
399 }
400 }
403// This implementation is almost exactly the same as the one above, but with
404// minor tweaks to work with string keys and to read different fields from the cache.
405// Update them together.
408pub struct StateDeltaPrefixKeysStream<St>
410 St: Stream<Item = Result<String>>,
412 #[pin]
413 pub(crate) underlying: Peekable<St>,
414 pub(crate) layers: Vec<Arc<RwLock<Option<Cache>>>>,
415 pub(crate) leaf_cache: Arc<RwLock<Option<Cache>>>,
416 pub(crate) last_key: Option<String>,
417 pub(crate) prefix: String,
420impl<St> Stream for StateDeltaPrefixKeysStream<St>
422 St: Stream<Item = Result<String>>,
424 type Item = Result<String>;
426 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
427 // This implementation interleaves items from the underlying stream with
428 // items in cache layers. To do this, it tracks the last key it
429 // returned, then, for each item in the underlying stream, searches for
430 // cached keys that lie between the last-returned key and the item's key,
431 // checking whether the cached key represents a deletion requiring further
432 // scanning. This process is illustrated as follows:
433 //
434 // ◇ skip ◇ skip ▲ yield ▲ yield ▲ yield
435 // │ │ │ │ │
436 // ░ pick ──────────────▶ ░ pick ────────▶ █ pick ────────▶ █ pick ─────────▶ █ pick
437 // ▲ ▲ ▲ ▲ ▲
438 // ▲ │ ▲ │ ▲ │ ▲ │ ▲ │
439 // write│ │ │ │ │ │ │ │ │ │
440 // layer│ │ █ │ │ █ │ │█ │ █ │ █ │
441 // │ │ ░ │ ░ │ ░│ │ ░ │ ░ │
442 // │ ░ │ ░ │ ░ │ │ ░ │ ░ │
443 // │ █ │ █ │ █│ │ █ │ █ │
444 // │ █ █ │ █ █ │ █ │ █ │ █ █ │ █ █
445 // │ █ │ █ │ █ │ █ │ █
446 // ─┼(─────]────keys─▶ ─┼──(───]────▶ ─┼────(─]────▶ ─┼─────(]────▶ ─┼──────(──]─▶
447 // │ ▲ █ █ │ █ █ │ █ █ │ █ █ │ █ █
448 // │
449 // │search range of key-value pairs in cache layers that could
450 // │affect whether to yield the next item in the underlying stream
452 // Optimization: ensure we have a peekable item in the underlying stream before continuing.
453 let mut this = self.project();
454 ready!(this.underlying.as_mut().poll_peek(cx));
456 // Now that we're ready to interleave the next underlying item with any
457 // cache layers, lock them all for the duration of the method, using a
458 // SmallVec to (hopefully) store all the guards on the stack.
459 let mut layer_guards = SmallVec::<[_; 8]>::new();
460 for layer in this.layers.iter() {
461 layer_guards.push(;
462 }
463 // Tacking the leaf cache onto the list is important to not miss any values.
464 // It's stored separately so that the contents of the
465 layer_guards.push(;
467 loop {
468 // Obtain a reference to the next key-value pair from the underlying stream.
469 let peeked = match ready!(this.underlying.as_mut().poll_peek(cx)) {
470 // If we get an underlying error, bubble it up immediately.
471 Some(Err(_e)) => return this.underlying.poll_next(cx),
472 // Otherwise, pass through the peeked value.
473 Some(Ok(pair)) => Some(pair),
474 None => None,
475 };
477 // To determine whether or not we should return the peeked value, we
478 // need to search the cache layers for keys that are between the last
479 // key we returned (exclusive, so we make forward progress on the
480 // stream) and the peeked key (inclusive, because we need to find out
481 // whether or not there was a covering deletion).
482 let search_range = (
483 this.last_key
484 .as_ref()
485 .map(Bound::Excluded)
486 .unwrap_or(Bound::Included(this.prefix)),
488 );
490 // It'd be slightly cleaner to initialize `leftmost_pair` with the
491 // peeked contents, but that would taint `leftmost_pair` with a
492 // `peeked` borrow, and we may need to mutate the underlying stream
493 // later. Instead, initialize it with `None` to only search the
494 // cache layers, and compare at the end.
495 let mut leftmost_pair = None;
496 for layer in layer_guards.iter() {
497 // Find this layer's leftmost key-value pair in the search range.
498 let found_pair = layer
499 .as_ref()
500 .expect("layer must not have been applied")
501 .unwritten_changes
502 .range::<String, _>(search_range)
503 .take_while(|(k, _v)| k.starts_with(this.prefix.as_str()))
504 .next();
506 // Check whether the new pair, if any, is the new leftmost pair.
507 match (leftmost_pair, found_pair) {
508 // We want to replace the pair even when the key is equal,
509 // so that we always prefer a newer value over an older value.
510 (Some((leftmost_k, _)), Some((k, v))) if k <= leftmost_k => {
511 leftmost_pair = Some((k, v));
512 }
513 (None, Some((k, v))) => {
514 leftmost_pair = Some((k, v));
515 }
516 _ => {}
517 }
518 }
520 // Overwrite a Vec, attempting to reuse its existing allocation.
521 let overwrite_in_place = |dst: &mut Option<String>, src: &str| {
522 if let Some(ref mut dst) = dst {
523 dst.clear();
524 dst.push_str(src);
525 } else {
526 *dst = Some(src.to_owned());
527 }
528 };
530 match (leftmost_pair, peeked) {
531 (Some((k, v)), peeked) => {
532 // Since we searched for cached keys less than or equal to
533 // the peeked key, we know that the cached pair takes
534 // priority over the peeked pair.
535 //
536 // If the keys are exactly equal, we advance the underlying stream.
537 if peeked == Some(k) {
538 let _ = this.underlying.as_mut().poll_next(cx);
539 }
540 overwrite_in_place(this.last_key, k);
541 if v.is_some() {
542 // If the value is Some, we have a key-value pair to yield.
543 return Poll::Ready(Some(Ok(k.clone())));
544 } else {
545 // If the value is None, this pair represents a deletion,
546 // so continue looping until we find a non-deleted pair.
547 continue;
548 }
549 }
550 (None, Some(_)) => {
551 // There's no cache hit before the peeked pair, so we want
552 // to extract and return it from the underlying stream.
553 let Poll::Ready(Some(Ok(k))) = this.underlying.as_mut().poll_next(cx) else {
554 unreachable!("peeked stream must yield peeked item");
555 };
556 overwrite_in_place(this.last_key, &k);
557 return Poll::Ready(Some(Ok(k)));
558 }
559 (None, None) => {
560 // Terminate the stream, no more items are available.
561 return Poll::Ready(None);
562 }
563 }
564 }
565 }
569/// A stream of key-value pairs that interleaves a nonverifiable storage and caching layers.
570// This implementation differs from [`StateDeltaNonconsensusPrefixRawStream`] sin how
571// it specifies the search space for the cache.
572pub struct StateDeltaNonconsensusRangeRawStream<St>
574 St: Stream<Item = Result<(Vec<u8>, Vec<u8>)>>,
576 #[pin]
577 pub(crate) underlying: Peekable<St>,
578 pub(crate) layers: Vec<Arc<RwLock<Option<Cache>>>>,
579 pub(crate) leaf_cache: Arc<RwLock<Option<Cache>>>,
580 pub(crate) last_key: Option<Vec<u8>>,
581 pub(crate) prefix: Option<Vec<u8>>,
582 pub(crate) range: (Option<Vec<u8>>, Option<Vec<u8>>),
585impl<St> Stream for StateDeltaNonconsensusRangeRawStream<St>
587 St: Stream<Item = Result<(Vec<u8>, Vec<u8>)>>,
589 type Item = Result<(Vec<u8>, Vec<u8>)>;
591 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
592 // This implementation interleaves items from the underlying stream with
593 // items in cache layers. To do this, it tracks the last key it
594 // returned, then, for each item in the underlying stream, searches for
595 // cached keys that lie between the last-returned key and the item's key,
596 // checking whether the cached key represents a deletion requiring further
597 // scanning. This process is illustrated as follows:
598 //
599 // ◇ skip ◇ skip ▲ yield ▲ yield ▲ yield
600 // │ │ │ │ │
601 // ░ pick ──────────────▶ ░ pick ────────▶ █ pick ────────▶ █ pick ─────────▶ █ pick
602 // ▲ ▲ ▲ ▲ ▲
603 // ▲ │ ▲ │ ▲ │ ▲ │ ▲ │
604 // write│ │ │ │ │ │ │ │ │ │
605 // layer│ │ █ │ │ █ │ │█ │ █ │ █ │
606 // │ │ ░ │ ░ │ ░│ │ ░ │ ░ │
607 // │ ░ │ ░ │ ░ │ │ ░ │ ░ │
608 // │ █ │ █ │ █│ │ █ │ █ │
609 // │ █ █ │ █ █ │ █ │ █ │ █ █ │ █ █
610 // │ █ │ █ │ █ │ █ │ █
611 // ─┼(─────]────keys─▶ ─┼──(───]────▶ ─┼────(─]────▶ ─┼─────(]────▶ ─┼──────(──]─▶
612 // │ ▲ █ █ │ █ █ │ █ █ │ █ █ │ █ █
613 // │
614 // │search range of key-value pairs in cache layers that could
615 // │affect whether to yield the next item in the underlying stream
617 // Optimization: ensure we have a peekable item in the underlying stream before continuing.
618 let mut this = self.project();
619 ready!(this.underlying.as_mut().poll_peek(cx));
620 // Now that we're ready to interleave the next underlying item with any
621 // cache layers, lock them all for the duration of the method, using a
622 // SmallVec to (hopefully) store all the guards on the stack.
623 let mut layer_guards = SmallVec::<[_; 8]>::new();
624 for layer in this.layers.iter() {
625 layer_guards.push(;
626 }
627 // Tacking the leaf cache onto the list is important to not miss any values.
628 // It's stored separately so that the contents of the
629 layer_guards.push(;
631 let (binding_prefix, binding_start, binding_end) = (Vec::new(), Vec::new(), Vec::new());
632 let prefix = this.prefix.as_ref().unwrap_or(&binding_prefix);
633 let start = this.range.0.as_ref().unwrap_or(&binding_start);
634 let end = this.range.1.as_ref().unwrap_or(&binding_end);
636 let mut prefix_start = Vec::with_capacity(prefix.len() + start.len());
637 let mut prefix_end = Vec::with_capacity(prefix.len() + end.len());
639 prefix_start.extend(prefix);
640 prefix_start.extend(start);
641 prefix_end.extend(prefix);
642 prefix_end.extend(end);
644 loop {
645 // Obtain a reference to the next key-value pair from the underlying stream.
646 let peeked = match ready!(this.underlying.as_mut().poll_peek(cx)) {
647 // If we get an underlying error, bubble it up immediately.
648 Some(Err(_e)) => return this.underlying.poll_next(cx),
649 // Otherwise, pass through the peeked value.
650 Some(Ok(pair)) => Some(pair),
651 None => None,
652 };
654 // We want to decide which key to return next, so we have to inspect the cache layers.
655 // To do this, we have to define a search space so that we cover updates and new insertions
656 // that could affect the next key to return.
657 let lower_bound = match this.last_key.as_ref() {
658 Some(k) => Bound::Excluded(k),
659 None => Bound::Included(prefix_start.as_ref()),
660 };
662 let upper_bound = match peeked {
663 Some((k, _v)) => Bound::Included(k),
664 None => this
665 .range
666 .1
667 .as_ref()
668 .map_or(Bound::Unbounded, |_| Bound::Excluded(prefix_end.as_ref())),
669 };
671 let search_range = (lower_bound, upper_bound);
673 tracing::debug!(
674 "searching cache layers for key-value pairs in range {:?}",
675 search_range
676 );
678 // It'd be slightly cleaner to initialize `leftmost_pair` with the
679 // peeked contents, but that would taint `leftmost_pair` with a
680 // `peeked` borrow, and we may need to mutate the underlying stream
681 // later. Instead, initialize it with `None` to only search the
682 // cache layers, and compare at the end.
683 let mut leftmost_pair = None;
684 for layer in layer_guards.iter() {
685 // Find this layer's leftmost key-value pair in the search range.
686 let found_pair = layer
687 .as_ref()
688 .expect("layer must not have been applied")
689 .nonverifiable_changes
690 .range::<Vec<u8>, _>(search_range)
691 .take_while(|(k, v)| {
692 tracing::debug!(?v, ?k, "found key-value pair in cache layer");
693 match peeked {
694 Some((peeked_k, _)) => {
695 k.starts_with(prefix.as_slice()) && k <= &peeked_k
696 }
697 None => k.starts_with(prefix.as_slice()),
698 }
699 })
700 .next();
702 // Check whether the new pair, if any, is the new leftmost pair.
703 match (leftmost_pair, found_pair) {
704 // We want to replace the pair even when the key is equal,
705 // so that we always prefer a newer value over an older value.
706 (Some((leftmost_k, _)), Some((k, v))) if k <= leftmost_k => {
707 leftmost_pair = Some((k, v));
708 }
709 (None, Some((k, v))) => {
710 leftmost_pair = Some((k, v));
711 }
712 _ => {}
713 }
714 }
716 // Overwrite a Vec, attempting to reuse its existing allocation.
717 let overwrite_in_place = |dst: &mut Option<Vec<u8>>, src: &[u8]| {
718 if let Some(ref mut dst) = dst {
719 dst.clear();
720 dst.extend_from_slice(src);
721 } else {
722 *dst = Some(src.to_vec());
723 }
724 };
726 match (leftmost_pair, peeked) {
727 (Some((k, v)), peeked) => {
728 // Since we searched for cached keys less than or equal to
729 // the peeked key, we know that the cached pair takes
730 // priority over the peeked pair.
731 //
732 // If the keys are exactly equal, we advance the underlying stream.
733 if|(kp, _)| kp) == Some(k) {
734 let _ = this.underlying.as_mut().poll_next(cx);
735 }
736 overwrite_in_place(this.last_key, k);
737 if let Some(v) = v {
738 // If the value is Some, we have a key-value pair to yield.
739 return Poll::Ready(Some(Ok((k.clone(), v.clone()))));
740 } else {
741 // If the value is None, this pair represents a deletion,
742 // so continue looping until we find a non-deleted pair.
743 continue;
744 }
745 }
746 (None, Some(_)) => {
747 // There's no cache hit before the peeked pair, so we want
748 // to extract and return it from the underlying stream.
749 let Poll::Ready(Some(Ok((k, v)))) = this.underlying.as_mut().poll_next(cx)
750 else {
751 unreachable!("peeked stream must yield peeked item");
752 };
753 overwrite_in_place(this.last_key, &k);
754 return Poll::Ready(Some(Ok((k, v))));
755 }
756 (None, None) => {
757 // Terminate the stream, no more items are available.
758 return Poll::Ready(None);
759 }
760 }
761 }
762 }