1use std::sync::Arc;
23use anyhow::{Context, Result};
4use async_trait::async_trait;
5use cnidarium::StateWrite;
6use penumbra_sdk_asset::{asset, Value};
7use penumbra_sdk_num::Amount;
8use penumbra_sdk_sct::component::clock::EpochRead;
9use tracing::instrument;
1011use crate::{
12 component::{
13 chandelier::Chandelier,
14 flow::SwapFlow,
15 router::{FillRoute, PathSearch, RoutingParams},
16 ExecutionCircuitBreaker, InternalDexWrite, PositionManager,
17 },
18 lp::position::MAX_RESERVE_AMOUNT,
19 BatchSwapOutputData, SwapExecution, TradingPair,
20};
2122use super::fill_route::FillError;
2324/// Ties together the routing and filling logic, to process
25/// a block's batch swap flows.
26#[async_trait]
27pub trait HandleBatchSwaps: StateWrite + Sized {
28#[instrument(skip(self, trading_pair, batch_data, block_height, params))]
29async fn handle_batch_swaps(
30self: &mut Arc<Self>,
31 trading_pair: TradingPair,
32 batch_data: SwapFlow,
33 block_height: u64,
34 params: RoutingParams,
35 execution_budget: u32,
36 ) -> Result<BatchSwapOutputData>
37where
38Self: 'static,
39 {
40let (delta_1, delta_2) = (batch_data.0, batch_data.1);
41tracing::debug!(?delta_1, ?delta_2, ?trading_pair, "decrypted batch swaps");
4243// We initialize a circuit breaker for this batch swap. This will limit the number of frontier
44 // executions up to the specified `execution_budget` parameter.
45let execution_circuit_breaker = ExecutionCircuitBreaker::new(execution_budget);
4647// We clamp the deltas to the maximum input for batch swaps.
48let clamped_delta_1 = delta_1.min(MAX_RESERVE_AMOUNT.into());
49let clamped_delta_2 = delta_2.min(MAX_RESERVE_AMOUNT.into());
5051tracing::debug!(
52?clamped_delta_1,
53?clamped_delta_2,
54"clamped deltas to maximum amount"
55);
5657let swap_execution_1_for_2 = self
58.route_and_fill(
59 trading_pair.asset_1(),
60 trading_pair.asset_2(),
61 clamped_delta_1,
62 params.clone(),
63 execution_circuit_breaker.clone(),
64 )
65 .await?;
6667let swap_execution_2_for_1 = self
68.route_and_fill(
69 trading_pair.asset_2(),
70 trading_pair.asset_1(),
71 clamped_delta_2,
72 params.clone(),
73 execution_circuit_breaker,
74 )
75 .await?;
7677let (lambda_2, unfilled_1) = match &swap_execution_1_for_2 {
78Some(swap_execution) => (
79 swap_execution.output.amount,
80// The unfilled amount of asset 1 is the trade input minus the amount consumed, plus the excess.
81delta_1 - swap_execution.input.amount,
82 ),
83None => (0u64.into(), delta_1),
84 };
85let (lambda_1, unfilled_2) = match &swap_execution_2_for_1 {
86Some(swap_execution) => (
87 swap_execution.output.amount,
88 delta_2 - swap_execution.input.amount,
89 ),
90None => (0u64.into(), delta_2),
91 };
92let epoch = self.get_current_epoch().await.expect("epoch is set");
93let output_data = BatchSwapOutputData {
94 height: block_height,
95 trading_pair,
96 delta_1,
97 delta_2,
98 lambda_1,
99 lambda_2,
100 unfilled_1,
101 unfilled_2,
102 sct_position_prefix: (
103 u16::try_from(epoch.index).expect("epoch index should be small enough"),
104// The block index is determined by looking at how many blocks have elapsed since
105 // the start of the epoch.
106u16::try_from(block_height - epoch.start_height)
107 .expect("block index should be small enough"),
1080,
109 )
110 .into(),
111 };
112113tracing::debug!(
114?output_data,
115?swap_execution_1_for_2,
116?swap_execution_2_for_1
117 );
118119// Update the candlestick tracking
120if let Some(se) = swap_execution_1_for_2.clone() {
121tracing::debug!("updating candlestick for 1=>2 swap");
122 Arc::get_mut(self)
123 .expect("expected state to have no other refs")
124 .record_swap_execution(&se)
125 .await;
126 }
127if let Some(se) = &swap_execution_2_for_1 {
128tracing::debug!("updating candlestick for 2=>1 swap");
129 Arc::get_mut(self)
130 .expect("expected state to have no other refs")
131 .record_swap_execution(se)
132 .await;
133 }
134135// Fetch the swap execution object that should have been modified during the routing and filling.
136Arc::get_mut(self)
137 .expect("expected state to have no other refs")
138 .set_output_data(output_data, swap_execution_1_for_2, swap_execution_2_for_1)
139 .await?;
140141Ok(output_data)
142 }
143}
144145impl<T: PositionManager> HandleBatchSwaps for T {}
146147/// Lower-level trait that ties together the routing and filling logic.
148#[async_trait]
149pub trait RouteAndFill: StateWrite + Sized {
150#[instrument(skip(self, asset_1, asset_2, input, params, execution_circuit_breaker))]
151async fn route_and_fill(
152self: &mut Arc<Self>,
153 asset_1: asset::Id,
154 asset_2: asset::Id,
155 input: Amount,
156 params: RoutingParams,
157mut execution_circuit_breaker: ExecutionCircuitBreaker,
158 ) -> Result<Option<SwapExecution>>
159where
160Self: 'static,
161 {
162tracing::debug!(?input, ?asset_1, ?asset_2, "prepare to route and fill");
163164if input == Amount::zero() {
165tracing::debug!("no input, short-circuit exit");
166return Ok(None);
167 }
168169// Unfilled output of asset 1
170let mut total_unfilled_1 = input;
171// Output of asset 2
172let mut total_output_2 = 0u64.into();
173174// An ordered list of execution traces that were used to fill the trade.
175let mut traces: Vec<Vec<Value>> = Vec::new();
176177// Termination conditions:
178 // 1. We have no more `delta_1` remaining
179 // 2. A path can no longer be found
180 // 3. We have reached the `RoutingParams` specified price limit
181 // 4. The execution circuit breaker has been triggered based on the number of path searches and executions
182 // 5. An unrecoverable error occurred during the execution of the route.
183loop {
184// Check if we have exceeded the execution circuit breaker limits.
185if execution_circuit_breaker.exceeded_limits() {
186tracing::debug!("execution circuit breaker triggered, exiting route_and_fill");
187break;
188 } else {
189// This should be done ahead of doing any path search or execution, so that we never
190 // have to reason about the specific control flow of our batch swap logic.
191execution_circuit_breaker.increment();
192 }
193194// Find the best route between the two assets in the trading pair.
195let (path, spill_price) = self
196.path_search(asset_1, asset_2, params.clone())
197 .await
198.context("error finding best path")?;
199200let Some(path) = path else {
201tracing::debug!("no path found, exiting route_and_fill");
202break;
203 };
204205if path.is_empty() {
206tracing::debug!("empty path found, exiting route_and_fill");
207break;
208 }
209210// We prepare the input for this execution round, which is the remaining unfilled amount of asset 1.
211let delta_1 = Value {
212 asset_id: asset_1,
213 amount: total_unfilled_1,
214 };
215216tracing::debug!(?path, ?delta_1, "found path, filling up to spill price");
217218let execution_result = Arc::get_mut(self)
219 .expect("expected state to have no other refs")
220 .fill_route(delta_1, &path, spill_price)
221 .await;
222223let swap_execution = match execution_result {
224Ok(execution) => execution,
225Err(FillError::ExecutionOverflow(position_id)) => {
226// We have encountered an overflow during the execution of the route.
227 // To route around this, we will close the position and try to route and fill again.
228tracing::debug!(culprit = ?position_id, "overflow detected during routing execution");
229 Arc::get_mut(self)
230 .expect("expected state to have no other refs")
231 .close_position_by_id(&position_id)
232 .await
233.expect("the position still exists");
234continue;
235 }
236Err(e) => {
237// We have encountered an error during the execution of the route,
238 // there are no clear ways to route around this, so we propagate the error.
239 // `fill_route` is transactional and will have rolled back the state.
240anyhow::bail!("error filling route: {:?}", e);
241 }
242 };
243244// Immediately track the execution in the state.
245(total_output_2, total_unfilled_1) = {
246// The exact amount of asset 1 that was consumed in this execution round.
247let consumed_input = swap_execution.input;
248// The output of this execution round is the amount of asset 2 that was filled.
249let produced_output = swap_execution.output;
250251tracing::debug!(consumed_input = ?consumed_input.amount, output = ?produced_output.amount, "filled along best path");
252253// Sanity check that the input and output assets are correct.
254assert_eq!(produced_output.asset_id, asset_2);
255assert_eq!(consumed_input.asset_id, asset_1);
256257// Append the traces from this execution to the outer traces.
258traces.append(&mut swap_execution.traces.clone());
259260 (
261// The total output of asset 2 is the sum of all outputs.
262total_output_2 + produced_output.amount,
263// The total unfilled amount of asset 1 is the remaining unfilled amount minus the amount consumed.
264total_unfilled_1 - consumed_input.amount,
265 )
266 };
267268if total_unfilled_1.value() == 0 {
269tracing::debug!("filled all input, exiting route_and_fill");
270break;
271 }
272273// Ensure that we've actually executed, or else bail out.
274let Some(accurate_max_price) = swap_execution.max_price() else {
275tracing::debug!("no traces in execution, exiting route_and_fill");
276break;
277 };
278279// Check that the execution price is below the price limit, if one is set.
280if let Some(price_limit) = params.price_limit {
281if accurate_max_price >= price_limit {
282tracing::debug!(
283?accurate_max_price,
284?price_limit,
285"execution price above price limit, exiting route_and_fill"
286);
287break;
288 }
289 }
290 }
291292// If we didn't execute against any position at all, there are no execution records to return.
293if traces.is_empty() {
294return Ok(None);
295 } else {
296Ok(Some(SwapExecution {
297 traces,
298 input: Value {
299 asset_id: asset_1,
300// The total amount of asset 1 that was actually consumed across rounds.
301amount: input - total_unfilled_1,
302 },
303 output: Value {
304 asset_id: asset_2,
305 amount: total_output_2,
306 },
307 }))
308 }
309 }
310}
311312impl<T: HandleBatchSwaps> RouteAndFill for T {}