penumbra_sdk_dex/component/router/
route_and_fill.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use async_trait::async_trait;
5use cnidarium::StateWrite;
6use penumbra_sdk_asset::{asset, Value};
7use penumbra_sdk_num::Amount;
8use penumbra_sdk_sct::component::clock::EpochRead;
9use tracing::instrument;
10
11use crate::{
12    component::{
13        chandelier::Chandelier,
14        flow::SwapFlow,
15        router::{FillRoute, PathSearch, RoutingParams},
16        ExecutionCircuitBreaker, InternalDexWrite, PositionManager,
17    },
18    lp::position::MAX_RESERVE_AMOUNT,
19    BatchSwapOutputData, SwapExecution, TradingPair,
20};
21
22use super::fill_route::FillError;
23
24/// Ties together the routing and filling logic, to process
25/// a block's batch swap flows.
26#[async_trait]
27pub trait HandleBatchSwaps: StateWrite + Sized {
28    #[instrument(skip(self, trading_pair, batch_data, block_height, params))]
29    async fn handle_batch_swaps(
30        self: &mut Arc<Self>,
31        trading_pair: TradingPair,
32        batch_data: SwapFlow,
33        block_height: u64,
34        params: RoutingParams,
35        execution_budget: u32,
36    ) -> Result<BatchSwapOutputData>
37    where
38        Self: 'static,
39    {
40        let (delta_1, delta_2) = (batch_data.0, batch_data.1);
41        tracing::debug!(?delta_1, ?delta_2, ?trading_pair, "decrypted batch swaps");
42
43        // We initialize a circuit breaker for this batch swap. This will limit the number of frontier
44        // executions up to the specified `execution_budget` parameter.
45        let execution_circuit_breaker = ExecutionCircuitBreaker::new(execution_budget);
46
47        // We clamp the deltas to the maximum input for batch swaps.
48        let clamped_delta_1 = delta_1.min(MAX_RESERVE_AMOUNT.into());
49        let clamped_delta_2 = delta_2.min(MAX_RESERVE_AMOUNT.into());
50
51        tracing::debug!(
52            ?clamped_delta_1,
53            ?clamped_delta_2,
54            "clamped deltas to maximum amount"
55        );
56
57        let swap_execution_1_for_2 = self
58            .route_and_fill(
59                trading_pair.asset_1(),
60                trading_pair.asset_2(),
61                clamped_delta_1,
62                params.clone(),
63                execution_circuit_breaker.clone(),
64            )
65            .await?;
66
67        let swap_execution_2_for_1 = self
68            .route_and_fill(
69                trading_pair.asset_2(),
70                trading_pair.asset_1(),
71                clamped_delta_2,
72                params.clone(),
73                execution_circuit_breaker,
74            )
75            .await?;
76
77        let (lambda_2, unfilled_1) = match &swap_execution_1_for_2 {
78            Some(swap_execution) => (
79                swap_execution.output.amount,
80                // The unfilled amount of asset 1 is the trade input minus the amount consumed, plus the excess.
81                delta_1 - swap_execution.input.amount,
82            ),
83            None => (0u64.into(), delta_1),
84        };
85        let (lambda_1, unfilled_2) = match &swap_execution_2_for_1 {
86            Some(swap_execution) => (
87                swap_execution.output.amount,
88                delta_2 - swap_execution.input.amount,
89            ),
90            None => (0u64.into(), delta_2),
91        };
92        let epoch = self.get_current_epoch().await.expect("epoch is set");
93        let output_data = BatchSwapOutputData {
94            height: block_height,
95            trading_pair,
96            delta_1,
97            delta_2,
98            lambda_1,
99            lambda_2,
100            unfilled_1,
101            unfilled_2,
102            sct_position_prefix: (
103                u16::try_from(epoch.index).expect("epoch index should be small enough"),
104                // The block index is determined by looking at how many blocks have elapsed since
105                // the start of the epoch.
106                u16::try_from(block_height - epoch.start_height)
107                    .expect("block index should be small enough"),
108                0,
109            )
110                .into(),
111        };
112
113        tracing::debug!(
114            ?output_data,
115            ?swap_execution_1_for_2,
116            ?swap_execution_2_for_1
117        );
118
119        // Update the candlestick tracking
120        if let Some(se) = swap_execution_1_for_2.clone() {
121            tracing::debug!("updating candlestick for 1=>2 swap");
122            Arc::get_mut(self)
123                .expect("expected state to have no other refs")
124                .record_swap_execution(&se)
125                .await;
126        }
127        if let Some(se) = &swap_execution_2_for_1 {
128            tracing::debug!("updating candlestick for 2=>1 swap");
129            Arc::get_mut(self)
130                .expect("expected state to have no other refs")
131                .record_swap_execution(se)
132                .await;
133        }
134
135        // Fetch the swap execution object that should have been modified during the routing and filling.
136        Arc::get_mut(self)
137            .expect("expected state to have no other refs")
138            .set_output_data(output_data, swap_execution_1_for_2, swap_execution_2_for_1)
139            .await?;
140
141        Ok(output_data)
142    }
143}
144
145impl<T: PositionManager> HandleBatchSwaps for T {}
146
147/// Lower-level trait that ties together the routing and filling logic.
148#[async_trait]
149pub trait RouteAndFill: StateWrite + Sized {
150    #[instrument(skip(self, asset_1, asset_2, input, params, execution_circuit_breaker))]
151    async fn route_and_fill(
152        self: &mut Arc<Self>,
153        asset_1: asset::Id,
154        asset_2: asset::Id,
155        input: Amount,
156        params: RoutingParams,
157        mut execution_circuit_breaker: ExecutionCircuitBreaker,
158    ) -> Result<Option<SwapExecution>>
159    where
160        Self: 'static,
161    {
162        tracing::debug!(?input, ?asset_1, ?asset_2, "prepare to route and fill");
163
164        if input == Amount::zero() {
165            tracing::debug!("no input, short-circuit exit");
166            return Ok(None);
167        }
168
169        // Unfilled output of asset 1
170        let mut total_unfilled_1 = input;
171        // Output of asset 2
172        let mut total_output_2 = 0u64.into();
173
174        // An ordered list of execution traces that were used to fill the trade.
175        let mut traces: Vec<Vec<Value>> = Vec::new();
176
177        // Termination conditions:
178        // 1. We have no more `delta_1` remaining
179        // 2. A path can no longer be found
180        // 3. We have reached the `RoutingParams` specified price limit
181        // 4. The execution circuit breaker has been triggered based on the number of path searches and executions
182        // 5. An unrecoverable error occurred during the execution of the route.
183        loop {
184            // Check if we have exceeded the execution circuit breaker limits.
185            if execution_circuit_breaker.exceeded_limits() {
186                tracing::debug!("execution circuit breaker triggered, exiting route_and_fill");
187                break;
188            } else {
189                // This should be done ahead of doing any path search or execution, so that we never
190                // have to reason about the specific control flow of our batch swap logic.
191                execution_circuit_breaker.increment();
192            }
193
194            // Find the best route between the two assets in the trading pair.
195            let (path, spill_price) = self
196                .path_search(asset_1, asset_2, params.clone())
197                .await
198                .context("error finding best path")?;
199
200            let Some(path) = path else {
201                tracing::debug!("no path found, exiting route_and_fill");
202                break;
203            };
204
205            if path.is_empty() {
206                tracing::debug!("empty path found, exiting route_and_fill");
207                break;
208            }
209
210            // We prepare the input for this execution round, which is the remaining unfilled amount of asset 1.
211            let delta_1 = Value {
212                asset_id: asset_1,
213                amount: total_unfilled_1,
214            };
215
216            tracing::debug!(?path, ?delta_1, "found path, filling up to spill price");
217
218            let execution_result = Arc::get_mut(self)
219                .expect("expected state to have no other refs")
220                .fill_route(delta_1, &path, spill_price)
221                .await;
222
223            let swap_execution = match execution_result {
224                Ok(execution) => execution,
225                Err(FillError::ExecutionOverflow(position_id)) => {
226                    // We have encountered an overflow during the execution of the route.
227                    // To route around this, we will close the position and try to route and fill again.
228                    tracing::debug!(culprit = ?position_id, "overflow detected during routing execution");
229                    Arc::get_mut(self)
230                        .expect("expected state to have no other refs")
231                        .close_position_by_id(&position_id)
232                        .await
233                        .expect("the position still exists");
234                    continue;
235                }
236                Err(e) => {
237                    // We have encountered an error during the execution of the route,
238                    // there are no clear ways to route around this, so we propagate the error.
239                    // `fill_route` is transactional and will have rolled back the state.
240                    anyhow::bail!("error filling route: {:?}", e);
241                }
242            };
243
244            // Immediately track the execution in the state.
245            (total_output_2, total_unfilled_1) = {
246                // The exact amount of asset 1 that was consumed in this execution round.
247                let consumed_input = swap_execution.input;
248                // The output of this execution round is the amount of asset 2 that was filled.
249                let produced_output = swap_execution.output;
250
251                tracing::debug!(consumed_input = ?consumed_input.amount, output = ?produced_output.amount, "filled along best path");
252
253                // Sanity check that the input and output assets are correct.
254                assert_eq!(produced_output.asset_id, asset_2);
255                assert_eq!(consumed_input.asset_id, asset_1);
256
257                // Append the traces from this execution to the outer traces.
258                traces.append(&mut swap_execution.traces.clone());
259
260                (
261                    // The total output of asset 2 is the sum of all outputs.
262                    total_output_2 + produced_output.amount,
263                    // The total unfilled amount of asset 1 is the remaining unfilled amount minus the amount consumed.
264                    total_unfilled_1 - consumed_input.amount,
265                )
266            };
267
268            if total_unfilled_1.value() == 0 {
269                tracing::debug!("filled all input, exiting route_and_fill");
270                break;
271            }
272
273            // Ensure that we've actually executed, or else bail out.
274            let Some(accurate_max_price) = swap_execution.max_price() else {
275                tracing::debug!("no traces in execution, exiting route_and_fill");
276                break;
277            };
278
279            // Check that the execution price is below the price limit, if one is set.
280            if let Some(price_limit) = params.price_limit {
281                if accurate_max_price >= price_limit {
282                    tracing::debug!(
283                        ?accurate_max_price,
284                        ?price_limit,
285                        "execution price above price limit, exiting route_and_fill"
286                    );
287                    break;
288                }
289            }
290        }
291
292        // If we didn't execute against any position at all, there are no execution records to return.
293        if traces.is_empty() {
294            return Ok(None);
295        } else {
296            Ok(Some(SwapExecution {
297                traces,
298                input: Value {
299                    asset_id: asset_1,
300                    // The total amount of asset 1 that was actually consumed across rounds.
301                    amount: input - total_unfilled_1,
302                },
303                output: Value {
304                    asset_id: asset_2,
305                    amount: total_output_2,
306                },
307            }))
308        }
309    }
310}
311
312impl<T: HandleBatchSwaps> RouteAndFill for T {}