1use std::{
2 collections::{BTreeMap, BTreeSet},
3 pin::Pin,
4};
5
6use anyhow::Result;
7use async_trait::async_trait;
8use cnidarium::{StateDelta, StateRead, StateWrite};
9use futures::{Stream, StreamExt};
10use penumbra_sdk_asset::{asset, Value};
11use penumbra_sdk_num::{
12 fixpoint::{Error, U128x128},
13 Amount,
14};
15use tracing::instrument;
16
17use crate::{
18 component::{metrics, PositionManager, PositionRead},
19 lp::{
20 position::{self, Position},
21 Reserves,
22 },
23 DirectedTradingPair, SwapExecution, TradingPair,
24};
25
26#[derive(Debug, thiserror::Error)]
28pub enum FillError {
29 #[error("input id {0:?} does not belong on pair: {1:?}")]
32 AssetIdMismatch(asset::Id, TradingPair),
33 #[error("overflow when executing against position {0:?}")]
36 ExecutionOverflow(position::Id),
37 #[error("invalid route length {0} (must be at least 2)")]
39 InvalidRoute(usize),
40 #[error("frontier position with id {0:?}, not found")]
42 MissingFrontierPosition(position::Id),
43 #[error("insufficient liquidity in pair {0:?}")]
45 InsufficientLiquidity(DirectedTradingPair),
46}
47
48#[async_trait]
49pub trait FillRoute: StateWrite + Sized {
50 #[instrument(skip(self, input, hops, spill_price))]
72 async fn fill_route(
73 &mut self,
74 input: Value,
75 hops: &[asset::Id],
76 spill_price: Option<U128x128>,
77 ) -> Result<SwapExecution, FillError> {
78 fill_route_inner(self, input, hops, spill_price, true).await
79 }
80}
81
82impl<S: StateWrite> FillRoute for S {}
83
84async fn fill_route_inner<S: StateWrite + Sized>(
85 state: S,
86 mut input: Value,
87 hops: &[asset::Id],
88 spill_price: Option<U128x128>,
89 ensure_progress: bool,
90) -> Result<SwapExecution, FillError> {
91 let fill_start = std::time::Instant::now();
92
93 let mut this = StateDelta::new(state);
99
100 let route = std::iter::once(input.asset_id)
103 .chain(hops.iter().cloned())
104 .collect::<Vec<_>>();
105
106 let pairs = breakdown_route(&route)?;
108
109 tracing::debug!(
110 input = ?input.amount,
111 ?route,
112 ?spill_price,
113 );
114
115 let mut output = Value {
116 amount: 0u64.into(),
117 asset_id: route
118 .last()
119 .cloned()
120 .ok_or(FillError::InvalidRoute(route.len()))?,
121 };
122
123 let mut frontier = Frontier::load(&mut this, pairs).await?;
124 tracing::debug!(?frontier, "assembled initial frontier");
125
126 let mut filled_once = if ensure_progress {
129 false
130 } else {
131 true
133 };
134
135 'filling: loop {
136 let constraining_index = frontier.sense_capacity_constraint(input)?;
145
146 tracing::debug!(
147 ?constraining_index,
148 "sensed capacity constraint, begin filling"
149 );
150
151 let tx = match constraining_index {
154 Some(constraining_index) => frontier.fill_constrained(constraining_index),
155 None => frontier.fill_unconstrained(input),
156 };
157
158 let should_apply = if let Some(spill_price) = spill_price {
164 let below_limit = tx.actual_price().map(|p| p <= spill_price).unwrap_or(false);
165
166 below_limit || !filled_once
168 } else {
169 true
170 };
171
172 if !should_apply {
173 tracing::debug!(
174 spill_price = ?spill_price.map(|x| x.to_string()),
176 actual_price = ?tx.actual_price().map(|x| x.to_string()),
177 "exceeded spill price, breaking loop"
178 );
179 break 'filling;
181 }
182
183 let (current_input, current_output) = frontier.apply(tx);
184 filled_once = true;
185
186 input.amount = input.amount - current_input;
188 output.amount += current_output;
189
190 tracing::debug!(
191 ?current_input,
192 ?current_output,
193 input = ?input.amount,
194 output = ?output.amount,
195 "completed fill iteration, updating frontier"
196 );
197
198 if !frontier.replace_empty_positions().await? {
203 tracing::debug!("ran out of positions, breaking loop");
204 break 'filling;
205 }
206
207 if constraining_index.is_none() {
208 assert_eq!(input.amount, 0u64.into());
210 tracing::debug!("filled input amount completely, breaking loop");
211 break 'filling;
212 } else {
213 continue 'filling;
214 }
215 }
216
217 frontier
220 .save()
221 .await
222 .expect("writing frontier should not fail");
223
224 let input = frontier
226 .trace
227 .iter()
228 .map(|trace| trace.first().expect("empty trace").amount)
229 .sum::<Amount>();
230 let output = frontier
232 .trace
233 .iter()
234 .map(|trace| trace.last().expect("empty trace").amount)
235 .sum::<Amount>();
236
237 let in_asset_id = frontier.pairs.first().expect("empty pairs").start;
238 let out_asset_id = frontier.pairs.last().expect("empty pairs").end;
239
240 let swap_execution = SwapExecution {
241 traces: std::mem::take(&mut frontier.trace),
242 input: Value {
243 amount: input,
244 asset_id: in_asset_id,
245 },
246 output: Value {
247 amount: output,
248 asset_id: out_asset_id,
249 },
250 };
251 std::mem::drop(frontier);
252
253 tracing::debug!(?swap_execution, "returning swap execution of filled route");
254
255 let (mut state, events) = this.apply();
263 for event in events {
264 state.record(event);
265 }
266
267 let fill_elapsed = fill_start.elapsed();
268 metrics::histogram!(metrics::DEX_ROUTE_FILL_DURATION).record(fill_elapsed);
269 Ok(swap_execution)
271}
272
273fn breakdown_route(route: &[asset::Id]) -> Result<Vec<DirectedTradingPair>, FillError> {
276 if route.len() < 2 {
277 Err(FillError::InvalidRoute(route.len()))
278 } else {
279 let mut pairs = vec![];
280 for pair in route.windows(2) {
281 let start = pair[0];
282 let end = pair[1];
283 pairs.push(DirectedTradingPair::new(start, end));
284 }
285 Ok(pairs)
286 }
287}
288
289type PositionsByPrice = BTreeMap<
290 DirectedTradingPair,
291 Pin<Box<dyn Stream<Item = Result<(position::Id, position::Position)>> + Send>>,
292>;
293
294struct Frontier<S> {
296 pub pairs: Vec<DirectedTradingPair>,
298 pub positions: Vec<Position>,
300 pub position_ids: BTreeSet<position::Id>,
306 pub state: S,
308 pub positions_by_price: PositionsByPrice,
310 pub trace: Vec<Vec<Value>>,
312}
313
314struct FrontierTx {
315 new_reserves: Vec<Option<Reserves>>,
316 trace: Vec<Option<Amount>>,
317}
318
319impl FrontierTx {
320 fn new<S>(frontier: &Frontier<S>) -> FrontierTx {
321 FrontierTx {
322 new_reserves: vec![None; frontier.positions.len()],
323 trace: vec![None; frontier.pairs.len() + 1],
324 }
325 }
326
327 fn actual_price(&self) -> Result<U128x128, Error> {
328 let input: U128x128 = self
329 .trace
330 .first()
331 .expect("input amount is set in a complete trace")
332 .expect("input amount is set in a complete trace")
333 .into();
334 let output: U128x128 = self
335 .trace
336 .last()
337 .expect("output amount is set in a complete trace")
338 .expect("output amount is set in a complete trace")
339 .into();
340
341 input / output
342 }
343}
344
345impl<S> std::fmt::Debug for Frontier<S> {
346 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347 f.debug_struct("Frontier")
348 .field("pairs", &self.pairs)
349 .field("positions", &self.positions)
350 .field("position_ids", &self.position_ids)
351 .field("trace", &self.trace)
352 .finish_non_exhaustive()
353 }
354}
355
356impl<S: StateRead + StateWrite> Frontier<S> {
357 async fn load(state: S, pairs: Vec<DirectedTradingPair>) -> Result<Frontier<S>, FillError> {
358 let mut positions = Vec::new();
359 let mut position_ids = BTreeSet::new();
360
361 let mut positions_by_price = BTreeMap::new();
365 for pair in &pairs {
366 positions_by_price
367 .entry(*pair)
368 .or_insert_with(|| state.positions_by_price(pair));
369 }
370
371 for pair in &pairs {
372 'next_position: loop {
373 let (id, position) = positions_by_price
374 .get_mut(pair)
375 .expect("positions_by_price should have an entry for each pair")
376 .as_mut()
377 .next()
378 .await
379 .ok_or(FillError::InsufficientLiquidity(*pair))?
380 .expect("stream should not error");
381
382 if !position_ids.contains(&id) {
384 position_ids.insert(id);
385 positions.push(position);
386
387 break 'next_position;
388 }
389 }
390 }
391
392 let trace: Vec<Vec<Value>> = Vec::new();
394
395 Ok(Frontier {
396 positions,
397 position_ids,
398 pairs,
399 state,
400 positions_by_price,
401 trace,
402 })
403 }
404
405 async fn save(&mut self) -> Result<()> {
406 let context = DirectedTradingPair {
407 start: self.pairs.first().expect("pairs is nonempty").start,
408 end: self.pairs.last().expect("pairs is nonempty").end,
409 };
410 for position in &self.positions {
411 self.state
412 .position_execution(position.clone(), context.clone())
413 .await?;
414 }
415 Ok(())
416 }
417
418 fn apply(&mut self, changes: FrontierTx) -> (Amount, Amount) {
421 let mut trace: Vec<Value> = vec![];
422
423 trace.push(Value {
424 amount: changes.trace[0].expect("all trace amounts must be set when applying changes"),
425 asset_id: self.pairs[0].start,
426 });
427 for (i, new_reserves) in changes.new_reserves.into_iter().enumerate() {
428 let new_reserves =
429 new_reserves.expect("all new reserves must be set when applying changes");
430 let amount =
431 changes.trace[i + 1].expect("all trace amounts must be set when applying changes");
432 self.positions[i].reserves = new_reserves;
433 trace.push(Value {
435 amount,
436 asset_id: self.pairs[i].end,
437 });
438 }
439
440 self.trace.push(trace);
442
443 (
444 changes
445 .trace
446 .first()
447 .expect("first should be set for a trace")
448 .expect("input amount should be set for a trace"),
449 changes
450 .trace
451 .last()
452 .expect("last should be set for a trace")
453 .expect("output amount should be set for a trace"),
454 )
455 }
456
457 async fn replace_empty_positions(&mut self) -> Result<bool, FillError> {
458 for i in 0..self.pairs.len() {
459 let desired_reserves = self.positions[i]
460 .reserves_for(self.pairs[i].end)
461 .ok_or_else(|| {
462 FillError::AssetIdMismatch(self.pairs[i].end, self.positions[i].phi.pair)
463 })?;
464
465 if desired_reserves == 0u64.into() {
467 if !self.replace_position(i).await {
469 return Ok(false);
470 }
471 }
472 }
473
474 Ok(true)
475 }
476
477 #[instrument(skip(self))]
480 async fn replace_position(&mut self, index: usize) -> bool {
481 let replaced_position_id = self.positions[index].id();
482 tracing::debug!(?replaced_position_id, "replacing position");
483
484 let context = DirectedTradingPair {
489 start: self.pairs.first().expect("pairs is nonempty").start,
490 end: self.pairs.last().expect("pairs is nonempty").end,
491 };
492 let updated_position = self
493 .state
494 .position_execution(self.positions[index].clone(), context)
495 .await
496 .expect("writing to storage should not fail");
497
498 self.positions[index] = updated_position;
502
503 loop {
504 let pair = &self.pairs[index];
505 let (next_position_id, next_position) = match self
506 .positions_by_price
507 .get_mut(pair)
508 .expect("positions_by_price should have an entry for each pair")
509 .as_mut()
510 .next()
511 .await
512 .transpose()
513 .expect("stream doesn't error")
514 {
515 None => {
517 tracing::debug!(?pair, "no more positions available for pair");
518 return false;
519 }
520 Some((position_id, lp)) if !self.position_ids.contains(&position_id) => {
523 (position_id, lp)
524 }
525 Some(position_id) => {
527 tracing::debug!(?position_id, "skipping position already in frontier");
528 continue;
529 }
530 };
531
532 tracing::debug!(
533 ?next_position_id,
534 ?next_position,
535 "replacing constraining position in frontier",
536 );
537
538 self.position_ids.insert(next_position_id);
539 self.positions[index] = next_position;
540
541 return true;
542 }
543 }
544
545 #[instrument(skip(self, input), fields(input = ?input.amount))]
549 fn sense_capacity_constraint(&self, input: Value) -> Result<Option<usize>, FillError> {
550 tracing::debug!(
551 ?input,
552 "sensing frontier capacity with trial swap input amount"
553 );
554 let mut constraining_index = None;
555 let mut current_input = input;
556
557 for (i, position) in self.positions.iter().enumerate() {
558 if !position.phi.matches_input(current_input.asset_id) {
559 tracing::error!(
560 ?current_input,
561 ?position,
562 "asset ids of input and position do not match, interrupt capacity sensing."
563 );
564 return Err(FillError::AssetIdMismatch(
565 current_input.asset_id,
566 position.phi.pair,
567 ));
568 }
569
570 let (unfilled, new_reserves, output) = position
571 .phi
572 .fill(current_input, &position.reserves)
573 .map_err(|_| FillError::ExecutionOverflow(position.id()))?;
574
575 if unfilled.amount > Amount::zero() {
576 tracing::debug!(
577 i,
578 current_input = ?current_input.amount,
579 unfilled = ?unfilled.amount,
580 output = ?output.amount,
581 old_reserves = ?position.reserves,
582 new_reserves = ?new_reserves,
583 "could not completely fill input amount, marking as constraining"
584 );
585 constraining_index = Some(i);
587 } else {
588 tracing::debug!(
589 i,
590 current_input = ?current_input.amount,
591 unfilled = ?unfilled.amount,
592 output = ?output.amount,
593 old_reserves = ?position.reserves,
594 new_reserves = ?new_reserves,
595 "completely filled "
596 );
597 }
598
599 current_input = output;
600 }
601
602 Ok(constraining_index)
603 }
604
605 #[instrument(skip(self, input), fields(input = ?input.amount))]
606 fn fill_unconstrained(&self, input: Value) -> FrontierTx {
607 assert_eq!(
608 input.asset_id,
609 self.pairs
610 .first()
611 .expect("first should be set for a trace")
612 .start
613 );
614
615 let mut tx = FrontierTx::new(self);
616 tx.trace[0] = Some(input.amount);
619 self.fill_forward(&mut tx, 0, input);
621
622 tx
623 }
624
625 fn fill_constrained(&self, constraining_index: usize) -> FrontierTx {
626 let mut tx = FrontierTx::new(self);
627
628 let exactly_consumed_reserves = Value {
646 amount: self.positions[constraining_index]
647 .reserves_for(self.pairs[constraining_index].end)
648 .expect("asset ids should match"),
649 asset_id: self.pairs[constraining_index].end,
650 };
651
652 tracing::debug!(
653 constraining_index,
654 exactly_consumed_reserves = ?exactly_consumed_reserves.amount,
655 "attempting to completely consume reserves of constraining position"
656 );
657
658 self.fill_backward(&mut tx, constraining_index, exactly_consumed_reserves);
660 self.fill_forward(&mut tx, constraining_index + 1, exactly_consumed_reserves);
662
663 tx
664 }
665
666 #[instrument(skip(self, input, tx), fields(input = ?input.amount))]
667 fn fill_forward(&self, tx: &mut FrontierTx, start_index: usize, input: Value) {
668 tracing::debug!("filling forward along frontier");
669 let mut current_value = input;
670
671 for i in start_index..self.positions.len() {
672 let (unfilled, new_reserves, output) = self.positions[i]
673 .phi
674 .fill(current_value, &self.positions[i].reserves)
675 .expect("forward fill should not fail");
676
677 assert_eq!(
678 unfilled.amount,
679 Amount::zero(),
680 "unfilled amount for unconstrained frontier should be zero"
681 );
682
683 tx.new_reserves[i] = Some(new_reserves);
684 tx.trace[i + 1] = Some(output.amount);
685
686 current_value = output;
687 }
688 }
689
690 #[instrument(skip(self, output, tx), fields(output = ?output.amount))]
691 fn fill_backward(&self, tx: &mut FrontierTx, start_index: usize, output: Value) {
692 tracing::debug!("filling backward along frontier");
693 let mut current_value = output;
694 for i in (0..=start_index).rev() {
695 tx.trace[i + 1] = Some(current_value.amount);
696
697 let (new_reserves, prev_input) = self.positions[i]
698 .phi
699 .fill_output(&self.positions[i].reserves, current_value)
700 .expect("backward fill should not fail")
701 .expect(
702 "working backwards from most-constraining position should not exceed reserves",
703 );
704
705 tracing::debug!(
706 i,
707 current_value = ?current_value.amount,
708 prev_input = ?prev_input.amount,
709 old_reserves = ?self.positions[i].reserves,
710 new_reserves = ?new_reserves,
711 "found previous input for current value"
712 );
713
714 tx.new_reserves[i] = Some(new_reserves);
715 current_value = prev_input;
716 }
717
718 tx.trace[0] = Some(current_value.amount);
719 }
720}