penumbra_sdk_dex/component/router/
path_search.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use cnidarium::{StateDelta, StateRead};
6use futures::StreamExt;
7use penumbra_sdk_asset::asset;
8use penumbra_sdk_num::fixpoint::U128x128;
9use tap::Tap;
10use tokio::task::JoinSet;
11use tracing::{instrument, Instrument};
12
13use crate::component::PositionRead as _;
14
15use super::{Path, PathCache, PathEntry, RoutingParams, SharedPathCache};
16
17#[async_trait]
18pub trait PathSearch: StateRead + Clone + 'static {
19    /// Find the best route from `src` to `dst` with estimated price strictly less
20    /// than `params.price_limit`, also returning the spill price for the next-best
21    /// route, if one exists.
22    #[instrument(skip(self, params), fields(max_hops = params.max_hops), level = "debug", ret)]
23    async fn path_search(
24        &self,
25        src: asset::Id,
26        dst: asset::Id,
27        params: RoutingParams,
28    ) -> Result<(Option<Vec<asset::Id>>, Option<U128x128>)> {
29        let RoutingParams {
30            max_hops,
31            fixed_candidates,
32            price_limit,
33        } = params;
34
35        // Initialize some metrics for calculating time spent on path searching
36        // vs route filling. We use vecs so we can count across iterations of the loop.
37        tracing::debug!(?src, ?dst, ?max_hops, "searching for path");
38        let path_start = std::time::Instant::now();
39        let record_duration = || {
40            use crate::component::metrics::DEX_PATH_SEARCH_DURATION;
41            let elapsed = path_start.elapsed();
42            metrics::histogram!(DEX_PATH_SEARCH_DURATION).record(elapsed);
43        };
44
45        // Work in a new stack of state changes, which we can completely discard
46        // at the end of routing
47        let state = StateDelta::new(self.clone());
48
49        let cache = PathCache::begin(src, state);
50        for i in 0..max_hops {
51            relax_active_paths(cache.clone(), fixed_candidates.clone()).await?;
52            tracing::trace!(i, "finished relaxing all active paths");
53        }
54
55        let entry = cache.lock().0.remove(&dst);
56        let Some(PathEntry { path, spill, .. }) = entry else {
57            record_duration();
58            return Ok((None, None));
59        };
60
61        let nodes = path.nodes;
62        let spill_price = spill.map(|p| p.price);
63        tracing::debug!(price = %path.price, spill_price = %spill_price.unwrap_or_else(|| 0u64.into()), ?src, ?nodes, "found path");
64        record_duration();
65
66        match price_limit {
67            // Note: previously, this branch was a load-bearing termination condition, primarily
68            // exercised by the arbitrage logic. However, during the course of testnet 53,  we
69            // encountered two bugs that caused this predicate to not be exercised:
70            // 1. We treated the price limit as an inclusive bound, rather than an exclusive bound.
71            // 2. We relied on an estimate of the end-to-end path price which was lossy (`path.price`).
72            // The latter is an inherent information limitation, so we now have a redundant check in
73            // `route_and_fill` which uses the exact price of the route.
74            Some(price_limit) if path.price >= price_limit => {
75                tracing::debug!(price = %path.price, price_limit = %price_limit, "path too expensive");
76                Ok((None, None))
77            }
78            _ => Ok((Some(nodes), spill_price)),
79        }
80    }
81}
82
83impl<S> PathSearch for S where S: StateRead + Clone + 'static {}
84
85#[instrument(skip_all)]
86async fn relax_active_paths<S: StateRead + 'static>(
87    cache: SharedPathCache<S>,
88    fixed_candidates: Arc<Vec<asset::Id>>,
89) -> Result<()> {
90    let active_paths = cache.lock().extract_active();
91    let mut js = JoinSet::new();
92    tracing::trace!(
93        active_paths_len = active_paths.len(),
94        "relaxing active paths"
95    );
96    for path in active_paths {
97        let candidates = Arc::clone(&fixed_candidates);
98        let cache = Arc::clone(&cache);
99        js.spawn(async move {
100            use crate::component::metrics::DEX_PATH_SEARCH_RELAX_PATH_DURATION;
101            let metric = metrics::histogram!(DEX_PATH_SEARCH_RELAX_PATH_DURATION);
102            let start = std::time::Instant::now();
103            relax_path(cache, path, candidates)
104                .await
105                .tap(|_| metric.record(start.elapsed()))
106        });
107    }
108    // Wait for all relaxations to complete.
109    while let Some(task) = js.join_next().await {
110        task??;
111    }
112    Ok(())
113}
114
115async fn relax_path<S: StateRead + 'static>(
116    cache: SharedPathCache<S>,
117    mut path: Path<S>,
118    fixed_candidates: Arc<Vec<asset::Id>>,
119) -> Result<()> {
120    let mut candidates = path
121        .state
122        .candidate_set(*path.end(), fixed_candidates)
123        .instrument(path.span.clone());
124
125    path.span.in_scope(|| {
126        tracing::trace!("relaxing path");
127    });
128
129    let mut js = JoinSet::new();
130
131    while let Some(new_end) = candidates.inner_mut().next().await {
132        let new_path = path.fork();
133        let cache2 = cache.clone();
134        js.spawn(async move {
135            if let Some(new_path) = new_path.extend_to(new_end?).await? {
136                cache2.lock().consider(new_path)
137            }
138            anyhow::Ok(())
139        });
140    }
141    // Wait for all candidates to be considered.
142    while let Some(task) = js.join_next().await {
143        task??;
144    }
145    Ok(())
146}