penumbra_dex/component/arb.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
use std::sync::Arc;
use crate::component::{metrics, StateReadExt};
use anyhow::Result;
use async_trait::async_trait;
use cnidarium::{StateDelta, StateWrite};
use penumbra_asset::{asset, Value};
use penumbra_proto::{DomainType as _, StateWriteProto as _};
use penumbra_sct::component::clock::EpochRead;
use tracing::instrument;
use crate::{
component::{ExecutionCircuitBreaker, InternalDexWrite, ValueCircuitBreaker},
event, SwapExecution,
};
use super::router::{RouteAndFill, RoutingParams};
#[async_trait]
pub trait Arbitrage: StateWrite + Sized {
/// Attempts to extract as much as possible of the `arb_token` from the available
/// liquidity positions, and returns the amount of `arb_token` extracted.
#[instrument(skip(self, arb_token, routing_params))]
async fn arbitrage(
self: &mut Arc<Self>,
arb_token: asset::Id,
routing_params: RoutingParams,
) -> Result<Option<Value>>
where
Self: 'static,
{
tracing::debug!(?arb_token, ?routing_params, "beginning arb search");
let arb_start = std::time::Instant::now();
// Work in a new `StateDelta`, so we can transactionally apply any state
// changes, and roll them back if we fail (e.g., if for some reason we
// discover at the end that the arb wasn't profitable).
let mut this = Arc::new(StateDelta::new(self.clone()));
// Create a flash-loan 2^64 of the arb token to ourselves.
let flash_loan = Value {
asset_id: arb_token,
amount: u64::MAX.into(),
};
let execution_budget = self.get_dex_params().await?.max_execution_budget;
let execution_circuit_breaker = ExecutionCircuitBreaker::new(execution_budget);
let Some(swap_execution) = this
.route_and_fill(
arb_token,
arb_token,
flash_loan.amount,
routing_params,
execution_circuit_breaker,
)
.await?
else {
return Ok(None);
};
let filled_input = swap_execution.input.amount;
let output = swap_execution.output.amount;
let unfilled_input = flash_loan
.amount
.checked_sub(&filled_input)
.expect("filled input should always be <= flash loan amount");
// Record the duration of the arb execution now, since we've computed it
// and we might return early if it's zero-valued, but in that case we still
// want to record how long we spent looking for it.
metrics::histogram!(metrics::DEX_ARB_DURATION).record(arb_start.elapsed());
// Because we're trading the arb token to itself, the total output is the
// output from the route-and-fill, plus the unfilled input.
let total_output = output + unfilled_input;
// Now "repay" the flash loan by subtracting it from the total output.
let Some(arb_profit) = total_output.checked_sub(&flash_loan.amount) else {
// This shouldn't happen, but because route-and-fill prioritizes
// guarantees about forward progress over precise application of
// price limits, it technically could occur.
tracing::debug!("mis-estimation in route-and-fill led to unprofitable arb, discarding");
return Ok(None);
};
if arb_profit == 0u64.into() {
// If we didn't make any profit, we don't need to do anything,
// and we can just discard the state delta entirely.
tracing::debug!("found 0-profit arb, discarding");
return Ok(None);
} else {
tracing::debug!(
?filled_input,
?output,
?unfilled_input,
?total_output,
?arb_profit,
"arb search detected surplus!"
);
}
// TODO: this is a bit nasty, can it be simplified?
// should this even be done "inside" the method, or all the way at the top?
let (self2, cache) = Arc::try_unwrap(this)
.map_err(|_| ())
.expect("no more outstanding refs to state after routing")
.flatten();
std::mem::drop(self2);
// Now there is only one reference to self again
let mut self_mut = Arc::get_mut(self).expect("self was unique ref");
cache.apply_to(&mut self_mut);
// Finally, record the arb execution in the state:
let height = self_mut.get_block_height().await?;
let se = SwapExecution {
traces: swap_execution.traces,
input: Value {
asset_id: arb_token,
amount: filled_input,
},
output: Value {
amount: filled_input + arb_profit,
asset_id: arb_token,
},
};
self_mut.set_arb_execution(height, se.clone());
// Deduct the input surplus from the dex's VCB.
self_mut
.dex_vcb_debit(Value {
amount: arb_profit,
asset_id: arb_token,
})
.await?;
// Emit an ABCI event detailing the arb execution.
self_mut.record_proto(
event::EventArbExecution {
height,
swap_execution: se,
}
.to_proto(),
);
return Ok(Some(Value {
amount: arb_profit,
asset_id: arb_token,
}));
}
}
impl<T: StateWrite> Arbitrage for T {}