penumbra_sdk_dex/component/router/
fill_route.rs

1use std::{
2    collections::{BTreeMap, BTreeSet},
3    pin::Pin,
4};
5
6use anyhow::Result;
7use async_trait::async_trait;
8use cnidarium::{StateDelta, StateRead, StateWrite};
9use futures::{Stream, StreamExt};
10use penumbra_sdk_asset::{asset, Value};
11use penumbra_sdk_num::{
12    fixpoint::{Error, U128x128},
13    Amount,
14};
15use tracing::instrument;
16
17use crate::{
18    component::{metrics, PositionManager, PositionRead},
19    lp::{
20        position::{self, Position},
21        Reserves,
22    },
23    DirectedTradingPair, SwapExecution, TradingPair,
24};
25
26/// An error that occurs during routing execution.
27#[derive(Debug, thiserror::Error)]
28pub enum FillError {
29    /// Mismatch between the input asset id and the assets on either leg
30    /// of the trading pair.
31    #[error("input id {0:?} does not belong on pair: {1:?}")]
32    AssetIdMismatch(asset::Id, TradingPair),
33    /// Overflow occurred when executing against the position corresponding
34    /// to the wrapped asset id.
35    #[error("overflow when executing against position {0:?}")]
36    ExecutionOverflow(position::Id),
37    /// Route is empty or has only one hop.
38    #[error("invalid route length {0} (must be at least 2)")]
39    InvalidRoute(usize),
40    /// Frontier position not found.
41    #[error("frontier position with id {0:?}, not found")]
42    MissingFrontierPosition(position::Id),
43    /// Insufficient liquidity in a pair.
44    #[error("insufficient liquidity in pair {0:?}")]
45    InsufficientLiquidity(DirectedTradingPair),
46}
47
48#[async_trait]
49pub trait FillRoute: StateWrite + Sized {
50    /// Fills a trade of a given `input` amount along a given route of `hops`,
51    /// optionally using `spill_price` to put limits on execution.
52    ///
53    /// Note: this method will always execute at least one sub-trade along the
54    /// route, even if it would exceed the spill price (i.e., the spill price is
55    /// only used after consuming at least one position along the route). This
56    /// covers an edge case in routing, which computes approximate spill prices:
57    /// if there were two routes with very similar prices, and both of their
58    /// estimated prices were underestimates, the routing could potentially
59    /// switch back and forth between them without making progress. Ensuring we
60    /// always consume at least one position prevents this possibility.
61    ///
62    /// # Invariants
63    ///
64    /// It is an error to call `fill_route` on a route that does not have at least one position for each hop.
65    ///
66    /// # Errors
67    /// `fill_route` can fail for a number of reasons captured by the `FillError` enum.
68    ///
69    /// # Panics
70    /// At the moment, `fill_route` will panic on I/O failures (e.g., if the state is corrupted, or storage fails).
71    #[instrument(skip(self, input, hops, spill_price))]
72    async fn fill_route(
73        &mut self,
74        input: Value,
75        hops: &[asset::Id],
76        spill_price: Option<U128x128>,
77    ) -> Result<SwapExecution, FillError> {
78        fill_route_inner(self, input, hops, spill_price, true).await
79    }
80}
81
82impl<S: StateWrite> FillRoute for S {}
83
84async fn fill_route_inner<S: StateWrite + Sized>(
85    state: S,
86    mut input: Value,
87    hops: &[asset::Id],
88    spill_price: Option<U128x128>,
89    ensure_progress: bool,
90) -> Result<SwapExecution, FillError> {
91    let fill_start = std::time::Instant::now();
92
93    // Build a transaction for this execution, so if we error out at any
94    // point we don't leave the state in an inconsistent state.  This is
95    // particularly important for this method, because we lift position data
96    // out of the state and modify it in-memory, writing it only as we fully
97    // consume positions.
98    let mut this = StateDelta::new(state);
99
100    // Switch from representing hops implicitly as a sequence of asset IDs to
101    // representing them explicitly as a sequence of directed trading pairs.
102    let route = std::iter::once(input.asset_id)
103        .chain(hops.iter().cloned())
104        .collect::<Vec<_>>();
105
106    // Break down the route into a sequence of pairs to visit.
107    let pairs = breakdown_route(&route)?;
108
109    tracing::debug!(
110        input = ?input.amount,
111        ?route,
112        ?spill_price,
113    );
114
115    let mut output = Value {
116        amount: 0u64.into(),
117        asset_id: route
118            .last()
119            .cloned()
120            .ok_or(FillError::InvalidRoute(route.len()))?,
121    };
122
123    let mut frontier = Frontier::load(&mut this, pairs).await?;
124    tracing::debug!(?frontier, "assembled initial frontier");
125
126    // Tracks whether we've already filled at least once, so we can skip the spill price check
127    // until we've consumed at least one position.
128    let mut filled_once = if ensure_progress {
129        false
130    } else {
131        // If we don't need to ensure progress, we can act as if we've already filled once.
132        true
133    };
134
135    'filling: loop {
136        // INVARIANT: we must ensure that in each iteration of the loop, either:
137        //
138        // * we completely exhaust the input amount, or
139        // * we completely exhaust the reserves of one of the active positions.
140
141        // Phase 1 (Sensing): determine the index of the constraining position by
142        // executing along the frontier, tracking which hops are
143        // constraining.
144        let constraining_index = frontier.sense_capacity_constraint(input)?;
145
146        tracing::debug!(
147            ?constraining_index,
148            "sensed capacity constraint, begin filling"
149        );
150
151        // Phase 2 (Filling): transactionally execute along the path, using
152        // the constraint information we sensed above.
153        let tx = match constraining_index {
154            Some(constraining_index) => frontier.fill_constrained(constraining_index),
155            None => frontier.fill_unconstrained(input),
156        };
157
158        // Phase 3 (Committing): commit the transaction if the actual price was less than the spill price.
159
160        // WATCH OUT:
161        // - `None` on the spill price means no limit.
162        // - `None` on the actual price means infinite price.
163        let should_apply = if let Some(spill_price) = spill_price {
164            let below_limit = tx.actual_price().map(|p| p <= spill_price).unwrap_or(false);
165
166            // We should apply if we're below the limit, or we haven't yet made progress.
167            below_limit || !filled_once
168        } else {
169            true
170        };
171
172        if !should_apply {
173            tracing::debug!(
174                // Hack to get an f64-formatted version of the prices; want %p but Option<_> isn't Display
175                spill_price = ?spill_price.map(|x| x.to_string()),
176                actual_price = ?tx.actual_price().map(|x| x.to_string()),
177                "exceeded spill price, breaking loop"
178            );
179            // Discard the unapplied transaction, and break out of the filling loop.
180            break 'filling;
181        }
182
183        let (current_input, current_output) = frontier.apply(tx);
184        filled_once = true;
185
186        // Update the input and output amounts tracked outside of the loop:
187        input.amount = input.amount - current_input;
188        output.amount += current_output;
189
190        tracing::debug!(
191            ?current_input,
192            ?current_output,
193            input = ?input.amount,
194            output = ?output.amount,
195            "completed fill iteration, updating frontier"
196        );
197
198        // It's important to replace _any_ empty positions, not just the one we
199        // consumed, because we might have exhausted multiple positions at once,
200        // and we don't want to write empty positions into the state or process
201        // them in later iterations.
202        if !frontier.replace_empty_positions().await? {
203            tracing::debug!("ran out of positions, breaking loop");
204            break 'filling;
205        }
206
207        if constraining_index.is_none() {
208            // In this case, we should have fully consumed the input amount.
209            assert_eq!(input.amount, 0u64.into());
210            tracing::debug!("filled input amount completely, breaking loop");
211            break 'filling;
212        } else {
213            continue 'filling;
214        }
215    }
216
217    // We need to save these positions, because we mutated their state, even
218    // if we didn't fully consume their reserves.
219    frontier
220        .save()
221        .await
222        .expect("writing frontier should not fail");
223
224    // Input consists of the sum of the first value of each trace.
225    let input = frontier
226        .trace
227        .iter()
228        .map(|trace| trace.first().expect("empty trace").amount)
229        .sum::<Amount>();
230    // Output consists of the sum of the last value of each trace.
231    let output = frontier
232        .trace
233        .iter()
234        .map(|trace| trace.last().expect("empty trace").amount)
235        .sum::<Amount>();
236
237    let in_asset_id = frontier.pairs.first().expect("empty pairs").start;
238    let out_asset_id = frontier.pairs.last().expect("empty pairs").end;
239
240    let swap_execution = SwapExecution {
241        traces: std::mem::take(&mut frontier.trace),
242        input: Value {
243            amount: input,
244            asset_id: in_asset_id,
245        },
246        output: Value {
247            amount: output,
248            asset_id: out_asset_id,
249        },
250    };
251    std::mem::drop(frontier);
252
253    tracing::debug!(?swap_execution, "returning swap execution of filled route");
254
255    // Apply the state transaction now that we've reached the end without errors.
256    //
257    // We have to manually extract events and push them down to the state to avoid losing them.
258    // TODO: in a commit not intended to be cherry-picked, we should fix this hazardous API:
259    // - rename `StateDelta::apply` to `StateDelta::apply_extracting_events`
260    // - add `StateDelta::apply_with_events` that pushes the events down.
261    // - go through all uses of `apply_extracting_events` and determine what behavior is correct
262    let (mut state, events) = this.apply();
263    for event in events {
264        state.record(event);
265    }
266
267    let fill_elapsed = fill_start.elapsed();
268    metrics::histogram!(metrics::DEX_ROUTE_FILL_DURATION).record(fill_elapsed);
269    // cleanup / finalization
270    Ok(swap_execution)
271}
272
273/// Breaksdown a route into a collection of `DirectedTradingPair`, this is mostly useful
274/// for debugging right now.
275fn breakdown_route(route: &[asset::Id]) -> Result<Vec<DirectedTradingPair>, FillError> {
276    if route.len() < 2 {
277        Err(FillError::InvalidRoute(route.len()))
278    } else {
279        let mut pairs = vec![];
280        for pair in route.windows(2) {
281            let start = pair[0];
282            let end = pair[1];
283            pairs.push(DirectedTradingPair::new(start, end));
284        }
285        Ok(pairs)
286    }
287}
288
289type PositionsByPrice = BTreeMap<
290    DirectedTradingPair,
291    Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send>>,
292>;
293
294/// A frontier of least-priced positions along a route.
295struct Frontier<S> {
296    /// The list of trading pairs this frontier is for.
297    pub pairs: Vec<DirectedTradingPair>,
298    /// A list of the positions on the route.
299    pub positions: Vec<Position>,
300    /// A set of position IDs of positions contained in the frontier.
301    ///
302    /// This lets us correctly handle the case where we traverse the same macro-edge
303    /// in opposite directions, and a position has nonzero reserves of both assets
304    /// and shows up in both position streams (even though we must only use it once).
305    pub position_ids: BTreeSet<position::Id>,
306    /// The underlying state.
307    pub state: S,
308    /// A stream of positions for each pair on the route, ordered by price.
309    pub positions_by_price: PositionsByPrice,
310    /// A trace of the execution along the route.
311    pub trace: Vec<Vec<Value>>,
312}
313
314struct FrontierTx {
315    new_reserves: Vec<Option<Reserves>>,
316    trace: Vec<Option<Amount>>,
317}
318
319impl FrontierTx {
320    fn new<S>(frontier: &Frontier<S>) -> FrontierTx {
321        FrontierTx {
322            new_reserves: vec![None; frontier.positions.len()],
323            trace: vec![None; frontier.pairs.len() + 1],
324        }
325    }
326
327    fn actual_price(&self) -> Result<U128x128, Error> {
328        let input: U128x128 = self
329            .trace
330            .first()
331            .expect("input amount is set in a complete trace")
332            .expect("input amount is set in a complete trace")
333            .into();
334        let output: U128x128 = self
335            .trace
336            .last()
337            .expect("output amount is set in a complete trace")
338            .expect("output amount is set in a complete trace")
339            .into();
340
341        input / output
342    }
343}
344
345impl<S> std::fmt::Debug for Frontier<S> {
346    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347        f.debug_struct("Frontier")
348            .field("pairs", &self.pairs)
349            .field("positions", &self.positions)
350            .field("position_ids", &self.position_ids)
351            .field("trace", &self.trace)
352            .finish_non_exhaustive()
353    }
354}
355
356impl<S: StateRead + StateWrite> Frontier<S> {
357    async fn load(state: S, pairs: Vec<DirectedTradingPair>) -> Result<Frontier<S>, FillError> {
358        let mut positions = Vec::new();
359        let mut position_ids = BTreeSet::new();
360
361        // We want to ensure that any particular position is used at most once over the route,
362        // even if the route has cycles at the macro-scale. To do this, we store the streams
363        // of positions for each pair, taking care to only construct one stream per distinct pair.
364        let mut positions_by_price = BTreeMap::new();
365        for pair in &pairs {
366            positions_by_price
367                .entry(*pair)
368                .or_insert_with(|| state.positions_by_price(pair));
369        }
370
371        for pair in &pairs {
372            'next_position: loop {
373                let (id, position) = positions_by_price
374                    .get_mut(pair)
375                    .expect("positions_by_price should have an entry for each pair")
376                    .as_mut()
377                    .next()
378                    .await
379                    .ok_or(FillError::InsufficientLiquidity(*pair))?
380                    .expect("stream should not error");
381
382                // Check that the position is not already part of the frontier.
383                if !position_ids.contains(&id) {
384                    position_ids.insert(id);
385                    positions.push(position);
386
387                    break 'next_position;
388                }
389            }
390        }
391
392        // The current trace list along the route should be initialized as empty.
393        let trace: Vec<Vec<Value>> = Vec::new();
394
395        Ok(Frontier {
396            positions,
397            position_ids,
398            pairs,
399            state,
400            positions_by_price,
401            trace,
402        })
403    }
404
405    async fn save(&mut self) -> Result<()> {
406        let context = DirectedTradingPair {
407            start: self.pairs.first().expect("pairs is nonempty").start,
408            end: self.pairs.last().expect("pairs is nonempty").end,
409        };
410        for position in &self.positions {
411            self.state
412                .position_execution(position.clone(), context.clone())
413                .await?;
414        }
415        Ok(())
416    }
417
418    /// Apply the [`FrontierTx`] to the frontier, returning the input and output
419    /// amounts it added to the trace.
420    fn apply(&mut self, changes: FrontierTx) -> (Amount, Amount) {
421        let mut trace: Vec<Value> = vec![];
422
423        trace.push(Value {
424            amount: changes.trace[0].expect("all trace amounts must be set when applying changes"),
425            asset_id: self.pairs[0].start,
426        });
427        for (i, new_reserves) in changes.new_reserves.into_iter().enumerate() {
428            let new_reserves =
429                new_reserves.expect("all new reserves must be set when applying changes");
430            let amount =
431                changes.trace[i + 1].expect("all trace amounts must be set when applying changes");
432            self.positions[i].reserves = new_reserves;
433            // Pull the asset ID from the pairs.
434            trace.push(Value {
435                amount,
436                asset_id: self.pairs[i].end,
437            });
438        }
439
440        // Add the new trace
441        self.trace.push(trace);
442
443        (
444            changes
445                .trace
446                .first()
447                .expect("first should be set for a trace")
448                .expect("input amount should be set for a trace"),
449            changes
450                .trace
451                .last()
452                .expect("last should be set for a trace")
453                .expect("output amount should be set for a trace"),
454        )
455    }
456
457    async fn replace_empty_positions(&mut self) -> Result<bool, FillError> {
458        for i in 0..self.pairs.len() {
459            let desired_reserves = self.positions[i]
460                .reserves_for(self.pairs[i].end)
461                .ok_or_else(|| {
462                    FillError::AssetIdMismatch(self.pairs[i].end, self.positions[i].phi.pair)
463                })?;
464
465            // Replace any position that has been fully consumed.
466            if desired_reserves == 0u64.into() {
467                // If we can't find a replacement, report that failure upwards.
468                if !self.replace_position(i).await {
469                    return Ok(false);
470                }
471            }
472        }
473
474        Ok(true)
475    }
476
477    /// Returns `true` if a new position was found to replace the given one,
478    /// or `false`, if there are no more positions available for the given pair.
479    #[instrument(skip(self))]
480    async fn replace_position(&mut self, index: usize) -> bool {
481        let replaced_position_id = self.positions[index].id();
482        tracing::debug!(?replaced_position_id, "replacing position");
483
484        // First, save the position we're about to replace.  We're going to
485        // discard it, so write its updated reserves before we replace it on the
486        // frontier.  The other positions will be written out either when
487        // they're fully consumed, or when we finish filling.
488        let context = DirectedTradingPair {
489            start: self.pairs.first().expect("pairs is nonempty").start,
490            end: self.pairs.last().expect("pairs is nonempty").end,
491        };
492        let updated_position = self
493            .state
494            .position_execution(self.positions[index].clone(), context)
495            .await
496            .expect("writing to storage should not fail");
497
498        // We update the frontier cache with the updated state of the position we
499        // want to discard. This protects us from cache incoherency in case we do not
500        // find a suitable replacement for that position.
501        self.positions[index] = updated_position;
502
503        loop {
504            let pair = &self.pairs[index];
505            let (next_position_id, next_position) = match self
506                .positions_by_price
507                .get_mut(pair)
508                .expect("positions_by_price should have an entry for each pair")
509                .as_mut()
510                .next()
511                .await
512                .transpose()
513                .expect("stream doesn't error")
514            {
515                // If none is available, we can't keep filling, and need to signal as such.
516                None => {
517                    tracing::debug!(?pair, "no more positions available for pair");
518                    return false;
519                }
520                // Otherwise, we need to check that the position is not already
521                // part of the current frontier.
522                Some((position_id, lp)) if !self.position_ids.contains(&position_id) => {
523                    (position_id, lp)
524                }
525                // Otherwise, continue to the next position in the stream.
526                Some(position_id) => {
527                    tracing::debug!(?position_id, "skipping position already in frontier");
528                    continue;
529                }
530            };
531
532            tracing::debug!(
533                ?next_position_id,
534                ?next_position,
535                "replacing constraining position in frontier",
536            );
537
538            self.position_ids.insert(next_position_id);
539            self.positions[index] = next_position;
540
541            return true;
542        }
543    }
544
545    /// Senses which position along the frontier is a capacity constraint for
546    /// the given input amount. If an overflow occurs during fill, report the
547    /// position in an error.
548    #[instrument(skip(self, input), fields(input = ?input.amount))]
549    fn sense_capacity_constraint(&self, input: Value) -> Result<Option<usize>, FillError> {
550        tracing::debug!(
551            ?input,
552            "sensing frontier capacity with trial swap input amount"
553        );
554        let mut constraining_index = None;
555        let mut current_input = input;
556
557        for (i, position) in self.positions.iter().enumerate() {
558            if !position.phi.matches_input(current_input.asset_id) {
559                tracing::error!(
560                    ?current_input,
561                    ?position,
562                    "asset ids of input and position do not match, interrupt capacity sensing."
563                );
564                return Err(FillError::AssetIdMismatch(
565                    current_input.asset_id,
566                    position.phi.pair,
567                ));
568            }
569
570            let (unfilled, new_reserves, output) = position
571                .phi
572                .fill(current_input, &position.reserves)
573                .map_err(|_| FillError::ExecutionOverflow(position.id()))?;
574
575            if unfilled.amount > Amount::zero() {
576                tracing::debug!(
577                    i,
578                    current_input = ?current_input.amount,
579                    unfilled = ?unfilled.amount,
580                    output = ?output.amount,
581                    old_reserves = ?position.reserves,
582                    new_reserves = ?new_reserves,
583                    "could not completely fill input amount, marking as constraining"
584                );
585                // We found a pair that constrains how much we can fill along this frontier.
586                constraining_index = Some(i);
587            } else {
588                tracing::debug!(
589                    i,
590                    current_input = ?current_input.amount,
591                    unfilled = ?unfilled.amount,
592                    output = ?output.amount,
593                    old_reserves = ?position.reserves,
594                    new_reserves = ?new_reserves,
595                    "completely filled "
596                );
597            }
598
599            current_input = output;
600        }
601
602        Ok(constraining_index)
603    }
604
605    #[instrument(skip(self, input), fields(input = ?input.amount))]
606    fn fill_unconstrained(&self, input: Value) -> FrontierTx {
607        assert_eq!(
608            input.asset_id,
609            self.pairs
610                .first()
611                .expect("first should be set for a trace")
612                .start
613        );
614
615        let mut tx = FrontierTx::new(self);
616        // We have to manually update the trace here, because fill_forward
617        // doesn't handle the input amount, only things that come after it.
618        tx.trace[0] = Some(input.amount);
619        // Now fill forward along the frontier, accumulating changes into the new tx.
620        self.fill_forward(&mut tx, 0, input);
621
622        tx
623    }
624
625    fn fill_constrained(&self, constraining_index: usize) -> FrontierTx {
626        let mut tx = FrontierTx::new(self);
627
628        // If there was a constraining position along the path, we want to
629        // completely consume its reserves, then work "outwards" along the
630        // path, propagating rounding errors forwards to the end of the path
631        // and backwards to the input.
632
633        // Example:
634        // 0     1     2     3      4         [trace index]
635        // UM => GM => GN => USD => ETH       [asset id]
636        //     0     1     2      3           [pair index]
637        //
638        // Suppose that pair 2 is the constraining pair, with 0.1 USD
639        // reserves.  To completely consume the 0.1 USD reserves, we need
640        // work backwards along the path to compute a sequence of input
641        // amounts that are valid trades to get to 0.1 USD output at pair 2,
642        // and work forwards to compute the corresponding output amounts at
643        // the end of the path.
644
645        let exactly_consumed_reserves = Value {
646            amount: self.positions[constraining_index]
647                .reserves_for(self.pairs[constraining_index].end)
648                .expect("asset ids should match"),
649            asset_id: self.pairs[constraining_index].end,
650        };
651
652        tracing::debug!(
653            constraining_index,
654            exactly_consumed_reserves = ?exactly_consumed_reserves.amount,
655            "attempting to completely consume reserves of constraining position"
656        );
657
658        // Work backwards along the path from the constraining position.
659        self.fill_backward(&mut tx, constraining_index, exactly_consumed_reserves);
660        // Work forwards along the path from the constraining position.
661        self.fill_forward(&mut tx, constraining_index + 1, exactly_consumed_reserves);
662
663        tx
664    }
665
666    #[instrument(skip(self, input, tx), fields(input = ?input.amount))]
667    fn fill_forward(&self, tx: &mut FrontierTx, start_index: usize, input: Value) {
668        tracing::debug!("filling forward along frontier");
669        let mut current_value = input;
670
671        for i in start_index..self.positions.len() {
672            let (unfilled, new_reserves, output) = self.positions[i]
673                .phi
674                .fill(current_value, &self.positions[i].reserves)
675                .expect("forward fill should not fail");
676
677            assert_eq!(
678                unfilled.amount,
679                Amount::zero(),
680                "unfilled amount for unconstrained frontier should be zero"
681            );
682
683            tx.new_reserves[i] = Some(new_reserves);
684            tx.trace[i + 1] = Some(output.amount);
685
686            current_value = output;
687        }
688    }
689
690    #[instrument(skip(self, output, tx), fields(output = ?output.amount))]
691    fn fill_backward(&self, tx: &mut FrontierTx, start_index: usize, output: Value) {
692        tracing::debug!("filling backward along frontier");
693        let mut current_value = output;
694        for i in (0..=start_index).rev() {
695            tx.trace[i + 1] = Some(current_value.amount);
696
697            let (new_reserves, prev_input) = self.positions[i]
698                .phi
699                .fill_output(&self.positions[i].reserves, current_value)
700                .expect("backward fill should not fail")
701                .expect(
702                    "working backwards from most-constraining position should not exceed reserves",
703                );
704
705            tracing::debug!(
706                i,
707                current_value = ?current_value.amount,
708                prev_input = ?prev_input.amount,
709                old_reserves = ?self.positions[i].reserves,
710                new_reserves = ?new_reserves,
711                "found previous input for current value"
712            );
713
714            tx.new_reserves[i] = Some(new_reserves);
715            current_value = prev_input;
716        }
717
718        tx.trace[0] = Some(current_value.amount);
719    }
720}