// penumbra_sdk_dex/component/dex.rs

1use std::collections::{BTreeMap, BTreeSet};
2use std::sync::Arc;
3
4use anyhow::Result;
5use async_trait::async_trait;
6use cnidarium::{StateRead, StateWrite};
7use cnidarium_component::Component;
8use penumbra_sdk_asset::asset;
9use penumbra_sdk_asset::{Value, STAKING_TOKEN_ASSET_ID};
10use penumbra_sdk_fee::component::StateWriteExt as _;
11use penumbra_sdk_fee::Fee;
12use penumbra_sdk_num::Amount;
13use penumbra_sdk_proto::{DomainType as _, StateReadProto, StateWriteProto};
14use tendermint::v0_37::abci;
15use tracing::instrument;
16
17use crate::state_key::block_scoped;
18use crate::{
19    component::SwapDataRead, component::SwapDataWrite, event, genesis, state_key,
20    BatchSwapOutputData, DexParameters, DirectedTradingPair, SwapExecution, TradingPair,
21};
22
23use super::eviction_manager::EvictionManager;
24use super::{
25    chandelier::Chandelier,
26    router::{HandleBatchSwaps, RoutingParams},
27    Arbitrage, PositionManager, PositionRead as _, ValueCircuitBreaker,
28};
29
/// Marker type for the DEX component.
///
/// It carries no data; all behavior is provided by its [`Component`]
/// implementation, which operates on the state passed to each hook.
#[derive(Debug, Clone, Copy, Default)]
pub struct Dex {}
31
32#[async_trait]
33impl Component for Dex {
34    type AppState = genesis::Content;
35
36    #[instrument(name = "dex", skip(state, app_state))]
37    async fn init_chain<S: StateWrite>(mut state: S, app_state: Option<&Self::AppState>) {
38        match app_state {
39            None => { /* no-op */ }
40            Some(app_state) => {
41                state.put_dex_params(app_state.dex_params.clone());
42            }
43        }
44    }
45
46    #[instrument(name = "dex", skip(_state, _begin_block))]
47    async fn begin_block<S: StateWrite + 'static>(
48        _state: &mut Arc<S>,
49        _begin_block: &abci::request::BeginBlock,
50    ) {
51    }
52
53    #[instrument(name = "dex", skip(state, end_block))]
54    async fn end_block<S: StateWrite + 'static>(
55        state: &mut Arc<S>,
56        end_block: &abci::request::EndBlock,
57    ) {
58        // F.0. Add all non-native fee payments as swap flows.
59        let base_fees_and_tips = {
60            let state_ref =
61                Arc::get_mut(state).expect("should have unique ref at start of Dex::end_block");
62
63            // Extract the accumulated base fees and tips from the fee component, leaving 0 in its place.
64            let base_fees_and_tips = state_ref.take_accumulated_base_fees_and_tips();
65
66            // For each nonnative fee asset, add it in as if it were a chain-submitted swap.
67            for (asset_id, (base_fee, tip)) in base_fees_and_tips.iter() {
68                if *asset_id == *STAKING_TOKEN_ASSET_ID {
69                    continue;
70                }
71                let pair = TradingPair::new(*asset_id, *STAKING_TOKEN_ASSET_ID);
72                // We want to swap all of the fees into the native token, the base/tip distinction
73                // just affects where the resulting fees go.
74                let total = *base_fee + *tip;
75                // DANGEROUS: need to be careful about which side of the pair is which,
76                // but the existing API is unsafe and fixing it would be a much larger refactor.
77                let flow = if pair.asset_1() == *asset_id {
78                    (total, Amount::zero())
79                } else {
80                    (Amount::zero(), total)
81                };
82                tracing::debug!(
83                    ?asset_id,
84                    ?base_fee,
85                    ?tip,
86                    ?total,
87                    ?flow,
88                    "inserting chain-submitted swap for alt fee token"
89                );
90
91                // Accumulate into the swap flows for this block.
92                state_ref
93                    .accumulate_swap_flow(&pair, flow.into())
94                    .await
95                    .expect("should be able to credit DEX VCB");
96            }
97
98            // Hold on to the list of base fees and tips so we can claim outputs correctly.
99            base_fees_and_tips
100        };
101
102        // 1. Add all newly opened positions to the DEX.
103        // This has already happened in the action handlers for each `PositionOpen` action.
104
105        // 2. For each batch swap during the block, calculate clearing prices and set in the JMT.
106        let routing_params = state.routing_params().await.expect("dex params are set");
107        let execution_budget = state
108            .get_dex_params()
109            .await
110            .expect("dex params are set")
111            .max_execution_budget;
112
113        // Local cache of BSODs used for claiming fee swaps.
114        let mut bsods = BTreeMap::new();
115
116        for (trading_pair, swap_flows) in state.swap_flows() {
117            let batch_start = std::time::Instant::now();
118            let bsod = state
119                .handle_batch_swaps(
120                    trading_pair,
121                    swap_flows,
122                    end_block
123                        .height
124                        .try_into()
125                        .expect("height is part of the end block data"),
126                    // Always include both ends of the target pair as fixed candidates.
127                    routing_params
128                        .clone()
129                        .with_extra_candidates([trading_pair.asset_1(), trading_pair.asset_2()]),
130                    execution_budget,
131                )
132                .await
133                .expect("handling batch swaps is infaillible");
134            metrics::histogram!(crate::component::metrics::DEX_BATCH_DURATION)
135                .record(batch_start.elapsed());
136
137            bsods.insert(trading_pair, bsod);
138        }
139
140        // F.1. Having performed all batch swaps, "claim" the base fees and tips.
141        // The VCB has already been debited through the BSOD.
142        {
143            let state_ref =
144                Arc::get_mut(state).expect("should have unique ref after finishing batch swaps");
145            for (asset_id, (base_fee, tip)) in base_fees_and_tips.iter() {
146                if *asset_id == *STAKING_TOKEN_ASSET_ID {
147                    // In this case, there was nothing to swap, so there's nothing
148                    // to claim and we just accumulate the fee we took back into the fee component.
149                    state_ref.raw_accumulate_base_fee(Fee::from_staking_token_amount(*base_fee));
150                    state_ref.raw_accumulate_tip(Fee::from_staking_token_amount(*tip));
151                    continue;
152                }
153                let pair = TradingPair::new(*asset_id, *STAKING_TOKEN_ASSET_ID);
154                let bsod = bsods
155                    .get(&pair)
156                    .expect("bsod should be present for chain-submitted swap");
157
158                let (base_input, tip_input) = if pair.asset_1() == *asset_id {
159                    ((*base_fee, 0u64.into()), (*tip, 0u64.into()))
160                } else {
161                    ((0u64.into(), *base_fee), (0u64.into(), *tip))
162                };
163
164                let base_output = bsod.pro_rata_outputs(base_input);
165                let tip_output = bsod.pro_rata_outputs(tip_input);
166                tracing::debug!(
167                    ?asset_id,
168                    ?base_input,
169                    ?tip_input,
170                    ?base_output,
171                    ?tip_output,
172                    "claiming chain-submitted swap for alt fee token"
173                );
174
175                // Obtain the base fee and tip amounts in the native token, discarding any unfilled amounts.
176                let (swapped_base, swapped_tip) = if pair.asset_1() == *asset_id {
177                    // If `asset_id` is `R_1` we want to pull the other leg of the pair.
178                    (base_output.1, tip_output.1)
179                } else {
180                    // and vice-versa. `R_1` contains native tokens.
181                    (base_output.0, tip_output.0)
182                };
183
184                // Finally, accumulate the swapped base fee and tip back into the fee component.
185                // (We already took all the fees out).
186                state_ref.raw_accumulate_base_fee(Fee::from_staking_token_amount(swapped_base));
187                state_ref.raw_accumulate_tip(Fee::from_staking_token_amount(swapped_tip));
188            }
189        }
190
191        // 3. Perform arbitrage to ensure all prices are consistent post-execution:
192
193        // For arbitrage, we extend the path search by 2 hops to allow a path out of the
194        // staking token and back.
195
196        // Extend the fixed candidate set to include recently accessed assets, to have
197        // more arbitrage execution against newly opened positions.
198        let fixed_candidates = Arc::new(
199            routing_params
200                .fixed_candidates
201                .iter()
202                .cloned()
203                // The set of recently accessed assets is already limited to avoid
204                // potentially blowing up routing time.
205                .chain(state.recently_accessed_assets().iter().cloned())
206                .collect::<Vec<_>>(),
207        );
208
209        let arb_routing_params = RoutingParams {
210            max_hops: routing_params.max_hops + 2,
211            fixed_candidates,
212            price_limit: Some(1u64.into()),
213        };
214
215        match state
216            .arbitrage(*STAKING_TOKEN_ASSET_ID, arb_routing_params)
217            .await
218        {
219            // The arb search completed successfully, and surfaced some surplus.
220            Ok(Some(v)) => tracing::info!(surplus = ?v, "arbitrage successful!"),
221            // The arb completed without errors, but resulted in no surplus, so
222            // the state fork was discarded.
223            Ok(None) => tracing::debug!("no arbitrage found"),
224            // The arbitrage search should not error, but if it does, we should
225            // simply not perform arbitrage, rather than halting the entire chain.
226            Err(e) => tracing::warn!(?e, "error processing arb, this is a bug"),
227        }
228
229        // 4. Inspect trading pairs that saw new position opened during this block, and
230        // evict their excess LPs if any are found.
231        let _ = Arc::get_mut(state)
232            .expect("state should be uniquely referenced after batch swaps complete")
233            .evict_positions()
234            .await
235            .map_err(|e| tracing::error!(?e, "error evicting positions, skipping"));
236
237        // 5. Close all positions queued for closure at the end of the block.
238        // It's important to do this after execution, to allow block-scoped JIT liquidity.
239        Arc::get_mut(state)
240            .expect("state should be uniquely referenced after batch swaps complete")
241            .close_queued_positions()
242            .await
243            .expect("closing queued positions should not fail");
244
245        // 5. Finalize the candlestick data for the block.
246        Arc::get_mut(state)
247            .expect("state should be uniquely referenced after batch swaps complete")
248            .finalize_block_candlesticks()
249            .await
250            .expect("finalizing block candlesticks should not fail");
251    }
252
253    #[instrument(name = "dex", skip(_state))]
254    async fn end_epoch<S: StateWrite + 'static>(mut _state: &mut Arc<S>) -> Result<()> {
255        Ok(())
256    }
257}
258
259/// Provides public read access to DEX data.
260#[async_trait]
261pub trait StateReadExt: StateRead {
262    /// Gets the DEX parameters from the state.
263    async fn get_dex_params(&self) -> Result<DexParameters> {
264        self.get(state_key::config::dex_params())
265            .await?
266            .ok_or_else(|| anyhow::anyhow!("Missing DexParameters"))
267    }
268
269    /// Uses the DEX parameters to construct a `RoutingParams` for use in execution or simulation.
270    async fn routing_params(&self) -> Result<RoutingParams> {
271        self.get_dex_params().await.map(RoutingParams::from)
272    }
273
274    async fn output_data(
275        &self,
276        height: u64,
277        trading_pair: TradingPair,
278    ) -> Result<Option<BatchSwapOutputData>> {
279        self.get(&state_key::output_data(height, trading_pair))
280            .await
281    }
282
283    async fn swap_execution(
284        &self,
285        height: u64,
286        trading_pair: DirectedTradingPair,
287    ) -> Result<Option<SwapExecution>> {
288        self.nonverifiable_get(state_key::swap_execution(height, trading_pair).as_bytes())
289            .await
290    }
291
292    async fn arb_execution(&self, height: u64) -> Result<Option<SwapExecution>> {
293        self.get(&state_key::arb_execution(height)).await
294    }
295
296    /// Return a set of [`TradingPair`]s for which liquidity positions were opened
297    /// during this block.
298    fn get_active_trading_pairs_in_block(&self) -> BTreeSet<TradingPair> {
299        self.object_get(block_scoped::active::trading_pairs())
300            .unwrap_or_default()
301    }
302}
303
304impl<T: StateRead + ?Sized> StateReadExt for T {}
305
306/// Extension trait providing write access to dex data.
307#[async_trait]
308pub trait StateWriteExt: StateWrite {
309    fn put_dex_params(&mut self, params: DexParameters) {
310        self.put(state_key::config::dex_params().to_string(), params);
311    }
312}
313
314impl<T: StateWrite + ?Sized> StateWriteExt for T {}
315
316/// The maximum number of "hot" asset identifiers to track for this block.
317const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10;
318
319/// Provide write access to internal dex data.
320pub(crate) trait InternalDexWrite: StateWrite {
321    /// Adds an asset ID to the list of recently accessed assets,
322    /// making it a candidate for the current block's arbitrage routing.
323    ///
324    /// This ensures that assets associated with recently active positions
325    /// will be eligible for arbitrage if mispriced positions are opened.
326    #[tracing::instrument(level = "debug", skip_all)]
327    fn add_recently_accessed_asset(
328        &mut self,
329        asset_id: asset::Id,
330        fixed_candidates: Arc<Vec<asset::Id>>,
331    ) {
332        let mut assets = self.recently_accessed_assets();
333
334        // Limit the number of recently accessed assets to prevent blowing
335        // up routing time.
336        if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT {
337            return;
338        }
339
340        // If the asset is already in the fixed candidate list, don't insert it.
341        if fixed_candidates.contains(&asset_id) {
342            return;
343        }
344
345        assets.insert(asset_id);
346        self.object_put(state_key::recently_accessed_assets(), assets);
347    }
348
349    /// Mark a [`TradingPair`] as active during this block.
350    fn mark_trading_pair_as_active(&mut self, pair: TradingPair) {
351        let mut active_pairs = self.get_active_trading_pairs_in_block();
352
353        if active_pairs.insert(pair) {
354            self.object_put(block_scoped::active::trading_pairs(), active_pairs)
355        }
356    }
357
358    async fn set_output_data(
359        &mut self,
360        output_data: BatchSwapOutputData,
361        swap_execution_1_for_2: Option<SwapExecution>,
362        swap_execution_2_for_1: Option<SwapExecution>,
363    ) -> Result<()> {
364        // Debit the DEX for the swap outflows.
365        // Note that since we credited the DEX for _all_ inflows, we need to debit the
366        // unfilled amounts as well as the filled amounts.
367        //
368        // In the case of a value inflation bug, the debit call will return an underflow
369        // error, which will halt the chain.
370        self.dex_vcb_debit(Value {
371            amount: output_data.unfilled_1 + output_data.lambda_1,
372            asset_id: output_data.trading_pair.asset_1,
373        })
374        .await?;
375        self.dex_vcb_debit(Value {
376            amount: output_data.unfilled_2 + output_data.lambda_2,
377            asset_id: output_data.trading_pair.asset_2,
378        })
379        .await?;
380
381        // Write the output data to the state under a known key, for querying, ...
382        let height = output_data.height;
383        let trading_pair = output_data.trading_pair;
384        self.put(state_key::output_data(height, trading_pair), output_data);
385
386        // Store the swap executions for both directions in the state as well.
387        if let Some(swap_execution) = swap_execution_1_for_2.clone() {
388            let tp_1_for_2 = DirectedTradingPair::new(trading_pair.asset_1, trading_pair.asset_2);
389            self.put_swap_execution_at_height(height, tp_1_for_2, swap_execution);
390        }
391        if let Some(swap_execution) = swap_execution_2_for_1.clone() {
392            let tp_2_for_1 = DirectedTradingPair::new(trading_pair.asset_2, trading_pair.asset_1);
393            self.put_swap_execution_at_height(height, tp_2_for_1, swap_execution);
394        }
395
396        // ... and also add it to the set in the compact block to be pushed out to clients.
397        let mut outputs = self.pending_batch_swap_outputs();
398        outputs.insert(trading_pair, output_data);
399        self.object_put(state_key::pending_outputs(), outputs);
400
401        // Also generate an ABCI event for indexing:
402        self.record_proto(
403            event::EventBatchSwap {
404                batch_swap_output_data: output_data,
405                swap_execution_1_for_2,
406                swap_execution_2_for_1,
407            }
408            .to_proto(),
409        );
410
411        Ok(())
412    }
413
414    fn set_arb_execution(&mut self, height: u64, execution: SwapExecution) {
415        self.put(state_key::arb_execution(height), execution);
416    }
417}
418
419impl<T: StateWrite + ?Sized> InternalDexWrite for T {}