penumbra_sdk_dex/component/
dex.rs1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use cnidarium::{StateRead, StateWrite};
7use cnidarium_component::Component;
8use penumbra_sdk_asset::asset;
9use penumbra_sdk_asset::{Value, STAKING_TOKEN_ASSET_ID};
10use penumbra_sdk_fee::component::StateWriteExt as _;
11use penumbra_sdk_fee::Fee;
12use penumbra_sdk_num::Amount;
13use penumbra_sdk_proto::{DomainType as _, StateReadProto, StateWriteProto};
14use tendermint::v0_37::abci;
15use tracing::instrument;
16
17use crate::state_key::block_scoped;
18use crate::{
19 component::SwapDataRead, component::SwapDataWrite, event, genesis, state_key,
20 BatchSwapOutputData, DexParameters, DirectedTradingPair, SwapExecution, TradingPair,
21};
22
23use super::eviction_manager::EvictionManager;
24use super::{
25 chandelier::Chandelier,
26 router::{HandleBatchSwaps, RoutingParams},
27 Arbitrage, PositionManager, PositionRead as _, ValueCircuitBreaker,
28};
29
/// The DEX component type.
///
/// It has no fields; it exists so that the `Component` trait can be
/// implemented for it (chain initialization and per-block hooks).
pub struct Dex {}
31
32#[async_trait]
33impl Component for Dex {
34 type AppState = genesis::Content;
35
36 #[instrument(name = "dex", skip(state, app_state))]
37 async fn init_chain<S: StateWrite>(mut state: S, app_state: Option<&Self::AppState>) {
38 match app_state {
39 None => { }
40 Some(app_state) => {
41 state.put_dex_params(app_state.dex_params.clone());
42 }
43 }
44 }
45
46 #[instrument(name = "dex", skip(_state, _begin_block))]
47 async fn begin_block<S: StateWrite + 'static>(
48 _state: &mut Arc<S>,
49 _begin_block: &abci::request::BeginBlock,
50 ) {
51 }
52
53 #[instrument(name = "dex", skip(state, end_block))]
54 async fn end_block<S: StateWrite + 'static>(
55 state: &mut Arc<S>,
56 end_block: &abci::request::EndBlock,
57 ) {
58 let base_fees_and_tips = {
60 let state_ref =
61 Arc::get_mut(state).expect("should have unique ref at start of Dex::end_block");
62
63 let base_fees_and_tips = state_ref.take_accumulated_base_fees_and_tips();
65
66 for (asset_id, (base_fee, tip)) in base_fees_and_tips.iter() {
68 if *asset_id == *STAKING_TOKEN_ASSET_ID {
69 continue;
70 }
71 let pair = TradingPair::new(*asset_id, *STAKING_TOKEN_ASSET_ID);
72 let total = *base_fee + *tip;
75 let flow = if pair.asset_1() == *asset_id {
78 (total, Amount::zero())
79 } else {
80 (Amount::zero(), total)
81 };
82 tracing::debug!(
83 ?asset_id,
84 ?base_fee,
85 ?tip,
86 ?total,
87 ?flow,
88 "inserting chain-submitted swap for alt fee token"
89 );
90
91 state_ref
93 .accumulate_swap_flow(&pair, flow.into())
94 .await
95 .expect("should be able to credit DEX VCB");
96 }
97
98 base_fees_and_tips
100 };
101
102 let routing_params = state.routing_params().await.expect("dex params are set");
107 let execution_budget = state
108 .get_dex_params()
109 .await
110 .expect("dex params are set")
111 .max_execution_budget;
112
113 let mut bsods = BTreeMap::new();
115
116 for (trading_pair, swap_flows) in state.swap_flows() {
117 let batch_start = std::time::Instant::now();
118 let bsod = state
119 .handle_batch_swaps(
120 trading_pair,
121 swap_flows,
122 end_block
123 .height
124 .try_into()
125 .expect("height is part of the end block data"),
126 routing_params
128 .clone()
129 .with_extra_candidates([trading_pair.asset_1(), trading_pair.asset_2()]),
130 execution_budget,
131 )
132 .await
133 .expect("handling batch swaps is infaillible");
134 metrics::histogram!(crate::component::metrics::DEX_BATCH_DURATION)
135 .record(batch_start.elapsed());
136
137 bsods.insert(trading_pair, bsod);
138 }
139
140 {
143 let state_ref =
144 Arc::get_mut(state).expect("should have unique ref after finishing batch swaps");
145 for (asset_id, (base_fee, tip)) in base_fees_and_tips.iter() {
146 if *asset_id == *STAKING_TOKEN_ASSET_ID {
147 state_ref.raw_accumulate_base_fee(Fee::from_staking_token_amount(*base_fee));
150 state_ref.raw_accumulate_tip(Fee::from_staking_token_amount(*tip));
151 continue;
152 }
153 let pair = TradingPair::new(*asset_id, *STAKING_TOKEN_ASSET_ID);
154 let bsod = bsods
155 .get(&pair)
156 .expect("bsod should be present for chain-submitted swap");
157
158 let (base_input, tip_input) = if pair.asset_1() == *asset_id {
159 ((*base_fee, 0u64.into()), (*tip, 0u64.into()))
160 } else {
161 ((0u64.into(), *base_fee), (0u64.into(), *tip))
162 };
163
164 let base_output = bsod.pro_rata_outputs(base_input);
165 let tip_output = bsod.pro_rata_outputs(tip_input);
166 tracing::debug!(
167 ?asset_id,
168 ?base_input,
169 ?tip_input,
170 ?base_output,
171 ?tip_output,
172 "claiming chain-submitted swap for alt fee token"
173 );
174
175 let (swapped_base, swapped_tip) = if pair.asset_1() == *asset_id {
177 (base_output.1, tip_output.1)
179 } else {
180 (base_output.0, tip_output.0)
182 };
183
184 state_ref.raw_accumulate_base_fee(Fee::from_staking_token_amount(swapped_base));
187 state_ref.raw_accumulate_tip(Fee::from_staking_token_amount(swapped_tip));
188 }
189 }
190
191 let fixed_candidates = Arc::new(
199 routing_params
200 .fixed_candidates
201 .iter()
202 .cloned()
203 .chain(state.recently_accessed_assets().iter().cloned())
206 .collect::<Vec<_>>(),
207 );
208
209 let arb_routing_params = RoutingParams {
210 max_hops: routing_params.max_hops + 2,
211 fixed_candidates,
212 price_limit: Some(1u64.into()),
213 };
214
215 match state
216 .arbitrage(*STAKING_TOKEN_ASSET_ID, arb_routing_params)
217 .await
218 {
219 Ok(Some(v)) => tracing::info!(surplus = ?v, "arbitrage successful!"),
221 Ok(None) => tracing::debug!("no arbitrage found"),
224 Err(e) => tracing::warn!(?e, "error processing arb, this is a bug"),
227 }
228
229 let _ = Arc::get_mut(state)
232 .expect("state should be uniquely referenced after batch swaps complete")
233 .evict_positions()
234 .await
235 .map_err(|e| tracing::error!(?e, "error evicting positions, skipping"));
236
237 Arc::get_mut(state)
240 .expect("state should be uniquely referenced after batch swaps complete")
241 .close_queued_positions()
242 .await
243 .expect("closing queued positions should not fail");
244
245 Arc::get_mut(state)
247 .expect("state should be uniquely referenced after batch swaps complete")
248 .finalize_block_candlesticks()
249 .await
250 .expect("finalizing block candlesticks should not fail");
251 }
252
253 #[instrument(name = "dex", skip(_state))]
254 async fn end_epoch<S: StateWrite + 'static>(mut _state: &mut Arc<S>) -> Result<()> {
255 Ok(())
256 }
257}
258
259#[async_trait]
261pub trait StateReadExt: StateRead {
262 async fn get_dex_params(&self) -> Result<DexParameters> {
264 self.get(state_key::config::dex_params())
265 .await?
266 .ok_or_else(|| anyhow::anyhow!("Missing DexParameters"))
267 }
268
269 async fn routing_params(&self) -> Result<RoutingParams> {
271 self.get_dex_params().await.map(RoutingParams::from)
272 }
273
274 async fn output_data(
275 &self,
276 height: u64,
277 trading_pair: TradingPair,
278 ) -> Result<Option<BatchSwapOutputData>> {
279 self.get(&state_key::output_data(height, trading_pair))
280 .await
281 }
282
283 async fn swap_execution(
284 &self,
285 height: u64,
286 trading_pair: DirectedTradingPair,
287 ) -> Result<Option<SwapExecution>> {
288 self.nonverifiable_get(state_key::swap_execution(height, trading_pair).as_bytes())
289 .await
290 }
291
292 async fn arb_execution(&self, height: u64) -> Result<Option<SwapExecution>> {
293 self.get(&state_key::arb_execution(height)).await
294 }
295
296 fn get_active_trading_pairs_in_block(&self) -> BTreeSet<TradingPair> {
299 self.object_get(block_scoped::active::trading_pairs())
300 .unwrap_or_default()
301 }
302}
303
304impl<T: StateRead + ?Sized> StateReadExt for T {}
305
306#[async_trait]
308pub trait StateWriteExt: StateWrite {
309 fn put_dex_params(&mut self, params: DexParameters) {
310 self.put(state_key::config::dex_params().to_string(), params);
311 }
312}
313
314impl<T: StateWrite + ?Sized> StateWriteExt for T {}
315
/// Size cap for the set of recently accessed assets: once the set holds this
/// many entries, `add_recently_accessed_asset` stops adding to it.
const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10;
318
319pub(crate) trait InternalDexWrite: StateWrite {
321 #[tracing::instrument(level = "debug", skip_all)]
327 fn add_recently_accessed_asset(
328 &mut self,
329 asset_id: asset::Id,
330 fixed_candidates: Arc<Vec<asset::Id>>,
331 ) {
332 let mut assets = self.recently_accessed_assets();
333
334 if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT {
337 return;
338 }
339
340 if fixed_candidates.contains(&asset_id) {
342 return;
343 }
344
345 assets.insert(asset_id);
346 self.object_put(state_key::recently_accessed_assets(), assets);
347 }
348
349 fn mark_trading_pair_as_active(&mut self, pair: TradingPair) {
351 let mut active_pairs = self.get_active_trading_pairs_in_block();
352
353 if active_pairs.insert(pair) {
354 self.object_put(block_scoped::active::trading_pairs(), active_pairs)
355 }
356 }
357
358 async fn set_output_data(
359 &mut self,
360 output_data: BatchSwapOutputData,
361 swap_execution_1_for_2: Option<SwapExecution>,
362 swap_execution_2_for_1: Option<SwapExecution>,
363 ) -> Result<()> {
364 self.dex_vcb_debit(Value {
371 amount: output_data.unfilled_1 + output_data.lambda_1,
372 asset_id: output_data.trading_pair.asset_1,
373 })
374 .await?;
375 self.dex_vcb_debit(Value {
376 amount: output_data.unfilled_2 + output_data.lambda_2,
377 asset_id: output_data.trading_pair.asset_2,
378 })
379 .await?;
380
381 let height = output_data.height;
383 let trading_pair = output_data.trading_pair;
384 self.put(state_key::output_data(height, trading_pair), output_data);
385
386 if let Some(swap_execution) = swap_execution_1_for_2.clone() {
388 let tp_1_for_2 = DirectedTradingPair::new(trading_pair.asset_1, trading_pair.asset_2);
389 self.put_swap_execution_at_height(height, tp_1_for_2, swap_execution);
390 }
391 if let Some(swap_execution) = swap_execution_2_for_1.clone() {
392 let tp_2_for_1 = DirectedTradingPair::new(trading_pair.asset_2, trading_pair.asset_1);
393 self.put_swap_execution_at_height(height, tp_2_for_1, swap_execution);
394 }
395
396 let mut outputs = self.pending_batch_swap_outputs();
398 outputs.insert(trading_pair, output_data);
399 self.object_put(state_key::pending_outputs(), outputs);
400
401 self.record_proto(
403 event::EventBatchSwap {
404 batch_swap_output_data: output_data,
405 swap_execution_1_for_2,
406 swap_execution_2_for_1,
407 }
408 .to_proto(),
409 );
410
411 Ok(())
412 }
413
414 fn set_arb_execution(&mut self, height: u64, execution: SwapExecution) {
415 self.put(state_key::arb_execution(height), execution);
416 }
417}
418
419impl<T: StateWrite + ?Sized> InternalDexWrite for T {}