penumbra_sdk_dex/component/router/
path_search.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use async_trait::async_trait;
5use cnidarium::{StateDelta, StateRead};
6use futures::StreamExt;
7use penumbra_sdk_asset::asset;
8use penumbra_sdk_num::fixpoint::U128x128;
9use tap::Tap;
10use tokio::task::JoinSet;
11use tracing::{instrument, Instrument};
12
13use crate::component::PositionRead as _;
14
15use super::{Path, PathCache, PathEntry, RoutingParams, SharedPathCache};
16
17#[async_trait]
18pub trait PathSearch: StateRead + Clone + 'static {
19 #[instrument(skip(self, params), fields(max_hops = params.max_hops), level = "debug", ret)]
23 async fn path_search(
24 &self,
25 src: asset::Id,
26 dst: asset::Id,
27 params: RoutingParams,
28 ) -> Result<(Option<Vec<asset::Id>>, Option<U128x128>)> {
29 let RoutingParams {
30 max_hops,
31 fixed_candidates,
32 price_limit,
33 } = params;
34
35 tracing::debug!(?src, ?dst, ?max_hops, "searching for path");
38 let path_start = std::time::Instant::now();
39 let record_duration = || {
40 use crate::component::metrics::DEX_PATH_SEARCH_DURATION;
41 let elapsed = path_start.elapsed();
42 metrics::histogram!(DEX_PATH_SEARCH_DURATION).record(elapsed);
43 };
44
45 let state = StateDelta::new(self.clone());
48
49 let cache = PathCache::begin(src, state);
50 for i in 0..max_hops {
51 relax_active_paths(cache.clone(), fixed_candidates.clone()).await?;
52 tracing::trace!(i, "finished relaxing all active paths");
53 }
54
55 let entry = cache.lock().0.remove(&dst);
56 let Some(PathEntry { path, spill, .. }) = entry else {
57 record_duration();
58 return Ok((None, None));
59 };
60
61 let nodes = path.nodes;
62 let spill_price = spill.map(|p| p.price);
63 tracing::debug!(price = %path.price, spill_price = %spill_price.unwrap_or_else(|| 0u64.into()), ?src, ?nodes, "found path");
64 record_duration();
65
66 match price_limit {
67 Some(price_limit) if path.price >= price_limit => {
75 tracing::debug!(price = %path.price, price_limit = %price_limit, "path too expensive");
76 Ok((None, None))
77 }
78 _ => Ok((Some(nodes), spill_price)),
79 }
80 }
81}
82
83impl<S> PathSearch for S where S: StateRead + Clone + 'static {}
84
85#[instrument(skip_all)]
86async fn relax_active_paths<S: StateRead + 'static>(
87 cache: SharedPathCache<S>,
88 fixed_candidates: Arc<Vec<asset::Id>>,
89) -> Result<()> {
90 let active_paths = cache.lock().extract_active();
91 let mut js = JoinSet::new();
92 tracing::trace!(
93 active_paths_len = active_paths.len(),
94 "relaxing active paths"
95 );
96 for path in active_paths {
97 let candidates = Arc::clone(&fixed_candidates);
98 let cache = Arc::clone(&cache);
99 js.spawn(async move {
100 use crate::component::metrics::DEX_PATH_SEARCH_RELAX_PATH_DURATION;
101 let metric = metrics::histogram!(DEX_PATH_SEARCH_RELAX_PATH_DURATION);
102 let start = std::time::Instant::now();
103 relax_path(cache, path, candidates)
104 .await
105 .tap(|_| metric.record(start.elapsed()))
106 });
107 }
108 while let Some(task) = js.join_next().await {
110 task??;
111 }
112 Ok(())
113}
114
115async fn relax_path<S: StateRead + 'static>(
116 cache: SharedPathCache<S>,
117 mut path: Path<S>,
118 fixed_candidates: Arc<Vec<asset::Id>>,
119) -> Result<()> {
120 let mut candidates = path
121 .state
122 .candidate_set(*path.end(), fixed_candidates)
123 .instrument(path.span.clone());
124
125 path.span.in_scope(|| {
126 tracing::trace!("relaxing path");
127 });
128
129 let mut js = JoinSet::new();
130
131 while let Some(new_end) = candidates.inner_mut().next().await {
132 let new_path = path.fork();
133 let cache2 = cache.clone();
134 js.spawn(async move {
135 if let Some(new_path) = new_path.extend_to(new_end?).await? {
136 cache2.lock().consider(new_path)
137 }
138 anyhow::Ok(())
139 });
140 }
141 while let Some(task) = js.join_next().await {
143 task??;
144 }
145 Ok(())
146}