penumbra_sdk_dex/component/router/
route_and_fill.rs1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use cnidarium::StateWrite;
6use penumbra_sdk_asset::{asset, Value};
7use penumbra_sdk_num::Amount;
8use penumbra_sdk_sct::component::clock::EpochRead;
9use tracing::instrument;
10
11use crate::{
12 component::{
13 chandelier::Chandelier,
14 flow::SwapFlow,
15 router::{FillRoute, PathSearch, RoutingParams},
16 ExecutionCircuitBreaker, InternalDexWrite, PositionManager,
17 },
18 lp::position::MAX_RESERVE_AMOUNT,
19 BatchSwapOutputData, SwapExecution, TradingPair,
20};
21
22use super::fill_route::FillError;
23
24#[async_trait]
27pub trait HandleBatchSwaps: StateWrite + Sized {
28 #[instrument(skip(self, trading_pair, batch_data, block_height, params))]
29 async fn handle_batch_swaps(
30 self: &mut Arc<Self>,
31 trading_pair: TradingPair,
32 batch_data: SwapFlow,
33 block_height: u64,
34 params: RoutingParams,
35 execution_budget: u32,
36 ) -> Result<BatchSwapOutputData>
37 where
38 Self: 'static,
39 {
40 let (delta_1, delta_2) = (batch_data.0, batch_data.1);
41 tracing::debug!(?delta_1, ?delta_2, ?trading_pair, "decrypted batch swaps");
42
43 let execution_circuit_breaker = ExecutionCircuitBreaker::new(execution_budget);
46
47 let clamped_delta_1 = delta_1.min(MAX_RESERVE_AMOUNT.into());
49 let clamped_delta_2 = delta_2.min(MAX_RESERVE_AMOUNT.into());
50
51 tracing::debug!(
52 ?clamped_delta_1,
53 ?clamped_delta_2,
54 "clamped deltas to maximum amount"
55 );
56
57 let swap_execution_1_for_2 = self
58 .route_and_fill(
59 trading_pair.asset_1(),
60 trading_pair.asset_2(),
61 clamped_delta_1,
62 params.clone(),
63 execution_circuit_breaker.clone(),
64 )
65 .await?;
66
67 let swap_execution_2_for_1 = self
68 .route_and_fill(
69 trading_pair.asset_2(),
70 trading_pair.asset_1(),
71 clamped_delta_2,
72 params.clone(),
73 execution_circuit_breaker,
74 )
75 .await?;
76
77 let (lambda_2, unfilled_1) = match &swap_execution_1_for_2 {
78 Some(swap_execution) => (
79 swap_execution.output.amount,
80 delta_1 - swap_execution.input.amount,
82 ),
83 None => (0u64.into(), delta_1),
84 };
85 let (lambda_1, unfilled_2) = match &swap_execution_2_for_1 {
86 Some(swap_execution) => (
87 swap_execution.output.amount,
88 delta_2 - swap_execution.input.amount,
89 ),
90 None => (0u64.into(), delta_2),
91 };
92 let epoch = self.get_current_epoch().await.expect("epoch is set");
93 let output_data = BatchSwapOutputData {
94 height: block_height,
95 trading_pair,
96 delta_1,
97 delta_2,
98 lambda_1,
99 lambda_2,
100 unfilled_1,
101 unfilled_2,
102 sct_position_prefix: (
103 u16::try_from(epoch.index).expect("epoch index should be small enough"),
104 u16::try_from(block_height - epoch.start_height)
107 .expect("block index should be small enough"),
108 0,
109 )
110 .into(),
111 };
112
113 tracing::debug!(
114 ?output_data,
115 ?swap_execution_1_for_2,
116 ?swap_execution_2_for_1
117 );
118
119 if let Some(se) = swap_execution_1_for_2.clone() {
121 tracing::debug!("updating candlestick for 1=>2 swap");
122 Arc::get_mut(self)
123 .expect("expected state to have no other refs")
124 .record_swap_execution(&se)
125 .await;
126 }
127 if let Some(se) = &swap_execution_2_for_1 {
128 tracing::debug!("updating candlestick for 2=>1 swap");
129 Arc::get_mut(self)
130 .expect("expected state to have no other refs")
131 .record_swap_execution(se)
132 .await;
133 }
134
135 Arc::get_mut(self)
137 .expect("expected state to have no other refs")
138 .set_output_data(output_data, swap_execution_1_for_2, swap_execution_2_for_1)
139 .await?;
140
141 Ok(output_data)
142 }
143}
144
145impl<T: PositionManager> HandleBatchSwaps for T {}
146
147#[async_trait]
149pub trait RouteAndFill: StateWrite + Sized {
150 #[instrument(skip(self, asset_1, asset_2, input, params, execution_circuit_breaker))]
151 async fn route_and_fill(
152 self: &mut Arc<Self>,
153 asset_1: asset::Id,
154 asset_2: asset::Id,
155 input: Amount,
156 params: RoutingParams,
157 mut execution_circuit_breaker: ExecutionCircuitBreaker,
158 ) -> Result<Option<SwapExecution>>
159 where
160 Self: 'static,
161 {
162 tracing::debug!(?input, ?asset_1, ?asset_2, "prepare to route and fill");
163
164 if input == Amount::zero() {
165 tracing::debug!("no input, short-circuit exit");
166 return Ok(None);
167 }
168
169 let mut total_unfilled_1 = input;
171 let mut total_output_2 = 0u64.into();
173
174 let mut traces: Vec<Vec<Value>> = Vec::new();
176
177 loop {
184 if execution_circuit_breaker.exceeded_limits() {
186 tracing::debug!("execution circuit breaker triggered, exiting route_and_fill");
187 break;
188 } else {
189 execution_circuit_breaker.increment();
192 }
193
194 let (path, spill_price) = self
196 .path_search(asset_1, asset_2, params.clone())
197 .await
198 .context("error finding best path")?;
199
200 let Some(path) = path else {
201 tracing::debug!("no path found, exiting route_and_fill");
202 break;
203 };
204
205 if path.is_empty() {
206 tracing::debug!("empty path found, exiting route_and_fill");
207 break;
208 }
209
210 let delta_1 = Value {
212 asset_id: asset_1,
213 amount: total_unfilled_1,
214 };
215
216 tracing::debug!(?path, ?delta_1, "found path, filling up to spill price");
217
218 let execution_result = Arc::get_mut(self)
219 .expect("expected state to have no other refs")
220 .fill_route(delta_1, &path, spill_price)
221 .await;
222
223 let swap_execution = match execution_result {
224 Ok(execution) => execution,
225 Err(FillError::ExecutionOverflow(position_id)) => {
226 tracing::debug!(culprit = ?position_id, "overflow detected during routing execution");
229 Arc::get_mut(self)
230 .expect("expected state to have no other refs")
231 .close_position_by_id(&position_id)
232 .await
233 .expect("the position still exists");
234 continue;
235 }
236 Err(e) => {
237 anyhow::bail!("error filling route: {:?}", e);
241 }
242 };
243
244 (total_output_2, total_unfilled_1) = {
246 let consumed_input = swap_execution.input;
248 let produced_output = swap_execution.output;
250
251 tracing::debug!(consumed_input = ?consumed_input.amount, output = ?produced_output.amount, "filled along best path");
252
253 assert_eq!(produced_output.asset_id, asset_2);
255 assert_eq!(consumed_input.asset_id, asset_1);
256
257 traces.append(&mut swap_execution.traces.clone());
259
260 (
261 total_output_2 + produced_output.amount,
263 total_unfilled_1 - consumed_input.amount,
265 )
266 };
267
268 if total_unfilled_1.value() == 0 {
269 tracing::debug!("filled all input, exiting route_and_fill");
270 break;
271 }
272
273 let Some(accurate_max_price) = swap_execution.max_price() else {
275 tracing::debug!("no traces in execution, exiting route_and_fill");
276 break;
277 };
278
279 if let Some(price_limit) = params.price_limit {
281 if accurate_max_price >= price_limit {
282 tracing::debug!(
283 ?accurate_max_price,
284 ?price_limit,
285 "execution price above price limit, exiting route_and_fill"
286 );
287 break;
288 }
289 }
290 }
291
292 if traces.is_empty() {
294 return Ok(None);
295 } else {
296 Ok(Some(SwapExecution {
297 traces,
298 input: Value {
299 asset_id: asset_1,
300 amount: input - total_unfilled_1,
302 },
303 output: Value {
304 asset_id: asset_2,
305 amount: total_output_2,
306 },
307 }))
308 }
309 }
310}
311
312impl<T: HandleBatchSwaps> RouteAndFill for T {}